回顾
前面介绍了Producer把消息发到了Broker,接下来研究一下Broker是如何进行消息存储的,最终存储文件的有哪些?
- commitLog: 消息存储目录;
- config:运行期间的配置信息;
- consumerqueue:消息消费队列存储目录;
- index:消息索引文件存储目录;
- abort:如果该文件存在,则表明Broker非正产关闭;
- checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间、index索引文件最后一次刷盘时间戳
消息存储入口
org.apache.rocketmq.store.DefaultMessageStore#putMessage
这个是根据视频找的,Netty一层层调用太多了,最终会调用到这个方法执行消息的存储。
执行commitLog写入的方法是
// 消息写入由这个方法执行
PutMessageResult result = this.commitLog.putMessage(msg);
复制代码
延迟消息Topic替换
org.apache.rocketmq.store.CommitLog#putMessage
// 主体逻辑:
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
// 延迟消息:延迟消息写入时会转为写到SCHEDULE_TOPIC_xxxx这个Topic中
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 这里会替换我们设置的topic, 改为MQ自己定义的Topic名称,头天换日了。
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
复制代码
有意思的是直接修改了消息的topic和id,在延时消息中有种偷梁换柱的感觉。
文件写入
// 零拷贝实现->mmp
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 线程锁,注意使用锁的这种方式
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
复制代码
使用mmp技术拷贝文件,如果文件满了,会新创建一个文件,然后重写消息
// 以追加的方式写入文件
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
// 文件写入结果
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE: // 如果文件写满了,重新创建一个文件,重写消息
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
复制代码
接着刷盘、同步文件
// 文件刷盘
handleDiskFlush(result, putMessageResult, msg);
// 主从同步
handleHA(result, putMessageResult, msg);
复制代码
刷盘分为同步刷盘和异步,对应不同的处理逻辑。
org.apache.rocketmq.store.CommitLog#handleDiskFlush
// 文件刷盘方法
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush 同步
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
...
// 同步等待文件刷新
try {
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush 异步
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
复制代码
在异步休眠启动线程中,会有500ms的休眠,这也是为什么数据会丢失的原因:
org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
复制代码
主从同步
ToDo:涉及到Raft协议,Raft包含很多模块,后面与其他协议一起做对比分析。
org.apache.rocketmq.store.CommitLog#handleHA
写入之后,要进行消息分发,来看一下消息分发的逻辑。
消息分发
org.apache.rocketmq.broker.BrokerController#start
在启动MessageStore的时候
org.apache.rocketmq.store.DefaultMessageStore#start
// ToDo: k2 Broker启动时,会启动一个线程来更新Consumerqueue索引文件
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
复制代码
调用这里的start方法启动线程将CommitLog中的消息分发到consumequeue中
org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
复制代码
每隔1ms分发一次消息。
org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
...
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 从CommitLog中获取一个dispatchRequest,拿到一个需要进行转发的消息,即从commitlog中读取的
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// 分发CommitLog写入消息
DefaultMessageStore.this.doDispatch(dispatchRequest);
// ToDo: k2->长轮训:如果消息到了主节点,并且开启了长轮训
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
// 唤醒NotifyMessageArrivingListener方法进行一次请求线程的检查
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
...
复制代码
消息分发具体有两个任务:分发ConsumeQueue、分发IndexFile
org.apache.rocketmq.store.DefaultMessageStore#doDispatch
// k2 将commitlog写入的事件转发到ConsumeQueue和IndexFile
public void doDispatch(DispatchRequest req) {
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
复制代码
ConsumerQueue分发构建
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch
// ToDo: k1->ConsumereQueue消息分发构建器
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
复制代码
indexFile文件分发构建
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex#dispatch
// ToDo: k1->IndexFile文件分发构建器
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
复制代码
一般做定制化也是从这两个方法下手。
过期文件删除
org.apache.rocketmq.store.DefaultMessageStore#start
// ToDo: k2->Broker启动删除过期文件的定时任务
this.addScheduleTask();
复制代码
两种文件定期删除
org.apache.rocketmq.store.DefaultMessageStore#addScheduleTask
// ToDo: k1->定时删除过期消息的任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
复制代码
org.apache.rocketmq.store.DefaultMessageStore#cleanFilesPeriodically
private void cleanFilesPeriodically() {
// 定时删除过期的commitlog
this.cleanCommitLogService.run();
// 定时删除过期的Consumerqueue
this.cleanConsumeQueueService.run();
}
复制代码
至此,消息存储的大致流程以及入口代码都陈列了出来。
小结
流程图如下:




近期评论