1. 概述
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
Spring Cloud Stream
2.1 Spring Cloud Stream 是什么
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,它通过 Spring Integration 与消息中间件(如 RabbitMQ、Kafka、RocketMQ)进行连接。
2.2 核心概念
- Binder:与消息中间件集成的组件,负责创建对应的 Binding。
- Binding:消息中间件与应用程序之间的桥梁,分为 Input Binding(用于消费消息)和 Output Binding(用于生产消息)。
2.3 Broker 的角色
Broker 是消息队列中间件的代理服务器,负责存储消息、转发消息。例如,在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储,同时为消费者的拉取请求作准备。

三、快速入门
3.1 搭建生产者
3.1.1 引入依赖
在 pom.xml 中引入 Spring Cloud Alibaba RocketMQ 相关依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
3.1.2 配置文件
在 application.yaml 中添加 Spring Cloud Alibaba RocketMQ 相关配置:
spring:
application:
name: demo-producer-application
cloud:
stream:
bindings:
demo01-output:
destination: DEMO-TOPIC-01
content-type: application/json
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
demo01-output:
producer:
group: test
sync: true
3.1.3 创建 MySource 接口
声明名字为 Output Binding:
public interface MySource {
@Output("demo01-output")
MessageChannel demo01Output();
}
3.1.4 创建 Demo01Message 类
作为示例消息:
public class Demo01Message {
private Integer id;
// getter 和 setter 方法
}
3.1.5 创建 Demo01Controller 类
提供发送消息的 HTTP 接口:
@RestController
@RequestMapping("/demo01")
public class Demo01Controller {
@Autowired
private MySource mySource;
@GetMapping("/send")
public boolean send() {
Demo01Message message = new Demo01Message().setId(new Random().nextInt());
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message).build();
return mySource.demo01Output().send(springMessage);
}
}
3.1.6 创建 ProducerApplication 类
启动应用:
@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
3.2 搭建消费者
3.2.1 引入依赖
与生产者类似,引入 Spring Cloud Alibaba RocketMQ 相关依赖。
3.2.2 配置文件
在 application.yaml 中添加消费者相关的配置:
spring:
application:
name: demo-consumer-application
cloud:
stream:
bindings:
demo01-input:
destination: DEMO-TOPIC-01
content-type: application/json
group: demo01-consumer-group-DEMO-TOPIC-01
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
demo01-input:
consumer:
enabled: true
broadcasting: false
3.2.3 创建 MySink 接口
声明名字为 Input Binding:
public interface MySink {
String DEMO01_INPUT = "demo01-input";
@Input(DEMO01_INPUT)
SubscribableChannel demo01Input();
}
3.2.4 创建 Demo01Message 类
与生产者一致。
3.2.5 创建 Demo01Consumer 类
消费消息:
@Component
public class Demo01Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
3.2.6 创建 ConsumerApplication 类
启动应用:
@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
四、定时消息
4.1 定时消息的概念
定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
4.2 实现定时消息
在发送消息时,通过设置消息的延迟级别来实现定时消息。例如:
@GetMapping("/send_delay")
public boolean sendDelay() {
Demo01Message message = new Demo01Message().setId(new Random().nextInt());
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 设置延迟级别为 3,10 秒后消费
.build();
return mySource.demo01Output().send(springMessage);
}
五、消费重试
5.1 消费重试的机制
当消息消费失败时,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer,让 Consumer 有机会重新消费消息。
5.2 配置消费重试
在配置文件中设置消费重试相关的配置项:
spring:
cloud:
stream:
bindings:
demo01-input:
consumer:
max-attempts: 1
rocketmq:
bindings:
demo01-input:
consumer:
delay-level-when-next-consume: 0
六、消费异常处理机制
6.1 异常处理的方式
Spring Cloud Stream 提供了通用的消费异常处理机制,可以通过 @ServiceActivator 或 @StreamListener 注解订阅错误通道,实现自定义的异常处理逻辑。
6.2 实现异常处理
在消费者中添加异常处理方法:
@Component
public class Demo01Consumer {
// ...
@ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.errors")
public void handleError(ErrorMessage errorMessage) {
logger.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
}
@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
public void globalHandleError(ErrorMessage errorMessage) {
logger.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
}
}
七、广播消费
7.1 广播消费的概念
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
7.2 配置广播消费
在配置文件中设置 broadcasting 配置项为 true:
spring:
cloud:
stream:
bindings:
demo01-input:
consumer:
broadcasting: true
八、顺序消息
8.1 顺序消息的概念
RocketMQ 支持普通顺序消息和完全严格顺序消息,确保消息按顺序消费。
8.2 实现顺序消息
在生产者中设置分区 key 表达式,在消费者中设置顺序消费:
# 生产者配置
spring:
cloud:
stream:
bindings:
demo01-output:
producer:
partition-key-expression: payload['id']
rocketmq:
bindings:
demo01-output:
producer:
group: test
sync: true
# 消费者配置
spring:
cloud:
stream:
bindings:
demo01-input:
consumer:
orderly: true
九、消息过滤
9.1 消息过滤的方式
RocketMQ 提供基于 Tag 和 SQL92 的消息过滤方式。
9.2 基于 Tag 过滤
在生产者中设置消息的 Tag,在消费者中设置过滤的 Tag:
// 生产者
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TAGS, "yunai")
.build();
// 消费者配置
spring:
cloud:
stream:
bindings:
demo01-input:
consumer:
tags: yunai || yutou
9.3 基于 SQL92 过滤
在消费者中设置 SQL92 过滤表达式:
spring:
cloud:
stream:
bindings:
demo01-input:
consumer:
sql: "id > 100"
十、事务消息
10.1 事务消息的概念
RocketMQ 提供完整的事务消息功能,确保分布式事务的最终一致性。
10.2 实现事务消息
在生产者中发送事务消息,并实现事务监听器:
@GetMapping("/send_transaction")
public boolean sendTransaction() {
Demo01Message message = new Demo01Message().setId(new Random().nextInt());
Args args = new Args().setArgs1(1).setArgs2("2");
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader("args", JSON.toJSONString(args))
.build();
return mySource.demo01Output().send(springMessage);
}
@RocketMQTransactionListener(txProducerGroup = "test")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务逻辑
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 回查本地事务状态
return RocketMQLocalTransactionState.COMMIT;
}
}
十一、监控端点
11.1 监控端点的作用
Spring Cloud Stream 提供了自定义监控端点,用于获取 Binding 和 Channel 信息,以及 RocketMQ 客户端的健康状态。
11.2 配置监控端点
在 pom.xml 中引入 Spring Boot Actuator 相关依赖,并在配置文件中开放监控端点:
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
enabled: true
show-details: ALWAYS
十二、更多的配置项信息
12.1 RocketMQ Binder Properties
配置项包括 name-server、access-key、secret-key 等。
12.2 RocketMQ Consumer Properties
配置项包括 enable、tags、sql、broadcasting、orderly 等。
12.3 RocketMQ Provider Properties
配置项包括 enable、group、maxMessageSize、transactional 等。
十三、接入阿里云的消息队列 RocketMQ
13.1 配置阿里云 RocketMQ
在配置文件中设置访问阿里云 RocketMQ 的账号、Namesrv 地址等参数:
spring:
cloud:
stream:
bindings:
demo01-output:
destination: TOPIC_YUNAI_TEST
rocketmq:
binder:
name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
access-key: ${ALIYUN_ACCESS_KEY}
secret-key: ${ALIYUN_SECRET_KEY}
bindings:
demo01-output:
producer:
group: GID_PRODUCER_GROUP_YUNAI_TEST
sync: true
总结
本文详细介绍了如何在 Spring Cloud Alibaba 中使用 RocketMQ 作为消息队列,从基础概念到快速入门,再到高级特性,如定时消息、消费重试、广播消费等,帮助开发者全面了解并应用 RocketMQ 到实际项目中。







![[网络安全] 滥用Azure内置Contributor角色横向移动至Azure VM](https://i-blog.csdnimg.cn/direct/bb95915fc47947d6958f46e6b6337ef4.png#pic_center)










![RAG优化:python从零实现[吃一堑长一智]循环反馈Feedback](https://i-blog.csdnimg.cn/direct/88f4ba2808124d05ae52c8f3692ad5a2.png)
