RocketMQ扫盲贴及JavaAPI使用精讲

大家好,在上篇文章《高可用架构及二主二从异步集群部署》中介绍了的 RocketMQ 的历史及安装,为本文开始做实验打下了基础,接下来就正式开始讲解RocketMQ的概念以及Java API的操作使用。

RocketMQ的架构

这是在RocketMQ官网的架构图,rocketmq.apache.org/docs/rmq-ar…

image.png
一般见到的架构图都是这样的,其中这些重要的角色需要解释下。

Broker

RocketMQ 的服务,或者说一个进程,叫做 Broker,Broker 的作用是存储和转发消息。RocketMQ 单机大约能承受10万 QPS 的请求,为了提升 Broker 的性能(做负载均衡)以及可用性(防止单点故障),通常会做集群部署。

跟 Kafka 或者 Redis Cluster 一样,RocketMQ 集群的每个 Broker 节点保存总数据的一部分,因为可用横向扩展。为了提高可靠性(防止数据丢失),每个 Broker 可以有自己的副本(Slave)。

Topic

Topic用于将消息按主题做划分,比如订单消息,支付消息,人员消息,注意,跟kafka不同的是,在RocketMQ 中,Topic是一个逻辑概念,消息不是按Topic划分存储的。

Producer 将消息发往指定的 Topic,Consumer 订阅这个 Topic 就可以收到相应的消息。Topic 跟生产者和消费者都是多对多的关系,一个生产者可以发送消息到多个 Topic,一个消费者也可以订阅多个 Topic。

NameServer

在 rocketmq 的早版本(2.x)的时候,是没有 namesrv 组件的,用的是 zookeeper 做分布式协调和服务发现,但是后期阿里数据根据实际业务需求进行改进和优化,自组研发了轻量级的 namesrv,用于注册 Client 服务与 Broker 的请求路由工作,namesrv 上不做任何消息的位置存储,频繁操作 zookeeper 的位置存储数据会影响整体集群性能。

为了保证高可用,NameServer 自身也可以做集群的部署,节点之间无任何信息同步。

Producer

生产者,拥有相同 Producer Group 的 Producer 组成一个集群, 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

RocketMQ 的生产者支持批量发送
image.png

Consumer

消费者,接收消息进行消费的实例,拥有相同 Consumer Group 的 Consumer 组成一个集群,与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取Topic 路由信息,并向提供 Topic 服务的Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。

消费者有两种消费方式:一种是集群消费(消息轮询),一种是广播消费(全部收到相同副本)。

消费模型来说,一种是 pull 主动拉去,另一种是 push,被动接收。但实际上 RocketMQ 都是 pull 模式,只是 push 在 pull 模式上做了一层封装,PushConsumer 会注册 MessageListener 监听器,取到消息后,唤醒 MessageListener 的 ConsumeMessage()来消费,对用户而言,感觉消息是被推送过来的。RocketMQ是基于长轮训来实现消息的 pull。

Message Queue

大家知道发往某一个 Topic 的多条信息,是分布在不同的 Broker 上的,在 Kafka 里面设计了一个partition,一个 Topic 可以拆分成多个 partition,这些 partition 可以分布在不同的 Broker 上,这样就实现了数据的分片,也决定了 kafka 可以实现横向扩展。

在 RocketMQ 中只有一个存储文件,并没有像 kafka 一样按照不同的 Topic 分开存储。所以它设计了一个Message Queue 的逻辑概念,作用跟partition类似。

首先我们在创建 Topic 的时候会让我们指定队列数量,一个叫 writeQueueNums(写队列数量),一个叫readQueueNums(读队列数量),写队列数量决定了有几个 Message Queue,读队列数量决定了有几个线程来消费这些 Message Queue(只是用来负载的)。perm 表示队列权限,2表示W,4表示R,6表示RW。

image.png
这里是我们创建 topic 的时候指定的,如果我们由代码自动创建 topic 的时候默认是几个 Message Queue呢?

 //服务端创建一个Topic默认8个队列,在BrokerConfig类里面
 private int defaultTopicQueueNums = 8;
 //topic不存在,生产者发送消息时创建默认4个队列,在DefaultMQProducer类里面
 private volatile int defaultTopicQueueNums = 4;
 //最终服务端创建的时候有一个判断,取小一点的值,在MQClientInstance类里面
 int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
复制代码

最终计算结果应该是4吧,我们找一个由代码创建的topic看下,确实是4。

image.png
写队列数量和读队列数量这两个值需要相等,在集群模式下如果不相等,假如说writeQueueNums=6,readQueueNums=3, 那么每个 broker 上会有3个 queue 的消息是无法消费的。

如果消费者数大于readQueueNumbs,那么会有一些消费者消费不到消息,浪费资源。

Java API

现在开始API的教程使用,官网提供了Java客户端API,只需要引入pom依赖就可以了。

      <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>4.7.1</version>
      </dependency>
复制代码

先写一个生产者Producer类:

/**
 * @author jackxu
 */
public class Producer {

    public static void main(String[] args) throws MQClientException {
        //生产者组
        DefaultMQProducer producer = new DefaultMQProducer("jackxu_producer_group");
        //生产者需用通过NameServer获取所有broker的路由信息,多个用分号隔开,这个跟Redis哨兵一样
        producer.setNamesrvAddr("39.103.144.86:9876;42.192.77.73:9876");
        //启动
        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                /*Message(String topic, String tags, String keys, byte[] body)
                 Message代表一条信息,第一个参数是topic,这是主题
                第二个参数是tags,这是可选参数,用于消费端过滤消息
                第三个参数是keys,这也是可选参数,如果有多个,用空格隔开。RocketMQ可以根据这些key快速检索到消息,相当于
                消息的索引,可以设置为消息的唯一编号(主键)。*/
                Message msg = new Message("jackxu_test_topic", "TagA", "6666", ("RocketMQ Test message " + i).getBytes());
                //SendResult是发送结果的封装,包括消息状态,消息id,选择的队列等等,只要不抛异常,就代表发送成功
                SendResult sendResult = producer.send(msg);
                System.out.println("第" + i + "条send结果: " + sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
        producer.shutdown();
    }

}
复制代码

SendResult中,有 一个SendStatus状态,表示消息的发送状态。一共有四种状态。

  1. FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略配置SYNC_FLUSH才会报这个错误)。
  2. FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在规定时间内完成主从同步。
  3. SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。
  4. SEND_OK:表示发送成功

再写一个简单消费者类SimpleConsumer:

/**
 * @author jackxu
 */
public class SimpleConsumer {

    public static void main(String[] args) throws MQClientException {
        //消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jackxu_consumer_group");
        //消费者从NameServer拿到topic的queue所在的Broker地址,多个用分号隔开
        consumer.setNamesrvAddr("39.103.144.86:9876;42.192.77.73:9876");
        //设置Consumer第一次启动是从队列头部开始消费
        //如果非第一次启动,那么按照上次消费的位置继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //subscribe订阅的第一个参数就是topic,第二个参数为生产者发送时候的tags,*代表匹配所有消息,
        //想要接收具体消息时用||隔开,如"TagA||TagB||TagD"
        consumer.subscribe("jackxu_test_topic", "*");
        //Consumer可以用两种模式启动,广播(Broadcast)和集群(Cluster),广播模式下,一条消息会发送给所有Consumer,
        //集群模式下消息只会发送给一个Consumer
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //批量消费,每次拉取10条
        consumer.setConsumeMessageBatchMaxSize(10);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                //msgs是一个List,一般是Consumer先启动,所有每次都是一条数据
                //如果Producer先启动Consumer端后启动,会积压数据,此时setConsumeMessageBatchMaxSize会生效,
                //msgs的数据就是十条
                StringBuilder sb = new StringBuilder();
                sb.append("msgs条数:" + msgs.size());
                MessageExt messageExt = msgs.get(0);
                //消息重发了三次
                if (messageExt.getReconsumeTimes() == 3) {
                    //todo 持久化消息记录表
                    //重试了三次不再重试了,直接签收掉
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }

                for (MessageExt msg : msgs) {
                    try {
                        String topic = msg.getTopic();
                        String messageBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        //todo 业务逻辑处理
                        sb.append("topic:" + topic + ",tags:" + tags + ",msg:" + messageBody);
                    } catch (Exception e) {
                        e.printStackTrace();
                        // 重新消费
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                System.out.println(sb.toString());
                //签收,这句话告诉broker消费成功,可以更新offset了,也就是发送ack。
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }

}
复制代码

每行代码的功能作用都写在注释里了,小伙伴要仔细观看下哦。现在测试一下,先启动消费者,然后启动生产者。

这是生产者启动后发送的十条数据。

image.png
msgId:生产者生成的唯一编号,全局唯一,也叫uniqId。

offsetMsgId:消息偏移id,该id记录了消息所在集群的物理地址,主要包含所存储Broker服务器的地址(IP与端口号)以及所在commitlog文件的物理偏移量。

在控制台可以通过 msgId 来查询该条消息。
image.png
通过Key也一样可以找到
image.png
再看下消费者端,十条数据都已经被成功消费了
image.png

源码以及官方提供的 rocketmq\example 的示例代码已经上传,需要的小伙伴可以下载下来观看。地址:github.com/xuhaoj/rock…

rocketmq\example中各个包的作用如下:

package 作用
batch 批量消息,用List发送
broadcast 广播消息,setMessageModel(MessageModel.BROADCASTING)
delay 延迟消息,msg.setDelayTimeLevel(3)
filter 基于tag或者 SQL表达式过滤
ordermessage 顺序消息
quickstart 入门
rpc 实现RPC调用
simple ACL、异步、assign、subscribe
tracemessage 消息追踪
transaction 事务消息

Spring Boot 集成

在Srping Boot中提供了更简单的配置方式和操作方式,使用起来非常的舒服,干净,简洁。首先还是引入RocketMQ starter 的依赖。

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.4</version>
        </dependency>
复制代码

然后客户端的配置直接写在application.properties中

server.port=9096
spring.application.name=springboot-rocketmq-demo
rocketmq.name-server=39.103.144.86:9876;42.192.77.73:9876
rocketmq.producer.group=jackxu-springboot-rocketmq-group
rocketmq.producer.send-message-timeout=3000
复制代码

创建一个消费者类Consumer,加上@RocketMQMessageListener注解监听消息

/**
 * @author jackxu
 */
@Component
@RocketMQMessageListener(topic = "springboot-topic", consumerGroup = "springboot-consumer-group",
        selectorExpression = "tag1", selectorType = SelectorType.TAG,
        messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        try {
            System.out.println("接收到rocketmq消息:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
复制代码

注解里面的配置和 java api中的一样,相信大家都能够看懂。

MessageModel有两个选项,BROADCASTING代表所有消费者消费同样的消息,CLUSTERING代表多个消费者轮询消费消息(默认)。

ConsumeMode也有两个选项,CONCURRENTLY代表消费端并发消费(默认),消息顺序得不到保证,到底有多少个线程并发消费,取决于线程池的大小,ORDERLY代表有序消费,也就是生产者发送的顺序跟消费者消费的顺序一致。

两者的区别:顺序消费需要对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理。很显然并发消费效率更高。

创建一个生产者类MessageSender,生产者的代码更加简单,只需要注入RocketMQTemplate就可以发送消息。

/**
 * @author jackxu
 */
@Component
public class MessageSender {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void syncSend() {
        /**
         * 发送可靠同步消息 ,可以拿到SendResult 返回数据
         * 同步发送是指消息发送出去后,会在收到mq发出响应之后才会发送下一个数据包的通讯方式。
         * 参数1: topic:tag
         * 参数2:  消息体 可以为一个对象
         * 参数3: 超时时间 毫秒
         */
        SendResult result = rocketMQTemplate.syncSend("springboot-topic:tag", "这是一条同步消息", 10000);
        System.out.println(result);
    }


    public void asyncSend() throws Exception {
        /**
         * 发送 可靠异步消息
         * 发送消息后,不等mq响应,接着发送下一个数据包。发送方通过设置回调接口接收服务器的响应,并可对响应结果进行处理。
         * 参数1: topic:tag
         * 参数2:  消息体 可以为一个对象
         * 参数3: 回调对象
         */
        rocketMQTemplate.asyncSend("springboot-topic:tag1", "这是一条异步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("回调sendResult:" + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println(e.getMessage());
            }
        });
        TimeUnit.SECONDS.sleep(100000);
    }


    public void sendOneWay() {
        /**
         * 发送单向消息,特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
         * 此方式发送消息的过程耗时非常短,一般在微秒级别。应用场景:适用于某些耗时非常短,但对可靠性要求并
         * 不高的场景,例如日志收集。
         * 参数1: topic:tag
         * 参数2:  消息体 可以为一个对象
         */
        rocketMQTemplate.sendOneWay("springboot-topic:tag1", "这是一条单向消息");
    }


    public void sendOneWayOrderly() {
        /**
         * 发送单向的顺序消息
         * 参数1: topic:tag
         * 参数2:  消息体 可以为一个对象
         */
        rocketMQTemplate.sendOneWayOrderly("springboot-topic:tag1", "这是一条顺序消息", "8888");
    }

}
复制代码

一共有三种类型,它们的使用方法和作用已经写在注释上了,它们的选择方案如下:

  • 当发送的消息不重要时,采用one-way方式,以提高吞吐量,效率最高
  • 当发送的消息很重要时,且对响应时间不敏感的时候采用sync方式
  • 当发送的消息很重要时,且对响应时间非常敏感的时候采用async方式

写一个测试类测试下

/**
 * @author jackxu
 */
@SpringBootTest
class SpringbootRocketmqApplicationTests {

    @Autowired
    private MessageSender sender;

    @Test
    public void syncSendTest() {
        sender.syncSend();
    }


    @Test
    public void asyncSendTest() throws Exception {
        sender.asyncSend();
    }


    @Test
    public void sendOneWayTest() {
        sender.sendOneWay();
    }


    @Test
    public void sendOneWayOrderlyTest() {
        sender.sendOneWayOrderly();
    }

}
复制代码

这里选择异步发送sendOneWayTest,执行一下,查看结果,发送成功并且回调了。

image.png
看下控制台,也有这条消息
image.png
消费端也消费成功,测试完成。
image.png

源码已经上传至 github.com/xuhaoj/spri… ,感兴趣的小伙伴可以下载下来观看。

结语

最后推荐一本电子书《RocketMQ实战与原理解析》作为课外读物,下载链接在:pan.baidu.com/s/1Ah1Gm3CX…
提取码:jack ,原创不易,觉得写的不错请点一个赞。。