消息中间件 RocketMQ 高级功能和源码分析(八)
一、消息中间件 RocketMQ 源码分析:实时更新消息消费队列与索引文件流程说明
1、实时更新消息消费队列与索引文件
消息消费队文件、消息属性索引文件都是基于 CommitLog 文件构建的,当消息生产者提交的消息存储在 CommitLog 文件中,ConsumerQueue、IndexFile 需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ 通过开启一个线程 ReputMessageService 来准实时转发 CommitLog 文件更新事件,相应的任务处理器根据转发的消息及时更新 ConsumerQueue、IndexFile 文件。
2、消息存储结构 示例图:
3、构建消息消费队列和索引文件 示例图:
4、 代码:DefaultMessageStore:start
//设置CommitLog内存中最大偏移量
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
//启动
this.reputMessageService.start();
5、 代码:DefaultMessageStore:run
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
//每隔1毫秒就继续尝试推送消息到消息消费队列和索引文件
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
6、 代码:DefaultMessageStore:deReput
//从result中循环遍历消息,一次读一条,创建DispatherRequest对象。
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest);
}
}
}
7、 DispatchRequest
String topic; //消息主题名称
int queueId; //消息队列ID
long commitLogOffset; //消息物理偏移量
int msgSize; //消息长度
long tagsCode; //消息过滤tag hashCode
long storeTimestamp; //消息存储时间戳
long consumeQueueOffset; //消息队列偏移量
String keys; //消息索引key
boolean success; //是否成功解析到完整的消息
String uniqKey; //消息唯一键
int sysFlag; //消息系统标记
long preparedTransactionOffset; //消息预处理事务偏移量
Map<String, String> propertiesMap; //消息属性
byte[] bitMap; //位图
二、消息中间件 RocketMQ 源码分析:转发数据到 ConsumerQueue 文件
1、转发到 ConsumerQueue 消息分发到消息消费队列 示例图:
2、 代码 CommitLogDispatcherBuildConsumeQueue 类:
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
//消息分发
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
3、 代码:DefaultMessageStore#putMessagePositionInfo
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
//获得消费队列
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
//消费队列分发消息
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
4、 代码:DefaultMessageStore#putMessagePositionInfo
//依次将消息偏移量、消息长度、tag写入到ByteBuffer中
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
//获得内存映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
//将消息追加到内存映射文件,异步输盘
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
三、消息中间件 RocketMQ 源码分析:转发 IndexFile 文件
1、转发到 Index 消息分发到索引文件 示例图:
2、 代码 CommitLogDispatcherBuildIndex 类:
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
3、 代码:DefaultMessageStore#buildIndex
public void buildIndex(DispatchRequest req) {
//获得索引文件
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
//获得文件最大物理偏移量
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
//如果该消息的物理偏移量小于索引文件中的最大物理偏移量,则说明是重复数据,忽略本次索引构建
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
//如果消息ID不为空,则添加到Hash索引中
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
return;
}
}
//构建索引key,RocketMQ支持为同一个消息建立多个索引,多个索引键空格隔开.
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
四、消息中间件 RocketMQ 源码分析:消息队列和索引文件恢复
1、消息队列和索引文件恢复
由于 RocketMQ 存储首先将消息全量存储在 CommitLog 文件中,然后异步生成转发任务更新 ConsumerQueue 和 Index 文件。如果消息成功存储到 CommitLog 文件中,转发任务未成功执行,此时消息服务器 Broker 由于某个愿意宕机,导致CommitLog、ConsumerQueue、IndexFile 文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在 CommitLog 中文件中存在,但由于没有转发到 ConsumerQueue,这部分消息将永远复发被消费者消费。
2、文件恢复总体流程 示例图:
3、存储文件加载
代码:DefaultMessageStore#load
判断上一次是否异常退出。实现机制是 Broker 在启动时创建 abort 文件,在退出时通过 JVM 钩子函数删除 abort 文件。如果下次启动时存在 abort 文件。说明 Broker 时异常退出的,CommitLog 与 ConsumerQueue 数据有可能不一致,需要进行修复。
//判断临时文件是否存在
boolean lastExitOK = !this.isTempFileExist();
//根据临时文件判断当前Broker是否异常退出
private boolean isTempFileExist() {
String fileName = StorePathConfigHelper
.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
File file = new File(fileName);
return file.exists();
}
4、 代码:DefaultMessageStore#load
//加载延时队列
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// 加载CommitLog文件
result = result && this.commitLog.load();
// 加载消费队列文件
result = result && this.loadConsumeQueue();
if (result) {
//加载存储监测点,监测点主要记录CommitLog文件、ConsumerQueue文件、Index索引文件的刷盘点
this.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
//加载index文件
this.indexService.load(lastExitOK);
//根据Broker是否异常退出,执行不同的恢复策略
this.recover(lastExitOK);
}
5、 代码:MappedFileQueue#load
加载 CommitLog 到映射文件
//指向CommitLog文件目录
File dir = new File(this.storePath);
//获得文件数组
File[] files = dir.listFiles();
if (files != null) {
// 文件排序
Arrays.sort(files);
//遍历文件
for (File file : files) {
//如果文件大小和配置文件不一致,退出
if (file.length() != this.mappedFileSize) {
return false;
}
try {
//创建映射文件
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
//将映射文件添加到队列
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
6、 代码:DefaultMessageStore#loadConsumeQueue
加载消息消费队列
//执行消费队列目录
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
//遍历消费队列目录
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
for (File fileTopic : fileTopicList) {
//获得子目录名称,即topic名称
String topic = fileTopic.getName();
//遍历子目录下的消费队列文件
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null) {
//遍历文件
for (File fileQueueId : fileQueueIdList) {
//文件名称即队列ID
int queueId;
try {
queueId = Integer.parseInt(fileQueueId.getName());
} catch (NumberFormatException e) {
continue;
}
//创建消费队列并加载到内存
ConsumeQueue logic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
this);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
}
}
}
}
}
log.info("load logics queue all over, OK");
return true;
7、 代码:IndexService#load
加载索引文件
public boolean load(final boolean lastExitOK) {
//索引文件目录
File dir = new File(this.storePath);
//遍历索引文件
File[] files = dir.listFiles();
if (files != null) {
//文件排序
Arrays.sort(files);
//遍历文件
for (File file : files) {
try {
//加载索引文件
IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
f.load();
if (!lastExitOK) {
//索引文件上次的刷盘时间小于该索引文件的消息时间戳,该文件将立即删除
if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
.getIndexMsgTimestamp()) {
f.destroy(0);
continue;
}
}
//将索引文件添加到队列
log.info("load index file OK, " + f.getFileName());
this.indexFileList.add(f);
} catch (IOException e) {
log.error("load file {} error", file, e);
return false;
} catch (NumberFormatException e) {
log.error("load file {} error", file, e);
}
}
}
return true;
}
8、 代码:DefaultMessageStore#recover
文件恢复,根据 Broker 是否正常退出执行不同的恢复策略
private void recover(final boolean lastExitOK) {
//获得最大的物理便宜消费队列
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
if (lastExitOK) {
//正常恢复
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
//异常恢复
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}
//在CommitLog中保存每个消息消费队列当前的存储逻辑偏移量
this.recoverTopicQueueTable();
}
9、 代码:DefaultMessageStore#recoverTopicQueueTable
恢复 ConsumerQueue 后,将在 CommitLog 实例中保存每隔消息队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列 ID、还存储了消息队列的关键所在。
public void recoverTopicQueueTable() {
HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
//CommitLog最小偏移量
long minPhyOffset = this.commitLog.getMinOffset();
//遍历消费队列,将消费队列保存在CommitLog中
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
String key = logic.getTopic() + "-" + logic.getQueueId();
table.put(key, logic.getMaxOffsetInQueue());
logic.correctMinOffset(minPhyOffset);
}
}
this.commitLog.setTopicQueueTable(table);
}
五、消息中间件 RocketMQ 源码分析:正常恢复和异常恢复
1、正常恢复
代码:CommitLog#recoverNormally
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
//Broker正常停止再重启时,从倒数第三个开始恢复,如果不足3个文件,则从第一个文件开始恢复。
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
//代表当前已校验通过的offset
long mappedFileOffset = 0;
while (true) {
//查找消息
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
//消息长度
int size = dispatchRequest.getMsgSize();
//查找结果为true,并且消息长度大于0,表示消息正确.mappedFileOffset向前移动本消息长度
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
}
//如果查找结果为true且消息长度等于0,表示已到该文件末尾,如果还有下一个文件,则重置processOffset和MappedFileOffset重复查找下一个文件,否则跳出循环。
else if (dispatchRequest.isSuccess() && size == 0) {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
break;
} else {
//取出每个文件
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
}
}
// 查找结果为false,表明该文件未填满所有消息,跳出循环,结束循环
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
//更新MappedFileQueue的flushedWhere和committedWhere指针
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
//删除offset之后的所有文件
this.mappedFileQueue.truncateDirtyFiles(processOffset);
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
2、 代码:MappedFileQueue#truncateDirtyFiles
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
//遍历目录下文件
for (MappedFile file : this.mappedFiles) {
//文件尾部的偏移量
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
//文件尾部的偏移量大于offset
if (fileTailOffset > offset) {
//offset大于文件的起始偏移量
if (offset >= file.getFileFromOffset()) {
//更新wrotePosition、committedPosition、flushedPosistion
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
//offset小于文件的起始偏移量,说明该文件是有效文件后面创建的,释放mappedFile占用内存,删除文件
file.destroy(1000);
willRemoveFiles.add(file);
}
}
}
this.deleteExpiredFile(willRemoveFiles);
}
3、异常恢复
Broker 异常停止文件恢复的实现为 CommitLog#recoverAbnormally。异常文件恢复步骤与正常停止文件恢复流程基本相同,其主要差别有两个。首先,正常停止默认从倒数第三个文件开始进行恢复,而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果 CommitLog 目录没有消息文件,如果消息消费队列目录下存在文件,则需要销毁。
代码:CommitLog#recoverAbnormally
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
//判断消息文件是否是一个正确的文件
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
//根据索引取出mappedFile文件
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
//...验证消息的合法性,并将消息转发到消息消费队列和索引文件
}else{
//未找到mappedFile,重置flushWhere、committedWhere都为0,销毁消息队列文件
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
上一节关联链接请点击:
# 消息中间件 RocketMQ 高级功能和源码分析(七)