RabbitMQ客户连接池的实现1、RabbitClient提供的

目前RabbitMQ官方给的出的客户端发送消息的Demo的都是基于短连接来做的,例如:

ConnectionFactory cf = new ConnectionFactory();
cf.Uri = serverAddress;
using (IConnection conn = cf.CreateConnection()){
    using (IModel ch = conn.CreateModel()) {
        if (exchange != ""){
            ch.ExchangeDeclare(exchange, exchangeType);
        }
        ch.BasicPublish(exchange,routingKey,null,Encoding.UTF8.GetBytes(message));               
    }
}
复制代码

我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒,为此,我专门查了资料,得出如下结论:

1、Rabbit Client提供的连接方式介绍:

RabbitMQ官方提供了:

Connection对象,就是一个TCP连接对象。

Channels对象,虚拟连接。虚拟连接建立在上面Connection对象的TCP连接中。数据流动都是在Channel中进行的。每个Connection对象的虚拟连接也是有限的,如果单个Connnection的Channel对象超出指定范围了,也会有性能问题,另外一个TCP连接上的多个虚拟连接,实际在传输数据时,传输数据的虚拟连接还是独占了TCP连接,其它虚拟连接在排队等待。

2、RabbitMQ官方推荐客户端连接使用方式:

在一个Connection对象上创建多个Channel,然后程序发送数据时,分别共享使用创建好的Channel,但使用具体的单个Channel时,需要保障单个Chanel的线程独占性使用,不要让多个线程同时在使用某一个Channel,这样会导致并发错误。使用单个Channel时,加一个锁即可解决,具体官方文档说明和代码示例如下:

Sharing Channels Between Threads
As a rule of thumb, IModel instances should not be used by more than one thread simultaneously: application code should maintain a clear notion of thread ownership for IModel instances. If more than one thread needs to access a particular IModel instances, the application should enforce mutual exclusion itself. One way of achieving this is for all users of an IModel to lock the instance itself:

IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
ch.BasicPublish(…);
}
复制代码

1

3、RabbitMQ Client客户端目前存在的问题

3.1、Connection对象创建、销毁的耗时问题,管理问题:

Connection对象创建的是TCP连接,TCP连接的创建和销毁本身就是很耗时,解决的办法就是创建一个Connection对象之后,一直不关闭,在传输数据时,共享这一个Connection对象就行了,MQ官方也是推荐一个Connection对象上创建多个Chnanel来实现快速数据传输,具体实现方式,创建一个静态的Connection对象不就搞定了,但这种方式问题是,如果我需要连接多个MQ服务器呢,如果我这边根据业务进行了划分,不同的业务数据,需要分别传输到不同的MQ服务器上,如订单数据和聊天数据,是分别发到不同的MQ服务器上面,如果只是有一个静态的Connection对象,怎么可能同时连接2台服务器呢,另外的问题,如果现在客户端发送数据量较大,一个Connection实在是传输不过来,而且Connection虽说是可以给大家共享传输,但具体传输时,还是某一个具体传输Channel会独占整个Connection中的TCP连接,这样传输量一大,Connection忙不过来,还是会导致拥塞的发生。

解决办法: 创建Connection池,如果不同的服务器,会分别有不同的Connection对象,如果一个Connection对象传输不过来,会有多个Connection对象同时在传输数据。连接池在创建多个连接之后,如果某个连接闲置时间超过指定的时间,则连接池会进行单个连接的dispose和remove动作,将连接先销毁,再从连接池中移除,确保不会长期占用无效连接。

3.2、Channel对象的创建、销毁的耗时问题,管理问题

Channel对象,即Connection对象上的TCP连接上的软连接,我们程序具体使用的就是Channel对象进行数据传输,我自己记录了Channel对象创建和销毁的耗时,也是非常长的,为什么Channel对象的创建和销毁会非常耗时,我仔细查了MQ Clinet的源码,发现创建和销毁Channel时,MQ Client分别向MQ服务器发送了2次数据,用于分别通知MQ服务器,当前这个Channel接下来数据传输时用的数据协议,报文格式,以及其它通信相关的信息。销毁时,又重新向MQ服务器发送了数据,通知MQ服务器,断开这个Channel,并且释放MQ服务器上面关于这个Channel所占用的资源。在平常情况下2次TCP数据传输,一般要耗时1毫秒左右,还可以接受,但在高并发下,2次TCP数据传输,则会很耗时,而且如果MQ服务器压力比较大,迟迟不响应客户端请求,则客户端会等待以及整个耗时会更长。而且MQ官方也是推荐共享使用Channel,而不是每次都创建和销毁Channel。

现在问题来了,我怎么才能实现共享Channel,我查了MQ Client源码,Connection对象中确实有集合在存放所有的Channel,但居然没有提供方法让我来使用和管理这里面的Chanel,

具体的原因不详,有兴趣的朋友可以自己在查一下MQ Client的源码。另外如果我创建多个Channel之后,如果不再使用的Channel在闲置时间超过指定的时间之后,如何销毁呢,另外我查了源码,如果我们不设置Connection对象Channel池的长度,则一个Connection对象的Channel数量可以无限增加,因为Channel在传输时,实际上是独占TCP连接,如果Channel无限增加的话,会导致这个TCP拥塞,如果我设置了Channel池的长度,则我创建Channel的数量超过Channel池的长度,则MQ Client直接抛出异常,提示Channel池长度越界(这是我从MQ Client源码中查到的),这样的话,我创建Channel时,需要判断Channel池的长度,防止越界,基于这些问题,我再在外面开发了一个Channel池,用于创建和管理单个Connection对象的Channel对象。如果Chnanel的池数量达到指定数量时,则会新建Connection对象,在新的Connection对象中创建Channel,如果Channel的闲置时间到达指定时间,则会在后台销毁这个Channel对象,如果一个Connection对象的的Channel全部被销毁了并且Connection对象的闲置时间也到达了,则Connection对象也会被销毁。

1

性能测试结果如下
我这边自己用200线程,发送了20万个和60万个消息,做了压力测试,另外还有心跳功能和闲置超过指定时间,主动dispose的功能。
Connection池数量:4,Channel池数量:15 总耗时:218.731秒,平均每秒发送数量:913

1