一文带你了解RabbitMQ的相关模式RabbitMQ区分

这是我参与11月更文挑战的第10天,活动详情查看:2021最后一次更文挑战

RabbitMQ区分模式

  • RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此我们只学习前5种

  • 在线手册:www.rabbitm q.com /getstarted.html

image.png

  • 5种消息模型,大体分为两类:

    • 1和2属于点对点
    • 3、4、5属于发布订阅模式(一对多)
  • 点对点模式:P2P(point to point)模式包含三个角色:

    • 消息队列(queue),发送者(sender),接收者(receiver)
    • 每个消息发送到一个特定的队列中,接收者从中获得消息
    • 队列中保留这些消息,直到他们被消费或超时
    • 特点:
      1. 每个消息只有一个消费者,一旦消费,消息就不在队列中了
      2. 发送者和接收者之间没有依赖性,发送者发送完成,不管接收者是否运行,都不会影响消息发送到队列中(我给你发微信,不管你看不看手机,反正我发完了)
      3. 接收者成功接收消息之后需向对象应答成功(确认)
    • 如果希望发送的每个消息都会被成功处理,那需要P2P
  • 发布订阅模式:publish(Pub)/subscribe(Sub)

  • pub/sub模式包含三个角色:交换机(exchange),发布者(publisher),订阅者

(subcriber)

  • 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者

  • 特点:

      1. 每个消息可以有多个订阅者
      2. 发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅
      后,才能消费发布者的消息
      3. 为了消费消息,订阅者必须保持运行状态;类似于,看电视直播。
      
    复制代码
  • 如果希望发送的消息被多个消费者处理,可采用本模式

1 简单模式

下面引用官网的一段介绍:

image.png

RabbitMQ本身只是接收,存储和转发消息,并不会对信息进行处理!

类似邮局,处理信件的应该是收件人而不是邮局!

image.png

1.1 生产者P

package	simplest;


import	com.rabbitmq.client.Channel;
import	com.rabbitmq.client.Connection;
import	util.ConnectionUtil;

/**
*	@Description:	消息生产者
*/
public	class	Sender	{
public	static	void	main(String[]	args)	throws	Exception	{
String	msg	=	"123:Hello,RabbitMQ!";
//	1.获得连接
Connection	connection	=	ConnectionUtil.getConnection();
//	2.在连接中创建通道(信道)
Channel	channel	=	connection.createChannel();
//	3.创建消息队列(1,2,3,4,5)
/*
参数1:队列的名称
参数2:队列中的数据是否持久化
参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)
参数5:队列参数(没有参数为null)
*/
channel.queueDeclare("queue1",false,false,false,null);
//	4.向指定的队列发送消息(1,2,3,4)
/*
参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""
参数2:目标队列的名称
参数3:设置消息的属性(没有属性则为null)
参数4:消息的内容(只接收字节数组)
*/
channel.basicPublish("","queue1",null,msg.getBytes());
System.out.println("发送:"	+	msg);
//	5.释放资源
channel.close();
connection.close();
}
}
复制代码

启动生产者,即可前往管理端查看队列中的信息,会有一条信息没有处理和确认

image.png

1.2 消费者C

package	simplest;


import	com.rabbitmq.client.*;
import	util.ConnectionUtil;

import	java.io.IOException;

/**
*	@Description:	消息接收者
*/
public	class	Recer	{
public	static	void	main(String[]	args)	throws	Exception	{
//	1.获得连接
Connection	connection=ConnectionUtil.getConnection();
//	2.获得通道(信道)
Channel	channel	=	connection.createChannel();
//	3.从信道中获得消息
DefaultConsumer	consumer =new	DefaultConsumer(channel){
@Override	//交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
public	void	handleDelivery(String	consumerTag,	Envelope	envelope,
AMQP.BasicProperties properties,byte[]	body)	throws	IOException	{
//	body就是从队列中获取的消息
String	s=new	String(body);
System.out.println("接收	="+s);
}
};
//	4.监听队列	true:自动消息确认
channel.basicConsume("queue1",	true,consumer);
}
}
复制代码

启动消费者,前往管理端查看队列中的信息,所有信息都已经处理和确认,显示0

image.png

1.3 消息确认机制ACK

  • 消息一旦被消费,消息就会立刻从队列中移除

  • RabbitMQ如何得知消息被消费者接收?

      - 如果消费者接收消息后,还没执行操作就抛异常宕机导致消费失败,但是RabbitMQ无从得知,这样消息就丢失了
      - RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
      - ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误我们在使用http请求时,http的状态码200就是告诉我们服务器执行成功
      - 整个过程就想快递员将包裹送到你手里,并且需要你的签字,并拍照回执
      - 回执ACK分为两种情况:
          - 自动ACK:消息接收后,消费者立刻自动发送ACK(快递放在快递柜)
          - 手动ACK:消息接收后,不会发送ACK,需要手动调用(快递必须本人签收)
      - 两种情况如何选择,需要看消息的重要性:
          - 如果消息不太重要,丢失也没有影响,自动ACK会比较方便
          - 如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把消息从队列中删除,如果此时消费者抛异常宕机,那么消息就永久丢失了
          
    复制代码
  • 修改手动消息确认

/ /	false:手动消息确认
channel.basicConsume( "queue1",false,consumer ) ;
复制代码
  • 结果如下:

image.png

  • 解决问题
public	class	RecerByACK	{
public	static	void	main(String[]	args)	throws	Exception	{
//	1.获得连接
Connection	connection=ConnectionUtil.getConnection();
//	2.获得通道(信道)
final	Channel	channel	=connection.createChannel();
//	3.从信道中获得消息
DefaultConsumer	consumer=new	DefaultConsumer(channel){
@Override	//交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
public	void	handleDelivery(String	consumerTag,	Envelope	envelope,
AMQP.BasicProperties	properties,	byte[]	body)	throws	IOException	{
//	body就是从队列中获取的消息
String	s=new String(body);
System.out.println("接收	="+s);
//	手动确认(收件人信息,是否同时确认多个消息)
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//	4.监听队列,false:手动消息确认
channel.basicConsume("queue1",	false,consumer);
}
}

复制代码

2 工作队列模式

image.png

  • 简单模式,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能力有限,就会产生消息在队列中堆积(生活中的滞销)

  • 一个烧烤师傅,一次烤50支羊肉串,就一个人吃的话,烤好的肉串会越来越多,怎么处理?

  • 多招揽客人进行消费即可。当我们运行许多消费者程序时,消息队列中的任务会被众多消费者共享,但其中某一个消息只会被一个消费者获取(100支肉串20个人吃,但是其中的某支肉串只能被一个人吃)

2.1 生产者P

public	class	MessageSender	{
public	static	void	main(String[]	args)	throws	Exception{
Connection	connection=ConnectionUtil.getConnection();
Channel	channel	=connection.createChannel();
//	声明队列(此处为生产者,创建队列)注明出餐口位置,通知大家来排队
channel.queueDeclare("test_work_queue",false,false,false,null);
for(int	i=1;i<=100;i++)	{
String	msg="羊肉串	-->"+i;
channel.basicPublish("","test_work_queue",null,	msg.getBytes());
System.out.println("师傅烤好:"+msg);
}
channel.close();
connection.close();
}
}

复制代码

2.2 消费者1

publicclassMessageReceiver1{
static int i=1;//记录执行次数
public static void main(String[]  args) throws IOException,TimeoutException
{
Connection connection=ConnectionUtil.getConnection();
finalChannel channel=connection.createChannel();
//声明队列(此处为消费者,不是声明创建队列,而且获取,二者代码相同)出餐口排队
channel.queueDeclare("test_work_queue",false,false,false,null);
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,
AMQP.BasicPropertiesproperties,byte[]
body) throws IOException{
String msg=new String(body);
System.out.println("【顾客1】吃掉"+msg+"!共吃【"+i+++"】串");
//撸一会,有延迟
try{
Thread.sleep(200);
}catch(InterruptedExceptione){
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("test_work_queue",false,consumer);
}
}
复制代码

2.3 消费者2

public class MessageReceiver2{
static int i=1;//记录执行次数
public static void main(String[] args) throws IOException,TimeoutException
{
Connection connection=ConnectionUtil.getConnection();
finalChannel channel=connection.createChannel();
//声明队列(此处为消费者,不是声明创建队列,而且获取,二者代码相同)出餐口排队
channel.queueDeclare("test_work_queue",false,false,false,null);
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,
AMQP.BasicPropertiesproperties,byte[]
body) throws IOException{
String msg= new String(body);
System.out.println("【顾客2】吃掉"+msg+"!共吃【"+i+++"】串");
//撸一会,有延迟
try{
Thread.sleep(200);
}catch(InterruptedExceptione){
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("test_work_queue",	false,	consumer);
}
}

复制代码

解析

  • 先运行2个消费者,排队等候消费(取餐),再运行生产者开始生产消息(烤肉串)
  • 虽然两个消费者的消费速度不一致(线程休眠时间),但是消费的数量却是一致的,各消费50个消息
    • 例如:工作中,A同学编码速率高,B同学编码速率低,两个人同时开发一个项目,A10天完成,B30天完成,A完成自己的编码部分,就无所事事了,等着B完成就可以了,这样是不可以的,应该遵循“能者多劳”
    • 效率高的多干点,效率低的少干点
    • 看下面官网是如何给出解决思路的:

image.png

公平的分配
您可能已经注意到分派仍然不能完全按照我们的要求工作。例如,如果有两个员工,当所有
奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都
不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。
这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它
只是盲目地将每条第n个消息分派给第n个消费者。
为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉
RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并
确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的
worker。
复制代码
/ /声明队列(此处为消费者,不是声明创建队列,而且获取,二者代码相同)出餐口排队
channel . queueDeclare( "test_work_queue", false,false, false, null) ;
/ /可以理解为:快递一个一个送,送完一个再送下一个,速度快的送件就多
channel.basicQos( 1) ;
复制代码

能者多劳必须要配合手动的ACK机制才生效

2.4 面试题:避免消息堆积?

  1. workqueue,多个消费者监听同一个队列
  2. 接收到消息后,通过线程池,异步消费

3 发布订阅模式

看官网:

image.png

发布-订阅
在上一篇教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都被准确地交
付给一个工作者。在这一部分中,我们将做一些完全不同的事情——将消息传递给多个消费者。
此模式称为“发布/订阅”。
为了演示这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个将
发送日志消息,第二个将接收和打印它们。
在我们的日志系统中,接收程序的每一个正在运行的副本都将获得消息。这样我们就可以运行
一个接收器并将日志指向磁盘;与此同时,我们可以运行另一个接收器并在屏幕上看到日志。
基本上,发布的日志消息将广播到所有接收方。
复制代码

生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视频通知

image.png

  • 上图中,X就是视频主,红色的队列就是粉丝。binding是绑定的意思(关注)
  • P生产者发送信息给X路由,X将信息转发给绑定X的队列

image.png

  • X队列将信息通过信道发送给消费者,从而进行消费
  • 整个过程,必须先创建路由
    • 路由在生产者程序中创建
    • 因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没有队列,路由并不知道将信息发送给谁
    • 运行程序的顺序:
      1. MessageSender
      2. MessageReceiver1和MessageReceiver2
      3. MessageSender

3.1 生产者

public	class	Sender	{
public	static	void	main(String[]	args)	throws	Exception	{
Connection	connection=ConnectionUtil.getConnection();
Channel	channel	=connection.createChannel();

//声明路由(路由名,路由类型)
//fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该
路由绑定的所有队列上)
channel.exchangeDeclare("test_exchange_fanout",	"fanout");
String	msg="hello,大家好!";
channel.basicPublish("test_exchange_fanout","",null,msg.getBytes());
System.out.println("生产者:"+msg);
channel.close();
connection.close();
}
}
复制代码

3.2 消费者1

public	class	Recer1	{
public	static	void	main(String[]	args)	throws	Exception	{
Connection	connection=ConnectionUtil.getConnection();
Channel	channel=connection.createChannel();
//	声明队列

channel.queueDeclare("test_exchange_fanout_queue_1",false,false,false,null);
//	绑定路由(关注)
/*
参数1:队列名
参数2:交换器名称
参数3:路由key(暂时无用,""即可)
*/
channel.queueBind("test_exchange_fanout_queue_1",
"test_exchange_fanout",	"");
DefaultConsumer	consumer=new	DefaultConsumer(channel){
@Override
public	void	handleDelivery(String	consumerTag,	Envelope	envelope,
AMQP.BasicProperties	properties,	byte[]	body)	throws	IOException	{
String	s=new	String(body);
System.out.println("【消费者1】="+s);
}
};
//	4.监听队列	true:自动消息确认
channel.basicConsume("test_exchange_fanout_queue_1",	true,consumer);
}
}
复制代码

3.3 消费者2

将消费者1代码中的1修改为2即可,具体代码略

4 路由模式

image.png

  • 路由会根据类型进行定向分发消息给不同的队列,如图所示
  • 可以理解为是快递公司的分拣中心,整个小区,东面的楼小张送货,西面的楼小王送货

4.1 生产者

public	class	Sender	{
public	static	void	main(String[]	args)	throws	Exception	{
Connection	connection=ConnectionUtil.getConnection();
Channel	channel	=connection.createChannel();
//	声明路由(路由名,路由类型)
//	direct:根据路由键进行定向分发消息
channel.exchangeDeclare("test_exchange_direct",	"direct");
String	msg="用户注册,【userid=S101】";
channel.basicPublish("test_exchange_direct",	"insert",	null,
msg.getBytes());
System.out.println("[用户系统]:"+msg);
channel.close();
connection.close();
}
}
复制代码

4.2 消费者1

public	class	Recer1	{
public	static	void	main(String[]	args)	throws	Exception	{
Connection	connection=ConnectionUtil.getConnection();
Channel	channel	=connection.createChannel();
//	声明队列
channel.queueDeclare("test_exchange_direct_queue_1",false,false,false,null);
//绑定路由(如果路由键的类型是	添加,删除,修改	的话,绑定到这个队列1上)

channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct",	"insert");
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct",	"update");
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct",	"delete");
DefaultConsumer	consumer=new	DefaultConsumer(channel){
@Override
public	void	handleDelivery(String	consumerTag,	Envelope	envelope,
AMQP.BasicProperties	properties,	byte[]	body)	throws	IOException	{
String	s=new	String(body);
System.out.println("【消费者1】="	+	s);
}
};
//	4.监听队列	true:自动消息确认
channel.basicConsume("test_exchange_direct_queue_1",	true,consumer);
}
}
复制代码

4.3 消费者2

public	class	Recer2	{
public	static	void	main(String[]	args)	throws	Exception	{
Connection	connection=ConnectionUtil.getConnection();
Channel	channel	=connection.createChannel();
//	声明队列

channel.queueDeclare("test_exchange_direct_queue_2",false,false,false,null);
//	绑定路由(如果路由键的类型是	查询	的话,绑定到这个队列2上)
channel.queueBind("test_exchange_direct_queue_2","test_exchange_direct",	"select");
DefaultConsumer	consumer=new	DefaultConsumer(channel){
@Override
public	void	handleDelivery(String	consumerTag,	Envelope	envelope,
AMQP.BasicProperties	properties,	byte[]	body)	throws	IOException	{
String	s=new	String(body);
System.out.println("【消费者2】="	+	s);
}
};
//	4.监听队列	true:自动消息确认
channel.basicConsume("test_exchange_direct_queue_2",	true,consumer);
}
}

复制代码
  1. 记住运行程序的顺序,先运行一次sender(创建路由器),
  2. 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定
  3. 再次运行sender,发出消息

5 通配符模式

image.png

  • 和路由模式90%是一样的。
  • 唯独的区别就是路由键支持模糊匹配
  • 匹配符号
    • *:只能匹配一个词(正好一个词,多一个不行,少一个也不行)
    • #:匹配0个或更多个词
  • 官网案例:
    • Q1绑定了路由键 * .orange.* Q2绑定了路由键 * .* .rabbit 和 lazy.#
    • 下面生产者的消息会被发送给哪个队列?
quick.orange.rabbit	#	Q1	Q2
lazy.orange.elephant	#	Q1	Q2
quick.orange.fox	#	Q1
lazy.brown.fox	#	Q2
lazy.pink.rabbit	#	Q2
quick.brown.fox	#	无
orange	#	无
quick.orange.male.rabbit	#	无

复制代码

5.1 生产者

public	class	Sender	{
public	static	void	main(String[]	args)	throws	Exception	{
Connection	connection=ConnectionUtil.getConnection();
Channel	channel	=connection.createChannel();

//	声明路由(路由名,路由类型)
//	topic:模糊匹配的定向分发
channel.exchangeDeclare("test_exchange_topic",	"topic");
String	msg="商品降价";
channel.basicPublish("test_exchange_topic",	"product.price",	null,
msg.getBytes());
System.out.println("[用户系统]:"+msg);
channel.close();
connection.close();
}
}
复制代码

5.2 消费者1

public class Recer1 {
public static void main(String[] args) throws	Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列

 
channel.queueDeclare("test_exchange_topic_queue_1",false,false,false,null);
// 绑定路由(绑定	用户相关	的消息)
channel.queueBind("test_exchange_topic_queue_1", "test_exchange_topic",
"user.#");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("【消费者1】	= " + s);
}
};
// 4.监听队列	true:自动消息确认
channel.basicConsume("test_exchange_topic_queue_1", true,consumer);
}
}

复制代码

5.3 消费者2

public class Recer2{
public static void main(String[] args) throws	Exception{
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明队列

channel.queueDeclare("test_exchange_topic_queue_2",false,false,false,null);
//绑定路由(绑定	商品和订单相关	的消息)
channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic",
"product.#");
channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic",
"order.#");
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,
AMQP.BasicPropertiespr operties,byte[] body) throws IOException{
String s=   newString(body);
System.out.println("【消费者2】	="+s);
}
};
//4.监听队列	true:自动消息确认
channel.basicConsume("test_exchange_topic_queue_2",true,consumer);
}
}
复制代码