1. Introduction
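RocketMQ owes its high throughput largely to two components. The first is a high-performance network layer built on Netty. The second is a storage layer built on mmap. The storage layer revolves around three kinds of files: the commitLog, which persists every message; the ConsumeQueue, which consumers read to locate messages; and the IndexFile, which supports lookups by message key (msgKey). These three files are the core of RocketMQ storage. This article covers MappedFile, the component through which the commitLog interacts with the disk.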
2. mmap
Before looking at the commitLog, let us look at the core technique it relies on: mmap.
2.1 Traditional read/write
What are the limitations of a traditional read? Reading a block of data from a disk file with ordinary read/write calls takes the following steps:
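1. The user thread calls the operating system's read function to request data from the disk file.
2. Using the file's inode information, the operating system checks whether the requested page is already in the page cache. If it is, the page content is returned directly.
3. If it is not, the OS uses the inode to locate the file's blocks on disk and has the DMA controller copy them into the page cache.
4. The CPU then copies the data from the page cache into the user process's buffer.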
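As these steps show, with ordinary read/write calls, data that is read from disk and then sent over the network is copied four times: disk -> kernel page cache -> user-space buffer -> kernel socket buffer -> network card.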
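The four read steps can be sketched in Java with a plain FileChannel.read into a heap ByteBuffer; that read is exactly the page cache -> user-space buffer copy. This is a minimal illustration, not RocketMQ code, and the class TraditionalRead and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TraditionalRead {
    // read() path: the kernel fills the page cache, then copies the bytes into this heap buffer.
    static byte[] readAll(Path path) {
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer userBuffer = ByteBuffer.allocate((int) ch.size()); // user-space buffer
            while (userBuffer.hasRemaining()) {
                // each read() is a system call plus a page cache -> user buffer copy
                if (ch.read(userBuffer) < 0) {
                    break; // end of file
                }
            }
            return userBuffer.array();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes a temporary file and reads it back through the traditional path.
    static String roundTrip(String content) {
        try {
            Path tmp = Files.createTempFile("traditional", ".dat");
            try {
                Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
                return new String(readAll(tmp), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```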
2.2 Zero-copy
2.2.1 Physical and virtual addresses
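1. What is a physical address?
A physical address is an address in the actual physical memory.
2. What is a virtual address?
In early computer systems the CPU operated directly on physical addresses. As memory ran short, swapping was introduced: rarely accessed data is moved from memory to disk, so that memory can hold more data and the user gets the impression of a much larger memory. The operating system handles this complexity and gives each process one contiguous address space; the addresses in it are virtual addresses. The memory management unit (MMU) translates virtual addresses into physical addresses.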
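A toy model of the MMU lookup makes the translation concrete: split the virtual address into a page number and an offset, look up the physical frame, and fault if there is none. This is a hypothetical single-level page table with 4 KB pages (real MMUs walk multi-level tables and cache entries in a TLB); ToyMmu and its methods are illustrative names only.

```java
import java.util.HashMap;
import java.util.Map;

class ToyMmu {
    static final int PAGE_SIZE = 4096; // 4 KB pages
    // virtual page number -> physical frame number (hypothetical single-level table)
    private final Map<Long, Long> pageTable = new HashMap<>();

    void map(long virtualPage, long physicalFrame) {
        pageTable.put(virtualPage, physicalFrame);
    }

    // Translates a virtual address, or throws to model a page fault.
    long translate(long virtualAddress) {
        long vpn = virtualAddress / PAGE_SIZE;
        long offset = virtualAddress % PAGE_SIZE;
        Long frame = pageTable.get(vpn);
        if (frame == null) {
            // A real MMU raises a page fault here and the OS installs the mapping.
            throw new IllegalStateException("page fault at virtual page " + vpn);
        }
        return frame * PAGE_SIZE + offset;
    }
}
```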
2.2.2 Steps of an mmap mapping
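1. The process starts the mapping, and a mapping region is created in its virtual address space.
The kernel reserves a range of addresses in the process's virtual address space (the memory-mapping area; from the JVM's point of view this is off-heap memory) that will stand for the mapped disk file.
2. The mapping between the virtual address range and the disk file is established.
The kernel records that this virtual address range corresponds to the given region of the file. No physical memory is allocated and no data is read at this point.
3. Accessing the mapped region triggers a page fault, which copies the data from disk into main memory.
When the process reads or writes a virtual address in the region, the MMU tries to translate it to a physical address. Because no physical page is mapped yet, a page fault occurs; the OS reads the corresponding part of the file into the page cache and maps that physical page to the virtual address, after which the access proceeds.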
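In Java these steps go through FileChannel.map, the same call DefaultMappedFile.init uses on a freshly created file. A minimal sketch, assuming OpenJDK: the FileChannel.map Javadoc leaves mapping past the end of a file unspecified, but OpenJDK grows the file in READ_WRITE mode, which is what the returned length shows. MmapSteps is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class MmapSteps {
    // Maps `size` bytes of a new, empty file; returns {file length after map(), value of byte 0}.
    static long[] mapEmptyFile(int size) {
        try {
            Path tmp = Files.createTempFile("mmap", ".dat");
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Steps 1 and 2: reserve a virtual address range and bind it to the file region.
                // OpenJDK grows a shorter file to position + size in READ_WRITE mode.
                MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, size);
                long lengthAfterMap = ch.size();
                // Step 3: the first access hits a page that is not resident yet; the OS
                // services the page fault and maps a page-cache page for it.
                byte first = buf.get(0);
                return new long[] {lengthAfterMap, first};
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```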
2.2.3 Advantages of mmap
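mmap is one form of zero-copy. As the traditional read path above shows, data from disk has to pass through kernel space (the page cache) before it reaches user space. Can this intermediate copy be avoided? That is what mmap does.
With mmap, a region of the disk file is mapped to a range of virtual addresses in the process. Reading or writing those virtual addresses is equivalent to reading or writing the file, because the process accesses the page-cache pages directly. This removes the copy between the kernel page cache and the user-space buffer: one copy on each read and one on each write.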
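To illustrate the saving, the sketch writes to a file only through a MappedByteBuffer and reads it back with ordinary I/O; no write() system call carries the data. It is a minimal sketch with a hypothetical MmapWrite class. The remark that the bytes are visible before force() assumes Linux, where mapped pages and read() share the same page cache.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class MmapWrite {
    // Writes `text` into a file purely through the mapping, then reads it back with ordinary I/O.
    static String writeViaMapping(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        try {
            Path tmp = Files.createTempFile("mmap-write", ".dat");
            try {
                try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                    MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, bytes.length);
                    buf.put(bytes); // plain memory stores into the mapped pages, no write() call
                    buf.force();    // durability; on Linux read() already sees the bytes before this
                }
                return new String(Files.readAllBytes(tmp), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```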
3. MappedFile
MappedFile is the component that actually writes RocketMQ data to disk. Let us look at how it stores data.
3.1 Structure of MappedFile
public class DefaultMappedFile extends AbstractMappedFile {
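//A page is 4 KB. During file warm-up, MappedFile writes one byte every 4 KB. As explained above, mmap relies on page faults to load disk data into memory, and a memory page is 4 KB.
//Without warm-up, a 1 GB commitLog would incur about 262,144 (1 GB / 4 KB) page faults on first access, so when warm-up is enabled a 0 is written every 4 KB at initialization, loading all of the file's pages into memory in one pass.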
public static final int OS_PAGE_SIZE = 1024 * 4;
public static final Unsafe UNSAFE = getUnsafe();
private static final Method IS_LOADED_METHOD;
public static final int UNSAFE_PAGE_SIZE = UNSAFE == null ? OS_PAGE_SIZE : UNSAFE.pageSize();
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
//Total size of virtual memory mapped by all MappedFiles
protected static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
//Total number of mapped files
protected static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> WROTE_POSITION_UPDATER;
protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> COMMITTED_POSITION_UPDATER;
protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> FLUSHED_POSITION_UPDATER;
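//Write position: how many bytes have been appended to the buffer
protected volatile int wrotePosition;
//Commit position: how many bytes have been committed from writeBuffer to the FileChannel (only used with the transient store pool)
protected volatile int committedPosition;
//Flush position: how many bytes have been forced to disk
protected volatile int flushedPosition;
//File size
protected int fileSize;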
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null;
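//Pool of pre-allocated off-heap (direct) buffers. When the transient store pool is enabled, data is first written into a buffer borrowed from this pool, then committed to the FileChannel, and finally flushed to disk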
protected TransientStorePool transientStorePool = null;
//File name (full path) of this MappedFile
protected String fileName;
//Global commitLog offset of the first byte in this file; the file name is this offset
protected long fileFromOffset;
protected File file;
//The memory-mapped buffer of this file
protected MappedByteBuffer mappedByteBuffer;
//Store timestamp of the last appended message
protected volatile long storeTimestamp = 0;
protected boolean firstCreateInQueue = false;
//Time of the last flush
private long lastFlushTime = -1L;
protected MappedByteBuffer mappedByteBufferWaitToClean = null;
protected long swapMapTime = 0L;
protected long mappedByteBufferAccessCountSinceLastSwap = 0L;
}
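The three positions move in a fixed order: with the transient store pool, wrotePosition ≥ committedPosition ≥ flushedPosition, and a flush stops at the read position. A minimal model, assuming the read position is committedPosition when a writeBuffer exists and wrotePosition otherwise (as getReadPosition, used by flush, does); PositionsSketch is hypothetical, and pagesIn only checks the 1 GB / 4 KB arithmetic from the comment above.

```java
class PositionsSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;
    int wrotePosition;     // bytes appended into the buffer
    int committedPosition; // bytes handed to the FileChannel (transient-store mode only)
    int flushedPosition;   // bytes forced to disk
    final boolean transientMode;

    PositionsSketch(boolean transientMode) {
        this.transientMode = transientMode;
    }

    void append(int bytes) { wrotePosition += bytes; }

    // Only the transient-store mode has a separate commit step.
    void commit() {
        if (transientMode) {
            committedPosition = wrotePosition;
        }
    }

    // Without a writeBuffer, everything written into the mapping is already readable.
    int readPosition() { return transientMode ? committedPosition : wrotePosition; }

    void flush() { flushedPosition = readPosition(); }

    static long pagesIn(long fileSize) { return fileSize / OS_PAGE_SIZE; }
}
```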
3.2 Two write modes of MappedFile: with or without the transient store pool
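1. Without the transient store pool
Messages are written directly into the memory-mapped buffer (MappedByteBuffer), i.e. into the page cache, and a flush then forces them to disk.
2. With the transient store pool
Messages are first written into an off-heap ByteBuffer (writeBuffer) borrowed from the pool, then committed to the FileChannel, and finally flushed to disk. Both commit and flush are performed asynchronously by background threads, so the write path itself only touches the pooled buffer, which helps throughput.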
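With the transient store pool enabled, an append changes only the off-heap buffer; the file sees the bytes after commit. A minimal sketch, assuming a direct ByteBuffer stands in for a buffer borrowed from TransientStorePool and a positional FileChannel.write stands in for commit; TransientModeSketch is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TransientModeSketch {
    // Returns {byte 0 of the file before commit, byte 0 of the file after commit}.
    static int[] demo() {
        try {
            Path tmp = Files.createTempFile("transient", ".dat");
            try {
                try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                    ch.map(FileChannel.MapMode.READ_WRITE, 0, 4096); // pre-size the file, as init() does
                    ByteBuffer writeBuffer = ByteBuffer.allocateDirect(4096); // stands in for a pooled buffer
                    writeBuffer.put((byte) 42); // "append": only the off-heap buffer changes
                    int before = Files.readAllBytes(tmp)[0];
                    ByteBuffer pending = writeBuffer.duplicate();
                    pending.flip(); // the range [0, wrotePosition)
                    ch.write(pending, 0); // "commit": hand the bytes to the page cache
                    int after = Files.readAllBytes(tmp)[0];
                    ch.force(false); // "flush": persist the file content
                    return new int[] {before, after};
                }
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```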
3.3 The transient store pool: TransientStorePool
Its core fields:
public class TransientStorePool {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
//Number of buffers in the pool (each buffer is fileSize bytes)
private final int poolSize;
private final int fileSize;
//Core: a deque of pre-allocated ByteBuffers
private final Deque<ByteBuffer> availableBuffers;
private volatile boolean isRealCommit = true;
}
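Borrowing a buffer
It takes the first buffer from the deque and logs a warning once fewer than 40% of the buffers remain available.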
public ByteBuffer borrowBuffer() {
ByteBuffer buffer = availableBuffers.pollFirst();
if (availableBuffers.size() < poolSize * 0.4) {
log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
}
return buffer;
}
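Returning a buffer
It resets the buffer's position and limit (the old contents are not zeroed) and pushes it back onto the front of the deque.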
public void returnBuffer(ByteBuffer byteBuffer) {
byteBuffer.position(0);
byteBuffer.limit(fileSize);
this.availableBuffers.offerFirst(byteBuffer);
}
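The borrow/return cycle can be reproduced with an ArrayDeque of direct buffers. This is a simplified sketch: SimpleTransientPool is a hypothetical name, it logs to stderr instead of using a logger, and it skips the mlock call that RocketMQ's TransientStorePool.init() makes to keep each buffer resident in memory.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class SimpleTransientPool {
    private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers = new ArrayDeque<>();

    SimpleTransientPool(int poolSize, int fileSize) {
        this.poolSize = poolSize;
        this.fileSize = fileSize;
        // Pre-allocate every off-heap buffer up front.
        for (int i = 0; i < poolSize; i++) {
            availableBuffers.offer(ByteBuffer.allocateDirect(fileSize));
        }
    }

    ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst(); // null when the pool is exhausted
        if (availableBuffers.size() < poolSize * 0.4) {
            System.err.println("pool only has " + availableBuffers.size() + " buffers left");
        }
        return buffer;
    }

    void returnBuffer(ByteBuffer buffer) {
        buffer.position(0);       // reset the cursors; old bytes are not zeroed
        buffer.limit(fileSize);
        availableBuffers.offerFirst(buffer);
    }

    int remaining() { return availableBuffers.size(); }
}
```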
3.4 Initialization of MappedFile
With the two write modes covered above, the initialization of MappedFile is straightforward.
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
//With the transient store pool enabled, borrow a buffer from transientStorePool to use as writeBuffer
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
//Create the File object
this.file = new File(fileName);
//The file name of a MappedFile is its starting offset
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
//Make sure the parent directory exists
UtilAll.ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
//Core: map the whole file into memory as mappedByteBuffer
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
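At this point the MappedByteBuffer mapping is complete and, if the transient store pool is enabled, the off-heap writeBuffer has been borrowed from the pool.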
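Because fileFromOffset is parsed from the file name, every commitLog file is named after the global offset of its first byte. A small sketch with hypothetical helpers, assuming the 20-digit zero-padded naming RocketMQ uses (with 1 GB files: 00000000000000000000, 00000000001073741824, and so on).

```java
class CommitLogFileNames {
    // Hypothetical helper: offset -> 20-digit zero-padded file name.
    static String offsetToFileName(long offset) {
        return String.format("%020d", offset);
    }

    // Mirrors init(): fileFromOffset = Long.parseLong(file.getName()).
    static long fileNameToOffset(String fileName) {
        return Long.parseLong(fileName);
    }

    // Start offset (and therefore name) of the file that holds a global offset.
    static long fileStartFor(long globalOffset, long fileSize) {
        return globalOffset - (globalOffset % fileSize);
    }
}
```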
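3.5 Writing data into MappedFile
Analyzing this method shows that it essentially delegates to AppendMessageCallback.doAppend: the data to write is wrapped in a MessageExt, and the outcome is returned as an AppendMessageResult.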
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
PutMessageContext putMessageContext) {
assert messageExt != null;
assert cb != null;
//Get the current write position
int currentPos = WROTE_POSITION_UPDATER.get(this);
//Only append while the write position is inside the file; otherwise log an error and return UNKNOWN_ERROR
if (currentPos < this.fileSize) {
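//appendMessageBuffer() returns writeBuffer if the transient store pool is enabled, otherwise mappedByteBuffer
ByteBuffer byteBuffer = appendMessageBuffer().slice();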
byteBuffer.position(currentPos);
AppendMessageResult result;
if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
// traditional batch message
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBrokerInner) {
// traditional single message or newly introduced inner-batch message
//In both branches the work is done by AppendMessageCallback.doAppend
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
//Advance the write position
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
//Update the store timestamp
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
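The delegation in appendMessagesInner reduces to three moves: take a slice positioned at wrotePosition, let a callback encode into it, and advance the write position by what the callback reports. A simplified sketch; AppendSketch, its AppendCallback interface and the length-prefixed encoding are hypothetical and far simpler than RocketMQ's message format.

```java
import java.nio.ByteBuffer;

class AppendSketch {
    // Hypothetical stand-in for AppendMessageCallback: encodes one message, returns bytes written.
    interface AppendCallback {
        int doAppend(long fileFromOffset, ByteBuffer buffer, int maxBlank, byte[] msg);
    }

    final long fileFromOffset;
    final int fileSize;
    final ByteBuffer buffer; // writeBuffer or mappedByteBuffer in the real class
    int wrotePosition;

    AppendSketch(long fileFromOffset, int fileSize) {
        this.fileFromOffset = fileFromOffset;
        this.fileSize = fileSize;
        this.buffer = ByteBuffer.allocate(fileSize);
    }

    // Returns bytes written, or -1 when the file is already full (UNKNOWN_ERROR in RocketMQ).
    int append(byte[] msg, AppendCallback cb) {
        int currentPos = wrotePosition;
        if (currentPos >= fileSize) {
            return -1;
        }
        ByteBuffer slice = buffer.slice(); // independent cursor over the same memory
        slice.position(currentPos);
        int wrote = cb.doAppend(fileFromOffset, slice, fileSize - currentPos, msg);
        wrotePosition += wrote;
        return wrote;
    }

    // A trivial callback: 4-byte length prefix followed by the body.
    static final AppendCallback LENGTH_PREFIXED = (from, buf, maxBlank, msg) -> {
        if (4 + msg.length > maxBlank) {
            throw new IllegalStateException("not enough space left in this file");
        }
        buf.putInt(msg.length);
        buf.put(msg);
        return 4 + msg.length;
    };
}
```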
1. The data written into MappedFile: MessageExt
MessageExt contains the message's metadata as well as its actual content.
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
//Topic the message belongs to
private String topic;
private int flag;
//Additional properties
private Map<String, String> properties;
//The actual message body
private byte[] body;
//Transaction ID, for transactional messages
private String transactionId;
}
public class MessageExt extends Message {
private static final long serialVersionUID = 5720810158625748049L;
//Name of the broker
private String brokerName;
//ID of the queue the message belongs to
private int queueId;
//Size of the stored message
private int storeSize;
//Offset of the message in its queue (ConsumeQueue)
private long queueOffset;
private int sysFlag;
//Time the message was created
private long bornTimestamp;
//Address of the host that produced the message
private SocketAddress bornHost;
private long storeTimestamp;
//Address of the host that stored the message
private SocketAddress storeHost;
//Message ID
private String msgId;
//Offset of the message in the commitLog
private long commitLogOffset;
private int bodyCRC;
private int reconsumeTimes;
private long preparedTransactionOffset;
}
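2. The core write logic of MappedFile: AppendMessageCallback
It takes the pre-encoded MessageExt, fills in the fields that are only known at commitLog write time (queue offset, physical offset, store timestamp, and so on), and writes the result into the buffer.
//byteBuffer - the buffer the message is written into (writeBuffer or mappedByteBuffer)
//preEncodeBuffer - the pre-encoded message content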
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
//Take the encoded message content from preEncodeBuffer
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner);
if (isMultiDispatchMsg) {
AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
if (appendMessageResult != null) {
return appendMessageResult;
}
}
//Total length of the message
final int msgLen = preEncodeBuffer.getInt(0);
preEncodeBuffer.position(0);
preEncodeBuffer.limit(msgLen);
// PHY OFFSET
//Start offset of this file + current position in the file = global physical offset where this message starts
long wroteOffset = fileFromOffset + byteBuffer.position();
//Build the message ID: store host IP + port + physical offset
Supplier<String> msgIdSupplier = () -> {
int sysflag = msgInner.getSysFlag();
int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
return UtilAll.bytes2string(msgIdBuffer.array());
};
// Record ConsumeQueue information
//Record the offset in the ConsumeQueue
Long queueOffset = msgInner.getQueueOffset();
// this msg maybe an inner-batch msg.
short messageNum = getMessageNum(msgInner);
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the consume queue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
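//If the remaining space in this file cannot hold the message plus the end-of-file marker, write a blank marker and return END_OF_FILE (no exception is thrown); the caller then continues in a new file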
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.msgStoreItemMemory.clear();
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
msgIdSupplier, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
//Fill in the queue offset, physical offset, store timestamp and other fields
int pos = 4 + 4 + 4 + 4 + 4;
// 6 QUEUEOFFSET
preEncodeBuffer.putLong(pos, queueOffset);
pos += 8;
// 7 PHYSICALOFFSET
preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
// 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
pos += 8 + 4 + 8 + ipLen;
// refresh store time stamp in lock
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
if (enabledAppendPropCRC) {
// 18 CRC32
int checkSize = msgLen - crc32ReservedLength;
ByteBuffer tmpBuffer = preEncodeBuffer.duplicate();
tmpBuffer.limit(tmpBuffer.position() + checkSize);
int crc32 = UtilAll.crc32(tmpBuffer);
tmpBuffer.limit(tmpBuffer.position() + crc32ReservedLength);
MessageDecoder.createCrc32(tmpBuffer, crc32);
}
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS");
// Write messages to the queue buffer
//Write the message into the buffer (writeBuffer or mappedByteBuffer)
byteBuffer.put(preEncodeBuffer);
CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
msgInner.setEncodedBuff(null);
if (isMultiDispatchMsg) {
CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
}
//Return success
return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
}
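The END_OF_FILE branch never splits a message across files: when the remaining space cannot hold the message plus an 8-byte marker, it writes TOTALSIZE = maxBlank followed by a magic code and reports END_OF_FILE. A simplified sketch of that rule with length-prefixed records; EndOfFileSketch is hypothetical, and its BLANK_MAGIC placeholder is not RocketMQ's CommitLog.BLANK_MAGIC_CODE.

```java
import java.nio.ByteBuffer;

class EndOfFileSketch {
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; // TOTALSIZE + MAGICCODE
    // Placeholder value for this sketch only.
    static final int BLANK_MAGIC = 0x0BADF00D;

    enum Status { PUT_OK, END_OF_FILE }

    // Appends a length-prefixed record, or writes the 8-byte blank marker when it does not fit.
    static Status append(ByteBuffer file, byte[] body) {
        int maxBlank = file.remaining();
        int msgLen = 4 + body.length; // total length includes the 4-byte length field
        if (msgLen + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
            file.putInt(maxBlank);    // declares the whole tail as one blank record
            file.putInt(BLANK_MAGIC); // tells readers to move on to the next file
            return Status.END_OF_FILE;
        }
        file.putInt(msgLen);
        file.put(body);
        return Status.PUT_OK;
    }
}
```

Reserving 8 bytes on every append guarantees there is always room for the marker at the end of a file.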
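3. The write result of MappedFile: AppendMessageResult
It mainly returns the physical offset the message was written to, the msgId, and the logical offset in the ConsumeQueue. Note that the msgId can only be built after the message has been placed in the commitLog, because it embeds the physical offset.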
public class AppendMessageResult {
// Return code
private AppendMessageStatus status;
// Where to start writing
private long wroteOffset;
// Write Bytes
private int wroteBytes;
// Message ID
private String msgId;
private Supplier<String> msgIdSupplier;
// Message storage timestamp
private long storeTimestamp;
// Consume queue's offset(step by one)
private long logicsOffset;
private long pagecacheRT = 0;
}
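For an IPv4 store host, the msgId built in doAppend is 16 bytes (4 bytes of IP, 4 bytes of port, 8 bytes of physical offset) rendered as hex. A minimal sketch, assuming uppercase hex encoding like UtilAll.bytes2string; MsgIdSketch is a hypothetical class. Because the offset occupies the last 8 bytes, the physical position can be recovered from the msgId alone.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

class MsgIdSketch {
    private static final char[] HEX = "0123456789ABCDEF".toCharArray();

    // IPv4 store host (4 bytes) + port (4 bytes) + physical commitLog offset (8 bytes), hex-encoded.
    static String createMsgId(String ipv4, int port, long wroteOffset) {
        try {
            ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 8);
            buf.put(InetAddress.getByName(ipv4).getAddress()); // literal address, no DNS lookup
            buf.putInt(port);
            buf.putLong(wroteOffset);
            StringBuilder sb = new StringBuilder();
            for (byte b : buf.array()) {
                sb.append(HEX[(b >> 4) & 0xF]).append(HEX[b & 0xF]);
            }
            return sb.toString();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Reads the physical offset back from the last 16 hex characters.
    static long offsetOf(String msgId) {
        return Long.parseLong(msgId.substring(msgId.length() - 16), 16);
    }
}
```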
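3.6 Committing data in MappedFile
Commit copies the data that has been appended to writeBuffer but not yet committed into the FileChannel, i.e. into the page cache. It only does real work when the transient store pool is enabled.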
public int commit(final int commitLeastPages) {
//If the transient store pool is not enabled, simply return the write position
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return WROTE_POSITION_UPDATER.get(this);
}
//no need to commit data to file channel, so just set committedPosition to wrotePosition.
if (transientStorePool != null && !transientStorePool.isRealCommit()) {
COMMITTED_POSITION_UPDATER.set(this, WROTE_POSITION_UPDATER.get(this));
} else if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0();
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + COMMITTED_POSITION_UPDATER.get(this));
}
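}
// All dirty data has been committed to FileChannel: return the borrowed writeBuffer to the pool.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == COMMITTED_POSITION_UPDATER.get(this)) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
return COMMITTED_POSITION_UPDATER.get(this);
}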
protected void commit0() {
int writePos = WROTE_POSITION_UPDATER.get(this);
int lastCommittedPosition = COMMITTED_POSITION_UPDATER.get(this);
if (writePos - lastCommittedPosition > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
//Write the range [lastCommittedPosition, writePos) of writeBuffer to the FileChannel
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
//Advance the commit position
COMMITTED_POSITION_UPDATER.set(this, writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
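commit0 never rewrites committed bytes: each call copies only the slice between the last commit position and the current write position, at the matching file position. A minimal sketch of that incremental behavior, assuming a direct ByteBuffer in place of the pooled writeBuffer; CommitSketch is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class CommitSketch {
    private final ByteBuffer writeBuffer;
    private final FileChannel fileChannel;
    int wrotePosition;
    int committedPosition;

    CommitSketch(ByteBuffer writeBuffer, FileChannel fileChannel) {
        this.writeBuffer = writeBuffer;
        this.fileChannel = fileChannel;
    }

    // Appends into the off-heap buffer only; the file is not touched yet.
    void append(byte[] data) {
        ByteBuffer slice = writeBuffer.slice();
        slice.position(wrotePosition);
        slice.put(data);
        wrotePosition += data.length;
    }

    // Same shape as commit0(): copy only [committedPosition, wrotePosition) to the channel.
    void commit0() throws IOException {
        int writePos = wrotePosition;
        int lastCommitted = committedPosition;
        if (writePos - lastCommitted > 0) {
            ByteBuffer slice = writeBuffer.slice();
            slice.position(lastCommitted);
            slice.limit(writePos);
            fileChannel.position(lastCommitted);
            fileChannel.write(slice);
            committedPosition = writePos;
        }
    }

    // Two append/commit rounds; returns the resulting file content.
    static String demo() {
        try {
            Path tmp = Files.createTempFile("commit", ".dat");
            try {
                try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                    CommitSketch file = new CommitSketch(ByteBuffer.allocateDirect(64), ch);
                    file.append("hello".getBytes(StandardCharsets.UTF_8));
                    file.commit0();
                    file.append(" world".getBytes(StandardCharsets.UTF_8));
                    file.commit0(); // writes only the 6 new bytes, at offset 5
                }
                return new String(Files.readAllBytes(tmp), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```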
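3.7 Flushing data in MappedFile
Flush forces the data up to the read position to disk: through fileChannel.force(false) when the data was written via the FileChannel (the transient store pool path), otherwise through mappedByteBuffer.force(). It then records the new flushed position.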
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
int value = getReadPosition();
try {
this.mappedByteBufferAccessCountSinceLastSwap++;
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
//Force the dirty pages of the mapped region to disk
this.mappedByteBuffer.force();
}
this.lastFlushTime = System.currentTimeMillis();
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
FLUSHED_POSITION_UPDATER.set(this, value);
this.release();
} else {
log.warn("in flush, hold failed, flush offset = " + FLUSHED_POSITION_UPDATER.get(this));
FLUSHED_POSITION_UPDATER.set(this, getReadPosition());
}
}
return this.getFlushedPosition();
}
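The article does not show isAbleToFlush, but the flushLeastPages parameter suggests a page-count threshold. A sketch of that check under this assumption: flush when the file is full, when at least flushLeastPages whole pages are pending, or, with flushLeastPages = 0, whenever anything is pending. FlushThresholdSketch is hypothetical.

```java
class FlushThresholdSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;

    // Assumed rule, see the lead-in: whole pages between flushedPosition and readPosition
    // are compared against flushLeastPages.
    static boolean isAbleToFlush(int flushedPosition, int readPosition, boolean fileFull, int flushLeastPages) {
        if (fileFull) {
            return true;
        }
        if (flushLeastPages > 0) {
            return (readPosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return readPosition > flushedPosition;
    }
}
```

Batching by whole pages keeps each force() call large, so a background flush thread does not hit the disk for every few bytes.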
3.8 Warming up MappedFile memory
public void warmMappedFile(FlushDiskType type, int pages) {
this.mappedByteBufferAccessCountSinceLastSwap++;
long beginTime = System.currentTimeMillis();
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
long flush = 0;
// long time = System.currentTimeMillis();
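//fileSize is the total size of the MappedFile. Write a 0 into mappedByteBuffer every 4 KB so the pages are loaded now; otherwise frequent page faults at write time would cause frequent disk I/O.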
for (long i = 0, j = 0; i < this.fileSize; i += DefaultMappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put((int) i, (byte) 0);
//With SYNC_FLUSH, also force the pages written so far to disk every `pages` pages during warm-up,
//instead of leaving all of them dirty until the end
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
}
//... (the rest of warmMappedFile is omitted in this excerpt)
}
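The warm-up loop touches one byte per page and, with SYNC_FLUSH, forces every `pages` pages. The sketch replays only the loop's arithmetic, without allocating memory, to count those operations; WarmUpSketch is hypothetical. For a 1 GB file with pages = 4096 (16 MB) it yields 262,144 touched pages and 63 forces inside the loop.

```java
class WarmUpSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;

    // Replays warmMappedFile's loop for SYNC_FLUSH without allocating memory.
    // Returns {pages touched, force() calls inside the loop}.
    static long[] simulate(long fileSize, int pages) {
        long touched = 0;
        long forces = 0;
        long flush = 0;
        for (long i = 0; i < fileSize; i += OS_PAGE_SIZE) {
            touched++; // stands for byteBuffer.put((int) i, (byte) 0)
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                forces++; // stands for mappedByteBuffer.force()
            }
        }
        return new long[] {touched, forces};
    }
}
```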
4. Summary
MappedFile relies on mmap to improve read/write efficiency, and its core is the MappedByteBuffer. A commitLog file is 1 GB by default, which can be configured with mappedFileSizeCommitLog.