【大数据学习 | kafka】kafka的数据存储结构

以上是kafka的数据的存储方式。

这些数据可以在服务器集群上对应的文件夹中查看到。

[hexuan@hadoop106 __consumer_offsets-0]$ ll
总用量 8
-rw-rw-r--. 1 hexuan hexuan 10485760 10月 28 22:21 00000000000000000000.index
-rw-rw-r--. 1 hexuan hexuan        0 10月 28 22:21 00000000000000000000.log
-rw-rw-r--. 1 hexuan hexuan 10485756 10月 28 22:21 00000000000000000000.timeindex
-rw-rw-r--. 1 hexuan hexuan        8 10月 28 22:21 leader-epoch-checkpoint
-rw-rw-r--. 1 hexuan hexuan       43 10月 28 22:21 partition.metadata

每个文件夹以topic+partition进行命名,更加便于管理和查询检索,因为kafka的数据都是按照条进行处理和流动的一般都是给流式应用做数据供给和缓冲,所以检索速度必须要快,分块管理是最好的方式。

消费者在检索相应数据的时候会非常的简单。

consumer检索数据的过程。

首先文件的存储是分段的,那么文件的名称代表的就是这个文件中存储的数据范围和条数。

00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
代表存储的数据是从0条开始的

00000000000000100000.index
00000000000000100000.log
00000000000000100000.timeindex
代表存储的数据是从100000条开始的

所以首先检索数据的时候就可以跳过1G为大小的块,比如检索888这条数据的,就可以直接去00000000000000000000.log中查询数据

那么查询数据还是需要在1G大小的内容中找寻是比较麻烦的,这个时候可以从index索引出发去检索,首先我们可以通过kafka提供的工具类去查看log和index中的内容

# 首先创建一个topic_b
 kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_b --partitions 5 --replication-factor 2
# 然后通过代码随机向不同的分区中分发不同的数据1W条
package com.hainiu.kafka.consumer;

/**
 * ClassName : test1
 * Package : com.hainiu.kafka.consumer
 * Description
 *
 * @Author HeXua
 * @Create 2024/11/3 22:45
 * Version 1.0
 */
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class test1 {
    public static void main(String[] args) throws InterruptedException {
        Properties pro = new Properties();
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
        pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);
        pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        pro.put(ProducerConfig.RETRIES_CONFIG, 3);
        pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
        for (int i = 0; i < 10000; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic_b", ""+i,"this is hainiu");
            producer.send(record);
        }
        producer.close();
    }
}

然后去查看log和index中的内容

# kafka查看日志和索引的命令
kafka-run-class.sh kafka.tools.DumpLogSegments --files xxx

查看日志.log

[hexuan@hadoop106 topic_b-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log 
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 605 count: 606 baseSequence: 0 lastSequence: 605 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1730645208553 size: 5149 magic: 2 compresscodec: snappy crc: 595601909 isvalid: true
baseOffset: 606 lastOffset: 1205 count: 600 baseSequence: 606 lastSequence: 1205 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 5149 CreateTime: 1730645208577 size: 4929 magic: 2 compresscodec: snappy crc: 1974998903 isvalid: true
baseOffset: 1206 lastOffset: 1439 count: 234 baseSequence: 1206 lastSequence: 1439 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 10078 CreateTime: 1730645208584 size: 2085 magic: 2 compresscodec: snappy crc: 1665550202 isvalid: true

查看索引.index

内容即:

index索引

offset 第几条position 物理偏移量位置,也就是第几个字
11875275
176710140
202215097

log日志

# 打印日志内容的命令 --print-data-log 打印数据
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
Dumping 00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 605 count: 606 baseSequence: 0 lastSequence: 605 producerId: 11 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1730645208553 size: 5149 magic: 2 compresscodec: snappy crc: 595601909 isvalid: true
| offset: 0 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 0 headerKeys: [] key: 14 payload: this is hainiu
| offset: 1 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 1 headerKeys: [] key: 19 payload: this is hainiu
| offset: 2 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 2 headerKeys: [] key: 24 payload: this is hainiu
| offset: 3 CreateTime: 1730645208524 keySize: 2 valueSize: 14 sequence: 3 headerKeys: [] key: 26 payload: this is hainiu

可以看到刷写的日志

baseOffset: 0 lastOffset: 605 count: 606

从0 到605 条一次性刷写606条

lastSequence: 605 producerId

刷写事务日志编号,生产者的编号

通过名称跳过1G的端,然后找到相应的index的偏移量,然后根据偏移量定位log位置,不断向下找寻数据。

大家可以看到index中的索引数据是轻量稀疏的,这个数据是按照4KB为大小生成的,一旦刷写4KB大小的数据就会写出相应的文件索引。

官网给出的默认值4KB

一个数据段大小是1G

timeIndex

我们看到在数据中还包含一个timeindex的时间索引

# 查询时间索引
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex 
[hexuan@hadoop106 topic_b-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.timeindex 
Dumping 00000000000000000000.timeindex
timestamp: 1730645208577 offset: 1205
timestamp: 1730645208584 offset: 1439

可以看到和index索引一样,这个也是4Kb写出一部分数据,但是写出的是时间,我们可以根据时间进行断点找寻数据,指定时间重复计算

也就是说,写到磁盘的数据是按照1G分为一个整体部分的,但是这个整体部分需要4KB写一次,并且一次会生成一个索引问题信息,在检索的时候可以通过稀疏索引进行数据的检索,效率更快。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/910427.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

软件测试面试题——移动端

一、常用的adb命令有哪些&#xff1f; 命令含义adb devices展示当前电脑连接的设备&#xff0c;如果电脑上有多个手机&#xff0c;需要adb -s指定对应设备adb install xxx.apk直接安装xxx.apk到手机中&#xff0c;注意&#xff1a;必须打开手机设置里的USB安装adb install -r …

软考教材重点内容 信息安全工程师 第1章 网络信息安全概述

第 1 章 网络信息安全概述 1.1.1 网络信息安全相关概念 狭义上的网络信息安全特指网络信息系统的各组成要素符合安全属性的要求&#xff0c;即机密性、完整性、可用性、抗抵赖性、可控性。 广义上的网络信息安全是涉及国家安全、城市安全、经济安全、社会安全、生产安全、人身安…

Perforce《2024游戏技术现状报告》Part2:游戏引擎、版本控制、IDE及项目管理等多种开发工具的应用分析

游戏开发者一直处于创新前沿。他们的实践、工具和技术受到各行各业的广泛关注&#xff0c;正在改变着组织进行数字创作的方式。 近期&#xff0c;Perforce发布了《2024游戏技术现状报告》&#xff0c;通过收集来自游戏、媒体与娱乐、汽车和制造业等高增长行业的从业者、管理人…

网站架构知识之密钥认证(day020)

1.密钥认证 两个节点&#xff0c;通过密钥形式进行访问&#xff0c;不要输入密码&#xff0c;单向。 应用场景&#xff1a;部分服务使用前要求我们做密钥认证。 1.使用rsa算法创建公钥私钥 ssh-keygen -t rsa /root/.ssh/id_rsa&#xff0c; 私钥地址 /root/.ssh/…

《重学Java设计模式》之 工厂方法模式

《重学Java设计模式》之 建造者模式 《重学Java设计模式》之 原型模式 《重学Java设计模式》之 单例模式 模拟发奖多种商品 工程结构 奖品发放接口 package com.yys.mes.design.factory.store;public interface ICommodity {/*** Author Sherry* Date 14:20 2024/11/6**/voi…

微信小程序的汽车维修预约管理系统

文章目录 项目介绍具体实现截图技术介绍mvc设计模式小程序框架以及目录结构介绍错误处理和异常处理java类核心代码部分展示详细视频演示源码获取 项目介绍 系统功能简述 前台用于实现用户在页面上的各种操作&#xff0c;同时在个人中心显示各种操作所产生的记录&#xff1a;后…

Oh My Posh安装

nullSet up your terminalhttps://ohmyposh.dev/docs/installation/windows Git ee oh-my-posh: Windows上的oh-my-zsh&#xff0c;源地址 https://github.com/JanDeDobbeleer/oh-my-posh.git (gitee.com)https://gitee.com/efluent/oh-my-posh

unity 镜面 反射

URP 镜面 资源绑定 下载 namespace UnityEngine.Rendering.Universal { [ExecuteInEditMode]public class PlanarURP : MonoBehaviour{public bool VR false;public int ReflectionTexResolution 512;public float Offset 0.0f;[Range(0, 1)]public float Reflecti…

深度学习(十):伦理与社会影响的深度剖析(10/10)

深度学习&#xff1a;伦理与社会影响的深度剖析 一、深度学习的伦理挑战 &#xff08;一&#xff09;数据隐私之忧 深度学习模型的训练往往需要大量数据&#xff0c;而数据的收集过程可能会侵犯个人隐私。例如&#xff0c;据统计&#xff0c;面部识别技术在全球范围内每天会收…

网络安全从入门到精通(特别篇I):应急响应之APT事件处置流程

应急响应 应急响应之APT处置流程1.现场询问1.1 了解威胁事件表现1.2 了解威胁事件发现时间1.3 了解系统架构,如服务器类型、业务架构、网络拓扑等2 判断安全事件状态3 确认事件对象4 确定事件时间5 问题排查应急响应之APT处置流程 1.现场询问 1.1 了解威胁事件表现 1.C&…

美格智能5G车规级通信模组: 5G+C-V2X连接汽车通信未来十年

自2019年5G牌照发放开始&#xff0c;经过五年发展&#xff0c;我国5G在基础设施建设、用户规模、创新应用等方面均取得了显著成绩&#xff0c;5G网络建设也即将从基础的大范围覆盖向各产业融合的全场景应用转变。工业和信息化部数据显示&#xff0c;5G行业应用已融入76个国民经…

鸿蒙next打包流程

鸿蒙打包 下载团结引擎添加开源鸿蒙打包支持 团结引擎版本要和sdk版本相对应,图中最新版1.3.1团结引擎,需要sdk12,直接在模块里自动下载即可。 打包报错 在unity社区搜索到,是burst的问题,在package manager里将burst升级到1.8.18就打包成功了,不知道为啥。 团结引擎打包…

python实现RSA算法

目录 一、算法简介二、算法描述2.1 密钥产生2.2 加密过程2.3 解密过程2.4 证明解密正确性 三、相关算法3.1 欧几里得算法3.2 扩展欧几里得算法3.3 模重复平方算法3.4 Miller-Rabin 素性检测算法 四、算法实现五、演示效果 一、算法简介 RSA算法是一种非对称加密算法&#xff0c…

Android笔记(三十一):Deeplink失效问题

背景 通过deeplink启动应用之后&#xff0c;没关闭应用的情况下&#xff0c;再次使用deeplink会失效的问题&#xff0c;是系统bug导致的。此bug仅在某些设备&#xff08;Nexus 5X&#xff09;上重现&#xff0c;launchMode并且仅当应用程序最初通过深层链接启动并再次通过深层…

基于Multisim拔河比赛游戏+计分电路(含仿真和报告)

【全套资料.zip】拔河比赛游戏计分电路Multisim仿真设计数字电子技术 文章目录 功能一、Multisim仿真源文件二、原理文档报告资料下载【Multisim仿真报告讲解视频.zip】 功能 1.拔河游戏机用9个发光二极管排成一行。 2.开机后只有中间一个点亮&#xff0c;以此作为拔河的中心…

A20红色革命文物征集管理系统

&#x1f64a;作者简介&#xff1a;在校研究生&#xff0c;拥有计算机专业的研究生开发团队&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取&#xff0c;记得注明来意哦~&#x1f339; 赠送计算机毕业设计600…

面向 TP 场景能力全面升级, OceanBase 4.2.5 LTS 版正式发布

去年的 OceanBase 年度发布会中&#xff0c;OceanBase 推出了一体化数据库的首个长期支持版——4.2.1 LTS。这一年来&#xff0c;已有数百位客户在真实的生产环境中对该版本进行了验证并成功上线&#xff0c;证明了OceanBase 在TP场景中的卓越性能。与此同时&#xff0c;越来越…

在线预览 Word 文档

引言 随着互联网技术的发展&#xff0c;Web 应用越来越复杂&#xff0c;用户对在线办公的需求也日益增加。在许多业务场景中&#xff0c;能够直接在浏览器中预览 Word 文档是一个非常实用的功能。这不仅可以提高用户体验&#xff0c;还能减少用户操作步骤&#xff0c;提升效率…

MongoDB笔记02-MongoDB基本常用命令

文章目录 一、前言二、数据库操作2.1 选择和创建数据库2.2 数据库的删除 3 集合操作3.1 集合的显式创建3.2 集合的隐式创建3.3 集合的删除 四、文档基本CRUD4.1 文档的插入4.1.1 单个文档插入4.1.2 批量插入 4.2 文档的基本查询4.2.1 查询所有4.2.2 投影查询&#xff08;Projec…

对称二叉树(力扣101)

题目如下: 思路 对于这道题, 我会采用递归的解法. 看着对称的二叉树, 写下判断对称的条件, 再进入递归即可. 值得注意的是, 代码中会有两个函数, 第一个是isSymmetric,第二个是judge. 因为这里会考虑到一种特殊情况, 那就是 二叉树的根结点(最上面的那个),它会单独用…