消息生产者的代码都在client模块中,相对于RocketMQ来讲,消息生产者就是客户端,也是消息的提供者。

启动流程

代码:DefaultMQProducerImpl#start
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
                this.checkConfig();
        // ...
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
        // ...
        //注册当前生产者到到MQClientInstance管理中,方便后续调用网路请求
        boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
        // ...
        // 启动生产者
        mQClientFactory.start();
        // ...
        
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        RequestFutureHolder.getInstance().startScheduledTask(this);
    }
}整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();
同一个clientId只会创建一个MQClientInstance。
MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
org.apache.rocketmq.client.impl.MQClientManager#getOrCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
消息发送
 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
- 验证消息 // 消息验证 Validators.checkMessage(msg, this.defaultMQProducer); public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic Validators.checkTopic(msg.getTopic()); Validators.isNotAllowedSendTopic(msg.getTopic()); // body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } }
- 查找路由 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } } public class TopicPublishInfo { private boolean orderTopic = false; //是否是顺序消息 private boolean haveTopicRouterInfo = false; private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); //该主题消息队列 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();//每选择一次消息队列,该值+1 private TopicRouteData topicRouteData;//关联Topic路由元信息 }代码:MQClientInstance#updateTopicRouteInfoFromNameServer TopicRouteData topicRouteData; //使用默认主题从NameServer获取路由信息 if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(clientConfig.getMqClientApiTimeout()); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { //使用指定主题从NameServer获取路由信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout()); }if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteData.topicRouteDataChanged(old); //判断路由是否需要更改 if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); }if (changed) { // Update endpoint map { ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData); if (!mqEndPoints.isEmpty()) { topicEndPointsTable.put(topic, mqEndPoints); } } // Update Pub info { TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) { MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } } } // Update sub info if (!consumerTable.isEmpty()) { Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } } } }
选择队列
- 默认不启用Broker故障延迟机制
代码:org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //第一次选择队列
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
        //遍历消息队列集合
            for (int i = 0; i < this.messageQueueList.size(); i++) {
            //sendWhichQueue自增后取模
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = index % this.messageQueueList.size();
                MessageQueue mq = this.messageQueueList.get(pos);
            //规避上次Broker队列
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
        //如果以上情况都不满足,返回sendWhichQueue取模后的队列
            return selectOneMessageQueue();
        }
    }- 启用Broker故障延迟机制
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //Broker故障延迟机制
    if (this.sendLatencyFaultEnable) {
        try {
            //对sendWhichQueue自增
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            //对消息队列轮询获取一个队列
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //验证该队列是否可用
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    //可用
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
			//从规避的Broker中选择一个可用的Broker
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            //获得Broker的写队列集合
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                //获得一个队列,指定broker和队列ID并返回
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}原理分析
代码:DefaultMQProducerImpl#sendDefaultImpl
endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        //计算broker规避的时长
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        //更新该FaultItem规避时长
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    //获得原FaultItem
        FaultItem old = this.faultItemTable.get(name);
    //为空新建faultItem对象,设置规避时长和开始时间
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
        //更新规避时长和开始时间
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }4.发送消息
消息发送API核心入口DefaultMQProducerImpl#sendKernelImpl
private SendResult sendKernelImpl(
    final Message msg,	//待发送消息
    final MessageQueue mq,	//消息发送队列
    final CommunicationMode communicationMode,		//消息发送内模式
    final SendCallback sendCallback,	pp	//异步消息回调函数
    final TopicPublishInfo topicPublishInfo,	//主题路由信息
    final long timeout	//超时时间
    )代码:DefaultMQProducerImpl#sendKernelImpl
        long beginStartTime = System.currentTimeMillis();
        String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
//获得broker网络地址信息
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    //没有找到从NameServer更新broker网络地址信息
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
        }//为消息分类唯一ID
if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
    topicWithNamespace = true;
}
//消息大小超过4K,启用消息压缩
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
}
//如果是事务消息,设置消息标记MessageSysFlag.TRANSACTION_PREPARED_TYPE
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}//如果注册了消息发送钩子函数,在执行消息发送前的增强逻辑
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }
    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
}代码:SendMessageHook
public interface SendMessageHook {
    String hookName();
    void sendMessageBefore(final SendMessageContext context);
    void sendMessageAfter(final SendMessageContext context);
}代码:DefaultMQProducerImpl#sendKernelImpl
//构建消息发送请求包
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//生产者组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
//主题
requestHeader.setTopic(msg.getTopic());
//默认创建主题Key
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
//该主题在单个Broker默认队列树
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
//队列ID
requestHeader.setQueueId(mq.getQueueId());
//消息系统标记
requestHeader.setSysFlag(sysFlag);
//消息发送时间
requestHeader.setBornTimestamp(System.currentTimeMillis());
//消息标记
requestHeader.setFlag(msg.getFlag());
//消息扩展信息
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
//消息重试次数
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
//是否是批量消息等
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }
    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}//如果注册了钩子函数,则发送完毕后执行钩子函数
if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}2.3.4 批量消息发送

2.4.7 刷盘机制
 



















