RabbitMQ初步到精通-第四章-RabbitMQ工作模式-PUB/SUB

news2025/7/22 14:11:48

第四章-RabbitMQ工作模式-PUB/SUB

1.模式介绍

1.1 模式

此模式称为发布订阅模式,从此模式开始,我们就不再使用默认的交换机了,开始定义我们自己的交换机。

此发布订阅模式,使用的交换机类型为Fanout。定义好交换机,消息的传输路径变为,从Producer发出,发送至Fanout类型的交换机,交换机将消息分别推送给和自己绑定的Queue上,

消费者再去消费对应的Queue,此处与前面模式一致。

 1.2 应用场景

此种模式用在什么场景呢,凡是场景中触发一个动作后,后面的流程都需要根据此动作做出反应,而后面的流程一般>1个,流程之间也没有先后顺序,还需要快速执行,即可以使用此模式。

举个栗子:

1. 用户注册成功后发送通知,既要发短信,又发邮件,还发站内信,发信息这些流程也不存在先后顺序,即可以使用这种模式。用户注册后,推送消息到mq, 发短信、发邮件、发站内信分别去消费消息实现信息发送。

2. 支付系统中用户支付成功后,后续要有一系列动作,例如要上B端账,调风控、结算、发消息等逻辑,也同样可以使用此模式应用其中。

1.2 模拟

首先注意到我们的Exchange 类型 是 fanout,

其次在发送的时候 routingkey -填与不填都不再影响 Exchange的分发路由了。

 

2.验证代码

还是举例 小明洗澡的例子。小明一天在看科幻小说,突然睡着了,梦里梦见自己研究出了一种热水器转换开关,这种开关可以将流过的水复制一份。心想太棒了,这以后能省不少水。

于是改造了下自己的太阳能热水器,热水器内部变成了两个水槽,加入此转换器后,分别给两个水槽上水,水管出水1L,两个水槽都能充满1L。改造成功后,便开始拉着自己的女朋友开始了洗澡实验。

2.1 生产者


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 发布订阅模式一个生产者,一个交换机,两个队列,两个消费者
 * 声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定规则直接绑定。
 * 生产者创建一个exchange并且指定类型,和一个或多个队列绑定在一起。当生产者发送消息是会发送到exchange中,再由exchange到绑定的队列中
 * @createTime 2022/07/27 19:34:00
 */
public class WaterProducer {

    public static final String PUBSUB_QUEUE_1 = "SolarWaterHeaterXM";
    public static final String PUBSUB_QUEUE_2 = "SolarWaterHeaterXL";

    //生产者
    public static void main(String[] args) throws Exception {
        //1、获取connection
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();

        for (int i = 1; i <= 10; i++) {
            sendMsg(channel, i);
            Thread.sleep(100);
        }

        //4、关闭管道和连接
        channel.close();
        connection.close();
    }

    private static void sendMsg(Channel channel, int k) throws IOException {
        //3、通过channel创建自己的exchange 并且绑定队列
        /**
         * 参数1:exchange的名称
         * 参数2:指定exchange的类型
         * FANOUT-Publish/Subscribe
         * DIRECT-Routing
         * TOPIC-Topics
         */
        channel.exchangeDeclare("publish-exchange", BuiltinExchangeType.FANOUT);
        channel.queueBind(PUBSUB_QUEUE_1, "publish-exchange", "");
        channel.queueBind(PUBSUB_QUEUE_2, "publish-exchange", "");

        //4、发送消息到exchange
        String msg = k + "升";
        /**
         * 参数1:指定exchange,使用“”。默认的exchange
         * 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中
         * 参数3:指定传递的消息携带的properties
         * 参数4:指定传递的消息,byte[]类型
         */
        channel.basicPublish("publish-exchange", "", null, msg.getBytes());
        System.out.println("水龙头放水成功!" + k + "升");
    }

}

2.2 消费者

小明:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 发布订阅模式 一个生产者,一个交换机,两个队列,两个消费者
 * @createTime 2022/07/27 19:36:00
 */
public class XMShowerConsumer {

    public static final String PUBSUB_QUEUE_1 = "SolarWaterHeaterXM";

    //消费者
    public static void main(String[] args) throws Exception {
        //1、获取连对象、
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();

        channel.basicQos(1);
        //3、创建队列
        /**
         * 参数1:queue 指定队列名称
         * 参数2:durable 是否开启持久化(true)
         * 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)
         * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
         * 参数5:arguments 指定队列携带的信息
         */
        channel.queueDeclare(PUBSUB_QUEUE_1, true, false, false, null);

        //4.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("小明洗澡用水: " + new String(body, "UTF-8"));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //手动ACK(接收信息,指定是否批量操作)
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * 参数1:queue 指定消费哪个队列
         * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
         * 参数1:cancelCallback 指定消费回调
         *
         */
        //3.关闭自动ACK
        channel.basicConsume(PUBSUB_QUEUE_1,false,consumer);
        System.out.println("小明使用热水器中的XM水槽开始洗澡......");

        //5、键盘录入,让程序不结束!
        System.in.read();

        //6、释放资源
        channel.close();
        connection.close();
    }


}

小丽:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 发布订阅模式 一个生产者,一个交换机,两个队列,两个消费者
 * @createTime 2022/07/27 19:36:00
 */
public class XLShowerConsumer {

    public static final String PUBSUB_QUEUE_2 = "SolarWaterHeaterXL";

    //消费者
    public static void main(String[] args) throws Exception {
        //1、获取连对象、
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();

        channel.basicQos(1);
        //3、创建队列-helloworld
        /**
         * 参数1:queue 指定队列名称
         * 参数2:durable 是否开启持久化(true)
         * 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)
         * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
         * 参数5:arguments 指定队列携带的信息
         */
        channel.queueDeclare(PUBSUB_QUEUE_2, true, false, false, null);

        //4.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("小丽洗澡用水: " + new String(body, "UTF-8"));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //手动ACK(接收信息,指定是否批量操作)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        /**
         * 参数1:queue 指定消费哪个队列
         * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
         * 参数1:cancelCallback 指定消费回调
         *
         */
        //3.关闭自动ACK
        channel.basicConsume(PUBSUB_QUEUE_2, false, consumer);
        System.out.println("小丽使用热水器中的XL水槽开始洗澡......");

        //5、键盘录入,让程序不结束!
        System.in.read();

        //6、释放资源
        channel.close();
        connection.close();
    }

}

2.3 结果

生产者:

水龙头放水成功!1升
水龙头放水成功!2升
水龙头放水成功!3升
水龙头放水成功!4升
水龙头放水成功!5升
水龙头放水成功!6升
水龙头放水成功!7升
水龙头放水成功!8升
水龙头放水成功!9升
水龙头放水成功!10升

消费者小明:

小明洗澡用水: 1升
小明洗澡用水: 2升
小明洗澡用水: 3升
小明洗澡用水: 4升
小明洗澡用水: 5升
小明洗澡用水: 6升
小明洗澡用水: 7升
小明洗澡用水: 8升
小明洗澡用水: 9升
小明洗澡用水: 10升

消费者小丽:

小丽洗澡用水: 1升
小丽洗澡用水: 2升
小丽洗澡用水: 3升
小丽洗澡用水: 4升
小丽洗澡用水: 5升
小丽洗澡用水: 6升
小丽洗澡用水: 7升
小丽洗澡用水: 8升
小丽洗澡用水: 9升
小丽洗澡用水: 10升

最终实现了,生产者出水10L,两个人都使用了10L水洗了澡,刚洗完,小明就睡醒了,原来是一个梦。

3. 总结

从此模式开始,我们接触到了Exchange的创建,及绑定,以及使用了Exchange的类型 Fanout。

而以前我们在simple模式及work模式中,用到的默认Exchange类型都是Direct类型。

核心代码-声明Exchange:

channel.exchangeDeclare("publish-exchange", BuiltinExchangeType.FANOUT);

核心代码-绑定queue

channel.queueBind(PUBSUB_QUEUE_1, "publish-exchange", "");

核心代码-发送给Fanout的Exchange

channel.basicPublish("publish-exchange", "", null, msg.getBytes());

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/16996.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MATLAB教程案例42】语音信号的MFCC特征提取matlab仿真

欢迎订阅《FPGA学习入门100例教程》、《MATLAB学习入门100例教程》 本课程学习成果预览: 目录 1.软件版本 2.MFCC理论概述

JavaScript之BOM复习(54th)

1、BOM概述 1、BOM Browser Object Model 浏览器对象模型 2、它提供了独立于内容而与浏览器窗口进行交互的对象&#xff0c;其核心对象是 window 3、BOM 由一系列相关的对象构成&#xff0c;并且每个对象都提供了很多方法与属性 4、BOM 缺乏标准&#xff0c;JavaScript 语法的…

用QT实现一个简单的桌面宠物

有时候桌面空空的&#xff0c;或者屏幕空旷了&#xff0c;我们就可以找一点东西来点缀一下&#xff0c;那么桌面宠物是一个不错的选择。 作为一个程序猿&#xff0c;如何实现一个桌面宠物呢&#xff1f; 本文就给大家带来的是如何用qt提供一种思路并写一个简单的桌面宠物。 思…

深入理解Linux网络技术内 幕(八)——设备注册和初始化

文章目录前言设备注册之时设备除名之时分配net_device结构NIC注册和除名的架构设备初始化设备驱动程序初始化设备类型初始化&#xff1a;xxx_setup函数可选的初始化和特殊情况net_device结构的组织查询设备状态队列规则状态注册状态设备的注册和除名切割操作&#xff1a;netdev…

C#编程的构成要素(结合unity做实例分析)

目录 定义变量 变量的名称很重要 将变量作为占位符 疯狂的方法 方法驱动行为 方法也是占位符 类的引入 一直在使用类 日常蓝图 注释是关键 将脚本附加到游戏对象上 脚本成为组件 类与组件通信 本文主要来自<<C#实践入门>>哈里森.费隆 著&#xff0c;仅用…

甘露糖-聚乙二醇-氨基|mannose-PEG-NH2|氨基-PEG-甘露糖

甘露糖-聚乙二醇-氨基|mannose-PEG-NH2|氨基-PEG-甘露糖 氨基&#xff08;Amino&#xff09;由一个氮原子和两个氢原子构成&#xff0c;化学式为-NH2。在有机化学中&#xff0c;氨基是基本碱基&#xff0c;大多数含有氨基的有机物都有一定碱的特性&#xff0c; 中文名称&…

基于数学形态学的路面裂缝图像处理技术-含Matlab代码

⭕⭕ 目 录 ⭕⭕✳️ 一、引言✳️ 二、图像预处理✳️ 三、路面裂缝图像的边缘检测✳️ 3.1 裂缝识别✳️ 3.2 裂缝区域信息获取✳️ 3.3 裂缝特征提取✳️ 四、参考文献✳️ 五、Matlab代码获取✳️ 一、引言 对于路面裂缝而言&#xff0c; 采用图像处理技术对其进行识别与计…

③计算机病毒实验实验报告

班级 计科2101 姓名 彭彭头 学号 时间 2022年5月6日 成绩 实验项目名称 计算机病毒实验二 实验目的 1、了解脚本病毒的感染方式。 2、了解脚本病毒的手工清除方法。 实验内容 通过批处理文件进行计算机病毒和编写&#xff0c;了解脚本病毒的感染方式。 实验环…

Java笔记(十三)

文献种类&#xff1a;专题技术总结文献 开发工具与关键技术&#xff1a; IntelliJ IDEA、Java 语言 作者&#xff1a; 方建恒 年级&#xff1a; 2020 撰写时间&#xff1a; 2022 年 11 月 18 日 Java笔记(十三) 今天我给大家继续分享一下我的Java笔记&#xff0c; 我们继续来…

【Linux】环境基础开发工具使用

Vim Vim 是一个编辑器 只能编辑&#xff0c;只能写代码 直接输入vim &#xff1a; q就是退出 touch新文件&#xff0c;vim 进入 vim是一款多模式的编辑器 命令模式&#xff08;默认打开的模式&#xff09; 按 i 进入编辑模式/插入模式 esc回到命令模式 冒号进入底行…

【前沿技术RPA】 一文了解UiPath的代码审查工具Workflow Analyzer

&#x1f40b;作者简介&#xff1a;博主是一位.Net开发者&#xff0c;同时也是RPA和低代码平台的践行者。 &#x1f42c;个人主页&#xff1a;会敲键盘的肘子 &#x1f430;系列专栏&#xff1a;UiPath &#x1f980;专栏简介&#xff1a;UiPath在传统的RPA&#xff08;Robotic…

[附源码]java毕业设计企业员工管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Azdio-PEG-Maleimide,N3-PEG-MAL,叠氮-PEG-马来酰亚胺化学试剂供应

1、名称 英文&#xff1a;Azdio-PEG-Maleimide&#xff0c;N3-PEG-MAL 中文&#xff1a;叠氮-聚乙二醇-马来酰亚胺 2、CAS编号&#xff1a;N/A 3、所属分类&#xff1a;Azide PEG Maleimide PEG 4、分子量&#xff1a;可定制&#xff0c;N3-PEG 20k -MAL、N3-PEG 10k -MAL…

EFK部署centos7.9(四)Filebeat 部署

下载安装包 wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.5.4-linux-x86_64.tar.gz tar xzvf filebeat-6.5.4-linux-x86_64.tar.gz -C /usr/local/ 解压安装包 cd /usr/local/ mv filebeat-6.5.4-linux-x86_64 filebeat cd filebeat/ mv filebe…

Springboot导出Excel,支持大数据量

1、添加maven依赖 <dependency><groupId>org.apache.poi</groupId><artifactId>poi-ooxml</artifactId><version>3.17</version> </dependency> 2、ExcelUtil工具类 import org.apache.poi.ss.usermodel.Cell; import org.…

【附源码】Python计算机毕业设计天气预报APP

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;我…

甘露糖-聚乙二醇-炔基|mannose-PEG-Alkyne|炔基-PEG-甘露糖

甘露糖-聚乙二醇-炔基|mannose-PEG-Alkyne|炔基-PEG-甘露糖 中文名称&#xff1a;甘露糖-炔基 英文名称&#xff1a;mannose-Alkyne 别称&#xff1a;炔基修饰甘露糖&#xff0c;炔基-甘露糖 mannose-PEG-Alkyne 甘露糖-聚乙二醇-炔基 炔基-PEG-甘露糖 纯度&#xff1a;…

统一网关Gateway、路由断言工厂、路由过滤器及跨域问题处理

目录 一、搭建网关服务 二、路由断言工程Route Predicate Factory 三、路由过滤器 四、全局过滤器GlobalFilter 过滤器执行顺序 五、跨域问题处理 一、搭建网关服务 网关的作用&#xff1a; 对用户请求做身份认证&#xff0c;权限校验 将用户请求路由到微服务&#xff0…

吴恩达机器学习课程笔记二

文章目录神经网络基础知识神经网络前向传播伪代码前向传播中单个神经元的作用矩阵加速运算训练模型的细节常用激活函数ReLUSigmoidLinear activation functiontanh选择激活函数选择输出层的激活函数选择隐藏层的激活函数为什么需要非线性激活函数Softmax激活函数多标签分类问题…

Metabase学习教程:提问-2

查询编辑器中的自定义表达式 何时应该使用自定义表达式&#xff0c;以及为什么要在Metabase的查询编辑器中利用它们。 在数学中&#xff0c;表达式是符号的集合&#xff0c;它们一起表示一个值。如果你以前使用过电子表格软件&#xff0c;表达式就是公式&#xff0c;比如SUM&…