SpringCloud(五)MQ消息队列

news2025/7/12 10:12:32

MQ

  • 概念
  • 常见消息模型
    • helloworld案例
      • 实现
        • 实现spring AMQP发送消息
        • 实现spring AMQP接收消息
    • 工作消息队列
      • 实现
    • 发布订阅模型
      • Fanout Exchange
        • 实现
      • DirectExchange
        • 实现
      • TopicExchange
        • 实现
      • DirectExchange 和FanoutExchange的差异
      • DirectExchange 和TopicExchange的差异
    • 基于@RabbitListener注解声明队列有 哪些常用注
  • 消息转换器
    • 注意
  • 同步调用
  • 异步调用
  • 安装
  • SpringAMQP
    • 特征

概念

MQ(MessageQueue):消息队列,事件驱动架构中的Broker
在这里插入图片描述

  • channel:操作MQ的工具
  • exchange:路由消息到队列
  • queue:缓存消息
  • virtual host: 虚拟主机,是对queue、exchange等资源逻辑分组

常见消息模型

在这里插入图片描述在这里插入图片描述

helloworld案例

角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息
    在这里插入图片描述

实现

实现spring AMQP发送消息

  • 在父工程引入spring-amqp的依赖
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列

    • 在publisher服务中编写application.yml,添加mq连接信息
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        virtual-host: /
        username: root
        password: toor
    
    • 在publisher服务中新建一个测试类,编写测试方法
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAmqpTest {
        
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Test
        public void testSimpleQueue(){
            String queueName = "simple.queue";
            String message = "hello,spring amqp!";
            rabbitTemplate.convertAndSend(queueName,message);
        }
    }
    

实现spring AMQP接收消息

  • 在父工程引入spring-amqp的依赖
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 在consumer服务中编写消费逻辑,监听simple.queue。

    • 在consumer服务中编写application.yml,添加mq连接信息
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        virtual-host: /
        username: root
        password: toor
    
    • 在publisher服务中新建一个测试类,编写测试方法
    @Component
    public class SpringRabbitListener {
        
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage (String msg) throws InterruptedException{
            System.out.println("spring 消费者接收到消息:【"+msg+"】");
        }
    }
    
    

工作消息队列

作用: 提高消息处理速度,避免队列消息堆积。
在这里插入图片描述

实现

  1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
	  @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testWorkQueue() throws InterruptedException {
        String queueName = "simple.queue";
        String message = "hello,spring amqp!";
        for (int i = 0; i <= 50; i++) {
            rabbitTemplate.convertAndSend(queueName,message+i);
            Thread.sleep(20);
        }
    }
  1. 在consumer服务中定义两个消息监听者,都监听simple.queue队列
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueueMessage1 (String msg) throws InterruptedException{
        System.out.println("spring 消费者1接收到消息:【"+msg+"】");
        Thread.sleep(20);
    }
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueueMessage2 (String msg) throws InterruptedException{
        System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色
        Thread.sleep(200);
  1. 消费者1每秒处理50条消息,消费者2每秒处理10条消息
    • 修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: root
    password: toor
    listener:
      direct:
        prefetch: 1

发布订阅模型

概念: 与之前模型区别是,允许将同一消息发送给多个消费者。
实现方式: exchange(交换机)
exchange: 负责消息路由,不存储,路由失败则消息丢失
常见exchange类型

  • Fanout:广播
  • Direct:路由
  • Topic:话题

在这里插入图片描述

Fanout Exchange

Fanout Exchange:将接收到的消息路由到每一个跟其绑定的queue

实现

  1. 在consumer服务中,利用代码声明队列(Queue)、交换机(Exchange),并将两者绑定(Binding)

    1. SringAMQP提供了声明交换机、队列、绑定关系的API。
      在这里插入图片描述
    2. 在consumer服务常见一个类,添加@configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding。
    @Configuration
    public class FanoutConfig {
        //声明FanoutExchange交换机
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("root.fanout");
        }
        //声明第一个队列
        @Bean
        public Queue fanoutQueue1(){
            return new Queue("fanout.queue1");
        }
        //绑定队列1和交换机
        @Bean
        public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
        }
    //第二个同第一个
    }
    
  2. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1Message (String msg) throws InterruptedException{
        System.out.println("spring 消费者接收到消息:【"+msg+"】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2Message (String msg) throws InterruptedException{
        System.out.println("spring 消费者接收到消息:【"+msg+"】");
    }
  1. 在publisher中编写测试方法,向root.fanout发送消息
    @Test
    public void testSendFanoutExchange(){
        String exchangeName = "root.fanout";
        String message = "hello,spring amqp!";
        rabbitTemplate.convertAndSend(exchangeName,message);
    }

DirectExchange

DirectExchange: 将接收到的消息更具规则路由到指定的Queue,因此称为路由模式(routes)

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
    在这里插入图片描述

实现

  1. 利用@RabbitListener声明Exchange、Queue、RoutingKey
  2. 在consumer服务中编写两个消费者方法,分别监听direct.queue1和direct.queue2
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name="direct.queue1"),
            exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listenDirectQueue1Message (String msg) throws InterruptedException{
        System.out.println("spring 消费者接收到direct.queue1的消息:【"+msg+"】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name="direct.queue2"),
            exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listenDirectQueue2Message (String msg) throws InterruptedException{
        System.out.println("spring 消费者接收到direct.queue2的消息:【"+msg+"】");
    }
  1. 在publisher中编写测试方法,向root.direct发送消息
    @Test
    public void testSendDirectExchange(){
        String exchangeName = "root.direct";
        String message = "hello,red!";
        rabbitTemplate.convertAndSend(exchangeName,"red",message);
    }

TopicExchange

TopicExchange: 与DirectExchange类似,区别在于routineKey必须是多个单词的列表,并且以== . ==分割。

Queue与Exchange指定BindingKey时可以使用通配符
# :代指0个或多个单词
*:代指一个单词
在这里插入图片描述

实现

  1. 利用@RabbitListener声明Exchange、Queue、RoutingKey
  2. 在consumer服务中编写两个消费者方法,分别监听topic.queue1和topic.queue2
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name="topic.queue1"),
            exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1Message (String msg) throws InterruptedException{
        System.out.println("spring 消费者接收到topic.queue1的消息:【"+msg+"】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name="topic.queue2"),
            exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2Message (String msg) throws InterruptedException{
        System.out.println("spring 消费者接收到topic.queue2的消息:【"+msg+"】");
    }
  1. 在publisher中编写测试方法,向root.topic发送消息
    @Test
    public void testSendTopicExchange(){
        String exchangeName = "root.topic";
        String message = "hello,china.news!";
        rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
    }

DirectExchange 和FanoutExchange的差异

  • FanoutExchange将消息路由给每一个与之绑定的队列
  • DirectExchange根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

DirectExchange 和TopicExchange的差异

  • TopicExchange的routineKey必须使用多个单词,以== . ==分割
  • TopicExchange可以使用通配符

基于@RabbitListener注解声明队列有 哪些常用注

  • @Queue
  • @Exchange

消息转换器

设置JSON方式序列化
发送消息

  • 在publisher服务引入依赖
<dependency>
  <groupId>com.fasterxml.jackson.dataformat</groupId>
  <artifactId>jackson-dataformat-xml</artifactId>
  <version>2.9.10</version>
</dependency>
  • 在publisher服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;


    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }

接收消息

  • 在consumer服务引入依赖
<dependency>
  <groupId>com.fasterxml.jackson.dataformat</groupId>
  <artifactId>jackson-dataformat-xml</artifactId>
  <version>2.9.10</version>
</dependency>
  • 在consumer服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;


    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
  • 定义消费者
    @RabbitListener(queues = "object.queue")
    public void listenObjectQueueMessage(Map<String,Object> msg){
        System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色
    }

注意

MessageConverter默认是JDK序列化
接收方和发送方必须使用相同的MessageConverter

同步调用

优点: 时效性高
问题:

  • 耦合度高
  • 性能下降
  • 资源浪费
  • 级联失败

异步调用

实现方式:

  • 事件驱动(常用)

优势:

  • 服务解耦
  • 性能提升,吞吐量提高
  • 故障隔离。没有强依赖,不担心级联失败问题
  • 流量削锋

缺点:

  • 依赖Broker的可靠性、安全性、吞吐能力
  • 架构复杂、业务没有明显的流程线,不好追踪管理

安装

docker pull rabbitmq:3-management


docker run \
 -e RABBITMQ_DEFAULT_USER=root \
 -e RABBITMQ_DEFAULT_PASS=toor \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

SpringAMQP

AMQP(Advance Message Queuing Protocol):是用于在应用程序或之间传递业务消息的开放标准,该协议与语言平台无关,更符合微服务中独立性的要求

Spring AMQP: 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分:

  • spring-amqp:基础抽象
  • spring-rabbit:底层默认实现

特征

  • 监听器容器,用于异步处理入站消息
  • 用于发送和接收消息的RabbitTemplate
  • RabbitAdmin用于自动声明队列、交换和绑定

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/375687.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

浅谈hcip devops考试心得

官网&#xff1a;华为HCIP-Cloud Service DevOps Engineer 今天考过了hcip devops的考试&#xff0c;写两句&#xff0c;对于一些参加考试的朋友留下一点参考: 第一个问题&#xff0c;hcip还需要报班嘛&#xff1f; 我认为这一项是不需要的&#xff0c;有点浪费钱&#xff0c;…

软件测试人员会被替代吗?IT行业哪个方向的前景最好?字节12年测开是这样说的

互联网测试从业12年&#xff0c;前来作答。 逻辑上来说&#xff0c;软件工程最初始只需要两个岗位&#xff0c;一个是产品经理。&#xff0c;一个是研发&#xff08;开发&#xff09;&#xff0c;剩余的 所有岗位都是由他们衍生而来的。 第三个岗位大概率就是测试&#xff0c…

继电器的工作原理、构成和功能介绍

随着电力应用的不断发展&#xff0c;电气设备已经深入到我们的日常生活中&#xff0c;电气自动化技术大量使用在电力系统和生产型企业中&#xff0c;人们在享受电带来方便的同时要注意用电保护。继电器就是为了保护电路而生的&#xff0c;可以提高电路可靠性&#xff0c;保障用…

山洪径流过程模拟及洪水危险性评价

目录 1.洪水淹没危险性评价方法及技术讲解 2.GIS水文信息提取与分析(基于ArcGIS软件) 3.洪水淹没模拟水文分析&#xff1a;洪峰流量估算 4.洪水淹没模拟水力学分析&#xff1a;Hec-RAS实例操作 GIS水文分析&#xff08;ArcHydro、Spatial Anlysist等模块&#xff09;是流域…

从银行数字化转型来聊一聊,火山引擎 VeDI 旗下 ByteHouse 的应用场景

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 近日&#xff0c;火山引擎凭借云原生数据分析平台 ByteHouse&#xff0c;成功入围行业媒体 Internet Deep&#xff08;互联网周刊&#xff09;发布的《2022 云原生企…

【Linux】软件包管理器yum和编辑器vim的使用

文章目录1.软件包管理器yum1.1什么是软件包和软件包管理器1.2yum的介绍与使用2.编辑器vim2.1vim的基本概念2.2vim的基本操作2.3vim【Nomal mode】命令集2.4 vim末行模式命令集2.5 简单的vim配置1.软件包管理器yum 1.1什么是软件包和软件包管理器 在Linux下下载软件的方式是下…

数据治理之元数据管理Atlas

数据治理之元数据管理的利器——Atlas 一、数据治理与元数据管理 1.1 背景 为什么要做数据治理&#xff1f; 业务繁多&#xff0c;数据繁多&#xff0c;业务数据不断迭代。人员流动&#xff0c;文档不全&#xff0c;逻辑不清楚&#xff0c;对于数据很难直观理解&#xff0c;…

高阶人生从在职读研开始——中国社科院与美国杜兰大学金融管理硕士

说到学历&#xff0c;好多人都不太在意&#xff0c;感觉学历没什么用。等升职学历被卡时&#xff0c;等你想考公学历达不到时&#xff0c;当你想跳槽更大的平台时&#xff0c;学历会显得尤其重要。当机会来临时&#xff0c;我们应该做好全足的准备&#xff0c;而不是眼瞅着机会…

【CSS文字滚动】CSS实现文字横向循环无缝滚动,鼠标移入暂停移出继续(附实测源码)

CSS如何实现文字横向滚动滚动效果1、垃圾liMarquee&#xff08;最好别用&#xff09;2、css实现文字滚动&#xff0c;且鼠标移入移出暂停和继续HTML源码如下&#xff1a;CSS源码如下&#xff1a;JS源码如下&#xff1a;3、片尾彩蛋CSS实现文字横向循环无缝滚动&#xff0c;鼠标…

为什么文档对 SaaS 公司至关重要?

在过去十年左右的时间里&#xff0c;SaaS的兴起使全球数百家公司成为家喻户晓的公司。但他们并不是仅仅依靠产品的力量到达那里的。客户服务和支持是使一切在幕后顺利进行的原因——其中很大一部分是文档。以正确的风格和正确的位置在您的网站上找到适当的用户文档对于将浏览器…

Docker----------网络network

1. 是什么 1.1docker不启动默认网络 1.ens332.lo3.virbr0 在CentOS7的安装过程中如果有选择相关虚拟化的的服务安装系统后&#xff0c;启动网卡时会发现有一个以网桥连接的私网地址的virbr0网卡(virbr0网卡&#xff1a;它还有一个固定的默认IP地址192.168.122.1)&#xff0c…

大数据技术生态全景一览

大数据技术生态全景一览大数据平台ETL数据接入大数据平台海量数据存储大数据平台通用计算大数据平台各场景的分析运算分布式协调服务任务流调度引擎大数据平台ETL数据接入 大数据有很多的产品&#xff0c;琳琅满目。从架构图上就能看出产品很多。这些产品它们各自的功能是什么…

乐友商城学习笔记(十六)

购物车功能分析 新增 判断用户是否登陆&#xff08;页面&#xff09; 未登录&#xff0c;添加到页面的localstrorage中 登陆&#xff0c;添加到redis中 已经存在该商品的购物车记录&#xff0c;更新数量&#xff0c;否则新增一条商品的购物车记录 查询 ly.store.get(“LY_C…

【打卡】图分析与节点嵌入

背景介绍 图&#xff08;Graphs&#xff09;是一种对物体&#xff08;objects&#xff09;和他们之间的关系&#xff08;relationships&#xff09;建模的数据结构&#xff0c;物体以结点&#xff08;nodes&#xff09;表示&#xff0c;关系以边&#xff08;edges&#xff09;…

Uncaught ReferenceError: jQuery is not defined

今天在拉取项目部署到本地的时候遇到了一个问题特此记录一下 &#xff08;以后闭坑&#xff09; 我和同事同时拉取了一样的代码&#xff0c;结果同事的页面加载正常而我的页面像被狗啃了一样&#xff0c;知道是js的问题但是不知道问题出在哪里&#xff1f;后来还是同事帮我解决…

Python - DIY - 使用dump取json某些键值对合成新的json文件

Python - Json处理前言&#xff1a;应用场景&#xff1a;基本工具&#xff1a;文件操作&#xff1a;打开文件&#xff1a;写文件&#xff1a;读文件&#xff1a;关闭文件并刷新缓冲区&#xff1a;Json字符串和字典转换&#xff1a;json.loads()&#xff1a;json.dumps():Json文…

高端电器新十年,求解「竞速突围」

竞争激烈的高端电器品牌们&#xff0c;平时王不见王&#xff0c;但也有例外。海尔、博西、海信、创维、方太、老板等等近乎中国电器行业所有一线品牌副总裁级别以上高层&#xff0c;2月22日都现身于上海&#xff0c;来参加一场由红星美凯龙攒起来的高端电器局&#xff0c;2023中…

gogs代码仓库迁移至gitlab仓库上

将gogs仓库的所有分支都迁移到GitLab上 GitHub新建一个空项目 这个我就不教了: 不会的请点击这里 自查本地是否已有SSH公钥 我自身win11举例&#xff1a;C:\Users\PC.ssh\XXXXXX.pub 找“.pub”结尾的文件&#xff0c;用文本打开就是你本地的公钥了 直接复制公钥&#xff0…

比Redis更强,性能直接飙升一倍!

# 什么是KeyDB&#xff1f;KeyDB是Redis的高性能分支&#xff0c;专注于多线程&#xff0c;内存效率和高吞吐量。除了多线程之外&#xff0c;KeyDB还具有仅在Redis Enterprise中可用的功能&#xff0c;例如Active Replication&#xff0c;FLASH存储支持以及一些根本不可用的功能…

Java异常Throwable的分类

1. Exception&#xff1a;程序本身可以捕获并且可以处理的异常 编译时异常&#xff1a;编译期就会检查的异常&#xff0c;若调用的方法中throw了此类异常&#xff0c;则必须进行显式处理处理&#xff08;用try…catch捕获或者throws向上抛出&#xff09;&#xff0c;否则无法通…