深度解析RocketMq源码-IndexFile

1.绪论

在工作中,我们经常需要根据msgKey查询到某条日志。但是,通过前面对commitLog分析,producer将消息推送到broker过后,其实broker是直接消息到达broker的先后顺序写入到commitLog中的。我们如果想根据msgKey检索一条消息无疑大海捞针,所以们需要像数集一样建立一个目录,我们其实可以想到的是构建一个Map,key存储msgKey,value存储msg在commitLog中的物理偏移量。而这个目录其实就是indexFile。

2.indexFile的组成和原理

indexFile主要由两部分组成,分别是indexFile文件头和index的文件内容。

2.1 indexFile文件头 - IndexHeader 

indexHeader占据40个字节,其中最重要的是他记录了整个索引文件的最开始插入的索引的时间和最后一条数据插入的时间,主要是为了支持根据时间进行范围搜索。以及第一条和最后一条日志的索引位置。还有一个就是已经插入了多少条索引IndexCount。

public class IndexHeader {
//index文件头占4个字节
    public static final int INDEX_HEADER_SIZE = 40;
    private static int beginTimestampIndex = 0;
    private static int endTimestampIndex = 8;
    private static int beginPhyoffsetIndex = 16;
    private static int endPhyoffsetIndex = 24;
    private static int hashSlotcountIndex = 32;
    private static int indexCountIndex = 36;
    private final ByteBuffer byteBuffer;
    //开始的时间戳
    private final AtomicLong beginTimestamp = new AtomicLong(0);
    //结束时间戳
    private final AtomicLong endTimestamp = new AtomicLong(0);
    //开始的物理偏移量
    private final AtomicLong beginPhyOffset = new AtomicLong(0);
    //结束的物理偏移量
    private final AtomicLong endPhyOffset = new AtomicLong(0);
    //hash槽的数量
    private final AtomicInteger hashSlotCount = new AtomicInteger(0);
    //index的数量
    private final AtomicInteger indexCount = new AtomicInteger(1);
}

2.2 indexFile的组成

idnexFile的内容包括:

1. 40个字节的indexFile头

2. 4* 500w个字节hash槽,每个槽记录的其实是:根据key取hash值%槽数在当前hash槽的索引的序号(也即当前有多少条索引)

3. 20*2000w个自己的索引数,每条索引20个字节,包含4个字节索引key的hash值+8个字节的物理偏移量+4个字节的当前索引的插入时间距离该索引文件第一条索引的插入时间的差值+4个字节的上一个在当前hash槽的索引的序号。

我们可以画图来描述一下:

e5b98bd4baf04c1c9393127f0a93ebca.png

可以看出idnexFile是采用链地址法解决hash冲突的,每个索引存储有上一条拥有相同hash值索引的index值,相当于通过链表将这些hash冲突的索引串起来。

public class IndexFile {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    //一个hash槽的大小为4个字节
    private static int hashSlotSize = 4;
    //一条索引的大小为20字节
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    //hash槽的数量
    private final int hashSlotNum;
    //index的总数量
    private final int indexNum;
    //index也是存储在mappedFile中的
    private final MappedFile mappedFile;
    private final MappedByteBuffer mappedByteBuffer;
    //index文件的头
    private final IndexHeader indexHeader;

    public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
        final long endPhyOffset, final long endTimestamp) throws IOException {
        //文件总大小 = 头部所占40个字节 + hash槽数量(默认为500w) * 4个字节 + index数量 * 20个字节
        int fileTotalSize =
            IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
        //新建mappedFile
        this.mappedFile = new MappedFile(fileName, fileTotalSize);
        //获取到与文件建立映射关系的buffer
        this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
        //hash槽数量
        this.hashSlotNum = hashSlotNum;
        //索引文件的数量
        this.indexNum = indexNum;

        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        //index文件的头部
        this.indexHeader = new IndexHeader(byteBuffer);

        if (endPhyOffset > 0) {
            //够级整个索引文件的开始的物理偏移量和结束的偏移量
            this.indexHeader.setBeginPhyOffset(endPhyOffset);
            this.indexHeader.setEndPhyOffset(endPhyOffset);
        }

        if (endTimestamp > 0) {
            //够级整个索引文件的开始时间戳和结束时间戳
            this.indexHeader.setBeginTimestamp(endTimestamp);
            this.indexHeader.setEndTimestamp(endTimestamp);
        }
    }
}

3.向indexFile插入一条索引数据

主要的步骤如下:

1.获取msgKey的hash值;

2.通过hash值对总的hash槽数取模得到对应第几个槽;

3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址;

4.40个字节的索引头大小+hash槽总数*4个字节+现在存储了多少条索引*20个字节得到最新一条数据写入的物理偏移量;

5.分别写入索引内容:hash值,commitLog的物理偏移量,距离第一条索引的时间戳+上一条指向同一个hash槽的索引的序号(也即当前hash槽中存储的值);

6.将最新一条的索引序号写入到hash槽中。

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            //1.获取msgKey的hash值
            int keyHash = indexKeyHashMethod(key);
            //2.通过hash值对总的hash槽数取模得到对应第几个槽
            int slotPos = keyHash % this.hashSlotNum;
            //3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            try {
                //获取到上一个hash槽的所指向的索引序号
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }
                //获取当前索引与第一条索引的差值
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                //40个字节的索引头大小+hash槽总数*4个字节+现在存储了多少条索引*20个字节得到最新一条数据写入的物理偏移量
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                //分别写入索引内容:hash值,commitLog的物理偏移量,距离第一条索引的时间戳+上一条指向同一个hash槽的索引的序号
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                //将最新一条的索引序号写入到hash槽中
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                //更新idnex中的最后一条索引的时间戳和物理偏移量
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
                
                if (invalidIndex == slotValue) {
                    this.indexHeader.incHashSlotCount();
                }   
                //增加indexheader索引序号
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

4.从indexFile中读取一条索引数据

1.获取索引key的hash值;

2.hash值对槽总数取模获得第几个槽;

3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址;

4.从槽中读取到该槽所指向的最新的一条索引序号;

5.40个字节的索引头大小+hash槽总数*4个字节+hash槽中存储的索引序号*20个字节得到最新一条数据写入的物理偏移量;

6.如果hash值相等,并且时间匹配,证明匹配到数据,跳出循环;

7.如果不匹配,便根据链表寻找到拥有相同hash值并且时间匹配的日志;

    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
                                final long begin, final long end) {
        if (this.mappedFile.hold()) {
            //获取索引key的hash值
            int keyHash = indexKeyHashMethod(key);
            //hash值对槽总数取模获得第几个槽
            int slotPos = keyHash % this.hashSlotNum;
            //40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            try {
                //从槽中读取到该槽所指向的最新的一条索引序号
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }
                       // 40个字节的索引头大小+hash槽总数*4个字节+hash槽中存储的索引序号*20个字节得到最新一条数据写入的物理偏移量
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;
                        //获取索引的hash值
                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        //获取到该索引的物理偏移量
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
                        //获取到时间戳差值
                        long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        //获取到拥有相同槽数的上一条索引序号
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
                        //如果hash值相等,并且时间匹配,证明匹配到数据,跳出循环
                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }
                        //如果上一条索引非法,证明已经到达链表头部,跳出循环,证明该条索引就是需要寻找的索引
                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }

                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                this.mappedFile.release();
            }
        }
    }

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/751538.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何理解AKM?

关于Wi-Fi的加密认证过程&#xff0c;我们前面已经讲解&#xff1a;WLAN数据加密机制_tls加密wifi-CSDN博客 今天我们来理解下AKM&#xff0c;AKM&#xff08;Authentication and Key Management&#xff09;在Wi-Fi安全中是指认证和密钥管理协议。它是用于确定Wi-Fi网络中的认…

Linux学习第54天:Linux WIFI 驱动:蓝星互联

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 数字化、现代化的今天&#xff0c;随处的WIFI给与了大众极大的方便&#xff0c;也感受到了科技的力量。万物互联、无线互联越来越成为一个不可逆转的趋势。现在比较火…

ISP IC/FPGA设计-第一部分-SC130GS摄像头分析(0)

1.介绍 SC130GS是一款国产的Global shutter CMOS图像传感器&#xff0c;最高支持1280Hx1024V240fps的传输速率&#xff1b;SC130GS有黑白和彩色款&#xff0c;作为ISP开发选择彩色的&#xff0c;有效像素窗口为1288Hx1032V&#xff0c;支持复杂的片上操作&#xff0c;选择他理…

谈谈WebComponents | 前端开发

一、 源起 让我们以一个例子开始。 假设我们要做一个环形进度条&#xff0c;它可以&#xff1a; 1、根据进度数值的不同&#xff0c;计算出百分比&#xff0c;以渲染对应的角度值。 2、根据设置的进度不同&#xff0c;我们用不同的颜色加以区分。 3、在环的中间我们以动画递增的…

基于RabbitMQ的异步消息传递:发送与消费

引言 RabbitMQ是一个流行的开源消息代理&#xff0c;用于在分布式系统中实现异步消息传递。它基于Erlang语言编写&#xff0c;具有高可用性和可伸缩性。在本文中&#xff0c;我们将探讨如何在Python中使用RabbitMQ进行消息发送和消费。 安装RabbitMQ 在 Ubuntu 上安装 Rabbi…

wps的domain转为shp矢量

wps的namelist制作、python出图和转矢量 简介 wps&#xff08;WRF Preprocessing System&#xff09;是中尺度数值天气预报系统WRF(Weather Research and Forecasting)的预处理系统。 wps的安装地址在GitHub上&#xff1a;https://github.com/wrf-model/WPS 下载完成后&…

注册中心不知选哪个?Zookeeper、Eureka、Nacos、Consul和Etcd 5种全方位剖析对比

本文给大家讲解 5 种常用的注册中心&#xff0c;对比其流程和原理&#xff0c;无论是面试还是技术选型&#xff0c;都非常有帮助。 对于注册中心&#xff0c;在写这篇文章前&#xff0c;我其实只对 ETCD 有比较深入的了解&#xff0c;但是对于 Zookeeper 和其他的注册中心了解甚…

pytorch统计学分布

1、pytorch统计学函数 import torcha torch.rand(2,2) print(a) print(torch.sum(a, dim0)) print(torch.mean(a, dim0)) print(torch.prod(a, dim0))print(torch.argmax(a, dim0)) print(torch.argmin(a, dim0)) print(torch.std(a)) print(torch.var(a)) print(torch.median…

AI进阶指南第四课,大模型优缺点研究?

在上一篇文章中&#xff0c;我主要探讨了LM模型与企业级模型的融合。 但是&#xff0c;在文末对于具体的大模型优缺点只是简单地说明了一下&#xff0c;并不细致。 因此&#xff0c;在这一节&#xff0c;我将更为细致地说明一下大模型的优缺点。 一&#xff0c;隐私安全 将L…

Python输入与输出基础

Python输入与输出基础 引言 Python是一种非常直观且功能强大的编程语言&#xff0c;它允许用户轻松地处理输入和输出操作。无论是从用户那里获取数据&#xff0c;还是将结果展示给用户&#xff0c;Python都提供了简单易用的函数和方法。 一、输入数据 在Python中&#xff0c…

UWB:DS-TWR( Double-sided two-way ranging)双边测距公式推导:为啥是乘法?

UWB DS-TWR&#xff08; Double-sided two-way ranging&#xff09;双边测距为啥是乘法&#xff1f;&#xff1f; 公式&#xff1a; 我们先看单边 Single-Sided Two-Way Ranging (SS-TWR) 单边很好理解。 symmetric double-sided TWR (SDS-TWR)对称的双边测距 再看双边 Trou…

LeetCode热题100——最长连续序列

给定一个未排序的整数数组 nums &#xff0c;找出数字连续的最长序列&#xff08;不要求序列元素在原数组中连续&#xff09;的长度。 请你设计并实现时间复杂度为 O(n) 的算法解决此问题。 class Solution(object):def longestConsecutive(self, nums):""":t…

【MAVEN学习 | 第2篇】Maven工程创建及核心功能

文章目录 一. 基于IDEA的Maven工程创建1.1 Maven工程GAVP属性&#xff08;1&#xff09;GroupID 格式&#xff08;2&#xff09;ArtifactID 格式&#xff08;3&#xff09;Version版本号格式&#xff08;4&#xff09;Packaging定义规则 1.2 IDEA构建Maven JavaSE工程1.3 IDEA构…

kettle使用手册 安装9.0版本 建议设置为英语

0.新建转换的常用组件 0. Generate rows 定义一个字符串 name value就是字符串的值 0.1 String operations 字段转大写 去空格 1. Json input 来源于一个json文件 1.json 或mq接收到的data内容是json字符串 2. Json output 定义Jsonbloc值为 data, 左侧Fieldname是数据库…

VS2022(Visual Studio 2022)最新安装教程

1、下载 1、下载地址 - 官网地址&#xff1a;下载 Visual Studio Tools - 免费安装 Windows、Mac、Linux - 根据自己的电脑的 【操作系统】 灵活选择。 2、安装包 【此处为Windows系统安装包】 2、安装 1、打开软件 - 右击【以管理员身份打开】&#xff0c; 2、准备配置 …

昇思25天学习打卡营第03天|张量Tensor

何为张量&#xff1f; 张量&#xff08;Tensor&#xff09;是一个可用来表示在一些矢量、标量和其他张量之间的线性关系的多线性函数&#xff0c;这些线性关系的基本例子有内积、外积、线性映射以及笛卡儿积。其坐标在 &#x1d45b;维空间内&#xff0c;有  &#x1d45b;&a…

机器人控制系列教程之URDF文件语法介绍

前两期推文&#xff1a;机器人控制系列教程之动力学建模(1)、机器人控制系列教程之动力学建模(2)&#xff0c;我们主要从数学的角度介绍了机器人的动力学建模的方式&#xff0c;随着机器人技术的不断发展&#xff0c;机器人建模成为了机器人系统设计中的一项关键任务。URDF&…

聚合项目学习

首先建立一个总的工程目录&#xff0c;里边后期会有我们的父工程、基础工程(继承父工程)、业务工程&#xff08;依赖基础工程&#xff09;等模块 1、在总工程目录中&#xff08;open一个空的文件夹&#xff09;&#xff0c;首先建立一个父工程模块&#xff08;通过spring init…

地铁中的CAN通信--地铁高效安全运转原理

目前地铁采用了自动化的技术来实现控制,有ATC(列车自动控制)系统可以实现列车自动驾驶、自动跟踪、自动调度;SCADA(供电系统管理自动化)系统可以实现主变电所、牵引变电所、降压变电所设备系统的遥控、遥信、遥测;BAS(环境监控系统)和FAS(火灾报警系统)可以实现车站…

AS-V1000外部设备管理介绍(国标GB28181设备管理,可以管理的国标设备包括DVR/NVR、IPC、第三方国标28181平台)

目录 一、概述 1、视频监控平台介绍 2、外部设备定义&#xff08;接入的国标设备&#xff09; 二、外部设备管理 2.1 外部设备添加 &#xff08;1&#xff09;设备侧的配置 &#xff08;2&#xff09;平台侧的配置 2.2 外部设备信息的修改 三、外部通道管理 3.1 外部…