RabbitMQ初步到精通-第四章-RabbitMQ工作模式-WORK

news2025/7/22 14:01:04

第四章-RabbitMQ工作模式-WORK

1.模式介绍

1.1 work模式

Work模式与前面的Simple模式一致,也是消息经由生产者发到Exchange再到queue再被消费者消费。不同点在于SIMPL模式是一个队列对应的一个消费者,此模式会由一个队列对应两个消费者或大于两个消费者。

 

1.2 work模式模拟

此模式下,我们的消费者实现了对消息的平均消费,但如果消费者1消费能力若于消费者2,那就会造成消费者1 的消息积压,这时候我们就会想到使用公平模式,能者多劳,消费快的多消费,消费少的少消费。合理消息。后面代码会涉及到。

 

2.验证代码

2.1 work平均模式

我们还是举小明洗澡的例子,小明洗澡比较孤独,这时候小明的女朋友小丽也来一块洗澡了,但两个人不想公用一个喷头,便又接入了一个喷头,一人一个开始洗,但是小明洗澡比较快,虽然洗完了,但是喷头关不上,只好把水浪费掉,小丽洗澡比较慢,但感觉水又不太够用。一共就20升水,一人10升。

2.1.1 生产者


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 一个生产者,一个默认的交换机,一个队列,两个消费者
 * @createTime 2022/07/27 19:34:00
 */
public class WaterProducer {

    public static final String QUEUE_NAME = "SolarWaterHeater";

    //生产者
    public static void main(String[] args) throws Exception {
        //1、获取connection
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();

        for (int i = 1; i <= 20; i++) {
            sendMsg(channel, i);
            Thread.sleep(200);
        }
        //4、关闭管道和连接
        channel.close();
        connection.close();
    }

    private static void sendMsg(Channel channel, int k) throws IOException {
        //3、发送消息到exchange
        String msg = k + "升";
        /**
         * 参数1:指定exchange,使用“”。默认的exchange
         * 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中
         * 参数3:指定传递的消息携带的properties
         * 参数4:指定传递的消息,byte[]类型
         */
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("水龙头放水成功!" + k + "升");
    }

}

2.1.2 消费者

小明洗澡:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 工作模式  一个生产者,一个默认的交换机,一个队列,两个消费者
 * 需要在consumer消费者端,平均分配
 * @createTime 2022/07/27 19:36:00
 */
public class XMShowerConsumer {

    public static final String QUEUE_NAME = "SolarWaterHeater";

    //消费者
    public static void main(String[] args) throws Exception {
        //1、获取连对象、
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();
        //3、创建队列-helloworld
        /**
         * 参数1:queue 指定队列名称
         * 参数2:durable 是否开启持久化(true)
         * 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)
         * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
         * 参数5:arguments 指定队列携带的信息
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            int i = 1;

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (i > 5) {
                    System.out.println("小明洗澡洗完,开始浪费-第:" + new String(body, "UTF-8"));
                } else {
                    System.out.println("小明洗澡已用水-第: " + new String(body, "UTF-8"));
                }
                i++;
            }
        };
        /**
         * 参数1:queue 指定消费哪个队列
         * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
         * 参数1:cancelCallback 指定消费回调
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
        System.out.println("小明开始洗澡-快速洗......");

        //5、键盘录入,让程序不结束!
        System.in.read();

        //6、释放资源
        channel.close();
        connection.close();
    }

}

小丽洗澡:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 工作模式  一个生产者,一个默认的交换机,一个队列,两个消费者
 * 需要在consumer消费者端,平均分配
 * @createTime 2022/07/27 19:36:00
 */
public class XLShowerConsumer {

    public static final String QUEUE_NAME = "SolarWaterHeater";

    //消费者
    public static void main(String[] args) throws Exception {
        //1、获取连对象、
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();
        //3、创建队列-helloworld
        /**
         * 参数1:queue 指定队列名称
         * 参数2:durable 是否开启持久化(true)
         * 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)
         * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
         * 参数5:arguments 指定队列携带的信息
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("小丽洗澡已用水-第: " + new String(body, "UTF-8"));
            }
        };
        /**
         * 参数1:queue 指定消费哪个队列
         * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
         * 参数1:cancelCallback 指定消费回调
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
        System.out.println("小丽开始洗澡-慢慢洗......");

        //5、键盘录入,让程序不结束!
        System.in.read();

        //6、释放资源
        channel.close();
        connection.close();
    }

}

2.1.3 结果验证

生产者放水消息:

水龙头放水成功!1升
水龙头放水成功!2升
水龙头放水成功!3升
水龙头放水成功!4升
水龙头放水成功!5升
水龙头放水成功!6升
水龙头放水成功!7升
水龙头放水成功!8升
水龙头放水成功!9升
水龙头放水成功!10升
水龙头放水成功!11升
水龙头放水成功!12升
水龙头放水成功!13升
水龙头放水成功!14升
水龙头放水成功!15升
水龙头放水成功!16升
水龙头放水成功!17升
水龙头放水成功!18升
水龙头放水成功!19升
水龙头放水成功!20升

小明洗澡

小明开始洗澡-快速洗......
小明洗澡已用水-第: 2升
小明洗澡已用水-第: 4升
小明洗澡已用水-第: 6升
小明洗澡已用水-第: 8升
小明洗澡已用水-第: 10升
小明洗澡洗完,开始浪费-第:12升
小明洗澡洗完,开始浪费-第:14升
小明洗澡洗完,开始浪费-第:16升
小明洗澡洗完,开始浪费-第:18升
小明洗澡洗完,开始浪费-第:20升

小丽洗澡

小丽洗澡已用水-第: 1升
小丽洗澡已用水-第: 3升
小丽洗澡已用水-第: 5升
小丽洗澡已用水-第: 7升
小丽洗澡已用水-第: 9升
小丽洗澡已用水-第: 11升
小丽洗澡已用水-第: 13升
小丽洗澡已用水-第: 15升
小丽洗澡已用水-第: 17升
小丽洗澡已用水-第: 19升

从结果来看,两个人都是平均用水,虽然有快有慢,慢的就会存在堆积情况。

2.2 公平模式

小丽就有意见了,自己不够洗,小明还白白浪费那么多水。小明给出了建议,让小丽把碰头开关开大点,自己开小点,这样就让小丽多用点水,自己少用点也就够了。

2.2.1 生产者

同上面生产者一致

2.2.2 消费者

小明洗澡:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 工作模式  一个生产者,一个默认的交换机,一个队列,两个消费者
 * 需要在consumer消费者端,添加Qos能力以及更改为ACK手动即可让消费者根据自己的能力消费,不是RabbitMQ默认的平均分配了
 * @createTime 2022/07/27 19:36:00
 */
public class XMShowerConsumer {

    public static final String QUEUE_NAME = "SolarWaterHeater";

    //消费者
    public static void main(String[] args) throws Exception {
        //1、获取连对象、
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();

        channel.basicQos(1);

        //3、创建队列-helloworld
        /**
         * 参数1:queue 指定队列名称
         * 参数2:durable 是否开启持久化(true)
         * 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)
         * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
         * 参数5:arguments 指定队列携带的信息
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("小明洗澡已用水-第: " + new String(body, "UTF-8"));

                //手动ACK(接收信息,指定是否批量操作)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        /**
         * 参数1:queue 指定消费哪个队列
         * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
         * 参数1:cancelCallback 指定消费回调
         */
        channel.basicConsume(QUEUE_NAME, false, consumer);
        System.out.println("小明开始洗澡-慢慢洗......");

        //5、键盘录入,让程序不结束!
        System.in.read();

        //6、释放资源
        channel.close();
        connection.close();
    }

}

小丽洗澡:


/**
 * @author rabbit
 * @version 1.0.0
 * @Description 工作模式  一个生产者,一个默认的交换机,一个队列,两个消费者
 * 需要在consumer消费者端,添加Qos能力以及更改为ACK手动即可让消费者根据自己的能力消费,不是RabbitMQ默认的平均分配了
 * @createTime 2022/07/27 19:36:00
 */
public class XLShowerConsumer {

    public static final String QUEUE_NAME = "SolarWaterHeater";

    //消费者
    public static void main(String[] args) throws Exception {
        //1、获取连对象、
        Connection connection = RabbitCommonConfig.getConnection();
        //2、创建channel
        Channel channel = connection.createChannel();

        channel.basicQos(1);

        //3、创建队列-helloworld
        /**
         * 参数1:queue 指定队列名称
         * 参数2:durable 是否开启持久化(true)
         * 参数3:exclusive 是否排外(当前队列只能被一个消费者消费)
         * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
         * 参数5:arguments 指定队列携带的信息
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4.开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("小丽洗澡已用水-第: " + new String(body, "UTF-8"));

                //手动ACK(接收信息,指定是否批量操作)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        /**
         * 参数1:queue 指定消费哪个队列
         * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
         * 参数1:cancelCallback 指定消费回调
         */
        channel.basicConsume(QUEUE_NAME, false, consumer);
        System.out.println("小丽开始洗澡-快速洗......");

        //5、键盘录入,让程序不结束!
        System.in.read();

        //6、释放资源
        channel.close();
        connection.close();
    }

}

2.2.3 结果验证

生产者:

水龙头放水成功!1升
水龙头放水成功!2升
水龙头放水成功!3升
水龙头放水成功!4升
水龙头放水成功!5升
水龙头放水成功!6升
水龙头放水成功!7升
水龙头放水成功!8升
水龙头放水成功!9升
水龙头放水成功!10升
水龙头放水成功!11升
水龙头放水成功!12升
水龙头放水成功!13升
水龙头放水成功!14升
水龙头放水成功!15升
水龙头放水成功!16升
水龙头放水成功!17升
水龙头放水成功!18升
水龙头放水成功!19升
水龙头放水成功!20升

小明:

小明开始洗澡-慢慢洗......
小明洗澡已用水-第: 2升
小明洗澡已用水-第: 7升
小明洗澡已用水-第: 12升
小明洗澡已用水-第: 17升

小丽:

小丽开始洗澡-快速洗......
小丽洗澡已用水-第: 1升
小丽洗澡已用水-第: 3升
小丽洗澡已用水-第: 4升
小丽洗澡已用水-第: 5升
小丽洗澡已用水-第: 6升
小丽洗澡已用水-第: 8升
小丽洗澡已用水-第: 9升
小丽洗澡已用水-第: 10升
小丽洗澡已用水-第: 11升
小丽洗澡已用水-第: 13升
小丽洗澡已用水-第: 14升
小丽洗澡已用水-第: 15升
小丽洗澡已用水-第: 16升
小丽洗澡已用水-第: 18升
小丽洗澡已用水-第: 19升
小丽洗澡已用水-第: 20升

从结果看:实现了我们期望的结果,小丽用的水多了,小明用的水少,大家都洗好了。

3.模式总结

此模式我们最应该注意的就是平均模式与公平模式的实现,这里是靠消费者的手工确认机制来实现的。

    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

如上代码中的 第二个参数 autoAck,

true的时候,属于自动确认,消费者一旦接收到此消息,就会发回确认消息给Broker,Broker会从队列中删除掉此消息。当有多个消费者注册到同一个queue时,会默认轮询分发。

false的时候,属于手动确认,消费者虽然接收到了消息,还需要执行一个 方法来告诉Broker才行

channel.basicAck(envelope.getDeliveryTag(), false);

若没有告诉Broker,Broker还会将此消息再次发送。

同时此种情况下还需要使用此方法:

void basicQos(int prefetchCount) throws IOException;

指定每次消费抓取的数量

maximum number of messages that the server will deliver

这样我们通过这些改造,实现了,消费者消费的时候,消费一个告诉Broker删除一个,没消费的时候就不要给消费者投递了,最终实现了公平消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/17013.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MyBatis的配置文件

日志怎么在mybatis中实现呢 1.添加pom.xml依赖 2.添加logback配置文件 mybatisconfig.xml 引入外部属性资源文件 注意:其中使用的较多的需要设置的为&#xff1a; 引入外部属性资源文件 将数据库的蛇形命名映射为驼峰命名 使用typeAliases标签设置里面的package标签,为…

智工教育:环评师考试重要知识点

环境影响评价师的知识点庞杂纷繁&#xff0c;想要顺利通过考试&#xff0c;就要经常复习看过的知识&#xff0c;达到加强记忆的效果。下面是小编整理的环评师考试重要知识点&#xff0c;一起来复习吧! 考试目的及大纲主要内容 通过本科目考试&#xff0c;检验具有一定实践经验…

shell脚本编程基础(中)

目录 (一&#xff09;shell流程控制-for循环语句 1. for 循环介绍 2. for语法 2.1 for 语法一 2.2 for语法二 3.循环控制语句 3.1 sleep N 脚本执行到该步休眠N秒 3.2 continue 跳过循环中的某次循环 3.3 break 跳出循环继续执行后续代码 3.4实例 &#xff08;二&am…

MyBatis大数据量插入方案

1、前言 在开发过程中&#xff0c;有时我们会碰到将大批量的数据入库的场景&#xff0c;那么我们一般有下面三种方式入库&#xff1a; ExecutorType.BATCH批处理方式插入foreach循环标签插入MyBatisPlus自带的saveBatch批量新增方法 下面我们用一个案例来测试一下&#xff0…

让学前端不再害怕英语单词(一)

有很多跟着我学习的学生经常跟我抱怨前端的单词很多&#xff0c;学了css又忘了html的单词&#xff0c;学了js又忘了css 的单词&#xff0c;所以本着给跟着我学习的学生提供一个学习前端不怕英语单词的课程&#xff0c;就打算写一篇博客去讲述如何增强在学习前端的时候巩固语法并…

通过阅读源码解决项目难题:GToken替换JWT实现SSO单点登录

文章目录jwt的问题jwt的请求流程图gtoken的优势注意问题演示demo入门示例运行效果启动项目&#xff1a;访问不认证接口&#xff1a;返回成功未登录时访问认证接口&#xff1a;返回错误提示请求登录接口&#xff1a;返回token携带token再次访问认证接口&#xff1a;返回成功分析…

postgres-operator 原理解析- 章节 I

这篇文章我想写postgres-operator如何利用kubernetes实现高可用功能其中的客户端流量路由部分。 总体的目的呢就是客户端数据库连接请求&#xff0c;如果通过利用kubernetes的机制实现将流量路由到实际的Postgresql主节点。 基础知识 Services without selectors 平常得Ser…

【Java进阶篇】第三章 常用类

文章目录一、String类1、String类概述2、String字符串的存储原理3、有String型属性的对象4、两种字符串对象创建方式的区别5、String类的特殊构造方法6、String类中的方法二、StringBuffer类1、StringBuffer类的构造方法2、String类和StringBuffer类的区别3、StringBuffer和Str…

我修复了一个 Vite Bug,让我的项目首屏性能提高了 25%

本文正在参加「金石计划 . 瓜分6万现金大奖」 一次偶然的机会&#xff0c;我将项目&#xff08;基于 tdesign-vue-next-starter &#xff09;由 Vite 2.7 升级成 Vite 3.x 后&#xff0c;发现首次运行 Vite dev 构建&#xff0c;页面首屏时间非常长&#xff0c;且一定会整个页…

Vue3基础看这一篇就够了(万字长篇,附实例代码及效果演示)

目录 前言 概述 Vue3组合式api VS Vue2选项式api 基础部分 setup 选项式api的风格 组合式api的风格 区别 响应式数据 ref reactive shallowReactive 与 shallowRef 计算属性和监听 computed 函数 watch 函数 watchEffect 生命周期 响应式数据只读 toRaw 返回代…

呼叫中心中间件(mod_cti基于FreeSWITCH)-背景音(彩话)接口

背景音&#xff0c;就是给通话添加一个背景音&#xff0c;比如办公室的噪音&#xff0c;键盘敲击声&#xff0c;等。彩话&#xff0c;就是通话过程播放一个声音&#xff0c;代替人工说话&#xff0c;这个声音双方可以同时听到&#xff0c;而且播放过程不影响双方通话。 用处 …

「JVS低代码开发平台」关于逻辑引擎的触发讲解

JVS逻辑引擎是代码开发套件中的业务瓶装的核心&#xff0c;用于去实现各种场景下的逻辑功能&#xff0c;可以把他理解为一个程序配置器与程序的执行器。 逻辑引擎是可以被多种配置器调用的触发的&#xff0c;从而实现了各种业务场景中对应功能的实现&#xff0c;那么接下来我们…

RabbitMQ初步到精通-第四章-RabbitMQ工作模式-PUB/SUB

第四章-RabbitMQ工作模式-PUB/SUB 1.模式介绍 1.1 模式 此模式称为发布订阅模式&#xff0c;从此模式开始&#xff0c;我们就不再使用默认的交换机了&#xff0c;开始定义我们自己的交换机。 此发布订阅模式&#xff0c;使用的交换机类型为Fanout。定义好交换机&#xff0c;消…

【MATLAB教程案例42】语音信号的MFCC特征提取matlab仿真

欢迎订阅《FPGA学习入门100例教程》、《MATLAB学习入门100例教程》 本课程学习成果预览: 目录 1.软件版本 2.MFCC理论概述

JavaScript之BOM复习(54th)

1、BOM概述 1、BOM Browser Object Model 浏览器对象模型 2、它提供了独立于内容而与浏览器窗口进行交互的对象&#xff0c;其核心对象是 window 3、BOM 由一系列相关的对象构成&#xff0c;并且每个对象都提供了很多方法与属性 4、BOM 缺乏标准&#xff0c;JavaScript 语法的…

用QT实现一个简单的桌面宠物

有时候桌面空空的&#xff0c;或者屏幕空旷了&#xff0c;我们就可以找一点东西来点缀一下&#xff0c;那么桌面宠物是一个不错的选择。 作为一个程序猿&#xff0c;如何实现一个桌面宠物呢&#xff1f; 本文就给大家带来的是如何用qt提供一种思路并写一个简单的桌面宠物。 思…

深入理解Linux网络技术内 幕(八)——设备注册和初始化

文章目录前言设备注册之时设备除名之时分配net_device结构NIC注册和除名的架构设备初始化设备驱动程序初始化设备类型初始化&#xff1a;xxx_setup函数可选的初始化和特殊情况net_device结构的组织查询设备状态队列规则状态注册状态设备的注册和除名切割操作&#xff1a;netdev…

C#编程的构成要素(结合unity做实例分析)

目录 定义变量 变量的名称很重要 将变量作为占位符 疯狂的方法 方法驱动行为 方法也是占位符 类的引入 一直在使用类 日常蓝图 注释是关键 将脚本附加到游戏对象上 脚本成为组件 类与组件通信 本文主要来自<<C#实践入门>>哈里森.费隆 著&#xff0c;仅用…

甘露糖-聚乙二醇-氨基|mannose-PEG-NH2|氨基-PEG-甘露糖

甘露糖-聚乙二醇-氨基|mannose-PEG-NH2|氨基-PEG-甘露糖 氨基&#xff08;Amino&#xff09;由一个氮原子和两个氢原子构成&#xff0c;化学式为-NH2。在有机化学中&#xff0c;氨基是基本碱基&#xff0c;大多数含有氨基的有机物都有一定碱的特性&#xff0c; 中文名称&…

基于数学形态学的路面裂缝图像处理技术-含Matlab代码

⭕⭕ 目 录 ⭕⭕✳️ 一、引言✳️ 二、图像预处理✳️ 三、路面裂缝图像的边缘检测✳️ 3.1 裂缝识别✳️ 3.2 裂缝区域信息获取✳️ 3.3 裂缝特征提取✳️ 四、参考文献✳️ 五、Matlab代码获取✳️ 一、引言 对于路面裂缝而言&#xff0c; 采用图像处理技术对其进行识别与计…