RocketMQ 事务消息 详解

news2025/6/20 12:08:13

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年4月9日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 事务消息发送流程
  • 发送事务消息源码分析
  • 事务消息回查
    • Broker发起

事务消息发送流程

在这里插入图片描述
半消息实现了分布式环境下的数据一致性的处理,生产者发送事务消息的流程如上图所示,通过对源码的学习,我们可以弄清楚下面几点,也是半消息机制的核心:

  1. 为什么prepare消息不会被Consumer消费?
  2. 事务消息是如何提交和回滚的?
  3. 定时回查本地事务状态的实现细节。

发送事务消息源码分析

发送事务消息方法TransactionMQProducer.sendMessageInTransaction

  • msg:消息
  • tranExecuter:本地事务执行器
  • arg:本地事务执行器参数
public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // 忽视消息延迟的属性
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }

        Validators.checkMessage(msg, this.defaultMQProducer);
		
		// 发送半消息
        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
		
		// 处理发送半消息的结果
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
        	// 发送半消息成功,执行本地事务逻辑
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    // 执行本地事务逻辑
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            // 发送半消息失败,标记本地事务状态为回滚
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
		
		// 结束事务,设置消息 COMMIT / ROLLBACK
        try {
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
		
		// 返回事务发送结果
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        
        // 提取Prepared消息的uniqID
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

该方法的入参包含有一个需要用户实现本地事务的LocalTransactionExecuter executerexecuter中会进行事务操作以保证本地事务和消息发送这两个操作的原子性。


由上面的源码可知:

Producer会首先发送一个半消息到Broker中:

  1. 半消息发送成功,执行事务
  2. 半消息发送失败,不执行事务

半消息发送到Broker后不会被Consumer消费掉的原因有以下两点:

  1. Broker在将消息写入CommitLog时会判断消息类型,如果是prepare或者rollback消息,ConsumeQueueoffset不变
  2. Broker在构造ConsumeQueue时会判断是否是处于prepare或者rollback状态的消息,如果是则不会将该消息放入ConsumeQueue里,Consumer在拉取消息时也就不会拉取到这条消息

Producer会根据半消息的发送结果和本地任务执行结果来决定如何处理事务(commitrollback),方法最后调用了endTransaction来处理事务的执行结果,源码如下:

  • sendResult:发送半消息的结果
  • localTransactionState:本地事务状态
  • localException:执行本地事务逻辑产生的异常
  • RemotingException:远程调用异常
  • MQBrokerExceptionBroker异常
  • InterruptedException:当线程中断异常
  • UnknownHostException:未知host异常

public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        // 解码消息id
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }

		// 创建请求
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;

		// 提交 commit / rollback 消息 
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

该方法是将事务执行的结果发送给Broker,再由Broker决定是否进行消息投递,执行步骤如下:

  1. 收到消息后先检查是否是事务消息,如果不是事务消息则直接返回
  2. 根据请求头里的offset查询半消息,如果查询结果为空则直接返回
  3. 根据半消息构造新消息,新构造的消息会被重新写入到CommitLog里,rollback消息的消息体为空
  4. 如果是rollback消息,则该消息不会被投递

具体原因上文中已经分析过:只有commit消息才会被Broker投递给consumer

RocketMQ会将commit消息和rollback消息都写入到commitLog里,但rollback消息的消息体为空且不会被投递,CommitLog在删除过期消息时才会将其删除。当事务commit成功之后,RocketMQ会重新封装半消息并将其投递给Consumer端消费。


事务消息回查

Broker发起

相较于普通消息,事务消息主要依赖下面三个类:

  1. TransactionStateService:事务状态服务,负责对事务消息进行管理,包括存储和更新事务消息状态、回查状态等
  2. TranStateTable:事务消息状态存储表,基于MappedFileQueue实现
  3. TranRedoLogTranStateTable的日志,每次写入操作都会记录日志,当Broker宕机时,可以利用这个文件做数据恢复

存储半消息到CommitLog时,使用offset索引到对应的TranStateTable的位置


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/411512.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VMware ESXi 8.0c - 领先的裸机 Hypervisor (sysin Custom Image)

本站发布 Dell 和 HPE 定制版 ESXi 8.0c 镜像 请访问原文链接:https://sysin.org/blog/vmware-esxi-8/,查看最新版。原创作品,转载请保留出处。 作者主页:sysin.org 产品简介 VMware ESXi:专门构建的裸机 Hyperviso…

【MATLAB数学建模编程实战】Kmeans算法编程及算法的简单原理

欢迎关注,本专栏主要更新MATLAB仿真、界面、基础编程、画图、算法、矩阵处理等操作,拥有丰富的实例练习代码,欢迎订阅该专栏!(等该专栏建设成熟后将开始收费,快快上车吧~~) 【MATLAB数学建模编…

【MySQL数据库原理】MySQL Community安装与配置

目录 安装成功之后查看版本验证1、介绍、安装与配置数据库2、操作MySQL数据库3、MySQL数据库原理安装成功之后查看版本验证 SELECT VERSION();查看mysql版本号 1、介绍、安装与配置数据库 下载安装包:https://download.csdn.net/download/weixin_41194129/87672588 MySQL…

NumPy 秘籍中文第二版:二、高级索引和数组概念

原文:NumPy Cookbook - Second Edition 协议:CC BY-NC-SA 4.0 译者:飞龙 在本章中,我们将介绍以下秘籍: 安装 SciPy安装 PIL调整图像大小比较视图和副本翻转 Lena花式索引位置列表索引布尔值索引数独的步幅技巧广播数…

比尔·盖茨最新分享:ChatGPT的发展,不止于此

来源: 笔记侠 最近,ChatGPT、GPT 4、文心一言、Copilot,人工智能产品层出不容,一路轰炸;王慧文、王兴、李开复等各位高调以及低调的商业领袖和技术专家,纷纷入局AI赛道。人声鼎沸,十分热闹。 昨天&#xff…

自动写作ai-自动写作神器

自动生成文章 自动生成文章是指使用自然语言处理和人工智能技术,通过算法来自动生成文章的过程。一些自动生成文章的工具可以使用大量数据,学习数据背后的语言规范和知识结构,从而生成高质量和有用的文章。这种技术能够减少写作时间和人力成…

Nature子刊 定制饮食去除半胱氨酸和蛋氨酸可诱导细胞自毁进而治疗脑瘤?

恶性胶质瘤是成人最常见的脑部肿瘤。恶性胶质瘤的致死率为100%,无法治愈,是一种极度的恶性肿瘤。如此糟糕的预后促使研究者及神经外科医生不断学习研究肿瘤生物学,期望创造更好的疗法。神经外科助理教授Dominique Higgins博士从事肿瘤生物学的…

必胜方法,矩阵堆量

0x1 问题 今天不讲量化,来聊聊其他的 铁废柴一年到头也想不明白,那些几万人用的程序,几百万人关注的自媒体账号 到底是怎么做出来的啊 为什么我发一百个视频才赚100块钱 我要怎么才可以过上小康生活? 0x2 答案 矩阵&#xff…

WPF mvvm框架Stylet使用教程-特殊用法

事件绑定 除了绑定Command属性&#xff0c;在WPF中经常需要绑定一些事件的操作&#xff0c;在别的框架中需要引入其他包支持&#xff0c;在Stylet框架中&#xff0c;可以 同样使用s:Action进行绑定&#xff0c;对应绑定ViewModel中的方法。 示例&#xff1a; XAML:<Button…

中科网联CCData借助亚马逊云科技实现高效融媒体测量

近年来&#xff0c;随着媒体与广告传媒行业数字化转型向纵深发展&#xff0c;如何利用数据洞察用户生态、实现精准触达以及业务持续创新已成为媒体产业深入发展的“必答题”。与此同时&#xff0c;随着数据应用的不断深入&#xff0c;借助人工智能和机器学习技术&#xff0c;找…

TCP为什么要三次握手,而不是两次或四次?

文章目录TCP为什么要三次握手&#xff0c;而不是两次或四次&#xff1f;三次握手才可以阻止重复历史连接的初始化&#xff08;主要原因&#xff09;同步双方初始序列号避免资源浪费小结TCP为什么要三次握手&#xff0c;而不是两次或四次&#xff1f; TCP连接时用于保证可靠性和…

eclipse下载与安装(汉化教程)超详细

一、下载eclipse安装包 首先进入 eclipse官网 如下&#xff1a; 这里面有很多版本&#xff1b;我们小白一般选择第二个&#xff0c;向下滑动&#xff1b; 点击符合自己系统的版本。 这里我们切换镜像下载&#xff0c;一般选择离你最近的地址下载。 我建议选择大连东软信息学…

【MATLAB图像处理实用案例详解(9)】——基于最大类间方差遗传算法的道路分割

目录一、最大类间方差遗传算法二、代码示例一、最大类间方差遗传算法 最大类间方差的求解过程&#xff0c;就是在解空间中查找到一个最优的解&#xff0c;使得其方差最大&#xff0c;而遗传算法能非线性快速查找最优解k*及最大的方差&#xff0c;其步骤如下&#xff1a; ①为了…

【算法】01-算法解剖学-二分查找

汝之观览&#xff0c;吾之幸也&#xff01;本系列主要讲解的是算法知识&#xff0c;从算法基本概念&#xff0c;利用图解的方式更好的认识算法&#xff0c;再通过letcode算法题 进行进一步的巩固。刷题三步走&#xff08;1、掌握一门基本的编程语言&#xff1b;2、深入理解基础…

Redis7持久化

一、redis持久化 1、RDB RDB持久性以指定的时间间隔执行数据集的时间点快照 也就是说在一定的时间间隔内&#xff0c;将某一时刻的数据和状态以文件的形式写到磁盘上&#xff0c;这个快照文件交dump.rdb Redis6更新策略 Redis7更新策略 RDB手动触发 5秒2次修改 RDB手动触…

【安卓源码】SystemServer系统进程启动原理

一. SystemServer进程启动概括 Android系统中&#xff0c;第一个启动的是init进程&#xff0c;通过解析init.rc文件启动对应的service。Zygote就是由init启动起来的。Zygote作为应用的孵化器&#xff0c;所有的应用程序都是由他创建而来的。 Zygote是C/S架构的&#xff0c;当…

GANSeg:通过无监督分层图像生成学习分割

文章目录GANSeg: Learning to Segment by Unsupervised Hierarchical Image Generation摘要引言方法Level 1: Point Generation and Part ScaleLevel 2: From Points to MasksLevel 3: Mask-conditioned Image Generation损失函数实验结果GANSeg: Learning to Segment by Unsup…

挖掘潜在用户:2023年海外社交电商网红营销策略解析

近年来&#xff0c;随着社交媒体的不断发展和电商行业的崛起&#xff0c;海外社交电商平台已经成为了一个飞速发展的领域。然而&#xff0c;随着市场竞争的加剧&#xff0c;越来越多的海外社交电商平台开始采用网红营销策略来提升品牌知名度和销售业绩。本文Nox聚星将和大家探讨…

D. Rating Compression(双指针 + 思维)

Problem - D - Codeforces 在竞争编程平台CodeCook上&#xff0c;每个人都有一个由长度为n的整数数组a描述的评分图。现在你正在更新基础设施&#xff0c;所以你已经创建了一个程序来压缩这些图。程序的工作原理如下。给定一个整数参数k&#xff0c;程序取in中每个长度为k的连续…

给您的 MongoDB 定期做个体检:MongoDB 诊断

新钛云服已累计为您分享739篇技术干货接下来的一些列文章会为大家介绍日常工作中常用的 NoSQL 产品 MongoDB。主要涉及到&#xff1a;MongoDB 的安装及基本使用 MongoDB 文档查询 MongoDB 复制集 MongoDB 分片集群的介绍及搭建 MongoDB 安全加密 MongoDB 诊断我们会用…