RocketMQ 延迟队列

news2025/7/30 11:32:36

什么是延迟队列

指消息发送到某个队列后,在指定多长时间之后才能被消费。

应用场景

RocketMQ 延迟队列

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。

broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m

8m 9m 10m 20m 30m 1h 2h”,18个level。

可以配置自定义messageDelayLevel。需要注意的是 messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:

msg.setDelayLevel(level)

level有以下三种情况:

level == 0,消息为非延迟消息1<=level<=maxLevel,消息延迟特定时间,例如level1,延迟1s

level > maxLevel,则level maxLevel,例如level==20,延迟2h

在 RocketMQ中定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

RocketMQ 延迟队列和RabbitMQ延迟队列相比

RocketMQ直接一步到位,功能类似于RabbitMQ的延迟交换器插件、而RabbitMQ提供了延迟队列,但如果是基于消息设置每个消息设置不同的延迟时间,会产生前面的消息早已过期,但后面的消息还存在消息队列中,故此RabbitMQ提供了延迟交换器插件,而RocketMQ 延迟队列设计就比较好了。

具体示例

生产者

// 实例化生产者,并指定生产组名称DefaultMQProducer producer =newDefaultMQProducer("myproducer_group_topic_name_delay_01");//设置实例名称,一个jvm中有多个生产者可以根据实例名区分//默认default
        producer.setInstanceName("topic_delay");// 指定nameserver的地址
        producer.setNamesrvAddr("localhost:9876");//设置同步重试次数
        producer.setRetryTimesWhenSendFailed(2);//设置异步发送次数//producer.setRetryTimesWhenSendAsyncFailed(2);// 初始化生产者
        producer.start();for(int i =0; i <20; i++){Message message =newMessage("topic_name_delay",("key="+ i).getBytes("utf-8"));//设置延迟消费时间 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2h
            message.setDelayTimeLevel(i);// 1 同步发送  如果发送失败会根据重试次数重试SendResult send = producer.send(message);SendStatus sendStatus = send.getSendStatus();System.out.println(sendStatus.toString());}

消费者

/**
         * 推消息消费
         */DefaultMQPushConsumer defaultMQPushConsumer =newDefaultMQPushConsumer("consumer_group_delay_01");// 指定nameserver的地址
        defaultMQPushConsumer.setNamesrvAddr("localhost:9876");
        defaultMQPushConsumer.subscribe("topic_name_delay","*");// 1 提高消费并行度
        defaultMQPushConsumer.setConsumeThreadMax(10);
        defaultMQPushConsumer.setConsumeThreadMin(1);// 2 以批量方式进行 消费// 设置消息批处理的一个批次中消息的最大个数
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);//设置重试次数 默认16次
        defaultMQPushConsumer.setMaxReconsumeTimes(1);// 添加消息监听器,一旦有消息推送过来,就进行消费
        defaultMQPushConsumer.setMessageListener(newMessageListenerConcurrently(){@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context){//final MessageQueue messageQueue = context.getMessageQueue();for(MessageExt msg : msgs){System.out.println(msg);try{System.out.println(newString(msg.getBody(),"utf-8"));}catch(UnsupportedEncodingException e){
                        e.printStackTrace();}}// 消息消费成功returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 消息消费失败//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});

消费消息,按照0到18级别来,0 表示不延迟,1表示延迟1s,大于等于18表示延迟2h

按照级别一次类推

默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”,18个level。

这里只拷贝0-8 的打印日志,可以自己等待确认。

MessageExt [queueId=0, storeSize=195, queueOffset=19, sysFlag=0, bornTimestamp=1628949643548, bornHost=/192.168.0.103:55518, storeTimestamp=1628949643554, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F0000000000029A8A, commitLogOffset=170634, bodyCRC=858365373, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=20, CONSUME_START_TIME=1628949650433, UNIQ_KEY=C0A8006748D07C53A9EB47ABD51C0000, CLUSTER=DefaultCluster, WAIT=true, DELAY=0}, body=[107, 101, 121, 61, 48], transactionId=‘null’}]
key=0
MessageExt [queueId=1, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643558, bornHost=/192.168.0.103:55518, storeTimestamp=1628949644566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ACF8, commitLogOffset=175352, bodyCRC=1143909675, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949650435, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5260001, CLUSTER=DefaultCluster, WAIT=true, DELAY=1, REAL_QID=1}, body=[107, 101, 121, 61, 49], transactionId=‘null’}]
key=1
MessageExt [queueId=2, storeSize=234, queueOffset=18, sysFlag=0, bornTimestamp=1628949643561, bornHost=/192.168.0.103:55518, storeTimestamp=1628949648566, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002ADE2, commitLogOffset=175586, bodyCRC=1562901649, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=19, CONSUME_START_TIME=1628949650436, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5290002, CLUSTER=DefaultCluster, WAIT=true, DELAY=2, REAL_QID=2}, body=[107, 101, 121, 61, 50], transactionId=‘null’}]
key=2
MessageExt [queueId=3, storeSize=234, queueOffset=17, sysFlag=0, bornTimestamp=1628949643566, bornHost=/192.168.0.103:55518, storeTimestamp=1628949653569, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002AECC, commitLogOffset=175820, bodyCRC=706792455, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=18, CONSUME_START_TIME=1628949653572, UNIQ_KEY=C0A8006748D07C53A9EB47ABD52E0003, CLUSTER=DefaultCluster, WAIT=true, DELAY=3, REAL_QID=3}, body=[107, 101, 121, 61, 51], transactionId=‘null’}]
key=3
MessageExt [queueId=0, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643568, bornHost=/192.168.0.103:55518, storeTimestamp=1628949673574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B0A0, commitLogOffset=176288, bodyCRC=876894628, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949673577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5300004, CLUSTER=DefaultCluster, WAIT=true, DELAY=4, REAL_QID=0}, body=[107, 101, 121, 61, 52], transactionId=‘null’}]
key=4
MessageExt [queueId=1, storeSize=234, queueOffset=23, sysFlag=0, bornTimestamp=1628949643570, bornHost=/192.168.0.103:55518, storeTimestamp=1628949703574, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B18A, commitLogOffset=176522, bodyCRC=1128491314, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=24, CONSUME_START_TIME=1628949703577, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5320005, CLUSTER=DefaultCluster, WAIT=true, DELAY=5, REAL_QID=1}, body=[107, 101, 121, 61, 53], transactionId=‘null’}]
key=5
MessageExt [queueId=2, storeSize=234, queueOffset=20, sysFlag=0, bornTimestamp=1628949643572, bornHost=/192.168.0.103:55518, storeTimestamp=1628949763575, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B35E, commitLogOffset=176990, bodyCRC=1514813576, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=21, CONSUME_START_TIME=1628949763580, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5340006, CLUSTER=DefaultCluster, WAIT=true, DELAY=6, REAL_QID=2}, body=[107, 101, 121, 61, 54], transactionId=‘null’}]
key=6
MessageExt [queueId=3, storeSize=234, queueOffset=19, sysFlag=0, bornTimestamp=1628949643574, bornHost=/192.168.0.103:55518, storeTimestamp=1628949823580, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B532, commitLogOffset=177458, bodyCRC=760023070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=20, CONSUME_START_TIME=1628949823583, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5360007, CLUSTER=DefaultCluster, WAIT=true, DELAY=7, REAL_QID=3}, body=[107, 101, 121, 61, 55], transactionId=‘null’}]
key=7
MessageExt [queueId=0, storeSize=234, queueOffset=22, sysFlag=0, bornTimestamp=1628949643576, bornHost=/192.168.0.103:55518, storeTimestamp=1628949883582, storeHost=/192.168.0.103:10911, msgId=C0A8006700002A9F000000000002B706, commitLogOffset=177926, bodyCRC=1039275407, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_name_delay’, flag=0, properties={MIN_OFFSET=0, REAL_TOPIC=topic_name_delay, MAX_OFFSET=23, CONSUME_START_TIME=1628949883586, UNIQ_KEY=C0A8006748D07C53A9EB47ABD5380008, CLUSTER=DefaultCluster, WAIT=true, DELAY=8, REAL_QID=0}, body=[107, 101, 121, 61, 56], transactionId=‘null’}]
key=8

同样我们在控制台可以看到,存放的消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/367836.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自学黑客2年都没入门,从零入门渗透有那么难吗?附入门教程。

最近年底了&#xff0c;不少朋友都是在总结一年的学习成果。最后不少人发现完成情况与自己最初定下的目标相去甚远。 我认识不少人自学大半年了&#xff1a;b站&#xff0c;网盘&#xff0c;各种各样的资源数不胜数&#xff0c;总之只要是跟安全相关的不管学不学&#xff0c;先…

IP地址,子网掩码,网段 概念详解

文章目录1. 子网掩码1.1 子网掩码的概念及作用1.2 子网掩码的组成1.3 子网掩码的表示方法1.4 为什么要使用子网掩码&#xff1f;1.5 子网掩码的分类2. 子网掩码和IP地址的关系2.1 根据掩码确定网段IP地址是以 网络号和 主机号来标示网络上的主机的&#xff0c;我们把网络号相同…

「JVM 编译优化」Graal 编译器

文章目录1. 历史背景2. 构建编译调试环境3. JVMCI 编译器接口4. 代码中间表示5. 代码优化与生成1. 历史背景 Graal 编译器在 JDK 9 以 Jaotc 提前编译工具的形式首次加入到官方的 JDK 中&#xff0c;JDK 10 开始提供替换&#xff08;得益于 HotSpot 编译器接口&#xff0c;Jav…

vue中的key值

1. 什么是key&#xff1f; 当我们对一个数据进行遍历生成DOM时&#xff0c;vue的内部会根据索引号对其进行key的定义&#xff0c;而key会作为每一个新生成DOM的唯一标识。 2.为什么不建议索引值作为key&#xff1f; 很多人喜欢直接使用索引值作为key&#xff0c;当所遍历的数…

项目缓存问题处理

1、public/index.html文件头部配置 <meta http-equiv"pragram" content"no-cache"> <meta http-equiv"cache-control" content"no-cache,no-store,must-revalidate"> <meta http-equiv"expires" content&…

Video 标签无法播放 mp4 的原因和解决办法

问题 用 QQ 的截图录屏功能录制的 mp4 视频&#xff0c;无法用 <video> 标签正常播放。 原因 通过搜索的说法是&#xff1a; 查阅文档&#xff08;不知道是啥文档&#xff09;&#xff0c;关于video标签所支持的视频格式和编码&#xff1a; MPEG4 带有H.264视频编码和…

【Windows Server 2019】发布服务器 | 远程桌面服务的安装与配置 Ⅰ——理论,实验拓扑和安装基于RemoteAPP的RDS

目录1. 理论1.1 什么是远程桌面服务2. 实验拓扑2.1 拓扑说明3. 安装基于RemoteAPP的RDS关联博文1. 理论 1.1 什么是远程桌面服务 远程桌面服务 (RDS) 是一个卓越的平台&#xff0c;可以生成虚拟化解决方案来满足每个最终客户的需求&#xff0c;包括交付独立的虚拟化应用程序、…

茂名市 2021 年高中信息技术学科素养展评

没事干&#xff0c;发一下去年去比赛的题目。 目录 第一题 30分 第二题 30分 第一题 30分 题目&#xff1a; “姐姐&#xff0c;乘除法运算太难了&#xff0c;有什么办法能熟练掌握吗&#xff1f;”今年 读小学四年级的表弟向李红求救。为了提高表弟的运算能力&#xff0c;…

Candence allegro 创建等长的方法

随着源同步时序电路的发展,越来越多的并行总线开始采用这种时序控制电路,最典型的代表当属目前炙手可热的DDRx系列。下图这种点到点结构的同步信号,对于攻城狮来说,设置等长约束就非常easy了图片。 But,对于有4、6、8、、、等多颗DDR芯片的ACC同步信号来说,要设置等长约束…

在Excel中按条件筛选数据并存入新的表

案例 老板想要看去年每月领料数量大于1000的数据。手动筛选并复制粘贴出来,需要重复操作12次,实在太麻烦了,还是让Python来做吧。磨刀不误砍柴工,先整理一下思路: 1读取原表,将数量大于1000的数据所对应的行整行提取(如同在excel表中按数字筛选大于1000的) 2将提取的数…

Mysql安装和基本使用

MySQLMySQL 是一个关系型数据库管理系统&#xff0c;由瑞典 MySQL AB 公司开发&#xff0c;目前属于 Oracle 公司。MySQL 是一种关联数据库管理系统&#xff0c;关联数据库将数据保存在不同的表中&#xff0c;而不是将所有数据放在一个大仓库内&#xff0c;这样就增加了速度并提…

Linux 驱动基础

注册驱动模块时给模块传递参数 在一些情况下&#xff0c;我们要动态的改变驱动中某个变量的值&#xff0c;那么就可以在注册时给驱动模块传递参数。 给驱动模块中传递参数&#xff0c;需要定义好接受参数值的全局变量&#xff0c;并调用module_param 来引用它&#xff0c;具体…

Spring架构篇--2.6 远程通信基础--Rpc-Socket实战篇

前言&#xff1a;微服务之间怎么通过socket完成通信&#xff1b;本文通过demo 展示微服务如何通过socket 完成服务之间的通信&#xff1b; 1 使用maven新建两个springboot 服务&#xff1a;模拟实现订单通过订单号获取商品信息&#xff1a; 1.1 创建建springboot 项目后&…

1080T、2080T、4070T显卡的深度学习性能测试和结论

先说结论&#xff1a; 4070T显卡FP32的训练和推理速度跟3090应该基本类似。但由于显存12G偏低&#xff0c;4070T不太适合如今的深度学习模型训练&#xff08;新手列外&#xff0c;大部分模型都能训练起来&#xff0c;耗电也相对很低&#xff09;&#xff0c;更适合测试最新的一…

记录一次ubuntu下配置ssh登录出现的问题

现象描述: 1. 配置完服务器端公钥和本地的私钥之后&#xff0c;ssh登录始终会让输入密码&#xff0c;用ssh -vvv rootip 查看发现发送密钥之后就没反应了。 本机debug info: debug1: Trying private key: C:\Users\wangc/.ssh/id_xxxx &#xff08;私钥文件&#xff09; debug3…

每日站会如何进行优化流程,更高效?

1、每日站会时间要求 每日站会是开发团队一个以15分钟为限的活动。每日站会每一天的上午9点准时在会议室举行。开会时间需要把握精准&#xff0c;并需要每天坚持进行站会讨论活动。 每日站会如何进行优化流程&#xff0c;更高效&#xff1f;​ 2、团队中站会的角色和职责…

Unity(三)--导入3d模型并实现UGUI界面上嵌入3d模型

Unity支持的常用模型格式及建模软件: 格式建模软件网格动画材质骨骼FBX3DMax,C4D,Blender,Maya等√√√√OBJ3DMax,C4D,Blender,Maya等√目录 导入模型并调整好位置创建2D场景(UGUI)使3d模型显示在图片前面方法一:使用Render Texture注意点导入模型并调整好位置 以FBX为例,…

SAP MM 物料管理模块入门学习笔记 2023.2.24

https://zhuanlan.zhihu.com/p/555022893 SAP 企业组织结构 SAP 物料管理模块企业组织结构从上到下分为 集团——》公司——》工厂——》库存地点 集团&#xff1a;SAP 系统组织结构最高级别&#xff1a;内部包括一个完整的SAP系统全部数据 公司代码&#xff1a; 标识集团内一…

【Kubernetes 企业项目实战】09、Rancher 2.6 管理 k8s-v1.23 及以上版本高可用集群

目录 一、Rancher 介绍 1.1Rancher简介 1.2 Rancher 和 k8s 的区别 1.3 Rancher 企业使用案例 二、安装 Rancher 2.1 初始化环境 2.2 安装 Rancher 2.3 登录 Rancher 平台 三、通过 Rancher 管理已存在的 k8s 集群 3.1 配置 rancher 3.2 导入 k8s ​四、通过 Ranc…

啊哈 算法读书笔记 第 1 章 一大波数正在靠近——排序

目录 排序算法&#xff1a; 时间复杂度&#xff1a; 排序算法和冒泡排序之间的过渡&#xff1a; 冒泡排序 冒泡排序和快速排序之间的过渡&#xff1a; 快速排序 排序算法&#xff1a; 首先出场的是我们的主人公小哼&#xff0c;上面这个可爱的娃就是啦。期末考试完了老…