RabbitMQClient怎么用?

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!

RabbitMQ client 的使用

连接 RabbitMQ

private static Channel _getChannelBySetter() throws IOException, TimeoutException {
    ConnectionFactory factory = _getConnectionFactoryBySetter();
    Connection connection = factory.newConnection();
    // channel 不建议线程共享, 多线程使用 Channel 可能造成错误的通信帧交错, 也会影响发送方确认的机制
    return channel = connection.createChannel(); // 可能返回 null
    // Optional<Channel> optionalChannel = connection.openChannel(); // optional 封装 createChannel 的结果

}

private static Channel _getChannelByUri() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
    ConnectionFactory factory = _getConnectionFactoryByUri();
    Connection connection = factory.newConnection();
    return connection.createChannel();
}

private static ConnectionFactory _getConnectionFactoryBySetter() {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("root");
    factory.setPassword("root");
    factory.setHost("localhost");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    return factory;
}

private static ConnectionFactory _getConnectionFactoryByUri() throws NoSuchAlgorithmException, KeyManagementException, URISyntaxException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("root");
    factory.setPassword("root");
    factory.setHost("localhost");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUri("amqp://root:root@localhost:5672//");
    return factory;
}
复制代码

connection 或者channel 继承了 isOpen() 可以判断 connection 或 channel 是否正常建立,但是内部使用 Synchronized,,会造成线程竞争
而且依赖的是内部的一个状态值,随时可能被修改。

public boolean isOpen() {
    synchronized(this.monitor) {
        return this.shutdownCause == null;
    }
}
复制代码

不能依赖该方法的结果。 作后续处理。除非业务代码也使用锁。

比如像下面的代码,就可能出问题。

if (channel.isOpen()) {
  doSomeThings();// 在中间的过程中,channel 的状态完全有可能被改变。
	channel.basicQos(1);
}
复制代码

准备交换器和队列

按照之前的步骤,我们需要声明交换器和队列,并将其通过路由键绑定在一起。

生产建议不要通过代码声明持久化的交换器和队列。毕竟只需要声明一次,多次声明浪费时间,二是不易管理,不易确保正确绑定交换器和队列。

// 声明持久化、非自动删除、绑定类型为 direct 的交换器
channel.exchangeDeclare("exchange_demo", BuiltinExchangeType.DIRECT, true);
// 声明持久化、排他的、非自动删除的队列
channel.queueDeclare("queue_demo", true, true, false, new HashMap<>());
// 通过 “demo” 将队列和交换器绑定
channel.queueBind("queue_demo", "exchange_demo", "demo");
复制代码

声明的动作时幂等的,只要交换器或队列的参数完全匹配。否则就会报错。

声明交换器 exchangeDeclare
Exchange.DeclareOk exchangeDeclare(String exchange,
    BuiltinExchangeType type,
    boolean durable,
    boolean autoDelete,
    boolean internal,
    Map<String, Object> arguments) throws IOException;
复制代码

完整的声明方法。同时存在许多重载的方法,大部分都是以上参数的缺省值。

  • exchange:交换器名称
  • type:交换器类型,BuiltinExchangeType 枚举值列出来四种类型。也存在 String 的重载方法。
  • durable:是否持久化。为 true,则会将交换器的元数据落到磁盘。重启不丢失。
  • autoDelete:是否自动删除。true 为自动删除。自动删除的前提是,至少有过一个队列或交换器与之绑定,之后都解绑了。
  • internal:是否内置的。true 的话,则客户端无法发消息到该交换器。只能交换器路由到交换器的方式。
  • arguments:更多参数。

更多重载方法:

Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete,
    Map<String, Object> arguments) throws IOException;

Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;

Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
复制代码

还有一种类似于异步的,不等待 Broker 返回信息,提前退出到调用方法。

void exchangeDeclareNoWait(String exchange,
    BuiltinExchangeType type,
    boolean durable,
    boolean autoDelete,
    boolean internal,
    Map<String, Object> arguments) throws IOException;
复制代码

可以看到,与上面的方法不同, noWait 是没有返回值的。

Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
复制代码

主要用于检测交换器是否存在。存在则正常返回,否则排除异常:404 channel exception,同时 Channel 被关闭。

交换器的删除:

Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
复制代码

ifUnused 标识是否在交换器没被使用的情况下删除。true 则表示只在没有被使用情况下删除;反之则直接删除。

声明队列 QueueDeclare

QueueDeclare 的重载方法很少,不过存在和交换器类似的特殊方法。

Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;
复制代码

无参数的方法,会声明一个由 RabbitMQ 自动命名的、排他的、自动删除、非持久化的队列。队列的名称,可以通过返回值获得: DeclareOk.getQueue();

  • queue:队列名称
  • durable:是否持久化。true 则为持久化,队列元数据落盘,重启不丢失。
  • exclusive:是否排他的。true 则仅对首次声明它的 Connection 可见,并在连接断开时自动删除。也就是说,一个 Connection 的多个 Channel 都是可见的;首次 指一个连接如果已经声明了排他队列,则不允许其他连接建立同名的排他队列。注意:即使声明为持久化,只要连接断开,就会自动删除。
  • autoDelete:是否自动删除。含义与交换器的参数类似,至少有一个消费者连接到这个队列,之后全部断开,才会自动删除,
  • arguments:更多参数,诸如消息存活时间、过期时间、最大长度、死信队列相关参数、优先级等等。

生产者和消费者都可以声明队列。但如果消费者已经在 Channel 上订阅了队列,则无法声明。

必须取消订阅,将 Channel 置为 “传输”模式。

void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                        Map<String, Object> arguments) throws IOException;
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
复制代码

删除队列

Queue.DeleteOk queueDelete(String queue) throws IOException;

Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
复制代码

前两个参数类同交换器的,ifEmpty 指是否在队列为空(没有消息积压)时才删除。

清空队列消息

Queue.PurgeOk queuePurge(String queue) throws IOException;
复制代码
交换器、队列绑定
queueBind
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
复制代码
  • queue:队列名称
  • exchange:交换器名称
  • routingKey:路由键
  • arguments:更多参数。

解绑:

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
复制代码
exchangeBind

交换机和交换机的绑定。参数类同 queueDeclare ,消息从 source 到 destination。

Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;

Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
复制代码

发送消息

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;
复制代码
  • exchange:交换器名称,为空则投递到默认交换器。任何一个队列都会和默认交换器绑定,路由键就是队列名称。

  • routingKey:路由键

  • props:消息的基本属性集。属性集包含以下14项:

    public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
            private String contentType; //消息体文本类型
            private String contentEncoding;// 消息体编码
            private Map<String,Object> headers; // 消息头
            private Integer deliveryMode; // 投递模式,1 为消息不持久化,2 为消息持久化
            private Integer priority; // 优先级
            private String correlationId; // 关联ID
            private String replyTo; // 响应消息返回地址
            private String expiration;// 过期时间
            private String messageId;// 消息标识
            private Date timestamp;// 发送消息时的时间戳
            private String type;// 类型
            private String userId;//  用户ID
            private String appId;// 生成消息的应用程序标识符
            private String clusterId;// 集群ID
    }
    复制代码

    MessageProperties 定义了一些常用的属性集。

  • body:消息体

  • mandatory:true:如果 exchange 根据自身类型和消息 routeKey 无法找到一个符合条件的 queue,那么会调用 basic.return 方法将消息返还给生产者。false:出现上述情形 broker 会直接将消息扔掉。

  • immediate:true:如果exchange在将消息 route 到 queue(s) 时发现对应的 queue 上没有消费者,那么这条消息不会放入队列中。当与消息 routeKey 关联的所有 queue(一个或多个) 都没有消费者时,该消息会通过basic.return 方法返还给生产者。3.0 以后废弃。

消费消息

RabbitMQ 消费消息的模式有两种:推(push)和拉(pull)。推模式使用 Basic.Consume 消费,拉使用 Basic.Get 消费。

推模式

通过持续订阅的方式消费消息,相关类:

com.rabbitmq.client.Consumer
com.rabbitmq.client.DefaultConsumer
复制代码

消费消息一般通过实现 Consumer 接口或继承 DefaultConsumer 类实现。不同的订阅采用不同的消费者标签(consumerTag)区分。

channel.basicConsume(QUEUE_DEMO, false, "demo_consumer", new DefaultConsumer(channel) {
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException
    {
        String exchange = envelope.getExchange();
        String routingKey = envelope.getRoutingKey();
        System.out.println("from " + exchange + " by " + routingKey + " get message :" + new String(body));
        long deliveryTag = envelope.getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
});
复制代码

上述示例中,将自动确定设置为 false, 防止在消息消费失败的情况下,消息直接丢失。

String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
复制代码

这是最全参数的 basicConsumer 方法。其他则存在许多重载的方法。

  • queue:队列名称

  • autoAck 是否自动确认

  • consumerTag:消费者标签

  • noLocal:为true,则不能在同一个Connection 中 生产者发送的消息发送给消费者。

  • exclusive:是否排他的消费者

  • arguments:更多参数

  • deliverCallback:消息送达时的回调函数

  • cancelCallback:消费者取消的回调函数

  • shutdownSignalCallback : 当 connection 或 channel 关闭时的回调函数

  • callback:消费者的回调函数,相当于上面三个回调的包装类,甚至更多。

    image-20201010181443471

方法返回 consumerTag。

image-20201010180719617

拉模式

拉模式通过 GetResponse basicGet(String queue, boolean autoAck) throws IOException;获取单条信息。

注意,虽然推送消息的个数搜到 Basic.Qos 的限制,即一个 Channel 最多可以同时消费几条消息。但是如果要只获取单条,建议使用 basicGet。

但是不能通过循环调用 basicGet 来替代 basicConsumer,影响性能。

消息确认与拒绝

消息确认

消息确认机制是为了保证消息可靠地到达消费者。消费者订阅队列时u,通过 autoAck 参数指定是否自动确认。

为 True时,当 Broker 将消息发送给消费者时,就会自动确认,然后从内存中删除消息;

反之,只有当消费者明确地返回确认信号时,才会执行删除。

注意:上面的删除,并不一定是即刻删除,而是标记删除,等待真正的物理删除。

如果 Broker 一直没有受到确认信号,且消费者已经断开连接,则该消息会重新入队。

是否重新投递的唯一标准只有接收到改消息的消费者是否已经断开连接。

消息拒绝
void basicReject(long deliveryTag, boolean requeue) throws IOException;
复制代码

deliveryTag 可以看做是消息的标识,是一个 64位的长整型。

requeue 表示是否重新入队。

注意,该方法一次只能拒绝一条。如果想批量拒绝,可以使用

void basicNack(long deliveryTag, boolean multiple, boolean requeue)
        throws IOException;
复制代码

multiple 为 true,则拒绝 deliveryTag 及之前所有未确认的消息;反之则只拒绝档条。

关闭连接

channel.close();
connection.close(); // 关闭时会自动关闭该连接上的 channel。
复制代码

两种都有三种状态:

  • Open:开启状态
  • Closing:正在关闭。等待注册的一些关闭操作的执行(类似于监听器或者钩子)。
  • closed:已关闭。

通过实现 ShutdownNotifier 类,两种都可以使用该类的API来注册或者移除关闭监听器。

image-20201010183319203

getCloseReson() 可以获取关闭原因。

public interface ShutdownListener extends EventListener {
    void shutdownCompleted(ShutdownSignalException cause);
}
复制代码

参考

《RabbitMQ 实战指南》