RocketMQ实现延迟队列精确到秒级实现

news2025/8/1 19:50:13

前言篇:

为了节约成本,决定通过自研来改造rocketmq,添加任意时间延迟的延时队列,开源版本的rocketmq只有支持18个等级的延迟时间,

其实对于大部分的功能是够用了的,但是以前的项目,全部都是使用了阿里云的rocketmq,原因是不同的供应商的订单的延时时间是不同的

(部分供应商的订单未支付30分钟取消,有些1个半小时取消,各种时间都有),

所以使用了大量的延时队列,但是开源版本不支持任意时间延时(希望官方支持这个功能)

为了实现这个功能,网上查询了不少资料,查询到不少相关的文章,主要实现都是基于时间轮来实现的,

但是比较少开源的代码实现(也许大家都没有这个需求吧)

debug实践篇:

1. 撸起袖子加油干,首先,下载源代码 https://github.com/apache/rocketmq.git,导入ide

运行mvn package 生成jar包,如果成功的话,会生成到distribution目录下面

2. 查看文档,发现要运行namesvr 和 broker

找到 src\main\java\org\apache\rocketmq\namesrv\NamesrvStartup.java ,开心的执行main方法,

哦哦哦哦哦,果然报错了,提示 rocketmq.home.dir 目录不存在,查看源码, 原来是从system.propeties读取的,

为了调试,我毫不犹豫的加上了配置文件,

再次运行,不报错了,控制台显示,成功啦( 生活是多么美好,空气是多么清晰! )

3.运行 broker ,打开 src\main\java\org\apache\rocketmq\broker\BrokerStartup.java,执行main方法,

添加 配置文件 ( D:\\mq\\rocketmq-rocketmq-all-4.9.2是我本地的路径,你要修改成自己的 )

1 System.setProperty("rocketmq.home.dir", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb");2 System.setProperty("user.home", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb\\home\\");

运行一下,成功了,开心的发一条消息,试试,哦哦哦哦哦。 发不出去哦( 人生最痛苦的事情是,快要成功了,却没有成功 ) 。

原来还要配置namesvr地址,在启动命令,添加 -n localhost:9876 ( 上面的namesvr 启动的ip和端口)

4.漫长的改造之路 (我们是勇敢的斯巴达勇士,一直勇往直前)

用了阿里云的延时队列,发现它的message 可以传一个时间过来(任意的延时时间)

来来来,我们复制一下( 不要告诉别人,我们一直是复制,粘贴的,没有原创, 嘘 ...... )

1/** 2      * 该类预定义一些系统键. 3      */4     static public class SystemPropKey {5         public static final String TAG = "__TAG";6         public static final String KEY = "__KEY";7         public static final String MSGID = "__MSGID";8         public static final String SHARDINGKEY = "__SHARDINGKEY";9         public static final String RECONSUMETIMES = "__RECONSUMETIMES";10         public static final String BORNTIMESTAMP = "__BORNTIMESTAMP";11         public static final String BORNHOST = "__BORNHOST";12/**13          * 设置消息的定时投递时间(绝对时间). <p>例1: 延迟投递, 延迟3s投递, 设置为: System.currentTimeMillis() + 3000; <p>例2: 定时投递,14          * 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-0115          * 11:30:00").getTime()16          */17         public static final String STARTDELIVERTIME = "__STARTDELIVERTIME";18     }
/**     * <p> 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. </p> <ol> <li>延迟投递: 延迟3s投递, 设置为: System.currentTimeMillis() + 3000;</li>     * <li>定时投递: 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01     * 11:30:00").getTime()</li> </ol>     */    public void setStartDeliverTime(final long value) {        putSystemProperties(SystemPropKey.STARTDELIVERTIME, String.valueOf(value));    }

5.既然要改造rocketmq,在哪里改呢,debug,debug,debug(一直到天荒地老),功夫不负有心人,找到啦,

找到 \src\main\java\org\apache\rocketmq\broker\processor\SendMessageProcessor.java, 发现

public RemotingCommand processRequest(ChannelHandlerContext ctx,                                          RemotingCommand request) throws RemotingCommandException {        SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.consumerSendMsgBack(ctx, request);default:                SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return null;                }                mqtraceContext = buildMsgContext(ctx, requestHeader);                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);                RemotingCommand response;if (requestHeader.isBatch()) {                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);                } else {                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);                }                this.executeSendMessageHookAfter(response, mqtraceContext);return response;        }    }

继续debug,发现 sendMessage 就是处理发送消息的,

如果我们在这里判断是否延时消息就写入文件,然后返回成功到客户端,等到了时间就发送延迟消息,不就搞定了吗?

oh,yes,就是这么干的

//处理延迟消息 delay message        String startTime = msgInner.getProperty(Message.SystemPropKey.STARTDELIVERTIME);        boolean isDelayMsg = false;        long nextStartTime = 0;if (startTime != null && msgInner.getDelayTimeLevel() <= 0) {            nextStartTime = Long.parseLong(startTime);if (nextStartTime >= System.currentTimeMillis()) {                isDelayMsg = true;            }        }if (isDelayMsg) {return delayProcessor.handlePutMessageResultFuture(response, request, msgInner, ctx, queueIdInt, nextStartTime);        } else {if (traFlag != null && Boolean.parseBoolean(traFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {                    response.setCode(ResponseCode.NO_PERMISSION);                    response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                                    + "] sending transaction message is forbidden");return response;                }                putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);            } else {                putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);            }return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);        }    }

其中 delayProcessor.handlePutMessageResultFuture 是我们用来处理延迟消息的地方

我们按照每个时间一个文件夹来保存延时消息,等延时消息到达后,定时的写入延时队列里面。

详细原理,请查考 rocketmq 原理实现篇 https://www.cnblogs.com/tomj2ee/p/15815186.html

<em> </em>
package org.apache.rocketmq.broker.delay;import io.netty.channel.ChannelHandlerContext;import org.apache.commons.lang3.time.DateFormatUtils;import org.apache.rocketmq.broker.BrokerController;import org.apache.rocketmq.common.protocol.ResponseCode;import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;import org.apache.rocketmq.logging.InternalLogger;import org.apache.rocketmq.logging.InternalLoggerFactory;import org.apache.rocketmq.remoting.protocol.RemotingCommand;import org.apache.rocketmq.store.MessageExtBrokerInner;import java.io.*;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.util.Date;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ThreadLocalRandom;public class DelayProcessor implements  Runnable {    protected static final InternalLogger log = InternalLoggerFactory.getLogger(DelayProcessor.class.getCanonicalName());    protected final BrokerController brokerController;    protected final SocketAddress storeHost;    private ExecutorService jobTaskExecute = Executors.newFixedThreadPool(16);    public DelayProcessor(final BrokerController brokerController) {        this.brokerController = brokerController;        this.storeHost = new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController                .getNettyServerConfig().getListenPort());        Thread thread = new Thread(this);        thread.setName("delayProcessor-run---thread");        thread.setDaemon(true);new File(getDelayPath()).mkdirs();        thread.start();        Thread missCallThread = new Thread(() -> {            try {for(;;) {                    Thread.sleep(10 * 1000);                    sendMissCallMsg();                }            } catch (InterruptedException e) {                e.printStackTrace();            }        });        missCallThread.setName("delayProcessor-callback-thread");        missCallThread.start();        System.out.println("init delay success " +getDelayPath());    }    public RemotingCommand handlePutMessageResultFuture(RemotingCommand response,                                                        RemotingCommand request,                                                        MessageExtBrokerInner msgInner,                                                        ChannelHandlerContext ctx,int queueIdInt, long nextStartTime) {return handlePutMessageResult(response, request, msgInner, ctx, queueIdInt, nextStartTime);    }    private RemotingCommand handlePutMessageResult(RemotingCommand response,                                                   RemotingCommand request, MessageExtBrokerInner msg,                                                   ChannelHandlerContext ctx,int queueIdInt, long nextStartTime) {        boolean svOk = saveMsgFile(nextStartTime, msg);        SendMessageResponseHeader sendMessageResponseHeader = new SendMessageResponseHeader();        sendMessageResponseHeader.setQueueId(1);        sendMessageResponseHeader.setMsgId("0");        sendMessageResponseHeader.setQueueOffset(0l);        sendMessageResponseHeader.setTransactionId("");        RemotingCommand newCommand = RemotingCommand.createRequestCommand(ResponseCode.SUCCESS, sendMessageResponseHeader);if (svOk) {            newCommand.setCode(ResponseCode.SUCCESS);        } else {            newCommand.setCode(ResponseCode.SYSTEM_ERROR);            newCommand.setRemark("发送消息延迟失败!");        }        newCommand.setExtFields(request.getExtFields());        newCommand.setVersion(response.getVersion());        newCommand.setOpaque(response.getOpaque());        newCommand.setLanguage(response.getLanguage());        newCommand.setBody(request.getBody());if (!request.isOnewayRPC()) {            try {                ctx.writeAndFlush(newCommand);            } catch (Throwable e) {                log.error("DelayProcessor process request over, but response failed", e);                log.error(request.toString());                log.error(response.toString());            }        }return newCommand;    }    public void putMessage(MessageExtBrokerInner msgInner) {        this.brokerController.getMessageStore().putMessage(msgInner);    }    @Override    public void run() {for (; ; ) {            long curTime = System.currentTimeMillis() / 1000;            jobTaskExecute.submit(() -> sendMsg(curTime));            try {                Thread.sleep(1000);            } catch (InterruptedException e) {            }        }    }    private String getDelayPath() {        String delayPath = "./delay-store"+ File.separator + "delay";return delayPath;    }    private boolean saveMsgFile(long startTime, MessageExtBrokerInner msgInner) {        ObjectOutputStream objectOutputStream = null;        try {            String msgId =(startTime/1000 )+"-"+ System.currentTimeMillis() + "-" + ThreadLocalRandom.current().nextInt(99999999);            System.out.println( getCurrentTime()+"写入延迟消息 >>" + msgId);            String parentDir = getDelayPath() + File.separator + startTime / 1000;            File parentFile = new File(parentDir);if (!parentFile.exists()) {                parentFile.mkdirs();            }            String fileName = parentDir + File.separator + msgId;            FileOutputStream fos = new FileOutputStream(fileName);            BufferedOutputStream bos = new BufferedOutputStream(fos);            objectOutputStream = new ObjectOutputStream(bos);            objectOutputStream.writeObject(msgInner);returntrue;        } catch (Exception ex) {            log.error("saveMsgFile ex:", ex);returnfalse;        } finally {            try {if (objectOutputStream != null) {                    objectOutputStream.close();                }            } catch (Exception ex) {                log.error("saveMsgFile ex:", ex);            }        }    }    private MessageExtBrokerInner readFile(File f) {        ObjectInputStream ois = null;        try {            ois = new ObjectInputStream(new FileInputStream(f));return (MessageExtBrokerInner) ois.readObject();        } catch (Exception ex) {return null;        } finally {if (ois != null) {                try {                    ois.close();                } catch (IOException e) {                    e.printStackTrace();                }            }        }    }    private  void sendMissCallMsg() {        File lst = new File(getDelayPath());        File[] files = lst.listFiles();        long startTime = System.currentTimeMillis() / 1000 - 10 * 1000;for (File f : files) {            String name = f.getName();if (f.isDirectory() && !name.equals(".") && !name.equals("..")) {                try {                    Long fileTime = Long.parseLong(name);if (fileTime <= startTime) {                        sendMsg(fileTime);                    }                } catch (Exception ex) {                }            }        }    }    private String  getCurrentTime(){return  Thread.currentThread().getName()+ ">>["+DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")+"] ";    }    private void sendMsg(long startTime) {        File lst = new File(getDelayPath() + File.separator + startTime);        File[] files = lst.listFiles();if (files != null) {for (File f : files) {                System.out.println( getCurrentTime()+"时间到发送>> "+ startTime+" to commitLog " + f.getName());                MessageExtBrokerInner msgInner = readFile(f);if (msgInner != null) {                    putMessage(msgInner);                    System.out.println( getCurrentTime()+"写入log >> "+ startTime+" to commitLog " + f.getName()+" success");                    f.delete();                }            }            lst.delete();        }    }}

总结:rocketmq延迟队列实现主要是通过时间轮和文件来保存延时消息,等到了时间后,再写入延时队列,来达到延时的目的。

总共有4种方式来实现延时队列,可以参考延时队列的实现原理篇

https://www.cnblogs.com/tomj2ee/p/15815157.html

开源rocketmq延迟队列实现:

https://gitee.com/venus-suite/rocketmq-with-delivery-time.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/367755.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

剑指 Offer 12. 矩阵中的路径

⭐简单说两句⭐ CSDN个人主页&#xff1a;后端小知识 &#x1f50e;GZH&#xff1a;后端小知识 &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; Hello吖&#xff0c;各位小伙伴大家好呀&#xff0c;今天我采用了一种特别的方式&#x1f60e;来…

《关于我找了好久的bug,却没找出来的,又不小心解决了的事》

个人简介 作者简介&#xff1a;大家好&#xff01;我是yukki。个人主页&#xff1a;yukki. 喜欢&#xff1a;&#x1f308;点赞&#x1f308;收藏&#x1f308;一键三连&#xff01;共勉问题&#xff1a; 这是一个SpringBoot问题 刚开始很正常可以启动&#xff0c;但是加了r…

键盘布局持久化技术

**01 **键盘布局简介 键盘布局是按键在键盘上的分布模式&#xff0c;决定了键位顺序。键盘布局在发展过程中&#xff0c;由于使用习惯的不同&#xff0c;各国间使用的键盘布局存在细微差别&#xff0c;因此在Windows系统上以国家为单位区分不同的键盘布局方案。我们最熟悉的布…

后端接收格式为x-www-form-urlencoded的数据

1.x-www-form-urlencoded是什么&#xff1f; x-www-form-urlencoded纸面翻译即所谓url格式的编码&#xff0c;是post的默认Content-Type&#xff0c;其实就是一种编码格式&#xff0c;类似json也是一种编码传输格式。form表单中使用 form的enctype属性为编码方式&#xff0…

【MySQL】5.7版本解压安装配置

前言 之所以使用解压版本&#xff0c;而不使用exe安装&#xff0c;因为exe的安装方式删除过于麻烦&#xff01;&#xff01;&#xff01; 如果安装MySQL过程中&#xff0c;出错了或者想重新在来一把&#xff0c;删除mysql服务即可 sc delete mysql # 删除已经安装好的Mysql&a…

ifconfig不显示ipv4地址,ifconfig eth0 192.168.5.9失败

ifconfig eth0 192.168.5.9设置ip地址后&#xff0c;通过ifconfig仍然没有ipv4地址&#xff1a; 一、 执行ifup eth0启动eth0: ifconfig、ifup、ifdown &#xff1a;这三个命令的用途都是启动网络接口&#xff0c;不过&#xff0c;ifup 与 ifdown 仅就 /etc/sysconfig/network-…

【数据结构】红黑树

红黑树一、红黑树的概念二、红黑树的接口2.1 插入三、验证四、源码一、红黑树的概念 红黑树也是一个二叉搜索树&#xff0c;他是通过对任何一条从根到叶子的路径上各个结点着色方式的限制&#xff0c;最长路径长度不超过最短路径长度的 2 倍保持近似平衡。他在每个节点添加了一…

华为OD机试题,用 Java 解【勾股数元组】问题

最近更新的博客 华为OD机试 - 猴子爬山 | 机试题算法思路 【2023】华为OD机试 - 分糖果(Java) | 机试题算法思路 【2023】华为OD机试 - 非严格递增连续数字序列 | 机试题算法思路 【2023】华为OD机试 - 消消乐游戏(Java) | 机试题算法思路 【2023】华为OD机试 - 组成最大数…

骨传导耳机靠谱吗,骨传导耳机的原理是什么

很多人刚开始接触骨传导耳机时都会具有一个疑问&#xff0c;骨传导耳机是不是真的靠谱&#xff0c;是不是真的不伤害听力&#xff1f;骨传导耳机传输声音的原理是什么&#xff1f; 下面就给大家讲解一下骨传导耳机传输声音的原理以及骨传导耳机对听力到底有没有伤害。 骨传导…

Python编写GUI界面,实现小说下载器

嗨害大家好鸭&#xff01;我是小熊猫~思路一、数据来源分析二. 代码实现步骤代码实现一、单章小说下载二、整本小说下载三、多线程采集四、采集排行榜所有小说五、搜索小说功能六、GUI界面<center>**&#x1f447;问题解答 源码获取 技术交流 抱团学习请联系&#x1f…

【蓝桥集训】第七天——并查集

作者&#xff1a;指针不指南吗 专栏&#xff1a;Acwing 蓝桥集训每日一题 &#x1f43e;或许会很慢&#xff0c;但是不可以停下来&#x1f43e; 文章目录1.亲戚2.合并集合3.连通块中点的数量有关并查集的知识学习可以移步至—— 【算法】——并查集1.亲戚 或许你并不知道&#…

华为OD机试题,用 Java 解【喊 7 的次数重排】问题

最近更新的博客 华为OD机试 - 猴子爬山 | 机试题算法思路 【2023】华为OD机试 - 分糖果(Java) | 机试题算法思路 【2023】华为OD机试 - 非严格递增连续数字序列 | 机试题算法思路 【2023】华为OD机试 - 消消乐游戏(Java) | 机试题算法思路 【2023】华为OD机试 - 组成最大数…

大数据开发 - Java入门2

目录Java基础知识注释关键字常量标识符测试题回顾Java基础知识 注释 对程序的解释说明 分类&#xff1a; 单行注释&#xff1a;// 对本行后面的内容进行注释多行注释&#xff1a;/*解释内容 */文档注释 &#xff1a;/** 注释内容*/ --用于产生帮助文档&#xff0c;也有多行注…

高通平台开发系列讲解(SIM卡篇)SIM卡基础概念

文章目录 一、SIM卡基本定义二、卡的类型三、SIM卡的作用三、SIM卡基本硬件结构四、SIM卡的内部物理单元五、卡文件系统沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇文章将介绍SIM的相关组件。 一、SIM卡基本定义 SIM卡是一种智能卡(ICC Card/UICC Card) SIM…

在线客服系统接入网站会员,绑定会员ID,展示会员昵称头像,传递手机号等扩展字段【唯一客服】...

在客服系统聊天链接里&#xff0c;可以带上自己网站的会员信息&#xff0c;例如&#xff1a;昵称、头像、手机号等 具体使用方式如下 聊天链接中增加以下参数&#xff1a; visitor_id: 自有会员visitor_name: 自有会员名称avator: 自有会员头像lang: 多语言 cn 中文 &#xff0…

链接服务器查询导致的阻塞

背景客户反馈数据库在上午10点时出现严重阻塞&#xff0c;阻塞源头会话在等待OLEDB&#xff0c;没有见过这个等待类型&#xff0c;请我们协助分析。现象登录SQL专家云&#xff0c;进入趋势分析&#xff0c;下钻到10点钟的活动会话&#xff0c;看到发生了两次严重的阻塞。转到活…

指针的进阶【上篇】

文章目录&#x1f4c0;1.字符指针&#x1f4c0;2.指针数组&#x1f4c0;3.数组指针&#x1f4bf;3.1.数组指针的定义&#x1f4bf;3.2. &数组名VS数组名&#x1f4bf;3.3.数组指针的使用&#x1f4c0;1.字符指针 int main() {char ch w;char* pc &ch;// pc就是字符指…

数据结构之顺序表篇

一、顺序表概念 二、顺序表各类接口实现 *顺序表初始化 **顺序表销毁 ***顺序表插入操作 ****顺序表删除操作 *****顺序表查找操作 ******顺序表实现打印操作 三、顺序表整体实现源码 *SeqList.h **SeqList.c ***test.c 一、顺序表概念 讲顺序表之前先引入线性表概念&#xff…

可视化服务编排在金融APP中的实践

本文重点介绍了京东金融APP在BFF层实践过程中遇到的问题&#xff0c;并引出可视化服务编排在金融APP中的落地实践&#xff0c;其中重点介绍了可视化服务编排系统的核心功能及实现。 可视化服务编排系统已经稳定支持了金融APP从去年618到现在的所有发版迭代&#xff0c;对人效提…

Apache ActiveMQ安装和使用

文章目录Apache ActiveMQ安装和使用 环境下载安装配置启动登录Apache ActiveMQ安装和使用 环境 Ubuntu20.04 下载 官网&#xff1a;https://activemq.apache.org/download-archives 如下载5.14.4版本&#xff0c;apache-activemq-5.14.4-bin.tar.gz&#xff0c;测试过没问题…