RocketMQ 消息发送核心源码解析:DefaultMQProducerImpl.send () 方法深度剖析

news2025/7/22 22:34:08

引言

在分布式系统中,消息队列是实现异步通信、服务解耦和流量削峰的关键组件。Apache RocketMQ 作为一款高性能、高可靠的消息中间件,被广泛应用于各类互联网场景。其中,消息发送是最基础也是最重要的功能之一。本文将深入剖析 RocketMQ 中 DefaultMQProducerImpl.send() 方法的实现原理,带你了解消息发送的核心流程和关键技术点。

一、DefaultMQProducerImpl 概述

DefaultMQProducerImpl 是 RocketMQ 消息生产者的核心实现类,它实现了消息发送的具体逻辑。其类层次结构如下:

DefaultMQProducerImpl
    ├── 实现了 MQProducerInner 接口
    ├── 依赖于 MQClientInstance 进行网络通信
    ├── 包含 TopicPublishInfoManager 管理主题路由信息
    ├── 使用 SendMessageHookList 支持发送钩子扩展

 核心属性

 private final InternalLogger log = ClientLogger.getLog();
    /**
     * 随机数
     */
    private final Random random = new Random();
    /**
     * 关联的消息生产者的组件
     */
    private final DefaultMQProducer defaultMQProducer;
    /**
     * topic 发布信息的映射表
     */
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    /**
     * 发送消息的钩子List
     *
     */
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    /**
     * 完成事务消息的钩子list
     */
    private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();
    /**
     * rpc钩子
     */
    private final RPCHook rpcHook;
    /**
     * 异步发送线程池队列
     */
    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
    /**
     * 默认异步发送消息的线程池
     */
    private final ExecutorService defaultAsyncSenderExecutor;
    /**
     * 检查请求的队列
     */
    protected BlockingQueue<Runnable> checkRequestQueue;
    /**
     * 检查请求的线程池
     */
    protected ExecutorService checkExecutor;
    /**
     * 服务的状态
     */
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    /**
     * 客户端的实例
     */
    private MQClientInstance mQClientFactory;
    /**
     * 检查禁用钩子的list
     */
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    /**
     * 容错策略
     */
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    /**
     * 异步化发送消息的线程池
     */
    private ExecutorService asyncSenderExecutor;

二、send () 方法的核心流程

2.1 方法签名与重载

DefaultMQProducerImpl 提供了多个重载的 send() 方法,支持同步、异步和单向发送模式:

// 同步发送
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

// 异步发送
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;

// 单向发送
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException;

// 带超时参数的同步发送
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

// 带队列选择器的发送
public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

2.2 核心发送流程

所有 send() 方法最终都会调用 sendDefaultImpl() 方法,这是消息发送的核心实现:

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();

        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;

        //根据topic进行查找topic的路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            //设置总的重试次数
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //进行选择一个消息队列
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                //选择出来的队列不为null
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        //走一个网络通信逻辑
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                            continue;
                        } else {
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        validateNameServerSetting();

        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

三、关键步骤详解

3.1 获取主题路由信息

 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //从本地的Map中进行获取Topic的路由信息
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        //本地的Map中没有这个Topic的路由信息,那么就进行从NameServer中获取路由信息
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 从远程的NameServer中获取路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        //topic的路由信息是可用的,那么就直接返回这个topicPublishInfo
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            //从远程的NameServer中获取路由信息 在进行返回
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

3.2 消息队列选择

/**
     * 选择一个可用的队列
     * @param tpInfo topic的路由信息
     * @param lastBrokerName 上一次发送时候的broker
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                //对这个queue进行累加一下index
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                //遍历topic里的每个consumeQueue
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    //针对index 进行取模
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    //轮询选择一个messageQueue
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //判断这个broker是否可用
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                //遍历完之后,发现没有可用的broker 至少选择一个broker
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        return new MessageQueue(mq.getTopic(), notBestBroker, tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    } else {
                        return mq;
                    }
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

3.3 底层消息发送

private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        long beginStartTime = System.currentTimeMillis();
        //获取broker地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    sysFlag |= compressType.getCompressionFlag();
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                //发送消息的请求头
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                requestHeader.setBname(mq.getBrokerName());
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC: //同步发送
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        //同步发送消息
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

3.4 调用NettyClient发送消息

 public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        //获取一个当前的时间戳
        long beginStartTime = System.currentTimeMillis();
        //这里获取一个channel 这个channel 就是Broker跟NameServer之间的连接
        final Channel channel = this.getAndCreateChannel(addr);
        //如果连接存在并且连接是活跃的 那么就可以发送请求
        if (channel != null && channel.isActive()) {
            try {
                //执行发送前的一些操作 先不关心
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
                }
                //这里就是真正发送网络请求的地方
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                //执行完成后的一些操作 先不关心
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            //初始化响应的future对象
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            //获取请求的网络连接地址
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            //等待响应 使用countDownLatch进行同步的等待
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            //没有返回结果
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    //请求超时了 发送成功了
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    //请求压根就没有发送成功
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }
            //如果响应成功
            return responseCommand;
        } finally {
            //最终一定会把响应从table中删除
            this.responseTable.remove(opaque);
        }
    }

 3.5 BrokerController接收请求

        客户端发送消息给BrokerController,BrokerController会进行调用SendMessageProcessor进行处理发送过来的消息。

    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                  RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                //解析出来一个发送消息请求的header
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                //构建一个消息的上下文
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                if (requestHeader.isBatch()) {
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }

  处理单个消息的逻辑:asyncSendMessage,调用底层的存储逻辑进行异步增加消息。

this.brokerController.getMessageStore().asyncPutMessage(msgInner);

最终会进行调用 CommitLog#asyncPutMessage的方法,根据决定是调用CommitLog#asyncPutMessage还是调用 DLedgerCommitLog#asyncPutMessage方法。如果是调用 DLedgerCommitLog#asyncPutMessage方法,最终会进行调用DLedgerServer#handleAppend。

保存完消息之后,ReputMessageService线程会调用CommitLogDispatcherBuildConsumeQueue(向ConsumeQueue中添加数据)和CommitLogDispatcherBuildIndex(向索引中添加数据) 这两个分发组件。

四、性能优化与最佳实践

4.1 性能优化建议

  1. 批量发送:对于小消息,使用批量发送提高吞吐量

List<Message> messages = new ArrayList<>();
messages.add(new Message("TopicTest", "TagA", "Message 1".getBytes()));
messages.add(new Message("TopicTest", "TagA", "Message 2".getBytes()));
producer.send(messages);

   2.异步发送:对响应时间敏感的场景使用异步模式

producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // 处理成功响应
    }
    
    @Override
    public void onException(Throwable e) {
        // 处理异常
    }
});

3.合理配置重试次数:根据业务需求调整 retryTimesWhenSendFailed

4.启用故障延迟机制:避免向故障 Broker 发送消息

4.2 监控与告警

  1. 发送成功率:监控 SendStatus.SEND_OK 的比例

  2. 响应时间:监控消息发送的平均响应时间

  3. 异常日志:关注 sendDefaultImpl 方法中的异常捕获

  4. Broker 状态:监控 Broker 的可用性和负载情况

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2396859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BiliNote部署实践

​ 开源地址&#xff1a; https://github.com/JefferyHcool/BiliNote &#x1f680; 快速开始 1. 克隆仓库 git clone https://github.com/JefferyHcool/BiliNote.git cd BiliNote mv .env.example .env2. 启动后端&#xff08;FastAPI&#xff09; cd backend pip insta…

bismark OT CTOT OB CTOB 以及mapping后的bam文件中的XG,XR列的含义

首先&#xff0c;OT&#xff0c;OB&#xff0c;CTOT&#xff0c;CTOB都是描述测序reads的&#xff0c;而不是描述参考基因组的。 bisul-fate建库会将DNA双链文库中非甲基化的C转化成U。转化结束后&#xff0c;被转化的U和互补链的G并不配对。此时正链&#xff08;&#xff0c;…

Android Native 之 adbd进程分析

目录 1、adbd守护进程 2、adbd权限降级 3、adbd命令解析 1&#xff09;adb shell 2&#xff09;adb root 3&#xff09;adb reboot 4、案例 1&#xff09;案例之实现不需要执行adb root命令自动具有root权限 2&#xff09;案例之实现不需要RSA认证直接能够使用adb she…

CAN通讯协议中各种参数解析

1.各种参数缩写 2.多帧传输时间参数解析 - Sender&#xff08;左侧&#xff09; 指的是 多帧数据的发送者&#xff0c;也就是&#xff1a; ECU&#xff08;被测系统 / 响应方&#xff09; - Receiver&#xff08;右侧&#xff09; 指的是 多帧数据的接收者&#xff0c;也就是…

网络攻防技术三:网络脆弱性分析

文章目录 一、影响安全的因素二、计算机网络三、网络体系结构脆弱性1、因特网容易被攻击的特性 四、典型网络协议安全性分析&#xff08;重要&#xff09;1、IPv42、RIP&#xff08;UDP)3、ICMP(UDP)4、ARP5、OSPF(IP数据报&#xff09;6、BGP(TCP)7、UDP8、TCP9、DNS(UDP)10、…

(八)登录认证与学生写作画像

本次将赵昱琨同学之前完成的学生写作画像与智能学习路径规划的后端与目前已有的后端框架进行整合。同时为了实现学生写作画像与智能学习路径规划&#xff0c;需要在之前简易的登录系统上进行重构&#xff0c;所以本次大规模重写了登录模块&#xff0c;同时发现很多过去冗余的代…

Netty学习example示例

文章目录 simpleServer端NettyServerNettyServerHandler Client端NettyClientNettyClientHandler tcp&#xff08;粘包和拆包&#xff09;Server端NettyTcpServerNettyTcpServerHandler Client端NettyTcpClientNettyTcpClientHandler protocolcodecCustomMessageDecoderCustomM…

[RoarCTF 2019]Easy Calc

查看源代码 <!--Ive set up WAF to ensure security.--> <script>$(#calc).submit(function(){$.ajax({url:"calc.php?num"encodeURIComponent($("#content").val()),type:GET,success:function(data){$("#result").html(<div …

[Windows]在Win上安装bash和zsh - 一个脚本搞定

目录 前言安装步骤配置要求下载安装脚本启动程序 前言 Windows是一个很流行的系统, 但是在Windows上安装bash和zsh一直是一个让人头疼的问题. 本蛙特意打包了一个程序, 用于一站式解决这一类的问题. 安装步骤 配置要求 系统: Windows软件: Powershell 5.1或以上 下载安装…

从认识AI开始-----解密LSTM:RNN的进化之路

前言 我在上一篇文章中介绍了 RNN&#xff0c;它是一个隐变量模型&#xff0c;主要通过隐藏状态连接时间序列&#xff0c;实现了序列信息的记忆与建模。然而&#xff0c;RNN在实践中面临严重的“梯度消失”与“长期依赖建模困难”问题&#xff1a; 难以捕捉相隔很远的时间步之…

leetcode0513. 找树左下角的值-meidum

1 题目&#xff1a;找树左下角的值 官方标定难度&#xff1a;中 给定一个二叉树的 根节点 root&#xff0c;请找出该二叉树的 最底层 最左边 节点的值。 假设二叉树中至少有一个节点。 示例 1: 输入: root [2,1,3] 输出: 1 示例 2: 输入: [1,2,3,4,null,5,6,null,null,7]…

命令行式本地与服务器互传文件

文章目录 1. 背景2. 传输方式2.1 SCP 协议传输2.2 SFTP 协议传输 3. 注意 命令行式本地与服务器互传文件 1. 背景 多设备协同工作中&#xff0c;因操作系统的不同&#xff0c;我们经常需要将另外一个系统中的文件传输到本地PC进行浏览、编译。多设备文件互传&#xff0c;在嵌入…

LabelImg: 开源图像标注工具指南

LabelImg: 开源图像标注工具指南 1. 简介 LabelImg 是一个图形化的图像标注工具&#xff0c;使用 Python 和 Qt 开发。它是目标检测任务中最常用的标注工具之一&#xff0c;支持 PASCAL VOC 和 YOLO 格式的标注输出。该工具开源、免费&#xff0c;并且跨平台支持 Windows、Lin…

计算机网络 TCP篇常见面试题总结

目录 TCP 的三次握手与四次挥手详解 1. 三次握手&#xff08;Three-Way Handshake&#xff09; 2. 四次挥手&#xff08;Four-Way Handshake&#xff09; TCP 为什么可靠&#xff1f; 1. 序列号与确认应答&#xff08;ACK&#xff09; 2. 超时重传&#xff08;Retransmis…

树欲静而风不止,子欲养而亲不待

2025年6月2日&#xff0c;13~26℃&#xff0c;一般 待办&#xff1a; 物理2 、物理 学生重修 职称材料的最后检查 教学技能大赛PPT 遇见&#xff1a;使用通义创作了一副照片&#xff0c;很好看&#xff01;都有想用来创作自己的头像了&#xff01; 提示词如下&#xff1a; A b…

Kotlin中的::操作符详解

Kotlin提供了::操作符&#xff0c;用于创建对类或对象的成员(函数、属性)的引用。这种机制叫做成员引用(Member Reference)。这是Kotlin高阶函数和函数式编程的重要组成部分。 简化函数传递 在Java中&#xff0c;我们这样传方法&#xff1a; list.forEach(item -> System.…

深入详解编译与链接:翻译环境和运行环境,翻译环境:预编译+编译+汇编+链接,运行环境

目录 一、翻译环境和运行环境 二、翻译环境&#xff1a;预编译编译汇编链接 &#xff08;一&#xff09;预处理&#xff08;预编译&#xff09; &#xff08;二&#xff09;编译 1、词法分析 2、语法分析 3、语义分析 &#xff08;三&#xff09;汇编 &#xff08;四&…

定时任务:springboot集成xxl-job-core(二)

定时任务实现方式&#xff1a; 存在的问题&#xff1a; xxl-job的原理&#xff1a; 可以根据服务器的个数进行动态分片&#xff0c;每台服务器分到的处理数据是不一样的。 1. 多台机器动态注册 多台机器同时配置了调度器xxl-job-admin之后&#xff0c;执行器那里会有多个注…

DeviceNET转EtherCAT网关:医院药房自动化的智能升级神经中枢

在现代医院药房自动化系统中&#xff0c;高效、精准、可靠的设备通信是保障患者用药安全与效率的核心。当面临既有支持DeviceNET协议的传感器、执行器&#xff08;如药盒状态传感器、机械臂限位开关&#xff09;需接入先进EtherCAT高速实时网络时&#xff0c;JH-DVN-ECT疆鸿智能…

一:UML类图

一、类的设计 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 学习设计模式的第一步是看懂UML类图,类图能直观的表达类、对象之间的关系,这将有助于后续对代码的编写。 类图在软件设计及应用框架前期设计中是不可缺少的一部分,它的主要成分包括:类名、…