#RabbitMQ# 消息队列入门

news2025/5/29 11:48:22

目录

一 MQ技术选型

1 运行rabbitmq

2 基本介绍

3 快速入门

1 交换机负责路由消息给队列

2 数据隔离

二 Java客户端

1 快速入门

2 WorkQueue

3 FanOut交换机

4 Direct交换机

5 Topic交换机

*6 声明队列交换机

1 在配置类当中声明

2 使用注解的方式指定

7 消息转换器


*前景引入

维度异步通讯同步通讯RabbitMQ 的定位
交互方式通过中间件间接通信,无阻塞等待直接通信,需实时响应作为异步通讯的核心载体,支持消息缓存与路由
耦合度低(生产者和消费者解耦)高(调用方依赖被调用方可用性)通过队列解耦系统,提升容错性
适用场景高并发、耗时任务、事件驱动架构实时性要求高的简单交互天然适合异步场景,也可通过 RPC 支持同步需求
性能与扩展性高吞吐,支持水平扩展受限于实时响应能力通过集群、负载均衡优化异步性能

一 MQ技术选型

MQ(message Queue)消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broke。

1 运行rabbitmq

在虚拟机上安装Docker_虚拟机安装docker-CSDN博客

拉取镜像

  • docker pull rabbitmq:3-management

在容器当中运行

  • docker run ...

借助端口访问

2 基本介绍

核心概念总结

角色作用类比
Publisher发送消息的程序寄信人
Exchange按规则将消息分发到队列邮局分拣员
Queue存储消息的容器邮箱
Consumer从队列取消息并处理的程序收信人
Virtual Host隔离不同业务的消息环境(如测试、生产)邮局内的独立部门

3 快速入门

1 交换机负责路由消息给队列

添加成功

找到一台交换机

需要添加绑定队列从而实现路由给队列

消息路由成功

2 数据隔离

RabbitMQ 中的 虚拟主机(vhost) 可以用一个简单的比喻来理解:它就像一台大型服务器中的“独立房间”,每个房间都有自己的门禁系统、家具和规则,互不干扰。以下是它的核心作用:

实现:

先添加一个用户

现在这个用户还没有虚拟主机,这里其是无法访问之前创建的队列,是与之前的虚拟主机隔离开的

现在退出原先的用户,以刚刚创建的用户信息登录,然后添加一个虚拟主机

现在就可以在现在的用户之下的虚拟主机上创建新的队列

二 Java客户端

1 快速入门

实现:

1 导入spring-amqp依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2 添加队列

3 配置MQ地址

4 发送消息

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testSendMessage2Queue() {
        String queueName = "simple.queue1";
        String msg = "hello, amqp!";
        rabbitTemplate.convertAndSend(queueName, msg);
    }

5 队列

6 在消费者的相关方法中定义

    @RabbitListener(queues = "simple.queue1")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者收到了simple.queue的消息:【" + msg + "】");
    }

7 然后将项目启动,再在测试类中发送消息,控制台会实时监控到发送的消息

8 队列当中的消息拿出来在控制台里面就没有消息了

2 WorkQueue

任务模型:简单来说就是让多个消费者绑定到一个队列,共同消费队列当中的消息。

一个队列多个消费者,可以缓解消息堆积问题。

1 配置项

2 不写的话(默认一人一半,处理不完在队列里等待)

3 新增一个队列

4 两个消费者(消费能力不同,消费能力相同应该是轮询消费)

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1 收到了 work.queue的消息:【" + msg + "】");
        Thread.sleep(20);
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2 收到了 work.queue的消息...... :【" + msg + "】");
        Thread.sleep(200);
    }

5 生产者

    @Test
    void testWorkQueue() throws InterruptedException {
        String queueName = "work.queue";
        for (int i = 1; i <= 50; i++) {
            String msg = "hello, worker, message_" + i;
            rabbitTemplate.convertAndSend(queueName, msg);
            Thread.sleep(20);
        }
    }

6 测试

3 FanOut交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种

Fanout模式会将接受到的消息广播到跟其绑定的每一个队列,广播模式。

例子

1 先将队列声明好

2 再声明交换机同时与队列绑定

3 消费者

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1 收到了 fanout.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) throws InterruptedException {
        System.out.println("消费者2 收到了 fanout.queue2的消息:【" + msg + "】");
    }

4 生产者

    @Test
    void testSendFanout() {
        String exchangeName = "hmall.fanout";
        String msg = "hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName, null, msg);
    }

测试结果:

为什么第二个参数是 null

在你的代码中,第二个参数是 null,这是为了配合 Fanout 交换机 的特性。以下是关键点:

Fanout 交换机的特性

  • Fanout 交换机(也称为广播交换机)会将消息 无条件广播到所有绑定到该交换机的队列完全忽略路由键
  • 因此,在使用 Fanout 交换机时,路由键(routingKey)可以设为 null,因为交换机不会使用它来决定消息的路由规则。

4 Direct交换机

这种交换机可以实现与Fanout交换机相同的效果同时也可以实现定向的效果。

需求

1 创建队列与交换机

(交换机需要给routingKey值)

2 消费者

@RabbitListener(queues = "direct.queue1") // 直接监听名为 direct.queue1 的队列
public void listenDirectQueue1(String msg) {
    System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2") // 直接监听名为 direct.queue2 的队列
public void listenDirectQueue2(String msg) {
    System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg + "】");
}

3 生产者

    @Test
    void testSendDirect() {
        String exchangeName = "hmall.direct";
        String msg = "蓝色通知,警报解除,哥斯拉是放的气球";
        rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
    }

测试:

发送的路由键接收队列触发的消费者
reddirect.queue1, direct.queue2消费者1 + 消费者2
bluedirect.queue1消费者1
yellowdirect.queue2消费者2

可以根据需求更改生产者的代码逻辑:

5 Topic交换机

Topic 交换机是 RabbitMQ 中基于模式匹配的路由机制,允许通过通配符(* 和 #)实现灵活的路由规则。

需求

实现:

声明队列和交换机

消费者

    @RabbitListener(queues = "topic.queue1")
    public void listenTopicQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1 收到了 topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(queues = "topic.queue2")
    public void listenTopicQueue2(String msg) throws InterruptedException {
        System.out.println("消费者2 收到了 topic.queue2的消息:【" + msg + "】");
    }

生产者

    @Test
    void testSendTopic() {
        String exchangeName = "hmall.topic";
        String msg = "今天天气挺不错,我的心情的挺好的";
        rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);
    }

测试:可以根据需求修改发送的RoutingKey

Direct交换机与Topic的差异

特性Direct 交换机Topic 交换机
路由键匹配方式精确匹配(完全一致)模式匹配(支持通配符 * 和 #
灵活性低(适合简单路由)高(适合复杂路由场景)
典型场景订单状态变更、任务分发日志分类、多维度消息分发

*6 声明队列交换机

为了改善在控制台创建队列交换机的笨重,可以使用相关接口

声明队列和交换机

实现:

1 在配置类当中声明

Fanout的

package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfiguration {

    // fanoutExchange 定义交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        // ExchangeBuilder.fanoutExchange("").build();
        return new FanoutExchange("hmall.fanout2");
    }

    //  queue 创建队列
    @Bean
    public Queue fanoutQueue3(){
        // QueueBuilder.durable("ff").build();//持久化
        return new Queue("fanout.queue3");
    }

    // 绑定队列和交换机
    @Bean
    public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
    }

    // 创建队列
    @Bean
    public Queue fanoutQueue4(){
        return new Queue("fanout.queue4");
    }

    // 绑定队列和交换机
    @Bean
    public Binding fanoutBinding4(){
        return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
    }
}

Direct的

package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;

// @Configuration
public class DirectConfiguration {

    //   定义交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("hmall.direct");
    }

    //   创建队列
    @Bean
    public Queue directQueue1() {
        return new Queue("direct.queue1");
    }
    // 队列与交换机进行绑定
    @Bean
    public Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }

    //  队列与交换机进行绑定
    @Bean
    public Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }

    //   创建队列
    @Bean
    public Queue directQueue2() {
        return new Queue("direct.queue2");
    }

    //  队列与交换机进行绑定
    @Bean
    public Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }

    //  队列与交换机进行绑定
    @Bean
    public Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }

}

2 使用注解的方式指定

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1", durable = "true"),
            exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2", durable = "true"),
            exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg) throws InterruptedException {
        System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg + "】");
    }

通过使用 @RabbitListener 的 bindings + @QueueBinding 注解的方式,不需要手动创建队列、交换机或绑定关系

  1. 检查资源是否存在
    Spring 会通过 RabbitAdmin 组件向 RabbitMQ 服务器发起检查,确认队列、交换机是否已存在。

  2. 自动创建缺失的资源

    • 若队列 direct.queue1 或 direct.queue2 不存在,会根据 @Queue 注解的配置(如 namedurable自动创建队列

    • 若交换机 hmall.direct 不存在,会根据 @Exchange 注解的配置(如 nametype自动创建交换机

  3. 自动绑定队列到交换机
    根据 key 指定的路由键,将队列与交换机绑定(如 direct.queue1 绑定 red 和 blue 路由键)。

7 消息转换器

使用

1. SimpleMessageConverter(默认)

  • 行为

    • 支持 Stringbyte[]Serializable 对象。

    • 若消息是 Serializable 对象,使用 Java 原生序列化。

  • 问题

    • 强耦合:发送方和接收方必须有相同的类路径(否则反序列化失败)。

    • 安全性差:Java 原生序列化易受攻击(如反序列化漏洞)。

2. Jackson2JsonMessageConverter(推荐)

  • 行为

    • 将对象转换为 JSON 字符串,再转为 byte[]

    • 反序列化时,将 JSON 还原为对象(需指定目标类型)。

  • 优势

    • 跨语言兼容:JSON 是通用格式,非 Java 客户端也可解析。

    • 松耦合:不强制要求发送方和接收方的类路径一致。

    • 安全性高:避免 Java 原生序列化漏洞

1 依赖引入

        <!--Jackson-->
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
        </dependency>

2 Bean的创建

    // 消息转换器
    @Bean
    public MessageConverter jacksonMessageConvertor(){
        return new Jackson2JsonMessageConverter();
    }

3 消费者

    @RabbitListener(queues = "object.queue")
    public void listenObject(Map<String, Object> msg) throws InterruptedException {
        System.out.println("消费者 收到了 object.queue的消息:【" + msg + "】");
    }

4 生产者

    @Test
    void testSendObject() {
        Map<String, Object> msg = new HashMap<>(2);
        msg.put("name", "jack");
        msg.put("age", 21);
        rabbitTemplate.convertAndSend("object.queue", msg);
    }

5 在实际业务当中的使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2387336.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EPT(Efficient Prompt Tuning)方法,旨在解决提示调优(Prompt Tuning)中效率与准确性平衡和跨任务一致性的问题

EPT(Efficient Prompt Tuning)方法,旨在解决提示调优(Prompt Tuning)中效率与准确性平衡和跨任务一致性的问题 一、核心原理:分解提示与多空间投影 1. 提示分解:用低秩矩阵压缩长提示 传统问题: 长提示(如100个token)精度高但训练慢,短提示(如20个token)速度快但…

云原生安全核心:云安全责任共担模型(Shared Responsibility Model)详解

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 1. 基础概念 什么是云安全责任共担模型&#xff1f; 云安全责任共担模型&#xff08;Shared Responsibility Model, SRM&#xff09;是云服务提供商&…

go并发与锁之sync.Mutex入门

sync.Mutex 原理&#xff1a;一个共享的变量&#xff0c;哪个线程握到了&#xff0c;哪个线程可以执行代码 功能&#xff1a;一个性能不错的悲观锁&#xff0c;使用方式和Java的ReentrantLock很像&#xff0c;就是手动Lock&#xff0c;手动UnLock。 使用例子&#xff1a; v…

[Java恶补day8] 3. 无重复字符的最长子串

给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长 子串 的长度。 示例 1: 输入: s “abcabcbb” 输出: 3 解释: 因为无重复字符的最长子串是 “abc”&#xff0c;所以其长度为 3。 示例 2: 输入: s “bbbbb” 输出: 1 解释: 因为无重复字符的最长子串是 “…

LabVIEW教学用开发平台

一、培训目标 基础编程&#xff1a;掌握 LabVIEW 数据类型、程序结构、子 VI 设计与调试技巧。 硬件通信&#xff1a;精通 RS-232/485、TCP/IP、Modbus、PLC 等工业通信协议及实现。 高级设计模式&#xff1a;熟练运用状态机、生产者 - 消费者模式构建复杂测控系统。 项目实…

Package Size Comparison – 6 Leads

Package Size Comparison 6 LeadsTSOP SOT SM SMT SOT23 SC-74 SC-59 SC-88 SOT363 US6 UMT6 SC-70 SOT563 ES EMT SC-75-6

python打卡day38

Dataset和DataLoader 知识点回顾&#xff1a; Dataset类的__getitem__和__len__方法&#xff08;本质是python的特殊方法&#xff09;Dataloader类minist手写数据集的了解 作业&#xff1a;了解下cifar数据集&#xff0c;尝试获取其中一张图片 在遇到大规模数据集时&#xff0c…

vLLM 核心技术 PagedAttention 原理详解

本文是 vLLM 系列文章的第二篇&#xff0c;介绍 vLLM 核心技术 PagedAttention 的设计理念与实现机制。 vLLM PagedAttention 论文精读视频可以在这里观看&#xff1a;https://www.bilibili.com/video/BV1GWjjzfE1b 往期文章&#xff1a; vLLM 快速部署指南 1 引言&#xf…

《软件工程》第 2 章 -UML 与 RUP 统一过程

在软件工程领域&#xff0c;UML&#xff08;统一建模语言&#xff09;与 RUP&#xff08;统一过程&#xff09;是进行面向对象软件开发的重要工具和方法。接下来&#xff0c;我们将深入探讨第 2 章的内容&#xff0c;通过案例和代码&#xff0c;帮助大家理解和掌握相关知识。 …

(转)Docker与K8S的区别

1 定义角度 Docker是一种开放源码的应用容器引擎&#xff0c;允许开发人员将其应用和依赖包打包成可移植的容器/镜像中&#xff1b;然后&#xff0c;发布到任何流行的 Linux 或 Windows 机器上&#xff0c;也能实现虚拟化。该容器完全使用沙箱机制&#xff0c;彼此之间没有任何…

商用密码 vs 普通密码:安全加密的核心区别

商用密码 vs 普通密码&#xff1a;安全加密的核心区别 一. 引言&#xff1a;密码的世界二. 什么是普通密码&#xff1f;三. 什么是商用密码&#xff1f;四. 普通密码 vs 商用密码&#xff1a;核心区别五. 选择合适的密码方案六. 结语 前言 肝文不易&#xff0c;点个免费的赞和…

MYSQL中的分库分表及产生的分布式问题

分库分表是分布式数据库架构中常用的优化手段&#xff0c;用于解决单库单表数据量过大、性能瓶颈等问题。其核心思想是将数据分散到多个数据库&#xff08;分库&#xff09;或多个表&#xff08;分表&#xff09;中&#xff0c;以提升系统的吞吐量、查询性能和可扩展性。 一&am…

投影机三色光源和单色光源实拍对比:一场视觉体验的终极较量

一、光源技术&#xff1a;从 “单色模拟” 到 “三色原生” 的进化 &#xff08;一&#xff09;单色光源&#xff1a;白光的 “色彩魔术” 单色光源投影机采用单一白光作为基础光源&#xff0c;通过LCD上出现色彩呈现颜色。这种技术路线的优势在于成本可控&#xff0c;早期被广…

电子电气架构 --- 下一代汽车电子电气架构中的连接性

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 钝感力的“钝”,不是木讷、迟钝,而是直面困境的韧劲和耐力,是面对外界噪音的通透淡然。 生活中有两种人,一种人格外在意别人的眼光;另一种人无论…

手写Tomcat(一)

一、Tomcat简介 Tomcat 服务器是一个免费的开放源代码的Web应用服务器&#xff0c;属于轻量级应用服务器&#xff0c;在中小型系统和并发访问用户不是很多的场合下被普遍使用&#xff0c;是开发和调试JSP 程序的首选。 1.1 Tomcat基本架构 Servlet接口文件中定义的方法有以下…

【机器学习基础】机器学习入门核心算法:支持向量机(SVM)

机器学习入门核心算法&#xff1a;支持向量机&#xff08;SVM&#xff09; 一、算法逻辑1.1 基本概念1.2 核心思想线性可分情况 二、算法原理与数学推导2.1 原始优化问题2.2 拉格朗日对偶2.3 对偶问题2.4 核函数技巧2.5 软间隔与松弛变量 三、模型评估3.1 评估指标3.2 交叉验证…

定时清理流媒体服务器录像自动化bash脚本

定时清理流媒体服务器保存录像文件夹 首先创建一个文件,解除读写权限 touch rm_videos.sh chmod 777 rm_videos.sh将内容复制进去&#xff0c;将对应文件夹等需要修改的内容&#xff0c;根据自己的实际需求进行修改 #!/bin/bash# 设置目标目录&#xff08;修改为你的实际路…

Logi鼠标切换桌面失效

Mac上习惯了滑屏切换桌面&#xff0c;所以Logi鼠标也定制了切换桌面的动作&#xff0c;有一天发现这个动作失效了&#xff0c;且只有切换桌面的动作失效。 发现Logi Options出现了这个提示&#xff0c;如图所示&#xff08;具体原因未知&#xff0c;已配置不自动更新版本&…

2025年全国青少年信息素养大赛复赛C++集训(15):因子问题(题目及解析)

2025年全国青少年信息素养大赛复赛C集训&#xff08;15&#xff09;&#xff1a;因子问题&#xff08;题目及解析&#xff09; 题目描述 任给两个正整数N、M&#xff0c;求一个最小的正整数a&#xff0c;使得a和(M-a)都是N的因子。 时间限制&#xff1a;10000 内存限制&…

Vue3进阶教程:1.初次了解vue

1.初次了解vue vue文件目录和各个文件在这里不做介绍 此课程对针对有点vue基础的同学&#xff0c;或者看过我上部分vue的教程 与之前我的Vue教程不同的是&#xff0c;写法和内容有区别 真正的了解Vue3 1.创建vue组件 1.npm create vuelatest 2.取名 3.TS要选上 4.其他先不选 5…