RabbitMQ官方教程及理解-第二章(WorkerQue

简介 Work Queues

什么是Work Queues,和前面的Queue有什么不同?细心的同学会发现.这里的Queues用了复数形式,前面还加了Work.那就说明这个Queue是支持多个work的.官方图,最为致命.

image.png

上代码 (don't talk to me, show me the code!)

新建项目 WorkQueues

image.png

依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
复制代码

发送者 NewTask

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * @author caiqian
 */
public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        argv = new String[]{"second msg"};
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("123456");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String message = String.join(" ", argv);

            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}
复制代码

接收者Worker

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @author caiqian
 */
public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("123456");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
复制代码

运行结果

NewTask, 这里先运行两个实例的worker , 再运行NewTask, 从1st,2nd,3rd,4th,5th一直发送

image.png

Worker 2 实例

实例1

image.png

实例2

image.png

结论

  • 1 worker queues支持多实例
  • 2 两个worker是以后的顺序轮流获取信息

消息确认(Message acknowledgment) 我更喜欢称之为消息回执

  • 当消息发给worker后, mq不知道worker处理的时间是否过长,不知道worker是否死掉, 不知道是否有消息遗失.于是,就有了Message acknowledgment.
  • 如果一个消费者在没有发送ack的情况下死亡(通道关闭,连接关闭,或者TCP连接丢失),RabbitMQ会明白消息没有被完全处理,并将其重新排队。如果有其他消费者在同一时间在线,然后它将迅速重新交付给另一个消费者。这样你就可以确保没有信息丢失,即使worker偶尔死亡。
  • 默认是30分钟时间

上代码

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
复制代码

ack需要注意的地方

如果忘记/丢失回执,则mq将会一直保留此消息,长时间下来, 会影响MQ甚至崩溃.

消息持久化

我们已经学习了如何确保即使用户死亡,任务也不会丢失。但是如果RabbitMQ服务器停止了,我们的任务仍然会丢失。
于是便有了持久化

首先,我们要确定队列中的消息是否存在,其次,我们要标记持久化的状态

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
复制代码

但是,已经存的队列是无法修改的,所以我们可以重新定义一个队列

将消息标记为persistent并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是RabbitMQ接受消息后仍然有一个很短的时间窗口尚未保存。此外,RabbitMQ不会对每条消息执行fsync(2)操作——它可能只是被保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们的简单任务队列来说已经足够了。如果你需要更有力的保证,你可以使用发行商确认。

公平调度

从之前的情形来看,调度仍然没有完全按照我们希望的那样工作。例如,在有两个worker的情况下,当有的消息很重,有的消息又很轻时,一个工作人员将一直很忙,而另一个几乎不做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。所以这里就需要代码中控制

int prefetchCount = 1;
channel.basicQos(prefetchCount);
复制代码