CommitLog ~ MappedFileQueue ~ MappedFile集合
正常情况下,RocketMQ支持的单条消息字节数最多为1个G。注意这里的字节数并不单单指消息体body。如果生产的消息其字节数超过1个G,则该消息无法被落盘处理,因为没有一个MappedFile文件可以承载该消息所有的字节数。
1.AllocateMappedFileService
参考文章
异步初始化CommitLog文件时优先初始化nextFilePath、nextNextFilePath两个文件。
同时创建nextFilePath【/data/rocketmq/commitlog/00000000000000000000】、nextNextFilePath【/data/rocketmq/commitlog/00000000000000000050】两个文件是如何使用的呢?
- 优先返回nextFilePath,并添加到MappedFileQueue集合属性mappedFiles中。此时队列requestQueue为空。requestTable集合元素为nextNextFilePath【/data/rocketmq/commitlog/00000000000000000050】。
- 如果消息体的长度没有达到当前MappedFile中字节缓冲区capacity的大小,则不会创建新的MappedFile文件。
- 如果上一步的条件不成立,则请求创建新的nextFilePath【/data/rocketmq/commitlog/00000000000000000050】、nextNextFilePath【/data/rocketmq/commitlog/00000000000000000100】对应的MappedFile文件。但是由于requestTable集合不为空,即已经存在nextFilePath对应的MappedFile文件【/data/rocketmq/commitlog/00000000000000000050】,因此直接将该元素从requestTable集合中删除并返回。
- 此时requestTable集合元素为nextNextFilePath【/data/rocketmq/commitlog/00000000000000000100】。MappedFileQueue中集合属性mappedFiles中存在00000000000000000000、00000000000000000050两个MappedFile文件。
如果真实发送的消息字节数没有超过当前字节缓冲区的剩余空间,则优先由当前MappedFile文件处理;否则创建新的MappedFile文件。
/**
 * Service thread that creates {@code MappedFile} instances on request.
 *
 * <p>Callers register {@link AllocateRequest}s through
 * {@link #putRequestAndReturnMappedFile(String, String, int)}; {@link #run()} repeatedly calls
 * {@code mmapOperation()}, which takes requests from {@code requestQueue}, creates the mapped
 * file and releases the request's latch.
 *
 * <p>NOTE: this is an abridged excerpt. Lines consisting of {@code ...} mark code that was
 * omitted, so guards, catch branches and some methods are not visible here.
 */
public class AllocateMappedFileService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// Maximum time, in milliseconds, to wait for a requested file to be created (5 seconds).
private static int waitTimeOut = 1000 * 5;
// Registered allocation requests, keyed by file path.
private ConcurrentMap<String, AllocateRequest> requestTable = new ConcurrentHashMap<>();
// Requests waiting to be taken by mmapOperation().
private PriorityBlockingQueue<AllocateRequest> requestQueue = new PriorityBlockingQueue<>();
// Reset to false after a mapped file has been created successfully; where it is set to true is not shown in this excerpt.
private volatile boolean hasException = false;
private DefaultMessageStore messageStore;
/**
 * @param messageStore store used for its performance counter and its message store configuration
 */
public AllocateMappedFileService(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
}
/**
 * Registers allocation requests for {@code nextFilePath} and {@code nextNextFilePath}, then
 * waits for the {@code nextFilePath} request to be fulfilled.
 *
 * <p>The conditions guarding the put/offer calls are elided in this excerpt.
 *
 * @param nextFilePath     path of the CommitLog file needed now,
 *                         e.g. /data/rocketmq/commitlog/00000000000000000000
 * @param nextNextFilePath path of the file after it, requested at the same time
 * @param fileSize         size passed to both allocation requests
 * @return the mapped file created for {@code nextFilePath}, or {@code null} if it was not
 *         ready within {@code waitTimeOut} milliseconds
 */
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
...
// Register the request for nextFilePath; putIfAbsent returns null only when no request existed for that path.
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
...
boolean offerOK = this.requestQueue.offer(nextReq);
// Register the request for nextNextFilePath in the same way.
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
...
boolean offerOK = this.requestQueue.offer(nextNextReq);
...
// Only the MappedFile for nextFilePath is returned by each call. Author's note: by the time this call
// returns, requestQueue is empty and requestTable only holds the request for nextNextFilePath,
// which a later call can pick up.
AllocateRequest result = this.requestTable.get(nextFilePath);
messageStore.getPerfCounter().startTick("WAIT_MAPFILE_TIME_MS");
// Block until the AllocateMappedFileService thread has initialized the MappedFile, or until waitTimeOut (5 seconds by default) elapses.
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
messageStore.getPerfCounter().endTick("WAIT_MAPFILE_TIME_MS");
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
this.requestTable.remove(nextFilePath);
// Return the MappedFile for nextFilePath; per the notes above, the caller adds it to the mappedFiles collection of MappedFileQueue.
return result.getMappedFile();
}
}
...
/**
 * Keeps processing allocation requests until the service is stopped or
 * {@code mmapOperation()} returns {@code false}.
 */
public void run() {// task that initializes MappedFile instances
while (!this.isStopped() && this.mmapOperation()) {
}
}
/**
 * Only interrupted by the external thread, will return false
 *
 * <p>NOTE(review): in this excerpt no catch branch is shown, so the only visible return
 * is {@code true}; the path that returns {@code false} must be in the omitted code.
 */
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
req = this.requestQueue.take();// removes and returns the head element, blocking while the queue is empty
// Request registered in requestTable under the same path; how it is checked is in the elided lines below.
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
...
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
// Create the local file at the requested path with the requested size and map it (see DefaultMappedFile#init).
MappedFile mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());;
...
// pre write mappedFile: warm-up
// NOTE(review): mappedFileSizeCommitLog and warmMapedFileEnable are not declared in this excerpt —
// presumably read from MessageStoreConfig; confirm against the full source.
if (mappedFile.getFileSize() >= mappedFileSizeCommitLog && warmMapedFileEnable) {
FlushDiskType flushDiskType = this.messageStore.getMessageStoreConfig().getFlushDiskType();
MessageStoreConfig messageStoreConfig = this.messageStore.getMessageStoreConfig();
int flushLeastPagesWhenWarmMapedFile = messageStoreConfig.getFlushLeastPagesWhenWarmMapedFile();
mappedFile.warmMappedFile(flushDiskType,flushLeastPagesWhenWarmMapedFile);
}
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} finally {
// Only a successful creation releases the latch; otherwise the waiting caller runs into its timeout.
if (req != null && isSuccess)
req.getCountDownLatch().countDown();// initialization finished: release the latch the requester is waiting on
}
return true;
}
/**
 * A single request to create a mapped file: target path, size, a latch released once the
 * file has been created, and the resulting {@code MappedFile}.
 *
 * <p>Some members (including the {@code Comparable} implementation) are elided in this excerpt.
 */
static class AllocateRequest implements Comparable<AllocateRequest> {
// Full file path
private String filePath;
private int fileSize;
// Counted down once by mmapOperation() after the mapped file has been set.
private CountDownLatch countDownLatch = new CountDownLatch(1);
// Null until the file has been created.
private volatile MappedFile mappedFile = null;
public AllocateRequest(String filePath, int fileSize) {
this.filePath = filePath;
this.fileSize = fileSize;
}
...
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public MappedFile getMappedFile() {
return mappedFile;
}
public void setMappedFile(MappedFile mappedFile) {
this.mappedFile = mappedFile;
}
}
}
2.DefaultMappedFile
/**
 * Mapped file backed by a read-write {@code FileChannel} mapping of a file on disk.
 */
public class DefaultMappedFile extends AbstractMappedFile {

    /**
     * Opens (creating if necessary) the file at {@code fileName} and maps its first
     * {@code mappedFileSizeCommitLog} bytes in read-write mode.
     *
     * @param fileName                full path of the file; its last path segment must parse as a long
     * @param mappedFileSizeCommitLog number of bytes to map
     * @throws IOException if the file cannot be opened or mapped
     */
    public DefaultMappedFile(final String fileName, final int mappedFileSizeCommitLog) throws IOException {
        init(fileName, mappedFileSizeCommitLog);
    }

    /**
     * Records the file attributes, ensures the parent directory, then opens and maps the file.
     * If opening or mapping fails after the channel was obtained, the channel is closed before
     * the exception propagates.
     */
    private void init(final String fileName, final int mappedFileSizeCommitLog) throws IOException {
        this.fileName = fileName;
        this.mappedFileSizeCommitLog = mappedFileSizeCommitLog;
        this.file = new File(fileName);

        // The numeric file name is stored as this file's starting offset.
        final String offsetText = this.file.getName();
        this.fileFromOffset = Long.parseLong(offsetText);

        final String parentDir = this.file.getParent();
        UtilAll.ensureDirOK(parentDir);

        boolean mapped = false;
        try {
            final RandomAccessFile backingFile = new RandomAccessFile(this.file, "rw");
            this.fileChannel = backingFile.getChannel();
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, mappedFileSizeCommitLog);

            // Account for the new mapping in the global counters.
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(mappedFileSizeCommitLog);
            TOTAL_MAPPED_FILES.incrementAndGet();
            mapped = true;
        } finally {
            final boolean channelOpened = this.fileChannel != null;
            if (channelOpened && !mapped) {
                // Mapping did not complete: do not leave the channel open.
                this.fileChannel.close();
            }
        }
    }
}