追求极致性能,RocketMQ 消息通信详解

news2025/7/27 12:01:42

1 介绍

RocketMQ 消息队列架构主要包括 NameServe、Broker(Master/Slave)、Producer、Consumer 4 个核心部件,基本执行流程如下:

点击查看大图

  1. NameServer 优先启动。NameServer 是整个 RocketMQ 的“中央大脑” ,作为 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动 NameServer 再启动 Rocket 中的 Broker。

  2. Broker 启动后,需要将自己注册至 NameServer 中,并 保持长连接,每 30s 发送一次发送心跳包,来确保 Broker 是否存活。并将 Broker 信息 ( IP+、端口等信息)以及 Broker 中存储的 Topic 信息上报。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。

  3. NameServer 如果检测到 Broker 宕机(因为使用心跳机制, 如果检测超 120s(两分钟)无响应),则从路由注册表中将其移除。

  4. 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(Broker 可能是 Cluster 模式),然后根据负载均衡算法从列表中选择 1 台 Broker ,建立连接通道,进行消息发送。

  5. 消费者在订阅某个 topic 的消息之前从 NamerServer 获取 Broker 服务器地址列表(Broker 可能是 Cluster 模式),包括关联的全部 Topic 队列信息。进而获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费数据。

  6. 生产者和消费者默认每 30s 从 NamerServer 获取 Broker 服务器地址列表,以及关联的所有 Topic 队列信息,更新到 Client 本地。

    2 ~ 4 步骤实际上是 Producer、Broker 以及 NameServer 之间整个进行数据通信的过程,面对复杂的消息队列系统,一个性能优良,稳定性高的网络通信模块是非常重要的,它体现了 RocketMQ 集群消息的整体吞吐和负载能力。也是 RocketMQ 保证高性能、高稳定性的基石。

2 网络通信过程分析

2.1 通信类(rocketmq-remoting )的结构解析

点击查看大图

通过上图可以看到,在整个 RocketMQ 队列系统中,rocketmq-remoting 这个 module 是专门用来负责网络通信职能的。并且从模块依赖关系中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服务) 等模块均依赖了它。

通信层是基于 Netty 进行扩展的,并自定义了通信协议,用于将消息传递给 Broker 进行存储。实现 Client 与 Server 之间高效的数据请求与接收。

RocketMQ 相关视频解析:https://www.bilibili.com/video/BV1kB4y1U7bk/?spm_id_from=333.999.0.0

2.2 协议结构设计

因为是基于 Netty 进行扩展的,所以自定义了 RocketMQ 的消息协议,在传输过程的数据进行结构制定、封装、编解码的过程。在 RocketMQ 中,负责这个工作的就是 RemotingCommand 类,我们来看看这个类的几个重要属性:

2.3 消息内容的组成结构

传输的消息内容主要由一下几个部分组成:

点击查看大图

2.4 RocketMQ 消息通信流程

在 RocketMQ 消息队列中支持通信的模式主要有

  • sync 同步发送模式

  • async 异步发送模式

  • oneway 单向模式,无需关注 Response

2.4.1 通信流程说明

下图从 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基于 NettyRemotingClient 的消息发送,以及 Handler 处理过程来说明。

点击查看大图

  • Broker 和 NameServer 启动时同步调用 NettyRemotingServer.start() 方法, 初始化 Netty 服务器配置 BossGroup/WorkerGroup NioEventLoopGroup 线程组配置 Channel 添加 NettyServerHandler 调用 serverBootstrap.bind() 监听端口,等待 client 的 connection

  • Producer 和 Consumer 同样需要启动 Netty 的客户端,通过调用 NettyRemotingClient.start() 初始化 Netty 客户端配置客户端 NioEventLoopGroup 线程组配置 Channel 添加 NettyClientHandler

  • 发送同步消息时,调用 NettyRemoteClient.invokeSync(),从 channelTables 缓存中获取或者创建用于通信的 Channel 通道。

  • 创建完 Channel 后,生产者 Producer 调用 Channel.writeAndFlush() 发送数据

  • NettyRemotingServer 服务端线程组 处理可读事件,调用 NettyServerHandler 处理数据。

  • 下一步,NettyServerHandler 调用 processMessageReceived 方法,接收并处理传送过来的数据。

  • 根据请求码 RequestCode 区别不同的请求,来执行不同的 Processor。说明:Processor 在服务端初始化的时候,将 RequestCode 添加到 Processor 缓存中。消息的存、查、拉取都是不同的请求码。

  • processMessageReceived 从 ResponseTables(key 为 opaque) 缓存中取出 ResponseFuture,并将将返回结果设置到 ResponseFuture。同步模式下执行 responseFuture.putResponse()方法,异步调用执行回调方法。

  • NettyRemotingClient 收到可读事件,调用 NettyClientHandler 读取并处理返回事件。

2.4.2 Reactor 多线程设计

上面我们说过了,RocketMQ 的通信是采用 Netty 组件作为底层通信库。同样的,它也遵循 Reactor 多线程模型,并在此基础上做了一些优化。

点击查看大图

上面图中四个图形可以大致说明 NettyRemotingServer 的 Reactor 多线程模型,在 RocketMQ 中的存在形式。

  • M:1 个 Reactor 主线程:eventLoopGroupBoss,它的职能是负责监听 TCP 网络连接请求,有连接请求过来时候,创建 SocketChannel,并注册到 selector 上。

  • S:RocketMQ 的源码中会选择 NIO 或 Epoll,来监听网络数据,当监听到网络数据过来时,读取数据并丢给 Worker 线程池:eventLoopGroupSelector,Rocket 源码中默认设置线程数为 3。

  • M1:执行业务之前的各种杂事(SSL 认证、空闲检查、网络连接检查、编解码、序列化反序列化 等等),交付给 这些工作交给 defaultEventExecutorGroup 去处理,RocketMQ 源码中默认线程数设置为 8。

  • M2:剩下处理业务的操作,就直接放在业务线程池中执行了。按照之前说的,依据 RequestCode 去 processorTable 本地缓存中找到对应的 processor,并封装成 task 任务,在丢给对应的业务 processor 线程池来处理。

完整的可以参照官网的这张图:

点击查看大图

总结

上面介绍了 RocketMQ 消息通信的主要内容,我们用几句话总结下:

  • 整个 RocketMQ 队列系统中,rocketmq-remoting Module 是专门用来负责网络通信职能的。

  • 网络通信模块基于 Netty 进行扩展的,所以自定义了 RocketMQ 的消息协议,在传输过程的数据进行结构制定、封装、编解码的过程。

  • 理解 NettyRemotingServer/NettyRemotingClient 的初始化过程,以及调用 NettyServerHandler/NettyClienthandler 进行处理的执行流程。

  • 同步异步:同步和异步消核心区别是 同步消息通过 Netty 发送请求后会执行 ResponseFuture.waitResponse() 阻塞等待,异步的请求则 SendCallback 相应的方法进行回调处理。

  • 多线程模式下会通过 1 个 Reactor 主线程(监听连接),以及 Reactor 线程池(监听数据)、Worker 线程池(处理前置工作)、Processor 线程池(处理业务逻辑) 来处理通信过程。

需要这份资料的朋友可以+文末wx名片免费获取

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/16714.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络(下)

运输层 运输层概述 概念 进程之间的通信 从通信和信息处理的角度看,运输层向它上面的应用层提供通信服务,它属于面向通信部分的最高层,同时也是用户功能中的最低层。当网络的边缘部分中的两个主机使用网络的核心部分的功能进行端到端的通…

【深度学习】U-net网络结构搭建 | pytorch

文章目录前言一、U-net网络结构复现(上采样部分采用转置卷积nn.ConvTranspose2d)1.1、整体结构介绍1.2、encoder部分实现(左边网络部分)1.3、decoder部分实现(右边网络部分)1.4、整个网络搭建二、U-net网络…

React源码分析5-commit

前两章讲到了,react 在 render 阶段的 completeUnitWork 执行完毕后,就执行 commitRoot 进入到了 commit 阶段,本章将讲解 commit 阶段执行过程源码。 总览 commit 阶段相比于 render 阶段要简单很多,因为大部分更新的前期操作都…

Dubbo框架基本使用

一:软件架构的演练过程【了解】 单体应用架构--->垂直应用架构--->分布式架构(SOA架构/微服务架构) 1.单体应用架构 单体应用架构,就是将一个系统的多个模块做成一个项目,然后部署到tomcat服务器上 优点: 项目架…

第01章+Java概述

课程链接:韩顺平Java_程序举例_哔哩哔哩_bilibili 什么叫程序 程序:计算机执行某些操作或解决某个问题而编写的一系列有序指令的集合。 Java版本迭代 官网介绍: Oracle Java SE Support Roadmap LTS为长期支持版本:推荐使用…

IQM的Unimon:一种新的量子比特,可促进量子计算机的实用化

​ 量子处理器中unimon 量子比特的艺术效果图。(图片来源:网络) 来自芬兰IQM量子计算机公司、阿尔托大学和芬兰VTT技术研究中心的一组科学家发现了一种新的超导量子比特——unimon,可提高量子计算的准确性。该团队已经实现了第一…

解读阿里Q2财报:阿里云的跨周期引擎

昨天,阿里巴巴公布今年6月到9月财务业绩,显示云业务总收入为267.6亿元,在除去阿里内部使用的额度后,抵销跨分部交易后营收为207.57亿元,比上一个季度增长超17%。 具体看,值得关注的有三点: 1、…

Python爬取公交线路信息及站点shp数据 文末附数据下载地址

本篇主要记录爬取公交网整个过程,由于这次所用方法虽比较常规,但由于该网站页面内容转码原因以及遍历链接较多,所以小坑还是比较多的,特在此进行记录。 以前爬过百度地图,当时用的是API平台,加上网站比较规范,所以标签节点什么的都比较清晰,但这次由于特殊原因所选择的…

对JavaScript中的Math.random随机函数破解

什么是随机 在通常的说法中,随机性是指事件中明显实际缺乏可预测性,事件、符号或步骤的随机序列通常没有顺序 举个例子,比如我们在抛硬币,硬币的结果取决于很多因素,比如说我们施加的力,空气阻力&#xff…

Linus shell 在一个脚本中调用另外一个脚本变量

1.新建public.sh文件,并添加以下内容: 2.新建ceshi.sh文件,并添加以下内容: 3.在终端赋予ceshi.sh文件执行权限,并运行该文件。

角度回归(复数与欧拉公式,L1,L2)

文章目录1 BEV下,Eula 损失函数2 BEV下,PointPillars使用sin联合SmoothL13 透视图下, MultiBin 全局方向损失4 L1/L2-norm 的周期损失函数1 BEV下,Eula 损失函数 Yolo-complex的论文中,对于BEV视角下,目标…

SDN和NFV的区别?

前言 网络功能虚拟化(Network Functions Virtualization,NFV)是一种关于网络架构的概念。我们平时使用的x86服务器由硬件厂商生产,在安装了不同的操作系统以及软件后实现了各种各样的功能。而传统的网络设备并没有采用这种模式&am…

2000-2019年各省产业结构合理化指数(干春晖泰尔指数)

2000-2019年省级产业结构合理化指数(干春晖泰尔指数) 1、来源:统计NJ及各省统计NJ 2、时间2000-2019年 3、数据说明:含原始数据和计算过程 4、范围包括全国31省 5、指标包括:各省总产值、第一产业增加值、第二产业…

C++基础知识要点--表达式 (Primer C++ 第五版 · 阅读笔记)

目录表达式基础算术运算符逻辑和关系运算符赋值运算符递增和递减运算符成员访问运算符条件运算符位运算符sizeof运算符逗号运算符类运算符运算符优先级表表达式 基础 当一个对象被用作右值的时候,用的是对象的 值(内容);当对象被用作左值的时候&#x…

Linux 信号

概念:信号不是信号量,信号量是进程间的一种通信方式,信号是系统中的软件中断,指一种事件通知机制,通知进程发生了某个事件,打断当前的操作,去处理这个事件。 种类:一共有62种信号&a…

Linux之用户管理、权限管理、程序安装卸载

一. 用户管理 1. 查看账户 (1). 查看当前账号:whoami ​​(2). 查看系统当前登录的账号:who ​​补充常用选项: ​​(3). 查看系统所有的账号: cat /etc/passwd ​​2. exit:退出登录账户 如果是图形界面&#xff0c…

curl命令的常用操作

curl是非常实用的命令行工具,用来与服务器之间传输数据。它的命令行参数多达几十种。 在Linux环境中使用curl命令可以进行接口测试。利用curl对http协议发送Get/Post/Delete/Put请求,同时还可以携带header来满足接口的特定需求。 curl命令的语法 curl[options] [U…

Linux03-网络设置

一、说明 在上一节,咱使用VMware安装了虚拟机,网络设置选择了 “桥接模式” ,本节咱们来具体讨论一下网络连接方式和网络设置。 实验环境:CentOS7 VMware 二、桥接模式 当我们设置桥接模式时,虚拟机是直接使用物理…

eNSP出现错误,错误代码40暴力解决方案

如果你和我一样,在eNSP中启动一个设备时发生了错误,错误代码为40,那么这篇文件可能会帮助你。 首先你可以仔细地按照这篇说明中的做法进行操作,如果你电脑也是win10,并且之前没有安装过wireshark,virtualb…

后端总说他啥也没动,我从线上调了一下测试接口,你再说一句动没动

◇ 不知道广大前端同学有没有过这样的经历,在做新需求联调的时候,原本上一个版本已经做的好好的功能,前后端已经联调好的。这次做需求的时候,测试发现好多地方都不对了。 ◇ 开发人员经常说的一句话就是:我啥也没动啊…