文章目录
- 1. 前言
- 2. sendHeartbeatToAllBrokerWithLock 上报心跳信息
- 3. prepareHeartbeatData 准备心跳数据
- 4. sendHearbeat 发送心跳上报请求
- 5. broker 处理心跳请求
- 5.1 heartBeat 处理心跳包
- 5.2 createTopicInSendMessageBackMethod 创建重传 topic
- 5.3 registerConsumer 注册消费者
- 5.3.1 updateChannel 更新消费者连接通道
- 5.3.2 updateSubscription 更新订阅消息
- 5.3.3 通知消费者重平衡
- 5.3.4 处理消费者注册事件
- 5.3.4.1 register 注册消费者过滤信息
- 5.3.4.2 register 创建 FilterDataMapByTopic 对象
- 5.3.4.3 register 注册过滤信息
- 6. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
- 【RocketMQ 生产者和消费者】- 生产者启动源码-启动流程(1)
- 【RocketMQ 生产者和消费者】- 生产者启动源码-创建 MQClientInstance(2)
上两篇文章中我们探讨了生产者的启动流程,这篇文章就来看下生产者启动之后如何发送心跳信息到 broker。
2. sendHeartbeatToAllBrokerWithLock 上报心跳信息
这个方法就是用于上报生产者的心跳信息到所有的 broker,不过可以注意到这个方法是在 MQClientInstance 里面的,上一篇文章我们就说了 MQClientInstance 是生产者和消费者的公共类,所以这个方法上报心跳的时候会把消费者和生产者的信息都一起上报到 broker 中,当然这里的生产者和消费者是指同一个进程的。
/**
* 发送心跳信息到所有 broker
*/
private void sendHeartbeatToAllBroker() {
// 心跳数据,这里是先准备一个心跳包
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
// 如果没有任何生产者和消费者的数据,就不需要发送心跳包
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
return;
}
// brokerAddrTable 不为空
if (!this.brokerAddrTable.isEmpty()) {
// 发送心跳的次数 + 1
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
// 遍历所有 broker
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
// 获取 broker 名字
String brokerName = entry.getKey();
// 这里的 oneTable 是指 id -> address, 每一个 broker 都需要记录下来这个 broker 所在的集群的其他 broker 地址
// oneTable 的 key 就是 id,0 表示主节点,其他数字表示从节点,value 就是节点的地址
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
// 这里就是如果消费者配置为空,并且当前的 broker 节点不是 Master 节点,就没必要注册到从节点上面了
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
// producer 生产者只需要和从节点发送心跳即可, 但是如果也有消费者,那么也可以往从节点
continue;
}
try {
// 这里就是把心跳包发送给 broker
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
// 将版本设置到 brokerVersionTable 中
// brokerName -> (address, version) 的映射关系
this.brokerVersionTable.get(brokerName).put(addr, version);
// 每发送 20 次心跳就打印一次日志
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
}
}
}
}
}
}
上面就是这个方法的源码,首先就通过 prepareHeartbeatData
准备一个心跳包,如果说心跳包里面的生产者和消费者都为空,就说明不需要发送心跳信息,直接返回。
如果不为空,需要发送心跳信息,需要遍历所有 broker,由于 broker 集群是以 brokerName 为标记,所以会遍历 brokerAddrTable
集合中的所有 key(brokerName),然后处理 value,value 是 HashMap<Long, String> 类型,Long 是 brokerId,String 是这个 broker 的地址,意思就是 brokerName 集群下面的主从节点。
遍历 broker 时会判断如果心跳包里面的消费者配置为空,并且当前的 broker 节点不是 Master 节点,就没必要注册到从节点上面了,生产者只需要负责和主节点建立心跳,因为生产者生产消息都是直接存储到主节点的,从节点负责同步。
接下来调用 sendHearbeat
把心跳包发送给 broker,返回 broker 记录的心跳信息的版本,然后存储到本地缓存 brokerVersionTable
中。
/**
* broker 信息版本, brokerName 集群 -> (address -> 版本)
*/
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
new ConcurrentHashMap<String, HashMap<String, Integer>>();
3. prepareHeartbeatData 准备心跳数据
/**
* 准备心跳数据包
* @return
*/
private HeartbeatData prepareHeartbeatData() {
// 心跳包
HeartbeatData heartbeatData = new HeartbeatData();
// 客户端 ID
heartbeatData.setClientID(this.clientId);
// 遍历所有消费者
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
// 构建 ConsumerData 数据
ConsumerData consumerData = new ConsumerData();
// 消费者组
consumerData.setGroupName(impl.groupName());
// 消费类型: PULL 和 PUSH
consumerData.setConsumeType(impl.consumeType());
// 消费模式: CLUSTERING 和 BROADCASTING, 也就是集群和广播
consumerData.setMessageModel(impl.messageModel());
// 消费者启动的时候从哪里开始消费, 在 ConsumeFromWhere 这个类中
consumerData.setConsumeFromWhere(impl.consumeFromWhere());
// 订阅信息,包括过滤消息相关标签、SQL规则
consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
consumerData.setUnitMode(impl.isUnitMode());
// 加入消费者的心跳数据集合中
heartbeatData.getConsumerDataSet().add(consumerData);
}
}
// 生产者的心跳
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
// 生产者组名
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
// 添加到生产者集合里面
heartbeatData.getProducerDataSet().add(producerData);
}
}
return heartbeatData;
}
对于消费者要设置的心跳信息为:
- groupName:消费者组
- consumeType:消费类型,PULL 和 PUSH
- messageModel:消费模式,CLUSTERING 和 BROADCASTING, 也就是集群和广播
- consumeFromWhere:消费者启动的时候从哪里开始消费, 在 ConsumeFromWhere 这个类中
- subscriptionDataSet:消费者组订阅信息,包括过滤消息相关标签、SQL规则
- unitMode
对于生产者要设置的心跳信息为:
- groupName:生产者组
4. sendHearbeat 发送心跳上报请求
/**
* 发送心跳包给 broker 节点
* @param addr
* @param heartbeatData
* @param timeoutMillis
* @return
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public int sendHearbeat(
final String addr,
final HeartbeatData heartbeatData,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// 构建心跳请求,请求编码是 HEART_BEAT
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
// 请求语言,默认是 JAVA
request.setLanguage(clientConfig.getLanguage());
// 请求体,就是心跳包
request.setBody(heartbeatData.encode());
// 这里就是通过 Netty 发送心跳包了
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
// 返回结果,返回版本号
return response.getVersion();
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
这个方法就是向 broker 发送心跳信息,可以看到发送之前在请求里面设置了一些属性:
- 请求语言,默认是 JAVA
- 心跳包,也就是上面的生产者和消费者
- 请求 Code 是 HEART_BEAT
最后请求是使用同步发送的,如果发送成功就返回 broker 返回的版本。
5. broker 处理心跳请求
broker 通过 ClientManageProcessor 处理器来处理心跳请求,处理的方法就是 heartBeat
。
5.1 heartBeat 处理心跳包
/**
* 处理客户端心跳请求
* @param ctx
* @param request
* @return
*/
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
// 构建响应命令对象
RemotingCommand response = RemotingCommand.createResponseCommand(null);
// 解码,获取心跳包数据
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
// 构建客户端连接信息对象
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
);
// 处理心跳包中的消费者信息
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
// 从 broker 的缓存中找出当前消费者组的订阅组配置
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
data.getGroupName());
// 当 Consumer 发生变化的时候是否需要通知组内其他的 Consumer
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
// 从配置中获取 isNotifyConsumerIdsChangedEnable,默认就是 true
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
int topicSysFlag = 0;
if (data.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
// 消息消费的重试队列,%RETRY%groupName
String newTopic = MixAll.getRetryTopic(data.getGroupName());
// 创建消息消费重试队列
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}
// 注册消费者
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
if (changed) {
log.info("registerConsumer info changed {} {}",
data.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel())
);
}
}
// 注册生产者
for (ProducerData data : heartbeatData.getProducerDataSet()) {
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
clientChannelInfo);
}
// 设置返回结果
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
这个方法会分别处理消费者和生产者的心跳信息,对于生产者比较简单,就是调用 registerProducer
注册生产者即可,但是对于消费者就需要给消费者组建立一个重传队列,然后再注册消费者。
重试队列就是消费者消费消息失败的时候会把消息发送到这个队列进行重试,重试队列是以消费者组为维度的,也就是说消费者重传是以消费者组为维度的,一个消费者组里面的消费者共享一个重传队列。
5.2 createTopicInSendMessageBackMethod 创建重传 topic
/**
* 创建重传 topic,持久化到配置文件中,文件地址 ${user.home}/store/config/topics.json
* @param topic // 待创建的 topic
* @param clientDefaultTopicQueueNums // topic 下面的默认队列数, 默认 1
* @param perm // 队列权限, 默认读写都有
* @param topicSysFlag // topic 标识
* @return
*/
public TopicConfig createTopicInSendMessageBackMethod(
final String topic,
final int clientDefaultTopicQueueNums,
final int perm,
final int topicSysFlag) {
// 从 topic 配置中获取下这个 topic
TopicConfig topicConfig = this.topicConfigTable.get(topic);
// 如果已经存在配置了,那么直接返回
if (topicConfig != null)
return topicConfig;
boolean createNew = false;
try {
// 新建的时候需要加锁,防止创建相同的 topic 互相覆盖
if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
// 再次获取 topic 信息,双重检查锁
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
// 这里就是新建配置
topicConfig = new TopicConfig(topic);
// 重传队列的读写队列数都是 1
topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
// 权限默认是读写
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
log.info("create new topic {}", topicConfig);
// 添加到 topicConfigTable 集合里面
this.topicConfigTable.put(topic, topicConfig);
createNew = true;
// 获取下一个版本,这个版本用于标识当前 topicConfigTable 有没有发生变化,比如在从节点同步 topicConfigTable 的
// 时候就可以使用版本和本地存储的版本进行队列,如果发生了变化再重新写入文件中
this.dataVersion.nextVersion();
// 持久化到文件中,文件地址: ${user.home}/store/config/topics.json
this.persist();
} finally {
// 解锁
this.topicConfigTableLock.unlock();
}
}
} catch (InterruptedException e) {
log.error("createTopicInSendMessageBackMethod exception", e);
}
// 如果创建了新的 topic
if (createNew) {
// 注册 broker 信息
this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
}
创建重传 topic 时首先从 broker 的本地缓存 topicConfigTable 中获取 topic 的配置,然后如果已经存在 topic 配置了,就直接返回,否则才去创建。
在新建 topic 的时候需要加锁,防止并发创建 topic,可以看到重传 topic 设置的属性如下:
- 读写队列数都是 1
- 权限默认是可读写
- topic 系统标识
创建出来之后设置到本地缓存 topicConfigTable
中,就代表创建成功了,然后更新版本 dataVersion
,接着持久化到文件中,文件地址: ${user.home}/store/config/topics.json,更新版本就是因为 broker 需要向 NameServer 注册 topic 配置信息,把版本也传过去标识 broker 心跳版本,可以说 broker 的心跳就是 topic 配置信息。
最后如果创建了新的 topic,就会像 NameServer 注册 topic 信息,registerBrokerAll 的逻辑可以看这篇文章:【RocketMQ Broker 相关源码】-注册 broker 信息到所有的 NameServer。
5.3 registerConsumer 注册消费者
/**
* 注册消费者
* @param group 消费者组
* @param clientChannelInfo 客户端连接通道
* @param consumeType 消费类型(PULL 或者 PUSH)
* @param messageModel 消费模式(集群或者广播)
* @param consumeFromWhere 消费点位
* @param subList 消费者组订阅信息, 一个消费者组里面的消费者可以订阅多个 topic 下面的消息
* @param isNotifyConsumerIdsChangedEnable 消费者组里面的消费者发生变化时是否需要通知其他消费者重平衡
* @return
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 获取消费者组信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
// 创建一个新的 ConsumerGroupInfo
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
// 如果不存在才新增
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 是否在这个消费者组里面新增了消费者连接
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 是否更新了消费者组订阅信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 如果发生了变更
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
// 通知消费者重平衡
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 处理消费者注册事件
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
// 是否发生了变更
return r1 || r2;
}
注册消费者需要处理两件事,首先就是注册消费者连接到 channelInfoTable
集合中,channelInfoTable
集合用于管理消费者组下面的连接信息,key 是连接,value 是 ClientChannelInfo,是连接信息。
// 消费者组下面的连接信息
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
public class ClientChannelInfo {
// 连接通道
private final Channel channel;
// 客户端 ID
private final String clientId;
// 一般是 JAVA
private final LanguageCode language;
// 版本
private final int version;
// 上一次上报心跳的事件
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
...
}
然后就是更新消费者组的订阅信息,就是本地 subscriptionTable
集合。
5.3.1 updateChannel 更新消费者连接通道
/**
* 更新消费者连接
* @param infoNew 客户端连接通道
* @param consumeType 消费类型(PULL 或者 PUSH)
* @param messageModel 消费模式(集群或者广播)
* @param consumeFromWhere 消费点位
* @return
*/
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
boolean updated = false;
this.consumeType = consumeType;
this.messageModel = messageModel;
this.consumeFromWhere = consumeFromWhere;
// 获取原来的连接
ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
if (null == infoOld) {
// 如果不存在就新增
ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
if (null == prev) {
log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
messageModel, infoNew.toString());
// 标记设置为 true
updated = true;
}
infoOld = infoNew;
} else {
// 如果已存在就判断需不需要更新
if (!infoOld.getClientId().equals(infoNew.getClientId())) {
// 这里就是出现 BUG 了, 因为正常来说一个连接通道和一个 clientId 对应, 记录下日志
log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
this.groupName,
infoOld.toString(),
infoNew.toString());
// 重新修正 channelInfoTable 里面的映射关系
this.channelInfoTable.put(infoNew.getChannel(), infoNew);
}
}
// 更新 lastUpdateTimestamp 属性, 表示当前消费者组上一次上报心跳的时间
this.lastUpdateTimestamp = System.currentTimeMillis();
infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
return updated;
}
更新连接通道的逻辑就是更新 channelInfoTable 集合,然后更新下 lastUpdateTimestamp,因为这个方法是在 ConsumerGroupInfo 中的,所以更新的 lastUpdateTimestamp 就代表这个消费者组上一次上报心跳时间。更新这个属性是因为 broker 有一个定时任务
5.3.2 updateSubscription 更新订阅消息
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
// 1. 遍历所有订阅信息, 将 subList 设置到 subscriptionTable 中
for (SubscriptionData sub : subList) {
// 获取原来的订阅信息
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
// 如果获取不到就新建
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true;
log.info("subscription changed, add new topic, group: {} {}",
this.groupName,
sub.toString());
}
// 如果当前新增的订阅信息版本比原来的要大且消费者类型是 PUSH
} else if (sub.getSubVersion() > old.getSubVersion()) {
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}",
this.groupName,
old.toString(),
sub.toString()
);
}
// 更新订阅信息集合, 这种情况下关系到消费者拉取消息就要更新, 如果是 PULL 类型由于是用户控制就不需要更新
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
// 2. 删除不存在的订阅信息
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
while (it.hasNext()) {
// 遍历所有订阅信息
Entry<String, SubscriptionData> next = it.next();
String oldTopic = next.getKey();
boolean exist = false;
for (SubscriptionData sub : subList) {
if (sub.getTopic().equals(oldTopic)) {
// 如果 subscriptionTable 里面存储的 topic 订阅信息不在 subList 集合中, 说明消费者没有上报过来
exist = true;
break;
}
}
if (!exist) {
// 如果不存在, 说明这个订阅信息被修改了
log.warn("subscription changed, group: {} remove topic {} {}",
this.groupName,
oldTopic,
next.getValue().toString()
);
// 删掉这个订阅信息
it.remove();
updated = true;
}
}
// 设置上报心跳的时间
this.lastUpdateTimestamp = System.currentTimeMillis();
return updated;
}
更新订阅消息流程分为两大步,首先是遍历所有订阅信息, 将消费者上报的 subList 设置到 subscriptionTable 中,如果 broker 中没有存储这个订阅消息,又或者存储的这个订阅信息版本过期了,就会更新到 subscriptionTable 中。
然后就是删除不存在的订阅信息,比如原来 subscriptionTable 存在 topicA -> subA
和 topicB -> subB
订阅信息,而消费者上报过来的消费者组订阅信息是 topicA -> subA
,那么 topicB -> subB
就会被删掉,这也透露一件事,就是消费者组里面的消费者订阅关系需要一直,具体可以看官方的这篇文章:RocketMQ 订阅关系。
5.3.3 通知消费者重平衡
当消费者组里面新增了消费者又或者消费者的订阅关系发生了变化,比如又订阅了多一个 topic,这种情况下会通过 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel())
通知消费者进行重平衡。当然这个方法在 【RocketMQ Broker 相关源码】- NettyRemotingClient 和 NettyRemotingServer 这篇文章的 3.3.1 有讲解,所以这里不再多说。
5.3.4 处理消费者注册事件
同样的注册事件也是通过 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel())
来处理,而这里的注册主要指的是消费者过滤信息的注册,就是 SQL92 过滤信息的注册。
可以看到就是通过 register 方法去注册过滤信息,下面就从这个方法入手。
5.3.4.1 register 注册消费者过滤信息
/**
* 注册消费者组过滤信息
* @param consumerGroup
* @param subList
*/
public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {
for (SubscriptionData subscriptionData : subList) {
// 一个一个注册
register(
subscriptionData.getTopic(),
consumerGroup,
subscriptionData.getSubString(),
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion()
);
}
// 获取消费者组过滤信息
Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup);
Iterator<ConsumerFilterData> iterator = groupFilterData.iterator();
// 遍历旧的过滤信息
while (iterator.hasNext()) {
ConsumerFilterData filterData = iterator.next();
boolean exist = false;
for (SubscriptionData subscriptionData : subList) {
// 现在新上报的订阅信息是已经存在的
if (subscriptionData.getTopic().equals(filterData.getTopic())) {
// 信息存在, 不需要删除
exist = true;
break;
}
}
// 如果这个过滤信息已经不再上报了, 将原来的过滤信息过期事件设置为当前时间, 相当于懒删除了
if (!exist && !filterData.isDead()) {
filterData.setDeadTime(System.currentTimeMillis());
// 日志输出
log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);
}
}
}
/**
* 获取消费者组的过滤信息
* @param consumerGroup
* @return
*/
public Collection<ConsumerFilterData> getByGroup(final String consumerGroup) {
Collection<ConsumerFilterData> ret = new HashSet<ConsumerFilterData>();
// 遍历 filterDataByTopic
Iterator<FilterDataMapByTopic> topicIterator = this.filterDataByTopic.values().iterator();
while (topicIterator.hasNext()) {
FilterDataMapByTopic filterDataMapByTopic = topicIterator.next();
// 获取 topic 下面的消费者过滤信息
Iterator<ConsumerFilterData> filterDataIterator = filterDataMapByTopic.getGroupFilterData().values().iterator();
// 遍历这些过滤信息
while (filterDataIterator.hasNext()) {
ConsumerFilterData filterData = filterDataIterator.next();
// 如果跟参数消费者组一样, 说明这个 topic 是这个消费者组需要消费的
if (filterData.getConsumerGroup().equals(consumerGroup)) {
// 添加到返回集合中
ret.add(filterData);
}
}
}
return ret;
}
客户端上报过来的心跳信息里面的消费者订阅信息包括了消费者的一些过滤信息,之前也说过了消费者组里面的消费者订阅关系需要保持一致,所以消费者上报过来的订阅信息就可以认为是这个消费者组的订阅信息,因此可以看到遍历 subList
一个一个注册之后,需要将旧的过滤信息里面没有上报过来的删掉,不过这里的删掉是懒删除,只是设置下 ConsumerFilterData#deadTime
为当前时间,就表示这个过滤信息已经过期了。
而 getByGroup
就是获取这个消费者组的消费者过滤信息,获取的逻辑是遍历 filterDataByTopic
集合来获取。filterDataByTopic 以 topic 为 key,因为一个 topic 可以被多个消费者组下面的消费者去消费,所以 value 是 FilterDataMapByTopic
对象,这个对象里面的属性是一个 ConcurrentMap 集合和 topic,集合的 key 是消费者组,value 是消费者组的过滤信息。
/**
* 消费者过滤信息, 一个 topic 可以被多个消费者组下面的消费者去消费, 所以这里是以 topic 为 key
*/
private ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>
filterDataByTopic = new ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>(256);
public static class FilterDataMapByTopic {
/**
* 消费者组的过滤信息
*/
private ConcurrentMap<String/*consumer group*/, ConsumerFilterData>
groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();
private String topic;
...
}
getByGroup 的遍历逻辑就是遍历所有 topic 下面的 FilterDataMapByTopic,然后继续遍历 FilterDataMapByTopic 里面的 groupFilterData 属性,判断这个 topic 是否被这个消费者组消费,逻辑并不复杂,就是里面的集合有点绕。
5.3.4.2 register 创建 FilterDataMapByTopic 对象
继续回到 register 方法,这个方法就是负责创建出 FilterDataMapByTopic,上面我们也说了 filterDataByTopic 以 FilterDataMapByTopic 为 value,所以这个方法主要还是创建出 FilterDataMapByTopic 对象。
/**
* 注册 SQL92 信息到消费者组集合 filterDataByTopic 中
* @param topic
* @param consumerGroup
* @param expression
* @param type
* @param clientVersion
* @return
*/
public boolean register(final String topic, final String consumerGroup, final String expression,
final String type, final long clientVersion) {
// 如果是 TAG 类型的过滤信息, 直接返回
if (ExpressionType.isTagType(type)) {
return false;
}
// 如果 SQL92 过滤表达式为空, 直接返回
if (expression == null || expression.length() == 0) {
return false;
}
// 获取 topic 下面的过滤信息集合, 注意这里就是消费者组下面一个 topic 一个过滤信息集合
FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);
if (filterDataMapByTopic == null) {
// 不存在就新建一个
FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
filterDataMapByTopic = prev != null ? prev : temp;
}
// 计算出 consumerGroup#topic 的布隆过滤器信息
// 1.经过 k 次 hash 求出来的 k 位
// 2.布隆过滤器总共多少位
BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);
// 注册过滤信息到 filterDataMapByTopic 中
return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}
register 方法会在一开始就判断如果过滤类型是 TAG 就不需要注册,那就是说明 filterDataByTopic
这个集合 只存储 SQL92 过滤类型的过滤数据。
这个方法逻辑不多,值得关注的是 bloomFilter.generate
,可以看到对传入的 consumerGroup#topic
进行 hash 之后生成布隆过滤器信息,下面就看下这个布隆过滤器信息包括什么。
在讲解 generate 方法前,我们来想一下布隆过滤器有什么重要信息:
- hash 函数个数 k
- bit 数组长度 m
- 误差率 fpp
对于布隆过滤器,如果要往里面设置一个字符串,就需要先通过 k 个哈希函数求出 k 个位,然后将布隆过滤器的这 k 个位设置为1,如果感兴趣可以看这篇文章:详细说说布隆过滤器 BloomFilter。
对于消费者过滤信息,创建的 BloomFilterData 包括两个重要信息:这 k 个 hash 函数求出来的 bit 数组(长度为 k)
和 bit 数组长度 m
。
5.3.4.3 register 注册过滤信息
上面创建好布隆过滤器信息之后,最终调用 register 注册过滤信息,而注册过滤信息主要就是往 groupFilterData 集合设置,上面也说了,groupFilterData 集合存储的是 consumerGroup -> ConsumerFilterData
的映射关系。
/**
* 注册过滤信息
* @param consumerGroup 消费者组
* @param expression SQL92 过滤表达式
* @param type 过滤类型, SQL92
* @param bloomFilterData 布隆过滤器数据
* @param clientVersion 订阅信息版本
* @return
*/
public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData,
long clientVersion) {
// 获取旧的过滤信息
ConsumerFilterData old = this.groupFilterData.get(consumerGroup);
if (old == null) {
// 创建一个新的 ConsumerFilterData
ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
if (consumerFilterData == null) {
return false;
}
// 设置布隆过滤器信息
consumerFilterData.setBloomFilterData(bloomFilterData);
// 添加到 groupFilterData 中
old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);
if (old == null) {
// 这里就是新增成功, 直接返回
log.info("New consumer filter registered: {}", consumerFilterData);
return true;
} else {
// 并发添加了, 注意 FilterDataMapByTopic 是以 topic 为维度的, 一个 topic 可以被多个消费者组消费
if (clientVersion <= old.getClientVersion()) {
if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
// 打印下日志, 意思就是并发添加的这两过滤信息还不一样
log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
consumerGroup, topic,
clientVersion, old.getClientVersion(),
old.getExpressionType(), old.getExpression(),
type, expression);
}
// 版本一样, 另一个线程添加的过滤信息过期了, 就直接设置下过期时间为 0, 重新启用
if (clientVersion == old.getClientVersion() && old.isDead()) {
reAlive(old);
return true;
}
// 版本一样同时另一个线程添加的没有过期或者当前线程版本比较低, 当前线程就注册失败了
return false;
} else {
// 这里就是新增的版本比原来的要高, 直接覆盖原来的
this.groupFilterData.put(consumerGroup, consumerFilterData);
log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);
return true;
}
}
} else {
// 原来过滤信息已经存在了且当前添加的版本 <= 原来的
if (clientVersion <= old.getClientVersion()) {
if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
// 打印下日志, 意思就是过滤表达式不一样
log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
consumerGroup, topic,
clientVersion, old.getClientVersion(),
old.getExpressionType(), old.getExpression(),
type, expression);
}
// 如果跟原来的版本一样就直接重新启用, 逻辑和上面的一样
if (clientVersion == old.getClientVersion() && old.isDead()) {
reAlive(old);
return true;
}
// 添加的版本比原来的版本还要低或者相同但是原来的没有过期
return false;
}
// 这里就是添加的版本比原来的高, 但是还是得看下过滤信息是否有变化, 首先是过滤表达式和类型
boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);
// 然后是布隆过滤器信息是否发生了变化
if (old.getBloomFilterData() == null && bloomFilterData != null) {
change = true;
}
if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {
change = true;
}
// 过滤表达式发生了变化
if (change) {
// 创建新的过滤表达式
ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
if (consumerFilterData == null) {
// new expression compile error, remove old, let client report error.
// 这里就是过滤表达式有问题, 将旧的删掉, 然后直接返回 false 表示注册失败, 所以说如果表达式有问题也会把旧的删掉
this.groupFilterData.remove(consumerGroup);
return false;
}
// 设置布隆过滤器数据
consumerFilterData.setBloomFilterData(bloomFilterData);
// 创建成功, 添加到 groupFilterData 中
this.groupFilterData.put(consumerGroup, consumerFilterData);
log.info("Consumer filter info change, old: {}, new: {}, change: {}",
old, consumerFilterData, change);
return true;
} else {
// 过滤表达式没有发生变化, 直接设置版本号
old.setClientVersion(clientVersion);
// 如果旧的已经过期了, 重新启用
if (old.isDead()) {
reAlive(old);
}
return true;
}
}
}
protected void reAlive(ConsumerFilterData filterData) {
// 重新设置过期时间为 0
long oldDeadTime = filterData.getDeadTime();
filterData.setDeadTime(0);
log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime);
}
里面的注释都写得比较清楚,也可以看到这里面维护 groupFilterData 集合是根据两种情况来维护,一种是原来没有,一种是原来有的,如果是原来集合里面就有 group -> ConsumerFilterData
的映射,就需要判断当前上报的过滤信息版本和原来的对比,如果版本不是最新的就会被覆盖,一般来说如果过滤信息不变版本都不会变。
然后在这个方法中也可以看到当 old == null
的时候应该是为了防止并发,使用 putIfAbsent
添加成功之后返回 old,还要判断这个 old 是不是已经存在了,如果存在还需要对比添加的版本来决定留下哪个。
最后注意下里面的 reAlive 方法,这个方法就是将一个过期的过滤信息变可用,过滤信息 ConsumerFilterData
里面有一个属性 deadTime
表示过期时间,如果设置成不为 0 就表示过期了,所以这个 reAlive 方法就是重新将这个属性设置为 0,表示复用里面的过滤信息。
6. 小结
好了,这篇文章中我们探讨了上报心跳信息到 broker 的源码,可以看到上传的心跳信息里面不单单包括生产者,也包括消费者的,所以这个方法是生产者和消费者的共用方法。
而 broker 处理心跳的时候对于消费者不单单要处理消费者连接信息,同时也要处理消费者组订阅消息以及 SQL92 过滤信息,也就是上面的 5.3 小节。
如有错误,欢迎指出!!!!