RabbitMQ教程1.“HelloWorld”1″

消息队列作为开发中常用的中间件,主要应用于处理削峰、异步、解耦等场景。RabbitMQ因其使用简单,配置灵活,管理方便而广受使用。为了方便小白快速入门,课代表翻译了官方教程原文供大家参考,以下为其第一篇:“Hello World”

1 "Hello World"

介绍

RabbitMQ 是一个消息代理(message broker):它接收并转发消息。你可以把它想象成邮局,当你把要发送的信件放到邮箱里时,你可以确信某位邮递员最终会将你的信件投递给接收人。在这个类比中,RabbitMQ 就是邮箱+邮局+邮递员

RabbitMQ 和邮局的主要区别是,它不处理纸质信件,取而代之的是,它接收、存储并转发二进制数据——消息(message)

RabbitMQ 和消息投递中,用到了如下术语:

  • 生产(Producing)就是发送。发送消息的程序就是生产者:

producer.png

  • 队列就是 RabbitMQ 里的邮箱。尽管消息从 RabbitMQ 流向你的应用,但消息只能存储在队列中。队列受限于宿主机的内存和硬盘大小,它的本质是一个巨大的消息缓冲。生产者们可以发送消息给队列,消费者们可以从队列接收消息。队列可用下图表示:

队列(Queue)

  • 消费(Consuming )就是接收消息。消费者是等待接收消息的程序:

生产者,消费者和消息代理没必要在同一台主机上,实际上在大多数应用场景中,三者都在不同的机器上。一个应用既可以是生产者,也可以是消费者。

"Hello World"

(Java 代码实现)

在这部分教程中,我们将会编写两个 Java 应用:生产者用于发送单条消息,消费者用于接收消息并打印收到的消息。我们将会详细介绍几个 Java API,用于实现这个简单的功能。这就是消息发送界的 "Hello World"。

下图中,“P”是生产者,“C”是消费者,中间的红框是队列——RabbitMQ 为消费者提供的消息缓冲区。

Producer-queue-Consumer

Java库

RabbitMQ 支持多种协议,本教程使用 AMQP 0-9-1,它是开源、通用的消息发送协议。RabbitMQ的客户端支持多种编程语言。本文使用 RabbtiMQ 提供的Java 客户端。

下载相应 Java 库和相关依赖 (SLF4J APISLF4J Simple)。把文件复制到工作目录中。

需要注意的是, SLF4J Simple 仅用于教程演示,生产环境请使用全功能日志记录类库,如: Logback

(RabbitMQ 的 Java 客户端也在Maven 仓库中提供,groupId:com.rabbitmq,artifactId:amqp-client)

现在有了 Java 客户端和依赖库,可以写点代码了。

发送(Sending)

我们把消息发布者(sender) 类命名为 Send,消息消费者(receiver)命名为Recv。发布者将会连接到 RabbitMQ,发送一条消息,然后退出。

Send.java 中,需要引入如下类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
复制代码

编写类代码给队列命名:

public class Send {
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
      ...
  }
}
复制代码

然后连接到服务器

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
     
}
复制代码

Connection 类封装了socket 连接,并替我们处理协议版本协商和认证等工作。这里我们连接到了本机的RabbitMQ 节点——因为连接地址写的是 localhost

如果想连接到其他机器节点,只需要指定主机名或者IP地址即可。

接下来创建 channel,API 的所有工作都依赖于 channel 完成。由于 Connectionchannel 都实现了java.io.Closeable,我们可以使用 try-with-resource 语句,防止显式编写关闭代码。

要发送消息,需要声明一个队列(queue)作为目标;然后把消息发送给这个队列,所有代码可以写到 try-with-resource 语句中

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
复制代码

声明队列操作是幂等的——也就是说只有当声明的队列不存在时才会创建。消息内容是字符数组,从而可以对任意内容进行编码。

点击查看 Send.java 源文件

接收(Receiving)

上面介绍的是发布者。我们的消费者要从 RabbitMQ 监听消息,与发布者每次发送单个消息不同,消费者会一直运行并监听消息,然后把监听到的消息打印出来。

接收(Receiving)

Recv.java 中的引用 和 Send一样:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
复制代码

我们将使用DeliverCallback接口来缓存服务端发给我们的消息。

配置代码和发布者一样:创建connectionchannel,声明需要从哪个队列(queue)消费消息。这里要和发布者的 queue 相对应。

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}
复制代码

注意这里我们也声明了队列。因为我们可能在运行发布者之前,先运行生产者,这样写是为了确保消费消息时,相应队列存在。

为什么不用 try-with-resource 语句来自动关闭channelconnection 呢?如果这样写,相当于让程序继续往下执行,关闭所有资源然后退出应用!而我们的目的是希望应用一直存活,持续地异步监听消息。

接下来我们将告知服务器将 queue 里的消息发送过来。由于服务器异步给我们推送消息,我们提供一个对象形式的回调用来缓存消息,直到消息可用。这就是DeliverCallback子类的作用

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
复制代码

点击查看 Recv.java 源文件

代码整合(Putting it all together)

只需要把 RabbitMQ java 客户端代码放在 classpath下,你就可以编译这俩文件:

javac -cp amqp-client-5.7.1.jar Send.java Recv.java
复制代码

为了运行他们,你需要 rabbitmq-client.jar 和它的依赖包。在终端中运行消费者(receiver):

java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Recv
复制代码

然后运行发布者(sender):

java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Send
复制代码

Windows 系统下,类路径中的项目分隔符使用分号取代冒号。

消费者将会打印出通过 RabbitMQ 获取到的发布者所发布的消息。消费者程序将会持续运行,等待接收消息(使用ctrl+c 停止),因此,可以再开一个终端来运行发送者。

列出队列

你可能想知道 RabbitMQ 有哪些队列(queue),每个队列里有多少消息。

可以通过 rabbitmqctl 工具查看:

sudo rabbitmqctl list_queues
复制代码

Windows系统下省略 sudo:

rabbitmqctl list_queues
复制代码

接下来我们学习第二部分,建立一个简单的工作队列。

小提示

为了缩短命令,可以给这些类路径设置环境变量

export CP=.:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar
java -cp $CP Send
复制代码

Windows下:

set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
java -cp %CP% Send
复制代码

往期干货推荐

下载的附件名总乱码?你该去读一下 RFC 文档了!

深入浅出 MySQL 优先队列(你一定会踩到的order by limit 问题)

Freemarker 教程(一)-模板开发手册


码字不易,欢迎点赞关注和分享。
搜索:【Java课代表】,关注公众号。每日一更,及时获取更多Java干货。