[Learning RocketMQ Together] Source Code Analysis of Message Sending

Preface

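In the service discovery article we saw that a Broker, on startup, starts a scheduled task that registers its routing information with every NameServer, and that a Producer, on startup, pulls the routing information for its topics from the NameServer and updates its local copy. As the previous article showed, this routing information is TopicRouteData, which contains BrokerDatas and QueueDatas. When a Producer wants to send a message, it obviously looks up the topic's route locally first. What happens if the route is not found locally? And once the TopicRouteData is found, a topic maps to multiple QueueData entries, each recording the Broker it belongs to; so how is the topic resolved to concrete routing information, and what load-balancing strategy picks one queue? This article walks through the answers.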

Exploring Message Sending

The code we use to send a message with RocketMQ looks like this:

// The single-argument constructor takes the producer group name.
DefaultMQProducer producer = new DefaultMQProducer(topic);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message(topic,"testTag","testKey","testMessage".getBytes());
SendResult result = producer.send(msg);

Sending a message starts at DefaultMQProducer.send().
1. It first validates the message, then delegates to the send() method of defaultMQProducerImpl.

public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        Validators.checkMessage(msg, this);
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg);
    }

2. The real sending logic lives in the DefaultMQProducerImpl.sendDefaultImpl() method.

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
    // ... preliminary checks omitted
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
        // ... exception handling omitted
    }

3. The first call is tryToFindTopicPublishInfo(topic), which fetches the routing information for the message's topic. Let's see what this method does.

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
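  • The method first reads the TopicPublishInfo from topicPublishInfoTable. If it is missing, it calls mQClientFactory.updateTopicRouteInfoFromNameServer(topic) to fetch the topic's routing information from the remote NameServer, which was analyzed in the service discovery article. The result fetched remotely is then stored in topicPublishInfoTable.
  • Here is the key point: the NameServer does not necessarily have routing information for the topic. If the topic has not been configured and this is the first message sent to it, the NameServer has no route for it either. In that case this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer) is executed. Let's look again at this familiar method, updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer).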

Here isDefault = true and defaultMQProducer != null, so execution takes the branch below and calls getDefaultTopicRouteInfoFromNameServer to fetch the routing information of the default topic.

if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    }

The createTopicKey is a predefined topic named "TBW102". It is created automatically when the Broker allows automatic topic creation.


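At this point the lookup is clear. The route is first looked up locally and returned if found. Otherwise the service discovery logic runs again to fetch it from the remote NameServer, and it is returned if found. If the NameServer has no route either, the route of the default topic (TBW102) is used as a temporary route and inserted into the local route table. When a message for TopicX reaches the Broker, the Broker finds that it has no route for that topic, creates the topic automatically, and then syncs it to the NameServer.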
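The three-step lookup described above can be sketched as follows. This is a minimal illustration, not RocketMQ code: the class RouteLookupSketch, its two maps (stand-ins for the NameServer and for topicPublishInfoTable) and the string form of a queue are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of the local -> NameServer -> default-topic lookup; all names are hypothetical. */
class RouteLookupSketch {
    static final String DEFAULT_TOPIC = "TBW102";

    // Stand-in for the NameServer: topic -> list of queue names.
    final Map<String, List<String>> nameServer = new ConcurrentHashMap<>();
    // Stand-in for the producer's local topicPublishInfoTable.
    final Map<String, List<String>> localTable = new ConcurrentHashMap<>();

    Optional<List<String>> findRoute(String topic) {
        // Step 1: try the local route table.
        List<String> route = localTable.get(topic);
        if (route != null && !route.isEmpty()) {
            return Optional.of(route);
        }
        // Step 2: ask the NameServer for this topic.
        route = nameServer.get(topic);
        if (route == null || route.isEmpty()) {
            // Step 3: borrow the default topic's route as a temporary route.
            route = nameServer.get(DEFAULT_TOPIC);
        }
        if (route == null || route.isEmpty()) {
            return Optional.empty();
        }
        // Cache whatever was found under the requested topic.
        localTable.put(topic, route);
        return Optional.of(route);
    }

    public static void main(String[] args) {
        RouteLookupSketch sketch = new RouteLookupSketch();
        sketch.nameServer.put(DEFAULT_TOPIC, Arrays.asList("broker-a#0", "broker-a#1"));
        // TopicX is unknown everywhere, so the default topic's route is borrowed and cached.
        System.out.println(sketch.findRoute("TopicX"));
        System.out.println(sketch.localTable.containsKey("TopicX"));
    }
}
```

The sketch leaves out the queue-count adjustment (the Math.min over defaultTopicQueueNums and readQueueNums) that the real branch applies to the borrowed route.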

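4. Next, a queue is chosen for sending the message. From the service discovery article we know that the NameServer returns TopicRouteData, which contains queueDatas and brokerDatas. The queueDatas hold all the queue information for the topic, with the following structure: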

public class QueueData implements Comparable<QueueData> {
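    private String brokerName; // the Broker this queue data belongs to
    private int readQueueNums; // number of read queues configured for this topic on this Broker
    private int writeQueueNums; // number of write queues configured for this topic on this Broker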
    private int perm;
    private int topicSynFlag;
 }

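In RocketMQ, a queue is a fairly abstract concept rather than one specific queue object. For a given topic on a given Broker there is exactly one QueueData (Topic : QueueData : Broker is 1:1:1); in essence, a QueueData records all the routing information of one topic on one Broker.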

In the previous step, when the producer obtains the TopicRouteData for a topic from the NameServer, it converts it into a TopicPublishInfo and stores it in the local route table.

{
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

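Inside topicRouteData2TopicPublishInfo, the QueueData entries of the TopicRouteData are traversed. For each writable entry whose Broker has a master address, one MessageQueue is generated per configured write queue (writeQueueNums) and stored in the TopicPublishInfo.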

List<QueueData> qds = route.getQueueDatas();
            Collections.sort(qds);
            for (QueueData qd : qds) {
                if (PermName.isWriteable(qd.getPerm())) {
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }

                    if (null == brokerData) {
                        continue;
                    }

                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }

                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }

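To make the expansion concrete, here is a small self-contained sketch of the same idea. QueueInfo is a simplified, hypothetical stand-in for QueueData, its two boolean fields stand in for the permission check and the master-address check, and queues are represented as plain strings; the sorting step of the original is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class QueueExpansionSketch {
    /** Simplified stand-in for QueueData (hypothetical, not the RocketMQ class). */
    static final class QueueInfo {
        final String brokerName;
        final int writeQueueNums;
        final boolean writable;  // stands in for PermName.isWriteable(perm)
        final boolean hasMaster; // stands in for the MASTER_ID address check

        QueueInfo(String brokerName, int writeQueueNums, boolean writable, boolean hasMaster) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
            this.writable = writable;
            this.hasMaster = hasMaster;
        }
    }

    /** Produces one queue entry per write queue on every usable broker. */
    static List<String> expand(String topic, List<QueueInfo> infos) {
        List<String> queues = new ArrayList<>();
        for (QueueInfo info : infos) {
            if (!info.writable || !info.hasMaster) {
                continue; // skip brokers that cannot accept writes
            }
            for (int i = 0; i < info.writeQueueNums; i++) {
                queues.add(topic + "@" + info.brokerName + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<QueueInfo> infos = Arrays.asList(
            new QueueInfo("broker-a", 2, true, true),
            new QueueInfo("broker-b", 4, true, false), // no master: skipped
            new QueueInfo("broker-c", 1, true, true));
        System.out.println(expand("TopicX", infos));
    }
}
```

With the sample input, broker-b contributes nothing because it has no master, so the result holds three queues: two on broker-a and one on broker-c.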

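5. Back to the sending loop. A retry is attempted as long as times < timesTotal (the retry limit has not been reached) and the elapsed costTime does not exceed timeout; once timeout < costTime, the loop breaks with callTimeout = true. Note that timesTotal is 1 + retryTimesWhenSendFailed only in SYNC mode; in the other modes it is 1, so this loop does not retry.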
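The interplay between the retry count and the timeout budget can be sketched like this. It is a simplified model under stated assumptions: sendWithRetry and its IntPredicate parameter are hypothetical names, and the predicate stands in for sendKernelImpl returning success or failure.

```java
import java.util.function.IntPredicate;

class RetryBudgetSketch {
    /**
     * Tries up to 1 + retryTimesWhenSendFailed times within timeoutMillis.
     * Returns the zero-based index of the successful attempt, or -1 if none succeeded.
     */
    static int sendWithRetry(int retryTimesWhenSendFailed, long timeoutMillis, IntPredicate attempt) {
        int timesTotal = 1 + retryTimesWhenSendFailed;
        long beginTimestampFirst = System.currentTimeMillis();
        for (int times = 0; times < timesTotal; times++) {
            long costTime = System.currentTimeMillis() - beginTimestampFirst;
            if (timeoutMillis < costTime) {
                break; // time budget exhausted: stop retrying
            }
            if (attempt.test(times)) {
                return times; // this attempt succeeded
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The first two attempts fail, the third succeeds.
        System.out.println(sendWithRetry(2, 3000L, times -> times == 2));
        // Only one retry is allowed, so the third attempt is never made.
        System.out.println(sendWithRetry(1, 3000L, times -> times == 2));
    }
}
```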

6. this.selectOneMessageQueue(topicPublishInfo, lastBrokerName) is the load-balancing step that picks one MessageQueue to send to. The selection logic is in the selectOneMessageQueue(lastBrokerName) method of TopicPublishInfo.

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

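As the code shows, the load-balancing strategy is a counter with a modulo operation. When lastBrokerName is null, the counter is incremented and its value modulo the size of the MessageQueue list gives the index of the MessageQueue to use. When lastBrokerName is not null (a previous attempt failed on that Broker), the counter is incremented once and the list is then traversed: each step takes the counter value modulo the list size as the index, and if the MessageQueue at that index still belongs to lastBrokerName the loop moves on, otherwise that MessageQueue is returned. If every queue belongs to lastBrokerName, the method falls back to the plain selectOneMessageQueue().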
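A short worked example may help here. The sketch below mirrors the selection logic with a hypothetical Mq class and RoundRobinSketch wrapper; it swaps the original Math.abs(...) plus "pos < 0" guard for Math.floorMod, which also keeps the index in range when the counter overflows, so it is an equivalent-in-spirit variant rather than the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    /** Simplified stand-in for MessageQueue (hypothetical). */
    static final class Mq {
        final String brokerName;
        final int queueId;

        Mq(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    private final List<Mq> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    RoundRobinSketch(List<Mq> queues) {
        this.queues = queues;
    }

    /** Plain round robin: counter modulo list size. */
    Mq select() {
        return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size()));
    }

    /** Round robin that skips queues on the broker that just failed. */
    Mq select(String lastBrokerName) {
        if (lastBrokerName == null) {
            return select();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            Mq mq = queues.get(Math.floorMod(index++, queues.size()));
            if (!mq.brokerName.equals(lastBrokerName)) {
                return mq;
            }
        }
        return select(); // every queue is on lastBrokerName: fall back
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch(Arrays.asList(
            new Mq("broker-a", 0), new Mq("broker-a", 1),
            new Mq("broker-b", 0), new Mq("broker-b", 1)));
        Mq first = sketch.select(null);              // counter 0 -> broker-a#0
        Mq retry = sketch.select(first.brokerName);  // skips broker-a#1 -> broker-b#0
        System.out.println(first + " then " + retry);
    }
}
```

In the usage above, the first pick is broker-a#0; the retry starts at index 1 (broker-a#1), skips it because it is on the failed broker, and returns broker-b#0.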

7. Next comes the core sending flow, in the DefaultMQProducerImpl.sendKernelImpl() method.
First, the address of the Broker that owns the queue is looked up.

String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }


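Once the Broker address is known, the message's metadata (producer group, topic, queue ID, flags, properties and so on) is packed into the request header.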

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                ...

Then the sendMessage() method of MQClientAPIImpl is called.

sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);

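Inside sendMessage(), the request is built and handed to the Netty-based remoting layer for network transmission.
First the request is wrapped in a RemotingCommand and the message body is set.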

RemotingCommand request = null;
        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
        if (isReply) {
            if (sendSmartMsg) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
            }
        } else {
            if (sendSmartMsg || msg instanceof MessageBatch) {
                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
            } else {
                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
            }
        }
        request.setBody(msg.getBody());

Then the call is dispatched according to whether the send mode is one-way, asynchronous or synchronous.

switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

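As you can see, ONEWAY calls invokeOneway and returns null without waiting for any result; ASYNC calls sendMessageAsync(), passing in the sendCallback callback; and SYNC calls sendMessageSync, which blocks until the result arrives and returns it. That completes the sending of the message.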
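The difference between the three modes can be illustrated without any RocketMQ dependency. The sketch below is only a model: transmit is a hypothetical transport that acknowledges a payload on another thread, and sendSync, sendAsync and sendOneway are illustrative names, not the RocketMQ remoting API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SendModesSketch {
    /** Hypothetical transport: acknowledges the payload on another thread. */
    static CompletableFuture<String> transmit(String payload) {
        return CompletableFuture.supplyAsync(() -> "ACK:" + payload);
    }

    /** SYNC: block until the response arrives or the timeout expires. */
    static String sendSync(String payload, long timeoutMillis) {
        try {
            return transmit(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("send failed", e);
        }
    }

    /** ASYNC: return immediately; the callback receives the response later. */
    static CompletableFuture<Void> sendAsync(String payload, Consumer<String> callback) {
        return transmit(payload).thenAccept(callback);
    }

    /** ONEWAY: fire and forget; the response is never observed. */
    static void sendOneway(String payload) {
        transmit(payload);
    }

    public static void main(String[] args) {
        System.out.println(sendSync("hello", 1000));
        // join() is only used here so the demo waits for the callback before exiting.
        sendAsync("hello", ack -> System.out.println("callback got " + ack)).join();
        sendOneway("hello");
    }
}
```

The trade-off is the usual one: synchronous sending gives the caller a result to act on, asynchronous sending frees the calling thread at the cost of callback handling, and one-way sending is the cheapest but gives no delivery feedback.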

Let's review the whole message sending flow.

[Figure: overall flow of message sending]

Conclusion

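The key terms we met in message sending are DefaultMQProducerImpl, topicPublishInfoTable, updateTopicRouteInfoFromNameServer, the default topic (TBW102), TopicPublishInfo.selectOneMessageQueue and MQClientAPIImpl.sendMessage(). After reading this article, you should be able to answer the following questions: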

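  • How does the first step, fetching the route for a topic, work? What happens if the NameServer does not have it either?
  • Once the TopicPublishInfo is obtained and it holds multiple MessageQueues, how is one MessageQueue chosen by load balancing?
  • What is the overall flow of sending a message? Which three modes does the final network call support?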