rabbitMQ–广播模式

这是我参与更文挑战的第6天,活动详情查看: 更文挑战

我们之前的rabbitMQ的模式都是不存在交换机的,直接发送到队列,将下来讲的是订阅模型,一次像多个消费者发消息

image.png
一个生产者发送消息到交换机,交换机发给绑定在自己上边的队列,消费者在从队列拿到消息消费,
X(Exchange):交换机接受生产者发送的消息,另一方面知道如何处理消息,,发给某个队列,还是发给所有的队列,或者是直接舍弃,取决于交换机是如何配置的,交换机只负责发送,而不去存储消息。

交换机分为几类

Publish/Subscribe:广播,将消息交给所有绑定到交换机的队列

Routing:定向,把消息交给符合指定routing key 的队列 

Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
复制代码

订阅模型--Publish/Subscribe

image.png

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个消费者
  • 2) 每个消费者有自己的queue(队列)
  • 3) 每个队列都要绑定到Exchange(交换机)
  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 5) 交换机把消息发送给绑定过的所有队列
  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

生产者

生产者声明交换机,不在声明队列,消息发送到交换机,比在发送到队列

public class p1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置参数
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        //3.创建连接
        Connection connection = connectionFactory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();
       // 声明exchange,指定类型为fanout
        channel.exchangeDeclare("Subscribe_exchange", "fanout");
      // 消息内容
        String message = "Hello_Subscribe";
        // 发布消息到Exchange
        channel.basicPublish("Subscribe_exchange", "", null, message.getBytes());
        System.out.println("生产者发送消息=:'" + message + "'");

        channel.close();
        connection.close();

    }
}
复制代码

消费者1

public class c1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置参数
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        //3.创建连接
        Connection connection = connectionFactory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("Subscribe_queue_1", false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind("Subscribe_queue_1", "Subscribe_exchange", "");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("c1消费消息: "+new String(body));
                //手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 监听队列,自动返回完成
        channel.basicConsume("Subscribe_queue_1", false, consumer);
    }
}
复制代码

消费者2

public class c2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置参数
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        //3.创建连接
        Connection connection = connectionFactory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("Subscribe_queue_2", false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind("Subscribe_queue_2", "Subscribe_exchange", "");

        // 定义队列的消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("c2消费消息: "+new String(body));
                //手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        // 监听队列,自动返回完成
        channel.basicConsume("Subscribe_queue_2", false, consumer);
    }
}

复制代码

启东消费者,生产者发送一条消息,看输出

image.png

image.png

Routing--有选择的发送消息

订阅模式,在这个中我们可以做到不同的队列接受不同的消息,队列与交换机绑定必须指定,消息发送时也必须指定发送消息的routingKey
image.png

如上图所示生产者生产消息发送到交换机,交换机通过与rontingkley的匹配的队列发送消息。

生产者--分别发送三次不同的消息,匹配不同的routingkey

public class p {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置参数
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        //3.创建连接
        Connection connection = connectionFactory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明exchange,指定类型为fanout
        channel.exchangeDeclare("routing_exchange", "direct");
        // 消息内容
        //String message = "新增";
        //String message = "删除";
        String message = "更新";
         // 发布消息到Exchange
        //channel.basicPublish("routing_exchange", "insert", null, message.getBytes());
        //channel.basicPublish("routing_exchange", "delect", null, message.getBytes());
        channel.basicPublish("routing_exchange", "update", null, message.getBytes());
        System.out.println("生产者发送消息=:开始'" + message + "'");

        channel.close();
        connection.close();


    }
}
复制代码

消费者insert

public class insert {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置参数
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        //3.创建连接
        Connection connection = connectionFactory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("routing_queue_insert", false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind("routing_queue_insert", "routing_exchange", "insert");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("insert 接收消息 : "+new String(body));
                //手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 监听队列,自动返回完成
        channel.basicConsume("routing_queue_insert", false, consumer);
    }
}

复制代码

消费者delect

public class delect {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置参数
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        //3.创建连接
        Connection connection = connectionFactory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("routing_queue_delect", false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind("routing_queue_delect", "routing_exchange", "delect");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("delect 接收消息 : "+new String(body));
                //手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 监听队列,自动返回完成
        channel.basicConsume("routing_queue_delect", false, consumer);
    }
}

复制代码

消费者update

public class update {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置参数
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        //3.创建连接
        Connection connection = connectionFactory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("routing_queue_update", false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind("routing_queue_update", "routing_exchange", "update");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("delect 接收消息 : "+new String(body));
                //手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 监听队列,自动返回完成
        channel.basicConsume("routing_queue_update", false, consumer);
    }
}

复制代码

看控制台输出,可以看到,绑定了不同routingkey的收到不同的消息,每个队列可以有很多个routingkey

image.png

image.png

topic--

不同于Direct的交换机,topic匹配可以通过通配符

image.png

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割

*(星号)可以正好代替一个词。
# (hash) 可以代替零个或多个单词。
复制代码

生产者

public class p {
        public static void main(String[] args) throws IOException, TimeoutException {

            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //2.设置参数
            connectionFactory.setHost("192.168.145.3");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/zhaojin");
            connectionFactory.setUsername("zhaojin");
            connectionFactory.setPassword("zhaojin");
            //3.创建连接
            Connection connection = connectionFactory.newConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明exchange,指定类型为fanout
            channel.exchangeDeclare("topic_exchange", "topic");
            // 消息内容
            String message = "新增";
            //String message = "删除";
            //String message = "更新";
            // 发布消息到Exchange
            channel.basicPublish("topic_exchange", "goods.insert", null, message.getBytes());
            //channel.basicPublish("topic_exchange", "goods.delect", null, message.getBytes());
           // channel.basicPublish("topic_exchange", "goods.update", null, message.getBytes());
            System.out.println("生产者发送消息=:开始'" + message + "'");

            channel.close();
            connection.close();


        }
    }
复制代码

消费者1 只接收insert和delect

public class c1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置参数
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        //3.创建连接
        Connection connection = connectionFactory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("topic_queue_1", false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind("topic_queue_1", "topic_exchange", "goods.insert");
        channel.queueBind("topic_queue_1", "topic_exchange", "goods.delect");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新增删除 接收消息 : "+new String(body));
                //手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 监听队列,自动返回完成
        channel.basicConsume("topic_queue_1", false, consumer);
    }
}
复制代码

消费者2 只要匹配到 goods.全拿下

public class c2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置参数
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        //3.创建连接
        Connection connection = connectionFactory.newConnection();
        // 获取通道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("topic_queue_2", false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind("topic_queue_2", "topic_exchange", "goods.*");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("商品 接收消息 : "+new String(body));
                //手动ack
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 监听队列,自动返回完成
        channel.basicConsume("topic_queue_2", false, consumer);
    }
}

复制代码

运行发送推广三条消息,看控制台输出

image.png

image.png

完美、