Spring Boot 集成 RocketMQ 全流程指南:从依赖引入到消息收发

news2025/5/11 2:47:25

前言

在分布式系统中,消息中间件是解耦服务、实现异步通信的核心组件。RocketMQ 作为阿里巴巴开源的高性能分布式消息中间件,凭借其高吞吐、低延迟、高可靠等特性,成为企业级应用的首选。而 Spring Boot 通过其“约定优于配置”的设计理念,极大简化了项目开发的复杂度。本文将通过 手动连接配置连接 两种方式,详细讲解如何在 Spring Boot 中集成 RocketMQ,实现消息的同步与异步发送,并提供完整示例代码。

微信图片_20250414010059

一、环境准备

在开始前,请确保:

  1. JDK 17、Maven 3.6+、Spring Boot 2.7+。
  2. 安装RocketMQ服务(本地或远程),推荐使用RocketMQ Docker镜像快速搭建(可参考之前文章)。

二、示例—Springboot集成mq(手动连接)

通过编码方式初始化生产者,适用于需要动态控制资源的场景。

2.1 新建项目

image-20230727182626779

image-20230727182647022

image

2.2 引入依赖

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.4</version>
        </dependency>

image

2.3 生产者发送消息

  • 构建一个消息生产者DefaultMQProducer实例,然后指定生产者组为jihaiProducer;
  • 指定NameServer的地址:服务器的ip:9876,因为需要从NameServer拉取Broker的信息
  • producer.start() 启动生产者
  • 构建一个内容为:技海拾贝的消息1,然后指定这个消息往jihaishibei这个topic发送
  • producer.send(msg):发送消息,打印结果
  • 关闭生产者
public class Producer {
    public static void main(String[] args) throws Exception {
        //创建一个生产者,指定生产者组为jihaiProducer
        DefaultMQProducer producer = new DefaultMQProducer("jihaiProducer");

        // 指定NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 第一次发送可能会超时,设置的比较大
        producer.setSendMsgTimeout(60000);

        // 启动生产者
        producer.start();

        // 创建一条消息
        // topic为 jihaishibei
        // 消息内容为 技海拾贝的消息1
        // tags 为 TagA
        Message msg = new Message("jihaishibei", "TagA", "技海拾贝的消息1 ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 发送消息并得到消息的发送结果,然后打印
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }

}

image

启动,发送消息

image

在控制台可以看到这条消息

image

image

这里就能看到发送消息的详细信息。

左下角消息的消费的消费,因为我们还没有消费者订阅这个topic,所以左下角没数据。

2.4 消费者消费消息

  • 创建一个消费者实例对象,指定消费者组为jihaiConsumer
  • 指定NameServer的地址:服务器的ip:9876
  • 订阅 jihaishibei这个topic的所有信息
  • consumer.registerMessageListener ,这个很重要,是注册一个监听器,这个监听器是当有消息的时候就会回调这个监听器,处理消息,所以需要用户实现这个接口,然后处理消息。
  • 启动消费者

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 通过push模式消费消息,指定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jihaiConsumer");

        // 指定NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅这个topic下的所有的消息
        consumer.subscribe("jihaishibei", "*");

        // 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

image

启动服务,进行消费

image

在控制台,发现被jihaiConsumer这个消费者组给消费了。

image

三、示例2—Springboot集成mq(配置连接)

在 Spring Boot 中,可以通过配置文件简化 RocketMQ 的连接配置。以下是在 application.yml​ 文件中进行的配置:

3.1 配置文件修改

image

image

image

spring:
  application:
    name: rocket-mq-demo

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: rocket-mq-demo-producer
    send-message-timeout: 10000
  comsumer:
    group: rocket-mq-demo-comsumer
    send-message-timeout: 10000

image

3.2 添加依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.0</version>  
</dependency>

根据需要选择最新版本,从中央仓库可以查看https://central.sonatype.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter

image

image

备注:如果添加rocketmq-client依赖,先注释这个依赖

3.3 消费者service类

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * messageModel=MessageModel.CLUSTERING
 * 监听模式,有消息就会消费
 */
@Service
@RocketMQMessageListener(topic = "jihaishibei-topic", consumerGroup = "rocket-mq-demo-comsumer", messageModel = MessageModel.CLUSTERING)
public class RocketMQConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.printf("收到消息: %s\n", s);
    }
}

3.4 生产者service类

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class RocketMQProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    private final String topic = "jihaishibei-topic";
    // 1.同步发送消息
    // 同步发送是指发送方发送一条消息后,会等待服务器返回确认信息后再进行后续操作。这种方式适用于需要可靠性保证的场景。
    public void createAndSend(String message){
        rocketMQTemplate.convertAndSend(topic, message);
        System.out.printf("同步发送结果: %s\n", message);
    }
    // 1.同步发送消息
    // 同步发送是指发送方发送一条消息后,会等待服务器返回确认信息后再进行后续操作。这种方式适用于需要可靠性保证的场景。
    public void sendSyncMessage(String message){
        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());
        System.out.println(sendResult.getMsgId());
        System.out.printf("同步发送结果: %s\n", message);
    }

    // 2.异步发送消息
    // 异步发送是指发送方发送消息后,不等待服务器返回确认信息,而是通过回调接口处理返回结果。这种方式适用于对响应时间要求较高的场景。
    public void sendAsyncMessage(String message){
        rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("异步发送成功: %s\n", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.printf("异步发送失败: %s\n", throwable.getMessage());
            }
        });
    }

    // 3.单向发送消息
    // 单向发送是指发送方只负责发送消息,不关心服务器的响应。该方式适用于对可靠性要求不高的场景,如日志收集。
    public void sendOneWayMessage(String message){
        rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());
        System.out.println("单向消息发送成功");
    }
}

3.5 测试controller类

@RequestMapping("api")
@RestController
public class RocketController {
    @Autowired
    private RocketMQProducer rocketMQProducer;

    @GetMapping("/createAndSend")
    public String createAndSend(@RequestParam String message) {
        rocketMQProducer.createAndSend(message);
        return "同步消息发送成功";
    }

    @GetMapping("/sendSync")
    public String sendSync(@RequestParam String message) {
        rocketMQProducer.sendSyncMessage(message);
        return "同步消息发送成功";
    }

    @GetMapping("/sendAsync")
    public String sendAsync(@RequestParam String message) {
        rocketMQProducer.sendAsyncMessage(message);
        return "异步消息发送中";
    }

    @GetMapping("/sendOneWay")
    public String sendOneWay(@RequestParam String message) {
        rocketMQProducer.sendOneWayMessage(message);
        return "单向消息发送成功";
    }
}

3.6 启动服务

image

3.7 测试

同步消息1

image

image

image

同步消息2

image

image

image

异步消息

image

image

image

单向发送消息

image

image

image

四、结束语

本文通过手动连接与配置连接两种方式,展示了Spring Boot与RocketMQ的集成实践。手动连接帮助开发者理解底层API逻辑,而Spring Boot的配置化集成则极大简化了开发流程。无论是同步消息的可靠性保障,还是异步消息的性能优化,RocketMQ均能与Spring Boot无缝协作,为分布式系统提供高效的消息通信能力。

未来可进一步探索集群部署、消息重试机制及监控告警,以实现更健壮的消息服务。希望本文能为开发者快速构建高可用的消息系统提供参考!

求点关注-gif动图 138_爱给网_aigei_com

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2335227.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI与我共创WEB界面

记录一次压测后的自我技术提升 这事儿得从机房停电说起。那天吭哧吭哧做完并发压测,正准备截Zabbix监控图写报告,突然发现监控曲线神秘失踪——系统组小哥挠着头说:“上次停电后,zabbix服务好像就没起来过…” 我盯着空荡荡的图表界面,大脑的CPU温度可能比服务器还高。 其…

基于频率约束条件的最小惯量需求评估,包括频率变化率ROCOF约束和频率最低点约束matlab/simulink

基于频率约束条件的最小惯量评估&#xff0c;包括频率变化率ROCOF约束和频率最低点约束matlab/simulink 1建立了含新能源调频的频域仿真传函模型&#xff0c;虚拟惯量下垂控制 2基于构建的模型&#xff0c;考虑了不同调频系数&#xff0c;不同扰动情况下的系统最小惯量需求

深入理解浏览器的 Cookie:全面解析与实践指南

在现代 Web 开发中&#xff0c;Cookie 扮演着举足轻重的角色。它不仅用于管理用户会话、记录用户偏好&#xff0c;还在行为追踪、广告投放以及安全防护等诸多方面发挥着重要作用。随着互联网应用场景的不断丰富&#xff0c;Cookie 的使用和管理也日趋复杂&#xff0c;如何在保障…

Java 正则表达式综合实战:URL 匹配与源码解析

在 Web 应用开发中&#xff0c;我们经常需要对 URL 进行格式验证。今天我们结合 Java 的 Pattern 和 Matcher 类&#xff0c;深入理解正则表达式在实际应用中的强大功能&#xff0c;并剖析一段实际的 Java 示例源码。 package com.RegExpInfo;import java.util.regex.Matcher; …

【C++】前向声明(Forward Declaration)

前向声明&#xff08;Forward Declaration&#xff09;是在C、C等编程语言中&#xff0c;在使用一个类、结构体或其他类型之前&#xff0c;仅声明其名称而不给出完整定义的一种方式。 作用 减少编译依赖&#xff1a;当一个源文件包含大量头文件时&#xff0c;编译时间会显著增…

numpy.ma.masked_where:屏蔽满足条件的数组

1.函数功能 屏蔽满足条件的数组内容&#xff0c;返回值为掩码数组 2.语法结构 np.ma.masked_where(condition, a, copyTrue)3. 参数 参数含义condition屏蔽条件a要操作的数组copy布尔值&#xff0c;取值为True时&#xff0c;结果复制数组(原始数据不变)&#xff0c;否则返回…

【解决】bartender软件换网之后神秘变慢

下的山寨版本bartender软件&#xff0c;用着一直都挺好&#xff0c;结果一次换网之后&#xff0c;启动&#xff0c;排版&#xff0c;打印各种动作都要转个几分钟才行&#xff0c;非常奇怪。直接说解决过程。 首先联想网络没有动以及脱机的时候&#xff0c;都没有这个问题。那么…

[福游宝——AI智能旅游信息查询平台]全栈AI项目-阶段二:聊天咨询业务组件开发

简言 本项目旨在构建一个以AI智能体为核心的福建省旅游信息查询系统&#xff0c;聚焦景点推荐、路线规划、交通天气查询等功能&#xff0c;为游客提供智能化、便捷化的旅游信息服务。项目采用前后端分离架构&#xff0c;前端基于Vite TypeScript Vue3技术栈&#xff0c;搭配…

【教学类-102-11】蝴蝶外轮廓01——Python对黑白图片进行PS填充三种颜色+图案描边+图案填充白色+制作1图2图6图24图

背景需求: 用Python,对白色255背景的图片进行了透明化、制作点状或线段的描边裁剪线 【教学类-102-10】剪纸图案全套代码09——Python线条虚线优化版04(原图放大白背景)+制作1图2图6图24图-CSDN博客文章浏览阅读1k次,点赞27次,收藏8次。【教学类-102-10】剪纸图案全套代…

MCP的另一面

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

微信小程序 - swiper轮播图

官方文档&#xff1a;https://developers.weixin.qq.com/miniprogram/dev/component/swiper.html <swiper indicator-color"ivory" indicator-active-color"#d43c33" indicator-dots autoplay><swiper-item><image src"/images/banner…

2025年第十六届蓝桥杯省赛C++ 研究生组真题

2025年第十六届蓝桥杯省赛C 研究生组真题 1.说明2.题目A&#xff1a;数位倍数&#xff08;5分&#xff09;3.题目B&#xff1a;IPv6&#xff08;5分&#xff09;4.题目C&#xff1a;变换数组&#xff08;10分&#xff09;5.题目D&#xff1a;最大数字&#xff08;10分&#xff…

七、自动化概念篇

自动化测试概念 自动化测试是把以人为驱动的测试行为转化为机器执行的一种过程。通常&#xff0c;在设计了测试用例并通过评审之后&#xff0c;由测试人员根据测试用例中描述的过程一步步执行测试&#xff0c;得到实际结果与期望结果的比较。在此过程中&#xff0c;为了节省人…

【第43节】实验分析windows异常分发原理

目录 前言 一、异常处理大致流程图 二、实验一&#xff1a;分析 KiTrap03 三、实验二&#xff1a;分析CommonDispatchException 四、代码探究&#xff1a;分析 KiDispatchException 函数 五、代码探究&#xff1a;伪代码分析用户层KiUserExceptionDispatcher 前言 在Wind…

如何在AMD MI300X 服务器上部署 DeepSeek R1模型?

DeepSeek-R1凭借其深度推理能力备受关注&#xff0c;在语言模型性能基准测试中可与顶级闭源模型匹敌。 AMD Instinct MI300X GPU可在单节点上高效运行新发布的DeepSeek-R1和V3模型。 用户通过SGLang优化&#xff0c;将MI300X的性能提升至初始版本的4倍&#xff0c;且更多优化将…

RTX 5060 Ti 3DMark跑分首次流出:比RTX 4060 Ti快20%

快科技4月14日消息&#xff0c;根据VideoCardz拿到的数据&#xff0c;RTX 5060 Ti 16GB在3DMark的系列基准测试中&#xff0c;平均较上一代RTX 4060 Ti 16GB高出20%。 具体来看&#xff0c;RTX 5060 Ti 16GB在3DMark的测试中表现如下&#xff1a; TimeSpy&#xff08;1440p&a…

【STL】set

在 C C C S T L STL STL 标准库中&#xff0c; s e t set set 是一个关联式容器&#xff0c;表示一个集合&#xff0c;用于存储唯一元素的容器。 s e t set set 中的元素会自动按照一定的顺序排序&#xff08;默认情况下是升序&#xff09;。这意味着在 s e t set set 中不能…

深入剖析C++中 String 类的模拟实现

目录 引言 一、基础框架搭建 成员变量与基本构造函数 析构函数 二、拷贝与赋值操作 深拷贝的拷贝构造函数 赋值运算符重载 三、字符串操作功能实现 获取字符串长度 字符串拼接 字符串比较 字符访问 四、迭代器相关实现&#xff08;简单模拟&#xff09; 迭代器类型…

STL之priority_queue的用法与实现

目录 1. priority_queue的介绍 1.1. priority_queue的概念 1.2. priority_queue的特点 2. 仿函数 2.1. 仿函数的概念 2.2. 仿函数的应用 2.3 仿函数的灵活性 3. priority_queue的用法 4. 模拟实现priority_queue 4.1. 插入 4.2. 删除 5. 源码 priority_…

散户使用算法交易怎么做?

智能算法交易是量化交易里面最常见的一种&#xff0c;也是大多数散户被套住的股票&#xff0c;想要解套&#xff0c;降低成本最直接有效的方式。但是往往这种波动速度小&#xff0c;担心速度跟不上的情况&#xff0c;我们就要叠加快速通道。 第一&#xff1a;算法交易的应用场…