rabbitMq消息投递不丢失,保证幂等性rabbitMq

rabbitMq消息投递不丢失,保证幂等性

MQ消息投递,MQ服务器宕机导致丢失,rabbitMq通过durable参数持久化,也会概率上产生丢失。

图片

RabbitMQ 如何保证消息不丢失?

1.丢数据场景

  • 生产端丢数据场景,例如生产者将数据推送RabbitMq时,因网络原因导致数据丢失

  • rabbitmq丢数据,例如没有开启持久化,rabbitmq重启导致丢数据。或者开启持久化,在持久化到磁盘过程中挂了。

  • 消费端丢数据场景,例如消费端消费过程中挂了,rabbitmq认为消费了并删除,导致丢数据。

2.解决方案

2.1.消息持久化

将queue、exchange、message都持久化,但不能保证100%不丢失数据,消息持久化解决因为服务器异常奔溃导致的消息丢失

queue持久化

//durable=true,实现queue的持久化

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue.persistent.name", true, false, false, null);
复制代码
//Channel类中queueDeclare的完整定义如下:
/**
 * Declare a queue
 * @see com.rabbitmq.client.AMQP.Queue.Declare
 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 * @param queue(队列名称)the name of the queue 
 * @param durable(持久化)true if we are declaring a durable queue (the queue will survive a server restart) 
 * @param exclusive(排他队列) true if we are declaring an exclusive queue (restricted to this connection)
 * @param autoDelete(自动删除) true if we are declaring an autodelete queue (server will delete it when no longer in use)
 * @param arguments other properties (construction arguments) for the queue
 * @return a declaration-confirm method to indicate the queue was successfully declared
 * @throws java.io.IOException if an error is encountered
 */
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;
                             
exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。
这里需要注意三点:
1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
复制代码

exchange持久化

如果不设置exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。

//durable=true,持久化
channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);
复制代码
//exchangeDeclare的完整定义如下:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
                                          String type,
                                          boolean durable,
                                          boolean autoDelete,
                                          boolean internal,
                                          Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
                           String type,
                           boolean durable,
                           boolean autoDelete,
                           boolean internal,
                           Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
复制代码

message持久化

queue队列持久化为true,但message没有持久化,重启后message还是会丢失。
需要queue和message都设置持久化,broker服务重启后,队列存在,消息也存在。

//MessageProperties.PERSISTENT_TEXT_PLAIN 为消息持久化
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
复制代码
//basicPublish完整定义如下:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;
        
exchange表示exchange的名称
routingKey表示routingKey的名称
body代表发送的消息体

//BasicProperties定义如下,deliveryMode=1代表不持久化,deliveryMode=2代表持久化
public BasicProperties(
            String contentType,//消息类型如:text/plain
            String contentEncoding,//编码
            Map<String,Object> headers,
            Integer deliveryMode,//1:nonpersistent 2:persistent
            Integer priority,//优先级
            String correlationId,
            String replyTo,//反馈队列
            String expiration,//expiration到期时间
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)

//MessageProperties.PERSISTENT_TEXT_PLAIN定义如下:其中deliveryMode=2表示持久化
public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2,0, null, null, null,null, null, null, null,null, null);
复制代码

消息推送到rabbitmq后,先保存到cache中,然后异步刷入到磁盘中。

消息什么时候刷到磁盘?

写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。
有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。
每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

2.2.事务+confirm

在producer引入事务机制或者Confirm机制来确保消息已经正确的发送至broker端。
RabbitMQ的可靠性涉及producer端的确认机制、broker端的镜像队列的配置以及consumer端的确认机制,要想确保消息的可靠性越高,那么性能也会随之而降,鱼和熊掌不可兼得,关键在于选择和取舍。

消息持久化解决因为服务器异常奔溃导致的消息丢失,但不能解决发布者将消息发送之后,消息有没有正确到达broker代理服务器。如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

这时RabbitMQ为我们提供了两种方式:

  • 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
  • 通过将channel设置成confirm模式来实现;
2.2.1.事务机制

rabbitMQ事务机制三个方法:

  • txSelect()用于将当前channel设置成transaction模式
  • txCommit用于提交事务
  • txRollback用于回滚事务

在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

try {
    channel.txSelect();
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    int result = 1 / 0;
    channel.txCommit();
} catch (Exception e) {
    e.printStackTrace();
    channel.txRollback();
}
复制代码

使用事务机制的话会降低RabbitMQ的性能,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式。

2.2.2.confirm模式

producer端confirm模式的实现原理

生产者将信道设置成confirm模式,在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。

在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。

已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。

编程模式

客户端实现生产者confirm有三种编程方式:

  1. 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
  2. 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
  3. 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

第1种

\\普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。

channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if(!channel.waitForConfirms()){
	System.out.println("send message failed.");
}
复制代码

第二种

channel.confirmSelect();
for(int i=0;i<batchCount;i++){
	channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
	System.out.println("send message failed.");
}
复制代码

第三种

异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。

SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    	System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
});

while (true) {
   long nextSeqNo = channel.getNextPublishSeqNo();
   channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
   confirmSet.add(nextSeqNo);
}
复制代码

性能对比

性能由低到高:事务模式(tx) < 普通confirm模式 < 批量confirm模式 < 异步confirm模式

2.2.3.消息确认(Consumer端)

为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

RabbitMQ管理平台界面上可以看到当前队列中Ready状态和Unacknowledged状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到ack信号的消息数。也可以通过命令行来查看上述信息:
img

代码示例(关闭自动消息确认,进行手动ack):

   QueueingConsumer consumer = new QueueingConsumer(channel);
   channel.basicConsume(ConfirmConfig.queueName, false, consumer);
   
   while(true){
       QueueingConsumer.Delivery delivery = consumer.nextDelivery();
       String msg = new String(delivery.getBody());
// do something with msg. 
       channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   }
复制代码

broker将在下面的情况中对消息进行confirm:

  • broker发现当前消息无法被路由到指定的queues中(如果设置了mandatory属性,则broker会发送basic.return)
  • 非持久属性的消息到达了其所应该到达的所有queue中(和镜像queue中)
  • 持久消息到达了其所应该到达的所有queue中(和镜像中),并被持久化到了磁盘(fsync)
  • 持久消息从其所在的所有queue中被consume了(如果必要则会被ack)

basicRecover:是路由不成功的消息可以使用recovery重新发送到队列中。

basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。

basicNack:可以一次拒绝N条消息,客户端可以设置basicNack方法的multiple参数为true,服务器会拒绝指定了delivery_tag的所有未确认的消息(tag是一个64位的long值,最大值是9223372036854775807)。

2.3.设置集群镜像模式

RabbitMQ的mirrored-queue即镜像队列,这个相当于配置了副本,当master在此特殊时间内crash掉,可以自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉,这样也不能完全的100%保障RabbitMQ不丢消息,但比没有mirrored-queue的要好很多,很多现实生产环境下都是配置了mirrored-queue的。

2.4.消息补偿机制

消息提前持久化+定时任务

图片

上图流程:

(1)订单服务生产者在投递消息之前,先把消息持久化到Redis或DB中,建议Redis,高性能。消息的状态为发送中。

(2)confirm机制监听消息是否发送成功?如ack成功消息,删除Redis中此消息。

(3)如果nack不成功的消息,这个可以根据自身的业务选择是否重发此消息。也可以删除此消息,由自己的业务决定。

(4)这边加了个定时任务,来拉取隔一定时间了,消息状态还是为发送中的,这个状态就表明,订单服务是没有收到ack成功消息。

(5)定时任务会作补偿性的投递消息。这个时候如果MQ回调ack成功接收了,再把Redis中此消息删除。

幂等性保证

1.唯一性索引

根据业务规则,设置表字段唯一性。

2.乐观锁方案

借鉴数据库的乐观锁机制,根据version版本,也就是在操作库存前先获取当前商品的version版本号,然后操作的时候带上此version号。我们梳理下,我们第一次操作库存时,得到version为1,调用库存服务version变成了2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传递的version还是1,再执行上面的sql语句时,就不会执行;因为version已经变为2了,where条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。

update table set count=count-1,version=version+1 where id=2 and version=1
复制代码

3.分布式锁

若是是分布是系统,构建全局惟一索引比较困难,例如惟一性的字段无法肯定,这时候能够引入分布式锁,经过第三方的系统(redis或zookeeper),在业务系统插入数据或者更新数据,获取分布式锁,而后作操做,以后释放锁,这样实际上是把多线程并发的锁的思路,引入多多个系统,也就是分布式系统中得解决思路。要点:某个长流程处理过程要求不能并发执行,能够在流程执行以前根据某个标志(用户ID+后缀等)获取分布式锁,其余流程执行时获取锁就会失败,也就是同一时间该流程只能有一个能执行成功,执行完成后,释放分布式锁(分布式锁要第三方系统提供);

2.唯一ID+指纹码机制

唯一ID:如数据库的主键id

指纹码:业务规则标识唯一的。如时间戳+银行返回的唯一码。需要注意的是,这个指纹码不一定就是我们系统生产的,可能是我们自己业务规则或者是外部返回的一些规则经过拼接后的东西。其目的:就是为了保障此次操作达到绝对唯一的。

唯一ID+指纹码机制,利用数据库主键去重。如:

Select count(id) from table where id = 唯一ID+指纹码
复制代码

好处:实现简单,就一个拼接,而后查询判断是否重复。

坏处:高并发下若是是单个数据库就会有写入性能瓶颈

解决方案:根据 ID 进行分库分表,对 id 进行算法路由,落到一个具体的数据库,而后当这个 id 第二次来又会落到这个数据库,这时候就像我单库时的查重同样了。利用算法路由把单库的幂等变成多库的幂等,分摊数据流量压力,提升性能。

3.2.Redis 原子性

相信你们都知道 redis 的原子性操做,我这里就不须要过多介绍了。性能

使用 redis 的原子性去实现须要考虑两个点

一是 是否 要进行数据落库,若是落库的话,关键解决的问题是数据库和缓存如何作到原子性? 数据库与缓存进行同步确定要进行写操做,到底先写 redis 仍是先写数据库,这是个问题,涉及到缓存更新与淘汰的问题

二是若是不落库,那么都存储到缓存中,如何设置定时同步的策略? 不入库的话,可使用双重缓存等策略,保障一个消息副本,具体同步可使用相似 databus 这种同步工具。

摘抄

1.honeypps.com/mq/rabbitmq…

2.mp.weixin.qq.com/s/f5DKk_alX…