【SpringCloud学习笔记】RabbitMQ(中)

news2025/7/11 11:48:07

1. 交换机概述

前面《RabbitMQ上篇》我们使用SpringAMQP来演示如何用Java代码操作RabbitMQ,当时采用的是生产者直接将消息发布给队列,但是实际开发中不建议这么做,更加推荐生产者将消息发布到交换机(exchange),然后由exchange路由到队列,其架构如下所示:

可以看出,在发布-订阅模型中新增一个"交换机"角色,此后各个角色的任务如下:

  • publisher:不再是将message直接转发到queue,而是将message转发给exchange
  • exchange:一方面接收来自publisher生产的消息;另一方面,依据route key以及type将消息路由给绑定的不同的队列
  • queue:与以前一样,暂存消息,供消费者消费,另外还需要同交换机建立绑定关系
  • consumer:与以前一样,订阅queue中的消息,并进行业务处理消费消息

注意:由于我们的exchange不暂存消息,只做消息的路由,因此如果没有queue与exchange绑定或者routing key设置错误,就会导致消息丢失!!!

2. 交换机类型

RabbitMQ提供的交换机类型有如下四种:

  1. Fanout Exchange:扇出交换机,形象来说就是"广播交换机",会将消息路由给所有绑定的queue
  2. Direct Exchange:定向交换机,基于RoutingKey发给订阅的queue
  3. Topic Exchange: 通配符订阅,在Direct的基础上引入通配符
  4. Headers Exchange: 头匹配,基于MQ的消息头匹配,使用场景较少(此处不讲解)

2.1 Fanout Exchange

下面是Fanout Exchange的工作流程图:

特征:Fanout Exchange将消息路由给全部跟它绑定的queue
操作步骤:

  1. 在RabbitMQ控制台中新建两个队列:fanout.queue1、fanout.queue2image.png
  2. 在RabbitMQ控制台中新建一个Fanout类型的Exchange:fanout.exchange

image.png

  1. 将fanout.exchange与fanout.queue1、fanout.queue2分别建立binding关系

image.png

  1. 新建两个方法用于模拟consumer,分别监听fanout.queue1以及fanout.queue2队列
/**
 * 订阅fanout.queue1队列
 * @param msg 消息
 */
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    log.info("listener1 从【fanout.queue1】接收到消息:" + msg);
}

/**
 * 订阅fanout.queue2队列
 * @param msg 消息
 */
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    log.info("listener2 从【fanout.queue2】接收到消息:" + msg);
}
  1. 新建一个测试类方法,模拟将消息发布给fanout.exchange
/**
 * 测试FanoutExchange交换机类型
 */
@Test
public void testFanoutExchange() {
    // 1. 定义exchange名称
    String exchangeName = "fanout.exchange";
    // 2. 定义消息体
    String msg = "震惊!某大学频频被曝出食堂安全问题";
    // 3. 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "", msg);
}
  1. 观察结果

image.png
结果如上图所示:说明fanout.exchange雀氏将消息广播给了所有与之绑定的queue

2.2 Direct Exchange

特点:Direct Exchange要求在与queue建立binding关系的时候定义一个BindingKey,之后publisher生产者携带消息的同时也会指定RoutingKey,只有RoutingKey与BindingKey一致的queue才会被路由消息

工作流程如上图所示,其中queue1与exchange的Binding Key为"blue"以及"red",queue2与exchange的Binding Key为"yellow"以及"red",此时当Routing Key为"blue",Direct Exchange只会将消息路由给queue1
操作步骤:

  1. 在RabbitMQ控制台中新建两个队列:direct.queue1、direct.queue2

image.png

  1. 在RabbitMQ控制台中新建一个Direct类型的Exchange:direct.exchange

image.png

  1. 将direct.exchange与direct.queue1、direct.queue2分别建立binding关系,其中与queue1的binding key为"blue"与"red",与queue2的binding key为"yellow"与"red"

image.png

  1. 新建两个方法用于模拟consumer,分别监听direct.queue1以及direct.queue2队列
/**
 * 订阅direct.queue1队列
 * @param msg 消息
 */
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    log.info("listener1 从【direct.queue1】接收到消息:" + msg);
}

/**
 * 订阅direct.queue2队列
 * @param msg 消息
 */
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    log.info("listener2 从【direct.queue2】接收到消息:" + msg);
}
  1. 新建一个测试类方法,模拟将消息发布给direct.exchange,并指定routing key为"blue"
/**
 * 测试DirectExchange交换机类型
 */
@Test
public void testDirectExchange() {
    // 1. 定义交换机名称
    String exchangeName = "direct.exchange";
    // 2. 定义消息体
    String msg = "今日份消息只交给幸运色为blue的哦~";
    // 3. 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
}
  1. 观察结果

image.png
结果符合预期,只有direct.queue1能够接受到消息!

2.3 Topic Exchange

Topic Exchange与Direct Exchange非常类似,都可以依据BindingKey以及RoutingKey的匹配程度进而路由给特定符合条件的queue,但是Topic Exchange定义Binding Key可以为一组词,中间用"."进行分隔,并且支持使用通配符,规则如下:

  • #:匹配0个或者多个词
  • *:匹配1个单词

例如现在queue1的BindingKey为"china.#“,而queue2的BindingKey为”#.news",而RoutingKey为"china.reports",此时可以路由给queue1,但是无法路由给queue2,如果RoutingKey为"china.news"则queue1、queue2均可以被路由
操作步骤:

  1. 在RabbitMQ控制台中新建两个队列:topic.queue1、topic.queue2

image.png

  1. 在RabbitMQ控制台中新建一个Topic类型的Exchange:topic.exchange

image.png

  1. 将topic.exchange与topic.queue1、topic.queue2分别建立binding关系,其中与queue1的binding key为"china.#“,与queue2的binding key为”#.news"

image.png

  1. 新建两个方法用于模拟consumer,分别监听topic.queue1以及topic.queue2队列
/**
 * 订阅topic.queue1队列
 * @param msg 消息
 */
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {
    log.info("listener1 从【topic.queue1】接收到消息:" + msg);
}

/**
 * 订阅topic.queue2队列
 * @param msg 消息
 */
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {
    log.info("listener2 从【topic.queue2】接收到消息:" + msg);
}
  1. 新建一个测试类方法,模拟将消息发布给topic.exchange,并指定routing key为"china.news"
/**
 * 测试TopicExchange交换机类型
 */
@Test
public void testTopicExchange() {
    // 1. 定义交换机名称
    String exchangeName = "topic.exchange";
    // 2. 定义消息体
    String msg = "中国新闻报,快来买呀!";
    // 3. 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", msg);
}
  1. 观察结果

image.png
证明通配符生效!

3. 声明队列和交换机

前面我们收发消息的过程是使用Java代码实现的,但是创建Queues以及Exchanges仍然需要我们在RabbitMQ提供的控制台实现,那么如何使用Java代码来创建Queue以及Exchange呢?
SpringAMQP API:

  • 声明队列:使用new Queue("队列名称")创建
  • 声明交换机:使用new FanoutExchange("交换机名称")(以FanoutExchange为例)
  • 声明绑定关系:使用BindingBuilder.bind(队列对象).to(交换机对象)构建

3.1 Fanout声明

步骤:

  1. 编写一个配置类,使用@Configuration 声明
  2. 内部配置Queue、Exchange、Binding,并使用@Bean声明
@Configuration
public class FanoutConfig {

    /**
     * 声明FanoutExchange交换机
     * @return 返回FanoutExchange对象
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("code.fanout.exchange");
    }

    /**
     * 声明FanoutQueue队列
     * @return 返回FanoutQueue队列
     */
    @Bean
    public Queue fanoutQueue() {
        return new Queue("code.fanout.queue");
    }

    /**
     * 声明绑定关系
     * @param fanoutExchange 交换机
     * @param fanoutQueue 队列
     * @return 绑定关系
     */
    @Bean
    public Binding fanoutBinding(FanoutExchange fanoutExchange, Queue fanoutQueue) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }
}

3.2 Direct声明

步骤:

  1. 编写一个配置类,使用@Configuration 声明
  2. 内部配置Queue、Exchange、Binding,并使用@Bean声明
@Configuration
public class DirectConfig {

    /**
     * 声明一个DirectExchange交换机
     * @return 返回一个DirectExchange类型对象
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("code.direct.exchange");
    }

    /**
     * 声明一个Queue队列
     * @return 返回一个Queue类型对象
     */
    @Bean
    public Queue directQueue() {
        return new Queue("code.direct.queue");
    }

    /**
     * 声明一个绑定关系
     * @return 返回Binding对象
     */
    @Bean
    public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("");
    }
}

3.3 基于注解声明

注解声明格式:

@Component
@Slf4j
public class AnnotateRabbitListener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("annotate.direct.queue"),
            key = {"blue", "red"},
            exchange = @Exchange(name = "annotate.direct.exchange", type = ExchangeTypes.DIRECT)
    ))
    public void listenAnnotateDirect(String msg) {
        log.info("接收到消息:" + msg);
    }
}

4. 消息转换器

4.1 现象演示

前面我们都是将字符串类型的数据作为消息进行传输,那么如果是对象类型的消息呢,我们尝试发送一个自定义User类型作为消息传输:

/**
 * 自定义User类型
 * @author 米饭好好吃
 */
@Data
@AllArgsConstructor
public class User implements Serializable {
    private String name;
    private Integer age;
}
@Test
public void testSendObject() {
    // 1. 声明队列名称
    String queueName = "work.queue";
    // 2. 定义消息体
    User user = new User("jack", 22);
    // 3. 发送消息
    rabbitTemplate.convertAndSend(queueName, user);
}

从RabbitMQ控制台中查看消息内容如下:
image.png

4.2 追踪源码

image.png
我们发现实际调用了convertMessageIfNecessary(object)方法,我们继续追踪进去:
image.png
该方法判断object是否为Message类型,如果不是就调用getRequiredMessageConverter()获取所需的消息转换器,继续追踪进去:
image.png
image.png
该方法返回了一个SimpleMessageConverter实例对象,因此我们回到上一层,获取到MessageConverter实例后又调用了toMessage方法,我们继续追踪进去观察是如何转换消息的:
image.png
在AbstruectMessageConverter中实现了toMessage方法,而createMessage方法在子类 SimpleMessageConverter重写了该方法:
image.png
可以看出调用了SerialzationUtils.serialize(object)进行了序列化,继续追踪观察到底是如何序列化的:
image.png
可以看出是借助ObjectOutputStream进行序列化的,而这这个是JDK默认的序列化方式,该方式有如下缺点:

  • 序列化过程不够安全,可能存在注入风险
  • 序列化结果可读性较差
  • 序列化结果占用体积较大

因此我们需要重写消息转换器中的序列化机制:

4.3 自定义JSON序列化器

因此JDK原生序列化器有诸多确定,因此我们需要使用自定义的JSON序列化器,此处需要引入jackson-databind相关依赖

<dependency>
  <groupId>com.fasterxml.jackson.dataformat</groupId>
  <artifactId>jackson-dataformat-xml</artifactId>
  <version>2.9.10</version>
</dependency>
/**
 * 消息转换器配置
 * @author 米饭好好吃
 */
@Configuration
public class MessageConvertConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

验证结果:
image.png
在控制台中我们可以发现消息格式就是熟悉的JSON格式了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1822032.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI 大模型的赛点:通用与垂直之争

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

机器视觉:光源的类型以及主要参数

光源在机器视觉中起着决定性的作用&#xff0c;决定了视觉算法的复杂性&#xff0c;也决定了视觉系统的精度和稳定性。光源用于突出目标物体的特征&#xff0c;增加目标物体与背景的对比度&#xff0c;克服环境光线的干扰。光源的选择与打光方式在视觉系统的前期评估中非常重要…

【DIY飞控板PX4移植】BARO模块BMP388气压计的PCB硬件设计和PX4驱动配置

BARO模块BMP388气压计的PCB硬件设计和PX4驱动配置 BMP388简介硬件设计封装原理图PCB设计引脚选择问题 PX4驱动配置飞控板的配置文件夹结构default.px4board文件nuttx-config/nsh/defconfig文件nuttx-config/include/board.h文件src/board_config.h文件src/i2c.cpp文件init/rc.b…

Springboot防疫知识科普系统-计算机毕业设计源码03531

摘 要 如今计算机行业的发展极为快速&#xff0c;搭载于计算机软件运行的数据库管理系统在各行各业得到了广泛的运用&#xff0c;其在数据管理方面具有的准确性和高效性为大中小企业的日常运营提供了巨大的帮助。自从2020年新冠疫情爆发以来&#xff0c;防疫成了社会关注的重中…

LLM 大模型学习:数据预处理、模板设计以

在模型训练过程中&#xff0c;数据及数据处理是最为重要的工作之一。在当前模型训练流程趋于成熟的情况下&#xff0c;数据集的好坏&#xff0c;是决定了该次训练能否成功的最关键因素。 在上一篇中&#xff0c;我们提到了模型训练的基本原理是将文字转换索引再转换为对应的向…

开放式耳机哪个品牌质量比较好?2024高性价比机型推荐!

随着音乐技术的不断发展&#xff0c;开放式耳机已成为音乐发烧友们的另外一种选择。从最初的简单音质&#xff0c;到如今的高清解析&#xff0c;开放式耳机不断进化升级。音质纯净&#xff0c;佩戴舒适&#xff0c;无论是街头漫步还是家中放松时候&#xff0c;都能带给你身临其…

Arrays与Lambda

Arrays 默认排序&#xff1a; 按照指定规则排序&#xff1a; 细节&#xff1a; 底层原理&#xff1a; 代码实现&#xff1a;o1-o2:升序排列 o2-o1:降序排列 Lambda表达式&#xff1a;简化匿名内部类方法 函数式编程&#xff1a; 格式&#xff1a; 总结&#xff1a; …

Web应用安全测试-业务功能滥用(一)

Web应用安全测试-业务功能滥用&#xff08;一&#xff09; 1、短信定向转发 漏洞描述&#xff1a;短信接收人可任意指定 测试方法&#xff1a;拦截发送短信的请求&#xff0c;将手机号改为测试人员的手机号&#xff0c;测试是否可接收短信验证码。 风险分析&#xff1a;攻击…

动态防护开启教程和体验感受

动态防护是雷池 WAF 社区版在版本 [6.0.0] 中新增的一个功能&#xff0c;它属于站点高级防护的一部分。动态防护的主要作用是自动动态加密网站的 HTML 和 JavaScript 源码&#xff0c;目的是阻止爬虫和攻击自动化程序的分析。这项功能在 [6.0.0] 版本中标记为 BETA 版本&#x…

【Linux】进程_3

文章目录 五、进程3. 进程4. 进程状态 未完待续 五、进程 3. 进程 在当前&#xff0c;我们只能通过执行可执行程序来让操作系统帮我们启动进程&#xff0c;那我们如何使用代码来自己启动进程呢&#xff1f;我们可以使用 fork() 函数。作用是创建子进程。 我们创建一个程序来…

echarts学习:使用dataset管理数据

前言 在我们公司的组件库中有许多echarts图表相关的组件&#xff0c;这些组件在使用时&#xff0c;只需将图表数据以特定的格式传入组件中&#xff0c;十分方便。因此当我得知echarts 可以使用dataset集中管理数据时&#xff0c;我就决定自己一定要搞懂它&#xff0c;于是在最…

导入导出带下拉框模版(EasyExcel)

前言 项目进行到新的一个迭代了&#xff0c;赶了1周需求&#xff0c;接口终于处理完了。分享记录下迭代中处理导入、导出、下载模版功能的细节吧。 一、场景 EasyExcel&#xff08;阿里&#xff09;实现Excel数据处理三层表头&#xff0c;第二、三层表头动态数据根据第二、三层…

WebMvcConfigurer配置不当导致鉴权失败

最近同事说他们有个新需求&#xff0c;需要对接口进行加解密&#xff0c;所以他给项目配置了一个拦截器&#xff0c;但这个拦截器直接导致了每个接口鉴权失败&#xff0c;每次调用接口都是提示没有session信息。 公司内的所有java项目是公用同一套基础依赖&#xff0c;所以我也…

模具保护器 具体应用在哪些场所

模具监视器&#xff0c;被誉为模具的忠诚守护者&#xff0c;其应用领域遍布各类生产型设备&#xff0c;宛如一道坚实的防线&#xff0c;捍卫着模具的安全与生产的高效。以下是模具监视器在各领域所展现的卓越风采及其非凡功能&#xff1a; 在生产型设备的广阔天地里&#xff0c…

springboot美食菜谱分享平台优化版(源码+sql+论文报告)

绪论 1.1 研究意义 当今社会作为一个飞速的发展社会&#xff0c;网络已经完全渗入人们的生活&#xff0c; 网络信息已成为传播的第一大媒介&#xff0c; 可以毫不夸张说网络资源获取已逐步改变了人们以前的生活方式&#xff0c;网络已成为人们日常&#xff0c;休闲主要工具。…

加密软件有哪些优点?除了对文件加密这几点也很重要

加密软件用于保护数据安全&#xff0c;防止未经授权访问、数据泄露。通过使用加密算法来实现对数据的加密处理&#xff0c;确保数据在传输、存储过程中的机密性与完整性。 加密软件有哪些优点&#xff1f; 1、灵活控制&#xff1a;允许更灵活地配制加密控制条件&#xff0c;满…

夏季河湖防溺水新举措:青犀AI视频智能监控系统保障水域安全

近日一则新闻引起大众关注&#xff0c;有网友发布视频称&#xff0c;假期在逛西湖时&#xff0c;发现水面上“平躺”漂浮着一名游客在等待救援。在事发3分钟内&#xff0c;沿湖救生员成功将落水游客救到了岸边。 随着夏季的到来&#xff0c;雨水增多&#xff0c;各危险水域水位…

Java内存模型,堆、栈和方法区的区别

Java内存管理是Java虚拟机&#xff08;JVM&#xff09;技术的核心之一。了解Java内存管理对于提高程序性能、解决内存泄漏和优化资源利用至关重要。 一、Java内存模型&#xff08;Java Memory Model, JMM&#xff09; Java内存模型描述了Java程序中变量&#xff08;包括实例字…

Flink作业执行之 3.StreamGraph

Flink任务如何跑起来之 3.StreamGraph 1. StreamGraphGenerator 在前文了解Transformation和StreamOperator后。接下来Transformation将转换成StreamGraph&#xff0c;即作业的逻辑拓扑结构。 在env.execute()方法中调用getStreamGraph方法生成StreamGraph实例。StreamGraph…

Electron无感打印 静默打印(vue3 + ts + vite)

&#xff08;electron vue3 项目搭建部分 自行查找其他资源 本文只讲解Electronvue3 如何实现静默打印&#xff09; 第一步获取打印机资源 渲染端代码&#xff08;vue里面&#xff09; // 因使用了vite所以在浏览器中打开 require会报错 只能在electron中 const { ipcRender…