0.RabbitMQ介绍,应用场景
RabbitMQ是一种较为流行的一种消息中间件技术,它与Spring可以良好地整合。
那么RabbitMQ是用来做什么的,又或者说,我们在什么地方需要用到消息中间件。以下说明为消息中间件的应用场景:
0.0 异步处理
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种,1.串行的方式,2.并行的方式。
串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。
并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
除了以上两种方式,还可以使用消息队列异步去操作:
消息队列:假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行方式使用时间100ms。虽然并行已经提高了处理时间,但是短信和邮件对于正常使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回。引入消息队列之后,把发送邮件,短信不是必须的业务逻辑异步处理。
0.1应用解耦
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。
但是这种做法有一个缺点:当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合。引入消息队列之后:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,获取下单消息,进行库存操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
0.2 流量削峰
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过设置的数量的订单直接丢弃
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。
2.秒杀业务根据消息队列中的请求信息,再做后续处理。
1.Rabbitmq安装-以rpm的方式
1.1 安装Erlang环境
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
复制代码
安装erlang:
yum install -y erlang
复制代码
检查erlang版本:
erl
复制代码
在安装Rabbitmq之前需要注意的是,一定要保证erlang的版本和rabbitmq的版本是对应的。
我这里安装的erlang版本是:
因为我安装的erlang版本是24的,所以接下来我安装RabbitMQ的版本至少也要3.8.16
1.2 安装RabbitMQ
rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
rpm --import https://packagecloud.io/gpg.key
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum -y install epel-release
yum -y install socat
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
复制代码
下载rpm包:
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.25/rabbitmq-server-3.8.25-1.el7.noarch.rpm
复制代码
这里下载可能会很慢,也可以在windows下载rpm包之后再上传到服务器上。
安装:
rpm -ivh rabbitmq-server-3.8.25-1.el7.noarch.rpm
复制代码
然后启动管理平台插件,如果需要访问RabbitMQ的web管理页面就需要操作这一步:
rabbitmq-plugins enable rabbitmq_management
复制代码
启动RabbitMQ:
systemctl start rabbitmq-server
复制代码
然后可以访问控制台地址:
http:你的服务器ip:15672
如果访问不了,需要检查你的RabbitMQ是否成功启动并且相关端口(15672)是否开放。
使用默认的账号进行登录:guest/guest。
这时候会发现登录不进去,并且会有一个提示”User can only log in via localhost“
出现这个信息表示RabbitMQ web管理页面只允许以localhost的方式进行访问,那么我们在外网如何进行web管理后台的访问呢?
需要进入到/etc/rabbitmq/目录下:
cd /etc/rabbitmq
复制代码
vi rabbitmq.config
复制代码
在rabbitmq.config配置文件里面加上(最后的那个小点别忘了,不然会重启不成功):
[{rabbit, [{loopback_users, []}]}].
复制代码
然后重启:
systemctl restart rabbitmq-server
复制代码
2.AMQP协议
2.1 什么是AMQP
AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是一种网络协议。用于客户端应用和消息中间件之间的通信。
2.2 AMQP模型简介
上图是AMQP模型,主要的工作过程就是:消息由Publisher(发布者或者称为提供者),发送到交换机,交换机将收到的消息根据路由规则转发到不同的队列中去。最后RabbitMQ会将队列中的消息推送给订阅了此队列的消费者,或者消费者主动的从队列中获取消息。
从安全角度考虑,网络是不可靠的。所以消费者在消费消息的时候,可能会因为某种原因处理失败。基于这个原因,RabbitMQ提供了一种消息确认机制(message acknowledgements):当一个消息从队列投递给消费者之后,要么会自动进行消息确认,要么就需要手动进行消息确认。消费者确认之后,队列就会将此条消息进行删除。
消息确认有两种方式:
1.自动确认,当消息被发送到消费者之后,自动删除。
2.待应用发送一个确认回执后再删除消息。确认回执可以是在收到消息后立马发送,也可以将消息存储之后再进行回执发送,也可以在处理完该消息之后,进行回执发送。
在某些情况,例如有一个消息没有被成功路由时。消息或许会被返回给发布者并被丢弃。
2.3 交换机和交换机类型
AMQP提供了四种交换机:
2.3.1 默认交换机
默认交换机(default exchange)是RabbitMQ预先声明好的没有名字的交换机。每个新建队列,如果没有指明特定的交换机名称,那么它将会自动绑定到默认交换机上,绑定的路由键名称就是队列的名称。
我看到有些说法,就是说,通道可以把消息直接发送到交换机,或者直接发送到队列。我觉得这样说是不准确的,”直接发送到队列“这种情况应该是发送到默认的交换机,然后由默认交换机根据路由key来路由到对应的与之绑定的队列中。
2.3.2 直连交换机
直连交换机(direct exchange)是根据消息携带的路由键将消息转发给对应队列。直连交换机默认的模式是平均分配,如果一个直连交换机绑定了多个路由键一样的队列,那么直连交换机则会将消息平均的分配给所绑定的符合条件的队列。
2.3.3 扇形交换机
扇形交换机(fanout exchange)将消息路由给绑定到它审核的所有队列,而不理会绑定的路由键。如果有多个队列绑定到扇形交换机上,当有消息发送给扇形交换机的时候,交换机会将消息的拷贝分别发送给绑定到扇形交换机上面的所有队列。
2.3.4 主题交换机
主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
2.4 队列
2.4.1 队列持久化
持久化队列(Durable queues)会被存储在磁盘上,当RabbitMQ重启的时候,持久化队列依旧存在。没有被持久化的队列称为暂存队列(Transient queues)。
需要注意的是,队列的持久化,不意味着队列中的消息持久化。如果消息没有被持久化,那么在RabbitMQ重启之后,虽然持久化队列存在,但是消息不会被保留。
2.5 通道
消息是通过通道被发送到交换机和队列上的。是消息传输的介质。
2.6 虚拟主机
RabbitMQ提供虚拟主机来隔离多个环境。当建立连接的时候,需要指定使用哪一个虚拟主机。
3.Rabbitmq web管理页面大致了解
4.五种模型的demo实现
4.0 相关依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<version>4.12</version>
</dependency>
</dependencies>
复制代码
4.1 Hello World
这个消息的要素就三个,一个生产者,一个消费者,和一个队列。这个模型很容易的就让人觉得,生产者把消息直接发送到队列中,其实不是的。RabbitMQ中存在一个默认交换机,如果没有显式地把队列和一个具体名称的交换机绑定的话,默认就会把队列和默认交换机进行绑定,绑定的路由key就是队列的名称。所以这个模型的消息发送实质上还是消费者发送到交换机,然后交换机再把消息路由到队列,最后由消费者获取队列消息,或者队列向订阅了的消费者推送消息。
在操作之前需要创建一个虚拟主机和对应的测试用户:
消息生产者Provider生产消息:
/**
* "hello world" 模型-消息提供者
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Provider {
@Test
public void sendMessage()throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
/**
* 绑定对应的消息队列
* @param1 队列名称,如果不存在则会创建
* @param2 定义队列是否需要持久化,true为持久化
* @param3 exclusive 是否独占队列,true 独占
* @param4 额外参数
*/
channel.queueDeclare("hello",true,false,false,null);
/**
* 发布消息
* @param1 交换机名称
* @param2 队列名称
* @param3 传递消息额外设置
* @param4 发布的消息内容
*/
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
//关闭资源
channel.close();
connection.close();
}
}
复制代码
执行完毕之后,就可以看到队列中已经存在一条消息了:
消费者Consumer消费消息:
/**
* "hello world" 模型-消息消费者
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Consumer {
@Test
public void Receive()throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
/**
* 绑定对应的消息队列
* @param1 队列名称,如果不存在则会创建
* @param2 定义队列是否需要持久化,true为持久化
* @param3 exclusive 是否独占队列,true 独占
* @param4 额外参数
*/
channel.queueDeclare("hello",true,false,false,null);
/**
* 消费消息
* @param1 队列名称
* @param2 是否自动确认 true 是自动确认
* @param3 接收到消息之后进行回调
*/
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message:"+new String(body));
}
});
}
}
复制代码
执行完毕之后:
可以看到队列中的消息已经被消费了。
4.2 Work queues
可以看到工作队列(work queues)模型与第一个Helloworld模型类似,只不过工作队列模型的消费者由一个变成多个。
那这个工作队列(work queues)模型是在什么背景下使用的?在一个消费者一个生产者的模型下,如果生产者生产消息的速度远大于消费者消费消息的速度,那么消息就不断地累积在队列中,会阻塞队列。
使用工作队列模型,在原有的基础上,增加多个消费者,多个消费者共同消费同一个队列里面的消息,并且使用这个模型的前提下,并不会导致消息被重复消费。
创建消息生产者生产消息:
/**
* "work queues" 模型-消息提供者
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Provider {
@Test
public void sendMessage()throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
/**
* 绑定对应的消息队列
* @param1 队列名称,如果不存在则会创建
* @param2 定义队列是否需要持久化,true为持久化
* @param3 exclusive 是否独占队列,true 独占
* @param4 额外参数
*/
channel.queueDeclare("hello",true,false,false,null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "hello", null, (i+"====>:我是消息").getBytes());
}
//关闭资源
channel.close();
connection.close();
}
}
复制代码
创建消费者1Consumer1:
/**
* "work queues" 模型-消息消费者1
*/
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
/**
* 绑定对应的消息队列
* @param1 队列名称,如果不存在则会创建
* @param2 定义队列是否需要持久化,true为持久化
* @param3 exclusive 是否独占队列,true 独占
* @param4 额外参数
*/
channel.queueDeclare("hello", true, false, false, null);
/**
* 消费消息
* @param1 队列名称
* @param2 是否自动确认 true 是自动确认
* @param3 接收到消息之后进行回调
*/
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1: " + new String(body));
}
});
}
}
复制代码
创建消费者2Consumer2:
/**
* "work queues" 模型-消息消费者2
*/
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
/**
* 绑定对应的消息队列
* @param1 队列名称,如果不存在则会创建
* @param2 定义队列是否需要持久化,true为持久化
* @param3 exclusive 是否独占队列,true 独占
* @param4 额外参数
*/
channel.queueDeclare("hello", true, false, false, null);
/**
* 消费消息
* @param1 队列名称
* @param2 是否自动确认 true 是自动确认
* @param3 接收到消息之后进行回调
*/
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//处理消息比较慢 一秒处理一个消息
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2: " + new String(body));
}
});
}
}
复制代码
先运行两个消费者,然后再运行生产者,然后可以看到控制台输出:
可以看到消息被消费了。这里要说明一下,虽然在消费者2中的回调方法加了线程休眠,但是目前应该是不生效的,因为这里开启了消息自动确认,就是说,当消费者拿到消息之后,消息就会被自动确认,队列中对应的消息就会被删除,即使是回执方法没有执行完毕,队列也当做消费者已经消费了消息。
而且在控制台中可以看到,工作队列模型(work queues),默认的消息分配策略是轮询的,也可以说是平均的,队列会将消息平均地分配到每一个消费者上。
那么这个平均分配会带来什么问题,假设某一个消费者,处理消息的速度远慢于消息队列分发消息的速度,消费者此时是开启消息自动确认机制,消费者收到消息,但是并没有马上处理完成,如果程序在某个地方执行错误,导致程序崩溃,那么剩下没有被执行的消息将会被丢失。而消息队列中,在收到消息确认之后就会删除相关的消息。
那么如何来解决这个问题:1.消费者不开启消息自动确认,采用手动确认方式。就是消费者要在接收消息,处理消息完毕之后,再进行手动确认。
channel.basicQos(1);//每一次只能消费一个消息
//@param2 false为关闭消息自动确认
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try{
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println("消费者-1: "+new String(body));
// 参数1:确认队列中那个具体消息 参数2:是否开启多个消息同时确实
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
复制代码
经过修改之后,就会发现,处理速度 越快的消费者,它所收到的消息就越多。我们的工作队列(work queues)从原来的"平均分配",变成了如今的"多劳多得"。
4.3 Fanout
fanout模型,也叫做广播模型。在这个模型下,消费者可以有多个,并且每个消费者有自己的队列。生产者把消息发送到交换机,由交换机来决定要发给哪个队列,交换机把消息发送给绑定过的所有队列。队列的消费者都能拿到消息。实现一条消息被多个消费者消费。
生产者:
/**
* fanout模型-生产者
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Provider {
@Test
public void sendMessage()throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
/**
* 声明交换机
* @param1 交换机名称
* @param2 交换机类型
*/
channel.exchangeDeclare("logs","fanout");
//发送消息
channel.basicPublish("logs","",null,"fanout type message".getBytes());
//释放资源
channel.close();
connection.close();
}
}
复制代码
消费者1:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("logs","fanout");
//创建临时队列
String queueName = channel.queueDeclare().getQueue();
//队列和交换机进行绑定
channel.queueBind(queueName,"logs","");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1: "+new String(body));
}
});
}
}
复制代码
消费者2:
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("logs","fanout");
//创建临时队列
String queueName = channel.queueDeclare().getQueue();
//队列和交换机进行绑定
channel.queueBind(queueName,"logs","");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2: "+new String(body));
}
});
}
}
复制代码
消费者3:
public class Consumer3 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
//绑定交换机
channel.exchangeDeclare("logs","fanout");
//创建临时队列
String queueName = channel.queueDeclare().getQueue();
//队列和交换机进行绑定
channel.queueBind(queueName,"logs","");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者3: "+new String(body));
}
});
}
}
复制代码
运行效果:
创建的交换机:
4.4 Routing-Direct
生产者:
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
//声明交换机
String exchangeName = "logs_direct";
channel.exchangeDeclare(exchangeName,"direct");
//定义路由key
String routingKey = "info";
//发送消息
channel.basicPublish(exchangeName,routingKey,null,("这是direct模型发布的基于route key: ["+routingKey+"] 发送的消息").getBytes());
//关闭资源
channel.close();
connection.close();
}
}
复制代码
消费者1:
/**
* Routing-Direct 消费者1
*/
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
channel.exchangeDeclare(exchangeName,"direct");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
/**
* 队列和交换机进行绑定
* @param1 队列名
* @param2 交换机名
* @param3 路由key
*/
channel.queueBind(queue,exchangeName,"error");
//获取消费的消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1: "+ new String(body));
}
});
}
}
复制代码
消费者2:
/**
* Routing-Direct 消费者2
*/
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
channel.exchangeDeclare(exchangeName,"direct");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,exchangeName,"info");
channel.queueBind(queue,exchangeName,"error");
channel.queueBind(queue,exchangeName,"warning");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2: "+new String(body));
}
});
}
}
复制代码
运行结果:
direct交换机:
因为在消费者1中队列和交换机绑定的路由key为error,然而生产者发送消息时指定的路由key为info,所以消费者1自然就收不到消息。
4.5 Routing-Topic
Topic类型的交换机exchange与Direct类型的交换机相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型的交换机Exchange可以让队列在绑定RoutingKey的时候使用通配符。
Topic模型的通配符一般是由一个或者多个单词组成,多个单词之间以“.”分割。例如:item.insert.
// 通配符
* 仅仅匹配一个词
# 匹配零个、一个、或多个词
例子:
qingyuan.# 匹配qingyuan qingyuan.test qingyuan.test.ok
qingyuan.* 匹配qingyuan.success
复制代码
生产者:
/**
* Topic-生产者
*/
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("topics","topic");
//路由key
String routekey = "save.user.delete.qingyuan";
channel.basicPublish("topics",routekey,null,("这里是topic动态路由模型,routekey: ["+routekey+"]").getBytes());
//关闭资源
channel.close();
connection.close();
}
}
复制代码
消费者1:
/**
* topic-消费者1
*/
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("topics","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//队列和交换机进行绑定
channel.queueBind(queue,"topics","*.user.*");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1: "+ new String(body));
}
});
}
}
复制代码
消费者2:
/**
* topic-消费者2
*/
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq的主机
connectionFactory.setHost("yourserverip");
//设置端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/demo1");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("qingyuan");
connectionFactory.setPassword("qingyuan");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("topics","topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"topics","*.user.#");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2: "+ new String(body));
}
});
}
}
复制代码
运行结果:
生产者发送消息设置的路由key为:"save.user.delete.qingyuan"
消费者1的临时队列和交换机绑定的路由key为:".user."
消费者2的临时队列和交换机绑定的路由key为:"*.user.#"
消费者1只能匹配user后面只带有一个词的,而消费者2可以匹配user后面带有多个词的。
5.SpringBoot中使用RabbitMQ
5.0 配置
server.port=9090
spring.application.name=rabbitmq-demo
## rabbitmq
spring.rabbitmq.host=yourserverip
spring.rabbitmq.port=5672
spring.rabbitmq.username=qingyuan
spring.rabbitmq.password=qingyuan
spring.rabbitmq.virtual-host=/demo1
复制代码
5.1 HelloWorld
生产者:
/**
* helloworld provider
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProviderBoot {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads(){
/**
* @param1 队列名称
* @param2 发送的消息
*/
rabbitTemplate.convertAndSend("hello","hello world");
}
}
复制代码
消费者:
@Component
@RabbitListener(queuesToDeclare = @Queue(value="hello"))
public class ConsumerBoot {
@Autowired
RabbitTemplate rabbitTemplate;
@RabbitHandler
public void reveive(String message){
System.out.println("message = " + message);
}
}
复制代码
运行SpringBoot启动类:
5.2 SpringBoot版Work Queues
生产者:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProviderBoot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void convertAndSend(){
for(int index =0;index<10;index++){
rabbitTemplate.convertAndSend("work","hello work");
}
}
}
复制代码
消费者:
@Component
public class Consumer {
@Autowired
RabbitTemplate rabbitTemplate;
@RabbitListener(queuesToDeclare = @Queue(value="work"))
public void receive1(String message){
System.out.println("work message1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void receive2(String message){
System.out.println("work message2 = " + message);
}
}
复制代码
同样的,在默认情况下,work queues模式是平均地将消息分配的消费者。
5.3 Fanout模式
fanout啊 topic啊其实说的就是消费端,队列和交换机的绑定关系。
生产者:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProviderBoot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void convetAndSend(){
/**
* @param1 交换机名称
* @param2 路由key,为空表示任意路由
* @param3 消息
*/
rabbitTemplate.convertAndSend("logs2","","this is fanout message");
}
}
复制代码
消费者:
@Component
public class FanoutConsumer {
@Autowired
RabbitTemplate rabbitTemplate;
@RabbitListener(
bindings = @QueueBinding(
value = @Queue, //创建临时队列
exchange = @Exchange(name="logs2",type="fanout")
)
)
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name="logs2",type = "fanout")
)
)
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
复制代码
5.4 Direct
生产者:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProviderBoot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void convertAndSend(){
/**
* @param1 交换机名称
* @param2 路由key
* @param3 消息
*/
rabbitTemplate.convertAndSend("directs","error","error的日志信息");
}
}
复制代码
消费者:
@Component
public class DirectConsumer {
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue, //创建临时队列
key = {"info","error"}, //路由key
exchange = @Exchange(name = "directs",type = "direct")
)
}
)
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue,
key = {"other"},
exchange = @Exchange(name="directs",type = "direct")
)
}
)
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
复制代码
5.5 Topic
生产者:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProviderBoot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void convertAndSend(){
rabbitTemplate.convertAndSend("topics2","user.save.findAll","user.save.findAll 的消息");
}
}
复制代码
消费者:
@Component
public class TopicConsumer {
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue,
key = {"user.*"},
exchange = @Exchange(name="topics2",type = "topic")
)
}
)
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue,
key = {"user.#"},
exchange = @Exchange(name = "topics2",type = "topic")
)
}
)
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
复制代码
运行结果:
6.参考资料
1.参考视频
近期评论