场景1:保证消息不丢失
首先分析一下在那种情况下消息会出现丢失的情况,看一下RocketMQ主从服务的架构图:
消息一旦经网络流转就意味着消息的不稳定,因为网络自身就有存在不稳定。所以消息在流转的时候就可能丢失。
所以图中 1、2、3、4中都存在着消息丢失的问题。其次就是mq服务挂了。1、3、4是因为存在着跨网络,2则是因为主从同步存在时间差,写入缓存的消息可能会因为服务 done 了导致消息丢失。
阶段1:生产者端消息不丢失
现象:Producer 发送消息, MQServer没有收到。
解决方案:生产者使用事务消息机制保证消息零丢失。
以订单的场景分析一下事务消息:如图
订单系统写入数据库异常(Producer端)
通俗地讲就是在Producer端即生产者端出现异常导致消息无法发送到MQ服务端。此时如果我们不使用事务消息机制,那 RD(开发人员) 只能在下单时抛出异常,不往MQ发送消息。但是这样的话就导致当数据库恢复,这个消息无法再次发送,从而导致消息丢失。当然我们可以做补偿机制,将消息放入缓存 redis、本地等,然后启动另一个线程定时重试。
不选择这样的补偿方案是因为:
-
这样的补偿机制需要 RD 去开发,而且需要考虑消息流转的一些异常情况;
-
RocketMQ本身就已经具备对于这种异常情况的处理,所以不建议二次开发;
MQ本身提供的事务消息机制就是一种更优雅地方案。如果下单时写入数据库失败(可能数据库崩溃)。那么我们可以将数据暂时放入redis中缓存起来或者其他方式存储,然后给RocketMQ返回一个UNKNOWN状态。这样RocketMQ就会过一段时间来回查事务状态,然后在RocketMQ回查事务状态时,再将数据写入数据库,这样就避免了数据库崩溃导致数据丢失的问题。
half消息发送失败(Producer端)
订单系统首先发送一个Half消息到MQ服务,MQ收到half消息后回复订单系统,但是这个消息对于下游消费者服务并不可见。
所以half的作用类似于一个嗅探的作用,试探一下MQ服务是否正常,如果MQ正常,就开始发送重要的消息。
首先分析一下,如果没有half,此时订单系统需要在自身的系统中完成下单后,发送消息到MQServer,如果写入MQ消息失败,那么就会导致整个下单链路中断。而引入half机制,如果写入失败,订单系统收不到来自MQ的回应,此时就认为MQ是有问题的,然后进行下一次重试,而这个half消息对下游服务并不可见。此时就需要 RD 对这种情况进行处理,从而保证消息来源的准确性。 所以可以在下单时给订单一个状态标记,然后等待MQ服务正常后再进行补偿操作,等待MQ服务正常后重新下单通知下游服务。
half消息发送成功到RocketMQ时,RocketMQ挂掉(MQServer)
这里需要注意的是,在事务消息的处理机制中,未知状态的状态事务回查是由RocketMQ Broker主动发起的,所以当half消息成功发送到RocketMQ后,此时RocketMQ done 掉以后,就无法回调到事务消息中回查事务状态的服务。此时需要将订单状态保持为最初的状态“新下单”。当RocketMQ恢复后,只要存储的消息没丢失(防止消息丢失可以进行消息存储处理),下面会介绍这种情况怎么处理,RocketMQ就会在继续状态回查的流程。
下单支付成功后如何优雅的等待支付成功
在订单的场景下,通常要求下单完成后,客户在一定时间内,比如10分钟、或者15分钟内完成订单支付,支付完成后,才会通知下游服务进行进一步的营销补偿。
分析一下如果不用事务消息,那通常会怎么办?
方案一:
比较简单的是直接启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单时间,将超过时间的订单回收。但是这种方案存在的问题是,需要定时扫描很庞大的一个订单信息,对于系统是一个很大的考验。
方案二:
使用RocketMQ的延迟消息机制。向MQ发一个延迟1分钟的消息,消费到这个消息后去检查订单的状态,判断订单是否已经支付,如果支付,就向下游发送通知。如果没有支付,就再次发送一个支付消息。在发送第十次消息时将消息回收。这种方案就不必对全部的订单表进行扫描,只需要针对每个单独的订单消息进行处理。
方案三:
如果使用事务消息,就可以使用事务消息的状态回查机制来代替定任务。在下单时给Broker返回一个UNKNOWN的未知状态,在状态回查的方法中去查询订单的支付状态。只需要配置RocketMQ中的事务消息回查次数和事务回查时间间隔,可以更方便地解决支付状态检查的需求。
小结
事务消息机制的作用:
在订单这个场景中,消息不丢失的问题实际是转换成了下单业务与下游服务的业务的分布式事务一致性问题。但是RocketMQ的事务消息机制实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件的事务一致性,对于下游服务的事务并没有保证。但是即便如此,也是分布式事务很好的一个降级方案。
阶段2:MQ服务端保证消息不丢失
方案一: 同步刷盘机制
之前的文章也分析过,配置RocketMQ刷盘方式:flushDiskType配置为同步刷盘,将消息实时存储,保证消息在刷盘过程中不会丢失。
方案二:Dledger的文件同步
在使用Dledger技术搭建RocketMQ集群,Dledger会通过两阶段提交的方式保证文件在主从之间同步。
两阶段提交:数据同步分为两个阶段:uncommitted阶段、一个是committed阶段。
Leader Broker 上的Dledger收到一条数据后,会将消息标记为unncommitted状态,然后通过自己的DledgerServer组件把这个uncommitted数据发送给Follower Broker的DledgerServer组件。
接着Follower Broker的DledgerServer收到uncommitted消息后,会返回一个ack给Leader Broker的Dledger。如果Leader Broker收到超过半数的Follower返回的ACK,那么就将该条消息的状态从uncommitted状态变为committed状态。
然后Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,将该条消息标记为committed状态。这样基于Raft协议完成了两阶段的数据同步。
阶段3:消费者端保证消息不丢失
正常情况下,消费者端都是需要先处理本地事务,然后在给MQ一个ACK响应。然后MQ会将消息标记为已消费,从而不再往其他消费者推送消息。所以Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。但是也会有下面这种情况下会造成服务端消息丢失——异步消费机制。代码如下:
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
new Thread(){
public void run(){
//处理业务逻辑
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
}
};
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
复制代码
这种异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能性。
阶段4:NameServer挂了如何保证消息不丢失
NameServer在RocketMQ中,扮演着路由中心的角色。集群中任意多的节点挂掉,都不会影响他提供的路由功能。那么当集群中所有的NameServer节点都挂了呢?
图灵课程中的PDF说:
但是我搭建了Dledger集群,
NameServer服务:三个节点:
192.168.40.128:9876;
192.168.40.129:9876;
192.168.40.130:9876
Broker节点:
基于Spring Boot搭建的项目:
yml文件配置:
rocketmq:
producer:
group: springBootGroup
name-server: 192.168.40.128:9876;192.168.40.129:9876;192.168.40.130:9876
server:
port: 1100
复制代码
启动类:
@SpringBootApplication(scanBasePackages = {"com.anzhi.rocketmq.*"})
public class RocketMQScApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQScApplication.class, args);
}
}
复制代码
controller层:
@RestController
@RequestMapping("/MQTest")
public class MQTestController {
@Resource
private SpringProducer springProducer;
@GetMapping("/sendMessage")
public String sendMessage(@RequestParam("message") String message){
springProducer.sendMessage("TestTopic", message);
return "消息发送完成";
}
@GetMapping("/sendTransMessage")
public String sendTransMessage(@RequestParam("messgae") String message){
try {
springProducer.sendMessageInTransaction("TestTopic", message);
return "事物消息发送完成";
} catch (InterruptedException e) {
e.printStackTrace();
}
return "事物消息发送失败";
}
}
复制代码
生产者:
@Component
public class SpringProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
// 消息发送方法
public void sendMessage(String topic, String msg){
this.rocketMQTemplate.convertAndSend(topic, msg);
}
// 事物消息发送
public void sendMessageInTransaction(String topic, String msg) throws InterruptedException{
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for(int i=0; i<10; i++){
Message<String> message = MessageBuilder.withPayload(msg).build();
String destination = topic + ":" + tags[i % tags.length];
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message, destination);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
}
}
复制代码
消费者
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Recived message: " + message);
}
}
复制代码
监听器
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
@Slf4j
public class MyTransactionImpl implements RocketMQLocalTransactionListener {
private ConcurrentHashMap<Object, String> localTrans = new ConcurrentHashMap<>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("executeLocalTransaction开始执行");
Object id = message.getHeaders().get("id");
String destination = o.toString();
localTrans.put(id, destination);
org.apache.rocketmq.common.message.Message msg = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(), "UTF-8", destination, message);
String tags = msg.getTags();
if(StringUtils.contains(tags, "TagA")){
return RocketMQLocalTransactionState.COMMIT;
}else if(StringUtils.contains(tags, "TagB")){
return RocketMQLocalTransactionState.ROLLBACK;
}else{
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
//SpringBoot的消息对象中,并没有transactionId这个属性。跟原生API不一样。
//String destination = localTrans.get(msg.getTransactionId());
log.info("checkLocalTransaction开始执行");
return RocketMQLocalTransactionState.COMMIT;
}
}
复制代码
启动程序运行,然后关闭NameServer服务,仍然可以发送消息。Consumer和Producer可以正常工作。当重启的时候是无法正常工作的。因为需要重新从NameServer拉取路由信息。
但是RocketMQ会打印连接关闭,可连接地址是一个空的数组。打印信息如下:
2021-11-16 19:31:21.804 INFO 8324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2021-11-16 19:31:22.654 INFO 8324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2021-11-16 19:31:23.820 INFO 8324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2021-11-16 19:31:24.666 INFO 8324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2021-11-16 19:31:25.828 INFO 8324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2021-11-16 19:31:26.382 INFO 8324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2021-11-16 19:31:27.832 INFO 8324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2021-11-16 19:31:28.386 INFO 8324 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
复制代码
看源码还没看明白,然后实际操作了一下。不知道是不是这样测试,如果有误,请指正。感谢。
按照测试理解,我觉得Producer/Consumer本地缓存了路由信息,即使NameServer全部宕机,也是可以正常工作。但是服务一旦重启,Producer/Consumer就无法工作了。虽然服务可以工作,但是此时NameServer已经挂掉,即服务端无法再保证消息不再丢失。此时需要我们自己设计一个降级方案来处理这个问题。将消息存放在redis、或者其他地方,一旦RocketMQ恢复,就能立刻将消息发送出去。
小结:
完成分析过后,整个RocketMQ消息零丢失的方案如下:
-
生产者使用事务消息机制;
-
Broker配置同步刷盘 + Dledger主从架构;
-
消费者不使用异步消费;
-
整个MQ挂了要准备降级方案;
但是需要注意的是,这套消息零丢失的方案,在各个环节都大大降低了系统的处理性能以及吞吐量。在很多场景下,这套方案带来的性能损失可能远远大于部分消息丢失的代价。所以在设计RocketMQ使用方案时,还是需要根据实际业务来处理。
场景2:RocketMQ如何保证消费顺序
为什么要保证顺序消费:在某些业务场景下,对MQ的消费顺序是严格要求的。比如:
-
用户的积分默认是0,而新注册用户设置为默认的10分;
-
用户有奖行为,积分+2分。
-
用户有不正当行为, 积分-3分
这样一组操作,正常用户的积分要变成9分。如果顺序乱了,这样会导致用户的行为结果与积分策略对应不上。此时就需要对这样的一组操作保证消息有序。
那么如何保证消息有序呢?
分析:
MQ顺序问题可以分为全局有序和局部有序:
-
全局有序:整个MQ系统的所有消息严格按照队列先入先出顺序进行消费;
-
局部有序:只保证一部分关键消息的消费顺序;
落入业务场景中,我们需要分析全局有序和局部有序那个更重要。其实在大部分的MQ业务场景中,只需要保证局部有序即可。
例如QQ聊天,只需要保证一个聊天窗口里的消息有序就可以了。而对于电商订单场景,也只要保证一个订单的所有消息是有序的就可以。而全局有序的问题通常也可以压缩为局部有序。像我们常用的聊天室,就是典型的需要保证消息全局有序的场景。
但是这种场景就可以压缩成只有一个聊天窗口的QQ来理解。即整个系统只有一个聊天通道,这样就可以用QQ那种保证一个聊天窗口消息有序的方式来保证整个系统的全局消息有序。
落地到RocketMQ。通常情况下,发送者发送消息时,会通过MessageQueue轮询的方式保证消息尽量均匀分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,并且消息都是相互隔离的。这种情况下,是无法保证消息全局有序的。
方案一:
而对于局部有序的要求,只需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。RocketMQ中,可以在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发入哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。
方案二:
还有一种鸡肋的方案:将Topic配置成只有一个MessageQueue队列(默认是4个)。这样天生就能保证消息全局有序。但是这样的做法对整个Topic的消息吞吐量影响非常大,如果这样做的话,基本上没有使用MQ的必要了。
场景3:使用RocketMQ如何快速处理积压消息
在正常情况下, 使用MQ都会要尽量保证消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。这类问题出现一般不好排查,除非消费者依赖的服务挂掉,此时消息会大量积压,当数据库服务恢复的时候,消费者积压的消息即大量的请求又打到数据库上,就可能造成数据库再次崩溃,此时才会感知到消息积压的情况。
对于RocketMQ或者Kafka还好,他们的消息积压不会造成很大的影响。而如果是RabbitMQ的话,大量的消息积压会导致性能直线下滑。
这里介绍一下RocketMQ如何确定消息是否有积压:使用web控制台,就能直接看到消息的积压情况。在Web页面,通过Consumer管理,点击查看可以实时看到消息的积压情况。
也可以通过mqadmin指令在后台检查各个Topic的消息延迟情况。
还有RocketMQ也会在它的${storePathRootDir}/config 目录下落地一系列的json文件,也可以用来跟
踪消息积压情况。
如何处理大量积压消息
-
增加消费者服务节点:如果Topic下的MessageQueue配置得足够多,那么每个Consumer就会分配多个MessageQueue来进行消费,所以此时我们就可以通过增加消费者节点来加快消息的消费。等到积压消息消费完成,再恢复正常的情况。极限情况是把Consumer的节点个数设置成与MessageQueue的个数相同,此时即使再增加Consumer节点也不起作用了。
-
如果原本Topic下配置的MessageQueue个数不多,此时就无法使用上述第一种增加Consumer节点的办法。所以需要曲线救国,即
2.1 临时创建一个新的Topic,配置足够多的MessageQueue。然后把原线上所有消费者节点的目标指向新的Topic
2.2 接着上线一组新的消费者,其任务就是消费旧Topic的消息,然后转储到新的Topic中,除此之外没有任何其他逻辑。
2.3 最后可以根据新Topic消息积压的情况,增加Consumer节点个数提高消费速度。之后再根据情况恢复成正常情况。
-
官网中还介绍了一种集群模式切换避免消息丢失的方案:原集群是普通方式搭建的主从架构,现在想要中途改为使用Dledger高可用集群,此时如果不想历史消息丢失,就需要先将消息消费完毕,因为Dledger集群会接管RocketMQ原有的CommitLog日志,所以切换时,需要尽快处理未消费的消息。
场景四:消息轨迹
RocketMQ默认提供了消息轨迹的功能:
1. RocketMQ消息轨迹数据的关键属性
2.消息轨迹配置
打开消息轨迹功能需要在broker.conf中打开一个关键配置
traceTopicEnable=true
复制代码
这个值默认是 false ,即默认消息轨迹是关闭的。
3.消息轨迹数据存储
默认情况下,消息轨迹数据是存于一个系统级别的Topic,RMQ_SYS_TRACE_TOPIC。这个Topic在Broker节点启动时,就会自动创建出来。
另外也支持客户端自定义轨迹数据存储的Topic




近期评论