messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
延迟消息级别
 

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    //事务消息处理
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // 如果是延迟消息
        if (msg.getDelayTimeLevel() > 0) {
            // 如果设置的值过大,则设置为最大延迟级别
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // 修改Topic
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // 根据延迟级别,决定要将其投递到那个队列中
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // 记录原始的 topic 和 队列信息
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            // 修改topic和队列信息
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    
RocketMQ的Broker端在存储生产者写入消息时,首先将其写入CommitLog里,为了不让用户立刻就能消费到这条消息,
这里先将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并且根据设置的延迟级别选择将消息投放到哪一个队列里。
 

 
 
 
 整个流程;
- 生产者发送延迟消息到Broker里
 - 把消息转发到SCHEDULE_TOPIC_XXXX主题下的队列中
 - 延迟服务定期消费SCHEDULE_TOPIC_XXXX主题下的消息,到时间了就把它拿到CommitLog中
 - 消息重新被投放到目标Topic里
 - 消费者消费延迟消息
 


















