RabbitMQ消息确认(九)前言事务机制发送方确认

这是我参与更文挑战的第 27 天,活动详情查看: 更文挑战

日积月累,水滴石穿 😄

前言

在上一篇中,笔者介绍了怎么让 RabbitMQ 如何保证数据不丢失, 但除此之外,我们还会遇到一个问题,当生产者将消息发送出去之后,消息到底有没有正确地到达RabbitMQ 服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达 RabbitMQ 服务器的。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ 针对这个问题,提供了两种解决方式:

  • 通过事务机制实现

  • 通过发送方确认机制实现

事务机制

注:事务机制是确认生产者是否成功发送消息到交换机

RabbitMQ 客户端中与事务机制相关的方法有3个:channel.txSelect,channel.txCommit,channel.txRollback

channel.txSelect 用于开启事务;

channel.txCommit 用于提交事务;

channel.txRollback 用于回滚事务。

在通过 channel.txSelect 方法开启事务之后,我们便可以发送消息给 RabbitMQ了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback 方法来实现事务回滚。

提交事务

public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtil.createConn();
        Channel channel = conn.createChannel();
        String exchange = "exchange-1";
        String key = "key-1";

        // 创建交换机
        channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true);
        //开启事务
        channel.txSelect();
        try{
            // 发送消息到交换机
            channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
            //提交事务
            channel.txCommit();
            System.out.println("发送成功");
        }catch (Exception e){
            System.out.println("发送失败,进行日志记录");
            //回滚事务
            channel.txRollback();
        }
    }

复制代码

运行 main方法,输出 发送成功。这是由于交换机已经存在了。
流程图
根据上图可以看出开启事务机制与未开启事务机制(直接发送)多了四个步骤:

  • 1、客户端发送 Tx.Select ,将信道置为事务模式。
  • 2、 Broker 回复 Tx.Select-Ok ,确认己将信道置为事务模式。
  • 3、在发送完消息之后,客户端发送 Tx.Commit 提交事务。
  • 4、 Broker回复 Tx.Commit.Ok ,确认事务提交。

事务回滚

下面来看一下事务回滚,上代码。将 exchange的值修改为 exchange-122,并且将创建交换机的代码注释。

public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtil.createConn();
        Channel channel = conn.createChannel();
        String exchange = "exchange-122";
        String key = "key-1";

        // 创建交换机
       //channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true);
        //开启事务
        channel.txSelect();
        try{
            // 发送消息到交换机
            channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
            //提交事务
            channel.txCommit();
            System.out.println("发送成功");
        }catch (Exception e){
            System.out.println("发送失败,进行日志记录");
            //回滚事务
            channel.txRollback();
        }
    }
复制代码

image.png
运行 main方法,输出结果:发送失败,进行日志记录
image.png
流程步骤为:

  • 1、客户端发送 Tx.Select ,将信道置为事务模式。
  • 2、Broker 回复 Tx.Select-Ok ,确认己将信道置为事务模式。
  • 3、在发送完消息之后,发现异常,客户端发送 Tx.Rollback 回滚事务。
  • 4、Broker 回复 Tx.Rollback.Ok ,确认事务回滚。

批量事务

如果要发送多条消息,则将 channel.basicPublishchannel.txCommit 等方法包裹进循环内即可。

示例:

发送信息到 exchange-1交换机,该交换机是已经存在了的,但是在发送消息之后发生了异常,这种也会进入回滚事务操作。

 String exchange = "exchange-1";
 //开启事务
        channel.txSelect();
        for (int a = 0; a < 10; a++) {
            try{
                channel.basicPublish(exchange,key,null,"发送路由key为 = key-1 的消息".getBytes());
                int i = 1/0;
                //提交事务
                channel.txCommit();
                System.out.println("发送成功");
            }catch (Exception e){
                System.out.println("发送失败,进行日志记录");
                //回滚事务
                channel.txRollback();
            }
        }
复制代码

image.png

事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会降低RabbitMQ 的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢? 下面就来介绍 RabbitMQ提供另外一种方式:发送方确认机制

发送方确认机制

注:发送方确认机制是确认生产者是否成功发送消息到交换机

原理

生产者通过调用channel.confirmSelect方法将channel设置成confirm模式,一旦channel进入confirm模式,所有在该channel上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所匹配的队列之后,broker就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中的deliver-tag包含了确认消息的序号,此外broker也可以设置basic.ackmultiple参数,表示到这个序号之前的所有消息都已经得到了处理。

事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。confirm模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等channel返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack消息。

channel被设置成 confirm模式之后,所有被发送的后续消息都将被 ack或者被nack一次。,不会出现一条消息既被 ack 又被 nack 情况,并且 RabbitMQ 没有对消息被 confirm 的快慢做任何保证。

原生api

普通confirm

每发送一条消息后就调用 channe.waitForConfirms方法,等待服务端的确认,这实际上是一种串行同步等待的方式。和事务机制一样。也就是慢。

public static void main(String[] args) throws Exception {
    Connection conn = RabbitMQUtil.createConn();
    Channel channel = conn.createChannel();
    String quequ = "queue-2";
    String exchange = "exchange-2";
    String key = "key-2";
    //创建交换机
    channel.exchangeDeclare(exchange, 
    BuiltinExchangeType.TOPIC, true);
    //创建队列
    channel.queueDeclare(quequ, true, false, false, null);
    //队列与交换机绑定
    channel.queueBind(quequ, exchange, key);
    //将信道置为 publisher confirm 模式
    channel.confirmSelect();
    String message = "发送路由key为 = "+ key + "的消息";
    channel.basicPublish(exchange,key,null,
    message.getBytes());
    boolean b = channel.waitForConfirms();
    System.out.println("发送成功 = " + b);
}
结果:发送成功 = true
复制代码

将路由key 修改为:key-22121,创建交换机、创建队列、队列与交换机绑定进行注释,观察结果是否成功。

public static void main(String[] args) throws Exception {
    Connection conn = RabbitMQUtil.createConn();
    Channel channel = conn.createChannel();
    String quequ = "queue-2";
    String exchange = "exchange-2";
    String key = "key-22121";
    //创建交换机
    //channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
    //创建队列
   // channel.queueDeclare(quequ, true, false, false, null);
    //队列与交换机绑定
   // channel.queueBind(quequ, exchange, key);
    //将信道置为 publisher confirm 模式
    channel.confirmSelect();
    String message = "发送路由key为 = "+ key + "的消息";
    channel.basicPublish(exchange,key,null,message.getBytes());
    boolean b = channel.waitForConfirms();
    System.out.println("发送成功 = " + b);
}
结果:发送成功 = true
复制代码

可以看到发送结果是成功的。那再次修改代码,将 exchange 的值修改为exchange-2222,其余代码不动,观察结果。

image.png
启动直接报错!

如果发送多条消息,只需要将 channel.basicPublishchannel.waitForConfirms方法包裹在循环里面即可。但还是每发送一条消息后就调用 channe.waitForConfirms方法,等待服务端的确认。

channel.confirmSelect();
for (int i = 1; i < 10; i++) {
    String message = "发送路由key为 = "+ key + "的消息";
    channel.basicPublish(exchange,key,null,message.getBytes());
    boolean b = channel.waitForConfirms();
    System.out.println("发送成功" + b);
}
复制代码

批量confirm

每发送一批消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回(也是同步的,只是一次发送多条信息,然后统一确定)。

channel.confirmSelect();
for (int i = 1; i < 10; i++) {
    String message = "发送路由key为 = "+ key + "的消息";
    channel.basicPublish(exchange,key,null,message.getBytes());
}
//批量确认信息,发送的消息中,如果有失败的,不知道是哪一条失败了
boolean b = channel.waitForConfirms();
System.out.println("发送成功" + b);
复制代码

异步confirm

异步 confirm 方法的编程实现最为复杂,也是最高效的。在客户端 Channel 接口中提供的
addConfirmListener方法可以添加 ConfirmListener这个回调接口,这个
ConfirmListener 接口包含两个方法: handleAckhandleNack,分别用来处理 RabbitMQ 回传的 Basic.AckBasic.Nack 。在这两个方法中都包含有两个参数 deliveryTag(标记消息的唯一有序序号) multiple(是否批量confirm true代表是)

String quequ = "queue-2";
String exchange = "exchange-2";
String key = "key-2";
//创建交换机
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
//创建队列
channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
channel.queueBind(quequ, exchange, key);
channel.confirmSelect();
final ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap();
// 添加一个异步确认的监听器
channel.addConfirmListener(new ConfirmListener() {
    //参数一:deliveryTag: 消息的编号
    //参数二:multiple:是否批量confirm true 是
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("map 数据:" + map.size());
        if (multiple) {
            //如果是批量确认  返回的是小于等于当前序列号的消息 是一个 map
            ConcurrentNavigableMap<Long, String> confirmed =
                    map.headMap(deliveryTag, true);
            //清除该部分未确认消息
            confirmed.clear();
            System.out.println("批量确认清楚 map 数据:" + map.size());
        }else{
            //只清除当前序列号的消息
            map.remove(deliveryTag);
            System.out.println("只清除当前序列号的消息 map 数据:" + map.size());
        }
        System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
    }

    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
        String message = map.get(deliveryTag);
        System.out.println("消息发送到交换机失败,发布的消息:"+message+"未被确认,序列号为:"+deliveryTag);
        //拿到了未确认的信息,可以进行其他逻辑,比如添加处理消息重发
    }
});

for (int i = 1; i < 6; i++) {
    String message = "发送路由key为 = "+ key + "的消息";
    // channel.getNextPublishSeqNo()获取下一个消息的序列号
    map.put(channel.getNextPublishSeqNo(),message);
    channel.basicPublish(exchange,key,null,message.getBytes());

}
System.out.println("其他逻辑");
复制代码

image.png

再来测试交换机不存在的情况,将exchange的名称修改为exchange-9527,创建交换机的代码进行注释。

Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String quequ = "queue-2";
String exchange = "exchange-9527";
String key = "key-2";
//创建交换机
// channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
//创建队列
// channel.queueDeclare(quequ, true, false, false, null);
//队列与交换机绑定
//  channel.queueBind(quequ, exchange, key);
channel.confirmSelect();
final ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap();
// 添加一个异步确认的监听器
channel.addConfirmListener(new ConfirmListener() {
    //参数一:deliveryTag: 消息的编号
    //参数二:multiple:是否批量confirm true 是
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("map 数据:" + map.size());
        if (multiple) {
            //如果是批量确认  返回的是小于等于当前序列号的消息 是一个 map
            ConcurrentNavigableMap<Long, String> confirmed =
                    map.headMap(deliveryTag, true);
            //清除该部分未确认消息
            confirmed.clear();
            System.out.println("批量确认清楚 map 数据:" + map.size());
        }else{
            //只清除当前序列号的消息
            map.remove(deliveryTag);
            System.out.println("只清除当前序列号的消息 map 数据:" + map.size());
        }
        System.out.println("消息发送到交换机成功,deliveryTag: " + deliveryTag + ", multiple: " + multiple);
    }

    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息发送到交换机失败, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
        String message = map.get(deliveryTag);
        System.out.println("消息发送到交换机失败,发布的消息:"+message+"未被确认,序列号为:"+deliveryTag);
        //拿到了未确认的信息,可以进行其他逻辑,比如添加处理消息重发
    }
});

for (int i = 1; i < 6; i++) {
    String message = "发送路由key为 = "+ key + "的消息";
    // channel.getNextPublishSeqNo()获取下一个消息的序列号
    map.put(channel.getNextPublishSeqNo(),message);
    channel.basicPublish(exchange,key,null,message.getBytes());

}
System.out.println("其他逻辑");
复制代码

image.png
可以看到,监听器内的代码都没有执行。也就是没有交换机收到消息。

总结

普通confirm:同步等待确认,简单,但吞吐量非常有限。

批量confirm:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。

异步confirm:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微有些麻烦。

Boot方式

在yml中配置是否需要消息确认

spring:
  application:
    name: info-config-boot
  rabbitmq:
    host: 47.105.*
    port: 5672
    virtual-host: /test-1
    username: *
    password: *
    # 开启消息确认
    publisher-confirm-type: correlated
复制代码

publisher-confirm-type有三个选项:

  • NONE:禁用发布确认模式,是默认值
  • CORRELATED:发布消息成功到交换器后会触发回调方法
  • SIMPLE:经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirmswaitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker

编码

实现ConfirmCallback

@Component
public class InfoConfirm implements RabbitTemplate.ConfirmCallback {

   Logger logger = LoggerFactory.getLogger(InfoConfirm.class);

   @Autowired
   private RabbitTemplate rabbitTemplate;

    /**
     * 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
     */
   @PostConstruct
   public void init(){
       rabbitTemplate.setConfirmCallback(this);
   }

    /**
     * 此方法用于监听消息是否发送到交换机
     * @param correlationData
     * @param ack
     * @param cause
     */
   @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            logger.info("消息成功发送到交换机");
            logger.info("id = {} ",correlationData.getId());
            if(correlationData.getReturnedMessage() == null){
                logger.info("消息被确认");
            }else{
                byte[] body = correlationData.getReturnedMessage().getBody();
                logger.info("message = {}",new String(body));
            }

        }else {
            logger.info("消息发送到交换机失败");
            logger.info("cause = {}",cause);
            logger.info("id = {} ",correlationData.getId());
            if(correlationData.getReturnedMessage() == null){
                logger.info("消息异常");
            }else{
                byte[] body = correlationData.getReturnedMessage().getBody();
                logger.info("message = {}",new String(body));
            }

        }
    }
}
复制代码

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationDataackcause

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。

对外提供发送方法

   @GetMapping("/send")
    public void send(){
        CorrelationData correlation = new CorrelationData("设置:" + UUID.randomUUID().toString());
        // exchange-1 的交换机之前已经存在了
        rabbitTemplate.convertAndSend("exchange-1","key-55","发送消息",correlation);
    }
复制代码

调用接口:http://localhost:8080/send

image.png
再发送一条交换机不存在的消息,将交换机的值修改为exchange-12222

调用接口:http://localhost:8080/send
image.png
可以发现,即使交换机不存在,confirm 方法也能监听到。比上述所有方式都要智能。Boot YYDS

  • 如你对本文有疑问或本文有错误之处,欢迎评论留言指出。如觉得本文对你有所帮助,欢迎点赞和关注。