这是我参与11月更文挑战的第10天,活动详情查看:2021最后一次更文挑战
一、死信队列
死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。
一般来说,生产者将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。
死信消息来源:
- 消息 TTL 过期
- 队列满了,无法再次添加数据
- 消息被拒绝(reject 或 nack),并且 requeue =false
二、死信实战(基础版)
在这种,我们使用基础版来完成死信队列的几种情况的演示。
定义一个生产者 向 普通交换机下的普通队列发送消息。
定义一个消费者向普通交换机下的普通队列消费消息,当它消息满足三大条件之一时,消息就发送到死信交换机下的死信队列。
死信交换机也不是什么特殊交换机,是自己命名的,只是专门用来接收死信消息。
2.1 发送 TTL 消息
消费者1:消费普通消息,如果普通消息它过期了,就将它转发到死信队列中。
public class Consumer1 {
// 普通交换机
public static final String NORML_EXCHANGE ="normal_exchange";
// 死信交换机
public static final String DEAD_EXCHANGE ="dead_exchange";
// 普通队列的名称
public static final String NORML_QUEUE ="normal_queue";
// 死信队列的名称
public static final String DEAD_QUEUE ="dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setUsername("test");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("test");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("consumer2");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
DeliverCallback deliverCallback =(String a, Delivery b)->{
String message = new String(b.getBody());
System.out.println("work1:"+message);
// false 表示只确认 b.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息
channel.basicAck(b.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback =(String a)->{
System.out.println("消息消费被中断");
};
// 5、声明死信和普通交换机,类型为 direct
channel.exchangeDeclare(NORML_EXCHANGE,BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
// 队列声明:普通队列需要设置 arguments 参数
HashMap<String, Object> arguments = new HashMap<>();
// 过期时间,正常队列设置死信交换机,当消费失败就发送到死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key","dead");// 设置路由key
// 6、队列声明:普通队列需要设置 arguments 参数,普通队列添加参数
channel.queueDeclare(NORML_QUEUE,false,false,false,arguments);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 7、绑定关系
channel.queueBind(NORML_QUEUE,NORML_EXCHANGE,"normal");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");
// 8、消费消息
/**
* 消费者消费消息
* @params1: 消费哪个队列
* @params2:消费成功之后是否要自动应答 true代表自动应答 ,flase代表手动应答。
* @params3: 消费者消费成功的回调
* @params4: 消费者消费失败的回调
*/
channel.basicConsume(NORML_QUEUE,false,deliverCallback,cancelCallback);
}
}
复制代码
生产者:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("test");
connectionFactory.setUsername("test");
connectionFactory.setPassword("test");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("producer");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
AMQP.BasicProperties properties =new AMQP.BasicProperties().builder().expiration("10000").build(); // 单位毫秒 ,这里是10s
// 6、发送死信队列,设置 TTL 消息
for (int i = 1; i <=20; i++) {
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置,这里添加 ttl 消息
// @params4: 发送消息的内容
channel.basicPublish("normal_exchange","normal",properties,("潇雷挺帅,说第"+i+"遍。").getBytes());
System.out.println("发送第"+i);
Thread.sleep(1000);
}
}
}
复制代码
先启动 消费者1 然后创建了声明。再关闭消费者1,开始发送带有 ttl 的消息。最后可以发现,20条消息全都被发送到死信队列里面了。
2.2 队列满了
消费者1添加以下代码,添加正常队列的长度限制。当消息超过 5条之后,再过来的消息就会变成死信队列。
arguments.put("x-max-length",5);
复制代码
实现的效果如下:
2.3 消息被拒
DeliverCallback deliverCallback =(String a, Delivery b)->{
String message = new String(b.getBody());
System.out.println("work1:"+message);
if(message.equals("5")){
System.out.println("该消息被拒绝");
channel.basicReject(b.getEnvelope().getDeliveryTag(),false);//true 塞回原队列,false 代表不放回原队列。
}else{
channel.basicAck(b.getEnvelope().getDeliveryTag(),false);
}
};
复制代码
三、死信实战(SpringBoot 版)
在这小节种,通过 Springboot 项目再次熟悉下这三大场景。
业务流程:
- 1、正常业务消息被投递到正常业务的 Exchange,该 Exchange 根据路由键将消息路由绑定到正常队列
- 2、正常的消息变成死信消息之后,会被自动投递到该队列绑定的死信交换机上;
- 3、死信交换机收到消息后,将消息根据路由规则路由到指定的死信队列
- 4、消息到达死信队列后,可监听该死信队列,处理死信消息。
3.1 消息被拒
1、配置死信队列config
@Configuration
public class DeadConfig {
// 交换机
public static final String dead_exchange_name = "dead_exchange_springboot";
// 普通交换机
public static final String normal_exchange_name = "normal_exchange_springboot";
// 普通队列
public static final String normal_queue_name="normal_queue_springboot";
// 死信队列
public static final String dead_queue_name = "dead_queue_springboot";
// 声明交换机
@Bean("deadExchange")
public DirectExchange deadExchange(){
return new DirectExchange(dead_exchange_name);
}
// 声明交换机
@Bean("normalExchange")
public DirectExchange normalExchange(){
return new DirectExchange(normal_exchange_name);
}
// 声明队列
@Bean("deadQueue")
public Queue deadQueue() {
return QueueBuilder.durable(dead_queue_name).build();
}
// 声明队列
@Bean("normal_queue_name")
public Queue normalQueue() {
return QueueBuilder.durable(normal_queue_name)
.withArgument("x-dead-letter-exchange",dead_exchange_name)
.withArgument("x-dead-letter-routing-key","dead")
.build();
}
// 绑定队列到交换机
@Bean
public Binding queueBingExchange3(){
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal");
}
// 绑定队列到交换机
@Bean
public Binding queueBingExchange4(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
}
复制代码
2、生产 yml配置
# 服务端口
server:
port: 8300
# 配置rabbitmq服务
spring:
rabbitmq:
username: test
password: test
virtual-host: test
host: 192.168.81.102
port: 5672
template:
# 消息路由失败通知监听者,而不是将消息丢弃
mandatory: true
publisher-confirm-type: correlated
publisher-returns: true
复制代码
3、生产者发送消息
@Slf4j
@RestController
@RequestMapping("/msg")
public class SendController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/dead/{message}")
public void sendConfirmMsg(@PathVariable String message){
rabbitTemplate.convertAndSend(DeadConfig.normal_exchange_name,"normal",message,new CorrelationData("1"));
}
}
复制代码
4、消费者 yml
# 服务端口
server:
port: 8301
# 配置rabbitmq服务
spring:
rabbitmq:
username: test
password: test
virtual-host: test
host: 192.168.81.102
port: 5672
listener:
direct:
# 表示消费者消费成功消息以后需要手动的进行签收(ack)
acknowledge-mode: manual
# 每次处理一个消息
prefetch: 1
retry:
# 开启重试消费
enabled: true
# 最大重试次数
max-attempts: 5
复制代码
5、消费者
@RabbitListener(queues = DeadConfig.normal_queue_name)
public void consumerDead(Message message, Channel channel) throws IOException {
String s = new String(message.getBody());
System.out.println(s);
if(s.equals("2")){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); //basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
log.info("接收的消息为:{}",message);
}else{
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
log.info("未消费数据");
}
}
复制代码
在浏览器发送消息之后,被拒绝的消息进入到 死信队列中。
3.2 队列满了
在普通队列中添加 x-max-length
然后将之前的队列删除,重新启动完生产者和消费者,生成新的队列,关闭消费者后,让生产者发送消息,这时候没有消费者,队列在两条的时候会满了,然后多余的信息会发送到死信队列里面。如图所示:
该队列的详情信息也可以点进去看:
3.3 发送 TTL 消息
设置队列消息的过期时间 x-message-ttl ,即该时间到了之后该条消息就会被发送到 死信队列中。
先清除之前的队列,然后配置这个参数,设为 5秒
在发送完消息,该条消息在该队列中待了5秒之后进入死信队列中
总结
死信队列可以实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会丢失。
这篇文章还是入门级别的对这死信队列做了不同场景的演示,这种业务场景是要求消息必须可靠的,通过死信队列为这些队列的消息的可靠性提供保障。
近期评论