一、RabbitMQ概述
1、什么是MQ?
MQ就是消息队列,本质上就是Queue,但是用于储存消息。有了MQ,消息发送上游只需要依赖于MQ,不需要依赖固定的服务,从而实现解耦。
2、MQ的作用是什么?
- 流量削峰:比如服务1s能处理1w条请求,但是如果超过1w条就只能暂停它们的请求,不过有了MQ做缓冲,就可以取消这个限制,而是用一段时间来处理这些请求
- 应用解耦:没有MQ的时候,服务之间相互调用,耦合度非常高,一旦某个服务崩掉,整个服务系统就没法正常工作,有了MQ之后,让它们依赖于MQ,这样一个服务崩掉,其他服务还能正常运行
- 异步处理:以前A调用B,但不是同步的,这样就必须让A不断的轮询监听,现在有了MQ,就可以等B处理完了,给A发送消息,这样A就知道了
3、常用的MQ
- ActiveMQ
- 优点:单机吞吐量万级,可用性高
- 缺点:社区不维护了,高并发场景使用较少
- RabbitMQ
- 优点:适用于高并发场景,单机吞吐量万级,社区稳定更新
- 缺点:商业版要收费,而且有点难学习
- Kafka
- 优点:特别适合大数据场景,而且Kafka是分布式的
- 缺点:单机超过64个队列/分区,load会飙高,社区更新较慢
- RocketMQ
- 优点:单机吞吐量十万级,分布式架构,消息可以做到0丢失
- 缺点:仅仅支持Java和c++,社区活跃度一般
4、RabbitMQ的四大概念
- 生产者:产生数据、发送消息的一方
- 交换机:接收消息,推送给队列。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
- 队列:本质上是一个大的消息缓冲区,生产者者把消息发送到队列,消费者从队列中拿到消息
- 消费者:获取消息的一方
5、连接RabbitMQ操作
- 连接linux和xshell:用户名
root
,密码123
- 启动和关闭rabbitmq:不需要进入目录,直接:
启动:rabbitmq-server start &
关闭:rabbitmqctl stop
复制代码
- 创建用户,设置权限
#创建rabbitmq的用户
rabbitmqctl add_user 用户名 密码
#设置用户权限
rabbitmqctl set_user_tags 用户名 角色
复制代码
- 开启用户界面
同样是用户名admin
,密码123
6、RabbitMQ的常用名词
Broker
:接收和分发消息的应用,也就是RabbitMQ ServerAMQP
:一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制Virtual host
:当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建 exchange/queue等Connection
:Publisher/Consumer和Broker之间的tcp连接Channel
:避免每次访问RabbitMQ都建立ConnectionExchange
和Queue
:前面说过了,Exchange和Queue都是Broker的子内容,根据分发的规则,匹配相应的routing key,然后发给适当的queueBinding
:exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据
二、五大工作模式
1、简单模式(simple)
创建maven工程
创建maven工程时,记得把java版本调整好。引入依赖:
<!--指定 jdk 编译版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
复制代码
创建生产者
public class Producer{
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.130");
factory.setUsername("admin");
factory.setPassword("123");
//创建connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "hello world";
/**
* 发送一个消息
* 1.发送到那个交换机
* 2.路由的 key 是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
复制代码
创建消费者
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.130");
factory.setUsername("admin");
factory.setPassword("123");
//创建connection
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
System.out.println("等待接收消息");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag,message)->{
String msg = new String(message.getBody());
System.out.println("接收到的消息是" + msg);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = consumerTag -> {
System.out.println("接收消息中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
复制代码
启动使用监测界面测试。
2、工作模式(work-queue)
提取工具类
前面写了创建connection代码,其实每次都写很麻烦,所以提取工具类。
public class RabbitMQUtil {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.130");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
复制代码
创建两个消费者
public class Worker01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag,message)->{
String msg = new String(message.getBody());
System.out.println("接收到的消息是" + msg);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = consumerTag -> {
System.out.println("接收消息中断");
};
System.out.println("消费者c1准备接收消息...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
复制代码
第二个消费者Worker02把c1改成c2即可。
创建生产者
public class Publisher{
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("请输入消息");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
System.out.println("输入的消息是" + message);
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
}
}
}
复制代码
消息应答
正常情况下,消息发送是轮询发送,可是存在这样的情况,某一个消费者挂掉,导致无法接收消息。而一旦向消费者发送一条消息后,消息就会标记为删除,那么我们就将丢失消息了。
这样的情况不是我们想要的,所以可以采取消息应答机制。也就是发送消息后,消费者接收并处理到消息后,发送应答给RabbitMQ,告诉RabbitMQ已经处理了,这样就可以删除掉该消息。
我们前面采用的是自动应答的方式:
//autoAck = true就是自动应答
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
复制代码
消息应答模式
睡眠工具类
public class SleepUtil {
public static void sleep(int second) {
try {
Thread.sleep(1000 * second);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
复制代码
生产者
public class Task01 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtil.getChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false,
null);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
复制代码
消费者1
public class Consumer01 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
System.out.println("C1接收消息时间较短");
DeliverCallback deliverCallback = (consumerTag, message)->{
SleepUtil.sleep(1);
String msg = new String(message.getBody());
System.out.println("接收到的消息是" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费者取消接收");
};
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
复制代码
消费者2
public class Consumer02 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
System.out.println("C2接收消息时间较长");
DeliverCallback deliverCallback = (consumerTag, message)->{
SleepUtil.sleep(30);
String msg = new String(message.getBody());
System.out.println("接收到的消息是" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费者取消接收");
};
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
复制代码
持久化
前面说的是,如果某个消费者挂掉了,怎么样才能保证消息不丢失。那如果RabbitMQ挂了怎么办呢?这时就要采用持久化的方式解决。
队列的持久化
以前我们采用的都是队列非持久化,一旦RabbitMQ重新启动,队列就消失了,采用队列持久化就可以避免这个问题。也就是在声明队列的适合修改它的持久化策略
boolean isDurable = true;
channel.queueDeclare(TASK_QUEUE_NAME,isDurable, false, false,null);
复制代码
这里需要把原来的队列删除,再重新启动才行。启动之后queue一栏会显示蓝色的D
。
消息的持久化
消息持久化需要在声明消息时,注明持久化。
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
复制代码
不公平分发
上面轮询的方式虽然让每个消费者都能轮番拿到消息,但是对于一方处理时间很长,一方处理时间很短的情况,这种情况肯定不是最优解,所以可以采用不公平分发的方式。
下面这行代码作用就是设置不公平分发。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
复制代码
也就是说我当前只能处理几个任务,设置为1就是当前只能处理1个任务,我还没干完不要让我干了。
public class Consumer02 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
System.out.println("C2接收消息时间较长");
DeliverCallback deliverCallback = (consumerTag, message)->{
SleepUtil.sleep(30);
String msg = new String(message.getBody());
System.out.println("接收到的消息是" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费者取消接收");
};
int prefetchCount = 1;
channel.basicQos(prefetchCount);
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
复制代码
也就是在处理时间较长的消费者加上面那串代码。
当然prefetchCount
可以设置别的值,同样也就是,我没处理的消息可以有这么多个,超出来的不要给我了。
3、发布确认
发布确认原理
生产者可以把channel信道设置为confirm模式,设置为该模式后,所有该信道上的消息会有统一的ID,然后被发送到队列后,会发送确认消息给生产者;如果队列/消息设置了持久化,消息接收写入磁盘后,broker会把确认消息delivery-tag域的序列号发送给生产者。
怎么开启发布确认呢,创建channel以后:
channel.confirmSelect();
复制代码
单个/批量/异步发布确认
public class ConfirmMessage {
//批量发消息的个数
public static final Integer MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
//调用单个确认---->发布1000条消息用时:573ms
//ConfirmMessage.publishMessageIndividually();
//调用批量确认---->发布1000条消息用时:92ms
//ConfirmMessage.publishMessageBatch();
//调用异步批量确认---->发布1000条消息用时:50ms
ConfirmMessage.publishMessageAsync();
}
//单个确认
public static void publishMessageIndividually() throws Exception{
Channel channel = RabbitMqUtil.getChannel();
//这次设置随机数
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//每条消息都进行发布确认
boolean flag = channel.waitForConfirms();
if (flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条消息用时:" + (end - begin) + "ms");
}
//批量确认
public static void publishMessageBatch() throws Exception{
Channel channel = RabbitMqUtil.getChannel();
//这次设置随机数
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//每100条确认一次
if (i % 100 == 0){
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条消息用时:" + (end - begin) + "ms");
}
//异步确认
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitMqUtil.getChannel();
//这次设置随机数
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的hash表,适用于高并发的情况
* 1、它可以轻松将序号与消息进行关联
* 2、可以轻松批量删除条目,只要给序号就行
* 3、支持高并发(多线程)
*/
ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
//开始时间
long begin = System.currentTimeMillis();
//确认消息成功发送的回调方法
ConfirmCallback confirmCallback = (deliveryTag,multiple)->{
//删除掉已经确认的消息
//这是批量删除
if (multiple){
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(deliveryTag);
confirmed.clear();
}else{
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认的消息:" + deliveryTag);
};
//确认消息失败发送的回调方法
ConfirmCallback nackCallback = (deliveryTag,multiple)->{
//打印一下未确认的消息
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息:" + message + ",未确认消息的tag:" + deliveryTag);
};
//添加一个消息监听器:监听哪些消息成功,哪些消息失败
//这里使用两个参数,一个成功,一个失败,要使用lambda表达式
channel.addConfirmListener(confirmCallback,nackCallback);
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//此处记录下所有要发送的消息
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条消息用时:" + (end - begin) + "ms");
}
}
复制代码
4、发布订阅模式(publish/subscribe)
以前的模式都是一对一的,每一条消息都发给一个消费者(轮询、不公平分发...),那么怎么把一条消息发给多个消费者呢?这就用到不同类型的交换机了。
Exchange交换机简介
其实之前也用到了交换机,但是因为是采用默认交换机,所以就没介绍。
Exchange交换机主要有以下类型:
- direct:消息会发送到bindingkey和routingkey完全匹配的队列中
- fanout:会把消息发送给所有和该交换机绑定的队列中
- topic:和direct类似,但direct是完全匹配,topic是bindingkey满足routingkey的匹配规则就会发送,包括
*
和#
- headers:
Fanout交换机
创建消费者1和消费者2
public class ReceiveLog01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明一个队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("R1等待接收消息");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("R1接收到的消息是" + new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
}
}
复制代码
public class ReceiveLog02 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明一个队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("R2等待接收消息");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("R2接收到的消息是" + new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag -> {});
}
}
复制代码
创建生产者
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtil.getChannel()) {
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
复制代码
5、路由模式(routing)
Direct交换机
生产者
public class DirectLogs {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception{
try (Channel channel = RabbitMQUtil.getChannel()) {
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
复制代码
消费者1
public class Consumer01 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.queueDeclare("console",true,false,false,null);
//绑定交换机和队列
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
System.out.println("R1等待接收消息");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("接收消息:" + new String(message.getBody(),"UTF-8"));
};
channel.basicConsume("console",true,deliverCallback,consumerTag -> {});
}
}
复制代码
消费者2
public class Consumer02 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueDeclare("disk",true,false,false,null);
//绑定交换机和队列
channel.queueBind("disk",EXCHANGE_NAME,"error");
System.out.println("R2等待接收消息");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("接收消息:" + new String(message.getBody(),"UTF-8"));
};
channel.basicConsume("disk",true,deliverCallback,consumerTag -> {});
}
}
复制代码
测试发现,确实是如果匹配才发送消息。
6、主题模式(topics)
Topic交换机
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的:
-
*
可以代替一个单词 -
#
可以替代零个或多个单词
生产者
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitMQUtil.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/**
* Q1-->绑定的是
* 中间带 orange 带 3 个单词的字符串(*.orange.*)
* Q2-->绑定的是
* 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
* 第一个单词是 lazy 的多个单词(lazy.#)
*
*/
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry :
bindingKeyMap.entrySet()) {
String bindingKey =
bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
复制代码
消费者1
public class ReceiveLogsTopic01 {
private static final String QUEUE_NAME = "Q1";
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
System.out.println("R1等待接收消息....");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接收队列:" + QUEUE_NAME + "绑定键:" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{
});
}
}
复制代码
消费者2
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "Q2";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
System.out.println("R2等待接收消息");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接收队列:" + QUEUE_NAME + "绑定键:" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});
}
}
复制代码
三、死信队列和延迟队列
1、死信队列
什么是死信队列
producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
死信产生原因
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
案例
消息过期TTL
生产者
public class Producer01 {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
AMQP.BasicProperties properties =
new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
System.out.println("生产者发送消息:" + message);
}
}
}
复制代码
正常消费者
public class Consumer01 {
//普通交换机的名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机的名称
private static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
private static final String NORMAL_QUEUE = "normal_queue";
//死信队列名称
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
//声明普通队列,这里和死信队列绑定
Map<String, Object> arguments = new HashMap<>();
//设置过期时间
//arguments.put("x-message-ttl",100000);
//正常的队列设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信routing-key
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通的交换机和队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
//绑定死信的交换机和队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C1接收的消息是" + new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag -> {});
}
}
复制代码
死信消费者
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead_queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
复制代码
这样开启消费者1后再关闭,模拟其不能工作的状态,然后生产者发消息等10s,消息就自动进入死信队列了,这时开启消费者2,就可以接收到死信了。
队列达到最大长度
生产者去掉过期时间。
public class Producer01 {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
System.out.println("生产者发送消息:" + message);
}
}
}
复制代码
消费者1添加队列长度限制
//设置正常队列长度的限制
arguments.put("x-max-length",6);
复制代码
全部代码
public class Consumer01 {
//普通交换机的名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机的名称
private static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
private static final String NORMAL_QUEUE = "normal_queue";
//死信队列名称
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
//声明普通队列,这里和死信队列绑定
Map<String, Object> arguments = new HashMap<>();
//设置过期时间
//arguments.put("x-message-ttl",100000);
//正常的队列设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信routing-key
arguments.put("x-dead-letter-routing-key","lisi");
//设置正常队列长度的限制
arguments.put("x-max-length",6);
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通的交换机和队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
//绑定死信的交换机和队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("C1接收的消息是" + new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag -> {});
}
}
复制代码
消费者2不变,还是开启消费者1再关掉,开启生产者,最后开启消费者2模拟超过的队列
消息被拒
生产者代码不变。
消费者1代码修改:
public class Consumer01 {
//普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitMQUtil.getChannel();
//声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
{
String message = new String(delivery.getBody(), "UTF-8");
if (message.equals("info5")) {
System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("Consumer01 接收到消息" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
});
}
}
复制代码
消费者2代码不变,还是启动1再启动2
2、延迟队列
什么是延时队列
现实当中有很多场景会用到延迟队列,如果采用轮询的方式遍历每个个体,达到失效返回的话,不能保证时效性而且效率极低。
延时队列和死信队列基本类似,通过设置ttl过期时间,达到这个状态的消息会变为死信,然后由死信交换机处理。
通过采用:
//设置过期时间
arguments.put("x-message-ttl",100000);
复制代码
案例-整合SpringBoot
创建新项目,选择Spring脚手架创建。
引入依赖,注意上面springboot版本选择2.3.4版,不然可能不兼容。
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
复制代码
application.properties配置
spring.rabbitmq.host=192.168.200.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
复制代码
Swagger配置
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("enjoy6288", "http://atguigu.com",
"1551388580@qq.com")).build();
}
}
复制代码
创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后在创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列 QD:
TtlQueueConfig注册上面的队列和交换机
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueA")
public Queue queueA(){
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean("queueB")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
//声明死信队列 QD
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
//声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
复制代码
生产者代码
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
}
}
复制代码
死信消费者代码
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
复制代码
主程序类启动,发送http://localhost:8080/ttl/sendMsg/嘻嘻嘻
可以思考一下,上面是两个队列,处理时间是不同的,如果还有新的时间需求,那么还需要再创建一个队列,这样不是我们希望的。
延时队列优化
这次新建一个延时队列,但是不设置过期时间,而是采用传入的方式。
创建延时队列:
@Configuration
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
@Bean("queueC")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//没有声明 TTL 属性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
复制代码
消费者添加如下代码:
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
}
复制代码
发送请求:
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
测试发现
四、发布确认高级
1、问题
这次考虑的是rabbitmq服务器重启了,这样就会出现问题,在rabbitmq重启期间,生产者投递消息失败了,导致消息丢失。那么怎么处理呢?
两种情况:
- 交换机和队列都不存在
- 交换机或队列有一个不存在
这种情况出现时,生产者发送消息,消息势必就丢失了,生产者根本就不知道rabbitmq挂了,所以采用的方式是设置缓存,加入缓存当发送失败就重新投递。
2、案例
搭建正常发布环境
application.properties中添加如下代码:
spring.rabbitmq.publisher-confirm-type=correlated
复制代码
它可以设置3个参数:
-
NONE:禁用发布确认模式,是默认值
-
CORRELATED:发布消息成功到交换器后会触发回调方法
-
SIMPLE:简单模式,经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法,等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
写发布确认配置类
@Configuration
public class ConfirmConfig {
//交换机名称
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列名称
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//routingkey
public static final String CONFIRM_ROUTING_KEY = "key1";
//声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
//队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
复制代码
生产者
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
@Resource
private RabbitTemplate rabbitTemplate;
//发消息
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message){
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message);
log.info("发送的消息内容为:{}",message);
}
}
复制代码
消费者
@Component
@Slf4j
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message){
String msg = new String(message.getBody());
log.info("接收到的队列confirm.queue消息:{}",msg);
}
}
复制代码
测试:localhost:8080/confirm/sendMessage/你好呀1 成功。
这相当于搭建了环境,后面测试出现问题的情况。
回调接口
如果队列或交换机出现问题,我们希望发送失败了可以保存到缓存,这里就需要用到回调接口,也就是一旦发送失败了,会自动触发回调接口,让消息保存至缓存中。
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
//因为是内部接口,所以还需要把它注入回到RabbitTemplate中
@Autowired
private RabbitTemplate rabbitTemplate;
//注入rabbitTemplate
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换机不管是否收到消息的一个回调方法
* CorrelationData
* 消息相关数据
* ack
* 交换机是否收到消息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
}
}
}
复制代码
把消息生产者的匹配交换机名称修改一下测试,发现发送消息后,有了提示。说明回调是起作用的
如果是修改routing key的值,再测试,发现收不到,交换机收到消息,但是队列没收到。
回退消息
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
也就是说开启生产确认模式到目前,只会判断生产者和交换机是否连接成功,传到交换机了,交换机没传到队列,这里不负责告知生产者,那么怎么办呢?
可以通过设置mandatory参数,来实现消息不可送达目的地,将消息返还给生产者。
application.properties配置文件写:也就是回退消息给生产者
spring.rabbitmq.publisher-returns=true
复制代码
实现回退接口RabbitTemplate.ReturnCallback:
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
//因为是内部接口,所以还需要把它注入回到RabbitTemplate中
@Autowired
private RabbitTemplate rabbitTemplate;
//注入rabbitTemplate
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 交换机不管是否收到消息的一个回调方法
* CorrelationData
* 消息相关数据
* ack
* 交换机是否收到消息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
}
}
//当消息无法路由的时候的回调方法
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}", new
String(message.getBody()), exchange, replyText, routingKey);
}
}
复制代码
备份交换机
可以通过设置备份交换机,一旦消息不能从正常的交换机发送,那么会给备份交换机发送消息,让备份交换机来处理,通常备份交换机是fanout
类型。架构如下:
修改之前的配置,先在配置中,添加一个交换机,两个队列,并声明其中的绑定关系:
@Configuration
public class ConfirmConfig {
//交换机名称
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列名称
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//routingkey
public static final String CONFIRM_ROUTING_KEY = "key1";
//备份交换机和队列
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
public static final String BACKUP_QUEUE_NAME = "backup_queue";
public static final String WARNING_QUEUE_NAME = "warning_queue";
//声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
}
//队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
//备份交换机
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
//备份队列
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
//备份队列
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
//绑定
@Bean
public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
//绑定
@Bean
public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue") Queue warningQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
复制代码
报警消费者
@Component
@Slf4j
public class WarningConsumer {
public static final String WARNING_QUEUE_NAME = "warning.queue";
@RabbitListener(queues = WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}", msg);
}
}
复制代码
启动测试,发现错误的消息会被报警消费者接收。
五、其他知识点
1、幂等性问题
概念
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。
比如网络不好,已经下单扣款了,用于又点击了一次,发现又扣款了,这就是幂等性问题。
在rabbitmq中也有类似的情况,比如MQ发送消息给消费者,结果本来消费者给MQ返回收到消息ack,但是网络不好,没发出去,MQ又把这条消息给其他消费者发,这就造成了重复消费同一条消息。
解决方案
MQ 消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如时间戳,或者UUID,或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。
消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:
- 唯一 ID+指纹码机制,利用数据库主键去重,
- 利用 redis 的原子性去实现
唯一ID+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
Redis 原子性
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。
2、优先级队列
场景
比如下单会给我们发送短信,比如很多大企业,系统希望发送消息的优先级高,普通用户消息发送的优先级较低。默认数字0-255,数字越大越优先。
设置方式
- 使用rabbitmq页面添加
- 使用代码添加:队列要设置优先级,消息也要设置优先级
案例
生产者,有一条消息设置了优先级
public class Producer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.130");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Map<String,Object> arguments = new HashMap<>();
//设置最大优先级
arguments.put("x-max-priority",10);
channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
for (int i = 0; i < 10; i++) {
String message = "hello world" + i;
//i=5设置它的优先级,别人的都没有
if (i == 5){
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());
}else {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
System.out.println("消息发送完毕");
}
}
复制代码
消费者
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.130");
factory.setUsername("admin");
factory.setPassword("123");
//创建connection
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
System.out.println("等待接收消息");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, message)->{
String msg = new String(message.getBody());
System.out.println("接收到的消息是" + msg);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = consumerTag -> {
System.out.println("接收消息中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
复制代码
设置优先级了,先接收到。
3、惰性队列
概念
正常情况下,消息是保存再内存中的,惰性队列是保存在磁盘中的,正常队列的消费快,惰性队列的消费慢,因为消费需要从磁盘读取到内存再消费。
应用场景
惰性队列使用在消费者下线、宕机、维护的情况下,避免消费者宕机,导致大量消息留在内存中
设置方式
通过设置x-queue-mode
实现惰性队列。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
复制代码
内存开销对比
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB。但是速度会变得很慢。
六、RabbitMQ集群
1、集群搭建
linux克隆两个虚拟机,当作集群使用。
看看各台服务器的ip,连接它们。
修改三台机器的主机名称:
vim /etc/hostname
#修改内部的名称node2和node3
#重启,否则不生效
reboot
复制代码
配置各个节点的 hosts 文件,让各个节点都能互相识别对方:
vim /etc/hosts
192.168.200.130 node1
192.168.200.129 node2
192.168.200.128 node3
复制代码
以确保各个节点的 cookie 文件使用的是同一个值,在node1虚拟机上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
#是否继续连接选yes
#输入密码...
复制代码
启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在三台节点上分别执行以下命令)
rabbitmq-server -detached
复制代码
分别在node2节点和node3节点输入以下指令:
#关闭
rabbitmqctl stop_app
(rabbitmqctl stop 会将Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务)
#重置
rabbitmqctl reset
#将该节点(2号和3号节点)加入到1号节点
rabbitmqctl join_cluster rabbit@node1
#当然也可以启动完2号节点后,把3号节点加入到2号上
#rabbitmqctl join_cluster rabbit@node2
#启动节点
rabbitmqctl start_app(只启动应用服务)
复制代码
查看集群的状态
rabbitmqctl cluster_status
复制代码
创建账户
#创建账号
rabbitmqctl add_user admin 123
#设置用户角色
rabbitmqctl set_user_tags admin administrator
#设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
复制代码
进入rabbitmq查看,看到图就是搭建成功了。
怎么解除某个节点:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2(node1 机器上执行)
复制代码
2、镜像队列
前面我们只是搭建好了RabbitMQ的集群,可是集群里的队列还是独立的,那还是没什么用。某一个RabbitMQ宕机了,其他节点都没有这个队列,肯定发送消息过去还是要丢失掉的。
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
3、高可用负载均衡
比如最开始我们配置connection时:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.130");
factory.setUsername("admin");
factory.setPassword("123");
复制代码
这里明显写死了,如果node1挂掉了,生产者是无法找到另外两个node的,无法变更ip是一个很麻烦的问题。
采用Haproxy实现负载均衡
一旦主机node1挂掉,keepalive会转到备机node2,然后由备机2来帮助工作,并且不断监测主机是否还活着,如果不活着了,备机就会接管主机的工作。
HAProxy 提供高可用性、负载均衡及基于TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括 Twitter,Reddit,StackOverflow,GitHub 在内的多家知名互联网公司在使用。HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。
Federation Exchange
北京的客户访问北京交换机还行,深圳的用户如果访问北京的交换机就会产生网络延迟。我们希望北京的客户访问北京的交换机,深圳的用户访问深圳的交换机,但是还有个问题,数据不一致怎么办?北京的信息和深圳的信息要及时同步。
几个步骤:
- 需要保证每台节点单独运行
- 在每台机器上开启 federation 相关插件
- rabbitmq-plugins enable rabbitmq_federation
- rabbitmq-plugins enable rabbitmq_federation_management
原理图:
Federation Queue
联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。
Shovel
Federation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker 上。Shovel 可以翻译为"铲子",是一种比较形象的比喻,这个"铲子"可以将消息从一方"铲子"另一方。Shovel 行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。
近期评论