RabbitMQ手动确认+重试+死信队列保证消费可靠性

引言

RabbitMQ 消息的消费有两种确认模式: 自动确认手动确认

自动确认:Broker(RabbitMQ 服务器)在将消息发送给消费者后即将消息从队列中删除,无论消费者是否消费成功。如果消费者消费时业务代码出现异常或者还未消费完毕时系统宕机,就会导致消息丢失。

手动确认:消费者消费完毕后手动地向 Broker 发送确认通知,Broker 收到确认通知后再从队列中删除对应的消息。

由于自动确认方式存在的缺陷,对于一些重要的消息,实际中一般采用手动确认的方式来保证消息业务的可靠性。

那么在手动确认方式下,消费者业务中具体如何保证消息的可靠性呢? 下面我们介绍一种方式,即采用 重试 + 手动确认 + 死信队列 的方式来保证消费信息不丢失。

死信队列

死信(Dead Letter),指无法被消费者正确地进行业务处理的消息,消费者消费时业务程序抛出了异常,其主要原因有两个。一、消息本身是有问题的(主要原因),如付款消息中传递的银行卡号不存在。二、由于网络波动等原因,消费者依赖的第三方服务调用异常,如调用第三方接口失败,数据库由于无法获取连接访问失败等情况。对于这类消息,一般将其放入 RabbitMQ 的死信队列中,使用专门的消费者对死信进行处理,或者进行人工补偿。

死信队列的消息来源

1、消息被否定确认使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false

2、消息在队列中的时间超过了设置的TTL(time to live)时间。

3、消息数量超过了队列的容量限制。

当一个队列中的消息满足上述三种情况任一个时,改消息就会从原队列移至死信队列,若改队列没有绑定死信队列则消息被丢弃。

Tips:死信队列和普通的业务队列完全一样,只不过是业务上创建用来存储处理失败的消息的队列。所以其工作方式也和业务队列相同,死信仍然需要交换机的转发到达死信队列。

死信队列的配置

1、为业务队列绑定死信交换机(即用来转发该队列中中死信的交换机)。

2、将死信队列与死信交换机绑定。

代码示例:

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "business-exchange";
    public static final String DEAD_LETTER_EXCHANGE_NAME = "dead-letter-exchange";
    public static final String BUSINESS_QUEUE_NAME = "business-queue";
    public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue";
    public static final String ROUTING_KEY = "routing-key";

    // 声明业务交换机
    @Bean
    public DirectExchange businessExchange(){
        return new DirectExchange(BUSINESS_EXCHANGE_NAME);
    }
    // 声明死信交换机
    @Bean
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
    }
    // 声明业务队列
    @Bean
    public Queue businessQueue(){
        Map<String, Object> args = new HashMap<>(2);
        // 设置业务队列的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
        return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build();
    }
    // 声明死信队列
    @Bean
    public Queue deadLetterQueue(){
        return new Queue(DEAD_LETTER_QUEUE_NAME);
    }
    // 将业务队列绑定到业务交换机
    @Bean
    public Binding bindBusinessQueue(){
        return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY);
    }
    // 将死信队列绑定到死信交换机
    @Bean
    public Binding bindDeadLetterQueue(){
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY);
    }
}
复制代码

在上面的配置示例中,我们声明了一个业务队列、一个业务交换机、一个死信队列、一个死信交换机。其中声明业务队列时的 x-dead-letter-exchange 参数指定队列的死信交换机,当信息被判定为死信时就会被Broker自动转发给配置的死信交换机。

生产者代码

我们定义一个Controller接口来测试消息发送,消息体由接口参数传入。

@RestController
@RequestMapping("/rabbitmq")
public class RabbitController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/send")
    public void send(@RequestParam String msg){
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, msg);
    }
}
复制代码

使用postman调用生产者接口:

http://localhost:5006/rabbitmq/send?msg=normal meaage
复制代码

通过 RabbitMQ 控制台查看可以看到我们声明的交换机个队列成功创建,消息被发送到了业务队列中。

image-20201216115655280

image-20201216115722330

消费者手动确认+重试

消费者配置

# rabbitmq服务器连接端口 (默认为5672)
spring.rabbitmq.host=192.168.44.104
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# 开启消费者手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
复制代码
@Service
@Slf4j
@RabbitListener(queues = "business-queue")
public class RabbitConsumer {
    /**
     * 指定消费的队列
     */
    @RabbitHandler
    public void consume(String msg, Message message, Channel channel){
        boolean success = false;
        int retryCount = 3;
        while (!success && retryCount-- > 0){
            try {
                // 处理消息
                log.info("收到消息: {}, deliveryTag = {}", msg, message.getMessageProperties().getDeliveryTag());
                if(message.equals("dead-letter")){
                    throw new RuntimeException("收到死信");
                }
                // 正常处理完毕,手动确认
                success = true;
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }catch (Exception e){
                log.error("程序异常:{}", e.getMessage());
            }
        }
        // 达到最大重试次数后仍然消费失败
        if(!success){
            // 手动删除,移至死信队列
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
复制代码

启动消费者,消息被正常消费后手动确认,消息从队列中删除

image-20201216120020980

image-20201216131339000

我们下面发送一个死信

http://localhost:5006/rabbitmq/send?msg=dead-letter
复制代码

image-20201216134742543

从上图我们可以看到消费的代码重试了3次后将消息否定确认,Broker将消息判断为死信,发送至死信交换机,最终转发到死信队列。

image-20201216134949066

图中看到死信确实被转发到了死信队列。根据实际的业务情况,我们可以创建专门的死信消费者对死信进行处理,或者进行人工补偿。

Tips: 代码示例中没有涉及数据库事务,若消费程序使用了声明式的事务@Transactional,在捕获异常后要手动回滚事务。如下图:

image-20201216173124261