rocketmq延迟消息的底层原理浅析

news2025/6/7 15:45:03

rocketmq延迟消息的底层原理

消息实体

延时消息是指允许消息在指定延迟时间后才被消费者消费

Apache RocketMQ 中,消息的核心实体类是 org.apache.rocketmq.common.message.Message

public class Message implements Serializable {
    private String topic;                 // 消息主题(必填)
    private int flag;                     // 消息标志(用户自定义)
    private Map<String, String> properties; // 消息属性(键值对,可用于过滤、追踪等)
    private byte[] body;                  // 消息体(消息内容)
    private String transactionId;         // 事务ID(用于事务消息)
    ...
}

在消息属性中properties,有一些常见属性:

  • TAGS:标签,可用于消息过滤
  • DELAY:延迟级别(延时消息)
  • TIMER_DELAY_MS:延迟投递的毫秒数
  • TIMER_DELAY_SEC:延迟投递的秒数
  • TIMER_DELIVER_MS:精准指定投递时间戳

实现机制

延迟等级

RocketMQ 将延迟消息设计成 “延迟级别(delayLevel)” 的形式,每个级别对应一个固定的延迟时间。在4.x的版本中,RocketMQ不支持任意时间精度的延迟,而是预设了18个延迟等级。使用使用ConcurrentSkipListMap存储延迟级别与时间的映射关系

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  
private final ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */>delayLevelTable = new ConcurrentSkipListMap<>();

public void setDelayTimeLevel(int level) {
    this.putProperty("DELAY", String.valueOf(level));
}

在5.x的版本中支持任意时间延迟

public void setDelayTimeSec(long sec) {
    this.putProperty("TIMER_DELAY_SEC", String.valueOf(sec));
}

public void setDelayTimeMs(long timeMs) {
    this.putProperty("TIMER_DELAY_MS", String.valueOf(timeMs));
}

public void setDeliverTimeMs(long timeMs) {
    this.putProperty("TIMER_DELIVER_MS", String.valueOf(timeMs));
}

消息的处理流程

消息进入延迟队列

Producer 发送延迟消息时,设置 message.setDelayTimeLevel(x)。延迟消息到达Broker不会立即进入你设置的业务 Topic;会先被投递到名为 SCHEDULE_TOPIC_XXXX 的系统内置 Topic

public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // ...
      if (msg.getDelayTimeLevel() > 0) {
          // 如果超过了最大延迟级别
          if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
              msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
          }
          // 获取RMQ_SYS_SCHEDULE_TOPIC
          topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
          // 根据延迟级别选取对应的队列
          int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

          // 将消息原本的TOPIC和队列ID设置到消息属性中
          MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
          MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
          msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
          // 设置SCHEDULE_TOPIC
          msg.setTopic(topic);
          msg.setQueueId(queueId);
      }
  }
        // ...
    }
}

  1. 判断消息的延迟级别是否超过了最大延迟级别,如果超过了就使用最大延时等级

  2. topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPICRMQ_SYS_SCHEDULE_TOPIC是在TopicValidator中定义的常量,值为SCHEDULE_TOPIC_XXXX

  3. int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()),根据延迟级别选取对应的队列,把相同延迟级别的消息放在同一个队列中

  4. 将消息原本的TOPIC和队列ID设置到消息属性中,MessageAccessor 是 RocketMQ 中的一个工具类,作用是以非公开的方式修改 Message 对象的内部字段或属性

    public static void putProperty(Message msg, String name, String value) {
            msg.putProperty(name, value);
        }
    

启动定时任务

Broker启动的时候会调用ScheduleMessageServicestart方法,start方法中为不同的延迟级别创建了对应的定时任务来处理延迟消息

public class ScheduleMessageService extends ConfigManager {
    // 首次执行延迟的时间
    private static final long FIRST_DELAY_TIME = 1000L;
    public void start() {
        if (started.compareAndSet(false, true)) {
            super.load();
            this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
            if (this.enableAsyncDeliver) {
                this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
            }
            // 遍历所有的延迟级别
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) { // 如果获取的消费进度为空
                    offset = 0L; // 默认为0,从第一条消息开始处理
                }
                if (timeDelay != null) {
                    if (this.enableAsyncDeliver) {
                        this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                    }
                    // 为每个延迟级别创建对应的定时任务
                    this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
                }
            }
            // ...
        }
    }
}

遍历所有的延迟等级,为每个延迟等级创建对应的定时任务。

每个DeliverDelayedMessageTimerTask负责:

  1. 从对应延迟级别的队列中扫描消息
  2. 检查消息的投递时间是否到达
  3. 将到期消息重新投递到目标主题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2401337.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【openssl】升级为3.3.1,避免安全漏洞

本文档旨在形成 对Linux系统openssl版本进行升级 的搭建标准操作过程&#xff0c;搭建完成后&#xff0c;实现 openssl 达到3.3以上版本&#xff0c;避免安全漏洞 效果。 一、查看当前版本 版本不高于3.1的&#xff0c;均需要升级。 # 服务器上运行以下命令&#xff0c;查看…

使用 HTML +JavaScript 从零构建视频帧提取器

在视频编辑、内容分析和多媒体处理领域&#xff0c;常常需要从视频中提取关键帧。手动截取不仅效率低下&#xff0c;还容易遗漏重要画面。本文介绍的视频帧提取工具通过 HTML5 技术栈实现了一个完整的浏览器端解决方案&#xff0c;用户可以轻松选择视频文件并进行手动或自动帧捕…

论文速读《DexWild:野外机器人策略的灵巧人机交互》

项目链接&#xff1a;https://dexwild.github.io/ 论文链接&#xff1a;https://arxiv.org/pdf/2505.07813 0. 简介 2025年5月&#xff0c;卡内基梅隆大学&#xff08;CMU&#xff09;发布了一篇突破性论文《DexWild: Dexterous Human Interactions for In-the-Wild Robot Pol…

Bug问题

一、list 页面 import React, { useEffect, useState } from react; import { shallowEqual, useHistory, useSelector } from dva; import { Button, message } from choerodon-ui/pro; import formatterCollections from hzero-front/lib/utils/intl/formatterCollections; …

【数据结构】5. 双向链表

文章目录 一、链表的分类1、双向链表的结构 二、双向链表的实现0、准备工作1、初始化2、打印3、尾插4、头插5、尾删6、头删7、查找8、在指定位置之后插入数据9、删除指定位置10、销毁 一、链表的分类 链表总共分为8种&#xff0c;具体的分组方式如图所示&#xff1a; 带头指的…

【Linux手册】冯诺依曼体系结构

目录 前言 五大组件 数据信号 存储器&#xff08;内存&#xff09;有必要吗 常见面试题 前言 冯诺依曼体系结构是当代计算机基本架构&#xff0c;冯诺依曼体系有五大组件&#xff0c;通过这五大组件直观的描述了计算机的工作原理&#xff1b;学习冯诺依曼体系可以让给我们更…

Mobile App UI自动化locator

在开展mobile app UI层自动化测试时&#xff0c;编写目标元素的locator是比较耗时的一个环节&#xff0c;弄清楚locator背后的逻辑&#xff0c;可以有效降低UI层测试维护成本。此篇博客以webdriverioappium作为UI自动化工具为例子&#xff0c;看看有哪些selector方法&#xff0…

(LeetCode 每日一题) 1061. 按字典序排列最小的等效字符串 (并查集)

题目&#xff1a;1061. 按字典序排列最小的等效字符串 思路&#xff1a;使用并查集&#xff0c;来将等价的字符连起来&#xff0c;形成一棵树。这棵树最小的字母&#xff0c;就代表整颗树&#xff0c;时间复杂度0(n)&#xff0c;细节看注释。 C版本&#xff1a; class Solutio…

linux 安装mysql8.0;支持国产麒麟,统信uos系统

一&#xff1a;使用我已经改好的mysql linux mysql8.0解压可用&#xff0c;点我下载 也在国产麒麟系统&#xff0c;统信uos系统也测试过&#xff0c;可用&#xff1b; 下载后&#xff0c;上传mysql.tar.gz 然后使用root角色去执行几个命令即可&#xff1b;数据库密码&#xf…

C#实现远程锁屏

前言 这是一次提前下班没有锁屏进而引发的一次思考后的产物&#xff0c;思考的主要场景是当人离开电脑后&#xff0c;怎么能控制电脑锁屏&#xff0c;避免屏幕上的聊天记录被曝光。 首先想到通过系统的电源计划设置闲置超时时间熄屏&#xff0c;这可能是最接近场景的解决方案&a…

SpringBoot3整合MySQL8的注意事项

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 注意事项 1、请添加添加如下依赖&#xff1a; <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><…

智语心桥:当AI遇上“星星的孩子”,科技如何点亮沟通之路?

目录: 引言:当科技的温度,遇见“星星的孩子”“智语心桥”:一座为孤独症儿童搭建的AI沟通之桥核心技术探秘:AI如何赋能“读心”与“对话”?个性化魔法:AI如何实现“千人千面”的精准干预?应用场景畅想:从家庭到机构,AI的全方位支持为什么是“智语心桥”?——价值、可…

itop-3568开发板机器视觉opencv开发手册-图像绘制-画线

本小节代码在配套资料“iTOP-3568 开发板\03_【iTOP-RK3568 开发板】指南教程 \04_OpenCV 开发配套资料\11”目录下&#xff0c;如下图所示&#xff1a; cv2.line 函数功能&#xff1a; 绘制一条直线。 函数原型&#xff1a; cv2.line(img,pt1,pt2,color,thicknessNone,lin…

sudo docker exec -it backend bash 以交互方式(interactive)进入正在运行的 Docker 容器的命令行环境

sudo docker exec -it backend bash&#x1f50d; 总体作用 这条命令的作用是&#xff1a; 以交互方式&#xff08;interactive&#xff09;进入名为 backend 的正在运行的 Docker 容器的命令行环境。 你会进入容器的“终端”&#xff0c;就像登录到一个 Linux 系统一样&#…

浅析EXCEL自动连接PowerBI的模板

浅析EXCEL自动连接PowerBI的模板 之前我分享过&#xff1a;PowerBI链接EXCEL实现自动化报表 &#xff0c;其中一个关键工具就是提到的EXCEL链接模板&#xff0c;即宏工作薄。 今天就大概来聊一聊这个宏工作簿的底层原理是啥&#xff0c;怎么实现的。 第一步&#xff1a; 打开…

java32

1.反射 获取类&#xff1a; 获取构造方法&#xff1a; 获取权限修饰符&#xff1a; 获取参数信息&#xff1a; 利用反射出来的构造器来创建对象&#xff1a; 获取成员变量&#xff1a; 获取成员方法&#xff1a; 综合练习&#xff1a; 动态代理&#xff1a;

【Redis】zset 类型

zset 一. zset 类型介绍二. zset 命令zaddzcard、zcountzrange、zrevrange、zrangebyscorezpopmax、zpopminzrank、zrevrank、zscorezrem、zremrangebyrank、zremrangebyscorezincrby阻塞版本命令&#xff1a;bzpopmax、bzpopmin集合间操作&#xff1a;zinterstore、zunionstor…

从Gartner报告看Atlassian在生成式AI领域的创新路径与实践价值

本文来源atlassian.com&#xff0c;由Atlassian全球白金合作伙伴——龙智翻译整理。 二十余年来&#xff0c;Atlassian始终是创新领域的领军者。凭借对团队协作本质的深刻理解&#xff0c;Atlassian在AI时代仍持续引领协作方式的革新。如今&#xff0c;这一领先地位再次获得权威…

Kafka 安装教程(支持 Windows / Linux / macOS)

一、下载 1、kafka官网下载地址:https://kafka.apache.org/downloads 根据实际情况下载对应的版本 2、JDK的版本最好是17+ JDK下载地址:https://www.oracle.com/java/technologies/javase/jdk17-0-13-later-archive-downloads.html 二、安装 前置条件 安装 Java(至少 Jav…

OpenCV种的cv::Mat与Qt种的QImage类型相互转换

一、首先了解cv::Mat结构体 cv::Mat::step与QImage转换有着较大的关系。 step的几个类别区分: step:矩阵第一行元素的字节数step[0]:矩阵第一行元素的字节数step[1]:矩阵中一个元素的字节数step1(0):矩阵中一行有几个通道数step1(1):一个元素有几个通道数(channel()) cv::Ma…