RabbitMQ(5):死信队列的场景演示

这是我参与11月更文挑战的第10天,活动详情查看:2021最后一次更文挑战

一、死信队列

死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。

​ 一般来说,生产者将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。

死信消息来源:

  • 消息 TTL 过期
  • 队列满了,无法再次添加数据
  • 消息被拒绝(reject 或 nack),并且 requeue =false

二、死信实战(基础版)

在这种,我们使用基础版来完成死信队列的几种情况的演示。

定义一个生产者 向 普通交换机下的普通队列发送消息。

定义一个消费者向普通交换机下的普通队列消费消息,当它消息满足三大条件之一时,消息就发送到死信交换机下的死信队列。

死信交换机也不是什么特殊交换机,是自己命名的,只是专门用来接收死信消息。

2.1 发送 TTL 消息

消费者1:消费普通消息,如果普通消息它过期了,就将它转发到死信队列中。

public class Consumer1 {
    // 普通交换机
    public static final String NORML_EXCHANGE ="normal_exchange";
    // 死信交换机
    public static final String DEAD_EXCHANGE ="dead_exchange";
    // 普通队列的名称
    public static final String NORML_QUEUE ="normal_queue";
    // 死信队列的名称
    public static final String DEAD_QUEUE ="dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置连接属性
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("test");

        //3、从连接工厂中获取连接
        Connection connection = connectionFactory.newConnection("consumer2");
        //4、从连接中获取通道 channel
        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback =(String a, Delivery b)->{
            String message = new String(b.getBody());
            System.out.println("work1:"+message);
            // false 表示只确认 b.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息
            channel.basicAck(b.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback =(String a)->{
            System.out.println("消息消费被中断");
        };
        // 5、声明死信和普通交换机,类型为  direct
        channel.exchangeDeclare(NORML_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        // 队列声明:普通队列需要设置 arguments 参数
        HashMap<String, Object> arguments = new HashMap<>();
        // 过期时间,正常队列设置死信交换机,当消费失败就发送到死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","dead");// 设置路由key
        // 6、队列声明:普通队列需要设置 arguments 参数,普通队列添加参数
        channel.queueDeclare(NORML_QUEUE,false,false,false,arguments);
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        // 7、绑定关系
        channel.queueBind(NORML_QUEUE,NORML_EXCHANGE,"normal");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");

        // 8、消费消息
        /**
         * 消费者消费消息
         * @params1: 消费哪个队列
         * @params2:消费成功之后是否要自动应答 true代表自动应答 ,flase代表手动应答。
         * @params3: 消费者消费成功的回调
         * @params4: 消费者消费失败的回调
         */
        channel.basicConsume(NORML_QUEUE,false,deliverCallback,cancelCallback);
    }
}
复制代码

生产者:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2、设置连接属性
        connectionFactory.setHost("192.168.81.102");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("test");
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test");

        //3、从连接工厂中获取连接
        Connection connection = connectionFactory.newConnection("producer");
        //4、从连接中获取通道 channel
        Channel channel = connection.createChannel();
        AMQP.BasicProperties properties =new AMQP.BasicProperties().builder().expiration("10000").build(); // 单位毫秒 ,这里是10s
        // 6、发送死信队列,设置 TTL 消息
        for (int i = 1; i <=20; i++) {
            // @params1: 交换机exchange
            // @params2: 队列名称/routing
            // @params3: 属性配置,这里添加 ttl 消息
            // @params4: 发送消息的内容
            channel.basicPublish("normal_exchange","normal",properties,("潇雷挺帅,说第"+i+"遍。").getBytes());
            System.out.println("发送第"+i);
            Thread.sleep(1000);
        }
    }
}
复制代码

先启动 消费者1 然后创建了声明。再关闭消费者1,开始发送带有 ttl 的消息。最后可以发现,20条消息全都被发送到死信队列里面了。

image-20211112100711099

image-20211112100758753

image-20211112100809713

2.2 队列满了

消费者1添加以下代码,添加正常队列的长度限制。当消息超过 5条之后,再过来的消息就会变成死信队列。

 arguments.put("x-max-length",5);
复制代码

实现的效果如下:

image-20211112102131585

2.3 消息被拒

   DeliverCallback deliverCallback =(String a, Delivery b)->{
            String message = new String(b.getBody());
            System.out.println("work1:"+message);
            if(message.equals("5")){
                System.out.println("该消息被拒绝");
                channel.basicReject(b.getEnvelope().getDeliveryTag(),false);//true 塞回原队列,false 代表不放回原队列。
            }else{
                channel.basicAck(b.getEnvelope().getDeliveryTag(),false);
            }
        };
复制代码

三、死信实战(SpringBoot 版)

在这小节种,通过 Springboot 项目再次熟悉下这三大场景。

业务流程:

  • 1、正常业务消息被投递到正常业务的 Exchange,该 Exchange 根据路由键将消息路由绑定到正常队列
  • 2、正常的消息变成死信消息之后,会被自动投递到该队列绑定的死信交换机上;
  • 3、死信交换机收到消息后,将消息根据路由规则路由到指定的死信队列
  • 4、消息到达死信队列后,可监听该死信队列,处理死信消息。

3.1 消息被拒

1、配置死信队列config

@Configuration
public class DeadConfig {
    // 交换机
    public static final String dead_exchange_name = "dead_exchange_springboot";
    // 普通交换机
    public static final String normal_exchange_name = "normal_exchange_springboot";
    // 普通队列
    public static final String normal_queue_name="normal_queue_springboot";
    // 死信队列
    public static final String dead_queue_name = "dead_queue_springboot";

    // 声明交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(dead_exchange_name);
    }
    // 声明交换机
    @Bean("normalExchange")
    public DirectExchange normalExchange(){
        return new DirectExchange(normal_exchange_name);
    }
    // 声明队列
    @Bean("deadQueue")
    public Queue deadQueue() {
        return QueueBuilder.durable(dead_queue_name).build();
    }
    // 声明队列
    @Bean("normal_queue_name")
    public Queue normalQueue() {
        return QueueBuilder.durable(normal_queue_name)
                .withArgument("x-dead-letter-exchange",dead_exchange_name)
                .withArgument("x-dead-letter-routing-key","dead")
                .build();
    }
    // 绑定队列到交换机
    @Bean
    public Binding queueBingExchange3(){
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal");
    }
    // 绑定队列到交换机
    @Bean
    public Binding queueBingExchange4(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }
}
复制代码

2、生产 yml配置

# 服务端口
server:
  port: 8300
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: test
    password: test
    virtual-host: test
    host: 192.168.81.102
    port: 5672
    template:
      # 消息路由失败通知监听者,而不是将消息丢弃
      mandatory: true
    publisher-confirm-type: correlated
    publisher-returns: true
复制代码

3、生产者发送消息

@Slf4j
@RestController
@RequestMapping("/msg")
public class SendController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/dead/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        rabbitTemplate.convertAndSend(DeadConfig.normal_exchange_name,"normal",message,new CorrelationData("1"));
    }
}
复制代码

4、消费者 yml

# 服务端口
server:
  port: 8301
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: test
    password: test
    virtual-host: test
    host: 192.168.81.102
    port: 5672
    listener:
      direct:
        # 表示消费者消费成功消息以后需要手动的进行签收(ack)
        acknowledge-mode: manual
        # 每次处理一个消息
        prefetch: 1
        retry:
          # 开启重试消费
          enabled: true
          # 最大重试次数
          max-attempts: 5
复制代码

5、消费者

    @RabbitListener(queues = DeadConfig.normal_queue_name)
    public void consumerDead(Message message, Channel channel) throws IOException {
        String s = new String(message.getBody());
        System.out.println(s);
        if(s.equals("2")){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);  //basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
            log.info("接收的消息为:{}",message);
        }else{
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            log.info("未消费数据");
        }
    }
复制代码

在浏览器发送消息之后,被拒绝的消息进入到 死信队列中。

image-20211112114924584

3.2 队列满了

在普通队列中添加 x-max-length

image-20211112135236184

然后将之前的队列删除,重新启动完生产者和消费者,生成新的队列,关闭消费者后,让生产者发送消息,这时候没有消费者,队列在两条的时候会满了,然后多余的信息会发送到死信队列里面。如图所示:

image-20211112135800720

该队列的详情信息也可以点进去看:

image-20211112135834242

3.3 发送 TTL 消息

设置队列消息的过期时间 x-message-ttl ,即该时间到了之后该条消息就会被发送到 死信队列中。

先清除之前的队列,然后配置这个参数,设为 5秒

image-20211112140038929

在发送完消息,该条消息在该队列中待了5秒之后进入死信队列中

image-20211112140257104

总结

死信队列可以实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会丢失。

这篇文章还是入门级别的对这死信队列做了不同场景的演示,这种业务场景是要求消息必须可靠的,通过死信队列为这些队列的消息的可靠性提供保障。