前言
消息队列拥有解耦、异步和削峰的优点,但是引入消息队列也会增加系统的复杂度,降低系统的可靠性。
消息队列常见问题有:消息丢失
、重复消费
、消息顺序性保证
和消息堆积
。
接下来分别介绍这几种问题的概念和解决方案(主要介绍在 RabbitMQ 中解决方案)。
1. 消息丢失(消息可靠性保证)
分析消息丢失问题,需要从3个角度考虑,分别是生产者、服务器和消费者。
1.1 生产者发送失败
生产者可能因为网络断开等原因导致发送失败。
解决方案
- RabbitMQ 可以通过
发送方确认机制
和事务机制
来保证生产者写消息可靠性。不过需要注意的是这两种机制都会带来较大的性能开销,建议只对重要消息开启。 - 对于 RabbitMQ 需要注意的是,上面两种机制只能保证消息被发送交换机,如果交换机背后没有绑定队列,消息最终仍会丢失,可以开启相应的生产参数
mandatory=true
,保证交换机背后有队列。 - Kafka 可以通过设置
ack
参数,要求消息被 broker 收到后再回复 ack。
1.2 服务器丢失消息
服务器可能因为宕机、重启、集群内部网络问题而丢失消息。
解决方案
- RabbitMQ可以对
消息持久化
,当然首先要求队列持久化
。 - 对于 kafka,同样可以开启持久化,另外 kafka 可以设置
acks=all
保证消息被写入所有副本中,来解决集群间不一致导致的消息丢失。
1.3 消费者丢失消息
消费者可能因为消费消息后却没能正确处理,却对消息进行了 ack,而导致消息丢失。
解决方案
- RabbitMQ 中消费者采用
手动提交 ack
,在消费成功再进行 ack,并且需要开启requeue
使得消息能够被其它消费者消费。 - kafka 需要关闭
自动提交 offset
。
2. 重复消费(消息幂等性保证)
2.1 重复消费原因
导致消息重复消费原因,从起因角度出发可以分为两种:
(1)业务代码问题。生产者多次生产、消费处理消息后却没有进行 ack,导致 MQ server 中出现多次消息。
(2)网络问题。生产者开启发送方确认机制,MQ server实际上已经收到消息,但是回复的 ack 没有被生产者收动,导致生产者重复发送消息;或者消费者回复的 ack 没有被 MQ server 收到,导致消息再次被放回队列。
2.2 如何保证消息幂等性
幂等性
是指:一个操作执行任意多次所产生影响与执行一次影响相同。
保证幂等性,核心思想就是去重。一般可以分为强校验
和弱校验
两种方式。
- 个人理解,强校验和弱校验中都需要进行重复校验,其核心区别应该在于,强校验相同操作执行多次时会失败,一般借助事务来进行回滚。
2.2.1 强校验
- 强校验:
业务唯一标识
+流水表
+事务执行
强校验步骤是:
(1)每个消息都需要携带业务唯一标识;
(2)在请求执行前需要根据业务唯一标识判断请求是否被执行过;
(3)业务操作执行;
(4)业务操作执行成功后,记录/删除业务唯一标识。
需要注意的是,步骤(3)和(4)必须放在相同事务
中执行,那么即便请求执行多次,也会因为无法记录唯一业务标识,而回滚前面的业务操作。
强校验应用举例
金融系统一般都采用强校验
来保证幂等性,且一般采用流水表
方式。在数据库建立流水表,用一些字段组合作唯一约束,那么后续再想要存储记录,业务操作也会失败回滚。
- 金融系统的
流水表
一般还需要用于对账等,收到消息时判断流水表中是否有记录,有表示消息处理过,直接返回。
2.2.2 弱校验
- 弱校验:token 机制 / 期望值比较
弱校验思想其实与强校验一致,只是不需要事务地记录业务唯一标识,也没必要保存流水表,甚至于不一定需要使用业务唯一标识。
弱校验常见实现方式有:
token 机制
。请求方需要先到服务器获取 token,服务器将 token 缓存在 redis 或者内存中,收到请求后判断 token 是否存在。(其实与业务唯一标识实现差不多)期望值
。请求方携带期望值(比如记录版本号),服务器执行操作前判断期望值是否一致,一致才执行。(类似于乐观锁的思想,只是不需要循环请求了)
弱校验应用举例
弱校验往往用于业务操作多次执行也不是特别要紧的场景,比如发送短信通知。
2.2.3 Kafka 消息幂等性
Kafka 本身支持消息幂等性,但这也只能保证单分区消息幂等性
,整体上的幂等性还是需要通过业务机制来保证。
3. 消息顺序性
- 对于 RabbitMQ 而言,其实如果不使用高级特性的话,一般消息都是可以保证顺序性。但是如果使用了发送方确认、ack 结合 requeue 等机制,就会导致消息顺序错误。
消息顺序性,同样要从生产者
、服务器
和消费者
3个角度去考虑。
3.1 生产者顺序生产
要保证消息顺序性,首先需要保证消息进入 MQ server 时的顺序性,一般情况下需要做到如下几点:
(1)只有单个生产者,多生产者难以控制时序;
(2)生产者单线程生产消息,或者在多线程下通过一些技术保证顺序性(比如CompletableFuture和 akka actor 框架)。
3.2 MQ 保证先进先出
- RabbitMQ是可以保证同一个队列消息先进先出的,将顺序性消息都写入
同一个队列
。 - Kafka 则可以保证同一个分区先进先出,不仅需要写入同一个 topic,还需要通过key 结合分区策略,保证消息落入
同一个分区
。
3.3 消费者顺序消费
(1)只有单个消费者;(2)消费者采用单线程消费,或者多线程下保证顺序性。
3.4 RabbitMQ高级特性的影响
在 RabbitMQ 中如果使用了发送方确认机制、requeue、消息优先级、超时时间等高级特性,这些特性有可能会导致先发送消息后到达 MQ。这些特性导致消息错序,有的属于生产者角度,有的属于 MQ server 角度。
- 不使用这些高级特性,来避免消息错序。
- 如果一定要使用这些高级特性,就需要业务上携带
全局有序标识
,消费者顺序消费。(没收到下一条消息则等待,但这样同样会有等待消息超时、造成阻塞的风险)
4. 消息堆积
4.1 消息堆积的原因
消息堆积,可以从是否发生异常来考虑原因:
- 程序无异常(
性能问题
+异常流量
),但消费者消费速度远低于生产者,也可以理解为流量异常。 - 程序异常(
代码逻辑问题
),消费者不能消费消息,或者消费消息但未 ack 导致消息重回队列。
4.2 消息堆积解决方案
4.2.1 紧急方案
往往消息堆积都是紧急的线上问题,需要通过监控及时发现堆积。如果堆积速度较慢,可以等待流量减缓后自行消费掉;如果堆积速度较快,且势头持续不减,需要进行紧急扩容。
扩容方案:多部署几台消费者。如果来得及,可以临时修改代码,改动如下:
(1)多线程消费(可能造成不顺序消费);
(2)收到消息后立即 ack。如果处理较为耗时,甚至可以只消费+导出消息,不处理消息。
(3)程序异常的话,可以对消费代码捕获异常并打印错误日志;
- 项目中使用的报警方案:python 脚本使用 http 请求 rabbitmq server,解析结果中 ready 消息数目,设置报警阈值,大于阈值,则输出信息到监控日志,然后结合公司内部的监控平台实现报警。
4.2.2 后续修复方案
后续复盘时,需要分析消息堆积原因,并给出针对性解决方案:
(1)性能问题。对消费者程序进行性能优化,提升处理能力;MQ流控系统调优。
(2)异常流量。引入流控系统,对异常流量进行削峰;增加校验逻辑,拒绝异常请求。
(3)程序逻辑问题。修复错误逻辑,最好是对消费代码捕获异常,并可以将异常消息导出或者放入死信队列。
小结
本文主要探讨了消息队列的可靠性相关问题机器解决方案,后续可能会增加一些对消息队列高可用
、性能
等问题讨论。
参考网址
https://github.com/doocs/advanced-java/tree/main/docs/high-concurrency
《RabbitMQ实战指南》
复制代码
近期评论