当前位置:首页 » 《随便一记》 » 正文

RocketMQ 事务消息 详解

14 人参与  2023年04月16日 09:54  分类 : 《随便一记》  评论

点击全文阅读


? Java学习:Java从入门到精通总结

? 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

? 绝对不一样的职场干货:大厂最佳实践经验指南


? 最近更新:2023年4月9日

? 个人简介:通信工程本硕 for NJU?、Java程序员?。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

? 点赞 ? 收藏 ⭐留言 ? 都是我最大的动力!


文章目录

事务消息发送流程发送事务消息源码分析事务消息回查Broker发起

事务消息发送流程

在这里插入图片描述
半消息实现了分布式环境下的数据一致性的处理,生产者发送事务消息的流程如上图所示,通过对源码的学习,我们可以弄清楚下面几点,也是半消息机制的核心:

为什么prepare消息不会被Consumer消费?事务消息是如何提交和回滚的?定时回查本地事务状态的实现细节。

发送事务消息源码分析

发送事务消息方法TransactionMQProducer.sendMessageInTransaction

msg:消息tranExecuter:本地事务执行器arg:本地事务执行器参数
public TransactionSendResult sendMessageInTransaction(final Message msg,        final LocalTransactionExecuter localTransactionExecuter, final Object arg)        throws MQClientException {        TransactionListener transactionListener = getCheckListener();        if (null == localTransactionExecuter && null == transactionListener) {            throw new MQClientException("tranExecutor is null", null);        }        // 忽视消息延迟的属性        if (msg.getDelayTimeLevel() != 0) {            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);        }        Validators.checkMessage(msg, this.defaultMQProducer);// 发送半消息        SendResult sendResult = null;        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());        try {            sendResult = this.send(msg);        } catch (Exception e) {            throw new MQClientException("send message Exception", e);        }// 处理发送半消息的结果        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;        Throwable localException = null;        switch (sendResult.getSendStatus()) {        // 发送半消息成功,执行本地事务逻辑            case SEND_OK: {                try {                    if (sendResult.getTransactionId() != null) {                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());                    }                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);                    if (null != transactionId && !"".equals(transactionId)) {                        msg.setTransactionId(transactionId);                    }                    // 执行本地事务逻辑                    if (null != localTransactionExecuter) {                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);                    } else if (transactionListener != null) {                        log.debug("Used new transaction API");                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);                    }                    if (null == localTransactionState) {                        localTransactionState = LocalTransactionState.UNKNOW;                    }                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {                        log.info("executeLocalTransactionBranch return {}", localTransactionState);                        log.info(msg.toString());                    }                } catch (Throwable e) {                    log.info("executeLocalTransactionBranch exception", e);                    log.info(msg.toString());                    localException = e;                }            }            break;            // 发送半消息失败,标记本地事务状态为回滚            case FLUSH_DISK_TIMEOUT:            case FLUSH_SLAVE_TIMEOUT:            case SLAVE_NOT_AVAILABLE:                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;                break;            default:                break;        }// 结束事务,设置消息 COMMIT / ROLLBACK        try {            this.endTransaction(msg, sendResult, localTransactionState, localException);        } catch (Exception e) {            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);        }// 返回事务发送结果        TransactionSendResult transactionSendResult = new TransactionSendResult();        transactionSendResult.setSendStatus(sendResult.getSendStatus());        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());                // 提取Prepared消息的uniqID        transactionSendResult.setMsgId(sendResult.getMsgId());        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());        transactionSendResult.setTransactionId(sendResult.getTransactionId());        transactionSendResult.setLocalTransactionState(localTransactionState);        return transactionSendResult;    }

该方法的入参包含有一个需要用户实现本地事务的LocalTransactionExecuter executerexecuter中会进行事务操作以保证本地事务和消息发送这两个操作的原子性。


由上面的源码可知:

Producer会首先发送一个半消息到Broker中:

半消息发送成功,执行事务半消息发送失败,不执行事务

半消息发送到Broker后不会被Consumer消费掉的原因有以下两点:

Broker在将消息写入CommitLog时会判断消息类型,如果是prepare或者rollback消息,ConsumeQueueoffset不变Broker在构造ConsumeQueue时会判断是否是处于prepare或者rollback状态的消息,如果是则不会将该消息放入ConsumeQueue里,Consumer在拉取消息时也就不会拉取到这条消息

Producer会根据半消息的发送结果和本地任务执行结果来决定如何处理事务(commitrollback),方法最后调用了endTransaction来处理事务的执行结果,源码如下:

sendResult:发送半消息的结果localTransactionState:本地事务状态localException:执行本地事务逻辑产生的异常RemotingException:远程调用异常MQBrokerExceptionBroker异常InterruptedException:当线程中断异常UnknownHostException:未知host异常
public void endTransaction(        final Message msg,        final SendResult sendResult,        final LocalTransactionState localTransactionState,        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {        // 解码消息id        final MessageId id;        if (sendResult.getOffsetMsgId() != null) {            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());        } else {            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());        }// 创建请求        String transactionId = sendResult.getTransactionId();        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();        requestHeader.setTransactionId(transactionId);        requestHeader.setCommitLogOffset(id.getOffset());        switch (localTransactionState) {            case COMMIT_MESSAGE:                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);                break;            case ROLLBACK_MESSAGE:                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);                break;            case UNKNOW:                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);                break;            default:                break;        }        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());        requestHeader.setMsgId(sendResult.getMsgId());        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;// 提交 commit / rollback 消息         this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,            this.defaultMQProducer.getSendMsgTimeout());    }

该方法是将事务执行的结果发送给Broker,再由Broker决定是否进行消息投递,执行步骤如下:

收到消息后先检查是否是事务消息,如果不是事务消息则直接返回根据请求头里的offset查询半消息,如果查询结果为空则直接返回根据半消息构造新消息,新构造的消息会被重新写入到CommitLog里,rollback消息的消息体为空如果是rollback消息,则该消息不会被投递

具体原因上文中已经分析过:只有commit消息才会被Broker投递给consumer

RocketMQ会将commit消息和rollback消息都写入到commitLog里,但rollback消息的消息体为空且不会被投递,CommitLog在删除过期消息时才会将其删除。当事务commit成功之后,RocketMQ会重新封装半消息并将其投递给Consumer端消费。


事务消息回查

Broker发起

相较于普通消息,事务消息主要依赖下面三个类:

TransactionStateService:事务状态服务,负责对事务消息进行管理,包括存储和更新事务消息状态、回查状态等TranStateTable:事务消息状态存储表,基于MappedFileQueue实现TranRedoLogTranStateTable的日志,每次写入操作都会记录日志,当Broker宕机时,可以利用这个文件做数据恢复

存储半消息到CommitLog时,使用offset索引到对应的TranStateTable的位置



点击全文阅读


本文链接:http://zhangshiyu.com/post/59653.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1