rocketmq延迟消息的底层原理
消息实体
延时消息是指允许消息在指定延迟时间后才被消费者消费
Apache RocketMQ
中,消息的核心实体类是 org.apache.rocketmq.common.message.Message
public class Message implements Serializable {
private String topic; // 消息主题(必填)
private int flag; // 消息标志(用户自定义)
private Map<String, String> properties; // 消息属性(键值对,可用于过滤、追踪等)
private byte[] body; // 消息体(消息内容)
private String transactionId; // 事务ID(用于事务消息)
...
}
在消息属性中properties
,有一些常见属性:
TAGS
:标签,可用于消息过滤DELAY
:延迟级别(延时消息)TIMER_DELAY_MS
:延迟投递的毫秒数TIMER_DELAY_SEC
:延迟投递的秒数TIMER_DELIVER_MS
:精准指定投递时间戳
实现机制
延迟等级
RocketMQ
将延迟消息设计成 “延迟级别(delayLevel)” 的形式,每个级别对应一个固定的延迟时间。在4.x的版本中,RocketMQ
不支持任意时间精度的延迟,而是预设了18个延迟等级。使用使用ConcurrentSkipListMap
存储延迟级别与时间的映射关系
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private final ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */>delayLevelTable = new ConcurrentSkipListMap<>();
public void setDelayTimeLevel(int level) {
this.putProperty("DELAY", String.valueOf(level));
}
在5.x的版本中支持任意时间延迟
public void setDelayTimeSec(long sec) {
this.putProperty("TIMER_DELAY_SEC", String.valueOf(sec));
}
public void setDelayTimeMs(long timeMs) {
this.putProperty("TIMER_DELAY_MS", String.valueOf(timeMs));
}
public void setDeliverTimeMs(long timeMs) {
this.putProperty("TIMER_DELIVER_MS", String.valueOf(timeMs));
}
消息的处理流程
消息进入延迟队列
Producer
发送延迟消息时,设置 message.setDelayTimeLevel(x)
。延迟消息到达Broker
不会立即进入你设置的业务 Topic
;会先被投递到名为 SCHEDULE_TOPIC_XXXX
的系统内置 Topic
。
public class CommitLog {
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// ...
if (msg.getDelayTimeLevel() > 0) {
// 如果超过了最大延迟级别
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 获取RMQ_SYS_SCHEDULE_TOPIC
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 根据延迟级别选取对应的队列
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 将消息原本的TOPIC和队列ID设置到消息属性中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 设置SCHEDULE_TOPIC
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// ...
}
}
-
判断消息的延迟级别是否超过了最大延迟级别,如果超过了就使用最大延时等级
-
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC
,RMQ_SYS_SCHEDULE_TOPIC
是在TopicValidator
中定义的常量,值为SCHEDULE_TOPIC_XXXX
-
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel())
,根据延迟级别选取对应的队列,把相同延迟级别的消息放在同一个队列中 -
将消息原本的
TOPIC
和队列ID
设置到消息属性中,MessageAccessor
是 RocketMQ 中的一个工具类,作用是以非公开的方式修改Message
对象的内部字段或属性。public static void putProperty(Message msg, String name, String value) { msg.putProperty(name, value); }
启动定时任务
Broker启动的时候会调用ScheduleMessageService
的start
方法,start方法中为不同的延迟级别创建了对应的定时任务来处理延迟消息
public class ScheduleMessageService extends ConfigManager {
// 首次执行延迟的时间
private static final long FIRST_DELAY_TIME = 1000L;
public void start() {
if (started.compareAndSet(false, true)) {
super.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
if (this.enableAsyncDeliver) {
this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
}
// 遍历所有的延迟级别
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) { // 如果获取的消费进度为空
offset = 0L; // 默认为0,从第一条消息开始处理
}
if (timeDelay != null) {
if (this.enableAsyncDeliver) {
this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
// 为每个延迟级别创建对应的定时任务
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
// ...
}
}
}
遍历所有的延迟等级,为每个延迟等级创建对应的定时任务。
每个DeliverDelayedMessageTimerTask
负责:
- 从对应延迟级别的队列中扫描消息
- 检查消息的投递时间是否到达
- 将到期消息重新投递到目标主题