消息队列(RabbitMQ)入门篇
序章
大家好!我是老猿,一名在深拼搏的新一代农民工。来深有一年了,这段时间可以说是成长最快的一段时间,不管是生活上,还是学习上。但是深圳的鬼天气,真的是不得不让我吐槽(天天下雨),这不,伴随着大雨写下了我掘金第一篇文章。其实比起南方的四季如夏,我还是更喜欢我们北方的四季分明。话不多说!
一、什么是消息队列
消息队列(MQ,Message Queue),是一种应用程序之间的通信方法。
二、消息队列的应用场景
1、异步处理
将一些不必要的业务逻辑,写入消息队列,以异步的方式运行,减少系统响应时间
2、服务解耦
在传统模式中,系统之间有调用,修改代码则需要同时修改两个系统的代码,而消息队列则可以将被调用的消息写入消息队列,需要调用的系统订阅即可,不需要修改被调方的代码。
3、量值削峰
也就是流量削峰,大多运用在电商系统的秒杀环节或者一些突然间访问量暴增的业务中。使用消息队列把消息保存起来,系统以自身的消费能力慢慢从队列中消费消息,直到消费完积压消息为止。
三、初识RabbitMQ
1、简介
Broker:接收和分发消息的应用。
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:提供者 / 消费者 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP 连接的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:交换机。message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point定向), topic (publish-subscribe发布订阅) , fanout (multicast广播)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
2、RabbitMQ的五种模式
2.1、简单模式(入门)
简单模式,也就是RabbitMQ入门案例,一提供一消费,简单明了。
2.1.1、依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
复制代码
2.1.2、提供者
package com.zthl.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* Provider(提供者)
* */
public class Provider {
public static void main(String[] args) throws Exception{
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置参数
//主机地址;默认localhost
connectionFactory.setHost("localhost");
//连接端口;默认5672
connectionFactory.setPort(5672);
//虚拟主机名称;默认 /
connectionFactory.setVirtualHost("/");
//连接用户名;默认guest
connectionFactory.setUsername("guest");
//连接密码;默认guest
connectionFactory.setPassword("guest");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//创建队列
/**
*参数1: queue 队列名称
*参数2: durable 是否定义持久化队列,当mq重启之后,还存在
*参数3: exclusive 是否独占本次连接
*参数4: autoDelete 是否在不使用的时候自动删除
*参数5: arguments 其他参数
* */
channel.queueDeclare("simple_queue",true,false,false,null);
//要发送的消息
String message = "老弟!在家不?";
/**
* 参数1:交换机名称,不指定则默认Default Exchage
* 参数2:路由key,简单模式可以设为队列名称
* 参数3:配置消息
* 参数4:消息内容
* */
channel.basicPublish("","simple_queue",null,message.getBytes());
System.out.println("消息已发送:" + message);
//关闭资源
channel.close();
connection.close();
}
}
复制代码
提供者运行结果:
我们在控制台可以看到,已经产生了队列和消息。
2.1.3、消费者
package com.zthl.comsumer;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* Consumer
* */
public class Consumer {
public static void main(String[] args) throws Exception{
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置参数
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 参数1. queue:队列名称
* 参数2. durable:是否持久化
* 参数3. exclusive:是否独占。只能有一个消费者监听这队列
* 参数4. autoDelete:是否自动删除
* 参数5. arguments:参数。
*/
channel.queueDeclare("simple_queue",true,false,false,null);
//接收消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会自动执行该方法
* 参数1:consumerTag:标识
* 参数2:envelope:获取交换机,路由key等信息
* 参数3:properties:配置信息
* 参数4:body:接收数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+ consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
/**
* basicConsume(String queue,boolean autoAck,Consumer callback)
* 参数1:queue 队列名称
* 参数2:autoAck 是否自动确认,就像发短息,发送成则收到一个发送成功的确认消息
* 参数3:callback 回调对象
* */
channel.basicConsume("simple_queue",true,consumer);
}
}
复制代码
消费者运行结果:
消费者消费之后,控制台变化:
.2、工作队列模式
简单来说,就是一个提供者提供消息,由多个消费者进行消费。
2.2.1、提供者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class WorkProvider {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//创建队列Queue
channel.queueDeclare("work_queue",true,false,false,null);
for (int i = 1; i <= 10; i++) {
String message = i+"hello rabbitmq~~~";
//发送消息
channel.basicPublish("","work_queue",null,message.getBytes());
}
//释放资源
channel.close();
connection.close();
}
}
复制代码
运行提供者控制台结果:
2.2.2、消费者1
import com.rabbitmq.client.*;
import java.io.IOException;
public class WorkConsumer1 {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建队列Queue
channel.queueDeclare("work_queue",true,false,false,null);
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//只打印消费的消息,方便观察。
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("work_queue",true,consumer);
}
}
复制代码
消费者1运行结果:
2.2.3、消费者2
import com.rabbitmq.client.*;
import java.io.IOException;
public class WorkConsumer2 {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建队列Queue
channel.queueDeclare("work_queue",true,false,false,null);
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//只打印消费的消息,方便观察。
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("work_queue",true,consumer);
}
复制代码
消费者2运行结果:
总结:
多个消费者消费同一个队列的消息,则多个消费者之间属于竞争的关系。
2.3、发布与订阅模式
交换机类型为:fanout(广播类型)
发布与订阅中多出了一个角色,就是exchange(交换机)。
Proveider:提供者。
Consumer:消费者。
Exchange:交换机,交换机有两个作用,接收消息和处理消息,那么如何处理消息,这就用到交换机类型,常见的三种交换机类型:
1、Fanout:广播类型,将消息广播给所有绑定此交换机的队列
2、Direct:定向,把消息给指定routing key的队列。
3、Topic:通配符类型,把消息给符合规则的队列。
交换机只有转发消息的能力,不具备存储能力,如果没有队列绑定交换机,那么交换机的消息将丢失。
queue:消息队列
2.3.1、提供者
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//定义交换机名称
String exchangeName = "fanout_exchange";
//创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
/**
* 参数1、 exchange 交换机名称
* 参数2、 type:交换机类型
* FANOUT("fanout"),广播类型
* DIRECT("direct"),定向类型
* TOPIC("topic")通配符类型
* 参数3、 durable:是否持久化
* 参数4、 autoDelete:自动删除
* 参数5、 internal:内部使用
* 参数6、 arguments:参数
*/
//创建队列
String queue1Name = "fanout_queue1";
String queue2Name = "fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//绑定队列和交换机
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
/**
参数1、 queue:队列名称
参数2、 exchange:交换机名称
参数3、 routingKey:路由键,绑定规则
如果交换机的类型为fanout(广播模式) ,则routingKey设置为""
*/
String body = "痛苦,越早经历越好,比如:爱情";
//发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
//释放资源
channel.close();
connection.close();
}
复制代码
程序运行后控制台结果:
可以看到交换机所绑定的队列
2.3.2、消费者1
import com.rabbitmq.client.*;
public class FanoutConsumer1 {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
String queue1Name = "fanout_queue1";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
复制代码
运行结果
2.3.3、消费者2
import com.rabbitmq.client.*;
public class FanoutConsumer2 {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
String queue2Name = "fanout_queue2";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
复制代码
运行结果
启动所有消费者,用提供者发送消息,可以看到所有的消费者都消费到了这个消息,这就是fanout(广播)
控制台变化:可以看到消息已被消费
2.4、路由模式
交换机类型为:Direct(定向类型)
注意:
1、队列与交换机的绑定需要指定RoutingKey
。
2、提供者向 Exchange发送消息时,需要指定消息的 RoutingKey
。
3、Exchange会判断Routing Key是否相同,只有队列的Routingkey
与消息的 Routing key
相同,才能消费消息。
2.4.1、提供者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DirectProvider {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
String exchangeName = "direct_exchange";
// 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
// 创建队列
String queue1Name = "direct_queue1";
String queue2Name = "direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 队列绑定交换机
// queue1Name绑定one
channel.queueBind(queue1Name,exchangeName,"one");
// queue2Name绑定one two three
channel.queueBind(queue2Name,exchangeName,"one");
channel.queueBind(queue2Name,exchangeName,"two");
channel.queueBind(queue2Name,exchangeName,"three");
String message = "three人行,必有我师焉!";
// 发送消息
channel.basicPublish(exchangeName,"three",null,message.getBytes());
System.out.println("发送成功:" + message);
//9. 释放资源
channel.close();
connection.close();
}
}
复制代码
运行提供者控制台信息:
下面两张图片中,大家可以看出,交换机绑定的队列中,只有对应在发送消息时设置的routingKey的direct_queue2队列中有一条待消费信息。
2.4.2、消费者1
import com.rabbitmq.client.*;
public class DirectConsumer1 {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
String queue1Name = "direct_queue1";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
复制代码
2.4.3、消费者2
import com.rabbitmq.client.*;
public class DirectConsumer2 {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
String queue2Name = "direct_queue2";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue2Name,true,consumer);
}
复制代码
消费者1运行结果:
消费者2运行结果:
我们可以看出只有有对应routingKey的消费者2消费到一条消息。这就是Direct(定向)类型,只有指定队列的routingKey和消息的routingKey相同才能消费消息。
2.5、通配符模式
交换机类型:Topic(通配符)类型
注意:
Topic和Direct类型其实道理都是一样的道理,都是通过判断routingKey把消息发送给对应的队列,不同的是Topic的routingKey是由一个或多个单词组成的通配符。多个单词使用”.“隔开。
通配符是有规则的:
1、#:匹配一个或多个词。
例1:one.# 可以匹配以one开头的所有通配符,比如:one.two 或者one.two.three或者one.two.three.four
例2:#.one 可以匹配以one结尾的所有通配符,比如:two.one 或者 three.two.one
2、*:只匹配一个词
例1:one.* 可以匹配one.two 或者 one.three
例2:*.one 可以匹配two.one 或者 three.one
解释的应该还算清楚。嘿嘿嘿!!!
2.5.1、提供者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TopicProvider {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "topic_exchange";
//创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//创建队列
String queue1Name = "topic_queue1";
String queue2Name = "topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//绑定队列和交换机
channel.queueBind(queue1Name,exchangeName,"two.*");
channel.queueBind(queue1Name,exchangeName,"#.one");
channel.queueBind(queue2Name,exchangeName,"*.*");
String body = "你学废了吗?";
//发送消息
channel.basicPublish(exchangeName,"two.one",null,body.getBytes());
//释放资源
channel.close();
connection.close();
}
}
复制代码
运行控制台结果:
2.5.2、消费者1
import com.rabbitmq.client.*;
public class TopicConsumer1 {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
String queue1Name = "topic_queue1";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
复制代码
运行结果:
2.5.3、消费者2
import com.rabbitmq.client.*;
public class TopicConsumer2 {
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//创建连接 Connection
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
String queue2Name = "topic_queue2";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
复制代码
运行结果:
其实前面说过,Topic类型和Direct类型是差不多的,区别就在与routingKey,Topic的routingKey是通配符,所以相比与Direct运用会更加灵活多变。
结束
rabbitMQ入门到这里就结束了,希望大家能共同学习,共同进步!
后面会继续更新高级的文章,如果有兴趣,大家可以一起学习!
近期评论