(六)RabbitMQ第二种模型:工作模型(Work Queues)

news2025/8/13 22:22:50

工作模型(Work Queues)

  • 一、轮询发送消息
  • 二、消息应答
    • 2.1、概念
    • 2.2、自动应答
    • 2.3、手动应答
    • 2.4、消息的重新入队
    • 2.5、手动应答代码

概念工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进
程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

通俗的理解:当大量的消息发送到MQ的时候,这个时候为了快速的消费这些消息,就会开启多个工作线程进行消费,为了消息不会重复的消费,所以采用轮询的方式。

示例图

在这里插入图片描述

一、轮询发送消息

创建工作线程

package com.feng.workQueue;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/16 16:09
 * @Version 1.0
 * @Description 工作线程 1号(也就是一号消费者)
 */
public class Worker01 {
    //队列名
    public static final String QUEUE_NAME = "hello";

    /**
     * 消费消息
     *
     * @param args
     */
    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtil.getChannel();
        System.out.println("工作线程1等待消费。。。。。。。");
        //consumerTag: 消费者的标签
        channel.basicConsume(QUEUE_NAME,true,(consumerTag, message)->{
            System.out.println("接收的消息是:"+new String(message.getBody()));
        },consumerTag->{
            System.out.println(consumerTag+"消息接收失败");
        });
    }
}

这里工作线程都是一样的,所以你可以自己开多线程或者直接使用IDEA自带的开启类的多个实例,如下

在这里插入图片描述

效果如下

在这里插入图片描述

生产者

package com.feng.workQueue;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/16 16:37
 * @Version 1.0
 * @Description 生产者代码
 */
public class Task01 {
    //队列名
    public static final String QUEUE_NAME = "hello";
    //发送大量消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //队列的声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //从控制台进行输入消息进行发送
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String msg = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("消息发送成功"+msg);
        }
    }
}

测试

发送消息

在这里插入图片描述

工作线程1

在这里插入图片描述

工作线程2

在这里插入图片描述

由上可知工作线程的轮询

二、消息应答

2.1、概念

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

也就是在消费者进行消费处理完后会进行一个应答信号给MQ,然后MQ才可以将消息进行删除,防止消费者突然宕机所引发的消息丢失

2.2、自动应答

队列中的消息发送给消费者后立即被认为已经传送成功,然后删除消息

这种模式就是可以在消息发送就认为成功,对于提升处理大量消息的效率是很不错的有很大的吞吐量,但是缺点就是一旦消费者挂了,就会造成消息的丢失,或者是没有限制传递消息的数量,引起消息的堆积导致内存炸了,会将工作线程杀死

适用情况:在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

2.3、手动应答

// RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
Channel.basicAck(用于肯定确认) 

Channel.basicNack(用于否定确认) 

// 与 Channel.basicNack 相比少一个参数 multiple(批量应答)
// 不处理该消息了直接拒绝,可以将其丢弃了
Channel.basicReject(用于否定确认) 

Multiple 的解释

手动应答的好处是可以批量应答并且减少网络拥堵

  • true 代表批量应答 channel 上未应答的消息
    比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答

  • false 同上面相比
    只会应答 tag=8 的消息 5,6,7 这三个消息依然不会

也就是一批消息都处理完了只需要应答一个就行了,不需要每一个消息都进行应答确认,提高效率
在这里插入图片描述

2.4、消息的重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

这里有个问题。就是消费者消费消息后,发生异常,ACK失败的话,消息会重新入队,这个消息会不会重新消费,为了防止这个问题,我们可以在消费者的程序里捕获异常,防止ACK失败后对消息进行消费,并且记录下这个消息以及错误,还有一种情况就是消费端都挂了,他就会不停的重新入队,会造成mq的不断执行同一个操作,我们可以设置重新入队的次数限制

2.5、手动应答代码

生产者

package com.feng.shouDongYingDa;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 13:38
 * @Version 1.0
 * @Description 手动应答测试生产者
 */
public class Task2 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String msg = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,msg.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+msg);
        }

    }
}

消费者1(处理消息时间较短)

package com.feng.shouDongYingDa;

import com.feng.utils.RabbitMQUtil;
import com.feng.utils.SleepUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 13:47
 * @Version 1.0
 * @Description 手动应答消费者1(处理消息时间较短的)
 */
public class Work03 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        System.out.println("C1消费消息的时间较短");

        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,(String consumerTag, Delivery message)->{
            //睡一秒,这里用了一个自己封装的工具类,就是毫秒转秒
            SleepUtils.sleep(1);
            System.out.println("接收到的消息是:"+new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 1.消息的标记 (Envelopes就是一些消息属性的封装类)
             * 2.是否批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        },consumerTag->{

        });
    }
}

消费者2(处理消息时间较长)

package com.feng.shouDongYingDa;

import com.feng.utils.RabbitMQUtil;
import com.feng.utils.SleepUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/24 13:47
 * @Version 1.0
 * @Description 手动应答消费者2(处理消息时间较长的)
 */
public class Work04 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        System.out.println("C2消费消息的时间较长");

        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,(String consumerTag, Delivery message)->{
            //睡一秒
            SleepUtils.sleep(30);
            System.out.println("接收到的消息是:"+new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 1.消息的标记 (Envelopes就是一些消息属性的封装类)
             * 2.是否批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        },consumerTag->{

        });
    }
}

测试

因为是轮询消费消息的,所以发两个消息是一个消费者一个,但是因为消费者2睡了30s,所以比较慢,如果在其睡眠时间中断程序模拟宕机,消息并不会丢失,而是会重新入队,由其他消费者消费,如下所示

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/33265.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

为什么说 Windows 10 不会被 DDoS SSDP反射攻击利用

为什么说 Windows 10 不会被 DDoS SSDP反射攻击利用一段来自陌生人的对话我所在网络拓扑图一、SSDP 协议极简介绍二、模拟查找 SSDP 设备2.1 Linux 发现 SSDP 服务2.2 Windows 发现 SSDP 服务三、Windows 10 VS Windows 7 数据包四、回答先前的问题回答 1:Windows 1…

前后端分离项目,vue+uni-app+php+mysql订座预约小程序系统设计与实现

功能介绍 【后台功能】 广告管理:设置小程序首页轮播图广告和链接 留言列表:所有用户留言信息列表,支持删除 会员列表:查看所有注册会员信息,支持删除 录入资讯:录入资讯标题、内容等信息 管理资讯&#x…

证书格式说明

证书格式说明 参考: 证书格式说明SSL中,公钥、私钥、证书的后缀名都是些啥? PEM 参考: Pem格式 Privacy-Enhanced Mail (PEM)是存储、传输密码学的密钥、公开密钥证书和其他数据的文件格式的业界标准。 许多加密标准使用ASN.1…

几行 Python 代码就可以提取数百个时间序列特征

以下所有内容均来自python绿色通道订阅号,个人整理主要为了个人方便查看,希望也可以对各位有所帮助 时间序列数据是随着时间的推移反复捕获的变量值,随着时间的推移可以产生一系列的按时间顺序索引的数据点。在时间序列中,数据具…

dp入门(二)

目录 45、跳跃计划 53、最大子数组和 55、跳跃游戏 62、不同路径 63、不同路径2 64、最小路径和 70、爬楼梯 72、编辑距离 84、柱形图中最大的矩形 85、最大矩形 4721、排队 45、跳跃计划 当前可移动距离尽可能多走,如果还没到终点,步数再加一。整体…

Spring Boot 入门

37) Boot 骨架项目 https://start.spring.io/pom.xml 38) Boot War项目 步骤1:创建模块,区别在于打包方式选择 war 步骤2:编写控制器 Controller public class MyController { ​RequestMapping("/hello")public String abc() …

南京溧水农民丰收节 国稻种芯·中国水稻节:江苏味稻文化

南京溧水农民丰收节 国稻种芯中国水稻节:江苏味稻文化 (融媒体记者 诸婧雯)新闻中国采编网 中国新闻采编网 谋定研究中国智库网 国稻种芯中国水稻节 中国三农智库网-功能性农业农业大健康大会报道:由溧水区政府、市农业农村局主办…

MCE | Hippo 途径与靶向策略

在 PubMed 输入了“Hippo pathway or YAP/TAZ”,小编发现近十年来与 Hippo 通路沾点边的研究势头猛烈,且发的文章不少都“非富即贵”,如发表在 Nature Cell Biology 上的两篇关于 YAP (TAZ) 相变的文章 (两篇结论相反的文章,还能双…

红黑树C++实现

目录 一、红黑树的概念 二、红黑树的性质 三、红黑树节点的定义 四、红黑树的插入 4.1 插入节点 4.2 插入节点的颜色 4.3 调整情况1 4.4 调整情况2 4.5 调整情况3 4.6 调整情况总结 五、调整的实现 5.1 调整的步骤分析 5.2 代码实现 六、树的平衡判断 七、源代码…

微信小程序制作要多少钱?【制作小程序】

关于微信小程序制作要多少钱的问题,是很多企业商家在制作小程序之前需要了解的事项,因为总是听说制作小程序的费用有高有低,而他们又对这方面不太了解,所以也还是需要了解微信小程序制作要多少钱的。那么微信小程序制作要多少钱呢…

RocketMQ中生产者发消息前为啥一定要调用start()方法?

前言 我们在使用RocketMQ发送消息时,一般都会使用DefaultMQProducer,类型的代码如下: DefaultMQProducer producer new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("42.192.50.8:9876"); try {pr…

Chrome 103支持使用本地字体,纯前端导出PDF优化

在前端导出PDF,解决中文乱码一直是一个头疼的问题。要解决这个问题,需要将ttf等字体文件内容注册到页面PDF生成器中。但是之前网页是没有权限直接获取客户机器字体文件,这时就需要从服务器下载字体文件或者提示用户选择字体文件上传到页面。对…

链接杂谈 CASPP

构建大型程序 构建大型程序,不可避免的一个问题是链接问题: - 链接器提示:缺少某个模块 缺少某个库 不兼容的库版本 理解全局变量的链接 你的代码可能有多个全局变量,有些是强变量,有些是弱定义,执行…

清除浮动的常用方法

关于浮动 我们为什么需要浮动? 我们想把多个块级元素放到同一行上。 打破标准流的限制。 浮动原来做图文混排效果,现在主要用来做网页布局的。 浮动语法 只有左浮动和右浮动。 float: left; float: right;浮动特点 1.浮动元素会脱离标准流&#x…

Win10禁止应用独占麦克风

痛点需求: qq和微信同时发起语音通话,发现只有一个qq说话对方能听到,但是微信却不能,这是典型的应用程序独占了麦克风,导致其他应用无法使用。 有没有办法让qq和微信同时使用麦克风呢? 答案是:有…

图的拓扑序列

拓扑序列: 拓扑序是按照点的先后顺序排列的。拓扑序列满足以下两点: 1.每个顶点在序列中出现且只出现一次。 2.若存在一条从顶点 A 到顶点 B 的路径,那么在序列中顶点 A 出现在顶点 B 的前面。 拓扑序列只存在于有向无环图中。可以理解成…

MCE | 肝炎病毒是如何诱发肝癌的

肝炎病毒分类 肝炎病毒是世界上最常见的肝炎病因,其它原因包括酗酒、某些药物、毒素、其他感染、自身免疫性疾病和非酒精性脂肪性肝炎 (NASH)。肝炎病毒共有五种主要的肝炎病毒株,分别为 A、B、C、D 和 E 型。目前,全世界大约有 3.25 亿人患…

2023中国绿色铝业国际峰会

会议背景 铝行业属于能源高度密集型行业,主要包括铝矿石开采、氧化铝生产、电解铝生产和铝材加工等环节。我国原铝产量自2001年以来一直占据世界首位,连续7年产量占比超过全球50%。然而与国际先进铝生产企业相比,我国铝生产企业单位原铝碳…

C# 自定义事件

一 自定义事件 例如,利用自定义绘制的技术,画出一个圆角按钮。 现在来看,怎么样给它添加自定义的事件。 二 要点与细节 1 Control 类本身就有继承的鼠标和键盘事件,这里只是一个引子,用于引出更复杂的自定义事件。 …

web测试——业务测试2

1.历史数据 前端: 组件相关  组件内部是否动过;  展示的数据是否受影响;  失焦后的校验(爆红) 页面样式相关  坐标位置、  按钮位置是否动过,  新版本上线对历史配置的影响 交互提示相关  新手引导的展示位置、关闭后的展…