RabbitMQ最新入门教程

news2025/5/17 21:38:08

文章目录

  • RabbitMQ最新入门教程
    • 1.什么是消息队列
    • 2.为什么使用消息队列
    • 3.消息队列协议
    • 4.安装Erlang
    • 5.安装RabbitMQ
    • 6.RabbitMQ核心模块
    • 7.RabbitMQ六大模式
      • 7.1 简单模式
      • 7.2 工作模式
      • 7.3 发布订阅模式
      • 7.4 路由模式
      • 7.5 主题模式
      • 7.6 RPC模式
    • 8.RabbitMQ四种交换机
      • 8.1 直连交换机
      • 8.2 主题交换机
      • 8.3 扇形交换机
      • 8.4 首部交换机
    • 9.RabbitMQ工作原理
    • 10.RabbitMQ Management使用
      • 10.1 Queues操作
      • 10.2 Exchanges操作
    • 11.在Java中使用RabbitMQ
    • 12.SpringBoot整合RabbitMQ
    • 参考

RabbitMQ最新入门教程

1.什么是消息队列

消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。

image-20250512095525228

常用的 MQ 有 ActiveMQ、RabbitMQ、Kafka、RocketMQ。

消息队列诞生时间优点缺点使用场景特点
ActiveMQ2007 年功能全面,支持消息持久化、事务处理、集群等功能;跨平台性和兼容性良好,能与不同系统集成性能在高并发场景下相对其他几款稍逊一筹;配置复杂,容易出现性能问题;社区活跃度不如 RabbitMQ 和 Kafka适用于企业级应用中多种不同类型系统之间的消息通信和集成,例如,在包含多种不同技术架构子系统的大型企业应用中,作为消息桥梁实现各子系统之间的通信和协作功能全面、协议支持丰富、跨平台性好
RabbitMQ2007 年可靠性高,支持多种消息持久化方式;路由功能强大,交换器类型丰富;客户端支持广泛,几乎涵盖所有主流编程语言;社区活跃,文档丰富,易于学习和使用性能相对 Kafka 等稍低,在处理大规模、高并发消息时可能存在瓶颈;Erlang 语言编写,对开发者技术栈有一定要求适用于各种规模的企业级应用,尤其是对消息可靠性和灵活性要求较高的场景,如电商系统中的订单处理、物流通知,金融系统中的交易消息处理等可靠性强、路由灵活、语言支持广泛
Kafka2011 年高吞吐量,能处理海量消息数据,低延迟;支持水平扩展,可通过添加节点增加处理能力;消息持久化到磁盘,数据可靠性高;适用于实时数据处理、日志收集和分析等场景不适合处理大量的小消息;对消息的顺序性支持相对较弱,虽然可以通过一些配置保证,但会影响性能;运维相对复杂,需要一定的技术门槛常用于大数据处理、实时流计算、日志收集与分析等领域,如电商网站的用户行为日志收集、金融行业的交易流水记录等,以便进行实时分析和处理高吞吐量、可扩展性好、数据持久化
RocketMQ2012 年支持大规模消息堆积和高并发消息处理;提供丰富的消息过滤机制和事务消息功能;扩展性和运维管理能力良好开源社区活跃度相比 RabbitMQ 和 Kafka 稍低;对某些复杂场景的支持可能不够灵活,需要一定的二次开发在电商、金融等领域应用广泛,尤其适用于对消息可靠性、事务性和高并发处理要求较高的场景,如电商的订单支付、库存变更等业务场景支持高并发、消息过滤功能强大、事务消息支持

2.为什么使用消息队列

  • 解耦。如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

    img

  • 异步。如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。

    img

  • 削峰。如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃

    img

3.消息队列协议

常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka、OpenMessage协议。RabbitMQ 实现的两个核心协议:AMQP 1.0 和 AMQP 0-9-1。

协议名称简介特点应用场景
OpenWireActiveMQ 默认的通信协议,基于 TCP/IP,提供了丰富的消息传递功能,支持多种编程语言。支持多种消息传递模式,如点对点、发布 / 订阅等;具有较好的兼容性和扩展性。适用于企业级应用中,需要可靠消息传递和多种消息模式支持的场景。
AMQP(高级消息队列协议)一种面向消息中间件的开放标准协议,具有丰富的功能集,支持事务、消息确认等。具有很强的规范性和通用性,支持多种语言,能保证消息的可靠传递和处理。常用于金融、电商等对数据一致性和可靠性要求较高的领域。
MQTT(消息队列遥测传输)轻量级的发布 / 订阅模式的消息传输协议,基于 TCP/IP,具有低带宽、低功耗、高可靠性等特点。采用简单的客户端 - 服务器架构,消息以主题进行分类,适合资源受限的设备和网络环境。广泛应用于物联网、移动应用、实时监控等场景,如智能家居设备之间的通信。
Kafka由 Apache 开发的分布式流平台,其协议主要用于处理大规模的实时数据。具有高吞吐量、可扩展性、持久性和容错性等特点,支持消息的批量处理和异步处理。常用于大数据处理、实时数据处理、日志收集等场景,如电商网站的用户行为日志收集和分析。
OpenMessage是阿里云开源的新一代分布式消息中间件协议,旨在提供高性能、高可靠、高可扩展的消息通信能力。支持大规模集群部署,具有低延迟、高并发的特点,同时提供了丰富的消息特性,如事务消息、顺序消息等。适用于互联网公司的大规模分布式系统,尤其是对消息处理性能和可靠性要求极高的场景,如电商的订单系统、物流系统等。

4.安装Erlang

  • 安装 Erlang。RabbitMQ 服务器是用 Erlang 语言编写的,它的安装包里并没有集成 Erlang 的环境,因此需要先安装 Erlang。下载链接,傻瓜式安装,一直点next就行。

    image-20250512121239433

  • 为 Erlang 配置环境变量。「编辑系统环境变量」->「环境变量」->「系统变量」->「path」->「编辑」->「新建」,填入 Erlang 的 bin 路径(D:\Erlang\Erlang OTP\bin)。

    image-20250512122438921

  • 验证是否安装成功。「cmd」->「输入erl -version

    image-20250512122616612

5.安装RabbitMQ

  • 安装 RabbitMQ 服务器端,下载链接,傻瓜式安装,一直点 next 就行。

  • 安装完成后,进入安装目录(sbin 目录下),运行 cmd,输入 rabbitmqctl.bat status 可确认 RabbitMQ 的启动状态。

    image-20250512171706494

  • 输入 rabbitmq-plugins enable rabbitmq_management,启用客户端管理 UI 的插件。

    image-20250512172020721

  • 在浏览器地址栏输入 http://localhost:15672/ 进入管理端界面。账号与密码默认是:guest/guest。出现如下界面,则表示安装成功完成。

    image-20250512172333833

note:在通过 rabbitmqctl.bat status 查看 RabbitMQ 的启动状态时,出现了错误。Error: unable to perform an operation on node 'rabbit@WIN-H34GJTET6NT'. Please see diagnostics information and suggestions below. 这个错误是因为 erlang 和 rabbitmq 的 .erlang.cookie 不同步所导致的。

解决办法:将 C:\Users\Administrator\.erlang.cookie 复制到 C:\Windows\System32\config\systemprofile 目录,重启 rabbitMQ 服务。

6.RabbitMQ核心模块

image-20250512175323132

  • Publisher:生产者,发送消息给交换机。
  • consumer:消费者,消费消息,和队列进行绑定。
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。交换机只能路由消息,无法存储消息。交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定。
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理。
  • Virtual Host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,因为RabiitMQ性能很强,单个项目使用会造成巨大的浪费,所以多个项目,实现一套MQ,virtual host就是为了不同交换机产生隔离。
  • Broker:就是 RabbitMQ 服务,用于接收和分发消息,接受客户端的连接,实现 AMQP 实体服务。
  • Connection:连接,生产者/消费者与 Broker 之间的 TCP 网络连接。
  • Channel:网络信道,如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立连接的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
  • Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。由 Properties 和 Body 组成。Properties 为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body 就是消息体内容。
  • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个 RoutingKey。
  • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个 RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。

7.RabbitMQ六大模式

7.1 简单模式

简单模式是最基本的工作模式,也是最简单的消息传递模式。在简单模式中,一个生产者将消息发到一个队列中,一个消费者从队列中获取并消费消息。这种模式适用于单个生产者和单个消费者的简单场景,消息的处理是同步的。

image-20250513171215013

7.2 工作模式

工作模式用于实现一个任务在多个消费者之间的并发处理,在工作队列模式中,一个生产者将消息发到一个队列中,多个消费者从队列中获取并处理消息,每个消息只能被一个消费者处理。这种模式适用于多个消费者并发处理消息的情况,提高了系统的处理能力和吞吐量。

image-20250513171518789

7.3 发布订阅模式

发布/订阅模式用于实现一条消息被多个消费者同时接受和处理。在发布/订阅模式中,一个生产者将消息发送到交换器(Exchange)中,交换器将消息广播到所有绑定的队列,每个队列对应一个消费者。这种模式适用于消息需要被多个消费者同时接受和处理的广播场景,如日志订阅和消息通知等。

image-20250513171714911

7.4 路由模式

路由模式用于实现根据消息的路由键(Routing Key)将消息路由到不同的队列中。在路由模式中,一个生产者将消息发送到交换器中,并制定消息的路由键,交换器根据路由键将消息路由到与之匹配的队列中。这种模式适用于根据不同的条件将消息分发到不同的队列中,以实现消息的筛选和分发。

image-20250513171833186

7.5 主题模式

主题模式是一种更灵活的消息路由,它使用通配符匹配路由键,将消息路由到多个队列中。在主题模式中,一个生产者将消息发送到交换器中,并指定主题(Topic)作为路由键,交换器根据通配符匹配将消息路由到与之匹配的队列中。

image-20250513172016870

7.6 RPC模式

RPC模式是一种用于实现分布式系统中远程调用的工作模式。指的是通过RabbitMQ来实现一种RPC的能力。

image-20250513172138672

8.RabbitMQ四种交换机

生产者发布消息、消费者接收消息,但是这中间的消息是怎么传递的,就用到了一个很重要的概念交换机(Exchange),RabbitMQ 消息投递到交换机上之后,通过路由关系再投递到指定的一个或多个队列上。

Exchange 参数介绍

  • Name:交换机名称
  • Type:交换机类型 direct、topic、fanout、headers
  • Durability:是否需要持久化
  • Auto Delete:最后一个绑定到 Exchange 上的队列删除之后自动删除该 Exchange
  • Internal:当前 Exchange 是否应用于 RabbitMQ 内部使用,默认false。
  • Arguments:扩展参数

Exchange 四种类型

  • direct:不需要 Exchange 进行绑定,根据 RoutingKey 匹配消息路由到指定的队列。
  • topic:生产者指定 RoutingKey 消息根据消费端指定的队列通过模糊匹配的方式进行相应转发,两种通配符模式:
    • #:可匹配一个或多个关键字
    • *:只能匹配一个关键字
  • fanout:这种模式只需要将队列绑定到交换机上即可,是不需要设置路由键的。
  • headers:根据发送消息内容中的 headers 属性来匹配

8.1 直连交换机

direct 通过 RoutingKey 匹配消息路由到指定的队列,因此也可以无需指定交换机,在不指定交换机的情况下会使用 AMQP default 默认的交换机,另外在消息投递时要注意 RoutingKey 要完全匹配才能被队列所接收,否则消息会被丢弃的。

image-20250513174139959

上图三个队列,第一个队列的 Binding routingKey 为 black,第二个队列和第三个队列的 Binding routingKey 为 green,也很清晰的能看到消息投递 1 仅被 Queue1 接收,而 消息投递 2 同时可以被广播到 Queue2 和 Queue3,这是因为 Queue2 和 Queue3 的路由键是相同的,再一次的说明了交换机的 direct 模式是通过 RoutingKey 进行消息路由的

8.2 主题交换机

生产者指定 RoutingKey ,消费端根据指定的队列通过模糊匹配的方式进行相应转发。

image-20250513174553662

上图展示了交换机 Topic 模式的消息流转过程,Queue1 的路由键通过使用 \* 符合匹配到了 black.test1 和 black.test2 但是 black.test3.1 由于有多个关键字是匹配不到的。另一个队列 Queue2 使用了 # 符号即可以一个也可以匹配多个关键字,同时匹配到了 black.test4 和 black.test5.1。

8.3 扇形交换机

fanout 只需要将队列绑定到交换机上即可,是不需要设置路由键的,便可将消息转发到绑定的队列上,由于不需要路由键,所以 fanout 也是四个交换机类型中最快的一个,如果是做广播模式的就很适合。

image-20250513174819470

8.4 首部交换机

Headers 类型的交换机是根据发送消息内容中的 headers 属性来匹配的,headers 类型的交换机基本上不会用到,因此这里也不会过多介绍,掌握以上三种类型的交换机模型在平常的业务场景中就足够了。

9.RabbitMQ工作原理

image-20250516215833230

AMQP 协议模型由三部分组成:生产者、消费者和服务端,执行流程如下:

  • 生产者是连接到 Server,建立一个连接,开启一个信道。
  • 生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。
  • 消费者也需要进行建立连接,开启信道等操作,便于接收消息。
  • 生产者发送消息,发送到服务端中的虚拟主机。
  • 虚拟主机中的交换器根据路由键选择路由规则,发送到不同的消息队列中。
  • 订阅了消息队列的消费者就可以获取到消息,进行消费。

10.RabbitMQ Management使用

通过 http://localhost:15672 访问 RabbitMQ 的控制台管理工具 RabbitMQ Management,用户名/密码都是 guest。

10.1 Queues操作

  • 创建队列。点击「导航栏Queues」->「Add a new queue」展开队列信息,填好信息后,add 添加。

    image-20250513194945485

  • 添加成功后,可以在 All queues 下看到添加的队列。点击队列名就会跳转到队列详情页面。

    image-20250513195437252

  • 在队列详情页面,可以进行一些操作,如下图所示。

    image-20250513195833032

10.2 Exchanges操作

  • 创建队列。点击「导航栏Exchanges」->「Add a new exchange」展开交换机信息,填好信息后,add 添加。

    image-20250513201112757

  • 在交换机详情页面,同样可以进行一些操作,如下图所示。

    image-20250513201917895

11.在Java中使用RabbitMQ

模拟一个最简单的场景,一个生产者发送消息到队列中,一个消费者从队列中读取消息并打印。

  • 第一步,在项目中添加 RabbitMQ 客户端依赖:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.22.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.16</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.16</version>
        <!-- 注意,若无type为jar则报错-->
        <type>jar</type>
    </dependency>
    

    note:如果没有导入 slf4j-api 和 slf4j-simple 依赖,会报错 SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”. SLF4J: Defaulting to no-operation (NOP) logger implementation. 如下:

    image-20250513233624699

  • 第二步,创建生产者类。

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    public class Producer {
        // 声明队列名称,设置为final static表示全局唯一常量
        private final static String QUEUE_NAME = "test";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建RabbitMQ连接工厂实例
            ConnectionFactory factory = new ConnectionFactory();
    
            // 使用try-with-resources自动关闭连接和通道
            try(Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()){
                
                // 声明持久化队列(durable=true),若队列不存在则创建
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                
                // 定义要发送的消息内容
                String message = "this is a rabbitmq test in java";
                
                // 发布消息到默认交换机(""),路由到指定队列
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                
                // 打印发送成功的消息内容
                System.out.println("发送消息:"+ message);
            }
            // 资源会在此自动关闭
        }
    }
    
  • 第三步,创建消费者类。

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        // 与生产者保持一致的队列名称
        private final static String QUEUE_NAME = "test";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建RabbitMQ连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            // 建立TCP连接
            Connection connection = factory.newConnection();
            // 创建AMQP通道(轻量级连接)
            Channel channel = connection.createChannel();
    
            // 声明持久化队列(需与生产者配置一致)
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            System.out.println("等待接收消息");
    
            // 定义消息回调函数,处理接收到的消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                // 将字节数组转换为UTF-8字符串
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("接收消息:" + message);
            };
    
            // 启动消费者并设置自动确认模式(autoAck=true)
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { 
                // 消费者取消时的回调(空实现)
            });
        }
    }
    
  • 第四步,运行,查看效果。

    image-20250513235510056

12.SpringBoot整合RabbitMQ

  • 新建一个 SpringBoot 项目。

    image-20250514093049642

  • 修改 pom.xml 文件,添加 RabbitMQ 依赖。

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.5.9</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <groupId>org.example</groupId>
        <artifactId>RabbitMQSpring</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>RabbitMQSpring</name>
        <description>RabbitMQSpring</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
  • 将配置文件 application.properties 重命名为 application.yml,并修改配置文件的内容。

    spring:
      application:
        name: RabbitMQSpring
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        virtual-host: /
    
    server:
      port: 8088
    
    logging:
      level:
        org.example.rabbitmqspring: debug
    
  • 在项目根目录下创建 config 包,在 config 包下创建一个 RabbitMQ 配置类 RabbitMQConfig。

    package org.example.rabbitmqspring.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMQConfig {
    
        // 交换机的名称
        public static final String DEFAULT_EXCHANGE = "exchange";
    
        // 路由Key的名称
        public static final String DEFAULT_ROUTE = "route";
    
        // 队列的名称
        public static final String DEFAULT_QUEUE = "queue";
    
        /**
         * 声明交换机
         * @return DirectExchange
         */
        @Bean
        public DirectExchange exchange() {
            return new DirectExchange(DEFAULT_EXCHANGE);
        }
    
        /**
         * 声明队列
         * @return Queue
         */
        @Bean
        public Queue queue(){
            return new Queue(DEFAULT_QUEUE);
        }
    
        /**
         * 声明路由Key(交换机和队列的关系)
         * @param exchange DirectExchange
         * @param queue Queue
         * @return Binding
         */
        @Bean
        public Binding binding(DirectExchange exchange, Queue queue){
            return BindingBuilder.bind(queue).to(exchange).with(DEFAULT_ROUTE);
        }
    
    }
    
  • 在项目根目录下创建 producer 包,在 producer 包下创建一个 RabbitProducer 类。

    package org.example.rabbitmqspring.producer;
    
    import org.example.rabbitmqspring.config.RabbitMQConfig;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * RabbitMQ消息生产者组件,负责将消息发送到指定队列
     * 基于Spring AMQP的RabbitTemplate实现消息发送
     */
    @Component  // 声明为Spring管理的组件,会自动扫描并注册到应用上下文中
    public class RabbitProducer {
    
        private final RabbitTemplate rabbitTemplate;  // RabbitMQ操作模板,用于发送和接收消息
    
        @Autowired  // 通过构造器注入RabbitTemplate实例
        public RabbitProducer(RabbitTemplate rabbitTemplate){
            this.rabbitTemplate = rabbitTemplate;
        }
    
        /**
         * 发送消息到默认交换机和路由
         * @param message 待发送的消息内容
         */
        public void sendMessage(String message){ 
            // 调用RabbitTemplate将消息转换并发送到指定交换机和路由键
            rabbitTemplate.convertAndSend(RabbitMQConfig.DEFAULT_EXCHANGE, RabbitMQConfig.DEFAULT_ROUTE, message);
        }
    }
    
  • 在项目根目录下创建 consumer 包,在 consumer 包下创建一个 RabbitConsumer 类。

    package org.example.rabbitmqspring.consumer;
    
    import org.example.rabbitmqspring.config.RabbitMQConfig;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * RabbitMQ消息消费者组件,自动监听配置的默认队列
     * 基于Spring AMQP的注解驱动模型实现消息监听
     */
    @Component  // 声明为Spring组件,由Spring容器管理实例化
    @RabbitListener(queues = RabbitMQConfig.DEFAULT_QUEUE)  // 监听配置类中定义的默认队列
    public class RabbitMQConsumer {
    
        /**
         * 消息处理方法,当队列接收到新消息时自动触发
         * @param message 从队列中获取的字符串消息体
         */
        @RabbitHandler  // 声明为RabbitMQ消息处理方法
        public void receive(String message){
            System.out.printf("收到一条消息: %s", message);
        }
    }
    
  • 在项目根目录下创建 controller 包,在 controller 包下创建一个 MessageController 类。

    package org.example.rabbitmqspring.controller;
    
    import org.example.rabbitmqspring.producer.RabbitProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController  // 声明该类为RESTful风格的控制器,会自动将返回值转换为JSON格式
    @RequestMapping(path = "/message", produces = "application/json;charset=utf-8")  // 映射请求路径,设置响应内容类型为JSON且字符集为UTF-8
    public class MessageController {
    
        private final RabbitProducer producer;  // 依赖注入RabbitMQ消息生产者
    
        @Autowired  // 自动注入RabbitProducer实例
        public MessageController(RabbitProducer producer){
            this.producer = producer;
        }
    
        @RequestMapping(value = "/send", method = RequestMethod.POST)  // 映射/send路径的POST请求
        public void sendMessage(String message){  // 处理发送消息的请求,接收一个字符串类型的消息
            producer.sendMessage(message);  // 调用生产者的发送消息方法
        }
    }
    
  • 使用 API 测试工具,测试发送消息。

    测试工具推荐:Postman、Apifox。

    由于 Postman 需要访问外网才能使用,不支持中文等使用门槛,对于国内开发者来说并不是一个最好的首选 API 管理工具,所以 Apifox 会更适合我们使用。

    image-20250515191210914

    image-20250515191453107

🤗🤗🤗

参考

  • 超详细的RabbitMQ入门,看这篇就够了!
  • 图文实践 RabbitMQ 不同类型交换机消息投递机制
  • RabbitMQ的介绍
  • RabbitMQ安装和使用详细教程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2377998.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python爬虫实战训练

前言&#xff1a;哇&#xff0c;今天终于能访问豆瓣了&#xff0c;前几天爬太多次了&#xff0c;网页都不让我访问了&#xff08;要登录&#xff09;。 先来个小练习试试手吧&#xff01; 爬取豆瓣第一页&#xff08;多页同上篇文章&#xff09;所有电影的排名、电影名称、星…

Redis(三) - 使用Java操作Redis详解

文章目录 前言一、创建项目二、导入依赖三、键操作四、字符串操作五、列表操作六、集合操作七、哈希表操作八、有序集合操作九、完整代码1. 完整代码2. 项目下载 前言 本文主要介绍如何使用 Java 操作 Redis 数据库&#xff0c;涵盖项目创建、依赖导入及 Redis 各数据类型&…

【全网首发】解决coze工作流批量上传excel数据文档数据重复的问题

注意&#xff1a;目前方法将基于前一章批量数据库导入的修改&#xff01;&#xff01;&#xff01;&#xff01;请先阅读上篇文章的操作。抄袭注明来源 背景 上一节说的方法可以批量导入文件到数据库&#xff0c;但是无法解决已经上传的条目更新问题。简单来说&#xff0c;不…

xss-labs靶场第11-14关基础详解

前言&#xff1a; 目录 第11关 第12关 第13关前期思路&#xff1a; 第十四关 内容&#xff1a; 第11关 也和上一关一样&#xff0c;什么输入框都没有&#xff0c;也就是 也是一样的操作&#xff0c;先将这里的hidden属性删掉一个&#xff0c;注意是删掉一个 输入1111&a…

ConcurrentSkipListMap的深入学习

目录 1、介绍 1.1、线程安全 1.2、有序性 1.3、跳表数据结构 1.4、API 提供的功能 1.5、高效性 1.6、应用场景 2、数据结构 2.1、跳表&#xff08;Skip List&#xff09; 2.2、节点类型&#xff1a; 1.Node 2.Index 3.HeadIndex 2.3、特点 3、选择层级 3.1、随…

XML简要介绍

实际上现在的Java Web项目中更多的是基于springboot开发的&#xff0c;所以很少再使用xml去配置项目。所以我们的目的就是尽可能快速的去了解如何读懂和使用xml文件&#xff0c;对于DTD&#xff0c;XMLSchema这类约束的学习可以放松&#xff0c;主要是确保自己知道这里面的大致…

什么是直播美颜SDK?美颜技术底层算法科普

当下&#xff0c;不论是社交直播、电商直播&#xff0c;还是线上教学、虚拟主播场景&#xff0c;都离不开美颜技术的加持。虽然大家在日常使用直播APP时经常体验到美颜效果&#xff0c;但背后的技术原理却相对复杂。本篇文章小编将为大家揭开直播美颜SDK的神秘面纱&#xff0c;…

【pbootcms】打开访问首页显示未检测到您服务器环境的sqlite3数据库拓展,请检查php.ini中是否已经开启该拓展

【pbootcms】新建网站&#xff0c;新放的程序&#xff0c;打开访问首页显示未检测到您服务器环境的sqlite3数据库拓展&#xff0c;请检查php.ini中是否已经开启该拓展。 检查目前网站用到哪个版本的php&#xff0c;然后打开相关文件。 修改一下内容&#xff1a; 查找sqlite3,…

MySQL——十、InnoDB引擎

MVCC 当前读&#xff1a; 读取的是记录的最新版本&#xff0c;读取时还要保证其他并发事务不能修改当前记录&#xff0c;会对读取的记录进行加锁。 -- 当前读 select ... lock in share mode(共享锁) select ... for update update insert delete (排他锁)快照读&#xff1a;…

visual studio生成动态库DLL

visual studio生成动态库DLL 创建动态库工程 注意 #include “pch.h” 要放在上面 完成后点击生成 创建一个控制台项目 设置项目附加目录为刚才创建的动态库工程Dll1&#xff1a; 配置附加库目录&#xff1a; 配置动态库的导入库&#xff08;.lib&#xff09;&#xff1a;链…

IDEA中git对于指定文件进行版本控制

最近在自己写代码的时候遇到了和公司里面不一样的&#xff0c;自己写的代码推到码云上是&#xff0c;会默认对于所有修改都进行提交&#xff0c;这样再提交的时候很不方便。 问了问ai&#xff0c;表示可以手动创建脚本实现&#xff0c;但是ai曲解了我的意思&#xff0c;它实现…

用Python绘制梦幻星空

用Python绘制梦幻星空 在这篇教程中&#xff0c;我们将学习如何使用Python创建一个美丽的星空场景。我们将使用Python的图形库Pygame和随机库来创建闪烁的星星、流星和月亮&#xff0c;打造一个动态的夜空效果。 项目概述 我们将实现以下功能&#xff1a; 创建深蓝色的夜…

GEE计算 RSEI(遥感生态指数)

&#x1f6f0;️ 什么是 RSEI&#xff1f;为什么要用它评估生态环境&#xff1f; RSEI&#xff08;遥感生态指数&#xff0c;Remote Sensing Ecological Index&#xff09; 是一种通过遥感数据计算得到的、综合反映区域生态环境质量的指标体系。 它的设计初衷是用最少的变量&…

python的家教课程管理系统

目录 技术栈介绍具体实现截图系统设计研究方法&#xff1a;设计步骤设计流程核心代码部分展示研究方法详细视频演示试验方案论文大纲源码获取/详细视频演示 技术栈介绍 Django-SpringBoot-php-Node.js-flask 本课题的研究方法和研究步骤基本合理&#xff0c;难度适中&#xf…

实现书签-第一部分

实现书签-第一部分 本节我们将实现书签功能&#xff0c;为菜谱点击类似于收藏的功能&#xff0c;然后可以在上方的书签找到我们所有收藏的书签&#xff1b; 在此之前&#xff0c;让我们修复一下之前的功能BUG&#xff0c;当我们搜索的时候&#xff0c;下面分页始终保持在上一…

解决将其他盘可用空间,移植到C盘

第一步首先下载安装 用来扩内存盘的实用工具资源-CSDN文库 第二步打开diskgenius.exe 第三步选中想扩容的盘 右击-》选择扩容分区-》选择要缩小的分区-》然后确定 第四步拖拽对勾的地方 或者在箭头地方输入想阔的大小&#xff0c;然后开始&#xff0c;一直确定&#xff0c;就…

第二天的尝试

目录 一、每日一言 二、练习题 三、效果展示 四、下次题目 五、总结 一、每日一言 清晰的明白自己想要的是什么&#xff0c;培养兴趣也好&#xff0c;一定要有自己的一技之长。我们不说多优秀&#xff0c;但是如果父母需要我们出力&#xff0c;不要只有眼泪。 二、练习题 对…

k8s灰度发布

基于 Traefik 的加权灰度发布-腾讯云开发者社区-腾讯云 Traefik | Traefik | v1.7 Releases traefik/traefik GitHub 从上面连接下载后上传到harbor虚拟机 vagrant upload /C/Users/HP280/Downloads/traefik 下载配置文件 wget -c http://raw.githubusercontent.com/conta…

前端面经 9 JS中的继承

借用Class实现继承 实现继承 extends super extends 继承父类 super调用父类的构造函数 子类中存在方法采取就近原则 &#xff0c;子类构造函数需要使用super()调用父类的构造函数 JS 静态属性和私有属性 寄生组合式继承

memcached主主复制+keepalive

一、Memcached主主复制技术原理 Memcached原生不支持复制&#xff0c;需通过repcached分支实现双向同步。其关键机制包括&#xff1a; 双向同步架构 两节点互为主备&#xff08;Master-Master&#xff09;&#xff0c;任意节点写入的数据会同步至对端。同步基于TCP协议&#x…