(七)Spring Cloud Alibaba 2023.x:RocketMQ 消息队列配置与实现
目录前言准备安装RocketMq服务下载rocketmq服务下载rocketmq 控制台项目集成引入依赖生产者服务配置消费者服务配置发送队列消息前言在微服务架构中异步消息通信是实现系统解耦、提高性能和增强系统可靠性的重要手段。在 Spring Cloud Alibaba 生态中RocketMQ 与 Spring Boot 深度集成提供了开箱即用的消息通信解决方案极大地简化了开发流程提升了系统的扩展性和可维护性 。准备jdk17maven3.9.4idea2023spring cloud: 2023.0.1.0spring cloud alibaba: 2023.0.1源码获取GitHub - /spring-cloud-alibaba-base-demo: 基于spring cloud alibaba生态快速构建微服务脚手架安装RocketMq服务本地window系统安装rocketmq服务下载rocketmq服务当前博文版本使用的是5.2官方下载地址下载 | RocketMQ百度网盘地址百度网盘 请输入提取码 提取码: 92h6下载完成解压在window系统中本地启动rocketmq服务需要配置环境变量步骤和配置与jdk配置相似Path变量中添加环境变量配置完成后进入本地rocketmq的bin文件夹中首先双击mqnamesrv.cmd启动NameServer服务: 它的作用是提供服务发现、路由信息管理、Broker注册和客户端查询等功能NameServer服务启动成功后启动Broker服务它的作用是消息的存储和传递当前文件夹cmd执行命令mqbroker -n localhost:9876下载rocketmq 控制台当前博文版本使用的是2.0.1官方下载地址Releases · apache/rocketmq-dashboard百度网盘地址百度网盘 请输入提取码 提取码: 92h6下载完成解压需要我们自行对项目打包获取jar包在当前文件夹中cmd执行命令打包mvn clean package -Dmaven.test.skiptrue在target文件中找到生成的jar执行命令启动java -jar rocketmq-dashboard-2.0.1.jar浏览器访问地址http://localhost:8080注:如果rocketmq的服务地址或者端口进行过调整那么在打包前自行到项目的application.yml配置文件中更改后再打包项目集成根据前面的博文目前我们已经创建了三个微服务引入依赖分别在子模块生产者和消费者服务的pom.xml文件中引入依赖!-- 引入消息队列 stream rocketmq -- dependency groupIdcom.alibaba.cloud/groupId artifactIdspring-cloud-starter-stream-rocketmq/artifactId /dependency生产者服务配置yml文件配置spring: application: name: http-cloud-producer cloud: stream: #消息中间件 rocketmq: binder: name-server: localhost:9876 bindings: producer-out-0: producer: group: output_1 bindings: producer-out-0: destination: topic0 producer-out-1: destination: topic1相关参数/值解释:output_1 :自定义的消费者组名称producer-out-0 自定义的生产者通道名称topic0 自定义队列名称生成者发送消息服务类实现Service public class MessageProducer { // 自动注入配置文件中绑定的通道 Autowired private StreamBridge streamBridge; // 消息通道 public void sendMessageToOutput0(String messageContent) { MapString, Object headers new HashMap(); headers.put(MessageConst.PROPERTY_TAGS, tag0); MessageString msg new GenericMessage(messageContent, headers); streamBridge.send(producer-out-0, msg); } public void sendMessageToOutput1(String messageContent) { MapString, Object headers new HashMap(); headers.put(MessageConst.PROPERTY_TAGS, tag1); MessageString msg new GenericMessage(messageContent, headers); streamBridge.send(producer-out-1, msg); } }定义控制层接口RestController public class DemoController { Autowired private MessageProducer messageProducer; /** * 发送消息队列 topic0 * param message * return */ GetMapping(value /test/mq/topic0) String sendMqTopic0(RequestParam(message) String message) { messageProducer.sendMessageToOutput0(message); return topic0消息发送成功了~~~; } /** * 发送消息队列 topic1 * param message * return */ GetMapping(value /test/mq/topic1) String sendMqTopic1(RequestParam(message) String message) { messageProducer.sendMessageToOutput1(message); return topic1消息发送成功了~~~; } }消费者服务配置配置yml文件spring: application: name: http-cloud-consumer cloud: stream: #消息中间件 rocketmq: binder: name-server: localhost:9876 bindings: consumer0-in-0: consumer: messageModel: CLUSTERING consumer1-in-0: consumer: messageModel: CLUSTERING bindings: consumer0-in-0: destination: topic0 group: clustering-consumer consumer1-in-0: destination: topic1 group: clustering-consumer相关配置解释consumer0-in-0自定义的消费者通道名称messageModel消费者消费模式集群或者是广播。本文配置的是集群模式。topic0 需要监听的消费者队列名称clustering-consumer 自定义的消费者组名称实现消费者mq消费类Component public class MessageConsumer { /** * 通过方法名称自动绑定到符合条件的消费者通道 * 注当前遇到的难点是一个消费者服务只能实现一个消费方法实现多个消费方法会使消费功能失效 * return */ Bean public ConsumerMessageString consumer1() { return msg - { System.out.println(Thread.currentThread().getName() Consumer1 Receive New Messages: msg); }; } }注从配置可以看到消费者监听了2个队列但上面的MessageConsumer 类只有一个消费方法目前版本mq会自动将配置在第一个的队列和上面的方法绑定但是如果需要监听对多队列并进行消费目前版本博主还未找到实现方案后续有方案再补上。发送队列消息请求接口给队列topic0发送消息查看消费者服务控制台打印成功查看rocketMq控制台消息记录至此Spring Cloud Alibaba集成RocketMq消息队列完成了。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2412638.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!