RabbitMq你真的了解吗?(二)
本文章使用环境
- RabbitMq 3.7.27(没有用docker上最新的,因为发现Exchange点不进去会报错)
- openJdk8
RabbitMq的消息是怎么传送的
- RabbitMq是这么解释的,由消息生产者发布消息到对应的交换机,交换机通过路由策略再发到队列中,然后消费者再通过队列获取到消息。也就是说,生产者是不可以直接发送消息到消费者队列上的,那为什么在RabbitMq你真的了解吗?(一)可以直接发送队列名称,该队列就能直接受到信息呢?接下来我们就来说。
- 当然其中还有生产者如何确认消息不丢失并且发送到了Mq上面,还有消费者如何确认消息已经消费了。而生产者跟消费者确认是否发送成功或消费成功有自己不同的机制,这个我们这篇文章也会详细的说下去。
了解RabbitMq的4种交换机
Exchange type | Default pre-declared names |
---|---|
Direct exchange | (Empty string) and amq.direct |
Fanout exchange | amq.fanout |
Topic exchange | amq.topic |
Headers exchange | amq.match (and amq.headers in RabbitMQ) |
- 其实还有一个叫做Default.Exchange这个就是RabbitMq你真的了解吗?(一)这篇文章上写的,为什么可以直接发队列名称而不发交换机可以获取到信息的原因。
fanout
扇形交换机类型fanout其实就是 fan out的意思,大佬们都叫把他叫做扇形交换机,他像扇形一样把消息发散出去。
由上图就基本可以确认这个交换机发送原理,fanout类型的交换机,绑定了这个交换机的队列都可以收到信息。
这个交换机是比较常用,并且也是使用起来没有一些难度的交换机类型。
direct
直接交换器根据消息路由键将消息传递到队列。直接交换是消息单播路由的理想选择(尽管它们也可用于多播路由)。它的工作原理:
下面这是RabbitMq官方的解释
- A queue binds to the exchange with a routing key K
- When a new message with routing key R arrives at the direct exchange, the exchange routes it to the queue if K = R
蹩脚英文翻译一下
- 一个带着routingKey的K绑定到直接交换机上
- 当一个新的消息发到直接交换机上,并且该routingKey为R是等于K的话,那么就会由交换机路由到这个队列上
headers
headers交换机用在多个属性上进行路由,也就是说路由的策略不再是使用routingKey去路由,而是使用属性进行路由,属性如果能对得上就可以收到,反之而不行。
下面我们试下这几个例子
/**
* @author Kakki
* @version 1.0
* @create 2021-06-18 16:56
*/
@Configuration
public class QueueAutoConfig implements SmartInitializingSingleton {
@Autowired
private AmqpAdmin amqpAdmin;
@Override
public void afterSingletonsInstantiated() {
// 创建两个不用匹配策略的交换机并且绑定
Queue queueAll = new Queue("Headers_ALL_QUEUE");
amqpAdmin.declareQueue(queueAll);
CustomExchange exchangeAll = new CustomExchange("Headers_EXCHANGE", "headers");
amqpAdmin.declareExchange(exchangeAll);
Binding bindingAll = BindingBuilder
.bind(queueAll)
.to(exchangeAll)
.with("Headers_ROUTING_KEY")
.and(new HashMap<String, Object>(){{put("all-1", "value1");put("all-2", "value2");put("x-match", "all");}});
amqpAdmin.declareBinding(bindingAll);
Queue queueAny = new Queue("Headers_ANY_QUEUE");
amqpAdmin.declareQueue(queueAny);
CustomExchange exchangeAny = new CustomExchange("Headers_EXCHANGE", "headers");
amqpAdmin.declareExchange(exchangeAny);
Binding bindingAny = BindingBuilder
.bind(queueAny)
.to(exchangeAny)
.with("Headers_ROUTING_KEY")
.and(new HashMap<String, Object>(){{put("any-1", "value1");put("any-2", "value2");put("x-match", "any");}});
amqpAdmin.declareBinding(bindingAny);
}
/**
* 单元测试
*/
@Test
public void sendMq() {
MessageProperties all1 = new MessageProperties();
all1.setHeader("all-1", "value1");
all1.setHeader("all-2", "value2");
MessageProperties all2 = new MessageProperties();
all2.setHeader("all-1", "value1");
MessageProperties any1 = new MessageProperties();
any1.setHeader("any-1", "value1");
any1.setHeader("any-2", "value2");
MessageProperties any2 = new MessageProperties();
any2.setHeader("any-1", "value1");
Message all1Message = new Message("ALL1".getBytes(StandardCharsets.UTF_8), all1);
Message all2Message = new Message("ALL2".getBytes(StandardCharsets.UTF_8), all2);
Message any1Message = new Message("ANY1".getBytes(StandardCharsets.UTF_8), any1);
Message any2Message = new Message("ANY2".getBytes(StandardCharsets.UTF_8), any2);
rabbitTemplate.convertAndSend("Headers_EXCHANGE", null, all1Message);
rabbitTemplate.convertAndSend("Headers_EXCHANGE", null, all2Message);
rabbitTemplate.convertAndSend("Headers_EXCHANGE", null, any1Message);
rabbitTemplate.convertAndSend("Headers_EXCHANGE", null, any2Message);
while (true) ;
}
}
复制代码
/**
* @author Kakki
* @version 1.0
* @create 2021-06-18 17:13
* mq消费者
*/
@Component
@Slf4j
public class MqCustomer {
@RabbitListener(queuesToDeclare = {@Queue(name = "Headers_ALL_QUEUE", arguments = {@Argument(name = "all-1", value = "value1"), @Argument(name = "all-2", value = "value2")})})
public void handlerAll1(Message message) {
log.info("[Headers_ALL_QUEUE1]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitListener(queuesToDeclare = {@Queue(name = "Headers_ALL_QUEUE", arguments = {@Argument(name = "all-1", value = "value1")})})
public void handlerAll2(Message message) {
log.info("[Headers_ALL_QUEUE2]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitListener(queuesToDeclare = {@Queue(name = "Headers_ANY_QUEUE", arguments = {@Argument(name = "any-1", value = "value1"), @Argument(name = "any-2", value = "value2")})})
public void handlerAny1(Message message) {
log.info("[Headers_ANY_QUEUE1]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitListener(queuesToDeclare = {@Queue(name = "Headers_ANY_QUEUE", arguments = {@Argument(name = "any-1", value = "value1")})})
public void handlerAny2(Message message) {
log.info("[Headers_ANY_QUEUE2]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitListener(queuesToDeclare = {@Queue(name = "Headers_ANY_QUEUE", arguments = {@Argument(name = "any-1", value = "value1"), @Argument(name = "any-3", value = "value3")})})
public void handlerAny3(Message message) {
log.info("[Headers_ANY_QUEUE3]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
复制代码
2021-07-03 17:47:24.922 INFO 9544 --- [ntContainer#1-1] com.example.demo.config.MqCustomer : [Headers_ALL_QUEUE1]:ALL1
2021-07-03 17:47:24.923 INFO 9544 --- [ntContainer#3-1] com.example.demo.config.MqCustomer : [Headers_ANY_QUEUE1]:ANY1
2021-07-03 17:47:24.923 INFO 9544 --- [ntContainer#4-1] com.example.demo.config.MqCustomer : [Headers_ANY_QUEUE2]:ANY2
复制代码
- 由此可见,headers交换机是不需要routingKey的,并且匹配规则是通过header的属性进行匹配,匹配方式有all和any两种。
- 而当消息生产者发送到交换机的时候,会随机被不用的消费者消费。也就是说我只生产两条消息,就算有100个能有匹配上的消费者,也只有两个消费者能消费得到
- 而当匹配机制为all的时候,一定要所有的header全部匹配才算成功
- x-开头的不会被算到匹配机制(笔者没测过,因为懒,但是官方这么说了,我们不能不信吧。手动狗头)
topic
topic交换机根据消息路由键和用于将队列绑定到交换的模式之间的匹配将消息路由到一个或多个队列。主题交换类型通常用于实现各种发布/订阅模式变体。主题交换通常用于消息的多播路由。
主题交换有非常广泛的用例。每当一个问题涉及多个消费者/应用程序有选择地选择他们想要接收的消息类型时,就应该考虑使用主题交换。
/**
* @author Kakki
* @version 1.0
* @create 2021-06-18 16:56
*/
@Configuration
public class QueueAutoConfig implements SmartInitializingSingleton {
@Autowired
private AmqpAdmin amqpAdmin;
@Override
public void afterSingletonsInstantiated() {
Queue topicQueue = new Queue("topicQueue");
amqpAdmin.declareQueue(topicQueue);
CustomExchange topicExchange = new CustomExchange("topicExchange", "topic");
amqpAdmin.declareExchange(topicExchange);
Binding topicBinding = BindingBuilder
.bind(topicQueue)
.to(topicExchange)
.with("topic.#")
.noargs();
amqpAdmin.declareBinding(topicBinding);
}
/**
* 单元测试
*/
@Test
public void sendMq() {
rabbitTemplate.convertAndSend("topicExchange", "topic.你好1", "topic.你好1");
rabbitTemplate.convertAndSend("topicExchange", "topic.你好2", "topic.你好2");
rabbitTemplate.convertAndSend("topicExchange", "topic你好1", "topic你好1");
rabbitTemplate.convertAndSend("topicExchange", "你好1topic.你好1", "你好1topic.你好1");
while (true) ;
}
@RabbitListener(queues = {"topicQueue"})
public void handlerT(Message message) {
log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
复制代码
2021-07-03 18:15:30.076 INFO 9296 --- [ntContainer#0-1] com.example.demo.config.MqCustomer : [消息来了]:topic.你好1
2021-07-03 18:15:30.077 INFO 9296 --- [ntContainer#0-1] com.example.demo.config.MqCustomer : [消息来了]:topic.你好2
复制代码
- topic是多播的方式,也就是生产者发送一条信息,只要能匹配得上的都可以收到信息,有点像发布订阅的类型,只要我订阅了,就都可以收到
- topic的#就是匹配类型,只要匹配到就可以收到
default
其实RabbitMq还有默认的交换机,叫做default交换机,他其实也是direct交换机类型的一种,只不过这种比较特殊,不需要发到交换机上面,只需要选定好RoutingKey就可以发送到指定队列了。看起来就像信息发送不是经过交换机,直接往队列上发而已,殊不知其实还是要走direct交换机
小结
RabbitMq的4种交换机,应用的业务场景都不一样。不可以说那个交换机应该适合这个业务适合另外一个业务,都是要靠具体的业务场景去灵活运用。当然最简单对于笔者而言就是fanout和direct了。
RabbitMq其实用处挺多的,而且也有很多好用的地方,对于笔者而言,遇到的业务场景,都可以用mq进行解耦削峰,灵活运用。当然RabbitMq不仅仅学完这4种交换机类型就完全掌握了。当然笔者还会继续更新关于RabbitMq的东西。
近期评论