RocketMQ 是一个典型的发布订阅系统,通过 Broker 节点中转和持久化数据、解耦上下游。Broker 是真实存储数据的节点,由多个水平部署但不一定完全对等的副本组构成,单个副本组的不同节点的数据会达到最终一致。RocketMQ 优异的性能表现,绕不开其优秀的存储模型 。
存储机制设计
在存储方式上,RocketMQ/Kafka/RabbitMQ 均采用的是消息刷盘至所部署虚拟机/物理机的文件系统做持久化。ActiveMQ(默认采用的 KahaDB 做消息存储)可选用 JDBC 做消息持久化,通过简单的 xml 配置信息即可实现 JDBC 消息存储。使用文件系统做持久化的情况下,可获得更高效的 I/O 读写。
-
Broker Store 目录结构
storePathRootDir=/cache1/rocketmq/broker/data
├── abort // 该文件在 Broker 启动后会自动创建,正常关闭 Broker,该文件会自动消失。若在没有启动 Broker 的情况下,发现这个文件是存在的,则说明之前 Broker 的关闭是非正常关闭
├── checkpoint // 其中存储着 commitlog、consumequeue、index 文件的最后刷盘时间戳
├── commitlog // 其中存放着 commitlog 文件,而消息是写在 commitlog 文件中的
│ ├── 00000000000000000000
│ ├── 00000000001073741824
│ └── 00000000002147483648
├── config // 存放着 Broker 运行期间的一些配置数据
│ ├── consumerFilter.json // 消费者的过滤器
│ ├── consumerFilter.json.bak
│ ├── consumerOffset.json // offsetTable记录消费进度偏移量
│ ├── consumerOffset.json.bak
│ ├── delayOffset.json
│ ├── delayOffset.json.bak
│ ├── subscriptionGroup.json // 消费者订阅关系
│ ├── subscriptionGroup.json.bak
│ ├── topics.json // topic配置
│ └── topics.json.bak
├── consumequeue // 其中存放着 consumequeue 文件,队列就存放在这个目录中
│ ├── TopicTest1
│ ├── 0
│ └── 00000000000000000000
│ └── 1
│ └── 00000000000000000000
│ └── TopicTest2
├── index // 消息索引文件 indexFile,加快消息查询速度
│ └── 20230902163452641 //文件名以创建时间戳命名
└── lock // 运行期间使用到的全局资源锁
CommitLog
RocketMQ Broker 单个实例下所有的 Topic 都使用同一个 CommitLog 来存储,即单个实例消息整体有序。CommitLog 单个文件大小默认 1G,文件文件名是起始偏移量,总共 20 位,左边补零,起始偏移量是 0。假设文件按照默认大小 1G 来算:
-
第一个文件的文件名为 00000000000000000000 ,当第一个文件被写满之后,开始写入第二个文件;
-
第二个文件的文件名为 00000000001073741824 ,1G=1073741824=1024*1024*1024;
-
第三个文件的文件名是 00000000002147483648,(文件名相差1G=1073741824=1024*1024*1024)。
CommitLog 按照上述命名的好处是给出任意一个消息的物理偏移量,可以通过二分法进行查找,快速定位这个文件的位置,然后用消息物理偏移量减去所在文件的名称,得到的差值就是在该文件中的绝对地址。
消息存储格式
-
MagicCode:MagicCode 是一个特殊的字段,它可以标志 Buffer 中的某个 CommitLog 是一个正常的CommitLog,还是因为 Buffer 没有多余的空间存放该 CommitLog,导致该 CommitLog 是一个空的 CommitLog。MagicCode 有两个值,如下所示:
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
-
BodyCRC:CRC 即循环冗余校验码,是数据通信领域中最常用的一种查错校验码,通过 CRC 就可以知道数据的正确性和完整性。RocketMQ 通过 CRC 来校验消息部分:
if (checkCRC) {
int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
if (crc != bodyCRC) {
log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
return new DispatchRequest(-1, false/* success */);
}
}
-
QueueId:消息发往哪个队列,QueueId 在 Producer 发送消息时会选择出来。
-
QueueOffset:存放了消息记录应该在 ConsumerQueue 中的位置,这样构建 ConsumerQueue 的时候,就知道该条记录在 ConsummerQueue 的位置顺序,在消费消息的时候很有用处。
-
PhysicalOffset:消息在 CommitLog 中的物理位置。需要注意的是,我们 CommitLog 对应着磁盘上的多个文件,这里的偏移量不是从某个文件开始算的,而是从第一个文件偏移开始算起的。
-
SysFlag:是 RocketMQ 内部使用的标记位,通过位运算进行标记。例如是否对消息进行了压缩、是否属于事务消息。SysFlag 初始值为 0,可与下面的标记进行位运算。
-
BornTimestamp:Producer 发送消息的时间。
-
BornHost:Producer 发送消息使用的套接字地址。
-
StoreTimestamp:消息在 Broker 上存储时间。
-
StoreHostAddress:Broker 的套接字地址,存储方式同 BornHost。
-
ReconsumeTimes:重复消费次数,初始为 0。Broker 重试的时候,这个 ReconsumeTimes 就会 +1,默认最大重试次数是 16 次。
-
PreparedTransactionOffset:事务消息相关的一个属性(RocketMQ 事务消息基于两阶段提交)。
-
Properties:存放了 RocketMQ 内部用到的一些属性,也存放了用户的一些属性。
顺序写
RocketMQ 的 Commitlog 文件、Consumequeue 文件都是顺序写入的。磁盘顺序写入速度可以达到几百兆/s,而随机写入速度只有几百 KB /s,相差上千倍。
PageCache 机制
Broker 在将消息顺序写入 CommitLog,大大提升性能。但还不够,毕竟仍是磁盘 I/O 操作,要想进一步提升性能,须利用内存。所以 Broker 将数据写入 CommitLog 文件时,不是直接写入物理磁盘文件,而是先进入 OS 的 PageCache 内存缓存,后续由 OS 后台线程异步将 PageCache 数据刷入底层磁盘文件。消费消息时,采用随机读的方式,由于 PageCache 局部性热点原理且整体情况下还是从旧到新的有序读,大部分 Case 消息还能直接从 Page Cache 读,不会产生太多缺页(Page Fault)中断而从磁盘读取。
-
异步刷盘若 Broker 将消息写入 PageCahe 并响应给生产者后突然宕机,此时消息在缓存中没有写入底层磁盘文件,就会造成消息丢失:生产者认为发送成功,实际上消息写入失败。
-
遇到 OS 进行脏页回写,内存回收,内存 Swap 等情况时,可能引起较大的消息读写延迟。
扩展: Java NIO 基于零拷贝的实现
mmap:
-
FileChannel#map():把文件对象映射到虚拟内存。
-
MappedByteBuffer/DirectByteBuffer.get(): 获取内存数据。
-
因其占用虚拟内存(非 JVM 的堆内存),不受 JVM -Xmx 参数限制,但其大小也受到 OS 虚拟内存大小限制。一般一次只能映射 1.5~2G 的文件至用户态的虚拟内存空间,这也是为何 RocketMQ 默认设置单 CommitLog 日志数据文件为 1G。
-
sendfile:
-
FileChannel.transferFrom()/transferTo():底层调用了sendfile()内核函数。
RocketMQ 选择 mmap 原因:
(1) sendfile 在用户态不可见,而当前场景下有读有写。
(2) 在 Linux 系统中对于 1G 的文件,mmap 处理的性能要优于 sendfile。
ConsumeQueue
ConsumeQueue 不负责存储消息,只负责记录它所属 Topic 的消息在 CommitLog 中的偏移量,这样当消费者从 Broker 拉取消息的时候,就可以快速根据偏移量定位到消息。
ConsumeQueue 存储的格式如下,包含起始物理位置偏移量
,消息长度
,消息Tag的哈希值
,总共 20B:
每个 ConsumeQueue 都有一个 QueueId,QueueId 的值为 0 到 TopicConfig 配置的队列数量。比如某个 Topic 的消费队列数量为 4,那么四个 ConsumeQueue 的 QueueId 就分别为 0、1、2、3。
消费者消费时会先从 ConsumeQueue 中查找消息在 CommitLog 中的 Offset,再去 CommitLog 中找原始消息数据。如果某个消息只在 CommitLog 中有数据而没在 ConsumeQueue 中则消费者无法进行消费。
ConsumeQueue 类对应的是每个topic和queuId下面的所有文件。默认存储路径是$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个文件由 30w 条数据组成,单个文件的大小是 30w x 20Byte,即每个文件为 600w 字节,单个消费队列的文件大小约为 5.722M=(600w/(1024*1024))。
ConsumeQueue 构建流程:
IndexFile
Broker 除了通过 ConsumeQueue 提供给 Consumer 消费之外,还支持通过 MsgID 或者 MessageKey 来查询消息;使用 ID 查询时,因为 ID 就是用 broker+offset 生成的(这里 MsgID 指的是服务端的),所以很容易就找到对应的 CommitLog 文件来读取消息。对于用 MessageKey 来查询消息,MessageStore 通过构建一个 Index 来提高读取速度。IndexFile 结构如下图:
Checkpoint
Checkpoint 主要记录 CommitLog、ConsumeQueue、Index 文件的刷盘时间点,如果在上一次 Broker 异常结束时,会根据 StoreCheckpoint 的数据进行恢复。
火山引擎基于字节跳动内部的大规模实践,推出的消息队列产品包括消息队列 Kafka / RabbitMQ / RocketMQ 版及云原生消息引擎 BMQ,欢迎咨询了解。
来源团队|字节跳动 IBF 业财研发部 伍楼华