rabbitmq五种模式的实现——springboot

news2025/5/12 12:15:26

rabbitmq五种模式的实现——springboot

基础知识和javase的实现形式可以看我之前的博客

代码地址:https://github.com/9lucifer/rabbitmq4j-learning

一、进行集成

(一)Spring Boot 集成 RabbitMQ 概述

Spring Boot 提供了对 RabbitMQ 的自动配置支持,通过 RabbitTemplate@RabbitListener 可以方便地实现消息的生产和消费。以下是基于 Spring Boot 的 RabbitMQ 集成示例。


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的队列中。

1. 配置文件(application.yml)
spring:
  rabbitmq:
    host: 自己服务器的ip       # RabbitMQ 服务器地址
    port: 5672               # RabbitMQ 端口号
    username: admin          # RabbitMQ 用户名
    password: admin          # RabbitMQ 密码
    virtual-host: /          # 虚拟主机(默认是 /)
server:
  port: 8081                 # 应用端口
2. 生产者代码(Controller)
package top.miqiu.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg")
    public String sendMsg(@RequestParam String msg) {
        // 发送消息到队列 boot_queue
        rabbitTemplate.convertAndSend("", "boot_queue", msg);
        return "发送成功: " + msg;
    }
}
关键
  1. RabbitTemplate:Spring 提供的 RabbitMQ 操作模板,用于发送消息。
  2. convertAndSend:发送消息到指定队列。
  3. @GetMapping:定义一个 GET 请求接口,路径为 /sendMsg
效果

image-20250216093813082


(三)消费者代码解析

消费者负责从队列中接收并处理消息。

消费者代码(Listener)
package top.miqiu.controller;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receive {

    @RabbitListener(queuesToDeclare = @Queue("boot_queue"))
    public void consumer(String msg) {
        System.out.println("消息内容为:" + msg);
    }
}
关键
  1. @RabbitListener:监听指定队列,当队列中有消息时,自动调用 consumer 方法。
  2. queuesToDeclare:如果队列不存在,会自动创建队列。
效果

image-20250216093839842

二、工作模式

(一)生产者

  1. 生产者通过 RabbitTemplate 将消息发送到 RabbitMQ 的队列中。

  2. 关键代码解析

    java复制

    rabbitTemplate.convertAndSend("", "boot_work", msg + i);
    
    • rabbitTemplate:RabbitMQ 的核心操作类,用于发送消息。
    • convertAndSend 方法
      • 第一个参数是交换机名称(这里为空字符串,表示默认交换机)。
      • 第二个参数是队列名称(boot_work)。
      • 第三个参数是消息内容(msg + i)。
    • 循环发送:代码中通过循环发送了 20 条消息,每条消息内容为 msg + i
  3. 运行逻辑
    生产者调用 /sendMsg 接口时,会将消息发送到队列 boot_work 中,消息内容为循环生成的字符串。


(二)消费者

  1. 功能描述
    消费者从队列中接收消息并处理,处理完成后发送确认信号。

  2. 关键代码解析

    @RabbitListener(queuesToDeclare = @Queue("boot_work"))
    
    • @RabbitListener:注解用于监听指定队列的消息。
    • queuesToDeclare:声明队列名称(boot_work)。
    • @Queue:声明队列的详细信息。
    public void consumer(
            String msg,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
            Channel channel
    ) throws IOException, InterruptedException {
        Thread.sleep(1000); // 模拟消息处理时间
        channel.basicAck(deliveryTag, true); // 确认消息已处理
        System.out.println("消费者1 消息内容为:" + msg);
    }
    
    • msg:接收到的消息内容。
    • deliveryTag:消息的唯一标识,用于确认消息是否成功处理。
    • Channel:RabbitMQ 的通道,用于执行消息确认操作。
    • basicAck:确认消息已处理,避免消息重复发送。
      • 第一个参数是 deliveryTag
      • 第二个参数是 multiple,表示是否批量确认。
  3. 运行逻辑
    消费者监听队列 boot_work,接收到消息后模拟处理时间(Thread.sleep(1000)),然后通过 channel.basicAck 发送确认信号,表示消息已处理完成。

(三)效果

image-20250216102608684

可以看到休眠的时长不同,消费的速度不同,侧面证明该模式适用于任务分配场景,多个消费者可以并行处理任务,提高效率。

三、Pub/Sub 模式

(一)Pub/Sub 模式概述

Pub/Sub(发布/订阅)模式是一种消息传递模式,生产者将消息发送到一个交换机(Exchange),而不是直接发送到队列。消费者通过绑定交换机来接收消息。这种模式允许多个消费者订阅同一个消息源,实现消息的广播。

(二)生产者

  1. 功能描述
    生产者通过 RabbitTemplate 将消息发送到一个名为 boot-pubsub 的交换机。

  2. 关键代码解析

    rabbitTemplate.convertAndSend("boot-pubsub", "", msg);
    
    • convertAndSend 方法
      • 第一个参数是交换机名称(boot-pubsub)。
      • 第二个参数是路由键(这里为空字符串,表示不指定路由键)。
      • 第三个参数是消息内容(msg)。
    • 交换机类型boot-pubsub 是一个 fanout 类型的交换机,它会将消息广播到所有绑定的队列。
  3. 运行逻辑
    生产者调用 /sendPubsub 接口时,将消息发送到 boot-pubsub 交换机,交换机会将消息广播到所有绑定的队列。

  4. 运行结果

    image-20250216104227902


(三)消费者

  1. 功能描述
    消费者通过绑定到 boot-pubsub 交换机接收消息,并处理接收到的消息。

  2. 关键代码解析

    @RabbitListener(bindings =
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "boot-pubsub", type = "fanout")
            ))
    public void consumer(String msg) {
        System.out.println("consumer 4 消息内容为:" + msg);
    }
    
    • @RabbitListener:注解用于监听消息。
    • @QueueBinding:声明队列与交换机的绑定关系。
      • value = @Queue:声明一个匿名队列。
      • exchange = @Exchange:声明交换机的名称(boot-pubsub)和类型(fanout)。
    • consumer 方法:处理接收到的消息,并打印消息内容。
  3. 运行逻辑
    消费者绑定到 boot-pubsub 交换机,接收所有广播的消息,并打印消息内容。

  4. 运行效果

    image-20250216104255415


(四)Pub/Sub 模式特点

  1. 广播机制
    生产者发送的消息会被广播到所有绑定到交换机的队列,多个消费者可以同时接收相同的消息。
  2. 解耦合
    生产者和消费者之间通过交换机解耦,生产者无需知道消费者的存在,消费者也无需知道生产者的存在。
  3. 动态绑定
    可以动态添加或移除消费者,而无需修改生产者的代码。


四、Routing 模式

(一)Routing 模式概述

Routing 模式是一种基于路由键(Routing Key)的消息路由模式。生产者将消息发送到 direct 类型的交换机,并指定一个路由键。消费者通过绑定到交换机并指定绑定键(Binding Key)来接收消息。只有路由键与绑定键完全匹配时,消息才会被路由到对应的队列。


(二)生产者

  1. 功能描述
    生产者通过 RabbitTemplate 将消息发送到 boot-routing 交换机,并指定路由键(key)。

  2. 关键代码解析

    rabbitTemplate.convertAndSend("boot-routing", key, msg);
    
    • boot-routing:交换机名称,类型为 direct
    • key:路由键,由方法参数传入,用于指定消息的路由规则。
    • msg:消息内容,由方法参数传入。
  3. 运行逻辑
    生产者调用 /sendRouting 接口时,将消息发送到 boot-routing 交换机,并通过路由键决定消息的去向。

image-20250216113239428


(三)消费者

  1. 功能描述
    消费者通过绑定到 boot-routing 交换机接收消息,并处理接收到的消息。

  2. 关键代码解析

    @RabbitListener(bindings =
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "boot-routing", type = "direct"),
                    key = {"trace"}
            ))
    public void consumer(String msg) {
        System.out.println("consumer 6 消息内容为:" + msg);
    }
    
    • @RabbitListener:注解用于监听消息。
    • @QueueBinding:声明队列与交换机的绑定关系。
      • value = @Queue:声明一个匿名队列。
      • exchange = @Exchange:声明交换机的名称(boot-routing)和类型(direct)。
      • key = {"trace"}:绑定键,表示该队列只接收路由键为 trace 的消息。
    • consumer 方法:处理接收到的消息,并打印消息内容。
  3. 运行逻辑
    消费者绑定到 boot-routing 交换机,并通过绑定键 trace 接收匹配的消息。

image-20250216113302874


(四)Routing 模式特点

  1. 精确匹配
    direct 类型的交换机要求路由键与绑定键完全匹配,消息才会被路由到对应的队列。

  2. 多队列绑定
    一个交换机可以绑定多个队列,每个队列可以指定不同的绑定键。

  3. 解耦合
    生产者和消费者之间通过交换机解耦,生产者无需知道消费者的存在,消费者也无需知道生产者的存在。

五、Topic 模式

(一)Topic 模式概述

Topic 模式是一种灵活的发布/订阅消息模式,适用于需要根据动态路由规则分发消息的场景。生产者将消息发送到一个 topic 类型的交换机,并指定一个路由键(Routing Key)。消费者通过绑定交换机并指定匹配规则(Binding Key)来接收消息。topic 类型的交换机支持模糊匹配,允许更灵活的消息路由。

(二)生产者

  1. 功能描述
    生产者通过 RabbitTemplate 将消息发送到 boot-topic 交换机,并指定路由键(key)。

  2. 关键代码解析

    rabbitTemplate.convertAndSend("boot-topic", key, msg);
    
    • boot-topic:交换机名称,类型为 topic
    • key:路由键,由方法参数传入,用于指定消息的路由规则。例如,user.loginuser.register
    • msg:消息内容,由方法参数传入。
  3. 运行逻辑
    生产者调用 /sendTopic 接口时,将消息发送到 boot-topic 交换机,并通过路由键决定消息的去向。交换机会根据绑定规则将消息路由到匹配的队列。

image-20250216123803069


(三)消费者

  1. 功能描述
    消费者通过绑定到 boot-topic 交换机接收消息,并处理接收到的消息。

  2. 关键代码解析

    @RabbitListener(bindings =
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "boot-topic", type = "topic"),
                    key = {"user.*"}
            ))
    public void consumer(String msg) {
        System.out.println("consumer 8 user.* 消息内容为:" + msg);
    }
    
    • @RabbitListener:注解用于监听消息。
    • @QueueBinding:声明队列与交换机的绑定关系。
      • value = @Queue:声明一个匿名队列。
      • exchange = @Exchange:声明交换机的名称(boot-topic)和类型(topic)。
      • key = {"user.\*"}:绑定键,表示该队列只接收路由键以 user. 开头的消息(例如 user.loginuser.register)。
    • consumer 方法:处理接收到的消息,并打印消息内容。
  3. 运行逻辑
    消费者绑定到 boot-topic 交换机,并通过绑定键 user.* 接收匹配的消息。例如,生产者发送路由键为 user.login 的消息时,该消费者会接收并处理该消息。

    image-20250216123818814


(四)Topic 模式特点

  1. 灵活的路由规则
    • topic 类型的交换机支持模糊匹配:
      • *(星号):匹配一个单词。
      • #(井号):匹配零个或多个单词。
    • 例如,路由键为 user.login 的消息可以被绑定键为 user.*user.login 的队列接收。
  2. 动态绑定
    消费者可以根据需要动态绑定不同的队列,而无需修改生产者的代码。
  3. 解耦合
    生产者和消费者之间通过交换机解耦,生产者无需知道消费者的存在,消费者也无需知道生产者的存在。

(五)注意事项

  1. 交换机类型
    确保交换机类型为 topic,因为 topic 类型的交换机支持模糊匹配。
  2. 绑定键的正确性
    消费者需要正确设置绑定键(key),否则无法接收到匹配的消息。
  3. 路由键的格式
    路由键应使用点分隔符(.),例如 user.logintrace.error,以符合 topic 交换机的匹配规则。
  4. 队列声明
    如果使用匿名队列(@Queue),队列会在消费者启动时自动创建,但在 RabbitMQ 管理界面中可能看不到队列名称。如果需要持久化队列,可以显式声明队列名称。
  5. 消息丢失问题
    如果没有消费者绑定到匹配的路由键,消息可能会丢失。可以通过设置交换机的 DLX(Dead Letter Exchange)来处理未消费的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2301430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

23. AI-大语言模型-DeepSeek赋能开发-Spring AI集成

文章目录 前言一、Spring AI 集成 DeepSeek1. 开发AI程序2. DeepSeek 大模型3. 集成 DeepSeek 大模型1. 接入前准备2. 引入依赖3. 工程配置4. 调用示例5. 小结 4. 集成第三方平台(已集成 DeepSeek 大模型)1. 接入前准备2. POM依赖3. 工程配置4. 调用示例…

Educational Codeforces Round 174 (Rated for Div. 2)(ABCD)

A. Was there an Array? 翻译: 对于整数数组 ​,我们将其相等特征定义为数组 ,其中,如果数组 a 的第 i 个元素等于其两个相邻元素,则 ;如果数组 a 的第 i 个元素不等于其至少一个相邻元素,则 …

如何在本机上模拟IP地址

如何在本机上模拟IP地址 前言 在某些开发或测试场景中,我们可能需要在本机上模拟一个指定的 IP 地址,并让局域网内的其他设备能够通过该 IP 访问本机提供的服务(如 Web 服务)。 本文将详细介绍如何在 Windows 和 macOS 系统上实…

【嵌入式Linux应用开发基础】进程间通信(1):管道

目录 一、管道的基本概念 二、管道的工作原理 三、管道的类型 3.1. 匿名管道(Anonymous Pipe) 3.2. 命名管道(Named Pipe,FIFO) 四、管道的读写规则 4.1. 匿名管道的读写规则 4.2. 命名管道的读写规则 五、管…

【DeepSeek】Mac m1电脑部署DeepSeek

一、电脑配置 个人电脑配置 二、安装ollama 简介:Ollama 是一个强大的开源框架,是一个为本地运行大型语言模型而设计的工具,它帮助用户快速在本地运行大模型,通过简单的安装指令,可以让用户执行一条命令就在本地运…

DHCP详解,网络安全零基础入门到精通实战教程!

一、DHCP简介 DHCP(Dynamic Host Configuration Protocol),动态主机配置协议,是一个应用层协议。当我们将客户主机ip地址设置为动态获取方式时,DHCP服务器就会根据DHCP协议给客户端分配IP,使得客户机能够利用这个IP上网。 DHCP前身是BOOTP&am…

【Prometheus】prometheus结合pushgateway实现脚本运行状态监控

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全…

立创实战派ESP32-S3烧录小智AI指南

小智 AI 聊天机器人-开源项目介绍 本项目是一个开源项目,主要用于教学目的。我们希望通过这个项目,能够帮助更多人入门 AI 硬件开发,了解如何将当下飞速发展的大语言模型应用到实际的硬件设备中。无论你是对 AI 感兴趣的学生,还是…

深度学习的集装箱箱号OCR识别技术,识别率99.9%

集装箱箱号OCR识别技术是一项结合计算机视觉和规则校验的复杂任务,以下是其关键要点及实现思路的总结: 1、集装箱号结构:11位字符,格式为公司代码(3字母)和序列号(6数字)以及校验码(1数字)和尺寸/类型代码(可选),例如…

如何在Windows下使用Ollama本地部署DeepSeek R1

参考链接: 通过Ollama本地部署DeepSeek R1以及简单使用的教程(超详细) 【DeepSeek应用】DeepSeek R1 本地部署(OllamaDockerOpenWebUI) 如何将 Chatbox 连接到远程 Ollama 服务:逐步指南 首先需要安装oll…

【分布式理论12】事务协调者高可用:分布式选举算法

文章目录 一、分布式系统中事务协调的问题二、分布式选举算法1. Bully算法2. Raft算法3. ZAB算法 三、小结与比较 一、分布式系统中事务协调的问题 在分布式系统中,常常有多个节点(应用)共同处理不同的事务和资源。前文 【分布式理论9】分布式…

postgres源码学习之简单sql查询

postgres源码学习之sql查询 sql查询的主流程读取sql解析sql重写sql获得执行计划执行查询操作结果返回 sql查询的主流程 参考postgres的处理流程 由上一节,我们可以看到,当有新的连接通过权限认证之后,将进入等待接收sql语句,并执…

C#项目05-猜数字多线程

本项目利用多线程,通过点击按钮猜数字, 知识点 线程 基本概念 进程:一组资源,构成一个正在运行的程序,这些资源包括地址空间、文件句柄以及程序启动需要的其他东西的载体。 线程:体现一个程序的真实执行情况, 线…

《C语言动态顺序表:从内存管理到功能实现》

1.顺序表 1.1 概念 顺序存储的线性表,叫顺序表。 1.2顺序表存放的实现方式 可以使用数组存储数据,可以实现逻辑上相连,物理内存上也相连。也可以使用malloc在堆区申请一片连续的空间,存放数据,实现逻辑上相连&#…

通过API 调用本地部署 deepseek-r1 模型

如何本地部署 deepseek 请参考(windows 部署安装 大模型 DeepSeek-R1) 那么实际使用中需要开启API模式,这样可以无拘无束地通过API集成的方式,集成到各种第三方系统和应用当中。 上遍文章是基于Ollama框架运行了deepSeek R1模型…

DeepSeek-学习与实践

1.应用场景 主要用于学习与使用DeepSeek解决问题, 提高效率. 2.学习/操作 1.文档阅读 文档 DeepSeek -- 官网, 直接使用 --- 代理网站 --- 极客智坊 https://poe.com/DeepSeek-R1 https://time.geekbang.com/search?qdeepseek -- 搜索deepseek的资料 资料 20250209DeepSeekC…

撕碎QT面具(6):调节窗口大小后,控件被挤得重叠的解决方法

问题:控件重叠 分析原因:因为设置了最小大小,所以界面中的大小不会随窗口的变化而自动变化。 处理方案:修改mimumSize的宽度与高度为0,并设置sizePolicy为Expanding,让其自动伸缩。 结果展示(自…

解锁机器学习核心算法 | K-平均:揭开K-平均算法的神秘面纱

一、引言 机器学习算法种类繁多,它们各自有着独特的优势和应用场景。前面我们学习了线性回归算法、逻辑回归算法、决策树算法。而今天,我们要深入探讨的是其中一种经典且广泛应用的聚类算法 —— K - 平均算法(K-Means Algorithm&#xff09…

【Linux】匿名管道的应用场景-----管道进程池

目录 一、池化技术 二、简易进程池的实现: Makefile task.h task.cpp Initchannel函数: 创建任务: 控制子进程: 子进程执行任务: 清理收尾: 三、全部代码: 前言: 对于管…

PostgreSQL的学习心得和知识总结(一百六十九)|深入理解PostgreSQL数据库之 Group By 键值消除 的使用和实现

目录结构 注:提前言明 本文借鉴了以下博主、书籍或网站的内容,其列表如下: 1、参考书籍:《PostgreSQL数据库内核分析》 2、参考书籍:《数据库事务处理的艺术:事务管理与并发控制》 3、PostgreSQL数据库仓库…