RabbitMQ消息流转过程

概述

前一篇笔记写了RabbitMQ的整体架构组成:一文搞懂RabbitMQ架构设计   那么,一条消息到底是怎么从生产者正确地发送给消费者的?下图是整个流转过程的示意图,下面会详细介绍每个步骤环节。对各个组成部件不清楚的同学可以先花几分钟看一下上一篇笔记。
image.png

流转过程

整个消息的退送消费过程可以大致分为两个部分:
生产者推送消息到队列,消费者从队列消费消息,其中,生产者发送消息到队列整个流转过程从建立连接到关闭连接大致可以分为8个步骤,消费者从队列消费消息也可以大致分为6个步骤。下面将分别展开介绍。

生产消息

生产者推送消息到队列的过程如下:

  1. 建立连接

生产者连接到 RabbitMQ Broker 建立一个连接( Connection) ,开启一个信道 (Channel),这里有一点要注意,并不是每次发送一条消息都会建立一个新的TCP连接,我们知道,TCP是一个可靠的连接,因此我们才会选择TCP连接,但是我们也知道建立TCP连接会先经历3次握手才能传输消息,所以每次重新建议一个连接的成本是极高的。所以会选择尽可能地复用已经建立好的连接。这个概念和JAVA里面的线程池里面的线程复用以及连接池复用有一些相同的意思,但是又有不同,复用的方式是通过在一个连接里面增加信道(channel)来尽可能地复用TCP连接,如下图:

image.png
一旦TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道 (Channel) ,每个信道都会被指派一个唯一的ID 。信道是建立在 Connection 之上的虚拟连接, RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。它是以信道为最小处理单位而不是TCP连接。这种做法与NIO有着异曲同工之妙。当然,当信道本身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。
此时就需要开辟多 Connection ,将这些信道均摊到这些 Connection 中, 至于这些相关的调优策略需要根据业务自身的实际情况进行调节的每条 AMQP 指令都是通过信道完成的。更多的细节可以去仔细的了解一下AMQP协议。

  1. 声明交换器

生产者声明一个交换器 ,并设置相关属性,比如交换机类型、是否持久化等。RabbitMQ 常用的交换器类型有fanout、direct、 topic headers 这四种。

  1. 声明队列并设置相关属性

生产者声明 个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等。
RabbitMQ 的消息存储在 队列 中,交换器 并不真正耗费服务器的性能,而队列会。如果要衡量 RabbitMQ 当前的 QPS 只需看队列的即可。在实际业务应用中,需要对所创建的队列的流量、内存占用及网卡占用有 个清晰的认知,预估其平均值和峰值,以便在固定硬件资源的情况下能够进行合理有效的分配。

  1. 绑定交换器与队列

生产者通过路由键将交换器和队列绑定起来。通过绑定键与路由键的匹配,消息才能正对地到达目标队列。

  1. 消息发送至Broker

生产者发送消息至 RabbitMQ Broker ,其中包含路由键、交换器等信息。如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者。

  1. 查找队列相应的交换器

根据接收到的路由键查找相匹配的队列

  1. 消息存入队列或者丢弃

如果找到 ,则将从生产者发送过来的消息存入相应的队列中

  1. 信道与连接关闭

关闭信道,关闭连接

整个过程的AMQP协议流转如下:

image.png

消费消息

RabbitMQ 的消费模式分两种 Push 模式和拉 Pull 模式 推模式采用 Basic Consume进行消费,而拉模式则是调用 Basic Get 进行消费。消费者接收消息的过程:

  1. 消费者连接到 RabbitMQ Broker ,建立一个连接,开启一个信道(Channel)
  2. 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做些准备工作。
  3. 等待 RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
  4. 消费者发送确认ack表示正确接收消费了消息。
  5. RabbitMQ 从队列中删除相应己经被确认的消息。
  6. 关闭信道与连接

整体的流转过程如下:

image.png

【参考】
【1】《RabbitMQ实战指南》