RocketMQ中生产者发消息前为啥一定要调用start()方法?

news2025/8/13 18:21:40

前言

我们在使用RocketMQ发送消息时,一般都会使用DefaultMQProducer,类型的代码如下:

DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("42.192.50.8:9876");
try {
    producer.start();
    producer.send(new Message("topic", "ping".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.shutdown();
}
复制代码

上述代码中,在消息发送之前调用了start()方法,如果不调用start()方法,直接发送消息,那么会出现以下报错:

报错消息里面很明显地告知我们,目前这个DefaultMQProducer状态没有准备好,还不能发送消息。为了一探究竟,我们得去看看start()里面究竟做了什么操作呢?

一探究竟

我们根据源码一路走下来,可以追踪到DefaultMQProducerImpl.start(final boolean startFactory)这个方法:

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
​
                this.checkConfig();
​
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // 创建MQClientInstance
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                // 注册Producer到MQClientInstance中
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
​
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                // 启动MQClientInstance实例
                if (startFactory) {
                    mQClientFactory.start();
                }
​
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
复制代码

上述代码主要做了以下几点:

1.创建MQClientInstance实例;

2.注册Producer到MQClientInstance实例中;

3.启动MQClientInstance实例;

MQClientInstance实例并不是每次都会创建的,它创建出来也会缓存的MQClientManager中,不过根据源码来看的话,每次创建Producer都会对应创建一个新的MQClientInstance实例,所以一般情况下不建议一个应用服务中重复创建Producer

最终start()方法的关键实现逻辑还是需要进入MQClientInstance.start()中:

    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 如果namesrv地址为null,那么就需要自己找namesrv地址
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // 开启一个请求响应渠道,没猜错的话,应该是netty实现的
                    this.mQClientAPIImpl.start();
                    // 开启定时任务
                    this.startScheduledTask();
                    // 开启拉消息服务
                    this.pullMessageService.start();
                    // 开启负载均衡服务
                    this.rebalanceService.start();
                    // 再开启一个默认生产者,这个生产者不需要启动MQClientInstance实例
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
复制代码

看样子,这才是start()方法真正要做的事情:

1.找namesrv地址,应该是后面需要使用namesrv地址查询对应的broker

2.开启Netty客户端的初始化,包括与namesrv建立信道;另外开启两个定时任务,一个清除列表中过期的请求,第二个就是筛选可用的namesrv服务;

3.开启一些定时任务;包括如果没有设置namesrv地址的话,会从指定站点拉namesrv地址;清除下线broker并发送心跳给所有的broker等工作;

4.因为当前是生产者,所以pullMessageService很快就结束;

5.生产者不需要做负载均衡,所以rebalanceService很快也结束;

6.给默认创建的生产者执行一下start()方法,其实啥也没做;

上述大多数任务都是给消费者使用的,作为生产者,唯一起作用的就是前三步,查找namesrv地址、第二步与namesrv建立通信以及第三步对broker的一些定时清理工作;不过没有发生消息之前,是不会从远程获取任何数据的。所以综上所述,start()方法里面只做了以下两件事情:

1.与namesrv建立通信渠道,它甚至都没有从namesrv获取任何数据;

2.启动一些定时任务,包括清理下线的broker;

小结

虽然在生产者中,start()方法里面真正做的事情比较少,但是却是非常有必要的。发送消息之前,我们没有使用start()方法,导致消息发送失败,是因为生产者与namesrv之间的通信渠道没有建立。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/33248.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Chrome 103支持使用本地字体,纯前端导出PDF优化

在前端导出PDF,解决中文乱码一直是一个头疼的问题。要解决这个问题,需要将ttf等字体文件内容注册到页面PDF生成器中。但是之前网页是没有权限直接获取客户机器字体文件,这时就需要从服务器下载字体文件或者提示用户选择字体文件上传到页面。对…

链接杂谈 CASPP

构建大型程序 构建大型程序,不可避免的一个问题是链接问题: - 链接器提示:缺少某个模块 缺少某个库 不兼容的库版本 理解全局变量的链接 你的代码可能有多个全局变量,有些是强变量,有些是弱定义,执行…

清除浮动的常用方法

关于浮动 我们为什么需要浮动? 我们想把多个块级元素放到同一行上。 打破标准流的限制。 浮动原来做图文混排效果,现在主要用来做网页布局的。 浮动语法 只有左浮动和右浮动。 float: left; float: right;浮动特点 1.浮动元素会脱离标准流&#x…

Win10禁止应用独占麦克风

痛点需求: qq和微信同时发起语音通话,发现只有一个qq说话对方能听到,但是微信却不能,这是典型的应用程序独占了麦克风,导致其他应用无法使用。 有没有办法让qq和微信同时使用麦克风呢? 答案是:有…

图的拓扑序列

拓扑序列: 拓扑序是按照点的先后顺序排列的。拓扑序列满足以下两点: 1.每个顶点在序列中出现且只出现一次。 2.若存在一条从顶点 A 到顶点 B 的路径,那么在序列中顶点 A 出现在顶点 B 的前面。 拓扑序列只存在于有向无环图中。可以理解成…

MCE | 肝炎病毒是如何诱发肝癌的

肝炎病毒分类 肝炎病毒是世界上最常见的肝炎病因,其它原因包括酗酒、某些药物、毒素、其他感染、自身免疫性疾病和非酒精性脂肪性肝炎 (NASH)。肝炎病毒共有五种主要的肝炎病毒株,分别为 A、B、C、D 和 E 型。目前,全世界大约有 3.25 亿人患…

2023中国绿色铝业国际峰会

会议背景 铝行业属于能源高度密集型行业,主要包括铝矿石开采、氧化铝生产、电解铝生产和铝材加工等环节。我国原铝产量自2001年以来一直占据世界首位,连续7年产量占比超过全球50%。然而与国际先进铝生产企业相比,我国铝生产企业单位原铝碳…

C# 自定义事件

一 自定义事件 例如,利用自定义绘制的技术,画出一个圆角按钮。 现在来看,怎么样给它添加自定义的事件。 二 要点与细节 1 Control 类本身就有继承的鼠标和键盘事件,这里只是一个引子,用于引出更复杂的自定义事件。 …

web测试——业务测试2

1.历史数据 前端: 组件相关  组件内部是否动过;  展示的数据是否受影响;  失焦后的校验(爆红) 页面样式相关  坐标位置、  按钮位置是否动过,  新版本上线对历史配置的影响 交互提示相关  新手引导的展示位置、关闭后的展…

pycharm2022.2 远程连接服务器调试代码

目的: 同步本地和服务器的全部或者部分文件本地debug,服务器跑实验 需要条件: 服务器上已经创建好虚拟环境你本地已经安装好pycharm 1.1 File → Settings → Project:XXX →Python Interpreter 打开之后再右边这添加解释器。选On SSH 1.2把…

简单的网页制作期末作业——电影泰坦尼克号(4页)

HTML实例网页代码, 本实例适合于初学HTML的同学。该实例里面有设置了css的样式设置,有div的样式格局,这个实例比较全面,有助于同学的学习,本文将介绍如何通过从头开始设计个人网站并将其转换为代码的过程来实践设计。 文章目录一、网页介绍一…

小学生python游戏编程arcade----excel调用

小学生python游戏编程arcade----excel调用前言小学生python游戏编程arcade----excel调用1、excel文件1.1 excel表头1.2 excel文件1.3 文件读取函数1.4 打开excel文件读取数据,每行一个字典,再总存为序列1.5 打开excel文件读取数据,取两列存为字典1.6 游戏…

[漏洞复现] jenkins 远程代码执行 (CVE-2019-100300)

文章目录一、简介二、影响版本三、复现四、修复一、简介 拥有Overall/Read 权限的用户可以绕过沙盒保护,在jenkins可以执行任意代码。此漏洞需要一个账号密码和一个存在的job。 Jenkins的pipeline主要是通过一个配置文件或者job里面的pipeline脚本配置来设定每个j…

锐捷MPLS跨域方案C2实验配置

目录 配置ASBR之间的EBGP邻居 配置PE之间的Vpnv4邻居 此时配置PE与CE设备对接命令 手工配置为PE地址分配标签 MPLS隧道——跨域解决方案C1、C2讲解_静下心来敲木鱼的博客-CSDN博客https://blog.csdn.net/m0_49864110/article/details/127634890?ops_request_misc%257B%252…

数字孪生|交通运输可视化系统

交通是城市经济发展的动脉,与我们的日常生活息息相关。 传统交通信息管理中,只是做了粗略的信息发布以及简单的交通流量监测,早已经不能满足现代智慧交通的需求。现代的智慧交通则提供基于实时交通数据的交通信息服务,融入了物联网…

python离线安装module以及常见问题及解决方案

文章目录一,离线安装module1.1 下载module1.2 离线安装二,常见的问题2.1 模块缺少合适的适配:error: Could not find suitable distribution for Requirement.parse()2.2 install成功但发现控制台打印的最后一行显示下载module版本为0.0.0工作…

如何区分小角X射线散射和小角X射线衍射?

小角X射线散射(SAXS)大多数被用来测定超细粉体、纳米离子分布的有关性质,小角X射线衍射(SAXD)则主要用来测定超大晶面间距或者薄膜结构等等问题,在用途上两种实验并不一致,本篇文章将介绍小角X射…

Promise错误处理比较:使用then的第二个参数还是catch

catch是一个语法糖而己 还是通过then来处理的 如果在then的第一个函数里抛出了异常,后面的catch能捕获到,而then的第二个函数捕获不到。 catch是一个语法糖而己 还是通过then 来处理的: Promise.prototype.catch function(fn){return this…

Amazon Braket 与量子计算

KY1,Yankuan Pan2,Bertran Shao3,Zoey Deng41.Amazon HERO;2.项目架构师;3.开发者生态负责人;4.开发者关系 Amazon Braket 是一项完全托管式的量子计算服务,主要可面向研究人员、科学家和开发人员提供一些量子计算软硬件服务进行研究和开发工…

中国设备维修安装企业能力等级证书(制冷空调)

中国设备维修安装企业能力等级证书(制冷空调),是由中国制冷学会审定,中国设备管理协会批准颁发,全国性制冷空调设备维修、安装行业能力等级证书。也是目前国内唯一能在中国招标网http://www.ctba.org.cn/查询到的制冷空…