这是我参与11月更文挑战的第1天,活动详情查看:2021最后一次更文挑战
一、核心组成
1.1、核心组成部分
Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server
Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手
Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange
Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)
Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。
1.2、RabbitMQ 整体架构
1.3、运行流程
- 1、生产者产生消息数据
- 2、经过序列化只会指定交换机和路由信息
- 3、将消息发送到Broker 中
- 4、消费者根据路由来消费指定队列消息
二、消息模式入门案例
RabbitMQ 它提供了六种消息模型,但是第 6 种其实是 RPC,并非 MQ,不用专门学习,剩下的5种,3、4、5这三种都是订阅模型,只不过进行路由的方式不同。第一种的简单模式我们已经在初始篇中学习过了,在这小节中,我们来学习其他四种消息的使用入门。
2.1 Work Queues
工作队列或者竞争消费者模式。
与简单模式不同的是存在多个队列来消费这些消息。也就是存在多个消费者,同时一条消息只能被一个工作队列消费。这还是很好理解的。
那么 RabbitMQ 如何确保消息发送到哪个消费者呢?这就有两种发送策略:
- 轮询发送:一个消费者一条,按均分配;
- 公平发送:根据消费者的消费能力进行公平分发,按劳分配;
2.1.1 轮询发送
默认就是轮询发送。
生产者代码:
/**
* 工作队列---轮询
* @Author xiaolei
* @Date 2021/10/28 11:21
**/
public class ProducerRobinTest {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("111111");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("producer");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
//5、申明 queue 存储消息
/**
* 如果队列不存在,则会创建,不允许相同topic 存在
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,
* @params4: autoDelete 是否自动删除
*/
channel.queueDeclare("queue1", false, false, false, null);
// 6、发送消息
for (int i = 1; i <=10; i++) {
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("","queue1",null,("潇雷挺帅,说第"+i+"遍。").getBytes());
Thread.sleep(1000);
}
// 7、关闭连接
channel.close();
}
}
复制代码
消费者代码:两个消费者一致
public class RobinWork1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("111111");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("consumer");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
DeliverCallback deliverCallback =(String a, Delivery b)->{
String message = new String(b.getBody());
System.out.println("work1"+message);
};
CancelCallback cancelCallback =(String a)->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* @params1: 消费哪个队列
* @params2:消费成功之后是否要自动应答 true代表自动应答 ,flase代表手动应答。
* @params3: 消费者消费成功的回调
* @params4: 消费者消费失败的回调
*/
channel.basicConsume("queue1",true,deliverCallback,cancelCallback);
}
}
复制代码
打印结果是按均分配的,
2.1.2 公平发送
该策略是根据消费者处理消息的能力不同,存在处理慢的问题时候,就采用能者多劳的模式处理。 “厉害的多干点” 。
要开启该策略,就需要消费者开启手动应答,关闭自动应答。而
关闭自动应答的代码:
channel.basicConsume("queue1",false,deliverCallback,cancelCallback);
复制代码
开启手动应答的代码:
channel.basicAck(b.getEnvelope().getDeliveryTag(),false);
复制代码
原理是消费者要告知MQ,在未处理完当前消息前,不要发新的消息过来,每消费完一次就应答一次。
消费者代码都一样,不同的是给其中一个消费者sleep 的时间久一点就好。
public class FairWork2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("111111");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("consumer2");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
//声明在一条消息被确认消费前,不会再发给该消费者另外的消息
channel.basicQos(1);
DeliverCallback deliverCallback =(String a, Delivery b)->{
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(b.getBody());
System.out.println("work2"+message);
// 手动应答消息 false 代表单个应答 true:代表批量应答,是以通道未单位的
channel.basicAck(b.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback =(String a)->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* @params1: 消费哪个队列
* @params2:消费成功之后是否要自动应答 true代表自动应答 ,flase代表手动应答。
* @params3: 消费者消费成功的回调
* @params4: 消费者消费失败的回调
*/
channel.basicConsume("queue2",false,deliverCallback,cancelCallback);
}
}
复制代码
在写测试类的时候,如果channel没有很好的关闭连接,可以看到这边有多余的消费者存在,将这些线程关闭即可。保证工作线程只有自己。
2.2 发布订阅模式(fanout)
发布订阅模式的结构如图:
简单模式和工作队列模式本质是一样的,只有一个 queue。message 发送给 exchange 后,exchange 判断为 direct 模式后,就会把message 转发到绑定的 queue。而且一条消息只能发送给 1个consumer,发布订阅模式存在多个队列来共同消费数据。
该模式下:
- 1、发布者只创建 type 为 fanout 的命名 exchange;
- 2、consumer 根据需要创建自己的 queue,并且连接到 publisher 创建的 exchange 中,实施exchange与 queue 的绑定;如果 queue 不需要持久化,则可用临时 queue。
- 3、发布者发送 message,message 到达 exchange 后,exchange 判断为 fanout 模式,就直接把 message 发送给与自己绑定的 queue 中
- 4、每个queue将消息分发给自己连接的 consumer
2.2.1 交换机四种类型:
发送消息 rabbitmq 它一定有个 交换机,默认不指定的话为下面这个。
-
direct :处理路由键,需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
-
topic: 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“ ”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc. ” 只会匹配到“abc.def”。
-
headers:
不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。
-
fanout : 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
2.2.2 发布订阅模式测试
生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("test");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("111111");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("producer");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
/**
* 5、制定交换机的模式 fanout、名称 exchange1
*/
channel.exchangeDeclare("exchange1","fanout");
// 6、发送消息
for (int i = 1; i <=20; i++) {
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("exchange1","",null,("潇雷挺帅,说第"+i+"遍。").getBytes());
System.out.println("发送第"+i);
}
}
}
复制代码
消费者:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setUsername("test");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("test");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("consumer2");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
//声明在一条消息被确认消费前,不会再发给该消费者另外的消息
channel.basicQos(1);
DeliverCallback deliverCallback =(String a, Delivery b)->{
String message = new String(b.getBody());
System.out.println("work2"+message);
};
CancelCallback cancelCallback =(String a)->{
System.out.println("消息消费被中断");
};
channel.queueDeclare("queue2",false,false,true,null);
channel.queueBind("queue2","exchange1","");
/**
* 消费者消费消息
* @params1: 消费哪个队列
* @params2:消费成功之后是否要自动应答 true代表自动应答 ,flase代表手动应答。
* @params3: 消费者消费成功的回调
* @params4: 消费者消费失败的回调
*/
channel.basicConsume("queue2",true,deliverCallback,cancelCallback);
}
}
复制代码
消费者2 队列名字不一样,其他都一样,此时能看到 exchange1 下面bind 的两个队列。
2.3 路由模式(direct)
Direct 模式是 fanout模式的一种,新增加了路由key 的选择。
结构如下:
设置路由模式,然后绑定对应的 key 之后:
- routing-key 为 error 的 message 只会发送到 Q1;
- routing-key 为 info、error、warning 的消息会发送到 Q2,而其他消息都会忽略丢弃。
- 其中的 error 被所有消费者都关注了,那么它其实就相当于发布订阅模式,它的消息都会发送到consumer1和consumer2.
生产者指定路由:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("test");
connectionFactory.setUsername("test");
connectionFactory.setPassword("test");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("producer");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
/**
* 5、制定交换机的模式 fanout、名称 exchange1
*/
channel.exchangeDeclare("exchange2","direct");
// 6、发送消息
for (int i = 1; i <=20; i++) {
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
if(i%2==0){
channel.basicPublish("exchange2","error",null,("潇雷挺帅,说第"+i+"遍。").getBytes());
}else{
channel.basicPublish("exchange2","info",null,("潇雷挺帅,说第"+i+"遍。").getBytes());
}
System.out.println("发送第"+i);
}
}
}
复制代码
消费者指定路由:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setUsername("test");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("test");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("consumer2");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
//声明在一条消息被确认消费前,不会再发给该消费者另外的消息
channel.basicQos(1);
DeliverCallback deliverCallback =(String a, Delivery b)->{
String message = new String(b.getBody());
System.out.println("work2"+message);
};
CancelCallback cancelCallback =(String a)->{
System.out.println("消息消费被中断");
};
channel.queueDeclare("queue2",false,false,true,null);
channel.queueBind("queue2","exchange2","error");
/**
* 消费者消费消息
* @params1: 消费哪个队列
* @params2:消费成功之后是否要自动应答 true代表自动应答 ,flase代表手动应答。
* @params3: 消费者消费成功的回调
* @params4: 消费者消费失败的回调
*/
channel.basicConsume("queue2",true,deliverCallback,cancelCallback);
}
}
复制代码
2.4 主题模式(Topic)
主题模式可以看作路由模式的拓展,它增加了路由key 的模式。就像我们的模糊匹配。
发送到 topic exchange 的 routing-key 必须是有规则的字符串,例如以 " . " 分割,并且每个分隔符内的字符串带有特征,例如 “xiaolei.orange”,它的最大字符长度最大为 255 字节。
topic 其实是与 direct 模式很相似,对于 bind keys ,topic 模式多了两个特征:
- “ * ” 星号可以替代任意一个字符
- “ # ” 可以替代 0 到多个字符。
理解了这个,我们来做个案例,例如:一批影片里面有 爱国片、动作片和喜剧片。爱国片、动作片里面有吴京;爱国片、喜剧片里面有沈腾。
- 那么有批粉丝是吴京的粉丝,那他就找出所有吴京的影片,路由规则为 “*. 吴京”或"#.吴京";
- 有粉丝只看爱国片里的吴京,对动作片不感兴趣,路由规则为 “爱国.吴京”;
生产者:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("test");
connectionFactory.setUsername("test");
connectionFactory.setPassword("test");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("producer");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
/**
* 5、制定交换机的模式 fanout、名称 exchange1
*/
channel.exchangeDeclare("exchange3","topic");
// 6、发送消息
for (int i = 1; i <=40; i++) {
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
if(i%4==0){
channel.basicPublish("exchange3","爱国.吴京",null,("爱国.吴京,说第"+i+"遍。").getBytes());
}else if(i%4 ==1){
channel.basicPublish("exchange3","爱国.沈腾",null,("爱国.沈腾,说第"+i+"遍。").getBytes());
}else if(i%4 ==2){
System.out.println("ggg");
channel.basicPublish("exchange3","动作.吴京",null,("动作.吴京,说第"+i+"遍。").getBytes());
}else if(i%4 ==3){
channel.basicPublish("exchange3","喜剧.沈腾",null,("喜剧.沈腾,说第"+i+"遍。").getBytes());
}
System.out.println("发送第"+i);
}
}
}
复制代码
消费者:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接属性
connectionFactory.setHost("192.168.81.102");
connectionFactory.setPort(5672);
connectionFactory.setUsername("test");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("test");
//3、从连接工厂中获取连接
Connection connection = connectionFactory.newConnection("consumer2");
//4、从连接中获取通道 channel
Channel channel = connection.createChannel();
//声明在一条消息被确认消费前,不会再发给该消费者另外的消息
channel.basicQos(1);
DeliverCallback deliverCallback =(String a, Delivery b)->{
String message = new String(b.getBody());
System.out.println("work2"+message);
};
CancelCallback cancelCallback =(String a)->{
System.out.println("消息消费被中断");
};
channel.queueDeclare("queue2",false,false,true,null);
channel.queueBind("queue2","exchange3","#.吴京");
/**
* 消费者消费消息
* @params1: 消费哪个队列
* @params2:消费成功之后是否要自动应答 true代表自动应答 ,flase代表手动应答。
* @params3: 消费者消费成功的回调
* @params4: 消费者消费失败的回调
*/
channel.basicConsume("queue2",true,deliverCallback,cancelCallback);
}
}
复制代码
三、小结
本文主要介绍了 RabbitMQ 的队列结构和它几大核心的路由模式的使用。
在下一的章节中,将会介绍Rabbbitmq 与 SpringBoot 项目的集成使用。
近期评论