一、什么是消息队列
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
官方首页:RocketMQ · 官方网站 | RocketMQ
官方文档:RocketMQ · 官方网站 | RocketMQ
下载链接:点击下载
可视化工具:点击下载
1.1、应用场景
- 异步处理
- 流量削峰填谷(比如:秒杀)
- 解耦微服务
1.2、常见MQ产品对比

1.3、为什么选择RocketMQ
第一点,开发语言优势。
RocketMQ 使用 Java 语言开发,比起使用 Erlang 开发的 RabbitMQ 来说,有着更容易上手的阅读体验和受众。在遇到 RocketMQ 较为底层的问题时,大部分熟悉 Java 的同学都可以深入阅读其源码,分析、排查问题,甚至可以在社区版的基础上进行二次开发。
第二点,丰富的高级特性。
根据 RocketMQ 官方文档的列举,其高级特性达到了 12 种,例如顺序消息、事务消息、消息过滤、定时消息等。RocketMQ 丰富的特性,能够为我们在复杂的业务场景下尽可能多地提供思路及解决方案。
第三点,良好的商业前景。
比如阿里,在其生产环境中已经部署了数百个 RocketMQ 集群,上千个节点,如此大量地将 RocketMQ 应用于生产环境,足以说明 RocketMQ 是的确经得起残酷的生产环境考验的,并且能够针对线上环境复杂的需求场景提供相应的解决方案。
RocketMQ 不仅仅解决了阿里的内部需求,同时还被搬到了阿里云上,作为一个商业化的产品对外提供服务。在阿里云的产品叫作“消息队列 RocketMQ 版”,自从 2016 年开始商业化到现在,RocketMQ 商业化版已经具有相当大的规模。良好的商业前景,也反向推动着 RocketMQ 在业界的普及,两者相辅相成、相得益彰。
第四点,众多大厂背书。
RocketMQ 现在被广泛应用于各个大厂的内部业务中,其诞生之地阿里自不必多说,历年来大家所熟悉的双十一促销,在阿里内部就是使用的 RocketMQ 来承载消息,面对如此庞大的流量洪峰,RocketMQ 交出了一份令人满意的答卷。
再比如,RocketMQ 也是阿里交易链路中的核心产品。交易链路本已是核心中的核心,而这个核心又将 RocketMQ 当作核心链路中的核心,足以见对 RocketMQ 的重视程度。同时,字节跳动内部不同的业务也都在大量地使用 RocketMQ,同样也是作为业务主流程中的核心组件,RocketMQ 在生产环境中保持着非常高的稳定性、可用性,非常良好地支撑了业务的运行与发展。
二、如何使用RocketMQ
2.1、搭建RocketMQ
2.1.1、下载安装Apache RocketMQ
本博客使用的Rocket MQ版本为:rocketmq-all-5.0.0-bin-release,如果需要可以自行下载或者找博主要
2.1.2、配置环境变量
ROCKETMQ_HOME 本地解压路径

NAMESRV_ADDR localhost:9876

2.1.3、启动mqnamesrv
powerShell 管理员身份运行,否则服务会出现闪退现象!!!
若出现以下提示则服务启动成功,可进行下一步操作

2.1.4、启动mqbroker

2.2、搭建RocketMQ控制台
下载可视化工具zip包,本地解压后使用IDEA打开

运行,浏览器访问http://localhost:8080/

控制台使用说明参考:https://github.com/eacdy/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md
三、SpringBoot整合RocketMQ
SpringBoot版本:2.7.4
3.1、pom.xml
<!--RocketMQ坐标-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
3.2、application.yml
server:
port: 8083
rocketmq:
#RocketMQ Namesrv
name-server: 127.0.0.1:9876
producer:
#生产者分组,RocketMQ必填项,字符可随意
group: test_mq
#发送消息超时时间,单位:毫秒。默认为 3000
send-message-timeout: 3000
#消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
compress-message-body-threshold: 4096
#消息体的最大允许大小。。默认为 4 * 1024 * 1024B
max-message-size: 4194304
#同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-failed: 2
#异步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2
#发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
retry-next-server: false
3.3、编写生产者
package com.rocket.rocketdemo.controller;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DemoController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
* convertAndSend(String destination, Object payload) 发送字符串比较方便
*/
@RequestMapping("/send")
public void send(){
//参数一:topic
//参数二:消息内容
rocketMQTemplate.convertAndSend("test1","test-message");
}
/**
* 发送同步消息
* Tip:该方法底层调用的是 producer.send()方法,是阻塞的,producer 一定要等到Broker进行了响应后才会返回,才能继续往下执行。如果超时,或者失败了,会触发两次默认的重试。
*/
@RequestMapping("/testSyncSend")
public void testSyncSend(){
//参数一:topic
//参数二:消息内容
SendResult sendResult = rocketMQTemplate.syncSend("test1","同步消息测试");
System.out.println(sendResult);
}
/**
* 发送异步消息
* Tip:该方法是非阻塞的,发送结果将由一个回调函数callback进行回调。它与同步发送消息的区别是它在发送消息时多传递了一个SendCallback对象,该方法一调用立马返回,而不需要等待Broker的响应返回。消息发送成功或失败后将回调SendCallback对象的对应方法。
*/
@RequestMapping("/testASyncSend")
public void testASyncSend(){
//参数一:topic
//参数二:消息内容
//参数三:回调
rocketMQTemplate.asyncSend("test1", "异步消息测试", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("消息发送异常");
throwable.printStackTrace();
}
});
}
/**
* 发送单向消息
* Tip:它的发送是单向的,即它不需要等待Broker的响应,只管发送即可,而不论发送成功与失败。通常应用于一些消息不是那么重要,可丢失的场景。
*/
@RequestMapping("/testOneWay")
public void testOneWay(){
for (int i = 0; i <10 ; i++) {
//参数一:topic 如果想添加tag,可以使用"topic:tag"的写法
//参数二:消息内容
rocketMQTemplate.sendOneWay("test1","单向消息测试测试下"+i);
}
}
}
3.4、编写消费者
package com.rocket.rocketdemo.config;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
//consumerGroup与application.yml中rocketmq.producer.group值一致
//topic与消息生产者发送的消息topic一致
@RocketMQMessageListener(consumerGroup = "test_mq",topic = "test1")
public class RocketMQConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消费消息:"+s);
}
}
3.5、测试
发送同步消息,用postman发送请求 http://localhost:8083/send
控制台效果:

发送异步消息,用postman发送请求 http://localhost:8083/testSyncSend
控制台效果:




















