1.绪论
rocketmq的broker中关于消息持久化的组件主要包含三个,分别是:持久化消息到文件中的组件commitLog;根据消息key索引commitLog日志的indexFile;消费者根据topic和queueId查询commitLog日志的consumeQueue。前面已经介绍commitLog和indexFile,这节我们就来学习一下consumeQueue。
2. consumeQueue
4.1.1 consumeQueue的中每条消息的组成
消费者在消息的时候,只知道它在哪个topic的哪个queue里面消费哪个tags的拿条消息,那我们怎么由此定位到一条消息呢?所以我们需要为commitLog建立一条索引。
其实每个topic+queueId都会建立一个ConsumeQueue文件,而这个映射关系存储在broker中consumeQueueTable中,我们查询消息的时候,过consumeQueueTable根据queueId和topic快速定位到我们需要的ConsumeQueue,然后我们再根据消费者所提交的consumeQueueOffset*每条consumequeue索引的大小便能找到我们所以要的consume索引文件的位置,再根据里面存储的commitLog的物理偏移量便能在commitLog中定位到具体的消息的位置。
commitLog的存储结构可以如下所示:
└── TopicTest
├── 0
└── 00000000000000000000
├── 1
└── 00000000000000000000
├── 2
└── 00000000000000000000
└── 3
└── 00000000000000000000
可以通过下图来说明,consumequeue是如何组成的,并且和commitLog的关系:
3.consumeQueue中消息的创建和获取
consumequeue其实底层和commitLog是一样的,其实由多个mappedFile来构成的,这里我们就不在讨论consumequeue的具体存储逻辑。有兴趣的小伙伴可以看这篇文章:
深度解析RocketMq源码-持久化组件(四) CommitLog_rocketmq一主一备commitlog-CSDN博客
接下来我们主要看一看consumequeue是什么时候创建的,并且在消费者知道它需要消费的topic和queueId过后,如何找到它具体要消费的哪条消息的,这其实也是rocketmq的核心之一。
3.1. consumequeue建立消息
其实consumequeue是一个采用mappedFile持久化数据的组件,它写入数据其实发分两步:
1.根据topic和queueId在topic和queueid与consumequeue的映射表(consumeQueueTable)中找到Consumequeue。
2.构造一条consumequeue记录,包括8字节的8字节的commitLog的offset + 4字节的消息大小+8自己的tagsCode,然后顺序写入到consumequeue中。
3.1.1 根据topic和queueId找到需要写入的consumequeue
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
//根据topic和queueId找到对应的consumequeue
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
//根据consumequeue顺序写入consumemequeue的索引数据
cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));
}
//根据queueId和topic检索到这个topic和queueId是属于哪个topic的
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
//根据consumeQueueTable和queueId获取到具体的ConsumeQueue
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
//如果consumeQueue为空,便新建一个Consumequeue
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
this);
//并且设置到consumeQueueTable中
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
if (MixAll.isLmq(topic)) {
lmqConsumeQueueNum.getAndIncrement();
}
logic = newLogic;
}
}
return logic;
}
3.1.2 向consumequeue中写入一条记录
//commitLog的真正组成8字节的commitLog的offset + 4字节的消息大小+8自己的tagsCode
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
this.byteBufferIndex.flip();
//consumqueue的每条数据占20个字节
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
//8字节的commotLog的物理偏移量
this.byteBufferIndex.putLong(offset);
//4字节消息大小
this.byteBufferIndex.putInt(size);
//8字节的tagsCode
this.byteBufferIndex.putLong(tagsCode);
//写入到哪个offset
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
//获取到offset的最后一条数据
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
//将跟新mappedFilde1写指针为expectLogicOffset
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
//
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
//实际的物理地址为offset+size大小
this.maxPhysicOffset = offset + size;
//将数据写入到bytebuffer中
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
3.2 根据topic和messagequeue检索消息
先根据topic和queueId找到以及consumeoffset查询一条消息分三步:
1.根据topic和queueId查询到对应的consumequeue;
2.根据consumeoffset在consumequeue中找到一条consumequeue的记录,里面包含一个属性就是实际消息在commitLog中的物理偏移量和大小;
3.根据物理偏移量和消息大小在commitLog中获取到实际消息内容。
//根据queueId和topic检索到这个topic和queueId是属于哪个topic的
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
//根据consumeQueueTable和queueId获取到具体的ConsumeQueue
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
//如果consumeQueue为空,便新建一个Consumequeue
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
this);
//并且设置到consumeQueueTable中
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
if (MixAll.isLmq(topic)) {
lmqConsumeQueueNum.getAndIncrement();
}
logic = newLogic;
}
}
return logic;
}
再根据consumeoffset在consumequeue中获取到具体consumequeue的索引数据。
public boolean get(final long address, final CqExtUnit cqExtUnit) {
if (!isExtAddr(address)) {
return false;
}
final int mappedFileSize = this.mappedFileSize;
final long realOffset = unDecorate(address);
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(realOffset, realOffset == 0);
if (mappedFile == null) {
return false;
}
int pos = (int) (realOffset % mappedFileSize);
SelectMappedBufferResult bufferResult = mappedFile.selectMappedBuffer(pos);
if (bufferResult == null) {
log.warn("[BUG] Consume queue extend unit({}) is not found!", realOffset);
return false;
}
boolean ret = false;
try {
ret = cqExtUnit.read(bufferResult.getByteBuffer());
} finally {
bufferResult.release();
}
return ret;
}
4.总结
至此,我们已经大概了解消息在进入到broker过后做了什么。在生产者推送消息到broker过后,为了保证数据的能够快速的持久化,是直接按照到达顺序写入到commitLog中的,然后就会给主线程返回生产消息成功的通知。但是消费者需要根据topic和queueId获取到一条消息,并且需要根据消息的key检索一条消息。为了满足上述两个需求,rocketmq会启动一个线程,扫描commitLog,如果有新的消息写入,便会构建IndexFile和consumequeue两个文件,其实相当于两个索引文件。这一步骤在我们后面章节会详细介绍。
其实先持久化文件,然后启动线程对消息做其他处理,这一思想的本质就是为了增大吞吐量。在其他框架中也会应用到这种思想,比如elasticsearch中,在写消息的时候,会同时写入到transLog和memory buffer中后便会返回成功,后续单独启动线程根据memory buffer中的数据来进行其他操作,比如分词,建立倒排索引等,可以看出translog其实就类似于rokcetmq的commitLog。所以万变不离其中,只要有一份持久化数据过后,便可以跟客户端返回成功了,然后再单独的启动线程根据这份持久化数据做定制化处理。