先看有哪些send方法

首先说红圈的
有3个红圈。归类成3种发送方式。假设前提条件,发送的topic,有3个broker,每个broker总共4个write队列,总共有12个队列。
- 普通发送。负载均衡12个队列。指定超时时间
- 指定MessageQueue,发送,指定超时时间
- 指定selector器,指定特定参数,指定超时时间。一般用于局部有序,比如相同userId的,到同一个队列
默认超时时间时3秒
再说蓝圈
- sendDefaultImpl 负载均衡的方式,选择队列。然后调sendKernelImpl
- sendSelectImpl 指定队列selector和arg的方式,选择队列。然后调sendKernelImpl
- sendKernelImpl 最核心的方式。这里已经明确队列,做真实的消息发送
很明显,只需要简单解读sendDefaultImpl和sendSelectImpl如何选择队列。然后重点在于查看sendKernelImpl方法实现
sendDefaultImpl选择队列分析
先看源码
private SendResult sendDefaultImpl(
      Message msg,
      final CommunicationMode communicationMode,
      final SendCallback sendCallback,
      final long timeout
  ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      this.makeSureStateOK();
      Validators.checkMessage(msg, this.defaultMQProducer);
      final long invokeID = random.nextLong();
      long beginTimestampFirst = System.currentTimeMillis();
      long beginTimestampPrev = beginTimestampFirst;
      long endTimestamp = beginTimestampFirst;
      TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
      if (topicPublishInfo != null && topicPublishInfo.ok()) {
          boolean callTimeout = false;
          MessageQueue mq = null;
          Exception exception = null;
          SendResult sendResult = null;
          int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
          int times = 0;
          String[] brokersSent = new String[timesTotal];
          for (; times < timesTotal; times++) {
              String lastBrokerName = null == mq ? null : mq.getBrokerName();
              MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
              if (mqSelected != null) {
                  mq = mqSelected;
                  brokersSent[times] = mq.getBrokerName();
                  try {
                      beginTimestampPrev = System.currentTimeMillis();
                      if (times > 0) {
                          //Reset topic with namespace during resend.
                          msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                      }
                      long costTime = beginTimestampPrev - beginTimestampFirst;
                      if (timeout < costTime) {
                          callTimeout = true;
                          break;
                      }
                      sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                      endTimestamp = System.currentTimeMillis();
                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                      switch (communicationMode) {
                          case ASYNC:
                              return null;
                          case ONEWAY:
                              return null;
                          case SYNC:
                              if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                  if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                      continue;
                                  }
                              }
                              return sendResult;
                          default:
                              break;
                      }
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
第一步,通过topic查找路由信息tryToFindTopicPublishInfo
 先从内存中获取。内存是DefaultMQProducerImpl#topicPublishInfoTable
 如果内存没有,则从nameserver获取
 org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String)
内存是什么时候添加的呢?是有定时器任务更新的。详情看我写的文章rocketmq-push模式-消费侧重平衡-类流程图分析
第二步、设定默认重试3次(包含首次),选择topic的其中一个队列
 org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
可以发现,topic对应的TopicPublishInfo,维护者一个ThreadLocalIndex对象。
 每个线程先会获取一个index,然后对index取模,得到某一个队列。
 这意味着,sendDefaultImpl中,队列的负载均衡是线程独立的。每个线程维护着自己的index,每发送一次,index+1。
public int incrementAndGet() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            this.threadLocalIndex.set(index);
        }
        this.threadLocalIndex.set(++index);
        return Math.abs(index & POSITIVE_MASK);
    }
第三步、选择完MessageQueue后,调用sendKernelImpl发送消息
sendSelectImpl选择队列分析
先看源码
private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            try {
                List<MessageQueue> messageQueueList =
                    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
                Message userMessage = MessageAccessor.cloneMessage(msg);
                String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
                userMessage.setTopic(userTopic);
                mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
            } catch (Throwable e) {
                throw new MQClientException("select message queue threw exception.", e);
            }
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeout < costTime) {
                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
            }
            if (mq != null) {
                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
            } else {
                throw new MQClientException("select message queue return null.", null);
            }
        }
        validateNameServerSetting();
        throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    }
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl
第一步,通过topic查找路由信息tryToFindTopicPublishInfo。分析同上
 第二步,通过MessageQueueSelector,找出发送的MessageQueue
 MessageQueueSelector的实现方式,可以自定义。提供了2种
 SelectMessageQueueByRandom 随机一个
 SelectMessageQueueByHash 根据arg的hashcode取模一个。适合局部有序
public class SelectMessageQueueByHash implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode() % mqs.size();
        if (value < 0) {
            value = Math.abs(value);
        }
        return mqs.get(value);
    }
}
第三步、选择完MessageQueue后,调用sendKernelImpl发送消息
sendKernelImpl发送分析
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
 第一步、通过MessageQueue,获取对应的master节点地址
 第二步、设置消息的唯一id。详情看以下实现。明显是客户端生成的,(由于不是分布式唯一ID的创建方式,有点怀疑会重复。后续查看)
 org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID
 第三步、对消息body做消息压缩
 第四步、判断该消息是否是事务消息。给sysFlag位标志变量加标志
 第五步、发送前可做一些自定义的检查CheckForbiddenHook、SendMessageHook
 第六步、构建SendMessageRequestHeader requestHeader,将msg的一些内容设置到header上
 第七部、根据发送模式communicationMode,调用不同的sendMessage方法
 org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage
switch (communicationMode) {
    case ASYNC:
        Message tmpMessage = msg;
        boolean messageCloned = false;
        if (msgBodyCompressed) {
            //If msg body was compressed, msgbody should be reset using prevBody.
            //Clone new message using commpressed message body and recover origin massage.
            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
            tmpMessage = MessageAccessor.cloneMessage(msg);
            messageCloned = true;
            msg.setBody(prevBody);
        }
        if (topicWithNamespace) {
            if (!messageCloned) {
                tmpMessage = MessageAccessor.cloneMessage(msg);
                messageCloned = true;
            }
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
    case ONEWAY:
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
    default:
        assert false;
        break;
}
第八步、最终会调用NettyRemotingClient的发送方法
 SYNC:
 org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync
 ONEWAY:
 org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway
 ASYNC:
 org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync
总结
product的发送有几种API模式,其实目的都是为了选择MessageQueue
- 默认的发送,是根据topic的队列,做负载均衡的方式,topicPublishInfo内部维护着ThreadLocalIndex对象,做线程级别的负载均衡。而且默认都3次重试机会,意味可以选择不同队列做发送;
- 指定messageQueue,是调用方明确知道发送的MessageQueue,这种失败不会做重试;
- 指定MessageQueueSelector等,这种是通过传入的参数,计算出对应的MessageQueue,这种失败不会做重试,适合作为局部有序的发送方式
选择好队列后,就会调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl方法,主要是构建SendMessageRequestHeader,执行自定义的发送before和after的处理。
 sendKernelImpl最终会调用NettyRemotingClient提供的接口,分别处理SYNC、ONEWAY、ASYNC的三种模式

![77,【1】.[CISCN2019 华东南赛区]Web4](https://i-blog.csdnimg.cn/direct/39879d558c374ab8bb4f4ba7a256805e.png)








![80,【4】BUUCTF WEB [SUCTF 2018]MultiSQL](https://i-blog.csdnimg.cn/direct/0d30aa60303a4b3d91190a5918e807cf.png)








