RabbitMQ(三)入门——RabbitMQ的五种模式

上一篇文章:RabbitMQ 入门 (二)—— 创建一个基本的消息队列
本文所示代码已上传到github javaWithoutSmoke/rabbitmq-demo

RabbitMQ 入门 —— RabbitMQ的五种模式和四种交换机

六种消息模式

而在的 RabbitMQ 中,出现了六种消息传播模式: RabbitMQ 官网说明的六种模式

  • Simple Work Queue (简单工作队列):也就是常说的点对点模式,一条消息由一个消费者进行消费。(当有多个消费者时,默认使用轮训机制把消息分配给消费者)。
  • Work Queues (工作队列):也叫公平队列,能者多劳的消息队列模型。队列必须接收到来自消费者的 手动ack 才可以继续往消费者发送消息。
  • Publish/Subscribe (发布订阅模式):一条消息被多个消费者消费。
  • Routing(路由模式):有选择的接收消息。
  • Topics (主题模式):通过一定的规则来选择性的接收消息
  • RPC 模式:发布者发布消息,并且通过 RPC 方式等待结果。目前这个应该场景少,而且代码也较为复杂,本章不做细讲。
  • 注意:官网最后有 Publisher Confirms 为消息确认机制。指的是生产者如何发送可靠的消息。

RabbitMQ 的四种 Exchange

在了解这些消息模式的时候,引入了一个概念 Exchange(交换机)

发布订阅里面有对这个概念做解释:

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。
相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换机类型定义 。

而 Exchange 的类型有下面四种:

  • direct(直连交换机):将队列绑定到交换机,消息的 routeKey 需要与队列绑定的 routeKey 相同。
  • fanout (扇形交换机):不处理 routeKey ,直接把消息转发到与其绑定的所有队列中。
  • topic(主题交换机):根据一定的规则,根据 routeKey 把消息转发到符合规则的队列中,其中 # 用于匹配符合一个或者多个词(范围更广), * 用于匹配一个词。
  • headers (头部交换机):根据消息的 headers 转发消息而不是根据 routeKey 来转发消息, 其中 header 是一个 Map,也就意味着不仅可以匹配字符串类型,也可以匹配其他类型数据。 规则可以分为所有键值对匹配或者单一键值对匹配。

其实在这里我们差不多可以得出消息模型与 Exchange 的关系比较:

消息模式 交换机
Simple Work Queue (简单工作队列),Work Queues (工作队列) 空交换机
Publish/Subscribe (发布订阅模式) fanout (扇形交换机)
Routing(路由模式) direct (直连交换机)
Topics(主题模式) topic(主题交换机)

Simple Work Queue (简单工作队列)

直接看上一篇文章即可 RabbitMQ 入门 (二)—— 创建一个基本的消息队列

Work Queue (工作队列)

我们到RabbitMQ 里面新增一个队列名为 work-queue

在这里插入图片描述
工作队列

  • 生产者
/**
 * 生产者
 */
public class Producer {

    private static final String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        while (true) {
//            System.out.println("请输入消息:");
            Scanner scanner = new Scanner(System.in);
            //1、创建连接
            Connection connection = RabbitMQConnection.getConnection();
            //2、创建通道
            Channel channel = connection.createChannel();
            //3、发送消息,这里使用Scanner通过控制台输入的内容来作为消息
            //nextLine() 以回车结束当前的输入,会接收空格
            String message = scanner.nextLine();
            /*
            参数说明:
            exchange:当期尚未指定exchange,又不能为null,这里用空字符串表示为一个默认的exchange或者匿名的exchange
            routingKey: 就是队列名称
            props:消息的额外属性
            body: 消息主体
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("消息已被发送:" + message);
            //发送完记得关闭连接
            channel.close();
            connection.close();
        }
    }
}
复制代码
  • 消费者
    工作队列要确定的是有多个消费者。这里我们设置 消费者1 设置处理消息要 1s , 消费者2 处理消息要 3s

/**
 * 消费者1
 */
public class Consumer1{

    private static final String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2、创建通道
        Channel channel = connection.createChannel();

        // 3、同一时刻服务器只会发送一条消息给消费者
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            //接收到一个消息时会使用这个方法,这里进行重写,用来输出接收到的消息
            /*
            参数说明:
            consumerTag:消费者关联的标签
            envelope: 消息包数据
            BasicProperties:消息的额外属性
            body: 消息主体,当前为二进制
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // 模拟处理请求耗时较长的情况
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String messageBody = new String(body);
                System.out.println("消费者消费消息:"+messageBody);
                // 手动确认,
                // 第一个参数: 默认的消息的唯一标志
                // 第二个参数:是否批量.当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4、添加监听,改成手动ack
        channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
    }
}

复制代码
  • 消费者2

/**
 * 消费者2
 */
public class Consumer2 {

    private static final String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1、创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2、创建通道
        Channel channel = connection.createChannel();

        // 3、同一时刻服务器只会发送一条消息给消费者
        channel.basicQos(1);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            //接收到一个消息时会使用这个方法,这里进行重写,用来输出接收到的消息
            /*
            参数说明:
            consumerTag:消费者关联的标签
            envelope: 消息包数据
            BasicProperties:消息的额外属性
            body: 消息主体,当前为二进制
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // 模拟处理请求耗时较长的情况
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String messageBody = new String(body);
                System.out.println("消费者消费消息:"+messageBody);
                // 手动确认,
                // 第一个参数: 默认的消息的唯一标志
                // 第二个参数:是否批量.当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4、添加监听,改成手动ack
        channel.basicConsume(QUEUE_NAME,false, defaultConsumer);
    }
}
复制代码

先启动两个消费者,再启动生产者,可以看到,消费者1 处理比较快,所以有多个消息会给他处理。

在这里插入图片描述
在这里插入图片描述

Publish/Subscribe (发布订阅模式)

我们先创建两个队列 subscribe1 和 subscribe2 ,

在这里插入图片描述

还有一个 fanout 类型的交换机 Publish-Subscribe

在这里插入图片描述

然后把 Exchange 绑定上两个队列,不绑定的话消息没法投递到队列中

  • 生产者
/**
* 生产者
*/
public class Producer {

   private static final String EXCHANGE_NAME = "Publish-Subscribe";

   public static void main(String[] args) throws IOException, TimeoutException {
       for (int i = 1; i < 7; i++) {
           Connection connection = RabbitMQConnection.getConnection();
           Channel channel = connection.createChannel();
           // 绑定交换机
           channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
           channel.basicPublish(EXCHANGE_NAME, "", null, String.valueOf(i).getBytes());
           System.out.println("消息已被发送:" + i);
           channel.close();
           connection.close();
       }
   }
}
复制代码
  • 消费者1

/**
 * 消费者1
 */
public class Consumer1{

    private static final String QUEUE_NAME = "subscribe1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String messageBody = new String(body);
                System.out.println("消费者1消费消息:"+messageBody);
            }
        };
        channel.basicConsume(QUEUE_NAME,true, defaultConsumer);
    }
}

复制代码
  • 消费者2

/**
 * 消费者subscribe2
 */
public class Consumer2 {

    private static final String QUEUE_NAME = "subscribe2";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String messageBody = new String(body);
                System.out.println("消费者2消费消息:"+messageBody);
            }
        };
        channel.basicConsume(QUEUE_NAME,true, defaultConsumer);
    }
}

复制代码

把消费者启动后,再启动生产者。可以看到两个消费者各自都消费到生产者的所有消息。

在这里插入图片描述
在这里插入图片描述

关于发布订阅的思考

  1. 发布订阅模式其实很像我们平常使用微信公众号时,公众号发布文章,所有订阅该公众号的粉丝都可以收到公众号发送的消息。但是我们关注公众号,公众号并不会把数据给我们传递过来。那么在这里也一样吗?

    场景一:
    我们现在先启动生产者,把消息生产完,再启动消费者。发现两个消费者依旧能够消费到已经生产的消息。

    在这里插入图片描述

    其实生产者发布消息的时候,是把消息投递到了交换机中,而交换机又帮我们把消息投递到两个队列中,那么及时我的生产者已经停止了,但是此时我的队列中已经有了消息。

    在这里插入图片描述

    所以我们一旦启动消费者去监听队列,就能够正常消费数据。

    场景二:
    我们先把 消费者2 与 Exchange 的关系解绑掉,然后再启动 消费者1 和 生产者。

    在这里插入图片描述

    这时候我们把 subscribe2 队列与交换机 Publish-Subscribe 关系再重新绑定后,启动 消费者2
    这时候可以看到,消费者2没有数据可以消费。
    这时候我们回到题目,其实这个场景更符合题目所说的场景,因为这里粉丝看到接收到推送,实际上是队消费者已经完成消费了。而新订阅的用户,相当于是给已存在的 Exchagne 加上了一个新的队列,必须有新的消息进行投递,新的用户才能接收到。

Routing(路由模式)

创建好直连交换机 Routing 和 消费者队列,三个队列 RoutingConsumer1,RoutingConsumer3,RoutingConsumer3

在这里插入图片描述

然后绑定关系:注意下面的 Routing Key,为什么要这样设置呢?因为我想知道当 Routing key 相同时,一个消息会被投递到几个队列中?

在这里插入图片描述

  • 生产者

/**
 * 生产者
 */
public class Producer {

    private static final String EXCHANGE_NAME = "Routing";

    public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitMQConnection.getConnection();
            Channel channel = connection.createChannel();
            // 绑定直连交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            channel.basicPublish(EXCHANGE_NAME, "key1", null, String.valueOf("key1").getBytes());
            channel.basicPublish(EXCHANGE_NAME, "key2", null, String.valueOf("key2").getBytes());
            System.out.println("消息已发送");
            channel.close();
            connection.close();
    }
}
复制代码
  • 消费者1(其他两个消费者代码几乎相同,改下序号即可)

public class RoutingConsumer1 {

    private static final String QUEUE_NAME = "RoutingConsumer1";

    private static final String EXCHANGE_NAME = "Routing";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String messageBody = new String(body);
                System.out.println("消费者1消费消息:"+messageBody);
            }
        };
        channel.basicConsume(QUEUE_NAME,true, defaultConsumer);
    }
}

复制代码

先把三个消费者启动,再启动生产者。发现消费者1和2都消费了key1,而消费者3输出了key3

结论出来了:
当 Routing 消息模式进行传输消息时,Direct Exchange 会把消息投递到符合的 Routing Key 中。

关于直连交换机的思考

  1. 当 Routing Key 只有部分相同时,是否会投递呢?
    我们只要让生产者投递一个 Routing Key = key的值就行了

       channel.basicPublish(EXCHANGE_NAME, "key", null, String.valueOf("key").getBytes());
    复制代码

    结果发现没有消费者消费到。也就是说 直连情况下不存在这种部分匹配的情况。

Topics (主题模式)

当我们需要进行对 Routing Key 部分匹配时,这时候主题模式就上场了。
topics模式中要注意,三种符号

符号 作用
. 用来分割单词
* 匹配一个单词
# 匹配一个或多个单词

结果如下:
Producer 生产消息的 routeKey 为 java.without.smoke

序号 routeKey 是否消费
TopicConsumer1 java.without.smoke Y
TopicConsumer2 java.* N
TopicConsumer3 java.without.* Y
TopicConsumer4 java.# Y

Header模式

主要是在 Channel 里面设置好header属性,以及匹配方式。

  • 生产者
 public static Map<String, Object> map = new HashMap<>();
 map = new HashMap<>();
 map.put("ID", 1);
 map.put("Name", "aaaa");

 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(map).build();
 channel.basicPublish(EXCHANGE_NAME, "java.without.smoke", props, String.valueOf("key1").getBytes());
复制代码
  • 消费者
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(map).build();
        channel.queueBind(QUEUE_NAME, Producer.EXCHANGE_NAME, "java.*", props.getHeaders());
复制代码

总结

  1. RabbitMQ 给了这么多种消息模式给我们,其中 主题模式 在使用上可以达到 路由模式,和 发布订阅模式的效果。各种模式有各自的特点。在一个队列只有一个消费者的情况下,主题模式也可以达到点对点模式的效果。

    场景 方案
    消息固定投放到一个队列,且无需多个消费端加快消费 使用点对点模式
    消息固定投放到一个队列,需多个消费端加快消费 使用工作队列模式
    按照一定规则,将消息投递到多个队列 topic模式
  2. RabbitMQ 消息的传递是由『 生产者 -> 交换机 -> 队列 -> 消费者 』这么一个模式,只不过点对点模式和工作队列模式我们可以理解成是一个匿名的交换机进行投递队列。

  3. Exchange交换机,其实很像我们在做反向代理时用的 nginx 服务器,nginx 复则请求的转发, Exchagne 负责消息的转发。