RabbitMQ之消息可靠性投递实现

方案一:消息入库、定时拉取重新投递

消息流转流程图

消息可靠性投递方案一.jpg

优劣势

这种方式的优势就是实现简单,没有那么多花里胡哨的操作,同时可以保证消息百分百成功。但是劣势也很明显,就是在最开始的时候除了业务数据需要入库以外,还要将消息进行入库,而且在进行消息补偿的时候还需要额外查询一次数据库获取投递失败的消息,而数据库的操作都是很缓慢的,会使得业务处理变得比较缓慢,在数据量不是很大的时候可以考虑。

具体实现

第一步: 将业务数据和消息数据分别入库,在消息入库的时候需要注意,消息的实体应该有一个状态status字段来表明该条消息是否已经被消费,比如0表示未消费,1表示已消费,最开始消息入库的时候,status字段的值为0。

第二步: 将消息投递出去,在这个投递过程中,消息首先到达Broker中的交换机Exchange,如果没有正确投递到Exchange,那么会触发ConfirmCallback,之后才会根据RoutingKey来路由到对应的队列,如果没有正确路由到队列,此时会触发ReturnCallback,这两个回调都可以进行监听从而进行各自的处理。

第三步: 消费者监听对应的队列,并对其中的消息进行消费,在消费完成之后需要ACK。

第四步: 生产者在收到ACK之后,需要将消息数据库中的消息状态改成1,即已消费。

第五步: 开启一个定时任务,定时查询消息数据库中消息状态为0的消息,然后通知生产者将未消费的消息进行重新投递。

代码Demo

RabbitConfig 一些RabbitMQ交换机和队列相关的配置。

@Configuration
public class RabbitConfig {

    /**
     * 订单交换机
     *
     * @return fanout类型的交换机
     */
    @Bean
    public FanoutExchange ORDER_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange("ORDER_EXCHANGE").durable(true).build();
    }

    /**
     * 订单队列
     *
     * @return 队列
     */
    @Bean
    public Queue ORDER_QUEUE() {
        return QueueBuilder.durable("ORDER_QUEUE")
                .maxLengthBytes(1024 * 1024 * 128)
                .maxLength(50000)
                .build();
    }

    /**
     * 订单交换机和队列之间的绑定关系
     *
     * @return 绑定关系
     */
    @Bean
    public Binding ORDER_BINDING() {
        return BindingBuilder.bind(ORDER_QUEUE()).to(ORDER_EXCHANGE());
    }
}
复制代码

RabbitConfirmCallback 当消息没有正确投递到对应的交换机时会触发Confirm回调,可以在这里做一些额外处理,比如日志记录等。

@Component
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

    }
}
复制代码

RabbitReturnCallback 当消息已经投递到交换机了,但是没有通过RoutingKey正确路由到对应的队列时会触发Return回调,也可以在这里做一些额外的处理,比如日志记录等。

@Component
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

    }
}
复制代码

RabbitTemplateFactory 这个类主要是配置RabbitTemplate以及消息转换器等,额外做了一个RabbitTemplate的池化处理,使得每一个交换机都对应一个RabbitTemplate,提高投递消息的效率。

@Configuration
public class RabbitTemplateFactory {

    @Resource
    private ConnectionFactory connectionFactory;

    @Resource
    private RabbitConfirmCallback rabbitConfirmCallback;

    @Resource
    private RabbitReturnCallback rabbitReturnCallback;

    Map<String, RabbitTemplate> rabbitTemplateMap = new ConcurrentHashMap<>();

    /**
     * RabbitMQ的消息转换器
     *
     * @return 消息转换器
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 根据不同的交换机生产不同的RabbitTemplate
     *
     * @param exchangeName 交换机的名称
     * @return RabbitTemplate
     */
    public RabbitTemplate getRabbitTemplate(String exchangeName) {
        RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(exchangeName);
        if (rabbitTemplate != null) {
            return rabbitTemplate;
        }
        rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange(exchangeName);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(messageConverter());
        rabbitTemplate.setRetryTemplate(new RetryTemplate());
        rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);
        rabbitTemplate.setReturnCallback(rabbitReturnCallback);
        return rabbitTemplate;
    }
}
复制代码

MessageConsumer 消息的消费者,对消息进行业务处理,在消息消费完成之后将消息的消费状态改为已消费。

@Component
public class MessageConsumer {

    private final MessageRepository messageRepository;

    public MessageConsumer(MessageRepository messageRepository) {
        this.messageRepository = messageRepository;
    }

    @RabbitListener(queues = "ORDER_QUEUE")
    public void onMessage(@Headers Map<String, Object> headers, @Payload Order order, Channel channel) throws Exception {

        // 3. 对消息进行消费,消费成功则ACK
        System.out.println("====== 开始消费消息 ======");
        System.out.println("消息内容为:" + order);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
        System.out.println("====== 消费消息成功 ======");

        // 4. 修改消息数据库中的消息状态为已消费
        Message message = messageRepository.findMessageByMessageId(order.getId());
        message.setStatus(true);
        messageRepository.save(message);
    }
}
复制代码

RabbitController 主要用来模拟消息投递,方便测试。

@RestController
public class RabbitController {

    private final MessageProducer messageProducer;
    private final MessageRepository messageRepository;
    private final OrderRepository orderRepository;

    public RabbitController(MessageProducer messageProducer, MessageRepository messageRepository, OrderRepository orderRepository) {
        this.messageProducer = messageProducer;
        this.messageRepository = messageRepository;
        this.orderRepository = orderRepository;
    }

    @PostMapping("/place/order")
    public String placeOrder(@RequestBody Order order) {

        // 1. 业务数据入库/消息数据入库
        Order result = orderRepository.save(order);
        Message message = new Message();
        message.setMessageId(result.getId());
        message.setContent(order);
        message.setStatus(false);
        messageRepository.save(message);

        // 2. 投递消息
        messageProducer.send(order);
        return "Success";
    }
}
复制代码

Message 消息的业务模型,对应数据库的message表,用来保存消息。

@Entity
@Data
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class Message implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long messageId;

    @Type(type = "json")
    @Column(columnDefinition = "json")
    private Order content;

    /** 消息的状态,1为未消费或消费失败,0为已消费 **/
    private boolean status;
}
复制代码

Order 订单的业务模型,对应数据库的order表,用来保存订单数据。

@Entity
@Data
@Table(name = "`order`")
public class Order implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Integer productNumber;

    private BigDecimal totalPrice;
}
复制代码

MessageProducer 消息生产者,主要用来投递消息到RabbitMQ。

@Component
public class MessageProducer {

    private final RabbitTemplateFactory rabbitTemplateFactory;

    public MessageProducer(RabbitTemplateFactory rabbitTemplateFactory) {
        this.rabbitTemplateFactory = rabbitTemplateFactory;
    }

    /**
     * 发送消息
     * @param order 消息体
     */
    public void send(Order order) {
        RabbitTemplate rabbitTemplate = rabbitTemplateFactory.getRabbitTemplate("ORDER_EXCHANGE");
        CorrelationData correlationData = new CorrelationData(String.valueOf(order.getId()));
        System.out.println("====== 开始发送消息 ======");
        rabbitTemplate.convertAndSend("ORDER_EXCHANGE", null, order, correlationData);
        System.out.println("====== 结束发送消息 ======");
    }
}
复制代码

MessageRepository ORM采用的是JPA,MessageRepository主要是用来做一些数据库对于message表的操作。

public interface MessageRepository extends JpaRepository<Message, Long> {

    Message findMessageByMessageId(Long messageId);

    List<Message> findMessagesByStatus(boolean status);
}
复制代码

OrderRepository 主要用来对订单表做一些操作。

public interface OrderRepository extends JpaRepository<Order, Long> {
}
复制代码

MessageCheckTask 一个定时任务,定时从数据库中查询message表中状态为未消费的消息,然后进行补偿。

@Component
public class MessageCheckTask {

    private final MessageRepository messageRepository;
    private final MessageProducer messageProducer;

    public MessageCheckTask(MessageRepository messageRepository, MessageProducer messageProducer) {
        this.messageRepository = messageRepository;
        this.messageProducer = messageProducer;
    }

    @Scheduled(initialDelay = 60000, fixedDelay = 60000)
    public void messageCheck() {
        List<Message> messages = messageRepository.findMessagesByStatus(false);
        if (messages == null || messages.size() == 0) {
            System.out.println("====== 没有消费失败的消息 ======");
            return;
        }
        for (Message message : messages) {
            System.out.println("====== 发现消费失败的消息 ======");
            Order order = message.getContent();
            messageProducer.send(order);
            System.out.println("====== 投递消费失败的消息 ======");
        }
    }
}
复制代码

方案二:消息延迟投递

消息流转流程图

消息可靠性投递方案二.jpg

优劣势

这种方式实现起来要比上一种方式要更加复杂一点,但是在消息投递最开始的时候只需要将业务数据进行入库,而不需要将业务数据也进行入库,同时也不需要开启一个定时任务来定时查询消息数据库中投递失败或消费失败的消息,在高并发的场景下,这种方式无疑比上一种方式要更加好。

同时这种方式不能保证百分百成功,如果第一次消息投递失败,同时第一次延迟消息也投递失败的话,那么这条消息就永久丢失了,因为消息的入库现在是分离在两个服务之中。

具体实现

第一步: 只需要将业务数据进行入库。

第二步: 将消息投递出去,在这个投递过程中,消息首先到达Broker中的交换机Exchange,如果没有正确投递到Exchange,那么会触发ConfirmCallback,之后才会根据RoutingKey来路由到对应的队列,如果没有正确路由到队列,此时会触发ReturnCallback,这两个回调都可以进行监听从而进行各自的处理。

第三步: 同时投递一条延迟消息,延迟消息内容和上一步中的消息是一模一样的,但是投递的队列是不一样的,延迟消息投递的队列是延迟队列。延迟时间可以根据具体业务来定,在RabbitMQ3.6版本之前要实现延迟消息的投递稍稍有点麻烦,可以借助死信队列来实现,也可以利用Java的延迟队列DelayQueue来实现,不过在3.6版本之后官方已经提供了一个rabbitmq_delayed_message_exchange插件,利用该插件可以实现延迟消息的投递。

第四步: 消费者监听对应的队列,并对其中的消息进行消费,在消费完成之后需要ACK,同时还需要组装ConfirmMessage到消息确认队列(这条消息是明确当前消息已经被消费成功了的,或者使用Redis保存消息消费确认结果也可以)。

第五步: 另起一个回调服务,这个服务主要监听延迟队列和消息确认队列的,首先监听消息确认队列中的消息,收到其中的消息(或者查询Redis中是否保存有确认消息)之后就对消息进行入库处理,将消息保存到消息数据库(保存消费成功的消息)中,同时监听延迟消息队列,在收到延迟消息的时候,首先判断当前消息是否已经被消费,如果已经消费的话,就不做任何处理,直接ACK,否则就通过RPC通知生产者消息重新投递,即回到第二步执行。

代码Demo

注:以下代码中为了书写方便,使用了HTTP代替了RPC

事先准备

  1. 去RabbitMQ官网下载rabbitmq_delayed_message_exchange插件,然后将该插件移动到plugins
  2. 启用rabbitmq_delayed_message_exchange插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码

Message服务

ClientConfig 配置RestTemplate,主要用来发起HTTP请求。

@Configuration
public class ClientConfig {

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
复制代码

RabbitConfig 一些RabbitMQ交换机和队列相关的配置,这里需要主要Delay的交换机需要设置成延迟交换机。

@Configuration
public class RabbitConfig {

    /**
     * 订单交换机
     *
     * @return fanout类型的交换机
     */
    @Bean
    public FanoutExchange ORDER_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange("ORDER_EXCHANGE").durable(true).delayed().build();
    }

    /**
     * 订单队列
     *
     * @return 队列
     */
    @Bean
    public Queue ORDER_QUEUE() {
        return QueueBuilder.durable("ORDER_QUEUE")
                .maxLengthBytes(1024 * 1024 * 128)
                .maxLength(50000)
                .build();
    }

    /**
     * 订单交换机和队列之间的绑定关系
     *
     * @return 绑定关系
     */
    @Bean
    public Binding ORDER_BINDING() {
        return BindingBuilder.bind(ORDER_QUEUE()).to(ORDER_EXCHANGE());
    }

    /**
     * 订单延迟消息交换机
     *
     * @return fanout类型的交换机
     */
    @Bean
    public FanoutExchange ORDER_DELAY_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange("ORDER_DELAY_EXCHANGE").durable(true).delayed().build();
    }

    /**
     * 订单延迟消息队列
     *
     * @return 队列
     */
    @Bean
    public Queue ORDER_DELAY_QUEUE() {
        return QueueBuilder.durable("ORDER_DELAY_QUEUE")
                .maxLengthBytes(1024 * 1024 * 128)
                .maxLength(50000)
                .build();
    }

    /**
     * 订单延迟消息交换机和延迟消息队列之间的绑定关系
     *
     * @return 绑定关系
     */
    @Bean
    public Binding ORDER_DELAY_BINDING() {
        return BindingBuilder.bind(ORDER_DELAY_QUEUE()).to(ORDER_DELAY_EXCHANGE());
    }

    /**
     * 订单确认消息交换机
     *
     * @return fanout类型的交换机
     */
    @Bean
    public FanoutExchange CONFIRM_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange("CONFIRM_EXCHANGE").durable(true).build();
    }

    /**
     * 订单确认消息队列
     *
     * @return 队列
     */
    @Bean
    public Queue CONFIRM_QUEUE() {
        return QueueBuilder.durable("CONFIRM_QUEUE")
                .maxLengthBytes(1024 * 1024 * 128)
                .maxLength(50000)
                .build();
    }

    /**
     * 订单确认消息交换机和确认消息队列之间的绑定关系
     *
     * @return 绑定关系
     */
    @Bean
    public Binding CONFIRM_BINDING() {
        return BindingBuilder.bind(CONFIRM_QUEUE()).to(CONFIRM_EXCHANGE());
    }
}
复制代码

RabbitConfirmCallback 当消息没有正确投递到对应的交换机时会触发Confirm回调,可以在这里做一些额外处理,比如日志记录等。

@Component
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

    }
}
复制代码

RabbitReturnCallback 当消息已经投递到交换机了,但是没有通过RoutingKey正确路由到对应的队列时会触发Return回调,也可以在这里做一些额外的处理,比如日志记录等。

@Component
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

    }
}
复制代码

RabbitTemplateFactory 这个类主要是配置RabbitTemplate以及消息转换器等,额外做了一个RabbitTemplate的池化处理,使得每一个交换机都对应一个RabbitTemplate,提高投递消息的效率。

@Slf4j
@Configuration
public class RabbitTemplateFactory {

    @Resource
    private ConnectionFactory connectionFactory;

    @Resource
    private RabbitConfirmCallback rabbitConfirmCallback;

    @Resource
    private RabbitReturnCallback rabbitReturnCallback;

    Map<String, RabbitTemplate> rabbitTemplateMap = new ConcurrentHashMap<>();

    /**
     * RabbitMQ的消息转换器
     *
     * @return 消息转换器
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 根据不同的交换机生产不同的RabbitTemplate
     *
     * @param exchangeName 交换机的名称
     * @return RabbitTemplate
     */
    public RabbitTemplate getRabbitTemplate(String exchangeName) {
        RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(exchangeName);
        if (rabbitTemplate != null) {
            return rabbitTemplate;
        }
        rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange(exchangeName);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(messageConverter());
        rabbitTemplate.setRetryTemplate(new RetryTemplate());
        rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);
        rabbitTemplate.setReturnCallback(rabbitReturnCallback);
        return rabbitTemplate;
    }
}
复制代码

RedisConfig Redis的一些配置,比如Key和Value的序列化器等。

@Configuration
public class RedisConfig {

    private final RedisConnectionFactory redisConnectionFactory;

    public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(valueSerializer());
        return redisTemplate;
    }

    @Bean
    public GenericJackson2JsonRedisSerializer valueSerializer() {
        return new GenericJackson2JsonRedisSerializer();
    }
}
复制代码

MessageProducer 消息生产者,主要用来投递消息到RabbitMQ,包含即时消息的投递和延迟消息的投递。

@Component
public class MessageProducer {

    private final RabbitTemplateFactory rabbitTemplateFactory;

    public MessageProducer(RabbitTemplateFactory rabbitTemplateFactory) {
        this.rabbitTemplateFactory = rabbitTemplateFactory;
    }

    /**
     * 投递消息入口,包含重试机制(即时投递和延迟投递)
     *
     * @param exchangeName      交换机名称
     * @param delayExchangeName 延迟交换机名称
     * @param content           消息体
     * @param count             消息延迟投递的次数
     */
    public void send(String exchangeName, String delayExchangeName, Object content, String eventId, Integer count) {
        sendInTime(exchangeName, content, eventId, count);
        delaySend(delayExchangeName, content, eventId, count);
    }

    /**
     * 即时投递消息
     *
     * @param exchangeName 交换机名称
     * @param content      消息体
     * @param eventId      事件id,也就是消息的唯一标识
     * @param count        消息延迟投递的次数
     */
    public void sendInTime(String exchangeName, Object content, String eventId, Integer count) {
        RabbitTemplate rabbitTemplate = rabbitTemplateFactory.getRabbitTemplate(exchangeName);
        CorrelationData correlationData = new CorrelationData(eventId);
        rabbitTemplate.convertAndSend(exchangeName, null, content, message -> {
            // 对消息投递的次数进行统计,当同一条消息累积投递过3次还未消费成功时,就转而进行入库处理
            System.out.println("====== 开始第" + count + "次发送消息 ======");
            message.getMessageProperties().setHeader("messageCount", count);
            System.out.println("====== 结束第" + count + "次发送消息 ======");
            return message;
        }, correlationData);
    }

    /**
     * 延迟投递消息
     *
     * @param exchangeName 交换机名称
     * @param content      消息体
     * @param eventId      事件id,也就是消息的唯一标识
     * @param count        消息延迟投递的次数
     */
    public void delaySend(String exchangeName, Object content, String eventId, Integer count) {
        RabbitTemplate rabbitTemplate = rabbitTemplateFactory.getRabbitTemplate(exchangeName);
        CorrelationData correlationData = new CorrelationData(eventId);
        rabbitTemplate.convertAndSend(exchangeName, null, content, message -> {
            int delayTime = 30 * 1000;
            System.out.println("====== 开始第" + count + "次发送延迟消息 ======");
            message.getMessageProperties().setHeader("messageCount", count);
            message.getMessageProperties().setDelay(delayTime);
            System.out.println("====== 结束第" + count + "次发送延迟消息 ======");
            return message;
        }, correlationData);

    }
}
复制代码

MessageConsumer 消息的消费者,对消息进行业务处理,在消息消费完成之后将消息的消费结果保存在Redis中。

@Component
public class MessageConsumer {

    private final RedisTemplate<String, Object> redisTemplate;

    public MessageConsumer(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @RabbitListener(queues = "ORDER_QUEUE")
    public void onMessage(@Headers Map<String, Object> headers, @Payload Order order, Channel channel) throws Exception {

        // 3. 对消息进行消费,消费成功则ACK
        System.out.println("====== 开始消费消息 ======");
        System.out.println("消息内容为:" + order);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
        System.out.println("====== 消费消息成功 ======");

        Message message = new Message();
        message.setMessageId(order.getId());
        message.setContent(order);

        // Redis保存消息消费结果
        redisTemplate.opsForValue().set(String.valueOf(order.getId()), message, 4 * 60, TimeUnit.SECONDS);
    }
}
复制代码

RabbitController 主要用来模拟消息投递,方便测试。

@RestController
public class RabbitController {

    private final MessageProducer messageProducer;
    private final OrderRepository orderRepository;

    public RabbitController(MessageProducer messageProducer, OrderRepository orderRepository) {
        this.messageProducer = messageProducer;
        this.orderRepository = orderRepository;
    }

    @PostMapping("/place/order")
    public String placeOrder(@RequestBody Order order, @RequestParam(value = "count", defaultValue = "1") Integer count) {

        // 1. 业务数据入库
        order.setTimestamp(System.currentTimeMillis());
        orderRepository.save(order);

        // 2. 投递消息
        messageProducer.send("ORDER_EXCHANGE", "ORDER_DELAY_EXCHANGE", order, String.valueOf(order.getId()), count);

        return "Success";
    }
}
复制代码

Message 消息的业务模型,对应数据库的message表,用来保存消息。

@Entity
@Data
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class Message implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long messageId;

    @Type(type = "json")
    @Column(columnDefinition = "json")
    private Order content;

}
复制代码

ConfirmMessage 确认消息的业务模型,在消息消费完成之后,可以组装ConfirmMessage消息进行投递,通知回调服务消息已经消费完成(在这次的代码中没有使用到这个模型,而是使用Redis来记录消息消费结果)。

@Data
public class ConfirmMessage {

    private Long messageId;

    private boolean status;
}
复制代码

Order 订单的业务模型,对应数据库的order表,用来保存订单数据。

@Entity
@Data
@Table(name = "`order`")
public class Order implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Integer productNumber;

    private BigDecimal totalPrice;

    private Long timestamp;
}
复制代码

MessageRepository ORM采用的是JPA,MessageRepository主要是用来做一些数据库对于message表的操作。

public interface MessageRepository extends JpaRepository<Message, Long> {

    Message findMessageByMessageId(Long messageId);
}
复制代码

OrderRepository 主要用来对订单表做一些操作。

public interface OrderRepository extends JpaRepository<Order, Long> {
}
复制代码

Callback-Service服务

ClientConfig 配置RestTemplate,主要用来发起HTTP请求。

@Configuration
public class ClientConfig {

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
复制代码

RabbitConfig 一些RabbitMQ交换机和队列相关的配置,这里需要主要Delay的交换机需要设置成延迟交换机。

@Configuration
public class RabbitConfig {

    /**
     * RabbitMQ的消息转换器
     *
     * @return 消息转换器
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
复制代码

RedisConfig Redis的一些配置,比如Key和Value的序列化器等。

@Configuration
public class RedisConfig {

    private final RedisConnectionFactory redisConnectionFactory;

    public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(valueSerializer());
        return redisTemplate;
    }

    @Bean
    public GenericJackson2JsonRedisSerializer valueSerializer() {
        return new GenericJackson2JsonRedisSerializer();
    }
}
复制代码

MessageConsumer 消息的消费者,对延迟消息进行业务处理,在收到延迟消息时首先判断消息是否已经被消费,如果已经被消费的话(Redis中有消费的确认结果),就消息入库,然后直接ACK,否则使用RPC通知消息生产者重新投递。

@Component
public class MessageConsumer {

    private final RedisTemplate<String, Object> redisTemplate;

    private final RestTemplate restTemplate;

    private final MessageRepository messageRepository;

    private final ObjectMapper objectMapper;

    public MessageConsumer(RedisTemplate<String, Object> redisTemplate, RestTemplate restTemplate, MessageRepository messageRepository, ObjectMapper objectMapper) {
        this.redisTemplate = redisTemplate;
        this.restTemplate = restTemplate;
        this.messageRepository = messageRepository;
        this.objectMapper = objectMapper;
    }

    @RabbitListener(queues = "ORDER_DELAY_QUEUE")
    public void onMessage(@Headers Map<String, Object> headers, @Payload Order order, Channel channel) throws Exception {

        // 1. 判断当前消息是否已经被消费成功,如果消费成功,则直接ACK,否则转用RPC通知Producer重新投递
        Object result = redisTemplate.opsForValue().get(String.valueOf(order.getId()));
        if (result == null) {
            // 判断当前是第几次投递
            Integer messageCount = (Integer) headers.get("messageCount");
            System.out.println("时间:" + System.currentTimeMillis() + ",第" + messageCount + "次延迟投递收到,进行重新投递");
            restTemplate.postForObject("http://localhost:8080/place/order?count=" + (messageCount + 1), order, String.class);
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
            return;

        }

        // 入库
        Message message = objectMapper.convertValue(result, Message.class);
        messageRepository.save(message);

        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
        System.out.println("====== 消息已经消费 ======");
    }
}
复制代码

MessageRepository ORM采用的是JPA,MessageRepository主要是用来做一些数据库对于message表的操作。

public interface MessageRepository extends JpaRepository<Message, Long>, JpaSpecificationExecutor<Message> {
}
复制代码

Message 消息的业务模型,对应数据库的message表,用来保存消息。

@Entity
@Data
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class Message implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long messageId;

    @Type(type = "json")
    @Column(columnDefinition = "json")
    private Order content;

}
复制代码

ConfirmMessage 确认消息的业务模型,在消息消费完成之后,可以组装ConfirmMessage消息进行投递,通知回调服务消息已经消费完成(在这次的代码中没有使用到这个模型,而是使用Redis来记录消息消费结果)。

@Data
public class ConfirmMessage {

    private Long messageId;

    private boolean status;
}
复制代码

Order 订单的业务模型,对应数据库的order表,用来保存订单数据。

@Entity
@Data
@Table(name = "`order`")
public class Order implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Integer productNumber;

    private BigDecimal totalPrice;

    private Long timestamp;
}
复制代码

方案三:消息延迟投递 + HTTP

消息流转流程图

消息可靠性投递方案三.jpg

优劣势

对比第一种方案来说,该种方案将消息入库的步骤转移到了另一个服务当中,使其不会拖累业务的执行,但是对于第二种方案来说,又有一个劣势,就是需要开启一个定时任务,定时去查询数据库中消费失败的消息,但是这种方案可以兼顾消息消费的及时性,当我们的业务需要消息在一定时间内消费完成的话就需要灵活去变动消息投递方案了。

同时这种方式不能保证百分百成功,如果第一次消息投递失败,同时第一次延迟消息也投递失败的话,那么这条消息就永久丢失了,因为消息的入库现在是分离在两个服务之中。

具体实现

第一步: 只需要将业务数据进行入库。

第二步: 将消息投递出去,在这个投递过程中,消息首先到达Broker中的交换机Exchange,如果没有正确投递到Exchange,那么会触发ConfirmCallback,之后才会根据RoutingKey来路由到对应的队列,如果没有正确路由到队列,此时会触发ReturnCallback,这两个回调都可以进行监听从而进行各自的处理。

第三步: 同时投递一条延迟消息,延迟消息内容和上一步中的消息是一模一样的,但是投递的队列是不一样的,延迟消息投递的队列是延迟队列。延迟时间可以根据具体业务来定,在RabbitMQ3.6版本之前要实现延迟消息的投递稍稍有点麻烦,可以借助死信队列来实现,也可以利用Java的延迟队列DelayQueue来实现,不过在3.6版本之后官方已经提供了一个rabbitmq_delayed_message_exchange插件,利用该插件可以实现延迟消息的投递。但是这个延迟投递是要限制次数和时间的,比如延迟投递最多2次,时间最多3分钟,如果在3分钟内还没有被消费成功的话,就需要将这条消息入库(这里的消息数据库只保存消费失败的消息),等待定时任务从数据库从拉取,进行业务处理。

第四步: 消费者监听对应的队列,并对其中的消息进行消费,在消费完成之后需要ACK,同时还需要组装ConfirmMessage到消息确认队列(这条消息是明确当前消息已经被消费成功了的,或者使用Redis保存消息消费确认结果也可以)。

第五步: 另起一个回调服务,这个服务主要监听延迟队列和消息确认队列的,首先监听消息确认队列中的消息,收到其中的消息(或者查询Redis中是否保存有确认消息)之后进行Redis保存,然后就可以直接ACK,不做任何处理。同时监听延迟消息队列,在收到延迟消息的时候,首先判断当前消息是否已经被消费,如果已经消费的话,就不做任何处理,直接ACK,否则就通过RPC通知生产者消息重新投递,即回到第二步执行,重复投递一定次数或者超过一定时间的话就对消息进行入库处理。

代码Demo

注:以下代码中为了书写方便,使用了HTTP代替了RPC,同时对于全局异常没有进行捕获,在API接口中也未考虑各种异常情况

事先准备

  1. 去RabbitMQ官网下载rabbitmq_delayed_message_exchange插件,然后将该插件移动到plugins
  2. 启用rabbitmq_delayed_message_exchange插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码

Message服务

ClientConfig 配置RestTemplate,主要用来发起HTTP请求。

@Configuration
public class ClientConfig {

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
复制代码

RabbitConfig 一些RabbitMQ交换机和队列相关的配置,这里需要主要Delay的交换机需要设置成延迟交换机。

@Configuration
public class RabbitConfig {

    /**
     * 订单交换机
     *
     * @return fanout类型的交换机
     */
    @Bean
    public FanoutExchange ORDER_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange("ORDER_EXCHANGE").durable(true).delayed().build();
    }

    /**
     * 订单队列
     *
     * @return 队列
     */
    @Bean
    public Queue ORDER_QUEUE() {
        return QueueBuilder.durable("ORDER_QUEUE")
                .maxLengthBytes(1024 * 1024 * 128)
                .maxLength(50000)
                .build();
    }

    /**
     * 订单交换机和队列之间的绑定关系
     *
     * @return 绑定关系
     */
    @Bean
    public Binding ORDER_BINDING() {
        return BindingBuilder.bind(ORDER_QUEUE()).to(ORDER_EXCHANGE());
    }

    /**
     * 订单延迟消息交换机
     *
     * @return fanout类型的交换机
     */
    @Bean
    public FanoutExchange ORDER_DELAY_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange("ORDER_DELAY_EXCHANGE").durable(true).delayed().build();
    }

    /**
     * 订单延迟消息队列
     *
     * @return 队列
     */
    @Bean
    public Queue ORDER_DELAY_QUEUE() {
        return QueueBuilder.durable("ORDER_DELAY_QUEUE")
                .maxLengthBytes(1024 * 1024 * 128)
                .maxLength(50000)
                .build();
    }

    /**
     * 订单延迟消息交换机和延迟消息队列之间的绑定关系
     *
     * @return 绑定关系
     */
    @Bean
    public Binding ORDER_DELAY_BINDING() {
        return BindingBuilder.bind(ORDER_DELAY_QUEUE()).to(ORDER_DELAY_EXCHANGE());
    }

    /**
     * 订单确认消息交换机
     *
     * @return fanout类型的交换机
     */
    @Bean
    public FanoutExchange CONFIRM_EXCHANGE() {
        return ExchangeBuilder.fanoutExchange("CONFIRM_EXCHANGE").durable(true).build();
    }

    /**
     * 订单确认消息队列
     *
     * @return 队列
     */
    @Bean
    public Queue CONFIRM_QUEUE() {
        return QueueBuilder.durable("CONFIRM_QUEUE")
                .maxLengthBytes(1024 * 1024 * 128)
                .maxLength(50000)
                .build();
    }

    /**
     * 订单确认消息交换机和确认消息队列之间的绑定关系
     *
     * @return 绑定关系
     */
    @Bean
    public Binding CONFIRM_BINDING() {
        return BindingBuilder.bind(CONFIRM_QUEUE()).to(CONFIRM_EXCHANGE());
    }
}
复制代码

RabbitConfirmCallback 当消息没有正确投递到对应的交换机时会触发Confirm回调,可以在这里做一些额外处理,比如日志记录等。

@Component
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

    }
}
复制代码

RabbitReturnCallback 当消息已经投递到交换机了,但是没有通过RoutingKey正确路由到对应的队列时会触发Return回调,也可以在这里做一些额外的处理,比如日志记录等。

@Component
public class RabbitReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

    }
}
复制代码

RabbitTemplateFactory 这个类主要是配置RabbitTemplate以及消息转换器等,额外做了一个RabbitTemplate的池化处理,使得每一个交换机都对应一个RabbitTemplate,提高投递消息的效率。

@Slf4j
@Configuration
public class RabbitTemplateFactory {

    @Resource
    private ConnectionFactory connectionFactory;

    @Resource
    private RabbitConfirmCallback rabbitConfirmCallback;

    @Resource
    private RabbitReturnCallback rabbitReturnCallback;

    Map<String, RabbitTemplate> rabbitTemplateMap = new ConcurrentHashMap<>();

    /**
     * RabbitMQ的消息转换器
     *
     * @return 消息转换器
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 根据不同的交换机生产不同的RabbitTemplate
     *
     * @param exchangeName 交换机的名称
     * @return RabbitTemplate
     */
    public RabbitTemplate getRabbitTemplate(String exchangeName) {
        RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(exchangeName);
        if (rabbitTemplate != null) {
            return rabbitTemplate;
        }
        rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange(exchangeName);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(messageConverter());
        rabbitTemplate.setRetryTemplate(new RetryTemplate());
        rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);
        rabbitTemplate.setReturnCallback(rabbitReturnCallback);
        return rabbitTemplate;
    }
}
复制代码

RedisConfig Redis的一些配置,比如Key和Value的序列化器等。

@Configuration
public class RedisConfig {

    private final RedisConnectionFactory redisConnectionFactory;

    public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(valueSerializer());
        return redisTemplate;
    }

    @Bean
    public GenericJackson2JsonRedisSerializer valueSerializer() {
        return new GenericJackson2JsonRedisSerializer();
    }
}
复制代码

MessageProducer 消息生产者,主要用来投递消息到RabbitMQ,包含即时消息的投递和延迟消息的投递。

@Component
public class MessageProducer {

    private final RabbitTemplateFactory rabbitTemplateFactory;

    public MessageProducer(RabbitTemplateFactory rabbitTemplateFactory) {
        this.rabbitTemplateFactory = rabbitTemplateFactory;
    }

    /**
     * 投递消息入口,包含重试机制(即时投递和延迟投递)
     *
     * @param exchangeName      交换机名称
     * @param delayExchangeName 延迟交换机名称
     * @param content           消息体
     * @param count             消息延迟投递的次数
     */
    public void send(String exchangeName, String delayExchangeName, Object content, String eventId, Integer count) {
        sendInTime(exchangeName, content, eventId, count);
        delaySend(delayExchangeName, content, eventId, count);
    }

    /**
     * 即时投递消息
     *
     * @param exchangeName 交换机名称
     * @param content      消息体
     * @param eventId      事件id,也就是消息的唯一标识
     * @param count        消息延迟投递的次数
     */
    public void sendInTime(String exchangeName, Object content, String eventId, Integer count) {
        RabbitTemplate rabbitTemplate = rabbitTemplateFactory.getRabbitTemplate(exchangeName);
        CorrelationData correlationData = new CorrelationData(eventId);
        rabbitTemplate.convertAndSend(exchangeName, null, content, message -> {
            // 对消息投递的次数进行统计,当同一条消息累积投递过3次还未消费成功时,就转而进行入库处理
            System.out.println("====== 开始第" + count + "次发送消息 ======");
            message.getMessageProperties().setHeader("messageCount", count);
            System.out.println("====== 结束第" + count + "次发送消息 ======");
            return message;
        }, correlationData);
    }

    /**
     * 延迟投递消息
     *
     * @param exchangeName 交换机名称
     * @param content      消息体
     * @param eventId      事件id,也就是消息的唯一标识
     * @param count        消息延迟投递的次数
     */
    public void delaySend(String exchangeName, Object content, String eventId, Integer count) {
        RabbitTemplate rabbitTemplate = rabbitTemplateFactory.getRabbitTemplate(exchangeName);
        CorrelationData correlationData = new CorrelationData(eventId);
        rabbitTemplate.convertAndSend(exchangeName, null, content, message -> {
            int delayTime;
            if (count == 1) {
                // 第一次延迟投递时间为1分钟
                delayTime = 60 * 1000;
            } else {
                // 第二次延迟投递时间为2分钟
                delayTime = 2 * 60 * 1000;
            }
            System.out.println("====== 开始第" + count + "次发送延迟消息 ======");
            message.getMessageProperties().setHeader("messageCount", count);
            message.getMessageProperties().setDelay(delayTime);
            System.out.println("====== 结束第" + count + "次发送延迟消息 ======");
            return message;
        }, correlationData);

    }
}
复制代码

MessageConsumer 消息的消费者,对消息进行业务处理,在消息消费完成之后将消息的消费结果保存在Redis中。

@Component
public class MessageConsumer {

    private final RedisTemplate<String, Object> redisTemplate;

    public MessageConsumer(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @RabbitListener(queues = "ORDER_QUEUE")
    public void onMessage(@Headers Map<String, Object> headers, @Payload Order order, Channel channel) throws Exception {

        // 3. 对消息进行消费,消费成功则ACK
        System.out.println("====== 开始消费消息 ======");
        System.out.println("消息内容为:" + order);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
        System.out.println("====== 消费消息成功 ======");

        Message message = new Message();
        message.setMessageId(order.getId());
        message.setContent(order);

        // Redis保存消息消费结果
        redisTemplate.opsForValue().set(String.valueOf(order.getId()), message, 4 * 60, TimeUnit.SECONDS);
    }
}
复制代码

RabbitController 主要用来模拟消息投递,方便测试。

@RestController
public class RabbitController {

    private final MessageProducer messageProducer;
    private final OrderRepository orderRepository;

    public RabbitController(MessageProducer messageProducer, OrderRepository orderRepository) {
        this.messageProducer = messageProducer;
        this.orderRepository = orderRepository;
    }

    @PostMapping("/place/order")
    public String placeOrder(@RequestBody Order order, @RequestParam(value = "count", defaultValue = "1") Integer count) {

        // 1. 业务数据入库
        order.setTimestamp(System.currentTimeMillis());
        orderRepository.save(order);

        // 2. 投递消息
        messageProducer.send("ORDER_EXCHANGE", "ORDER_DELAY_EXCHANGE", order, String.valueOf(order.getId()), count);

        return "Success";
    }
}
复制代码

Message 消息的业务模型,对应数据库的message表,用来保存消息。

@Entity
@Data
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class Message implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long messageId;

    @Type(type = "json")
    @Column(columnDefinition = "json")
    private Order content;

}
复制代码

ConfirmMessage 确认消息的业务模型,在消息消费完成之后,可以组装ConfirmMessage消息进行投递,通知回调服务消息已经消费完成(在这次的代码中没有使用到这个模型,而是使用Redis来记录消息消费结果)。

@Data
public class ConfirmMessage {

    private Long messageId;

    private boolean status;
}
复制代码

Order 订单的业务模型,对应数据库的order表,用来保存订单数据。

@Entity
@Data
@Table(name = "`order`")
public class Order implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Integer productNumber;

    private BigDecimal totalPrice;

    private Long timestamp;
}
复制代码

MessageRepository ORM采用的是JPA,MessageRepository主要是用来做一些数据库对于message表的操作。

public interface MessageRepository extends JpaRepository<Message, Long> {

    Message findMessageByMessageId(Long messageId);
}
复制代码

OrderRepository 主要用来对订单表做一些操作。

public interface OrderRepository extends JpaRepository<Order, Long> {
}
复制代码

FailureMessageCheckTask 一个定时任务,定时从消息数据库中拉取消费失败的消息,第一次的时候传入的latestId为0,表示拉取所有消费失败的消息,之后保存拉取到的消费失败的消息的最后一个id,赋值给latestId,之后在发起请求查询消费失败的消息的时候就需要传入这个latestId,表示查询大于latestId之后的消费失败的消息。

@Component
public class FailureMessageCheckTask {

    private final RestTemplate restTemplate;

    private final RedisTemplate<String, Object> redisTemplate;

    private final ObjectMapper objectMapper;

    private static final String latestIdKey = "order:latestId";

    private static final int count = 10;

    public FailureMessageCheck(RedisTemplate<String, Object> redisTemplate, ObjectMapper objectMapper, RestTemplate restTemplate) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
        this.restTemplate = restTemplate;
    }

    @Scheduled(fixedDelay = 30 * 1000)
    public void getFailureMessages() {
        Long latestId;
        Object value = redisTemplate.opsForValue().get(latestIdKey);
        if (value == null) {
            latestId = 0L;
        } else {
            latestId = Long.valueOf(String.valueOf(value));
        }
        DataResponse<Object> dataResponse;
        String url = "http://localhost:8081/failure/messages?page=0&count=10&latestId=" + latestId;
        dataResponse = restTemplate.getForObject(url, DataResponse.class);

        // 如果code为0则表示请求出现异常或者data中无数据,就无须后续处理
        if (dataResponse.getCode() != 0) {
            return;
        }

        PageParameter<Object> messages = objectMapper.convertValue(dataResponse.getData(), PageParameter.class);

        System.out.println("The number of consumption failure messages in this round is: " + messages.getTotal());

        int totalPage = messages.getTotalPage();
        // 处理当前页
        List<Object> items = messages.getItems();
        for (Object obj : items) {
            Message message = objectMapper.convertValue(obj, Message.class);
            businessHandle(message);
            latestId = message.getId();
        }

        // 如果页数大于1,则循环请求接口获取后续的数据进行处理
        if (totalPage > 1) {
            for (int page = 2; page < totalPage; page++) {
                url = "http://localhost:8081/failure/messages?page=" + page + "&count=10&latestId=" + latestId;
                dataResponse = restTemplate.getForObject(url, DataResponse.class);
                messages = objectMapper.convertValue(dataResponse.getData(), PageParameter.class);
                items = messages.getItems();
                for (Object obj : items) {
                    Message message = objectMapper.convertValue(obj, Message.class);
                    businessHandle(message);
                    latestId = message.getId();
                }
            }
        }

        // 更新latestId
        redisTemplate.opsForValue().set(latestIdKey, latestId);
    }

    @Async("orderThreadPool")
    public void businessHandle(Message message) {
        System.out.println("处理业务");
    }
}
复制代码

Callback-Service服务

ClientConfig 配置RestTemplate,主要用来发起HTTP请求。

@Configuration
public class ClientConfig {

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
复制代码

RabbitConfig 一些RabbitMQ的配置,主要配置消息转换器的类型。

@Configuration
public class RabbitConfig {

    /**
     * RabbitMQ的消息转换器
     *
     * @return 消息转换器
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
复制代码

RedisConfig Redis的一些配置,比如Key和Value的序列化器等。

@Configuration
public class RedisConfig {

    private final RedisConnectionFactory redisConnectionFactory;

    public RedisConfig(RedisConnectionFactory redisConnectionFactory) {
        this.redisConnectionFactory = redisConnectionFactory;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(valueSerializer());
        return redisTemplate;
    }

    @Bean
    public GenericJackson2JsonRedisSerializer valueSerializer() {
        return new GenericJackson2JsonRedisSerializer();
    }
}
复制代码

MessageConsumer 消息的消费者,对延迟消息进行业务处理,在收到延迟消息时首先判断消息是否已经被消费,如果已经被消费的话(Redis中有消费的确认结果),就消息入库,然后直接ACK,否则使用RPC通知消息生产者重新投递。

@Component
public class MessageConsumer {

    private final RedisTemplate<String, Object> redisTemplate;

    private final RestTemplate restTemplate;

    private final MessageRepository messageRepository;

    public MessageConsumer(RedisTemplate<String, Object> redisTemplate, RestTemplate restTemplate, MessageRepository messageRepository) {
        this.redisTemplate = redisTemplate;
        this.restTemplate = restTemplate;
        this.messageRepository = messageRepository;
    }

    @RabbitListener(queues = "ORDER_DELAY_QUEUE")
    public void onMessage(@Headers Map<String, Object> headers, @Payload Order order, Channel channel) throws Exception {
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        
        // 1. 判断当前消息是否已经被消费成功,如果消费成功,则直接ACK,否则转用RPC通知Producer重新投递
        Object result = redisTemplate.opsForValue().get(String.valueOf(order.getId()));
        if (result == null) {
            // 判断当前是第几次投递
            Integer messageCount = (Integer) headers.get("messageCount");
            if (messageCount == 2) {
                // 消费失败的消息入库
                System.out.println("====== 入库 ======");
                Message message = new Message();
                message.setMessageId(order.getId());
                message.setContent(order);
                messageRepository.save(message);
                channel.basicAck(deliveryTag, false);
                return;
            }
            System.out.println("时间:" + System.currentTimeMillis() + ",第" + messageCount + "次延迟投递收到,进行重新投递");
            # 此处使用HTTP代替RPC
            restTemplate.postForObject("http://localhost:8080/place/order?count=" + (messageCount + 1), order, String.class);
            channel.basicAck(deliveryTag, false);
            return;

        }
        channel.basicAck(deliveryTag, false);
        System.out.println("====== 消息已经消费 ======");
    }
}
复制代码

MessageRepository ORM采用的是JPA,MessageRepository主要是用来做一些数据库对于message表的操作。

public interface MessageRepository extends JpaRepository<Message, Long>, JpaSpecificationExecutor<Message> {
}
复制代码

Message 消息的业务模型,对应数据库的message表,用来保存消息。

@Entity
@Data
@TypeDef(name = "json", typeClass = JsonStringType.class)
public class Message implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long messageId;

    @Type(type = "json")
    @Column(columnDefinition = "json")
    private Order content;

}
复制代码

ConfirmMessage 确认消息的业务模型,在消息消费完成之后,可以组装ConfirmMessage消息进行投递,通知回调服务消息已经消费完成(在这次的代码中没有使用到这个模型,而是使用Redis来记录消息消费结果)。

@Data
public class ConfirmMessage {

    private Long messageId;

    private boolean status;
}
复制代码

Order 订单的业务模型,对应数据库的order表,用来保存订单数据。

@Entity
@Data
@Table(name = "`order`")
public class Order implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Integer productNumber;

    private BigDecimal totalPrice;

    private Long timestamp;
}
复制代码

FailureMessageController 获取消费失败的消息的接口,提供分页查询。

@RestController
public class FailureMessageController {

    private final FailureMessageService failureMessageService;

    public FailureMessageController(FailureMessageService failureMessageService) {
        this.failureMessageService = failureMessageService;
    }
    
    @GetMapping("/failure/messages")
    public DataResponse<PageParameter<Message>> getFailureMessages(@RequestParam Integer page, @RequestParam Integer count, @RequestParam Long latestId) {
        PageParameter<Message> messages = failureMessageService.getServiceExternalFailureMessage(page, count, latestId);
        if (messages.getTotal() == 0) {
            return new DataResponse<>(1, "No Failure Message", messages);
        }
        return new DataResponse<>(0, "Success", messages);
    }
}
复制代码

FailureMessageService 获取消费失败的消息的具体实现。

@Service
public class FailureMessageService {

    private final MessageRepository messageRepository;

    public FailureMessageService(MessageRepository messageRepository) {
        this.messageRepository = messageRepository;
    }

    /**
     * 查询 id 大于 latestId 的数据(服务外部失败消息)
     *
     * @param page 页码,从0开始
     * @param count 没有数量
     * @param latestId    最新的id
     * @return 分页数据
     */
    public PageParameter<Message> getServiceExternalFailureMessage(Integer page, Integer count, Long latestId) {
        PageRequest pageRequest = PageRequest.of(page, count);
        Page<Message> messages;
        // 如果latestId为0,则表示查询所有的数据
        if (latestId == 0) {
            messages = messageRepository.findAll((Specification<Message>) (root, criteriaQuery, criteriaBuilder) -> null, pageRequest);
        } else {
            messages = messageRepository.findAll((Specification<Message>) (root, criteriaQuery, criteriaBuilder) -> {
                Path<Long> idPath = root.get("id");
                List<Predicate> predicates = new ArrayList<>();
                predicates.add(criteriaBuilder.gt(idPath, latestId));
                Predicate[] predicateArr = new Predicate[predicates.size()];
                criteriaQuery.where(predicates.toArray(predicateArr));
                return null;
            }, pageRequest);
        }
        return new PageParameter<>(messages);
    }
}
复制代码