前言
刷盘是将内存中的消息写入磁盘,分为同步刷盘和异步刷盘。同步刷盘指一条消息写入磁盘才返回成功,异步刷盘指写入内存就返回成功,稍后异步线程刷盘。
在创建CommitLog对象的时候,会初始化刷盘服务:
//代码位置:org.apache.rocketmq.store.CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
//省略代码
//同步刷盘
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
//异步刷盘
this.flushCommitLogService = new FlushRealTimeService();
}
//实时提交服务
this.commitLogService = new CommitRealTimeService();
//省略代码
}
RocketMQ默认的是异步刷盘。刷盘服务的继承关系如下:
从上面类图可以看出,不管是同步刷盘还是异步刷盘都是继承了FlushCommitLogService类,GroupCommitService类用于同步刷盘,FlushRealTimeService用于异步刷盘。如果transientStorePoolEnable=true,并且是异步刷盘,则消息append时会放入writeBuffer(堆外内存),CommitRealTimeService的作用是将堆外内存中的消息异步写入到fileChannel中。
在CommitLog启动的时候,会启动刷盘服务,代码如下:
//org.apache.rocketmq.store.CommitLog#start
public void start() {
this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
}
isTransientStorePoolEnable的判断为transientStorePoolEnable为true,并且是异步刷盘。由于transientStorePoolEnable默认为false,所以默认不会进行commit操作
public boolean isTransientStorePoolEnable() {
return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
&& BrokerRole.SLAVE != getBrokerRole();
}
在putMessage方法中,当消息写到缓存以后,就会调用handleDiskFlush方法进行刷盘,handleDiskFlush方法如下:
//代码位置:org.apache.rocketmq.store.CommitLog#handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
//同步刷盘
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
//等待消息存储成功
if (messageExt.isWaitStoreMsgOK()) {
//创建一个同步刷盘请求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
//添加同步刷盘请求
service.putRequest(request);
CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
PutMessageStatus flushStatus = null;
try {
//阻塞获取刷盘状态
flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//flushOK=false;
}
//刷盘的状态不成功
if (flushStatus != PutMessageStatus.PUT_OK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
//唤醒刷盘服务
service.wakeup();
}
}
// Asynchronous flush
else {
//TransientStorePool开关不可用
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable())