RocketMQ Source Code Study (Part 4): Message Storage

Recap

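The previous part covered how the Producer sends messages to the Broker. Next, let's look at how the Broker stores them. Which files end up in the store directory?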

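  • commitlog: directory where the messages themselves are stored;
  • config: configuration data persisted while the Broker is running;
  • consumequeue: directory of the message consume queues;
  • index: directory of the message index files;
  • abort: if this file exists, the Broker was not shut down normally;
  • checkpoint: file checkpoint, recording the timestamp of the last flush of the CommitLog files, of the ConsumeQueue files, and of the index files.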
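To make the layout concrete, here is a small hypothetical Java helper (not part of RocketMQ) that resolves these entries under a store root and uses the abort file the way the Broker does: the file is created at startup and deleted on a clean shutdown, so finding it at startup signals that the previous run ended abnormally. The directory names are RocketMQ's defaults; the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not RocketMQ API: models the store-root layout listed above.
class StoreLayoutSketch {
    // Default entry names under the store root directory.
    static final String[] ENTRIES = {"commitlog", "config", "consumequeue", "index", "abort", "checkpoint"};

    // Map each entry name to its path under the given store root.
    static Map<String, Path> layout(Path storeRoot) {
        Map<String, Path> paths = new LinkedHashMap<>();
        for (String name : ENTRIES) {
            paths.put(name, storeRoot.resolve(name));
        }
        return paths;
    }

    // The abort file is created at startup and deleted on a clean shutdown,
    // so finding it at startup means the previous run ended abnormally.
    static boolean lastShutdownWasAbnormal(Path storeRoot) {
        return Files.exists(storeRoot.resolve("abort"));
    }

    // Create a temporary store root; optionally leave an abort file behind to simulate a crash.
    static Path newStoreRoot(boolean simulateCrash) {
        try {
            Path root = Files.createTempDirectory("store");
            if (simulateCrash) {
                Files.createFile(root.resolve("abort"));
            }
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(lastShutdownWasAbnormal(newStoreRoot(false))); // false
        System.out.println(lastShutdownWasAbnormal(newStoreRoot(true)));  // true
    }
}
```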

Message Storage Entry Point

org.apache.rocketmq.store.DefaultMessageStore#putMessage

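I located this entry point by following a video walkthrough. The Netty call chain is too deep to trace layer by layer here, but it eventually reaches this method, which performs the actual store.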

The CommitLog write itself is performed by:

```java
// The message write is performed by this call
PutMessageResult result = this.commitLog.putMessage(msg);
```

Topic Replacement for Delayed Messages

org.apache.rocketmq.store.CommitLog#putMessage

```java
// Main logic (excerpt):
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery
    // Delayed message: on write, it is redirected into the SCHEDULE_TOPIC_XXXX topic
    if (msg.getDelayTimeLevel() > 0) {
        // Clamp the delay level to the highest configured level
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }

        topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Backup real topic, queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        // The topic we set is replaced here by RocketMQ's own internal topic: a sleight of hand
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}
```

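Interestingly, the code directly overwrites the message's topic and queueId, a bit of sleight of hand for delayed messages. The original topic and queueId are first backed up in the message properties (MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID), so that ScheduleMessageService can restore them and re-deliver the message to its real topic once the delay has expired.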
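The swap can be modelled in a few lines. This is a simplified sketch, not RocketMQ code: SimpleMessage is a hypothetical stand-in for the real message class, the constants are assumed to match the string values behind TopicValidator.RMQ_SYS_SCHEDULE_TOPIC and the REAL_TOPIC / REAL_QID properties, and MAX_DELAY_LEVEL = 18 assumes the default messageDelayLevel setting ("1s 5s 10s 30s 1m ... 2h").

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the delay-topic swap in CommitLog#putMessage; not RocketMQ code.
class DelayTopicSwapSketch {
    // Assumed to mirror RocketMQ's constants; 18 assumes the default messageDelayLevel.
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";
    static final int MAX_DELAY_LEVEL = 18;

    // Hypothetical stand-in for the broker-side message object.
    static final class SimpleMessage {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        SimpleMessage(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    // Same idea as ScheduleMessageService.delayLevel2QueueId: one queue per delay level.
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    static void swapIfDelayed(SimpleMessage msg) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message: stored under its own topic
        }
        // Clamp to the highest configured level.
        msg.delayLevel = Math.min(msg.delayLevel, MAX_DELAY_LEVEL);
        // Back up the real destination so it can be restored when the delay expires.
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        // Redirect into the schedule topic; the queue is chosen by delay level.
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayLevel);
    }
}
```

Mapping each delay level to its own queue means every queue holds messages with the same delay, so within one queue they become due in write order and can be scanned sequentially.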

Writing the File

```java
// Memory-mapped file (mmap)
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// Write lock; note which kind of lock is used
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
```
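The put-message lock is either a spin lock or a ReentrantLock, chosen by store config. Below is a minimal compare-and-set spin lock in the spirit of RocketMQ's PutMessageSpinLock, written from scratch as a sketch; the countWithThreads helper is hypothetical and only exists to check mutual exclusion.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal CAS spin lock (sketch in the spirit of PutMessageSpinLock, not its code).
class SpinLockSketch {
    // true = free, false = held
    private final AtomicBoolean free = new AtomicBoolean(true);

    void lock() {
        // Busy-wait until this thread flips the flag from free to held.
        while (!free.compareAndSet(true, false)) {
            Thread.onSpinWait(); // Java 9+ hint to the CPU that we are spinning
        }
    }

    void unlock() {
        free.set(true); // volatile write publishes the critical section's effects
    }

    // Hypothetical check: `threads` threads each increment a shared counter `perThread` times.
    static long countWithThreads(int threads, int perThread) {
        SpinLockSketch lock = new SpinLockSketch();
        long[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }
}
```

Spinning avoids parking and unparking threads, which pays off when the critical section is as short as an append into a mapped buffer; under heavy contention it burns CPU, which is when a ReentrantLock is the safer choice.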

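The message is appended to the memory-mapped (mmap) file. If the current file is full, a new file is created and the message is written again into it: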

```java
// Append the message to the end of the file
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
// Handle the write result
switch (result.getStatus()) {
    case PUT_OK:
        break;
    case END_OF_FILE:   // Current file is full: create a new file and write the message again
        unlockMappedFile = mappedFile;
        // Create a new file, re-write the message
        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        if (null == mappedFile) {
            // XXX: warn and notify me
            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
        }
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        break;
    case MESSAGE_SIZE_EXCEEDED:
    case PROPERTIES_SIZE_EXCEEDED:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
    case UNKNOWN_ERROR:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
    default:
        beginTimeInLock = 0;
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
```
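The append-then-roll behaviour can be sketched with plain FileChannel.map. Assumptions: files are tiny (a real CommitLog file is 1 GB by default), each file is named after its starting offset zero-padded to 20 digits as RocketMQ does, and when space runs out the sketch simply reports END_OF_FILE, whereas RocketMQ also pads the rest of the file with a blank marker. All class names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Simplified sketch (not RocketMQ code): append to fixed-size mmap files, rolling when full.
class RollingMappedFilesSketch {
    enum AppendStatus { PUT_OK, END_OF_FILE }

    static final class MiniMappedFile {
        final long fileFromOffset;   // global offset of this file's first byte, also its name
        final MappedByteBuffer buffer;

        MiniMappedFile(Path dir, long fileFromOffset, int fileSize) throws IOException {
            this.fileFromOffset = fileFromOffset;
            Path file = dir.resolve(String.format("%020d", fileFromOffset));
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // A READ_WRITE mapping grows the file to fileSize; it stays valid after close.
                this.buffer = ch.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            }
        }

        AppendStatus append(byte[] msg) {
            if (buffer.remaining() < msg.length) {
                return AppendStatus.END_OF_FILE; // RocketMQ also pads the tail with a blank marker
            }
            buffer.put(msg);
            return AppendStatus.PUT_OK;
        }
    }

    private final Path dir;
    private final int fileSize;
    final List<MiniMappedFile> files = new ArrayList<>();

    RollingMappedFilesSketch(Path dir, int fileSize) {
        this.dir = dir;
        this.fileSize = fileSize;
    }

    static RollingMappedFilesSketch inTempDir(int fileSize) {
        try {
            return new RollingMappedFilesSketch(Files.createTempDirectory("commitlog"), fileSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Appends one message and returns its global (physical) offset.
    long append(byte[] msg) {
        if (msg.length > fileSize) {
            throw new IllegalArgumentException("message larger than a file"); // cf. MESSAGE_SIZE_EXCEEDED
        }
        try {
            if (files.isEmpty()) {
                files.add(new MiniMappedFile(dir, 0, fileSize));
            }
            MiniMappedFile last = files.get(files.size() - 1);
            long offset = last.fileFromOffset + last.buffer.position();
            if (last.append(msg) == AppendStatus.END_OF_FILE) {
                // Current file is full: create the next file and write the message again.
                last = new MiniMappedFile(dir, last.fileFromOffset + fileSize, fileSize);
                files.add(last);
                offset = last.fileFromOffset;
                last.append(msg);
            }
            return offset;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With 64-byte files and 20-byte messages, the first three messages land at offsets 0, 20 and 40; the fourth no longer fits in the 4 bytes left, so it is written at offset 64, the start of the second file.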

Next come the disk flush and master-slave replication:

```java
// Flush to disk
handleDiskFlush(result, putMessageResult, msg);
// Master-slave replication
handleHA(result, putMessageResult, msg);
```

Flushing comes in two modes, synchronous and asynchronous, each with its own handling logic.

org.apache.rocketmq.store.CommitLog#handleDiskFlush

```java
// Disk flush method
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
...
            // Block until the flush completes or the timeout expires
            try {
                flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                        TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                //flushOK=false;
            }
            if (flushStatus != PutMessageStatus.PUT_OK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}
```
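The synchronous branch boils down to waiting on a future with a timeout. The sketch below mirrors that shape using only java.util.concurrent and a fake flusher thread: a missed deadline is reported as FLUSH_DISK_TIMEOUT, while the message itself has already been written to the mapped file. The Status enum and the simulate helper are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of the SYNC_FLUSH wait in handleDiskFlush; not RocketMQ code.
class SyncFlushWaitSketch {
    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT }

    // Block on the flush request's future for at most timeoutMs; a missed deadline
    // or an error is reported as FLUSH_DISK_TIMEOUT, as in the excerpt above.
    static Status waitForFlush(CompletableFuture<Status> flushDone, long timeoutMs) {
        Status status = null;
        try {
            status = flushDone.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            // treated as a failed flush below
        }
        return status == Status.PUT_OK ? Status.PUT_OK : Status.FLUSH_DISK_TIMEOUT;
    }

    // Hypothetical helper: a flush thread that completes the future after flushMillis.
    static Status simulate(long flushMillis, long timeoutMs) {
        CompletableFuture<Status> done = new CompletableFuture<>();
        Thread flusher = new Thread(() -> {
            try {
                Thread.sleep(flushMillis); // stand-in for forcing the mapped buffer to disk
                done.complete(Status.PUT_OK);
            } catch (InterruptedException e) {
                done.completeExceptionally(e);
            }
        });
        flusher.setDaemon(true);
        flusher.start();
        return waitForFlush(done, timeoutMs);
    }
}
```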

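The asynchronous flush thread pauses between flushes (500 ms by default, set by flushIntervalCommitLog). This is why asynchronous flushing can lose data: messages that have reached the page cache but have not been flushed yet are lost if the machine goes down within that window: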

org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run

```java
if (flushCommitLogTimed) {
    Thread.sleep(interval);
} else {
    this.waitForRunning(interval);
}
```
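The two branches behave differently: waitForRunning(interval) can be cut short by the wakeup() calls issued from handleDiskFlush, whereas Thread.sleep(interval) cannot. The sketch below models this with a plain monitor; it is a hypothetical stand-in for RocketMQ's ServiceThread, not its code, and the flush itself is reduced to a counter.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical flush loop contrasting Thread.sleep with a wait that wakeup() can interrupt.
class FlushLoopSketch implements Runnable {
    private final long intervalMs;
    private final boolean timed;                // true: plain Thread.sleep (cf. flushCommitLogTimed)
    private final Object lock = new Object();
    private boolean notified = false;           // guarded by lock
    private volatile boolean stopped = false;
    private final Thread worker;
    final AtomicInteger flushCount = new AtomicInteger();

    FlushLoopSketch(long intervalMs, boolean timed) {
        this.intervalMs = intervalMs;
        this.timed = timed;
        this.worker = new Thread(this, "flush-loop-sketch");
        this.worker.setDaemon(true);
    }

    void start() { worker.start(); }

    void shutdown() { stopped = true; worker.interrupt(); }

    // Writers call this after appending (cf. flushCommitLogService.wakeup()).
    void wakeup() {
        synchronized (lock) {
            notified = true;
            lock.notifyAll();
        }
    }

    // Wait up to ms, returning early if wakeup() was called before or during the wait.
    private void waitForRunning(long ms) throws InterruptedException {
        synchronized (lock) {
            if (!notified) {
                lock.wait(ms); // a spurious wakeup only causes one extra flush
            }
            notified = false;
        }
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                if (timed) {
                    Thread.sleep(intervalMs);   // wakeup() cannot shorten this
                } else {
                    waitForRunning(intervalMs);
                }
                flushCount.incrementAndGet();   // stand-in for flushing dirty pages to disk
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    // Poll roughly every millisecond until at least n flushes happened or the timeout passes.
    boolean awaitFlushes(int n, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (flushCount.get() < n) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            LockSupport.parkNanos(1_000_000);
        }
        return true;
    }
}
```

Either way the data-loss window is bounded by the interval only in the sleeping mode; in the waiting mode a busy producer keeps triggering flushes sooner.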

Master-Slave Replication

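ToDo: replication touches on the Raft protocol (the basis of the DLedger deployment mode), which spans many modules; I'll analyze it later in comparison with other protocols. In the classic master-slave setup, handleHA makes a SYNC_MASTER broker wait until the slave has replicated the message.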

org.apache.rocketmq.store.CommitLog#handleHA

Once the message is written, it has to be dispatched. Let's look at the dispatch logic.

Message Dispatch

org.apache.rocketmq.broker.BrokerController#start

When the MessageStore is started:

org.apache.rocketmq.store.DefaultMessageStore#start

```java
// ToDo: k2 When the Broker starts, it launches a thread that keeps the ConsumeQueue and index files up to date
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
```

The start() call here launches the thread that dispatches messages from the CommitLog to the ConsumeQueue (and the index files):

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run

```java
@Override
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
```

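The loop sleeps 1 ms between rounds, and each doReput() call dispatches every message appended since the previous round.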

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput

```java
...

for (int readSize = 0; readSize < result.getSize() && doNext; ) {
    // Read the next message from the CommitLog and wrap it as a DispatchRequest to be forwarded
    DispatchRequest dispatchRequest =
        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

    if (dispatchRequest.isSuccess()) {
        if (size > 0) {
            // Dispatch the message written to the CommitLog
            DefaultMessageStore.this.doDispatch(dispatchRequest);
            // ToDo: k2->long polling: if this is not a slave node and long polling is enabled
            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                // Notify NotifyMessageArrivingListener so it re-checks the held pull requests
                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
            }

            ...
```
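Stripped of buffers and checks, doReput is a cursor (reputFromOffset) chasing the end of the CommitLog and handing each message to every registered dispatcher. The in-memory sketch below is hypothetical: its DispatchRequest carries only a few of the real fields, and the "commit log" is a list rather than mapped files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified in-memory model of ReputMessageService#doReput; not RocketMQ code.
class ReputSketch {
    // Hypothetical stand-in for DispatchRequest, keeping only a few fields.
    static final class DispatchRequest {
        final long commitLogOffset;
        final int msgSize;
        final String topic;
        final int queueId;

        DispatchRequest(long commitLogOffset, int msgSize, String topic, int queueId) {
            this.commitLogOffset = commitLogOffset;
            this.msgSize = msgSize;
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    private final List<DispatchRequest> commitLog = new ArrayList<>(); // messages in write order
    private long maxPhyOffset = 0;   // end of the "commit log"
    private int cursor = 0;          // index of the next message to dispatch
    long reputFromOffset = 0;        // physical offset of the next message to dispatch
    final List<Consumer<DispatchRequest>> dispatchers = new ArrayList<>(); // cf. dispatcherList

    // Write path: append a message of the given size.
    synchronized void append(String topic, int queueId, int size) {
        commitLog.add(new DispatchRequest(maxPhyOffset, size, topic, queueId));
        maxPhyOffset += size;
    }

    // One round of doReput: hand every not-yet-dispatched message to every dispatcher.
    synchronized int doReput() {
        int dispatched = 0;
        while (reputFromOffset < maxPhyOffset) {
            DispatchRequest req = commitLog.get(cursor++);
            for (Consumer<DispatchRequest> d : dispatchers) {
                d.accept(req); // cf. doDispatch: ConsumeQueue builder, IndexFile builder
            }
            reputFromOffset += req.msgSize;
            dispatched++;
        }
        return dispatched;
    }
}
```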

Message dispatch performs two tasks: building the ConsumeQueue and building the IndexFile.

org.apache.rocketmq.store.DefaultMessageStore#doDispatch

```java
// k2 Forward the CommitLog write event to the ConsumeQueue and IndexFile builders
public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}
```

ConsumeQueue Dispatch

org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch

```java
// ToDo: k1->ConsumeQueue dispatch builder
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            // Prepared (half) and rolled-back transaction messages are not added to the ConsumeQueue
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}
```
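putMessagePositionInfo ultimately appends one fixed-size, 20-byte entry to the ConsumeQueue: an 8-byte CommitLog offset, a 4-byte message size and an 8-byte tag hash code (by default the hash code of the tag string). The sketch below encodes such an entry with ByteBuffer; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Sketch of one ConsumeQueue entry; helper names are hypothetical.
class ConsumeQueueEntrySketch {
    // 8 bytes CommitLog offset + 4 bytes message size + 8 bytes tag hash code
    static final int UNIT_SIZE = 20;

    static ByteBuffer encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(UNIT_SIZE);
        buf.putLong(commitLogOffset).putInt(msgSize).putLong(tagsCode);
        buf.flip(); // ready to be read back or written to a channel
        return buf;
    }

    // Fixed-size entries turn a logical queue offset into a byte position by multiplication.
    static long bytePosition(long queueOffset) {
        return queueOffset * UNIT_SIZE;
    }
}
```

The fixed size is the design point: a consumer asking for queue offset N can seek straight to byte N × 20 and then jump into the CommitLog, and tag filtering can compare hash codes without reading the message body.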

IndexFile Dispatch

org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex#dispatch

```java
// ToDo: k1->IndexFile dispatch builder
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}
```
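An IndexFile is essentially a hash table laid out in a file: each hash slot points at the newest entry for that slot, and each entry links back to the previous one. The toy version below keeps that slot-and-chain idea in memory. It is a simplification with hypothetical names: no file header, no time range, and & 0x7fffffff in place of RocketMQ's own hash normalisation; the topic#key format follows IndexService#buildKey.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory version of the IndexFile idea: hash slots plus chained entries.
class IndexSketch {
    private final int[] slots;                              // slot -> 1-based index of newest entry, 0 = empty
    private final List<long[]> entries = new ArrayList<>(); // {keyHash, phyOffset, prevIndex}

    IndexSketch(int slotNum) {
        slots = new int[slotNum];
    }

    static String buildKey(String topic, String key) {
        return topic + "#" + key;
    }

    void put(String topic, String key, long phyOffset) {
        int hash = buildKey(topic, key).hashCode() & 0x7fffffff; // non-negative hash
        int slot = hash % slots.length;
        entries.add(new long[] {hash, phyOffset, slots[slot]}); // link to the previous chain head
        slots[slot] = entries.size();                           // the newest entry becomes the head
    }

    // Returns CommitLog offsets for the key, newest first.
    List<Long> query(String topic, String key) {
        int hash = buildKey(topic, key).hashCode() & 0x7fffffff;
        List<Long> result = new ArrayList<>();
        for (int idx = slots[hash % slots.length]; idx != 0; ) {
            long[] e = entries.get(idx - 1);
            if (e[0] == hash) {
                result.add(e[1]); // different keys can share a slot, so compare hashes
            }
            idx = (int) e[2];
        }
        return result;
    }
}
```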

These two dispatchers are also where customization usually starts.

Expired File Deletion

org.apache.rocketmq.store.DefaultMessageStore#start

```java
// ToDo: k2->scheduled task, started with the Broker, that deletes expired files
this.addScheduleTask();
```

Two kinds of files are cleaned up periodically:

org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask

```java
// ToDo: k1->scheduled task that deletes expired messages
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
```

org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically

```java
private void cleanFilesPeriodically() {
    // Periodically delete expired CommitLog files
    this.cleanCommitLogService.run();
    // Periodically delete expired ConsumeQueue files
    this.cleanConsumeQueueService.run();
}
```
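The CommitLog expiry rule can be sketched as a predicate, under these assumptions: a file counts as expired once its last modification is older than the retention time (fileReservedTime, 72 hours by default), and expired files are actually removed only at the configured delete hour or when disk usage passes a threshold. RocketMQ's forced cleanup when the disk is nearly full is left out, and ConsumeQueue cleanup works differently (it drops entries pointing below the CommitLog's minimum offset). All names and thresholds here are illustrative.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the periodic CommitLog cleanup decision; names and thresholds are illustrative.
class ExpiredFileCleanupSketch {
    // A file is expired once it has been untouched for longer than the retention period.
    static boolean isExpired(long lastModifiedMs, long nowMs, Duration reserved) {
        return lastModifiedMs + reserved.toMillis() < nowMs;
    }

    // Expired files are removed only at the delete hour or when disk usage is above the threshold.
    static boolean shouldDelete(long lastModifiedMs, long nowMs, Duration reserved,
                                boolean isDeleteHour, double diskUsedRatio, double maxUsedRatio) {
        return isExpired(lastModifiedMs, nowMs, reserved)
            && (isDeleteHour || diskUsedRatio > maxUsedRatio);
    }

    // Same scheduling shape as addScheduleTask: first run after 60 s, then every intervalMs.
    static ScheduledExecutorService schedule(Runnable cleanFilesPeriodically, long intervalMs) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(cleanFilesPeriodically, 60_000, intervalMs, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```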

That covers the overall message storage flow and the entry-point code for each step.

Summary

The flow chart is shown below:

[Figure: message storage flow chart]