消息队列特殊问题—-消息丢失、重复消费、消息顺序性和消息

前言

消息队列拥有解耦、异步和削峰的优点,但是引入消息队列也会增加系统的复杂度,降低系统的可靠性。
消息队列常见问题有:消息丢失重复消费消息顺序性保证消息堆积
接下来分别介绍这几种问题的概念和解决方案(主要介绍在 RabbitMQ 中解决方案)。

1. 消息丢失(消息可靠性保证)

分析消息丢失问题,需要从3个角度考虑,分别是生产者服务器消费者

1.1 生产者发送失败

生产者可能因为网络断开等原因导致发送失败。

解决方案
  • RabbitMQ 可以通过发送方确认机制事务机制来保证生产者写消息可靠性。不过需要注意的是这两种机制都会带来较大的性能开销,建议只对重要消息开启。
  • 对于 RabbitMQ 需要注意的是,上面两种机制只能保证消息被发送交换机,如果交换机背后没有绑定队列,消息最终仍会丢失,可以开启相应的生产参数 mandatory=true,保证交换机背后有队列。
  • Kafka 可以通过设置 ack 参数,要求消息被 broker 收到后再回复 ack。

1.2 服务器丢失消息

服务器可能因为宕机重启集群内部网络问题而丢失消息。

解决方案
  • RabbitMQ可以对消息持久化,当然首先要求队列持久化
  • 对于 kafka,同样可以开启持久化,另外 kafka 可以设置acks=all保证消息被写入所有副本中,来解决集群间不一致导致的消息丢失。

1.3 消费者丢失消息

消费者可能因为消费消息后却没能正确处理,却对消息进行了 ack,而导致消息丢失。

解决方案
  • RabbitMQ 中消费者采用手动提交 ack,在消费成功再进行 ack,并且需要开启requeue使得消息能够被其它消费者消费。
  • kafka 需要关闭自动提交 offset

2. 重复消费(消息幂等性保证)

2.1 重复消费原因

导致消息重复消费原因,从起因角度出发可以分为两种:
(1)业务代码问题生产者多次生产消费处理消息后却没有进行 ack,导致 MQ server 中出现多次消息。
(2)网络问题。生产者开启发送方确认机制,MQ server实际上已经收到消息,但是回复的 ack 没有被生产者收动,导致生产者重复发送消息;或者消费者回复的 ack 没有被 MQ server 收到,导致消息再次被放回队列。

2.2 如何保证消息幂等性

幂等性是指:一个操作执行任意多次所产生影响与执行一次影响相同。

保证幂等性,核心思想就是去重。一般可以分为强校验弱校验两种方式。

  • 个人理解,强校验和弱校验中都需要进行重复校验,其核心区别应该在于,强校验相同操作执行多次时会失败,一般借助事务来进行回滚。

2.2.1 强校验

  • 强校验:业务唯一标识 + 流水表 + 事务执行

强校验步骤是:
(1)每个消息都需要携带业务唯一标识;
(2)在请求执行前需要根据业务唯一标识判断请求是否被执行过;
(3)业务操作执行;
(4)业务操作执行成功后,记录/删除业务唯一标识。
需要注意的是,步骤(3)和(4)必须放在相同事务中执行,那么即便请求执行多次,也会因为无法记录唯一业务标识,而回滚前面的业务操作。

强校验应用举例

金融系统一般都采用强校验来保证幂等性,且一般采用流水表方式。在数据库建立流水表,用一些字段组合作唯一约束,那么后续再想要存储记录,业务操作也会失败回滚。

  • 金融系统的流水表一般还需要用于对账等,收到消息时判断流水表中是否有记录,有表示消息处理过,直接返回。

2.2.2 弱校验

  • 弱校验:token 机制 / 期望值比较

弱校验思想其实与强校验一致,只是不需要事务地记录业务唯一标识也没必要保存流水表甚至于不一定需要使用业务唯一标识
弱校验常见实现方式有:

  • token 机制。请求方需要先到服务器获取 token,服务器将 token 缓存在 redis 或者内存中,收到请求后判断 token 是否存在。(其实与业务唯一标识实现差不多)
  • 期望值。请求方携带期望值(比如记录版本号),服务器执行操作前判断期望值是否一致,一致才执行。(类似于乐观锁的思想,只是不需要循环请求了)
弱校验应用举例

弱校验往往用于业务操作多次执行也不是特别要紧的场景,比如发送短信通知。

2.2.3 Kafka 消息幂等性

Kafka 本身支持消息幂等性,但这也只能保证单分区消息幂等性,整体上的幂等性还是需要通过业务机制来保证。

3. 消息顺序性

  • 对于 RabbitMQ 而言,其实如果不使用高级特性的话,一般消息都是可以保证顺序性。但是如果使用了发送方确认、ack 结合 requeue 等机制,就会导致消息顺序错误。

消息顺序性,同样要从生产者服务器消费者3个角度去考虑。

3.1 生产者顺序生产

要保证消息顺序性,首先需要保证消息进入 MQ server 时的顺序性,一般情况下需要做到如下几点:
(1)只有单个生产者,多生产者难以控制时序;
(2)生产者单线程生产消息,或者在多线程下通过一些技术保证顺序性(比如CompletableFuture和 akka actor 框架)。

3.2 MQ 保证先进先出

  • RabbitMQ是可以保证同一个队列消息先进先出的,将顺序性消息都写入同一个队列
  • Kafka 则可以保证同一个分区先进先出,不仅需要写入同一个 topic,还需要通过key 结合分区策略,保证消息落入同一个分区

3.3 消费者顺序消费

(1)只有单个消费者;(2)消费者采用单线程消费,或者多线程下保证顺序性。

3.4 RabbitMQ高级特性的影响

在 RabbitMQ 中如果使用了发送方确认机制、requeue、消息优先级、超时时间等高级特性,这些特性有可能会导致先发送消息后到达 MQ。这些特性导致消息错序,有的属于生产者角度,有的属于 MQ server 角度。

  • 不使用这些高级特性,来避免消息错序。
  • 如果一定要使用这些高级特性,就需要业务上携带全局有序标识,消费者顺序消费。(没收到下一条消息则等待,但这样同样会有等待消息超时、造成阻塞的风险)

4. 消息堆积

4.1 消息堆积的原因

消息堆积,可以从是否发生异常来考虑原因:

  • 程序无异常(性能问题+异常流量),但消费者消费速度远低于生产者,也可以理解为流量异常。
  • 程序异常(代码逻辑问题),消费者不能消费消息,或者消费消息但未 ack 导致消息重回队列。

4.2 消息堆积解决方案

4.2.1 紧急方案

往往消息堆积都是紧急的线上问题,需要通过监控及时发现堆积。如果堆积速度较慢,可以等待流量减缓后自行消费掉;如果堆积速度较快,且势头持续不减,需要进行紧急扩容。
扩容方案:多部署几台消费者。如果来得及,可以临时修改代码,改动如下:
(1)多线程消费(可能造成不顺序消费);
(2)收到消息后立即 ack。如果处理较为耗时,甚至可以只消费+导出消息,不处理消息
(3)程序异常的话,可以对消费代码捕获异常并打印错误日志;

  • 项目中使用的报警方案:python 脚本使用 http 请求 rabbitmq server,解析结果中 ready 消息数目,设置报警阈值,大于阈值,则输出信息到监控日志,然后结合公司内部的监控平台实现报警。

4.2.2 后续修复方案

后续复盘时,需要分析消息堆积原因,并给出针对性解决方案:
(1)性能问题。对消费者程序进行性能优化,提升处理能力;MQ流控系统调优。
(2)异常流量。引入流控系统,对异常流量进行削峰;增加校验逻辑,拒绝异常请求。
(3)程序逻辑问题。修复错误逻辑,最好是对消费代码捕获异常,并可以将异常消息导出或者放入死信队列。

小结

本文主要探讨了消息队列的可靠性相关问题机器解决方案,后续可能会增加一些对消息队列高可用性能等问题讨论。

参考网址

https://github.com/doocs/advanced-java/tree/main/docs/high-concurrency
《RabbitMQ实战指南》
复制代码