SpringBoot整合RocketMQ与客户端注意事项

news2025/6/7 17:43:31

SpringBoot整合RocketMQ

引入依赖(5.3.0比较稳定)

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.3.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置文件

rocketmq.name-server=192.168.234.141:9876
#配了发送者,才会初始化RocketMQTemplate或者不配置,自己根据业务注入不同的RocketMQTemplate
rocketmq.producer.group=springBootGroup
rocketmq.producer.send-message-timeout=10000
rocketmq.

#如果这里不配,那就需要在消费者的注解中配。
#rocketmq.consumer.topic=
rocketmq.consumer.group=testGroup

#rocketmq.pull-consumer.group=
#rocketmq.pull-consumer.topic=

消费者注册到spring容器

@Component
//@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY,messageModel= MessageModel.BROADCASTING)
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("Received message : "+ message);
    }
}

生产者消息在需要生产消息的地方注入生产者调用方法

@Component
public class SpringProducer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Resource
    private DefaultMQProducer defaultMQProducer;

    public void sendMessage(String topic,String msg){
        this.rocketMQTemplate.convertAndSend(topic,msg);
        //pull消息需要配置rocketmq.consumer.topic和rocketmq.consumer.group参数
        //this.rocketMQTemplate.receive();
        //事务消息
        //this.rocketMQTemplate.sendMessageInTransaction
    }
   
 }

事务监听
2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。

//@RocketMQTransactionListener(txProducerGroup = "springBootGroup2")
//@RocketMQTransactionListener(rocketMQTemplateBeanName = "ExtRocketMQTemplate")
@RocketMQTransactionListener()
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

    }
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {

    }
}

客户端注意事项

消息的ID,Key和Tag

  • MessageId是RocketMQ内部给每条消息分配的唯⼀索引
  • key是Message中的补充信息
    针对key这⼀个属性,建议在业务中可以添加⼀些带有业务唯⼀性的数据,作为MessageId的补充。RocketMQ基于Keys属性,实现了消息溯源、消息压缩等⼀系列功能。
  • 通过Tag进⾏消息过滤性能⾮常⾼

最佳实践

⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识。tags可以由应⽤⾃由设置,只有⽣产者在发送消息设置了tags,消费⽅在订阅消息时才可以利⽤tags通过broker做消息过滤:message.setTags(“TagA”)。

Kafka的⼀⼤问题是Topic过多,会造成Partition⽂件过多,影响性能。⽽RocketMQ中的Topic完全不会对消息转发性能有影响。但是Topic过多,还是会加⼤RocketMQ的元数据维护的性能消耗。所以,在使⽤时,还是需要对Topic进⾏合理的分配。使⽤Tag区分消息时,尽量直接使⽤Tag过滤,不要使⽤复杂的SQL过滤。因为消息过滤机制虽然可以减少⽹络IO,但是毕竟会加⼤Broker端的消息处理压⼒。所以,消息过滤的逻辑,还是越简单越好。

消费者端进行幂等控制

在MQ系统中,对于消息幂等有三种实现语义:

  • at most once 最多⼀次:每条消息最多只会被消费⼀次
  • at least once ⾄少⼀次:每条消息⾄少会被消费⼀次
  • exactly once 刚刚好⼀次:每条消息都只会确定的消费⼀次

消息幂等的必要性

在互联⽹应⽤中,尤其在⽹络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

  • 发送时消息重复
    当⼀条消息已被成功发送到服务端并完成持久化,此时出现了⽹络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
  • 投递时消息重复
    消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候⽹络闪断。 为了保证消息⾄少被消费⼀次,消息队列 RocketMQ 的服务端将在⽹络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
  • 负载均衡时消息重复(包括但不限于⽹络抖动、Broker 重启以及订阅⽅应⽤重启)
    当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

而要处理这个问题,RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯⼀的,也会有冲突的情况。所以在⼀些对幂等性要求严格的场景,最好是使⽤业务上唯⼀的⼀个标识⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进行传递。

关注错误消息重试

多关注重试队列,可以及时了解消费者端的运⾏情况。这个队列中出现了⼤量的消息,就意味着消费者的运行出现了问题,要及时跟踪进行干预
RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
在这里插入图片描述

这个重试时间跟延迟消息的延迟级别是对应的。不过取的是延迟级别的后16级别。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这个重试时间可以将源码中的org.apache.rocketmq.example.quickstart.Consumer⾥的消息监听器返回状态改为RECONSUME_LATER测试⼀下。

如果消息重试16次后仍然失败,消息将不再投递。转为进⼊死信队列。

手动处理死信队列

当⼀条消息消费失败,RocketMQ就会⾃动进⾏消息重试。而如果消息超过最⼤重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的⼀种特殊队列:死信队列。

意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,⼀般需要人工去查看死信队列中的消息,对错误原因进⾏排查。然后对死信消息进行处理,⽐如转发到正常的Topic重新进⾏消费,或者丢弃。

死信队列的名称是%DLQ%+ConsumGroup

死信队列的特征:

  • ⼀个死信队列对应⼀个ConsumGroup,而不是对应某个消费者实例。
  • 如果⼀个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
  • ⼀个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
  • 死信队列中的消息不会再被消费者正常消费。
  • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

默认创建出来的死信队列,他⾥⾯的消息是⽆法读取的,在控制台和消费者中都⽆法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要⼿动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台操作)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2403177.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mysql的卸载与安装

确保卸载干净mysql 不然在进行mysal安装时候会出现不一的页面和问题 1、卸载 在应用页面将查询到的mysql相关应用卸载 2、到c盘下将残留的软件包进行数据删除 3、删除programData下的mysql数据 4、检查系统中的mysql是否存在 cmd中执行 sc deleted mysql80 5、删除注册表中的…

Excel处理控件Aspose.Cells教程:使用 C# 在 Excel 中创建组合图表

可视化项目时间线对于有效规划和跟踪至关重要。在本篇教程中&#xff0c;您将学习如何使用 C# 在 Excel 中创建组合图。只需几行代码&#xff0c;即可自动生成动态、美观的组合图。无论您是在构建项目管理工具还是处理内部报告&#xff0c;本指南都将向您展示如何将任务数据转换…

【多线程初阶】阻塞队列 生产者消费者模型

文章目录 一、阻塞队列二、生产者消费者模型(一)概念(二)生产者消费者的两个重要优势(阻塞队列的运用)1) 解耦合(不一定是两个线程之间,也可以是两个服务器之间)2) 削峰填谷 (三)生产者消费者模型付出的代价 三、标准库中的阻塞队列(一)观察模型的运行效果(二)观察阻塞效果1) 队…

《100天精通Python——基础篇 2025 第5天:巩固核心知识,选择题实战演练基础语法》

目录 一、踏上Python之旅二、Python输入与输出三、变量与基本数据类型四、运算符五、流程控制 一、踏上Python之旅 1.想要输出 I Love Python,应该使用()函数。 A.printf() B.print() C.println() D.Print() 在Python中想要在屏幕中输出内容&#xff0c;应该使用print()函数…

机器人夹爪的选型与ROS通讯——机器人抓取系统基础系列(六)

文章目录 前言一、夹爪的选型1.1 任务需求分析1.2 软体夹爪的选型 二、夹爪的ROS通讯2.1 夹爪的通信方式介绍2.2 串口助手测试2.3 ROS通讯节点实现 总结Reference: 前言 本文将介绍夹爪的选型方法和通讯方式。以鞋子这类操作对象为例&#xff0c;将详细阐述了对应的夹爪选型过…

第二十八章 RTC——实时时钟

第二十八章 RTC——实时时钟​​​​​​​ 目录 第二十八章 RTC——实时时钟 1 RTC实时时钟简介 2 RTC外设框图剖析 3 UNIX时间戳 4 与RTC控制相关的库函数 4.1 等待时钟同步和操作完成 4.2 使能备份域涉及RTC配置 4.3 设置RTC时钟分频 4.4 设置、获取RTC计数器及闹钟 5 实时时…

使用 DuckLake 和 DuckDB 构建 S3 数据湖实战指南

本文介绍了由 DuckDB 和 DuckLake 组成的轻量级数据湖方案&#xff0c;旨在解决传统数据湖&#xff08;如HadoopHive&#xff09;元数据管理复杂、查询性能低及厂商锁定等问题。该方案为中小规模数据湖场景提供了简单、高性能且无厂商锁定的替代选择。 1. 什么是 DuckLake 和 D…

大语言模型提示词(LLM Prompt)工程系统性学习指南:从理论基础到实战应用的完整体系

文章目录 前言&#xff1a;为什么提示词工程成为AI时代的核心技能一、提示词的本质探源&#xff1a;认知科学与逻辑学的理论基础1.1 认知科学视角下的提示词本质信息处理理论的深层机制图式理论的实际应用认知负荷理论的优化策略 1.2 逻辑学框架下的提示词架构形式逻辑的三段论…

如何基于Mihomo Party http端口配置git与bash命令行代理

如何基于Mihomo Party http端口配置git与bash命令行代理 1. 确定Mihomo Party http端口配置 点击内核设置后即可查看 默认7892端口&#xff0c;开启允许局域网连接 2. 配置git代理 配置本机代理可以使用 127.0.0.1 配置局域网内其它机代理需要使用本机的非回环地址 IP&am…

埃文科技智能数据引擎产品入选《中国网络安全细分领域产品名录》

嘶吼安全产业研究院发布《中国网络安全细分领域产品名录》&#xff0c;埃文科技智能数据引擎产品成功入选数据分级分类产品名录。 在数字化转型加速的今天&#xff0c;网络安全已成为企业生存与发展的核心基石&#xff0c;为了解这一蓬勃发展的产业格局&#xff0c;嘶吼安全产业…

NLP学习路线图(二十六):自注意力机制

一、为何需要你&#xff1f;序列建模的困境 在你出现之前&#xff0c;循环神经网络&#xff08;RNN&#xff09;及其变种LSTM、GRU是处理序列数据&#xff08;如文本、语音、时间序列&#xff09;的主流工具。它们按顺序逐个处理输入元素&#xff0c;将历史信息压缩在一个隐藏…

Unity3D仿星露谷物语开发60之定制角色其他部位

1、目标 上一篇中定制了角色的衬衫、手臂。 本篇中将定制角色其他部位的图形&#xff0c;包括&#xff1a;裤子、发型、皮肤、帽子等。 2、定制裤子 &#xff08;1&#xff09;修改ApplyCharacterCustomisation.cs脚本 我们需要设置一个输入框选择裤子的颜色。 // Select …

Google机器学习实践指南(机器学习模型泛化能力)

&#x1f525; Google机器学习(14)-机器学习模型泛化能力解析 Google机器学习(14)-机器学习模型泛化原理与优化&#xff08;约10分钟&#xff09; 一、泛化问题引入 ▲ 模型表现对比&#xff1a; 假设森林中树木健康状况预测模型&#xff1a; 图1&#xff1a;初始模型表现 …

MySQL性能调优:Mysql8高频面试题汇总

1&#xff0c;主键和唯一键有什么区别&#xff1f; 主键不能重复&#xff0c;不能为空&#xff0c;唯一键不能重复&#xff0c;可以为空。 建立主键的目的是让外键来引用。 一个表最多只有一个主键&#xff0c;但可以有很多唯一键 2&#xff0c;MySQL常用的存储引擎有哪些&…

vue+elementUI+springboot实现文件合并前端展示文件类型

项目场景&#xff1a; element的table上传文件并渲染出文件名称点击所属行可以查看文件,并且可以导出合并文件,此文章是记录合并文档前端展示的帖子 解决方案&#xff1a; 后端定义三个工具类 分别是pdf,doc和word的excle的目前我没整 word的工具类 package com.sc.modules…

高效绘制业务流程图!专业模板免费下载

在复杂的业务流程管理中&#xff0c;可视化工具已成为提升效能的核心基础设施。为助力开发者、项目经理及业务架构师高效落地流程标准化&#xff0c;本文将为你精选5套开箱即用的专业流程图模板。这些模板覆盖跨部门协作、电商订单、客户服务等高频场景&#xff0c;具备以下核心…

Spring Boot + Prometheus 实现应用监控(基于 Actuator 和 Micrometer)

文章目录 Spring Boot Prometheus 实现应用监控&#xff08;基于 Actuator 和 Micrometer&#xff09;环境准备示例结构启动和验证验证 Spring Boot 应用Prometheus 抓取配置&#xff08;静态方式&#xff09;Grafana 面板配置总结 Spring Boot Prometheus 实现应用监控&…

PowerBI企业运营分析—列互换式中国式报表分析

PowerBI企业运营分析—列互换式中国式报表分析 欢迎来到Powerbi小课堂&#xff0c;在竞争激烈的市场环境中&#xff0c;企业运营分析平台成为提升竞争力的核心工具。 该平台通过高效整合多源数据&#xff0c;并实时监控关键指标&#xff0c;能够迅速揭示业务表现的全貌&#…

BugKu Web渗透之需要管理员

启动场景&#xff0c;打开网页&#xff0c;显示如下&#xff1a; 一般没有上面头绪的时候&#xff0c;就是两步&#xff1a;右键查看源代码 和 扫描网站目录。 步骤一&#xff1a; 右键查看源代码 和 扫描网站目录。 右键查看源代码没有发现异常。 于是扫描网站目录&…

TDengine 开发指南—— UDF函数

UDF 简介 在某些应用场景中&#xff0c;应用逻辑需要的查询功能无法直接使用内置函数来实现&#xff0c;TDengine 允许编写用户自定义函数&#xff08;UDF&#xff09;&#xff0c;以便解决特殊应用场景中的使用需求。UDF 在集群中注册成功后&#xff0c;可以像系统内置函数一…