RocketMQ源码学习(三)-Producer业务Pro

Producer启动

Producer分为两种:

普通消息生产者:DefaultMQProduce;这个只需要构建一个Netty客户端,
往Broker发送消息就行了。注意,异步回调只是在Producer接收到Broker的响
应后自行调整流程,不需要提供Netty服务

事物消息生产者:TransactionMQProducer。这个需要构建一个Netty客户端,往Broker发送消息。同时也要构建Netty服务端,供Broker回查本地事务状态

前面使用RocketMQ的原生API,创建好生产者对象,指定NameServer服务,然后调用start方法启动。

首先从普通消息生产者和看起

普通消息生产者

DefaultMQProducer启动

org.apache.rocketmq.client.producer.DefaultMQProducer#start

@Override
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}
复制代码

真正启动的start方法

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service 客户端负载均衡服务
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                // 将服务运行状态改为:正在运行
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
复制代码

并将运行状态改为Running。

在看负载均衡实现的时候,发现根据start的负载均衡只有Consumer的,并没有找Producer的负载实现。这是因为在启动建立连接的时候,只有consumer会被分配MessageQueue,Producer并不会被分配,只有在发送的时候才会被分配。

所以Producer的负载均衡在发送消息时才会建立MessageQueue的连接。所以需要关注Producer发送消息的方法

Producer负载均衡实现

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

...
// 生产者获取topic的公开信息,重点观察一下怎么选择MessageQueue的
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

...
// ToDo: k2->这里实现负载均衡,选择将消息发送到哪一个MessageQueue中
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
...
复制代码

路由流程如下:

image.png

MessageQueue的选择方法实现:

org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue

选择MessageQueue通过index自增的方式,对MessageQueue的大小取模:

...
// ToDo:k2->可以看到Producer选择的MessageQueue就是自增
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    if (pos < 0)
        pos = 0;
    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    // Broker轮训,尽量将请求分配给每个Broker
    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
            return mq;
    }
}

// 主要功能是记录可能存在问题的Broker,下次请求尽量不发送到这个Broker上
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
    final MessageQueue mq = tpInfo.selectOneMessageQueue();
    if (notBestBroker != null) {
        mq.setBrokerName(notBestBroker);
        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
    }
    return mq;
} else {
    latencyFaultTolerance.remove(notBestBroker);
}
...
复制代码

Producer发送消息

...
// 消息发送实体
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
...
复制代码

通过选择的mq查找对应的Broker然后发送消息:

...
// 消息发送实体
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
...
复制代码

所以整个过程是先通过Topic找到MessageQueue然后通过MessageQueue找到Broker,向Broker发送请求。

事物消息生产者

启动流程一样,最终还是由org.apache.rocketmq.client.producer.DefaultMQProducer#start启动

@Override
public void start() throws MQClientException {
    // 初始化事物环境
    this.defaultMQProducerImpl.initTransactionEnv();
    super.start();
}
复制代码

再来看一下事物消息的发送。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction

// 获取实现的事物监听器,这也是为什么必须实现事物监听的原因
TransactionListener transactionListener = getCheckListener();
复制代码

Message消息体验证:

// 消息体验证
Validators.checkMessage(msg, this.defaultMQProducer);
复制代码

消息发送:

// 消息发送
sendResult = this.send(msg);
复制代码

最终消息的发送调用还是与普通生产者的一样。org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

所以事物的负载均衡与普通消息的也是一样的。

事物对发送消息结果的处理

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction

switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
            }
            String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            }
            if (null != localTransactionExecuter) {
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                // 调用我们自己实现的事物监听器对消息处理后返回的状态
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }

            if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    
    ...
try {
    // 最终根据sendResult、localTransactionState的信息处理事物消息,
    this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
复制代码

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction

根据sendResult、localTransactionState的状态封装Netty的请求头,然后发送给Netty。后面就是Netty处理的一套流程,我没有再深入看下去。

ToDo: 有点疑问就是什么Producer时候将消息放入死信队列,怎么根据消息的状态进行消息重试?
没有定位到相关的源代码……

this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
    this.defaultMQProducer.getSendMsgTimeout());
复制代码