【RocketMQ 生产者和消费者】- 生产者启动源码-上报生产者和消费者心跳信息到 broker(3)

news2025/5/23 10:34:45

文章目录

  • 1. 前言
  • 2. sendHeartbeatToAllBrokerWithLock 上报心跳信息
  • 3. prepareHeartbeatData 准备心跳数据
  • 4. sendHearbeat 发送心跳上报请求
  • 5. broker 处理心跳请求
    • 5.1 heartBeat 处理心跳包
    • 5.2 createTopicInSendMessageBackMethod 创建重传 topic
    • 5.3 registerConsumer 注册消费者
      • 5.3.1 updateChannel 更新消费者连接通道
      • 5.3.2 updateSubscription 更新订阅消息
      • 5.3.3 通知消费者重平衡
      • 5.3.4 处理消费者注册事件
        • 5.3.4.1 register 注册消费者过滤信息
        • 5.3.4.2 register 创建 FilterDataMapByTopic 对象
        • 5.3.4.3 register 注册过滤信息
  • 6. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息
  • 【RocketMQ 生产者和消费者】- 生产者启动源码-启动流程(1)
  • 【RocketMQ 生产者和消费者】- 生产者启动源码-创建 MQClientInstance(2)

上两篇文章中我们探讨了生产者的启动流程,这篇文章就来看下生产者启动之后如何发送心跳信息到 broker。


2. sendHeartbeatToAllBrokerWithLock 上报心跳信息

这个方法就是用于上报生产者的心跳信息到所有的 broker,不过可以注意到这个方法是在 MQClientInstance 里面的,上一篇文章我们就说了 MQClientInstance 是生产者和消费者的公共类,所以这个方法上报心跳的时候会把消费者和生产者的信息都一起上报到 broker 中,当然这里的生产者和消费者是指同一个进程的。

/**
 * 发送心跳信息到所有 broker
 */
private void sendHeartbeatToAllBroker() {
    // 心跳数据,这里是先准备一个心跳包
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    // 如果没有任何生产者和消费者的数据,就不需要发送心跳包
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
        return;
    }

    // brokerAddrTable 不为空
    if (!this.brokerAddrTable.isEmpty()) {
        // 发送心跳的次数 + 1
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        // 遍历所有 broker
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            // 获取 broker 名字
            String brokerName = entry.getKey();
            // 这里的 oneTable 是指 id -> address, 每一个 broker 都需要记录下来这个 broker 所在的集群的其他 broker 地址
            // oneTable 的 key 就是 id,0 表示主节点,其他数字表示从节点,value 就是节点的地址
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        // 这里就是如果消费者配置为空,并且当前的 broker 节点不是 Master 节点,就没必要注册到从节点上面了
                        if (consumerEmpty) {
                            if (id != MixAll.MASTER_ID)
                                // producer 生产者只需要和从节点发送心跳即可, 但是如果也有消费者,那么也可以往从节点
                                continue;
                        }

                        try {
                            // 这里就是把心跳包发送给 broker
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                            }
                            // 将版本设置到 brokerVersionTable 中
                            // brokerName -> (address, version) 的映射关系
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            // 每发送 20 次心跳就打印一次日志
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            if (this.isBrokerInNameServer(addr)) {
                                log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                            } else {
                                log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                    id, addr, e);
                            }
                        }
                    }
                }
            }
        }
    }
}

上面就是这个方法的源码,首先就通过 prepareHeartbeatData 准备一个心跳包,如果说心跳包里面的生产者和消费者都为空,就说明不需要发送心跳信息,直接返回。

如果不为空,需要发送心跳信息,需要遍历所有 broker,由于 broker 集群是以 brokerName 为标记,所以会遍历 brokerAddrTable 集合中的所有 key(brokerName),然后处理 value,value 是 HashMap<Long, String> 类型,Long 是 brokerId,String 是这个 broker 的地址,意思就是 brokerName 集群下面的主从节点。

遍历 broker 时会判断如果心跳包里面的消费者配置为空,并且当前的 broker 节点不是 Master 节点,就没必要注册到从节点上面了,生产者只需要负责和主节点建立心跳,因为生产者生产消息都是直接存储到主节点的,从节点负责同步。

接下来调用 sendHearbeat 把心跳包发送给 broker,返回 broker 记录的心跳信息的版本,然后存储到本地缓存 brokerVersionTable 中。

/**
 * broker 信息版本, brokerName 集群 -> (address -> 版本)
 */
private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
    new ConcurrentHashMap<String, HashMap<String, Integer>>();

3. prepareHeartbeatData 准备心跳数据

/**
 * 准备心跳数据包
 * @return
 */
private HeartbeatData prepareHeartbeatData() {
    // 心跳包
    HeartbeatData heartbeatData = new HeartbeatData();

    // 客户端 ID
    heartbeatData.setClientID(this.clientId);

    // 遍历所有消费者
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            // 构建 ConsumerData 数据
            ConsumerData consumerData = new ConsumerData();
            // 消费者组
            consumerData.setGroupName(impl.groupName());
            // 消费类型: PULL 和 PUSH
            consumerData.setConsumeType(impl.consumeType());
            // 消费模式: CLUSTERING 和 BROADCASTING, 也就是集群和广播
            consumerData.setMessageModel(impl.messageModel());
            // 消费者启动的时候从哪里开始消费, 在 ConsumeFromWhere 这个类中
            consumerData.setConsumeFromWhere(impl.consumeFromWhere());
            // 订阅信息,包括过滤消息相关标签、SQL规则
            consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
            consumerData.setUnitMode(impl.isUnitMode());

            // 加入消费者的心跳数据集合中
            heartbeatData.getConsumerDataSet().add(consumerData);
        }
    }

    // 生产者的心跳
    for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
            // 生产者组名
            ProducerData producerData = new ProducerData();
            producerData.setGroupName(entry.getKey());

            // 添加到生产者集合里面
            heartbeatData.getProducerDataSet().add(producerData);
        }
    }

    return heartbeatData;
}

对于消费者要设置的心跳信息为:

  • groupName:消费者组
  • consumeType:消费类型,PULL 和 PUSH
  • messageModel:消费模式,CLUSTERING 和 BROADCASTING, 也就是集群和广播
  • consumeFromWhere:消费者启动的时候从哪里开始消费, 在 ConsumeFromWhere 这个类中
  • subscriptionDataSet:消费者组订阅信息,包括过滤消息相关标签、SQL规则
  • unitMode

对于生产者要设置的心跳信息为:

  • groupName:生产者组

4. sendHearbeat 发送心跳上报请求

/**
 * 发送心跳包给 broker 节点
 * @param addr
 * @param heartbeatData
 * @param timeoutMillis
 * @return
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
public int sendHearbeat(
    final String addr,
    final HeartbeatData heartbeatData,
    final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
    // 构建心跳请求,请求编码是 HEART_BEAT
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
    // 请求语言,默认是 JAVA
    request.setLanguage(clientConfig.getLanguage());
    // 请求体,就是心跳包
    request.setBody(heartbeatData.encode());
    // 这里就是通过 Netty 发送心跳包了
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            // 返回结果,返回版本号
            return response.getVersion();
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

这个方法就是向 broker 发送心跳信息,可以看到发送之前在请求里面设置了一些属性:

  • 请求语言,默认是 JAVA
  • 心跳包,也就是上面的生产者和消费者
  • 请求 Code 是 HEART_BEAT

最后请求是使用同步发送的,如果发送成功就返回 broker 返回的版本。


5. broker 处理心跳请求

broker 通过 ClientManageProcessor 处理器来处理心跳请求,处理的方法就是 heartBeat
在这里插入图片描述


5.1 heartBeat 处理心跳包

/**
 * 处理客户端心跳请求
 * @param ctx
 * @param request
 * @return
 */
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    // 构建响应命令对象
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // 解码,获取心跳包数据
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    // 构建客户端连接信息对象
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );

    // 处理心跳包中的消费者信息
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        // 从 broker 的缓存中找出当前消费者组的订阅组配置
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        // 当 Consumer 发生变化的时候是否需要通知组内其他的 Consumer
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            // 从配置中获取 isNotifyConsumerIdsChangedEnable,默认就是 true
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            // 消息消费的重试队列,%RETRY%groupName
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            // 创建消息消费重试队列
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }

        // 注册消费者
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );

        if (changed) {
            log.info("registerConsumer info changed {} {}",
                data.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }

    // 注册生产者
    for (ProducerData data : heartbeatData.getProducerDataSet()) {
        this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
            clientChannelInfo);
    }
    // 设置返回结果
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

这个方法会分别处理消费者和生产者的心跳信息,对于生产者比较简单,就是调用 registerProducer 注册生产者即可,但是对于消费者就需要给消费者组建立一个重传队列,然后再注册消费者。

重试队列就是消费者消费消息失败的时候会把消息发送到这个队列进行重试,重试队列是以消费者组为维度的,也就是说消费者重传是以消费者组为维度的,一个消费者组里面的消费者共享一个重传队列。


5.2 createTopicInSendMessageBackMethod 创建重传 topic

/**
 * 创建重传 topic,持久化到配置文件中,文件地址 ${user.home}/store/config/topics.json
 * @param topic // 待创建的 topic
 * @param clientDefaultTopicQueueNums // topic 下面的默认队列数, 默认 1
 * @param perm // 队列权限, 默认读写都有
 * @param topicSysFlag // topic 标识
 * @return
 */
public TopicConfig createTopicInSendMessageBackMethod(
    final String topic,
    final int clientDefaultTopicQueueNums,
    final int perm,
    final int topicSysFlag) {
    // 从 topic 配置中获取下这个 topic
    TopicConfig topicConfig = this.topicConfigTable.get(topic);
    // 如果已经存在配置了,那么直接返回
    if (topicConfig != null)
        return topicConfig;

    boolean createNew = false;

    try {
        // 新建的时候需要加锁,防止创建相同的 topic 互相覆盖
        if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                // 再次获取 topic 信息,双重检查锁
                topicConfig = this.topicConfigTable.get(topic);
                if (topicConfig != null)
                    return topicConfig;

                // 这里就是新建配置
                topicConfig = new TopicConfig(topic);
                // 重传队列的读写队列数都是 1
                topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
                topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
                // 权限默认是读写
                topicConfig.setPerm(perm);
                topicConfig.setTopicSysFlag(topicSysFlag);

                log.info("create new topic {}", topicConfig);
                // 添加到 topicConfigTable 集合里面
                this.topicConfigTable.put(topic, topicConfig);
                createNew = true;
                // 获取下一个版本,这个版本用于标识当前 topicConfigTable 有没有发生变化,比如在从节点同步 topicConfigTable 的
                // 时候就可以使用版本和本地存储的版本进行队列,如果发生了变化再重新写入文件中
                this.dataVersion.nextVersion();
                // 持久化到文件中,文件地址: ${user.home}/store/config/topics.json
                this.persist();
            } finally {
                // 解锁
                this.topicConfigTableLock.unlock();
            }
        }
    } catch (InterruptedException e) {
        log.error("createTopicInSendMessageBackMethod exception", e);
    }

    // 如果创建了新的 topic
    if (createNew) {
        // 注册 broker 信息
        this.brokerController.registerBrokerAll(false, true, true);
    }

    return topicConfig;
}

创建重传 topic 时首先从 broker 的本地缓存 topicConfigTable 中获取 topic 的配置,然后如果已经存在 topic 配置了,就直接返回,否则才去创建。

在新建 topic 的时候需要加锁,防止并发创建 topic,可以看到重传 topic 设置的属性如下:

  • 读写队列数都是 1
  • 权限默认是可读写
  • topic 系统标识

创建出来之后设置到本地缓存 topicConfigTable 中,就代表创建成功了,然后更新版本 dataVersion,接着持久化到文件中,文件地址: ${user.home}/store/config/topics.json,更新版本就是因为 broker 需要向 NameServer 注册 topic 配置信息,把版本也传过去标识 broker 心跳版本,可以说 broker 的心跳就是 topic 配置信息。

最后如果创建了新的 topic,就会像 NameServer 注册 topic 信息,registerBrokerAll 的逻辑可以看这篇文章:【RocketMQ Broker 相关源码】-注册 broker 信息到所有的 NameServer。


5.3 registerConsumer 注册消费者

/**
 * 注册消费者
 * @param group                             消费者组
 * @param clientChannelInfo                 客户端连接通道
 * @param consumeType                       消费类型(PULL 或者 PUSH)
 * @param messageModel                      消费模式(集群或者广播)
 * @param consumeFromWhere                  消费点位
 * @param subList                           消费者组订阅信息, 一个消费者组里面的消费者可以订阅多个 topic 下面的消息
 * @param isNotifyConsumerIdsChangedEnable  消费者组里面的消费者发生变化时是否需要通知其他消费者重平衡
 * @return
 */
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

    // 获取消费者组信息
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
        // 创建一个新的 ConsumerGroupInfo
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        // 如果不存在才新增
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }

    // 是否在这个消费者组里面新增了消费者连接
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    // 是否更新了消费者组订阅信息
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    // 如果发生了变更
    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            // 通知消费者重平衡
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }

    // 处理消费者注册事件
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    // 是否发生了变更
    return r1 || r2;
}

注册消费者需要处理两件事,首先就是注册消费者连接到 channelInfoTable 集合中,channelInfoTable 集合用于管理消费者组下面的连接信息,key 是连接,value 是 ClientChannelInfo,是连接信息。

// 消费者组下面的连接信息
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
    new ConcurrentHashMap<Channel, ClientChannelInfo>(16);

public class ClientChannelInfo {
    // 连接通道
    private final Channel channel;
    // 客户端 ID
    private final String clientId;
    // 一般是 JAVA
    private final LanguageCode language;
    // 版本
    private final int version;
    // 上一次上报心跳的事件
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
	...
}

然后就是更新消费者组的订阅信息,就是本地 subscriptionTable 集合。


5.3.1 updateChannel 更新消费者连接通道

/**
 * 更新消费者连接
 * @param infoNew               客户端连接通道
 * @param consumeType           消费类型(PULL 或者 PUSH)
 * @param messageModel          消费模式(集群或者广播)
 * @param consumeFromWhere      消费点位
 * @return
 */
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
    MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
    boolean updated = false;
    this.consumeType = consumeType;
    this.messageModel = messageModel;
    this.consumeFromWhere = consumeFromWhere;

    // 获取原来的连接
    ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
    if (null == infoOld) {
        // 如果不存在就新增
        ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        if (null == prev) {
            log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
                messageModel, infoNew.toString());
            // 标记设置为 true
            updated = true;
        }

        infoOld = infoNew;
    } else {
        // 如果已存在就判断需不需要更新
        if (!infoOld.getClientId().equals(infoNew.getClientId())) {
            // 这里就是出现 BUG 了, 因为正常来说一个连接通道和一个 clientId 对应, 记录下日志
            log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
                this.groupName,
                infoOld.toString(),
                infoNew.toString());
            // 重新修正 channelInfoTable 里面的映射关系
            this.channelInfoTable.put(infoNew.getChannel(), infoNew);
        }
    }

    // 更新 lastUpdateTimestamp 属性, 表示当前消费者组上一次上报心跳的时间
    this.lastUpdateTimestamp = System.currentTimeMillis();
    infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);

    return updated;
}

更新连接通道的逻辑就是更新 channelInfoTable 集合,然后更新下 lastUpdateTimestamp,因为这个方法是在 ConsumerGroupInfo 中的,所以更新的 lastUpdateTimestamp 就代表这个消费者组上一次上报心跳时间。更新这个属性是因为 broker 有一个定时任务


5.3.2 updateSubscription 更新订阅消息

public boolean updateSubscription(final Set<SubscriptionData> subList) {
    boolean updated = false;

    // 1. 遍历所有订阅信息, 将 subList 设置到 subscriptionTable 中
    for (SubscriptionData sub : subList) {
        // 获取原来的订阅信息
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        if (old == null) {
            // 如果获取不到就新建
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            if (null == prev) {
                updated = true;
                log.info("subscription changed, add new topic, group: {} {}",
                    this.groupName,
                    sub.toString());
            }
        // 如果当前新增的订阅信息版本比原来的要大且消费者类型是 PUSH
        } else if (sub.getSubVersion() > old.getSubVersion()) {
            if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                log.info("subscription changed, group: {} OLD: {} NEW: {}",
                    this.groupName,
                    old.toString(),
                    sub.toString()
                );
            }

            // 更新订阅信息集合, 这种情况下关系到消费者拉取消息就要更新, 如果是 PULL 类型由于是用户控制就不需要更新
            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }

    // 2. 删除不存在的订阅信息
    Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
    while (it.hasNext()) {
        // 遍历所有订阅信息
        Entry<String, SubscriptionData> next = it.next();
        String oldTopic = next.getKey();

        boolean exist = false;
        for (SubscriptionData sub : subList) {
            if (sub.getTopic().equals(oldTopic)) {
                // 如果 subscriptionTable 里面存储的 topic 订阅信息不在 subList 集合中, 说明消费者没有上报过来
                exist = true;
                break;
            }
        }

        if (!exist) {
            // 如果不存在, 说明这个订阅信息被修改了
            log.warn("subscription changed, group: {} remove topic {} {}",
                this.groupName,
                oldTopic,
                next.getValue().toString()
            );

            // 删掉这个订阅信息
            it.remove();
            updated = true;
        }
    }

    // 设置上报心跳的时间
    this.lastUpdateTimestamp = System.currentTimeMillis();

    return updated;
}

更新订阅消息流程分为两大步,首先是遍历所有订阅信息, 将消费者上报的 subList 设置到 subscriptionTable 中,如果 broker 中没有存储这个订阅消息,又或者存储的这个订阅信息版本过期了,就会更新到 subscriptionTable 中。

然后就是删除不存在的订阅信息,比如原来 subscriptionTable 存在 topicA -> subAtopicB -> subB 订阅信息,而消费者上报过来的消费者组订阅信息是 topicA -> subA,那么 topicB -> subB 就会被删掉,这也透露一件事,就是消费者组里面的消费者订阅关系需要一直,具体可以看官方的这篇文章:RocketMQ 订阅关系。


5.3.3 通知消费者重平衡

当消费者组里面新增了消费者又或者消费者的订阅关系发生了变化,比如又订阅了多一个 topic,这种情况下会通过 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()) 通知消费者进行重平衡。当然这个方法在 【RocketMQ Broker 相关源码】- NettyRemotingClient 和 NettyRemotingServer 这篇文章的 3.3.1 有讲解,所以这里不再多说。


5.3.4 处理消费者注册事件

同样的注册事件也是通过 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()) 来处理,而这里的注册主要指的是消费者过滤信息的注册,就是 SQL92 过滤信息的注册。
在这里插入图片描述
可以看到就是通过 register 方法去注册过滤信息,下面就从这个方法入手。


5.3.4.1 register 注册消费者过滤信息
/**
 * 注册消费者组过滤信息
 * @param consumerGroup
 * @param subList
 */
public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {
    for (SubscriptionData subscriptionData : subList) {
        // 一个一个注册
        register(
            subscriptionData.getTopic(),
            consumerGroup,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion()
        );
    }

    // 获取消费者组过滤信息
    Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup);

    Iterator<ConsumerFilterData> iterator = groupFilterData.iterator();
    // 遍历旧的过滤信息
    while (iterator.hasNext()) {
        ConsumerFilterData filterData = iterator.next();

        boolean exist = false;
        for (SubscriptionData subscriptionData : subList) {
            // 现在新上报的订阅信息是已经存在的
            if (subscriptionData.getTopic().equals(filterData.getTopic())) {
                // 信息存在, 不需要删除
                exist = true;
                break;
            }
        }

        // 如果这个过滤信息已经不再上报了, 将原来的过滤信息过期事件设置为当前时间, 相当于懒删除了
        if (!exist && !filterData.isDead()) {
            filterData.setDeadTime(System.currentTimeMillis());
            // 日志输出
            log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);
        }
    }
}

/**
 * 获取消费者组的过滤信息
 * @param consumerGroup
 * @return
 */
public Collection<ConsumerFilterData> getByGroup(final String consumerGroup) {
    Collection<ConsumerFilterData> ret = new HashSet<ConsumerFilterData>();

    // 遍历 filterDataByTopic
    Iterator<FilterDataMapByTopic> topicIterator = this.filterDataByTopic.values().iterator();
    while (topicIterator.hasNext()) {
        FilterDataMapByTopic filterDataMapByTopic = topicIterator.next();

        // 获取 topic 下面的消费者过滤信息
        Iterator<ConsumerFilterData> filterDataIterator = filterDataMapByTopic.getGroupFilterData().values().iterator();

        // 遍历这些过滤信息
        while (filterDataIterator.hasNext()) {
            ConsumerFilterData filterData = filterDataIterator.next();

            // 如果跟参数消费者组一样, 说明这个 topic 是这个消费者组需要消费的
            if (filterData.getConsumerGroup().equals(consumerGroup)) {
                // 添加到返回集合中
                ret.add(filterData);
            }
        }
    }

    return ret;
}

客户端上报过来的心跳信息里面的消费者订阅信息包括了消费者的一些过滤信息,之前也说过了消费者组里面的消费者订阅关系需要保持一致,所以消费者上报过来的订阅信息就可以认为是这个消费者组的订阅信息,因此可以看到遍历 subList 一个一个注册之后,需要将旧的过滤信息里面没有上报过来的删掉,不过这里的删掉是懒删除,只是设置下 ConsumerFilterData#deadTime 为当前时间,就表示这个过滤信息已经过期了。

getByGroup 就是获取这个消费者组的消费者过滤信息,获取的逻辑是遍历 filterDataByTopic 集合来获取。filterDataByTopic 以 topic 为 key,因为一个 topic 可以被多个消费者组下面的消费者去消费,所以 value 是 FilterDataMapByTopic 对象,这个对象里面的属性是一个 ConcurrentMap 集合和 topic,集合的 key 是消费者组,value 是消费者组的过滤信息。

/**
 * 消费者过滤信息, 一个 topic 可以被多个消费者组下面的消费者去消费, 所以这里是以 topic 为 key
 */
private ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>
    filterDataByTopic = new ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>(256);

public static class FilterDataMapByTopic {
    /**
     * 消费者组的过滤信息
     */
    private ConcurrentMap<String/*consumer group*/, ConsumerFilterData>
        groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();

    private String topic;

	...
}

getByGroup 的遍历逻辑就是遍历所有 topic 下面的 FilterDataMapByTopic,然后继续遍历 FilterDataMapByTopic 里面的 groupFilterData 属性,判断这个 topic 是否被这个消费者组消费,逻辑并不复杂,就是里面的集合有点绕。


5.3.4.2 register 创建 FilterDataMapByTopic 对象

继续回到 register 方法,这个方法就是负责创建出 FilterDataMapByTopic,上面我们也说了 filterDataByTopic 以 FilterDataMapByTopic 为 value,所以这个方法主要还是创建出 FilterDataMapByTopic 对象。

/**
 * 注册 SQL92 信息到消费者组集合 filterDataByTopic 中
 * @param topic
 * @param consumerGroup
 * @param expression
 * @param type
 * @param clientVersion
 * @return
 */
public boolean register(final String topic, final String consumerGroup, final String expression,
    final String type, final long clientVersion) {
    // 如果是 TAG 类型的过滤信息, 直接返回
    if (ExpressionType.isTagType(type)) {
        return false;
    }

    // 如果 SQL92 过滤表达式为空, 直接返回
    if (expression == null || expression.length() == 0) {
        return false;
    }

    // 获取 topic 下面的过滤信息集合, 注意这里就是消费者组下面一个 topic 一个过滤信息集合
    FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);

    if (filterDataMapByTopic == null) {
        // 不存在就新建一个
        FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
        FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
        filterDataMapByTopic = prev != null ? prev : temp;
    }

    // 计算出 consumerGroup#topic 的布隆过滤器信息
    // 1.经过 k 次 hash 求出来的 k 位
    // 2.布隆过滤器总共多少位
    BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);

    // 注册过滤信息到 filterDataMapByTopic 中
    return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}

register 方法会在一开始就判断如果过滤类型是 TAG 就不需要注册,那就是说明 filterDataByTopic 这个集合 只存储 SQL92 过滤类型的过滤数据

这个方法逻辑不多,值得关注的是 bloomFilter.generate,可以看到对传入的 consumerGroup#topic 进行 hash 之后生成布隆过滤器信息,下面就看下这个布隆过滤器信息包括什么。

在讲解 generate 方法前,我们来想一下布隆过滤器有什么重要信息:

  1. hash 函数个数 k
  2. bit 数组长度 m
  3. 误差率 fpp

对于布隆过滤器,如果要往里面设置一个字符串,就需要先通过 k 个哈希函数求出 k 个位,然后将布隆过滤器的这 k 个位设置为1,如果感兴趣可以看这篇文章:详细说说布隆过滤器 BloomFilter。

对于消费者过滤信息,创建的 BloomFilterData 包括两个重要信息:这 k 个 hash 函数求出来的 bit 数组(长度为 k)bit 数组长度 m
在这里插入图片描述


5.3.4.3 register 注册过滤信息

上面创建好布隆过滤器信息之后,最终调用 register 注册过滤信息,而注册过滤信息主要就是往 groupFilterData 集合设置,上面也说了,groupFilterData 集合存储的是 consumerGroup -> ConsumerFilterData 的映射关系。

/**
 * 注册过滤信息
 * @param consumerGroup     消费者组
 * @param expression        SQL92 过滤表达式
 * @param type              过滤类型, SQL92
 * @param bloomFilterData   布隆过滤器数据
 * @param clientVersion     订阅信息版本
 * @return
 */
public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData,
    long clientVersion) {
    // 获取旧的过滤信息
    ConsumerFilterData old = this.groupFilterData.get(consumerGroup);

    if (old == null) {
        // 创建一个新的 ConsumerFilterData
        ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
        if (consumerFilterData == null) {
            return false;
        }
        // 设置布隆过滤器信息
        consumerFilterData.setBloomFilterData(bloomFilterData);

        // 添加到 groupFilterData 中
        old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);
        if (old == null) {
            // 这里就是新增成功, 直接返回
            log.info("New consumer filter registered: {}", consumerFilterData);
            return true;
        } else {
            // 并发添加了, 注意 FilterDataMapByTopic 是以 topic 为维度的, 一个 topic 可以被多个消费者组消费
            if (clientVersion <= old.getClientVersion()) {
                if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
                    // 打印下日志, 意思就是并发添加的这两过滤信息还不一样
                    log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
                        consumerGroup, topic,
                        clientVersion, old.getClientVersion(),
                        old.getExpressionType(), old.getExpression(),
                        type, expression);
                }
                // 版本一样, 另一个线程添加的过滤信息过期了, 就直接设置下过期时间为 0, 重新启用
                if (clientVersion == old.getClientVersion() && old.isDead()) {
                    reAlive(old);
                    return true;
                }

                // 版本一样同时另一个线程添加的没有过期或者当前线程版本比较低, 当前线程就注册失败了
                return false;
            } else {
                // 这里就是新增的版本比原来的要高, 直接覆盖原来的
                this.groupFilterData.put(consumerGroup, consumerFilterData);
                log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);
                return true;
            }
        }
    } else {
        // 原来过滤信息已经存在了且当前添加的版本 <= 原来的
        if (clientVersion <= old.getClientVersion()) {
            if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
                // 打印下日志, 意思就是过滤表达式不一样
                log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
                    consumerGroup, topic,
                    clientVersion, old.getClientVersion(),
                    old.getExpressionType(), old.getExpression(),
                    type, expression);
            }
            // 如果跟原来的版本一样就直接重新启用, 逻辑和上面的一样
            if (clientVersion == old.getClientVersion() && old.isDead()) {
                reAlive(old);
                return true;
            }

            // 添加的版本比原来的版本还要低或者相同但是原来的没有过期
            return false;
        }

        // 这里就是添加的版本比原来的高, 但是还是得看下过滤信息是否有变化, 首先是过滤表达式和类型
        boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);
        // 然后是布隆过滤器信息是否发生了变化
        if (old.getBloomFilterData() == null && bloomFilterData != null) {
            change = true;
        }
        if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {
            change = true;
        }

        // 过滤表达式发生了变化
        if (change) {
            // 创建新的过滤表达式
            ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
            if (consumerFilterData == null) {
                // new expression compile error, remove old, let client report error.
                // 这里就是过滤表达式有问题, 将旧的删掉, 然后直接返回 false 表示注册失败, 所以说如果表达式有问题也会把旧的删掉
                this.groupFilterData.remove(consumerGroup);
                return false;
            }
            // 设置布隆过滤器数据
            consumerFilterData.setBloomFilterData(bloomFilterData);

            // 创建成功, 添加到 groupFilterData 中
            this.groupFilterData.put(consumerGroup, consumerFilterData);

            log.info("Consumer filter info change, old: {}, new: {}, change: {}",
                old, consumerFilterData, change);

            return true;
        } else {
            // 过滤表达式没有发生变化, 直接设置版本号
            old.setClientVersion(clientVersion);
            // 如果旧的已经过期了, 重新启用
            if (old.isDead()) {
                reAlive(old);
            }
            return true;
        }
    }
}

protected void reAlive(ConsumerFilterData filterData) {
    // 重新设置过期时间为 0
    long oldDeadTime = filterData.getDeadTime();
    filterData.setDeadTime(0);
    log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime);
}

里面的注释都写得比较清楚,也可以看到这里面维护 groupFilterData 集合是根据两种情况来维护,一种是原来没有,一种是原来有的,如果是原来集合里面就有 group -> ConsumerFilterData 的映射,就需要判断当前上报的过滤信息版本和原来的对比,如果版本不是最新的就会被覆盖,一般来说如果过滤信息不变版本都不会变。

然后在这个方法中也可以看到当 old == null 的时候应该是为了防止并发,使用 putIfAbsent 添加成功之后返回 old,还要判断这个 old 是不是已经存在了,如果存在还需要对比添加的版本来决定留下哪个。

最后注意下里面的 reAlive 方法,这个方法就是将一个过期的过滤信息变可用,过滤信息 ConsumerFilterData 里面有一个属性 deadTime 表示过期时间,如果设置成不为 0 就表示过期了,所以这个 reAlive 方法就是重新将这个属性设置为 0,表示复用里面的过滤信息。


6. 小结

好了,这篇文章中我们探讨了上报心跳信息到 broker 的源码,可以看到上传的心跳信息里面不单单包括生产者,也包括消费者的,所以这个方法是生产者和消费者的共用方法。

而 broker 处理心跳的时候对于消费者不单单要处理消费者连接信息,同时也要处理消费者组订阅消息以及 SQL92 过滤信息,也就是上面的 5.3 小节。





如有错误,欢迎指出!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2383818.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python----循环神经网络(Word2Vec的优化)

一、负采样 基本思想&#xff1a; 在训练过程中&#xff0c;对于每个正样本&#xff08;中心词和真实上下文词组成的词对&#xff09;&#xff0c;随机采样少量&#xff08;如5-20个&#xff09;负样本&#xff08;中心词与非上下文词组成的词对&#xff09;。 模型通过区分正…

Simon J.D. Prince《Understanding Deep Learning》

学习神经网络和深度学习推荐这本书&#xff0c;这本书站位非常高&#xff0c;且很多问题都深入剖析了&#xff0c;甩其他同类书籍几条街。 多数书&#xff0c;不深度分析、没有知识体系&#xff0c;知识点零散、章节之间孤立。还有一些人Tian所谓的权威&#xff0c;醒醒吧。 …

开搞:第四个微信小程序:图上县志

原因&#xff1a;我换了一个微信号来搞&#xff0c;因为用同一个用户&#xff0c;备案只能一个个的来。这样不行。所以我换了一个。原来注册过小程序。现在修改即可。注意做好计划后&#xff0c;速度备案和审核&#xff0c;不然你时间浪费不起。30元花起。 结构&#xff1a; -…

Seata源码—7.Seata TCC模式的事务处理一

大纲 1.Seata TCC分布式事务案例配置 2.Seata TCC案例服务提供者启动分析 3.TwoPhaseBusinessAction注解扫描源码 4.Seata TCC案例分布式事务入口分析 5.TCC核心注解扫描与代理创建入口源码 6.TCC动态代理拦截器TccActionInterceptor 7.Action拦截处理器ActionIntercept…

【语法】C++的map/set

目录 平衡二叉搜索树 set insert() find() erase() swap() map insert() 迭代器 erase() operator[] multiset和multimap 在之前学习的STL中&#xff0c;string&#xff0c;vector&#xff0c;list&#xff0c;deque&#xff0c;array都是序列式容器&#xff0c;它们的…

vue vite textarea标签按下Shift+Enter 换行输入,只按Enter则提交的实现思路

注意input标签不能实现&#xff0c;需要用textarea标签 直接看代码 <template><textareav-model"message"keydown.enter"handleEnter"placeholder"ShiftEnter 换行&#xff0c;Enter 提交"></textarea> </template>&l…

深入理解 PlaNet(Deep Planning Network):基于python从零实现

引言&#xff1a;基于模型的强化学习与潜在动态 基于模型的强化学习&#xff08;Model-based Reinforcement Learning&#xff09;旨在通过学习环境动态的模型来提高样本效率。这个模型可以用来进行规划&#xff0c;让智能体在不需要与真实环境进行每一次决策交互的情况下&…

仿腾讯会议——视频发送接收

1、 添加音频模块 2、刷新图片&#xff0c;触发重绘 3、 等比例缩放视频帧 4、 新建视频对象 5、在中介者内定义发送视频帧的函数 6、完成发送视频的函数 7、 完成开启/关闭视频 8、绑定视频的信号槽函数 9、 完成开启/关闭视频 10、 完成发送视频 11、 完成刷新图片显示 12、完…

从3.7V/5V到7.4V,FP6291在应急供电智能门锁中的应用

在智能家居蓬勃发展的当下&#xff0c;智能门锁以其便捷、安全的特性&#xff0c;成为现代家庭安防的重要组成部分。在智能门锁电量耗尽的情况下&#xff0c;应急电源外接移动电源&#xff08;USB5V输入&#xff09; FP6291升压到7.4V供电可应急开锁。增强用户在锁具的安全性、…

【人工智障生成日记1】从零开始训练本地小语言模型

&#x1f3af; 从零开始训练本地小语言模型&#xff1a;MiniGPT TinyStories&#xff08;4090Ti&#xff09; &#x1f9ed; 项目背景 本项目旨在以学习为目的&#xff0c;从头构建一个完整的本地语言模型训练管线。目标是&#xff1a; ✅ 不依赖外部云计算✅ 完全本地运行…

Selenium-Java版(frame切换/窗口切换)

frame切换/窗口切换 前言 切换到frame 原因 解决 切换回原来的主html 切换到新的窗口 问题 解决 回到原窗口 法一 法二 示例 前言 参考教程&#xff1a;Python Selenium Web自动化 2024版 - 自动化测试 爬虫_哔哩哔哩_bilibili 上期文章&#xff1a;Sel…

一文深度解析:Pump 与 PumpSwap 的协议机制与技术差异

在 Solana 链上&#xff0c;Pump.fun 和其延伸产品 PumpSwap 构成了 meme coin 发行与流通的两大核心场景。从初期的游戏化发行模型&#xff0c;到后续的自动迁移与交易市场&#xff0c;Pump 系列协议正在推动 meme coin 从“爆发性投机”走向“协议化运营”。本文将从底层逻辑…

星云智控v1.0.0产品发布会圆满举行:以创新技术重构物联网监控新生态

星云智控v1.0.0产品发布会圆满举行&#xff1a;以创新技术重构物联网监控新生态 2024年5月15日&#xff0c;成都双流蛟龙社区党群服务中心迎来了一场备受业界瞩目的发布会——优雅草科技旗下”星云智控v1.0.0”物联网AI智控系统正式发布。本次发布会吸引了包括沃尔沃集团、新希…

SpringBoot(一)--- Maven基础

目录 前言 一、初始Maven 1.依赖管理 2.项目构建 3.统一项目结构 二、IDEA集成Maven 1.Maven安装 2.创建Maven项目 2.1全局设置 2.2 创建SpringBoot项目 2.3 常见问题 三、单元测试 1.JUnit入门 2.断言 前言 Maven 是一款用于管理和构建Java项目的工具&#xff…

基于FPGA控制电容阵列与最小反射算法的差分探头优化设计

在现代高速数字系统测试中&#xff0c;差分探头的信号完整性直接影响测量精度。传统探头存在阻抗失配导致的信号反射问题&#xff0c;本文提出一种通过FPGA动态控制电容阵列&#xff0c;结合最小反射算法的优化方案&#xff0c;可实时调整探头等效容抗&#xff0c;将信号反射损…

kakfa 基本了解

部署结构 Kafka 使用zookeeper来协商和同步&#xff0c;但是kafka 从版本3.5正式开始deprecate zookeeper, 同时推荐使用自带的 kraft. 而从4.0 开始则不再支持 zookeeper。 所以 kafka 是有control plane 和 data plane 的。 data plane 就是broker&#xff0c;control plane…

Origin绘制多因子柱状点线图

多因子柱状点线图是一种结合柱状图和点线图的复合图表&#xff0c;常用于同时展示多个因子&#xff08;变量&#xff09;在不同分组下的分布和趋势变化。 适用场景&#xff1a; &#xff08;1&#xff09;比较多个因子在不同分组中的数值大小&#xff08;柱状图&#xff09;&a…

Web漏洞扫描服务的特点与优势:守护数字时代的安全防线

在数字化浪潮中&#xff0c;Web应用程序的安全性已成为企业业务连续性和用户信任的核心要素。随着网络攻击手段的不断升级&#xff0c;Web漏洞扫描服务作为一种主动防御工具&#xff0c;逐渐成为企业安全体系的标配。本文将从特点与优势两方面&#xff0c;解析其价值与应用场景…

抛弃传统P2P技术,EasyRTC音视频基于WebRTC打造教育/会议/远程巡检等场景实时通信解决方案

一、方案背景 随着网络通信发展&#xff0c;实时音视频需求激增。传统服务器中转方式延迟高、资源消耗大&#xff0c;WebP2P技术由此兴起。EasyRTC作为高性能实时通信平台&#xff0c;集成WebP2P技术&#xff0c;实现低延迟、高效率音视频通信&#xff0c;广泛应用于教育、医疗…

俄罗斯军总参情报局APT28组织瞄准援乌后勤供应链发起全球网络攻击

2025年5月&#xff0c;由美国、英国、欧盟和北约网络安全与情报机构联合发布的最新网络安全公告披露&#xff0c;俄罗斯军总参情报局&#xff08;GRU&#xff09;第85特别服务中心第26165部队&#xff08;又称APT28、Fancy Bear、Forest Blizzard和BlueDelta&#xff09;正持续…