This article focuses on the index files stored under the consumequeue/topic/queueId directory.
1.ConsumeQueueStore
public class ConsumeQueueStore {
    protected final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface>> consumeQueueTable;

    public boolean load() {
        String storePathRootDir = this.messageStoreConfig.getStorePathRootDir();
        String storePathConsumeQueue = getStorePathConsumeQueue(storePathRootDir);
        boolean cqLoadResult = loadConsumeQueues(storePathConsumeQueue, CQType.SimpleCQ);
        String storePathBatchConsumeQueue = getStorePathBatchConsumeQueue(storePathRootDir);
        boolean bcqLoadResult = loadConsumeQueues(storePathBatchConsumeQueue, CQType.BatchCQ);
        return cqLoadResult && bcqLoadResult;
    }

    // After the Broker starts, load the local consumequeue files
    private boolean loadConsumeQueues(String storePath, CQType cqType) {
        File dirLogic = new File(storePath);
        File[] fileTopicList = dirLogic.listFiles();
        if (fileTopicList != null) {
            for (File fileTopic : fileTopicList) {
                String topic = fileTopic.getName();
                File[] fileQueueIdList = fileTopic.listFiles();
                if (fileQueueIdList != null) {
                    for (File fileQueueId : fileQueueIdList) {
                        int queueId = Integer.parseInt(fileQueueId.getName());
                        queueTypeShouldBe(topic, cqType);
                        // Creates either a ConsumeQueue or a BatchConsumeQueue; this article uses ConsumeQueue as the example
                        ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath);
                        this.putConsumeQueue(topic, queueId, logic);
                        if (!this.load(logic)) {
                            return false;
                        }
                    }
                }
            }
        }
        return true;
    }

    private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {
        ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);
        if (null == map) {
            map = new ConcurrentHashMap<>();
            map.put(queueId, consumeQueue);
            this.consumeQueueTable.put(topic, map);
        } else {
            map.put(queueId, consumeQueue);
        }
    }

    public boolean load(ConsumeQueueInterface consumeQueue) {
        // Look up the FileQueueLifeCycle (i.e. the ConsumeQueue) in consumeQueueTable by topic and queueId
        FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
        return fileQueueLifeCycle.load();
    }
}
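The directory walk in loadConsumeQueues can be tried out in isolation. The following is a minimal sketch, not RocketMQ code: the class ConsumeQueueDirScanSketch and its scan method are hypothetical names, and the map stores the directory path as a plain String in place of a real ConsumeQueueInterface. It also skips directory names that are not integers, whereas the excerpt above lets Integer.parseInt throw.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper for illustration only; not part of RocketMQ.
class ConsumeQueueDirScanSketch {

    /** Walks root/topic/queueId and returns topic -> (queueId -> directory path). */
    static ConcurrentMap<String, ConcurrentMap<Integer, String>> scan(File root) {
        ConcurrentMap<String, ConcurrentMap<Integer, String>> table = new ConcurrentHashMap<>();
        File[] topicDirs = root.listFiles();
        if (topicDirs == null) {
            return table; // root is missing or is not a directory
        }
        for (File topicDir : topicDirs) {
            File[] queueDirs = topicDir.listFiles();
            if (queueDirs == null) {
                continue; // not a directory, so it cannot hold queues
            }
            for (File queueDir : queueDirs) {
                int queueId;
                try {
                    queueId = Integer.parseInt(queueDir.getName());
                } catch (NumberFormatException e) {
                    continue; // assumption of this sketch: ignore non-numeric names
                }
                // computeIfAbsent creates the inner map on first use of a topic
                table.computeIfAbsent(topicDir.getName(), k -> new ConcurrentHashMap<>())
                     .put(queueId, queueDir.getPath());
            }
        }
        return table;
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway consumequeue/topic/queueId layout and scan it
        Path root = Files.createTempDirectory("consumequeue");
        Files.createDirectories(root.resolve("TopicA").resolve("0"));
        Files.createDirectories(root.resolve("TopicA").resolve("1"));
        Files.createDirectories(root.resolve("TopicB").resolve("0"));
        System.out.println(scan(root.toFile()));
    }
}
```

The sketch uses computeIfAbsent to register the inner map, which collapses the null check in putConsumeQueue into a single call; that is a choice made for this example, not a description of the original method.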
1.1.ConsumeQueue
public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
    private final MappedFileQueue mappedFileQueue;

    @Override
    public boolean load() {
        boolean result = this.mappedFileQueue.load();
        return result;
    }
}
1.2.MappedFileQueue
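The core job of mappedFileQueue.load is to load the local consume-index files under the consumequeue/topic/queueId directory. By contrast, CommitLog loads the actual user message data under the /commitlog directory.
Both ConsumeQueue and CommitLog hold a MappedFileQueue field, which represents disk files mapped into memory (mmap-based zero copy).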
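To make "disk files mapped into memory" concrete, here is a minimal sketch using the JDK's FileChannel.map. It is not how MappedFileQueue is implemented; the class MmapSketch, its methods, and the 1024-byte mapping size are assumptions chosen for illustration. It writes a value through the mapping and then reads it back from the file through an ordinary channel read.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration only; not part of RocketMQ.
class MmapSketch {

    /** Maps the first size bytes of file read-write, creating the file if needed. */
    static MappedByteBuffer map(Path file, int size) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The mapping remains valid after the channel is closed
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
        }
    }

    /** Writes value through the mapping, then reads it back from the file itself. */
    static long roundTrip(Path file, long value) throws IOException {
        MappedByteBuffer buffer = map(file, 1024);
        buffer.putLong(0, value); // a plain memory write, no explicit write() call
        buffer.force();           // ask the OS to flush the dirty pages to disk
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer readBack = ByteBuffer.allocate(Long.BYTES);
            long position = 0;
            while (readBack.hasRemaining()) {
                int n = channel.read(readBack, position);
                if (n < 0) {
                    throw new IOException("unexpected end of file");
                }
                position += n;
            }
            readBack.flip();
            return readBack.getLong();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("mmap-sketch", ".bin");
        System.out.println(roundTrip(file, 42L));
    }
}
```

The point of the sketch is that once a file is mapped, reads and writes become memory accesses on the buffer, and the operating system decides when pages move between memory and disk unless force is called.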
DefaultMessageStore#ReputMessageService
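By default, all files under the CommitLog and ConsumeQueue directories are loaded when the Broker starts, and each one is mapped from disk into memory. But as CommitLog keeps receiving new data, how does ConsumeQueue determine the index entry for each message?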