这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
RocketMQ版本
- version: 5.1.0
RocketMQ中consumer消费模型
在了解RocketMQ的Rebalance机制之前,我们必须先简单了解下rocketmq的消费模型
我们知道在我们创建topic的时候需要指定一个参数就是读队列数

这里假设我们的topic是xiaozoujishu-topic,我们的读队列数
是4个,我们同一gid下的集群消费模式的消费者有两个,那么我们消费者是如何消费消息的呢
首先需要明确的是:
- 这里我们的消费模式是集群消费
- queue的负载均衡算法是使用默认的
AllocateMessageQueueAveragely(平均分配)
假设我们项目刚开始只有一个消费者,那么我们的消费队列分配就如下:

四个队列分配给一个消费者
此时如果我们再启动一个消费者,那么这时候就会进行Rebalance,然后此时我们的队列分配就变成如下:

所以通过上面的队列分配我就知道Rebalance是个啥了,我们下面对Rebalance进行一些定义
RocketMQ的Rebalance是什么
Rebalance(重新平衡)机制指的是:将一个Topic下的多个队列(queue),在同一个消费者组(consumer group)(gid)下的多个消费者实例(consumer instance)之间进行重新分配
Rebalance的目的
从上面可以看出Rebalance的本意是把一个topic的queue分配给合适的consumer,本意其实是为了提升消息的并行处理能力
但是Rebalance也带来了一些危害,后面我们会重点分析下
Rebalance的触发原因
我们这里先说结论
- 订阅Topic的队列数量变化
- 消费者组信息变化
这里是最深层的原因,就是topic的队列数量、消费组信息
实际我们可以将这些归结为Rebalance的元数据,这些元数据的变更,就会引起clinet的Rebalance
注意RocketMQ的
Rebalance是发生在client
这些元数据都在管broker管理
核心就是这三个类
- TopicConfigManager
- SubscriptionGroupManager
- ConsumerManager
只要这个三个类的信息有变化,client就会进行Rebalance。
下面我们可以具体说下什么情况下会让这三个类变化
订阅Topic的队列数量变化
什么情况下订阅Topic的队列数量会变化呢?
- broker扩容
- broker缩容
- broker宕机(本质也是类似缩容)
消费者组信息变化
什么时候消费者组信息会变化呢?
核心就是consumer的上下线,具体细分又可以分为如下原因:
- 服务日常滚动升级
- 服务扩容
- 服务订阅消息发生变化
源码分析
上面大致介绍了Rebalance的触发原因,现在我们结合源码来具体分析下
我们就从consumer的启动开始分析吧
这里我们以最简单的demo为例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(TOPIC, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
这里我们直接注意到
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
这个方法,看名字就知道是client向所有的broker发送心跳

我们进入到sendHeartbeatToAllBrokerWithLock方法看看
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
return;
}
if (this.brokerAddrTable.isEmpty()) {
return;
}
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) {
String brokerName = brokerClusterInfo.getKey();
HashMap<Long, String> oneTable = brokerClusterInfo.getValue();
if (oneTable == null) {
continue;
}
for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) {
Long id = singleBrokerInstance.getKey();
String addr = singleBrokerInstance.getValue();
if (addr == null) {
continue;
}
if (consumerEmpty && MixAll.MASTER_ID != id) {
continue;
}
try {
int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
}
}
}
这段代码主要是通过this.brokerAddrTable.entrySet()获取到所有的master broker地址,然后进行心跳发送
具体的心跳发送代码实际是在下面代码中进行的
int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
我们进入到该方法会发现和我们之前分析的一样,就是发送一个请求到broker,请求码是RequestCode.HEART_BEAT

我们看看RequestCode.HEART_BEAT的调用找到`broker的处理逻辑
很快我们通过方法名就能定位到处理client的请求的方法是ClientManageProcessor类的processRequest

我们具体进去看看这个方法

可以看到具体的逻辑被封装在return this.heartBeat(ctx, request);这个方法中,所以我们需要再进去看看
进去这个方法我们能看到一个比较核心的方法
registerConsumer

很明显这个方法就是注册consumer的方法

这个方法里面和Rebalance相关比较核心的方法就是这三个
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
这里我们可以看看clientChannelInfo里面是个啥玩意

具体深入到updateChannel方法里面就是判断是否为新的client,是就更新channelInfoTable

2.updateSubscription
这个方法就是判断订阅关系是否发生了变化并更新订阅关系

callConsumerIdsChangeListener
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
这个方法就是通知client进行Rebalance,具体的实现是参考了类似事件订阅的方式去实现的,这里是发送了一个CHANGE事件

这里我们可以简单看看事件定义的类型有哪些

我们直接看看具体的事件处理类

可以看到实现类有多个,我们直接看broker模块的DefaultConsumerIdsChangeListener类即可

可以看到这里是给该group所有的client发送Rebalance消息

具体的消息状态码是
RequestCode.NOTIFY_CONSUMER_IDS_CHANGED

client Rebalance
通过上面我们大致找到了整个通信过程,但是实际的Rebalance是发生在client,所以我们还是需要继续回到client的代码
我们通过状态码RequestCode.NOTIFY_CONSUMER_IDS_CHANGED
找到client的处理类ClientRemotingProcessor

实际处理方法就是
this.mqClientFactory.rebalanceImmediately();
我们进入这个方法看看这里最终就是唤醒阻塞的Rebalance线程

所以实际的方法调用还是在RebalanceService的 run方法

最终还是调用的是MQConsumerInner接口中的doRebalance方法
这里有个细节,为什么不是直接调用一个静态方法,要搞这么多花里胡哨的唤醒线程操作?
原来是cleint也会定时去Rebalance
默认是20s一次,可以配置

可以通过参数rocketmq.client.rebalance.waitInterval去配置
那么为什么client还要自己去循环Rebalance
原来这里是防止因为网络等其他原因丢失了broker的请求,后续网络回复了,也能进行进行Rebalance
下面我们继续看看Rebalance的实现细节
这里我们以常用的DefaultMQPushConsumerImpl为例
实际这里最终调用的还是抽象类RebalanceImpl的doRebalance方法

可以看到这里的Rebalance是按照topic的维度
我们先理解订阅单个topic的原理

这里的就是先对topic的queue排序,然后对consumer排序,
然后调用AllocateMessageQueueStrategy的allocate方法
这里我们暂时只分析默认的平均分配算法(AllocateMessageQueueAveragely),也就是我们最先说的分配算法。其他算法可以详细分析
这里的分配方式就是我们前面画图的,比如4个queue,2个consumer,那么就是每个consumer2个queue。
简单举例就是我们的 queue有q1、q2、q3、q4
consumer有 c1、c2
那么就是
c1:q1、q2
c2:q2、q3
需要注意的是如果consumer大于queue数量,多出的consumer就不会被分配到queue
client什么时候触发Rebalance
上面分析了这么多原理,这里我们总结下client什么时候会触发Rebalance
- consumer启动时会向所有master broker发送心跳,然后
broker发送信息通知所有consumer触发Rebalance - 启动完成后consumer会周期的触发
Rebalance,防止因为网络等问题丢失broker的通知而没有Rebalance - 当consumer停止时,也会通过之前分析的事件机制,触发注销
comsuer事件然后通知所有的comsuer触发Rebalance
总结
这里我们详细介绍了client是如何触发Rebalance的,以及触发Rebalance的时机,也介绍了Rebalance的好处。
实际还有很多细节我们限于篇幅暂未分析。
后面我们会继续分析Rebalance的坏处和一些详细的Rebalance算法
参考
- RocketMQ源码
- 博客
















![[日记]LeetCode算法·二十五——二叉树⑤ AVL树(插入+删除)附代码实现](https://img-blog.csdnimg.cn/d666c7b4278b4e5893917172133bd086.jpeg#pic_center)