文章目录
- 1.简介、安装部署
- 2.Springboot集成RocketMQ
- 2.1.添加maven依赖:
- 2.2.RocketMQ配置
- 生产者配置
- 消费者配置
 
- 2.3.生产者(发送消息)
- 2.4.消费者(接收消息)
 
- 3.实战结果
- 3.1.消费者服务
- 3.2.生产者服务
- 3.3.运行日志
- 生产日志
- 消费日志
 
 
 
1.简介、安装部署
此部分不做赘述,可从官网查阅
说明文档 https://rocketmq.apache.org/zh/docs/4.x/
下载地址 https://rocketmq.apache.org/zh/download (建议下载二进制包)
安装和启动教程 https://rocketmq.apache.org/zh/docs/4.x/introduction/02quickstart
其实官网讲解已经很详细了,所以这里只是详细描述一下启动和停止命令
### 启动namesrv, <mqnamesrv.path> 表示安装好的二进制包中bin目录下的 mqnamesrv 文件地址,使用命令时替换为实际地址即可
$ nohup sh <mqnamesrv.path> &
### 先启动broker <mqbroker.path> 表示安装好的二进制包中bin目录下的 mqbroker 文件地址
### -n NameServer地址
$ nohup sh <mqbroker.path> -n localhost:9876 &
### 关闭broker <mqbroker.path> 表示安装好的二进制包中bin目录下的 mqshutdown 文件地址
sh <mqshutdown.path> broker
### 关闭namesrv
sh <mqshutdown.path> namesrv
2.Springboot集成RocketMQ
点击前往 官方案例
2.1.添加maven依赖:
<!--在pom.xml中添加依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>
2.2.RocketMQ配置
生产者配置
# rocketmq配置
rocketmq:
  # NameServer 服务器地址
  name-server: localhost:9876
  # 见源码 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer
  producer:
    # 发送同一类消息的生产者设置为同一个group,保证唯一
    group: my_producer_group
    # 发送消息失败重试次数,默认2
    retryTimesWhenSendFailed: 2
    # 异步消息重试此处,默认2
    retryTimesWhenSendAsyncFailed: 2
    # 发送消息超时时间,默认3000
    sendMessageTimeout: 3000
    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
    maxMessageSize: 4096
    # 压缩消息阈值,默认4k(1024 * 4)
    compressMessageBodyThreshold: 4096
    # 是否在内部发送失败时重试另一个broker,默认false
    retryNextServer: false
    # access-key
    #accessKey: xxx
    # secret-key
    #secretKey: xxx
    # 是否启用消息跟踪,默认false
    enableMsgTrace: false
    # 消息跟踪主题的名称值。如果不进行配置,可以使用默认的跟踪主题名称
    customizedTraceTopic: RMQ_SYS_TRACE_TOPIC
消费者配置
# rocketmq配置
rocketmq:
  # NameServer 服务器地址
  name-server: localhost:9876
  #Push模式, 对应name为rocketMQTemplate的RocketMQTemplate
  # 见源码 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer
  consumer:
    # 配置指定group是否启动监听器 group.topic = false
    listeners:
      # key:group名称。 value:{key: topic名称: value: true/false}
      my_consumer_group:
        mytopic: true
2.3.生产者(发送消息)
通过 RocketMQTemplate 模版类,我们进行了二次封装,构建了一个统一的消息发送工具类 MessageHelper,目的在于核心代码抽离实现解耦合。哪怕后续如果更换了新的消息中间件,只需统一更改该工具类的结构即可。
重点方法 selectSendWay 根据处理好的 destination 和消息方式方式 way 选择 RocketMQTemplate 中对应的方法版本。
/**
 * @Name: MessageHelper
 * @Description: 消息发送工具
 * @Author: ahao
 * @Date: 2024/4/12 7:22 PM
 */
@Component
public class MessageHelper {
    /**
     * 默认标签
     */
    public static final String DEFAULT_TAG = "none";
    public static final int SYNCHRONOUSLY = 1;
    public static final int ASYNCHRONOUSLY = 2;
    public static final int ORDERLY_SYNCHRONOUSLY = 3;
    public static final int ORDERLY_ASYNCHRONOUSLY = 4;
    /**
     * 导入RocketMQ模版工具
     */
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 同步发送消息,会阻塞等待消息发送结果
     *
     * @param destination 消息发送目的地(主题:标签)
     * @param msg         消息内容
     */
    public MsgSendResult send(@NotNull String destination, Object msg) {
        return selectSendWay(destination, msg, null, null, SYNCHRONOUSLY);
    }
    /**
     * 同步发送消息,会阻塞等待消息发送结果
     *
     * @param topic 主题
     * @param tag   标签
     * @param msg   消息内容
     * @return
     */
    public MsgSendResult send(@NotNull String topic, @Nullable String tag, Object msg) {
        return selectSendWay(topic, tag, msg, null, null, SYNCHRONOUSLY);
    }
    /**
     * 异步发送消息
     *
     * @param destination 消息发送目的地
     * @param msg         消息内容
     * @param callback    消息发送回调通知 {@link SendCallback}
     */
    public void asyncSend(@NotNull String destination, Object msg, SendCallback callback) {
        selectSendWay(destination, msg, callback, null, ASYNCHRONOUSLY);
    }
    /**
     * 异步发送消息
     *
     * @param topic    主题
     * @param tag      标签
     * @param msg      消息内容
     * @param callback 消息发送回调通知 {@link SendCallback}
     */
    public void asyncSend(@NotNull String topic, @Nullable String tag, Object msg, SendCallback callback) {
        selectSendWay(topic, tag, msg, callback, null, ASYNCHRONOUSLY);
    }
    /**
     * 同步发送顺序消息,会阻塞等待消息发送结果
     *
     * @param destination 消息发送目的地(主题:标签)
     * @param msg         消息内容
     * @param key         分区键关键字,相同key的消息发送到同一个队列中
     */
    public MsgSendResult sendOrderly(@NotNull String destination, Object msg, String key) {
        return selectSendWay(destination, msg, null, key, ORDERLY_SYNCHRONOUSLY);
    }
    /**
     * 异步发送顺序消息,会阻塞等待消息发送结果
     *
     * @param destination 消息发送目的地(主题:标签)
     * @param msg         消息内容
     * @param key         分区键关键字,相同key的消息发送到同一个队列中
     */
    public void asyncSendOrderly(@NotNull String destination, Object msg, String key, SendCallback callback) {
        selectSendWay(destination, msg, callback, key, ORDERLY_ASYNCHRONOUSLY);
    }
    private MsgSendResult selectSendWay(String destination, Object msg, SendCallback callback, String key, int way) {
        if (!destination.contains(":")) {
            destination = destination + ":" + DEFAULT_TAG;
        }
        if (way == SYNCHRONOUSLY) {
            return new MsgSendResult(rocketMQTemplate.syncSend(destination, msg));
        } else if (way == ASYNCHRONOUSLY) {
            rocketMQTemplate.asyncSend(destination, msg, callback);
        } else if (way == ORDERLY_SYNCHRONOUSLY) {
            return new MsgSendResult(rocketMQTemplate.syncSendOrderly(destination, msg, key));
        } else if (way == ORDERLY_ASYNCHRONOUSLY) {
            rocketMQTemplate.asyncSendOrderly(destination, msg, key, callback);
        }
        return null;
    }
    private MsgSendResult selectSendWay(String topic, String tag, Object msg, SendCallback callback, String key, int way) {
        if (topic.contains(":")) {
            // 这里其实也可以抛异常,提示topic不合法,包含':'
            int i = topic.indexOf(':');
            topic = topic.substring(0, i);
        }
        if (tag == null) {
            tag = DEFAULT_TAG;
        }
        String destination = topic + ":" + tag;
        return selectSendWay(destination, msg, callback, key, way);
    }
}
消息发送结果
// 内部实现可以自定义,这里偷懒,作者只是继承RocketMQ的SendResult
public class MsgSendResult extends SendResult {
    
    private SendResult result;
    
    public MsgSendResult(SendResult result){}
    
}
2.4.消费者(接收消息)
/**
 * @Name: ConsumerDemo
 * @Description: 消费样例
 * @Author: ahao
 * @Date: 2024/4/12 7:35 PM
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "my_topic",
        consumerGroup = "my_consumer_group",
        selectorType = SelectorType.TAG,
        // * 表示匹配所有
        selectorExpression = "*",
        consumeMode = ConsumeMode.CONCURRENTLY,
        messageModel = MessageModel.CLUSTERING
)
public class ConsumerDemo implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("接收到的消息:{}",message);
    }
}
@RocketMQMessageListener 相关属性解析:
- topic:主题
- consumerGroup:消费分组
- selectorType:筛选方式 
  - SelectorType.TAG:根据TAG选择。仅支持表达式格式如:“tag1 || tag2 || tag3”,如果表达式为null或者“*”标识订阅所有消息
- SelectorType.SQL92:根据SQL92表达式选择。支持类似SQL的关键词语法 AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS,NULL
 
- selectorExpression:筛选条件,与selectorType关联
- consumeMode:消费模式 
  - ConsumeMode.CONCURRENTLY:并行处理
- ConsumeMode.ORDERLY:顺序处理
 
- messageModel:消息模式 
  - MessageModel.CLUSTERING:集群模式即负载均衡模式,每一个消息只会被某一个消费者消费一次
- MessageModel.BROADCASTING:广播模式,每个消费者都会消费消息
 
- consumeThreadMax:最大线程数
- consumeTimeout:消息阻塞消费线程的最长时间(以分钟为单位)
- enableMsgTrace:是否启用消息轨迹
- customizedTraceTopic:自定义的消息轨迹主题
- nameServer:命名服务器地址
- accessKey:标识用户身份的字符串(access-key)
- secretKey:密钥(secret-key)
3.实战结果
3.1.消费者服务
启动类
@Slf4j
@SpringBootApplication
public class MyApplication implements ApplicationRunner {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
    @Override
    public void run(ApplicationArguments args) throws Exception {
        TimeUnit.SECONDS.sleep(10000);
    }
}
目录结构

3.2.生产者服务
启动类
@Slf4j
@SpringBootApplication
public class MyApplication implements ApplicationRunner {
    @Resource
    private MessageHelper messageHelper;
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
    @Override
    public void run(ApplicationArguments args) throws Exception {
        new Thread(() -> {
            SimpleDateFormat format = new SimpleDateFormat("hh:MM:ss");
            while (true){
                String curDate = format.format(new Date());
                try {
                    log.info("发送消息:{}",curDate);
                    messageHelper.send("my_topic", curDate);
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        },"producer_test").start();
        TimeUnit.SECONDS.sleep(10000);
    }
    
}
目录结构

3.3.运行日志
生产日志

消费日志




















