RabbitMQ工具实现原理及使用教程
目标:
减少重复代码的开发工作,简化RabbitMQ的接入工作,让程序员的关注点更多的集中在业务逻辑的处理上。
源码请跳转至2-②
1、实现原理
动态创建核心原理
在Spring启动时,利用Spring Bean管理工厂BeanFactory接口,实现动态
创建交换机
、队列
、交换机和队列的绑定关系
,让我们无需进行重复的编码工作。
广播交换机原理
当发送消息发送至“fanout
”类型交换机时,RabbitMQ将会忽略路由键
,直接
将消息发送至该交换机下所有队列
。从而被每一个队列的消费者进行消费。
直连交换机原理
当发送消息发送至“direct
”类型交换机时,RabbitMQ将会查找与传入路由键完全匹配
的队列,然后将消息放入队列当中,从被队列的消费者进行消费。
通配符交换机原理
当发送消息发送至“topic
”类型交换机时,RabbitMQ将会查找与传入路由键规则匹配
的队列,然后将消息放入队列当中,从被队列的消费者进行消费。
头交换机原理
当发送消息发送至“headers
”类型交换机时,RabbitMQ将会忽略路由键
,根据在绑定路由器及队列时设置的完全匹配
或部分匹配
作为依据,以消息内容中的headers属性对队列进行匹配
,匹配成功的队列才可以收到该消息从而进一步被消费者消费。
特别提示
manual模式
(手动ACK)下,注意一定要
进行消息的确认
。如果忘记了ACK,那么后果很严重。当consumer退出时候,message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个"内存泄漏
"是致命
的。
2、源码解析
①、文件说明
ExchangeTypeEnum
说明:RabbitMQ交换器类型枚举
作用:声明了交换器的常用类型
操作方式:无需操作
备注:无
ExchangeEnum
说明:RabbitMQ交换器枚举
作用:声明了项目需要使用到的交换机
操作方式: 按需增删
备注:由于fanout类型的交换机会忽略路由键直接下发所有的路由器,故每个不同的广播都应新建一个交换机
QueueArguments
说明:队列参数
作用:声明了在创建队列时使用到的参数Map
操作方式: 按需修改
备注:无
QueueHeaders
说明:队列头信息
作用:声明了ExchangeTypeEnum.headers类型的交换机所使用的匹配信息
操作方式: 按需修改
备注:无
QueueEnum
说明:队列配置枚举
作用:声明了项目需要使用的队列
操作方式: 无需操作
备注:所有指定为ExchangeEnum.fanout的队列,都将在队列名称后面追加“.10_87_10_*”(数字部分为服务所在IP地址)
RabbitMQConfig
说明:RabbitMQ核心配置类
作用:实现动态创建交换机、动态创建队列、动态绑定
操作方式: 无需操作
备注:该类已将所有队列名称以Map形式注册至spring bean工厂
,bean名称“queuesNames
”,结构:Map<String, String>
(Map<队列配置枚举名称, 队列名称>) 消费者可直接使用@RabbitListener(queues = {"#{queuesNames.队列配置枚举名称}"})
进行监听
DefaultMQCallback
说明:默认回调类,实现自MQCallback
作用:消息发送成功或失败的回调类
操作方式: 无需操作
备注:可手动进行实现,调用queueMessageServiceImpl.setMQCallback(mqCallback)方法即可实现修改
QueueMessageServiceImpl
说明:默认消息发送类,实现自QueueMessageService
作用:实现了消息发送的基本接口
操作方式: 无需操作
备注:无
②、核心代码解析
spring示例注册
public static <T> boolean registerBean(String beanName, T bean) {
// 将applicationContext转换为ConfigurableApplicationContext
ConfigurableApplicationContext context = (ConfigurableApplicationContext) SpringContextUtil.getApplicationContext();
// 将bean对象注册到bean工厂
context.getBeanFactory().registerSingleton(beanName, bean);
log.debug("【SpringContextUtil】注册实例“" + beanName + "”到spring容器:" + bean);
return true;
}
复制代码
动态创建队列名称Bean
@Bean("queuesNames")
public Map<String, String> queuesNames() {
return QueueEnum.getQueuesNames();
}
// 这个方法在QueueEnum
public static Map<String, String> getQueuesNames() {
return Arrays.asList(QueueEnum.values()).stream().collect(Collectors.toMap(queueEnum -> queueEnum.toString(), queueEnum -> queueEnum.getName()));
}
复制代码
动态创建交换机
/**
* @return java.lang.Object
* @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createExchange();
* @description 动态创建交换机
* @param: this is no-parameter method.
* @author zhiqiang94@vip.qq.com
* @create 2020/4/16 0016 9:28
*/
@Bean("createExchange")
public Object createExchange() {
// 遍历交换机枚举
ExchangeEnum.toList().forEach(exchangeEnum -> {
// 声明交换机
Exchange exchange;
// 根据交换机模式 生成不同的交换机
switch (exchangeEnum.getType()) {
case fanout:
exchange = ExchangeBuilder.fanoutExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
case topic:
exchange = ExchangeBuilder.directExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
case headers:
exchange = ExchangeBuilder.headersExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
case direct:
default:
exchange = ExchangeBuilder.topicExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
break;
}
// 将交换机注册到spring bean工厂 让spring实现交换机的管理
if (exchange != null) {
SpringContextUtil.registerBean(exchangeEnum.toString() + "_exchange", exchange);
}
}
);
// 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
return null;
}
复制代码
动态创建队列
/**
* @return java.lang.Object
* @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createQueue();
* @description 动态创建队列
* @param: this is no-parameter method.
* @author zhiqiang94@vip.qq.com
* @create 2020/4/16 0016 9:29
*/
@Bean("createQueue")
public Object createQueue() {
// 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理
QueueEnum.toList().forEach(queueEnum -> SpringContextUtil.registerBean(queueEnum.toString() + "_queue", new Queue(queueEnum.getName(), queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments())));
// 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
return null;
}
复制代码
动态创建绑定关系
/**
* @return java.lang.Object
* @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createBinding();
* @description 动态将交换机及队列绑定
* @param: this is no-parameter method.
* @author zhiqiang94@vip.qq.com
* @create 2020/4/16 0016 9:29
*/
@Bean("createBinding")
public Object createBinding() {
// 遍历队列枚举 将队列绑定到指定交换机
QueueEnum.toList().forEach(queueEnum -> {
// 从spring bean工厂中获取队列对象(刚才注册的)
Queue queue = SpringContextUtil.getBean(queueEnum.toString() + "_queue", Queue.class);
// 声明绑定关系
Binding binding;
// 根据不同的交换机模式 获取不同的交换机对象(注意:刚才注册时使用的是父类Exchange,这里获取的时候将类型获取成相应的子类)生成不同的绑定规则
switch (queueEnum.getExchangeEnum().getType()) {
case fanout:
FanoutExchange fanoutExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", FanoutExchange.class);
binding = BindingBuilder.bind(queue).to(fanoutExchange);
break;
case topic:
TopicExchange topicExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", TopicExchange.class);
binding = BindingBuilder.bind(queue).to(topicExchange).with(queueEnum.getRoutingKey());
break;
case headers:
HeadersExchange headersExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", HeadersExchange.class);
if (queueEnum.isWhereAll()) {
// whereAll表示全部匹配
binding = BindingBuilder.bind(queue).to(headersExchange).whereAll(queueEnum.getHeaders()).match();
} else {
// whereAny表示部分匹配
binding = BindingBuilder.bind(queue).to(headersExchange).whereAny(queueEnum.getHeaders()).match();
}
break;
case direct:
default:
DirectExchange directExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", DirectExchange.class);
binding = BindingBuilder.bind(queue).to(directExchange).with(queueEnum.getRoutingKey());
break;
}
// 将绑定关系注册到spring bean工厂 让spring实现绑定关系的管理
if (binding != null) {
SpringContextUtil.registerBean(queueEnum.toString() + "_binding", binding);
}
}
);
// 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
return null;
}
复制代码
3、使用教程
①、按需增删交换机枚举
②、按需增删队列枚举
③、实现消费者监听
④、调用生产者发送消息
⑤、查看测试结果
4、常见问题
①、调用QueueMessageService.convertSendAndReceive方法返回值为null
当消费者方法为void
方法时,QueueMessageService.convertSendAndReceive方法返回值为null,注意检查消费者监听方法。
②、QueueMessageService.convertSendAndReceive方法被多个消费者消费怎么办?
当QueueMessageService.convertSendAndReceive发送的消息被多个消费者消费,返回的结果为第一个消费者的返回值。建议有返回值的队列使用直连模式(direct
),这样能保证该消息只有一个消费者进行消费。
③、广播交换机(ExchangeTypeEnum.fanout)的使用场景有哪些
广播
交换机的角色类似
于村里面的小喇叭
,当有新的政策传达给这个村时,就会使用小喇叭播放,这样每个人(队列)都停到了内容。
可用于发送类似公告、系统全局通知等广播
需求。
④、直连交换机(ExchangeTypeEnum.direct)的使用场景有哪些
直连
交换机的角色类似
于邮递员
,信封上写明了收件地址,收到的消息会根据目的地的不同投入到不同的队列中,若是邮递员没有找到这个地址,这封信就被丢弃了。
可用于点对点
之间的通讯。
⑤、通配符交换机(ExchangeTypeEnum.topic)的使用场景有哪些
通配符
交换机类似
于脑子不聪明的邮件分发员
,收到有一封送往“张家镇李家村12号
”的信件,分发员来到分发点,发现有两个有污渍的邮箱(队列),一个能看清\*李家村\*
,一个能看清张家镇\*
,快递员干脆就将信件复制
了一份,往两个地方都送
了投递了一份。
可用于同一个任务,需要不同模块或者分布式系统的模块同时执行
时使用。
⑥、头交换机(ExchangeTypeEnum.headers)的使用场景有哪些
头
交换机类似
于程序中的鉴权
,管理员拥有全部权限,用户拥有查看权限,用户无法调用需要管理员权限的接口(消息无法分发到未监听该header的队列),拥有相应权限才能调用相应权限的接口(消息分发到监听该header的队列)。这里需要注意,头交换机支持完全匹配
和部分匹配
,完全匹配
为管理员接口必须拥有所有
管理员权限才可调用,即拥有所有管理员权限才显示进入管理员后台的按钮
。部分匹配
为拥有其中一个或部分权限
即可调用,即拥有任意管理员权限就显示进入管理员后台的按钮
。
本文仅为自己学习笔记,由于实现原理非常简单,且核心源码已在本文中展示,故不发放源码连接了。由于本人对RabbitMQ研究并不深入,本文若有错误之处,请大佬指出,我一定虚心学习,积极改正。
第一次写文章,希望大家多多鼓励,多多支持。
近期评论