1.绪论
commitLog是rocketmq存储的核心,前面我们介绍了mappedfile、mappedfilequeue、刷盘策略,其实commitlog的核心组件我们基本上已经介绍完成。
2.commitLog的组成
commitLog的核心其实就是MqppedFilequeue,它本质上就是多个mappedFile的queue,所以可以看出commitLog和mappedFile是一对多的关系。
2.1 commitLog的基本组件
下面是commitLog的具体组成的一些组件:
public class CommitLog implements Swappable {
// Message's MAGIC CODE daa320a7
//commitLog的魔数
public final static int MESSAGE_MAGIC_CODE = -626843481;
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
public final static int BLANK_MAGIC_CODE = -875286124;
/**
* CRC32 Format: [PROPERTY_CRC32 + NAME_VALUE_SEPARATOR + 10-digit fixed-length string + PROPERTY_SEPARATOR]
*/
public static final int CRC32_RESERVED_LEN = MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1;
//核心:即MqppedFilequeue是真正存储消息的地方,可以看出是由多个MappedFile组成
protected final MappedFileQueue mappedFileQueue;
//当前commitLog所属的MessageStore组件
protected final DefaultMessageStore defaultMessageStore;
//进行flush的组件
private final FlushManager flushManager;
//不常用的数据的检查组件
private final ColdDataCheckService coldDataCheckService;
//真正写入数据的组件
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
protected final PutMessageLock putMessageLock;
protected final TopicQueueLock topicQueueLock;
private volatile Set<String> fullStorePaths = Collections.emptySet();
//刷新磁盘的监视器
private final FlushDiskWatcher flushDiskWatcher;
//commitlog大小,默认为1gb
protected int commitLogSize;
private final boolean enabledAppendPropCRC;
//多路分发器,rocketmq为了支持mutt协议的组件
protected final MultiDispatch multiDispatch;
}
构造函数:
public CommitLog(final DefaultMessageStore defaultMessageStore) {
String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
//初始化messagequeue,这个时候文件会与直接内存建立映射关系,并且完成文件预热
if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
defaultMessageStore.getAllocateMappedFileService());
}
this.defaultMessageStore = defaultMessageStore;
//如果是同步的时候,采用GroupCommitService进行flush
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
//如果是异步刷盘,采用FlushRealTimeService进行flush
this.flushCommitLogService = new FlushRealTimeService();
}
//开启瞬时缓存池技术的话,采用CommitRealTimeService进行commit
this.commitLogService = new CommitRealTimeService();
this.appendMessageCallback = new DefaultAppendMessageCallback();
//每个线程都有一个消息的编码器,MessageExtEncoder负责将消息进行编码到bytebuffer中
putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
@Override
protected PutMessageThreadLocal initialValue() {
return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
//rocketmq实现,mutt协议的组件,它可以让消息只有一个commitlog但是分发到多个queue中
this.multiDispatch = new MultiDispatch(defaultMessageStore, this);
//当采用同步刷盘的时候,调用线程会阻塞等待刷盘结果,FlushDiskWatcher会检测每一个flush请求,如果超过完成时间
//便会自动的唤醒调用线程,防止调用线程阻塞过久的场景出现
flushDiskWatcher = new FlushDiskWatcher();
}
2.2 commtLog真正存储消息的地方-MappedFileQueue
mappedfilequeue是commitLog的核心,在前面mappedfilequeue中已经仔细分析过,这里不再赘述。详情请看:深度解析RocketMq源码-持久化组件(二) MappedFileQueue-CSDN博客
2.3 commitLog是如何落盘的-FlushCommitLogService
flushmanager包含了是对commotlog罗盘进行处理的组件,它包含了commit和flush两个逻辑。在前面刷盘策略中已经仔细分析过,这里不再赘述。详情请看:
深度解析RocketMq源码-持久化组件(三) 刷盘策略-CSDN博客
2.4 同步刷盘失败怎么办 -FlushDiskWatcher
当采用同步刷盘的时候,调用线程会阻塞等待刷盘结果,FlushDiskWatcher会检测每一个flush请求,如果超过完成时间便会自动的唤醒调用线程,防止调用线程阻塞过久的场景出现。由此可以看出,就算是同步刷盘策略,也有可能因为超过3s没有刷盘成功导致数据丢失。
@Override
public void run() {
while (!isStopped()) {
GroupCommitRequest request = null;
try {
//获取提交的flush请求
request = commitRequests.take();
} catch (InterruptedException e) {
log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");
continue;
}
//
while (!request.future().isDone()) {
long now = System.nanoTime();
//如果超过了请求的超时时间
if (now - request.getDeadLine() >= 0) {
//返回刷盘超时
request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
break;
}
// To avoid frequent thread switching, replace future.get with sleep here,
long sleepTime = (request.getDeadLine() - now) / 1_000_000;
sleepTime = Math.min(10, sleepTime);
if (sleepTime == 0) {
request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
break;
}
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
log.warn(
"An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");
break;
}
}
}
}
3. commitLog的常见方法
3.1 commitLog是如何写入消息的 - asyncPutMessage
commitLog的核心方法就是写入消息,本质上就是在写入消息前对消息设置一些参数,比如存储时间戳,或者顺序消息的话,会转移topic,会把消息真正的topic覆盖原来的topic。然后获取到最后一个commitLog,并调用commitLog的appendMessage方法写入消息。所以commitLog其实是将消息顺序写入的。
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// Set the storage time
//设置存储时间戳
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
//将消息体进行crc编码,并存储到bodyCrc中
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
//获取到对应的topic
String topic = msg.getTopic();
// int queueId msg.getQueueId();
//如果是事务消息,获取到事务类型prepared或者commit或者rollback
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
//如果是非事务消息或者commit消息
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
//如果是延迟消息的话,需要校验延迟消息的最大延迟是否在mq的要求的范围内
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//并且将消息的延迟级别获取到延迟消息需要放入的queueid
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
//并且设置消息真正放入的地方也就是SCHEDULE_TOPIC_XXXX中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
//设置存储的时间戳
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address) {
msg.setBornHostV6Flag();
}
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address) {
msg.setStoreHostAddressV6Flag();
}
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
updateMaxMessageSize(putMessageThreadLocal);
if (!multiDispatch.isMultiDispatchMsg(msg)) {
//获取到消息的编码器,并且编码,因为encode为一个公共类,为了防止锁竞争,所以放入到threadLocal中
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
return CompletableFuture.completedFuture(encodeResult);
}
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
}
//构建消息上下文中
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
//开始写入消息
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
if (null == mappedFile || mappedFile.isFull()) {
//获取到最后一个mappedfile
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
}
//调用mappedfile的appendMessage方法写入消息
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
} finally {
beginTimeInLock = 0;
putMessageLock.unlock();
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//设置返回结果
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
3.2 commitLog是如何flush消息的-submitFlushRequest
commitLog如果要flush磁盘的话,其实是提交一个flush请求,然后根据同步刷盘还是异步刷盘,最后交由FlushCommitLogService来消费请求,最后调用mappedFile的flush方法,将数据从buffer中flush到磁盘中去的。
/**
* @param result 向mappedfile中追加消息的结果
* @param messageExt 具体的消息内容已经协议头等
* @return
*/
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
//如果采用同步刷盘
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
//采用的是GroupCommitService进行刷盘
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
//默认是需要等待消息刷盘成功的
if (messageExt.isWaitStoreMsgOK()) {
//构建刷盘请求,包括当前消息的在buffer中的写指针
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
flushDiskWatcher.add(request);
//调用GroupCommitService的putRequest推送刷盘请求
service.putRequest(request);
//当前线程阻塞等待刷盘结果
return request.future();
} else {
//如果不需要等待刷盘线程成功,便直接唤醒刷盘线程,并且返回成功(此时也可能有丢失数据的风险)
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
//如果采用异步刷盘策略
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
//如果未采用了瞬时缓存池技术,便唤醒flush服务线程
flushCommitLogService.wakeup();
} else {
//如果未采用了瞬时缓存池技术,便唤醒commit服务线程
commitLogService.wakeup();
}
//返回成功
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
3.3 rocketMq是如何实现主从同步的
rocketmq以前采用的方式是通过主从的方式,主节点负责写入,然后并且commitLog同步到从节点的方式实现高可用的,但是这样有个问题,就是主节点宕机过后,需要手动的修改某个从节点为主节点。现在这种架构基本上没有使用,而换用的是后面要讲解的用raft协议实现dledger高可用架构。我们接下来大致了解一下主从架构是如何同步的。
3.3.1 提交复制请求 -submitReplicaRequest
public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
//如果节点为主节点,并且为同步复制
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
//如果消息已经同步完毕
if (messageExt.isWaitStoreMsgOK()) {
if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
//会调用HAserver的putRequest向里面发送复制请求
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
return request.future();
}
else {
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
3.3.2 rocketmq的主从同步组件-HAService
1.主节点监听从节点的连接请求,并且建立socket连接
下面的代码是nio的监听并且建立连接的标准写法。
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
//监听多路复用的请求
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
//拿到与从节点的channel过后,并且凑早HAconnection
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
//加入到链接池connectionList中
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
2.将commitLog通过网络连接发送至从节点-GroupTransferService
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
3.从节点读取消息并且将其加入到自己的commitLog中-HAClient
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
//从网络中读取消息到byteBufferRead中
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
//将bytebuffer中的数据写入到磁盘中
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
private boolean dispatchReadRequest() {
//获取到消息头大小
final int msgHeaderSize = 8 + 4; // phyoffset + size
while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
//获取到完整的消息内容
if (diff >= msgHeaderSize) {
//获取到master的物理偏移量
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
//获取到消息体
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
//当前同步的最大偏移量
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
//如果从节点偏移量和主节点偏移量不想当便直接返回
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = byteBufferRead.array();
//获取到消息体开始的指针
int dataStart = this.dispatchPosition + msgHeaderSize;
//调用mappedfile的appendToCommitLog方法直接写入待commitLog中
HAService.this.defaultMessageStore.appendToCommitLog(
masterPhyOffset, bodyData, dataStart, bodySize);
//更新同步偏移量
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
4.总结
至此,rocketmq的核心持久化主键commitLog我们便已经全部分析完成,简单而言,commitLog持有一个messageFileQueue,而mappedFileQueue对应不同的mappedFile文件,而mappedFile通过mmap技术与磁盘中的文件建立映射。而磁盘中的物理文件的每个文件的名称都是以它当前的文件的起始偏移量命名的,比如第一个文件为0.log,1个mappedfile的大小为1gb,假设第一个文件的内容刚好全部写满,所以其第二个文件的名称为1024*1024*1024.log。一有消息到来,就会向commitLog中写入文件,进行持久化,其实就是获取到最后一个mappedFile,并且调用他的appendMessage方法完成消息的写入。