RocketMQ 存储机制浅析

RocketMQ 是一个典型的发布订阅系统,通过 Broker 节点中转和持久化数据、解耦上下游。Broker 是真实存储数据的节点,由多个水平部署但不一定完全对等的副本组构成,单个副本组的不同节点的数据会达到最终一致。RocketMQ 优异的性能表现,绕不开其优秀的存储模型 。

存储机制设计

在存储方式上,RocketMQ/Kafka/RabbitMQ 均采用的是消息刷盘至所部署虚拟机/物理机的文件系统做持久化。ActiveMQ(默认采用的 KahaDB 做消息存储)可选用 JDBC 做消息持久化,通过简单的 xml 配置信息即可实现 JDBC 消息存储。使用文件系统做持久化的情况下,可获得更高效的 I/O 读写。

  • Broker Store 目录结构

storePathRootDir=/cache1/rocketmq/broker/data
├── abort // 该文件在 Broker 启动后会自动创建,正常关闭 Broker,该文件会自动消失。若在没有启动 Broker 的情况下,发现这个文件是存在的,则说明之前 Broker 的关闭是非正常关闭
├── checkpoint // 其中存储着 commitlog、consumequeue、index 文件的最后刷盘时间戳
├── commitlog // 其中存放着 commitlog 文件,而消息是写在 commitlog 文件中的
│   ├── 00000000000000000000
│   ├── 00000000001073741824
│   └── 00000000002147483648
├── config // 存放着 Broker 运行期间的一些配置数据
│   ├── consumerFilter.json // 消费者的过滤器
│   ├── consumerFilter.json.bak
│   ├── consumerOffset.json // offsetTable记录消费进度偏移量
│   ├── consumerOffset.json.bak
│   ├── delayOffset.json
│   ├── delayOffset.json.bak
│   ├── subscriptionGroup.json // 消费者订阅关系
│   ├── subscriptionGroup.json.bak
│   ├── topics.json // topic配置
│   └── topics.json.bak
├── consumequeue // 其中存放着 consumequeue 文件,队列就存放在这个目录中
│   ├── TopicTest1
│      ├── 0
│          └── 00000000000000000000  
│      └── 1
│          └── 00000000000000000000    
│   └── TopicTest2
├── index // 消息索引文件 indexFile,加快消息查询速度
│   └── 20230902163452641 //文件名以创建时间戳命名
└── lock // 运行期间使用到的全局资源锁

CommitLog

RocketMQ Broker 单个实例下所有的 Topic 都使用同一个 CommitLog 来存储,即单个实例消息整体有序。CommitLog 单个文件大小默认 1G,文件文件名是起始偏移量,总共 20 位,左边补零,起始偏移量是 0。假设文件按照默认大小 1G 来算:

  • 第一个文件的文件名为 00000000000000000000 ,当第一个文件被写满之后,开始写入第二个文件;

  • 第二个文件的文件名为 00000000001073741824 ,1G=1073741824=1024*1024*1024;

  • 第三个文件的文件名是 00000000002147483648,(文件名相差1G=1073741824=1024*1024*1024)。

CommitLog 按照上述命名的好处是给出任意一个消息的物理偏移量,可以通过二分法进行查找,快速定位这个文件的位置,然后用消息物理偏移量减去所在文件的名称,得到的差值就是在该文件中的绝对地址。

消息存储格式

  • MagicCode:MagicCode 是一个特殊的字段,它可以标志 Buffer 中的某个 CommitLog 是一个正常的CommitLog,还是因为 Buffer 没有多余的空间存放该 CommitLog,导致该 CommitLog 是一个空的 CommitLog。MagicCode 有两个值,如下所示:

// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
  • BodyCRC:CRC 即循环冗余校验码,是数据通信领域中最常用的一种查错校验码,通过 CRC 就可以知道数据的正确性和完整性。RocketMQ 通过 CRC 来校验消息部分:

if (checkCRC) {
    int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
    if (crc != bodyCRC) {
       log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
       return new DispatchRequest(-1, false/* success */);
    }
}
  • QueueId:消息发往哪个队列,QueueId 在 Producer 发送消息时会选择出来。

  • QueueOffset:存放了消息记录应该在 ConsumerQueue 中的位置,这样构建 ConsumerQueue 的时候,就知道该条记录在 ConsummerQueue 的位置顺序,在消费消息的时候很有用处。

  • PhysicalOffset:消息在 CommitLog 中的物理位置。需要注意的是,我们 CommitLog 对应着磁盘上的多个文件,这里的偏移量不是从某个文件开始算的,而是从第一个文件偏移开始算起的。

  • SysFlag:是 RocketMQ 内部使用的标记位,通过位运算进行标记。例如是否对消息进行了压缩、是否属于事务消息。SysFlag 初始值为 0,可与下面的标记进行位运算。

  • BornTimestamp:Producer 发送消息的时间。

  • BornHost:Producer 发送消息使用的套接字地址。

  • StoreTimestamp:消息在 Broker 上存储时间。

  • StoreHostAddress:Broker 的套接字地址,存储方式同 BornHost。

  • ReconsumeTimes:重复消费次数,初始为 0。Broker 重试的时候,这个 ReconsumeTimes 就会 +1,默认最大重试次数是 16 次。

  • PreparedTransactionOffset:事务消息相关的一个属性(RocketMQ 事务消息基于两阶段提交)。

  • Properties:存放了 RocketMQ 内部用到的一些属性,也存放了用户的一些属性。

顺序写

RocketMQ 的 Commitlog 文件、Consumequeue 文件都是顺序写入的。磁盘顺序写入速度可以达到几百兆/s,而随机写入速度只有几百 KB /s,相差上千倍。

PageCache 机制

Broker 在将消息顺序写入 CommitLog,大大提升性能。但还不够,毕竟仍是磁盘 I/O 操作,要想进一步提升性能,须利用内存。所以 Broker 将数据写入 CommitLog 文件时,不是直接写入物理磁盘文件,而是先进入 OS 的 PageCache 内存缓存,后续由 OS 后台线程异步将 PageCache 数据刷入底层磁盘文件。消费消息时,采用随机读的方式,由于 PageCache 局部性热点原理且整体情况下还是从旧到新的有序读,大部分 Case 消息还能直接从 Page Cache 读,不会产生太多缺页(Page Fault)中断而从磁盘读取。

  • 异步刷盘若 Broker 将消息写入 PageCahe 并响应给生产者后突然宕机,此时消息在缓存中没有写入底层磁盘文件,就会造成消息丢失:生产者认为发送成功,实际上消息写入失败。

  • 遇到 OS 进行脏页回写,内存回收,内存 Swap 等情况时,可能引起较大的消息读写延迟。

扩展: Java NIO 基于零拷贝的实现

mmap:

  • FileChannel#map():把文件对象映射到虚拟内存。

  • MappedByteBuffer/DirectByteBuffer.get(): 获取内存数据。

    • 因其占用虚拟内存(非 JVM 的堆内存),不受 JVM -Xmx 参数限制,但其大小也受到 OS 虚拟内存大小限制。一般一次只能映射 1.5~2G 的文件至用户态的虚拟内存空间,这也是为何 RocketMQ 默认设置单 CommitLog 日志数据文件为 1G。

sendfile:

  • FileChannel.transferFrom()/transferTo():底层调用了sendfile()内核函数。

RocketMQ 选择 mmap 原因:

(1) sendfile 在用户态不可见,而当前场景下有读有写。

(2) 在 Linux 系统中对于 1G 的文件,mmap 处理的性能要优于 sendfile。

ConsumeQueue

ConsumeQueue 不负责存储消息,只负责记录它所属 Topic 的消息在 CommitLog 中的偏移量,这样当消费者从 Broker 拉取消息的时候,就可以快速根据偏移量定位到消息。

ConsumeQueue 存储的格式如下,包含起始物理位置偏移量消息长度消息Tag的哈希值,总共 20B:

每个 ConsumeQueue 都有一个 QueueId,QueueId 的值为 0 到 TopicConfig 配置的队列数量。比如某个 Topic 的消费队列数量为 4,那么四个 ConsumeQueue 的 QueueId 就分别为 0、1、2、3。

消费者消费时会先从 ConsumeQueue 中查找消息在 CommitLog 中的 Offset,再去 CommitLog 中找原始消息数据。如果某个消息只在 CommitLog 中有数据而没在 ConsumeQueue 中则消费者无法进行消费。

ConsumeQueue 类对应的是每个topic和queuId下面的所有文件。默认存储路径是$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个文件由 30w 条数据组成,单个文件的大小是 30w x 20Byte,即每个文件为 600w 字节,单个消费队列的文件大小约为 5.722M=(600w/(1024*1024))。

ConsumeQueue 构建流程:

IndexFile

Broker 除了通过 ConsumeQueue 提供给 Consumer 消费之外,还支持通过 MsgID 或者 MessageKey 来查询消息;使用 ID 查询时,因为 ID 就是用 broker+offset 生成的(这里 MsgID 指的是服务端的),所以很容易就找到对应的 CommitLog 文件来读取消息。对于用 MessageKey 来查询消息,MessageStore 通过构建一个 Index 来提高读取速度。IndexFile 结构如下图:

Checkpoint

Checkpoint 主要记录 CommitLog、ConsumeQueue、Index 文件的刷盘时间点,如果在上一次 Broker 异常结束时,会根据 StoreCheckpoint 的数据进行恢复。


火山引擎基于字节跳动内部的大规模实践,推出的消息队列产品包括消息队列 Kafka / RabbitMQ / RocketMQ 版云原生消息引擎 BMQ,欢迎咨询了解。

来源团队|字节跳动 IBF 业财研发部 伍楼华

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/554539.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

UE4_动画基础_根运动Root Motion

学习笔记,仅供参考! 在游戏动画中,角色的碰撞胶囊体(或其他形状)通常由控制器驱动通过场景。然后来自该胶囊体的数据用于驱动动画。例如,如果胶囊体在向前移动,系统就会知道在角色上播放一个跑步…

华为“天才少年”4万字演讲:现在的AI技术要么无趣,要么无用

近期,一篇4万字的演讲风靡于国内人工智能(AI)学术圈。 原华为“天才少年”、Logenic AI公司联合创始人李博杰博士,日前发表了一篇关于AI Agent思考的文章,题为“AI Agent 应该更有趣还是更有用”。 李博杰在这篇文章…

存储过程的创建和调用及删除

目录 存储过程 存储过程的创建 存储过程的调用及删除 在 SQL Plus 中调用存储过程 在 PL/SQL 块中调用存储过程 存储过程的删除 Oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 存储过程 存储过程是一种命名的 PL/S…

使用Mybatisforeach循环添加字段和值失败问题记录

问题描述: 由于数据表字段非常多,使用foreach循环,key为数据库字段,value为要添加的值.字段中含有小数点的无法正常添加数据 问题展示: 断点展示有值 日志展示获取不到值 sql如下: <insert id"dataMergeInsert" parameterType"java.util.List">IN…

gemini国内怎么用

gemini国内怎么用 Google Gemini 作为一个尚处于研发阶段的大型语言模型&#xff0c;其具体功能和性能尚未公开&#xff0c;因此无法对其好用程度做出明确评价。 然而&#xff0c;基于 Google 在人工智能领域的领先地位和技术实力&#xff0c;我们可以对其潜力进行一些推测&a…

科技云报道:“老三样”不管用了,网络安全要靠啥?

科技云报道原创。 从安全的视角看&#xff0c;网络空间充斥着病毒、黑客、漏洞。在过去&#xff0c;企业习惯用“老三样”——防火墙、IDS、杀毒软件来搞定安全。 如果将网络空间比喻成一个大厦&#xff0c;那么防火墙相当于门锁&#xff0c;用于隔离内外网或不同安全域&…

【MySQL 数据宝典】【内存结构】- 004 自适应哈希索引

自适应哈希索引 https://developer.aliyun.com/article/1230086 什么是自适应哈希索引&#xff1f; 自适应哈希索引是MySQL InnoDB存储引擎中的一种索引结构&#xff0c;用于加速查询。它根据查询模式和数据分布动态地调整自身的大小&#xff0c;以提高性能。 上图就是通过…

hive使用sqoop与oracle传输数据

下载地址 http://archive.apache.org/dist/sqoop 两个版本sqoop1&#xff08;1.4.x&#xff09;和sqoop2&#xff08;1.99.x&#xff09;&#xff0c;两种不同的架构。 本文使用sqoop1。 sqoop是apache旗下一款“hadoop与关系数据库之间传送数据”的工具。 导入数据&#xf…

MES管理系统生产物料管理流程设计的注意事项

随着现代制造业的迅猛发展&#xff0c;MES管理系统在生产物料管理中所扮演的角色愈发重要。一个高效、精准的MES管理系统能够显著提升物料管理的效率&#xff0c;确保生产流程的顺畅进行。然而&#xff0c;在设计生产物料管理流程时&#xff0c;我们需要注意一系列关键问题&…

预选小标题

海岛,广东小岛 汕头 南澳岛(推荐) 距离东凤镇 500公里 珠海 需要坐船出海, 到达珠海码头 75公里 东澳岛, 桂山岛 海岛基本都差不多 阳江市海陵岛 可以赶海 距离东凤镇 240公里 江门 川岛, 下川岛 可下海 距离东凤镇 150公里 山林丹霞地貌 韶关 丹霞景区, 韶石景区,巴寨景…

【AI】DeepStream(01)介绍

1、简介 DeepStream 本质是 GStreamer 的插件,基于GStreamer的管道,实现高效的视频流分析。 DeepStream 将来自 USB/CSI 摄像头的流数据、来自文件的视频或通过 RTSP 的流作为输入,并使用人工智能和计算机视觉从像素中生成AI结果。 DeepStream SDK 可以成为许多视频分析解…

【MySQL 数据宝典】【内存结构】- 001 BufferPool

一、 BufferPool BufferPool 官方文档地址 1.1 什么是 Buffer Pool Buffer Pool 概述&#xff1a; Buffer Pool 是 InnoDB 存储引擎用于缓存磁盘中页的内存区域&#xff0c;它的大小直接影响数据库的性能。 默认大小和调整&#xff1a; 默认情况下&#xff0c;Buffer Pool…

[spring] rest api security

[spring] rest api security 之前的 rest api CRUD 都没有实现验证&#xff08;authentication&#xff09;和授权&#xff08;Authorization&#xff09;&#xff0c;这里使用 Spring security 进行补全 spring security 是一个非常灵活、可延伸的实现方式&#xff0c;比较简…

初识LangChain的快速入门指南

LangChain 概述 LangChain是一个基于大语言模型用于构建端到端语言模型应用的框架&#xff0c;它提供了一系列工具、套件和接口&#xff0c;让开发者使用语言模型来实现各种复杂的任务&#xff0c;如文本到图像的生成、文档问答、聊天机器人等。 LangChain简化了LLM应用程序生…

PostCSS概述和应用

文章目录 PostCSS概述**核心特性与工作原理&#xff1a;****应用场景与优势&#xff1a;****社区与生态&#xff1a;** PostCSS应用实例 PostCSS概述 PostCSS 是一款开源的、用 JavaScript 编写的 CSS 处理工具&#xff0c;其核心设计理念是利用 JavaScript 的强大编程能力和丰…

51-40 Align your Latents,基于LDM的高分辨率视频生成

由于数据工程、仿真测试工程&#xff0c;咱们不得不进入AIGC图片视频生成领域。兜兜转转&#xff0c;这一篇与智驾场景特别密切。23年4月&#xff0c;英伟达Nvidia联合几所大学发布了带文本条件融合、时空注意力的Video Latent Diffusion Models。提出一种基于LDM的高分辨率视…

synchronized的优化策略^o^

synchronized 特点&#xff1a; 开始是乐观锁&#xff0c;如果锁冲突&#xff0c;就转换为悲观锁开始是轻量级锁&#xff0c;如果锁被持有的时间较长&#xff0c;就转换为重量级锁实现轻量级锁的时候大概率用到的是自旋锁策略是一种不公平锁是一种可重入锁不是读写锁 synchro…

【考研数学】《1800》《660》《880》如何选择及搭配?看这一篇!

可以刷880&#xff01;但一定要把心态稳住&#xff01;&#xff01;&#xff01; 我考研的时候刷880前几章还可以&#xff0c;越往后越刷不动 因为很多人在备考前两轮的后期听课和刷题都不如前几章细心...越往后知识点掌握的越来越不熟练&#xff0c;所以也建议大家在前几轮复…

Kafka复习

消息中间件的作用: 异步处理: 与并行相比,虽然减少了时间,但是还是得等待其他线程执行完,但是消息中间件对于简单的业务处理,还要引入一个中间件也比较复杂如果我投递了简历之后需要发送成功邮件以及短信,就可以交给消息中间件就像数据库、redis数据一致性,需要用到延迟…

VScode使用记录

代码颜色是白色 发现没有根据对应的文本类型显示颜色 解决方法&#xff1a; 效果&#xff1a;