Producer启动
Producer分为两种:
普通消息生产者:DefaultMQProduce;这个只需要构建一个Netty客户端,
往Broker发送消息就行了。注意,异步回调只是在Producer接收到Broker的响
应后自行调整流程,不需要提供Netty服务
事物消息生产者:TransactionMQProducer。这个需要构建一个Netty客户端,往Broker发送消息。同时也要构建Netty服务端,供Broker回查本地事务状态
前面使用RocketMQ的原生API,创建好生产者对象,指定NameServer服务,然后调用start方法启动。
首先从普通消息生产者和看起
普通消息生产者
DefaultMQProducer启动
org.apache.rocketmq.client.producer.DefaultMQProducer#start
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
复制代码
真正启动的start方法
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service 客户端负载均衡服务
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
// 将服务运行状态改为:正在运行
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
复制代码
并将运行状态改为Running。
在看负载均衡实现的时候,发现根据start的负载均衡只有Consumer的,并没有找Producer的负载实现。这是因为在启动建立连接的时候,只有consumer会被分配MessageQueue,Producer并不会被分配,只有在发送的时候才会被分配。
所以Producer的负载均衡在发送消息时才会建立MessageQueue的连接。所以需要关注Producer发送消息的方法
Producer负载均衡实现
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
...
// 生产者获取topic的公开信息,重点观察一下怎么选择MessageQueue的
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
...
// ToDo: k2->这里实现负载均衡,选择将消息发送到哪一个MessageQueue中
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
...
复制代码
路由流程如下:
MessageQueue的选择方法实现:
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
选择MessageQueue通过index自增的方式,对MessageQueue的大小取模:
...
// ToDo:k2->可以看到Producer选择的MessageQueue就是自增
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// Broker轮训,尽量将请求分配给每个Broker
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
// 主要功能是记录可能存在问题的Broker,下次请求尽量不发送到这个Broker上
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
...
复制代码
Producer发送消息
...
// 消息发送实体
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
...
复制代码
通过选择的mq查找对应的Broker然后发送消息:
...
// 消息发送实体
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
...
复制代码
所以整个过程是先通过Topic找到MessageQueue然后通过MessageQueue找到Broker,向Broker发送请求。
事物消息生产者
启动流程一样,最终还是由org.apache.rocketmq.client.producer.DefaultMQProducer#start启动
@Override
public void start() throws MQClientException {
// 初始化事物环境
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}
复制代码
再来看一下事物消息的发送。
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
// 获取实现的事物监听器,这也是为什么必须实现事物监听的原因
TransactionListener transactionListener = getCheckListener();
复制代码
Message消息体验证:
// 消息体验证
Validators.checkMessage(msg, this.defaultMQProducer);
复制代码
消息发送:
// 消息发送
sendResult = this.send(msg);
复制代码
最终消息的发送调用还是与普通生产者的一样。org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
所以事物的负载均衡与普通消息的也是一样的。
事物对发送消息结果的处理
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
// 调用我们自己实现的事物监听器对消息处理后返回的状态
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
...
try {
// 最终根据sendResult、localTransactionState的信息处理事物消息,
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
复制代码
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction
根据sendResult、localTransactionState的状态封装Netty的请求头,然后发送给Netty。后面就是Netty处理的一套流程,我没有再深入看下去。
ToDo: 有点疑问就是什么Producer时候将消息放入死信队列,怎么根据消息的状态进行消息重试?
没有定位到相关的源代码……
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
复制代码




近期评论