RabbitMQ基础知识RabbitMQ高级特性

RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是给予AMQP协议(Advanced Message Queuing Protocol 高级消息队列协议,是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计)的。

AMQP核心概念

  1. Server:又称Broker,接收客户端的连接,实现AMQP实体服务;
  2. Connection:连接,应用程序和Broker之间的网络连接;
  3. Channel:网络信道,几乎所有的操作都是在Channel中进行的,Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务,有点类似于数据中的session;
  4. Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性,Body则就是消息体内容;
  5. Virtual Host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或Queue,有点类似于Redis中的16个db,是逻辑层面的隔离;
  6. Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列(Producer生产消息后都是直接投递到Exchange中);
  7. Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key;
  8. Routing Key:一个路由规则,虚拟机可以用它来确定如何路由一个特定的消息;
  9. Queue:也被称为Message Queue,消息队列,保存消息并将它们转发给消费者。

RabbitMQ架构图

rabbitmq架构图.jpeg
Producer生产消息之后直接将消息投递到Exchange中,在投递的时候需要指定两个重要的信息,一个是消息需要被投递到哪个Exchange上,另一个是Routing Key,也就是将消息路由到哪个Message Queue上。

RabbitMQ安装

参考官网的安装,已经非常详细了,官网推荐的安装是将RabbitMQ和Erlang一起安装了,如果要单独安装的话,需要注意RabbitMQ和Erlang之间的版本需要对应。
www.rabbitmq.com/install-rpm…

RabbitMQ基本使用

  1. 服务的启动:rabbitmq-server start &
  2. 服务的停止:rabbitmqctl stop_app
  3. 管理插件:rabbitmq-plugins enable rabbitmq_management(启动管控台插件,方便图形化管理rabbitmq)
  4. 访问地址:http://localhost:15672

RabbitMQ常用命令-基础操作

  1. rabbitmqctl stop_app: 关闭应用
  2. rabbitmqctl start_app: 启动应用
  3. rabbitmqctl status: 查看节点状态
  4. rabbitmqctl add_user username password: 添加用户
  5. rabbitmqctl list_users: 列出所有用户
  6. rabbitmqctl delete_user username: 删除用户
  7. rabbitmqctl clear_permissions -p vhostpath username: 清除用户权限
  8. rabbitmqctl list_user_permissions username: 列出用户权限
  9. rabbitmqctl change_password username newpassword: 修改密码
  10. rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*": 设置用户权限(权限分别为configure write read,也就是可以配置、可写、可读)
  11. rabbitmqctl add_vhost vhostpath: 创建虚拟主机
  12. rabbitmqctl list_vhosts: 列出所有虚拟主机
  13. rabbitmqctl list_permissions -p vhostpath: 列出虚拟主机上所有权限
  14. rabbitmqctl list_queues: 查看所有队列信息
  15. rabbitmqctl -p vhostpath purge_queue blue: 清楚队列中的消息

RabbitMQ常用命令-高级操作

  1. rabbitmqctl reset: 移除所有数据,要在rabbitmqctl stop_app之后使用
  2. rabbitmqctl join_cluster <clusternode> [--ram]: 组成集群命令
  3. rabbitmqctl change_cluster_node_type <clusternode> disc | ram: 修改集群节点的存储形式,disc为磁盘存储,消息数据是存储在磁盘上的,可靠性高,但是持久化时间长,ram是内存存储,消息是存储在内存中,性能好,但是可能存在丢失
  4. rabbitmqctl forget_cluster_node [--offline]: 忘记节点(摘除节点)
  5. rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2]...: 修改节点名称

生产者消费者模型构建

  1. 创建好一个SpringBoot或者Spring或者普通的Java项目
  2. 安装RabbitMQ相关依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>
复制代码
public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 通过Channel发送数据
        /*
         * basicPublish的四个参数为别为:
         * exchange: 交换机,如果为空的,routingKey的规则就是routingKey需要和消息队列的名称一样,不然就发送失败
         * routingKey: 路由规则
         * properties: 消息的额外修饰
         * body: 消息体,也就是消息的主要内容
         */
        for (int i = 0; i < 5; i++) {
            String msg = "Hello, RabbitMQ!";
            channel.basicPublish("", "test001", null, msg.getBytes());
        }

        // 5. 关闭连接
        channel.close();
        connection.close();
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明一个队列
        /*
         * queueDeclare方法的五个参数
         * queue: 队列的名称
         * durable: 是否是持久化,也就是RabbitMQ服务重启之后消息队列是否被保存,为true就是持久化,服务重启消息队列不会被删除
         * exclusive: 是否独占,有点类似于独占锁
         * autoDelete: 是否开启自动删除,也就是当该消息队列没有被绑定到任何一个Exchange上时是否自动删除
         * arguments: 额外的参数
         */
        String queueName = "test001";
        channel.queueDeclare(queueName, true, true, false, null);

        // 5. 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6. 设置Channel
        /*
         * basicConsume的三个参数的函数
         * queue: 队列的名称
         * autoAck: 是否自动签收,为true表示当Consumer收到消息之后自动发送ACK确定给Broker
         * callback: 指定消费者
         */
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7. 获取消息
        while (true) {
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消费端:" + msg);
        }
    }
}
复制代码

交换机Exchange详解

交换机属性

  1. Name:交换机名称
  2. Type:交换机类型,大致有direct、topic、fanout、headers四种
  3. Durability:是否需要持久化,true为持久化
  4. AutoDelete:当最后一个绑定到Exchange上的队列被删除后,是否自动删除该Exchange
  5. Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
  6. Arguments:扩展参数,用于扩展AMQP协议定制化使用

交换机类型 - Direct Exchange

所有发送到Direct Exchange上的消息都会被转发到RoutingKey中指定的Queue中,在Direct模式下可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作(默认的RoutingKey就是队列的名称),消息传递时,RoutingKey必须完全匹配(名称完全一样,不支持模糊匹配)才会被队列接收,否则该消息会被抛弃。

public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange的名称和RoutingKey
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";

        // 5. 发送消息
        String msg = "Hello RabbitMQ - Direct Exchange Message...";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

        // 6. 关闭连接
        channel.close();
        connection.close();
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange、Queue、RoutingKey
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, true, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6. 设置Channel
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7. 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消费端:" + msg);
        }
    }
}
复制代码

交换机类型 - Topic Exchange

所有发送到Topic Exchange上的消息被转发到所有关系RoutingKey中指定Topic的Queue中,Exchange将RoutingKey和某个Topic进行模糊匹配,此时队列需要绑定一个Topic。
上面这句话有点拗口,其实简单来说,就是当Exchange的类型为topic时,RoutingKey是一组规则(不再仅仅表示一个规则,Direct Exchange中的RoutingKey就是一个规则,Producer传递的RoutingKey必须和Exchange中的RoutingKey名称完全一致才能发送成功),通过这组规则可以将多个RoutingKey和一个Queue进行关联,只要满足RoutingKey的规则就会被路由到相关的队列中(比如RoutingKey为log.#,只要符合这个规则的消息都会被路由到相关队列中)。
在制定RoutingKey时可以使用通配符进行模糊匹配,符号#表示匹配一个或多个词,*表示匹配一个词(注意这里是词,而不是字符),比如log.#可以匹配到log.info.oalog.*只能匹配到log.info,是匹配不到log.info.oa

public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange的名称和RoutingKey
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "log.info.oa";
        String routingKey2 = "log.error";
        String routingKey3 = "log.debug";

        // 5. 发送消息
        String msg = "Hello RabbitMQ - Topic Exchange Message...";
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());

        // 6. 关闭连接
        channel.close();
        connection.close();
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange、Queue、RoutingKey
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        // String routingKey = "log.*";
        String routingKey = "log.#";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, true, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6. 设置Channel
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7. 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消费端:" + msg);
        }
    }
}
复制代码

交换机类型 - Fanout Exchange

该种交换机类型是不会处理RoutingKey的,只会简单地将队列绑定到交换机上,发送到交换机的消息都会被转发到与该交换机绑定的所有队列上,Fanout Exchange是转发消息最快的,因为不会处理路由相关的操作,即使指定了RoutingKey也不会理会

public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange的名称和RoutingKey
        String exchangeName = "test_fanout_exchange";
        // 指定了RoutingKey也没有作用
        String routingKey = "log.debug";

        // 5. 发送消息
        String msg = "Hello RabbitMQ - Fanout Exchange Message...";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

        // 6. 关闭连接
        channel.close();
        connection.close();
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange、Queue、RoutingKey
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "test";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, true, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6. 设置Channel
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7. 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消费端:" + msg);
        }
    }
}
复制代码

绑定、队列、消息、虚拟主机详解

绑定Binding

是指Exchange和Exchange、Exchange和Queue之间的连接关系

队列

是指消息队列,实际存储消息数据的。包含一些属性,比如Durability表示是否持久化,Durable就是持久化,Transient表示不持久化;Autodelete表示当最后一个监听被移除后,该Queue是否被自动删除。

Message

是指服务器和应用程序之间传送的数据,本质上就是一段数据,由Properties和Payload(Body)组成,也包含一些属性,比如delivery modeheaders(自定义属性)、content_typecontent_encodingprioritycorrelation_idreply_toexpirationmessage_idtimestamptypeuser_idapp_idcluster_id

如何发送携带Properties的Message呢?

public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 通过Channel发送数据

        Map<String, Object> headers = new HashMap<>();
        headers.put("name", "snow");
        headers.put("sex", "man");
        
        // 设置Properties
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .expiration("15000")
                .contentEncoding("UTF-8")
                .headers(headers)
                .build();

        for (int i = 0; i < 5; i++) {
            String msg = "Hello, RabbitMQ!";
            channel.basicPublish("", "test001", properties, msg.getBytes());
        }

        // 5. 关闭连接
        channel.close();
        connection.close();
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明一个队列
        String queueName = "test001";
        channel.queueDeclare(queueName, true, false, false, null);

        // 5. 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6. 设置Channel
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7. 获取消息
        while (true) {
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            Map<String, Object> headers = delivery.getProperties().getHeaders();
            System.out.println("消费端:" + msg);
            System.out.println(headers.get("name"));
        }
    }
}
复制代码

RabbitMQ高级特性

消息如何保证100%的投递成功方案-1

什么是生产端的可靠性投递?

  1. 保障消息的成功发出
  2. 保障MQ节点的成功接收
  3. 发送端收到MQ节点(Broker)的确认应答
  4. 完善的消息补偿机制(也就是消息投递失败或者未收到Broker的确认应答的补偿措施)

消息可靠性投递的解决方案

  1. 消息落库,对消息状态进行打标
  2. 消息的延迟投递,做二次确认,回调检查

消息可靠性投递方案一.jpg

  1. Producer端首先将业务信息入库,同时创建一条消息入库,设置消息的status为0(表示消息已经投递)
  2. Producer端生成一条消息Message投递到Broker
  3. Broker收到消息之后,发送确认Confirm返回给Producer
  4. Producer收到Broker发送过来的Confirm之后,就将消息数据库中消息的状态为1(表示消息已经投递成功)
  5. 因为步骤2和步骤3都有可能发生故障,也就是消息投递失败,或者网络等原因造成Producer未收到Broker发送过来的Confirm消息,所以需要开启一个分布式定时任务从消息数据库中抓取status为0的消息
  6. 将抓取出来的status为0的消息重新投递给Broker,重复上述动作
  7. 因为在极端状况下有些消息可能就是会投递失败,不能无休止地重新投递,可以设置一个投递上限,比如最大重新投递次数为3,如果3次投递均失败,就将消息数据库中的消息状态设置为3,之后再建立补偿措施来对status为3的消息进行处理

缺点:由于在最开始进行了两次入库的操作,所以在高并发的情况下其实会有性能上的问题。

消息如何保证100%的投递成功方案-2

消息可靠性投递方案二.jpg

  1. Producer端首先对业务消息进行入库,然后同时生成两条相同的消息,一条消息立即发出,另一条消息延迟一段时间再次发出
  2. Consumer端对消息队列进行监听,从中取出消息进行消费,在消费完一条消息之后,需要向Broker发送一个消费确认Confirm,表示该条消息已被消费
  3. Callback Service对Consumer端发送的消费确认消息进行监听,如果收到了Consumer端发送过来的消费确认,就将消息数据库中的消息进行入库
  4. 同时Callback还会对Producer端发送的另一条延迟消息进行监听,如果收到了Producer发送过来的延迟消息,就从消息数据库中查询该条消息是否已被消费,如果查询不到或者消息消费失败,Callback Service就通知Producer进行消息重发

优点:由于最开始只是进行了一次入库的操作,性能得到了较大的提升,而Callback Service是一个补偿措施,对业务的性能并不会产生实际的影响

具体的实现请参考:RabbitMQ之消息可靠性投递实现

幂等性概念及业界主流解决方案

什么是幂等性?
通俗来说,就是假如我们要对一件事进行操作,这个操作可能重复进行100次或者1000次,那么无论操作多少次,这些操作的结果都是一样的,就像数据库中的乐观锁机制,比如我们多个线程同时更新库存的SQL语句,不采用乐观锁的机制的话可能会存在线程安全问题导致数据不一致,update sku set count = count - 1, version = version + 1 where version = 1,加上一个乐观锁来保证线程安全,当然乐观锁的背后采用的原理是CAS(CompareAndSwap,也就是先比较然后再替换,保证操作的原子性)。

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
在业务高峰期,可能会存在网络原因或者其他原因导致Producer端的消息重发,消费端要实现幂等性,就意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息,解决方案大致有两种:

  1. 唯一ID + 指纹码 机制,利用数据库主键去重
  2. 利用Redis的原子性去实现

唯一ID + 指纹码 机制

  1. 唯一ID + 指纹码 机制,利用数据库进行主键去重
  2. select count(1) from order where id = 唯一ID + 指纹码,在消费的时候先进行查询,如果查询结果为1的话就表示已经被消费过了就不再重复进行消费了,没有查询出结果的话就说明没有被消费,就进行数据库的入库
  3. 好处:实现简单
  4. 坏处:高并发下有数据库写入的性能瓶颈
  5. 解决方案:根据ID进行分库分表,进行算法路由,比如对ID进行路由算法路由到不同的数据库中,分摊整个数据流量的压力

利用Redis原子特性实现

  1. 使用Redis实现消费端的幂等,有几个需要考虑的问题
  2. 第一:是否要进行数据库入库的操作,如果要入库的话,如何使得数据库和缓存的入库做到原子性,也就是如何实现数据库和缓存的数据一致性,因为有可能出现这样的情况,redis中保存了该order的数据,但是在保存到数据库的时候出现了问题,导致数据库中没有保存成功,然后如何保证数据准确地被同时保存在数据库中呢?
  3. 第二:如果不进行数据库入库的话,那么都存储到缓存redis中,又如何设置定时同步的策略呢,因为数据不可能一直保存在redis中,而且就算一直保存在redis中,redis服务也有可能会出现问题,这也是需要重点考虑的问题

Confirm确认消息详解

什么是Confirm消息确认机制?
消息的确认,是指Producer投递消息后,如果Broker收到消息,则会给我们Producer一个应答,Producer进行接收应答,用来确定这条消息是否正常地发送到了Broker,这种方式也是消息的可靠性投递的核心保障。

如何实现Confirm确认消息?

  1. 在channel上开启确认模式:channel.confirmSelect()
  2. 在channel上添加监听:addConfirmListener,监听成功或者失败的返回结果,根据具体的结果对消息进行重新发送或者日志记录等后续处理
public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 指定消息投递模式:消息的确认模式
        channel.confirmSelect();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";

        // 5. 发送消息
        String msg = "Hello RabbitMQ! Send a confirm message.";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

        // 6. 添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("------ACK!------");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("------NO ACK!------");
            }
        });
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange、Queue、RoutingKey
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.*";
        String queueName = "test_confirm_queue";
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 创建消费者消费消息
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消费端: " + msg);
        }

    }
}
复制代码

Return返回消息详解

什么是Return返回消息机制?
ReturnListener用于处理一些不可路由的消息,Producer生产一条消息之后,通过指定一个Exchange和RoutingKey,将消息送达到某一个队列中去,然后Consumer监听队列,进行消息的消费处理操作,但是在某些情况下,Producer在投递消息的时候,指定的Exchange不存在或者RoutingKey路由不到,就说明消息投递失败,这个时候如果需要监听这种不可达的消息,就需要使用ReturnListener。
在使用ReturnListener的基础API时有一个关键的配置项是Mandatory,该参数为true,则ReturnListener会接收到路由不可达的消息,然后进行后续的处理,如果为false,那么Broker端会自动删除该消息,ReturnListener是监听不到的。

public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "return.save";
        String routingKeyError = "snow.save";

        String msg = "Hello RabbitMQ! Send a Return message.";
        boolean mandatory = true;
        channel.basicPublish(exchangeName, routingKeyError, mandatory, null, msg.getBytes());

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("----handle return----");
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        });
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange、Queue、RoutingKey
        String exchangeName = "test_return_exchange";
        String routingKey = "return.*";
        String queueName = "test_return_queue";
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 创建消费者消费消息
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消费端: " + msg);
        }

    }
}
复制代码

自定义消费者使用

如何自定义消费者进行消息消费?
在之前,我们都是采用默认的QueueingConsumer来创建一个消费者,之后再使用while循环来不停地取出消息,但是这种方式不是特别好,一般我们会自定义自己的Consumer,那么要实现自定义的Consumer有两种方式,一种是实现Consumer的接口,但是这种实现方式需要重写很多方法,另一种是继承DefaultConsumer,重写其中的

public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.save";

        String msg = "Hello RabbitMQ! Send a Consumer message.";
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        }
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {
// 1. 创建一个ConnectionFactory,并且进行相关连接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange、Queue、RoutingKey
        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.*";
        String queueName = "test_consumer_queue";
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 创建消费者消费消息
        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}
复制代码
public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("----my consumer handle delivery----");
        System.out.println("consumerTag: " + consumerTag);
        System.out.println("envelope: " + envelope);
        System.out.println("properties: " + properties);
        System.out.println("body: " + new String(body));
    }
}
复制代码

消费端的限流策略

什么是消费端的限流?
假设一个场景,就是我们的RabbitMQ服务器有上万条未处理的消息,此时如果我们随便打开一个消费者客户端,会出现下面的情况,就是巨量的消息瞬间全部推送过来,但是我们的单个客户端无法同时处理这么多数据,就有可能造成服务器崩溃。
RabbitMQ提供了一种qos(Quality of Service 服务质量保证)功能,即在非自动确认消息(autoAck为false)的前提下,如果一定数目的消息(通过基于Consumer或者channel设置的Qos的值)未被确认前,不进行消费新的消息。
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)中的prefetchSize表示单个消息的大小,为0表示不限制单个消息的大小,prefetchCount会告诉RabbitMQ不要同时给一个消费者推送超过N个消息,即一旦有N个消息还没有Ack,则该Consumer就将block阻塞住,直到有消息被Ack,global表示是否将前两个参数的设置应用于channel,简单点说就是前两个限制是channel级别还是Consumer级别的,一般设置为false,表示Consumer级别(prefetchCount只在autoAck为false的情况下才会生效,在自动Ack的情况下是无效的)

public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String routingKey = "qos.save";

        String msg = "Hello RabbitMQ! Send a QOS message.";
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        }
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange、Queue、RoutingKey
        String exchangeName = "test_qos_exchange";
        String routingKey = "qos.*";
        String queueName = "test_qos_queue";
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 限流,记得将basicConsume方法中的autoAck的值设置为false
        channel.basicQos(0, 1, false);

        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}
复制代码
public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("----my consumer handle delivery----");
        System.out.println("consumerTag: " + consumerTag);
        System.out.println("envelope: " + envelope);
        System.out.println("properties: " + properties);
        System.out.println("body: " + new String(body));

        // 消息的Ack确认,basicAck的第一个参数为消息的deliveryTag,第二个参数为是否批量签收,如果限制的消息个数大于1,可以设置为true
        this.channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
复制代码

消费端ACK与重回队列机制

消费端的手工ACK和NACK为什么会存在?

  1. 消费端在进行消息消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿,如果采用自动ACK的话就达不到需求
  2. 如果由于服务器宕机等严重问题,我们也需要手工进行ACK来保障消费端消费成功,因为消费者宕机后,Broker收不到ACK或者NACK,就会重新发送消息给消费端再次消费,因为在自动ACK的机制下Broker发送消息给消费者时,自动确认消息被处理完毕

消费端的重回队列机制

  1. 消费端重回队列是为了将没有处理成功的消息重新投递给Broker
  2. 一般在实际应用中,都会关闭重回队列,也就是将requeue设置为false
public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.save";

        for (int i = 0; i < 5; i++) {

            Map<String, Object> headers = new HashMap<>();
            headers.put("num", i);

            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();

            String msg = "Hello RabbitMQ! Send a ACK message." + i;
            channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
        }
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange、Queue、RoutingKey
        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.*";
        String queueName = "test_ack_queue";
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 将autoAck设置为false,手工Ack确认
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}
复制代码
public class MyConsumer extends DefaultConsumer {

    private final Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("----my consumer handle delivery----");
        System.out.println("body: " + new String(body));

        if ((Integer) properties.getHeaders().get("num") == 0) {
            // 第三个参数requeue表示是否重回队列
            this.channel.basicNack(envelope.getDeliveryTag(), false, false);
        } else {
            // 消息的Ack确认,basicAck的第一个参数为消息的deliveryTag,第二个参数为是否批量签收,如果限制的消息个数大于1,可以设置为true
            this.channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
}
复制代码

TTL消息详解

  1. TTL 是Time To Live的缩写,也就是生存时间
  2. RabbitMQ支持消息的过期时间,在消息发送的时候可以再Properties中指定expiration过期时间
  3. RabbitMQ支持队列的过期时间,从消息入队列开始计算,如果超过了队列设置的超时时间配置还没有被消费,该消息就会被自动清除

死信队列详解

死信队列 DLX Dead-Letter-Exchange

  1. 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
  2. DLX也是一个正常的Exchange,和一般的Exchange没有什么区别,它可以在任何队列上被指定(也就是需要设置队列的属性),这样的话只要这个队列中有死信就会被重新发布到DLX中
  3. 当设置了DLX的队列中有死信时,RabbitMQ就会自动将这个死信重新发布到设置的Exchange中去,从而被路由到另一个队列
  4. 可以监听这个队列中的消息做相应的处理,这个特性可以弥补RabbitMQ3.0版本以前支持的immediate参数的功能

消息变成死信的情况

  1. 消息被拒绝或消费失败(basicReject/basicNack)并且requeue为false(不重回队列)
  2. 消息TTL过期
  3. 队列达到最大长度

死信队列的设置
首先要设置死信队列的Exchange和Queue,然后进行绑定

  1. Exchange: dlx.exchange(名字可以任意取)
  2. Queue: dlx.queue(名字可以任意取)
  3. RoutingKey: # (为#表示任何消息都可以被路由到dlx.queue中)

然后再进行正常的交换机、队列声明和绑定,只不过需要再被设置死信队列的队列中加上一个参数:arguments.put("x-dead-letter-exchange", "dlx.exchange"),这样消息在过期、不重回队列、队列达到最大长度时被直接路由到死信队列中

public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.save";

        for (int i = 0; i < 1; i++) {

            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .expiration("10000")
                    .contentEncoding("UTF-8")
                    .build();

            String msg = "Hello RabbitMQ! Send a ACK message.";
            channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
        }
    }
}
复制代码
public class Consumer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通过连接工厂创建一个连接
        Connection connection = connectionFactory.newConnection();

        // 3. 通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange、Queue、RoutingKey
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.*";
        String queueName = "test_dlx_queue";

        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "dlx.exchange");

        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, arguments);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 死信队列的声明
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");

        // 将autoAck设置为false,手工Ack确认
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}
复制代码
public class MyConsumer extends DefaultConsumer {

    private final Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("----my consumer handle delivery----");
        System.out.println("body: " + new String(body));
        // 消息的Ack确认,basicAck的第一个参数为消息的deliveryTag,第二个参数为是否批量签收,如果限制的消息个数大于1,可以设置为true
        this.channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
复制代码