RocketMQ系统性学习-RocketMQ高级特性之消息存储在Broker的文件布局

🌈🌈🌈🌈🌈🌈🌈🌈
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁

RocketMQ 高级特性

消息在 Broker 的文件布局

RocketMQ 的混合存储

在 RocketMQ 存储架构中,采用混合存储,其中有 3 个重要的存储文件:Commitlog、ConsumeQueue、IndexFile

  • Topic 的消息实体存储在 Commitlog 中,顺序进行写入
  • ConsumeQueue 可以看作是基于 Topic 的 Commitlog 的索引文件,在 ConsumeQueue 中记录了消息在 Commitlog 中的偏移量、消息大小的信息,用于进行消费
  • IndexFile 提供了可以通过 key 来查询消息的功能,key 是由 topic + msgId 组成的,可以很方便地根据 key 查询具体的消息

消费者去 Broker 中消费数据流程如下:

  1. 先读取 ConsumeQueue,拿到具体消息在 Commitlog 中的偏移量
  2. 通过偏移量在 Commitlog 读取具体 Topic 的信息

消费者去寻找 Commitlog 中的数据流程图如下:

在这里插入图片描述

那么先来看一下 Commitlog 文件在哪里进行写入

SendMessageProcessor # processRequest 作为入口,

经过层层调用 this.sendMessage() -> this.brokerController.getMessageStore().putMessage(msgInner) -> DefaultMessageStore # asyncPutMessage ,最终到达 asyncPutMessage() 方法中,在这里会进行消息的磁盘写的操作:

  1. 创建消息存储所对应的 ByteBuffer:putMessageThreadLocal.getEncoder().encode(msg)

    在这个方法中,会对 Commitlog 文件进行写入:

    在这里插入图片描述

    这里的 byteBuffer 也就是 Commitlog 文件的结构如下:

    在这里插入图片描述

  2. 将创建的 ByteBuffer 设置到 msg 中去: msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer())

  3. 开始向文件中追加消息: result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext)

appendMessage 方法中主要是写入消息之后,Commitlog 中一些数据会发生变化,因此需要进行修改,还是经过层层调用 appendMessage()-> appendMessagesInner()-> cb.doAppend(),最终到达 doAppend 方法,接下来看这个方法都做了些什么:

  1. 首先取出来在上边创建消息对应的 ByteBuffer:ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff()

  2. 接下来修改这个 ByteBuffer 中的一些数据:

    这个 ByteBuffer 在创建的时候已经将一些默认信息设置好了,这里只需要对写入消息后会变化的信息进行修改!

    • 先修改 QueueOffset (偏移量为 20 字节):preEncodeBuffer.putLong(20, queueOffset)
    • 再修改 PhysiclOffset (偏移量为 28 字节):preEncodeBuffer.putLong(28, fileFromOffset + byteBuffer.position())
    • 再修改 SysFlag、BornTimeStamp、BornHost 等等信息,都是通过偏移量在 ByteBuffer 中进行定位,再修改

那么通过上边就 完成了对 Commitlog 文件的追加操作 ,ReputMessageService 线程中的 run 方法,会每隔 1ms 就会去 Commitlog 中取出数据,写入到 ConsumeQueue 和 IndexFile 中

那么接下来寻找写 ConsumerQueue 的地方,也是通过调用链直接找到核心方法:

  • DefaultMessageStore # ReputMessageService # run
  • -> this.doReput()
  • -> DefaultMessageStore.this.doDispatch(dispatchRequest)
  • -> dispatcher.dispatch(req)
  • -> 这里进入到构建 ConsumeQueue 类的 dispatch 方法中:CommitLogDispatcherBuildConsumeQueue # dispatch()
  • -> DefaultMessageStore.this.putMessagePositionInfo(request)
  • -> this.consumeQueueStore.putMessagePositionInfoWrapper(dispatchRequest)
  • -> this.putMessagePositionInfoWrapper(cq, dispatchRequest)
  • -> consumeQueue.putMessagePositionInfoWrapper(request)
  • -> this.putMessagePositionInfo()

这个调用链比较长,如果不想一步一步点的话,直接找到 ConsumeQueue # this.putMessagePositionInfo() 这个方法即可,在这个方法中向 byteBufferIndex 中放了 3 个数据,就是 ConsumeQueue 的组成 = Offset + Size + TagsCode

在这里插入图片描述

那么 ConsumeQueue 的组成结构就如下所示,通过 ConsumeQueue 主要用于寻找 Topic 下的消息在 Commitlog 中的位置:

在这里插入图片描述

IndexFile 主要是通过 Key(Topic+msgId) 来寻找消息在 Commitlog 中的位置

接下来看一下 IndexFile 结构是怎样的,在上边寻找 ConsumeQueue 的调用链中,有一个 dispatcher.dispatch() 方法,这次我们进入到构建 IndexFile 的实现类的 dispatch 方法中,即:CommitLogDispatcherBuildIndex # dispatch(),那么接下来还是经过调用链到达核心方法:

  • CommitLogDispatcherBuildIndex # dispatch()
  • -> DefaultMessageStore.this.indexService.buildIndex(request)
  • -> indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()))
  • -> indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp())

那么核心方法就在 IndexFile # putKey() 中:

  1. 首先根据 key 计算出哈希值,key 也就是 Topic + 消息的 msgId

  2. 再通过哈希值对哈希槽的数量取模,计算出在哈希槽中的相对位置:slotPos = keyHash % this.hashSlotNum

  3. 计算 key 在 IndexFile 中的绝对位置,通过 哈希槽的位置 * 每个哈希槽的大小(4B) + IndexFile 头部的大小(40B)

    代码即:

    absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize

  4. 计算索引在 IndexFile 中的绝对位置,通过 absIndexPos = IndexFile 头部大小(40B) + 哈希槽位置 * 哈希槽大小(4B) + 消息的数量 * 消息索引的大小(20B)

    int absIndexPos =
        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
            + this.indexHeader.getIndexCount() * indexSize;
    
  5. 向 IndexFile 的第三部分(索引列表)中放入数据的索引,索引包含 4 部分,共 20B:keyHash、phyOffset、timeDiff、slotValue

    在这里插入图片描述

  6. 向 IndexFile 的第二部分(哈希槽)中放入数据

    在这里插入图片描述

IndexFile 的结构如下图所示:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/263286.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C语言】自定义类型:结构体深入解析(二)结构体内存对齐宏offsetof计算偏移量结构体传参

文章目录 📝前言🌠 结构体内存对齐🌉内存对齐包含结构体的计算🌠宏offsetof计算偏移量🌉为什么存在内存对⻬?🌠 结构体传参🚩总结 📝前言 本小节,我们学习结构的内存对…

Redis设计与实现之RDB

目录 一、RDB 1、保存 2、 SAVE 、BGSAVE 、AOF 写入和 BGREWRITEAOF SAVE BGSAVE 3、载入 4、RDB 文件结构 REDIS DB-DATA SELECT-DB KEY-VALUE-PAIRS EOF CHECK-SUM 二、 小结 一、RDB 在运行情况下,Redis 以数据结构的形式将数据维持在内存中&…

头歌—衍生密码体制

# 第1关:Rabin密码体制 题目描述 任务描述 Rabin密码体制是RSA密码体制的一种。 本关任务:使用Rabin密码体制对给定的明文进行加密。 相关知识 为了完成本关任务,你需要掌握:Rabin密码体制。 Rabin密码体制 在本关中&#x…

幻彩LED灯带芯片:SM16703SP单点单控 断点续传

幻彩LED灯带芯片SM16703SP3是一款单点单控断点续传的芯片,它采用了先进的技术,可以实现灯光的变化和控制。这款芯片不仅仅可以提供各种丰富多彩的灯光效果,还有断点续传功能, LED断点续传灯条采用了双信号线交叉传输的方案&#x…

【Spring Boot】面试题汇总,带答案的那种

继上次的文章【MySQL连环炮,你抗的住嘛?】爆火之后,越来越多的小伙伴后台留言,要求阿Q总结下其他的“连环炮”知识点,想在金九银十的面试黄金期轻松对线面试官。 同样为了节省大家的时间,阿Q最近对【Sprin…

链接未来:深入理解链表数据结构(二.c语言实现带头双向循环链表)

上篇文章简述讲解了链表的基本概念并且实现了无头单向不循环链表:链接未来:深入理解链表数据结构(一.c语言实现无头单向非循环链表)-CSDN博客 那今天接着给大家带来带头双向循环链表的实现: 文章目录 一.项目文件规划…

在线商城系统软件源码与报价_OctShop

随着互联网、5G、人工智能的快速发展,人们在家购物已经是生活的重要方式。各种在线商城系统的不断涌现,同时,也给传统的企业商家销售带来了不小的压力,那么,如何调整,以适应时代的发展呢?经过不…

【数据结构和算法】最大连续1的个数 III

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、题目描述 二、题解 2.1 方法一:滑动窗口 2.2 滑动窗口解题模板 三、代码 3.1 方法一:滑动窗口 四、…

世界第一!移动云刷新虚拟化性能测试世界纪录

近日,国际权威性能测评机构SPEC公布了最新一期虚拟化性能基准测试结果,移动云大云天元操作系统(BC-Linux),凭借其出色的虚拟化性能,一举将世界纪录提升了10%,总分达到了8336分。 移动云SPEC vir…

Mybatis-plus动态条件查询QueryWrapper的函数用法

目录 前言1. QueryWrapper2. 函数3. Demo 前言 原本都是在Mapper文件中修改,直到看到项目中使用了QueryWrapper这个函数,大致了解了用法以及功能,发现还可以! 对此此贴为科普帖以及笔记帖 1. QueryWrapper MyBatis-Plus 是 My…

Angular 进阶之五: Signals到底用不用?

Angular 在V16的时候推出了Signals,在17正式作为主打功能之一强烈推荐,看过了各种博主的各种科普文章也没说明白,到底这东西值不值得用?毕竟项目大了,重构代码也不是闹着玩儿的。各种科普文章主要在说两点:…

pake协议传输文件magic-wormhole

pake协议传输文件magic-wormhole 1 magic-wormhole简介其他介绍 2 安装magic-wormhole3 使用示范发送文件指定虫洞码长度 接收文件 1 magic-wormhole简介 16.7k star 强推,丝滑、简洁、安全的开源工具——magic-wormhole 项目地址:https://github.com/…

Android应用-flutter使用Positioned将控件定位到底部中间

文章目录 场景描述示例解释 场景描述 要将Positioned定位到屏幕底部中间的位置,你可以使用MediaQuery来获取屏幕的高度,然后设置Positioned的bottom属性和left或right属性,一般我们left和right都会设置一个值让控制置于合适的位置&#xff0…

Bert-vits2-2.3-Final,Bert-vits2最终版一键整合包(复刻生化危机艾达王)

近日,Bert-vits2发布了最新的版本2.3-final,意为最终版,修复了一些已知的bug,添加基于 WavLM 的 Discriminator(来源于 StyleTTS2),令人意外的是,因情感控制效果不佳,去除…

【大模型】快速体验百度智能云千帆AppBuilder搭建知识库与小助手

文章目录 前言千帆AppBuilder什么是千帆AppBuilderAppBuilder能做什么 体验千帆AppBuilderJava知识库高考作文小助手 总结 前言 前天,在【百度智能云智算大会】上,百度智能云千帆AppBuilder正式开放服务。这是一个AI原生应用开发工作台,可以…

计算机网络:应用层

0 本节主要内容 问题描述 解决思路 1 问题描述 不同的网络服务: DNS:用来把人们使用的机器名字(域名)转换为 IP 地址;DHCP:允许一台计算机加入网络和获取 IP 地址,而不用手工配置&#xff1…

【DWJ_1703225514】基于Sklearn航空公司服务质量分析

【Talk is cheap】 # 导入库 import warnings warnings.filterwarnings(ignore)import pandas as pd import seaborn as sns import matplotlib.pyplot as plt plt.rcParams[font.sans-serif] [SimHei] plt.rcParams[axes.unicode_minus] False %matplotlib inlinefrom skl…

华为科技:辉煌发展、问题应对与未来战略

导言 作为全球领先的科技公司之一,华为经历了辉煌的发展历程。本文将深入探讨华为科技的发展过程、遇到的问题及解决过程、未来的可用范围,以及在各国的应用和未来的研究趋势。同时,分析在哪些领域华为能够取胜,以及在哪些方面发力…

文献管理软件EndNote X9 mac功能介绍

EndNote X9 for Mac是一款文献管理软件,不仅可以让您免于手动收集和整理您的研究资料和格式化书目的繁琐工作,还可以让您在与同事协调时更加轻松自如。让你的团队专注科研,更高效的共享文献开展协作。 EndNote X9 for Mac功能介绍 引文报告 …

数据结构和算法-红黑树(定义 性质 查找 插入 删除)

文章目录 红黑树的定义和性质为什么要发明红黑树?红黑树怎么考总览红黑树的定义实例:一颗红黑树练习:是否符合红黑树的要求一种可能的出题思路补充概念:节点黑高 红黑树的性质 红黑树的查找红黑树的插入实例小结与黑高相关的理论 …