引言
RabbitMQ 消息的消费有两种确认模式: 自动确认和手动确认
自动确认:Broker(RabbitMQ 服务器)在将消息发送给消费者后即将消息从队列中删除,无论消费者是否消费成功。如果消费者消费时业务代码出现异常或者还未消费完毕时系统宕机,就会导致消息丢失。
手动确认:消费者消费完毕后手动地向 Broker 发送确认通知,Broker 收到确认通知后再从队列中删除对应的消息。
由于自动确认方式存在的缺陷,对于一些重要的消息,实际中一般采用手动确认的方式来保证消息业务的可靠性。
那么在手动确认方式下,消费者业务中具体如何保证消息的可靠性呢? 下面我们介绍一种方式,即采用 重试 + 手动确认 + 死信队列 的方式来保证消费信息不丢失。
死信队列
死信(Dead Letter),指无法被消费者正确地进行业务处理的消息,消费者消费时业务程序抛出了异常,其主要原因有两个。一、消息本身是有问题的(主要原因),如付款消息中传递的银行卡号不存在。二、由于网络波动等原因,消费者依赖的第三方服务调用异常,如调用第三方接口失败,数据库由于无法获取连接访问失败等情况。对于这类消息,一般将其放入 RabbitMQ 的死信队列中,使用专门的消费者对死信进行处理,或者进行人工补偿。
死信队列的消息来源
1、消息被否定确认使用 channel.basicNack
或 channel.basicReject
,并且此时requeue
属性被设置为false
。
2、消息在队列中的时间超过了设置的TTL(time to live)时间。
3、消息数量超过了队列的容量限制。
当一个队列中的消息满足上述三种情况任一个时,改消息就会从原队列移至死信队列,若改队列没有绑定死信队列则消息被丢弃。
Tips:死信队列和普通的业务队列完全一样,只不过是业务上创建用来存储处理失败的消息的队列。所以其工作方式也和业务队列相同,死信仍然需要交换机的转发到达死信队列。
死信队列的配置
1、为业务队列绑定死信交换机(即用来转发该队列中中死信的交换机)。
2、将死信队列与死信交换机绑定。
代码示例:
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "business-exchange";
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead-letter-exchange";
public static final String BUSINESS_QUEUE_NAME = "business-queue";
public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue";
public static final String ROUTING_KEY = "routing-key";
// 声明业务交换机
@Bean
public DirectExchange businessExchange(){
return new DirectExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信交换机
@Bean
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}
// 声明业务队列
@Bean
public Queue businessQueue(){
Map<String, Object> args = new HashMap<>(2);
// 设置业务队列的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build();
}
// 声明死信队列
@Bean
public Queue deadLetterQueue(){
return new Queue(DEAD_LETTER_QUEUE_NAME);
}
// 将业务队列绑定到业务交换机
@Bean
public Binding bindBusinessQueue(){
return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY);
}
// 将死信队列绑定到死信交换机
@Bean
public Binding bindDeadLetterQueue(){
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY);
}
}
复制代码
在上面的配置示例中,我们声明了一个业务队列、一个业务交换机、一个死信队列、一个死信交换机。其中声明业务队列时的 x-dead-letter-exchange 参数指定队列的死信交换机,当信息被判定为死信时就会被Broker自动转发给配置的死信交换机。
生产者代码
我们定义一个Controller接口来测试消息发送,消息体由接口参数传入。
@RestController
@RequestMapping("/rabbitmq")
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/send")
public void send(@RequestParam String msg){
rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, msg);
}
}
复制代码
使用postman调用生产者接口:
http://localhost:5006/rabbitmq/send?msg=normal meaage
复制代码
通过 RabbitMQ 控制台查看可以看到我们声明的交换机个队列成功创建,消息被发送到了业务队列中。
消费者手动确认+重试
消费者配置
# rabbitmq服务器连接端口 (默认为5672)
spring.rabbitmq.host=192.168.44.104
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# 开启消费者手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
复制代码
@Service
@Slf4j
@RabbitListener(queues = "business-queue")
public class RabbitConsumer {
/**
* 指定消费的队列
*/
@RabbitHandler
public void consume(String msg, Message message, Channel channel){
boolean success = false;
int retryCount = 3;
while (!success && retryCount-- > 0){
try {
// 处理消息
log.info("收到消息: {}, deliveryTag = {}", msg, message.getMessageProperties().getDeliveryTag());
if(message.equals("dead-letter")){
throw new RuntimeException("收到死信");
}
// 正常处理完毕,手动确认
success = true;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("程序异常:{}", e.getMessage());
}
}
// 达到最大重试次数后仍然消费失败
if(!success){
// 手动删除,移至死信队列
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码
启动消费者,消息被正常消费后手动确认,消息从队列中删除
我们下面发送一个死信
http://localhost:5006/rabbitmq/send?msg=dead-letter
复制代码
从上图我们可以看到消费的代码重试了3次后将消息否定确认,Broker将消息判断为死信,发送至死信交换机,最终转发到死信队列。
图中看到死信确实被转发到了死信队列。根据实际的业务情况,我们可以创建专门的死信消费者对死信进行处理,或者进行人工补偿。
Tips: 代码示例中没有涉及数据库事务,若消费程序使用了声明式的事务@Transactional,在捕获异常后要手动回滚事务。如下图:
近期评论