kafka 的内部结构和 kafka 的工作原理

基本设置

让我们开始安装kafka。下载最新的 Kafka 版本并解压缩。打开终端并启动 kafka 和 zookeeper。

$ cd $HOME
$ tar -xzf kafka_<version>.tgz
$ cd kafka_<version>
$ bin/zookeeper-server-start.sh config/zookeeper.properties
# open another terminal session and start kafka
$ bin/kafka-server-start.sh config/server.properties

让我们在新的终端选项卡中创建一个主题。

# Open another terminal and create a topic.
$ bin/kafka-topics.sh --create --topic payments --partitions 10 --replication-factor 1 \
 --bootstrap-server localhost:9092

如果您想知道上述命令是如何使用这些参数构造的,那非常简单。照做,bin/kafka-topics.sh --help您将看到所有带有描述的参数。文件夹中存在的所有 shell 实用程序也是如此bin

现在让我们看看幕后发生了什么。

转到/tmp/kafka-logs目录并执行ls我们将看到以下结果。

cleaner-offset-checkpoint        payments-0    payments-3    payments-6     payments-9
log-start-offset-checkpoint      payments-1    payments-4    payments-7     recovery-point-offset-checkpoint
meta.properties                  payments-2    payments-5    payments-8     replication-offset-checkpoint

/tmp/kafka-logs是kafka存储数据的默认目录。config/server.properties我们可以将它配置到kafka 和config/zookeeper.propertieszookeeper的不同目录。

恢复点偏移检查点

kafka 代理在内部使用此文件来跟踪刷新到磁盘的日志数量。文件的格式是这样的。

<version>
<total entries>
<topic name> <partition> offset

复制偏移检查点

该文件由 kafka 代理在内部使用,用于跟踪复制到集群中所有代理的日志数量。recovery-point-offset-checkpoint该文件的格式与上述文件相同。

主题和分区

正如我们从上面的结果中看到的,payments-0payments-1payments-10是文件系统中的目录分区。正如我在之前的博文中强调的那样,主题是 kafka 中的一个逻辑概念。它在物理上不存在,只有分区存在。主题是所有分区的逻辑分组。

Producer

现在,让我们使用以下命令为主题生成一些消息。

$ cd $HOME/kafka
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic payments
> hello
> world
> hello world
> hey there!

我们就该主题制作了四条消息。让我们看看它们是如何存储在文件系统中的。很难找出消息去了哪个分区,因为 kafka 使用循环算法将数据分发到分区。简单的方法是找到所有分区(目录)的大小并选择最大的。

$ cd /tmp/kafka-logs
$ du -hs *
8.0K    payments-0
8.0K    payments-1
 12K    payments-2
8.0K    payments-3
 12K    payments-4
8.0K    payments-5
8.0K    payments-6
 12K    payments-7
8.0K    payments-8
 12K    payments-9

正如我们从上面的代码片段中看到的那样,我们的消息进入了分区 2、4、7 和 9。让我们看看每个分区中有什么。

$ ls payments-7

00000000000000000000.index     00000000000000000000.log
00000000000000000000.timeindex leader-epoch-checkpoint
partition.metadata
$ cat 00000000000000000000.log
=
��Mr���Mr����������������
world%
$ cat partition.metadata
version: 0
topic_id: tbuB6k_uRsuEE03FsechjA
$ cat leader-epoch-checkpoint
0
1
0 0
$ cat 00000000000000000000.index
$ cat 00000000000000000000.timeindex

分区元数据

partition.metadata文件包含一个version和一个topic_id。此主题 ID 对于所有分区都是相同的。

日志文件

这是生产者写入的数据以二进制格式存储的地方。下面我们尝试使用kafka提供的命令行工具来查看这些文件的内容。

$ bin/kafka-dump-log.sh --files data/kafka/payments-7/00000000000000000000.log,data/kafka/payments-7/00000000000000000000.index --print-data-log

Dumping data/kafka/payments-7/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0
CreateTime: 1672041637310 size: 73 magic: 2 compresscodec: none crc: 456919687 isvalid: true | offset: 0
CreateTime: 1672041637310 keySize: -1 valueSize: 5 sequence: -1 headerKeys: [] payload: world

除了一些属性外,以上输出的解释是不言自明的。payload是推送到kafka的实际数据。offset告诉当前消息离零索引有多远。producerIdproduerEpoch用于交付保证语义。我们将在以后的博文中讨论它们。我们将在下面了解.index.timeindex文件。

分区键

我们了解到,kafka 以循环方式将数据分发到分区。但是,如果我们想发送按键分组的数据怎么办?这就是分区键的用武之地。当我们将数据与分区键一起发送时,kafka 将它们放在一个分区中。kafka是如何找到partition key的?它使用计算hash(partition_key) % number_of_partitions。如果不存在分区键,则它使用循环算法。

我们可能想知道,分区键的用例是什么?Kafka 只保证分区级别的消息排序,而不是主题级别。分区键的应用是为了确保消息跨所有分区的顺序。

让我们看看它是如何工作的。让我们生成一些消息。

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic payments --property parse.key=true --property key.separator=|
> lokesh1729|{"message": "lokesh1729 : order placed"}
> lokesh1729|{"message": "lokeh1729 : logged in"}
> lokesh1729|{"message": "lokesh1729 : logged out"}
> lokesh1729|{"message": "lokesh1729 : payment success"}

parse.key告诉 kafka 通过分隔符解析密钥。默认情况下key.separator设置为选项卡,我们重写为管道。

让我们使用相同的命令查看数据kafka-dump-log。我们需要在所有 10 个分区中执行命令来找到分区,因为我们不知道它去了哪个分区。

$ $ bin/kafka-dump-log.sh --files data/kafka/payments-7/00000000000000000000.log,data/kafka/payments-7/00000000000000000000.index --print-data-log
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0
isTransactional: false isControl: false position: 147 CreateTime: 1672057287522 size: 118 magic: 2 compresscodec: none crc: 2961270358
isvalid: true | offset: 2 CreateTime: 1672057287522 keySize: 10 valueSize: 40 sequence: -1 headerKeys: [] key: lokesh1729
payload: {"message": "lokesh1729 : order placed"}

baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0
isTransactional: false isControl: false position: 265 CreateTime: 1672057301944 size: 114 magic: 2 compresscodec: none crc: 204260463
isvalid: true | offset: 3 CreateTime: 1672057301944 keySize: 10 valueSize: 36 sequence: -1 headerKeys: [] key: lokesh1729
payload: {"message": "lokeh1729 : logged in"}

baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0
isTransactional: false isControl: false position: 379 CreateTime: 1672057311110 size: 116 magic: 2 compresscodec: none crc: 419761401
isvalid: true | offset: 4 CreateTime: 1672057311110 keySize: 10 valueSize: 38 sequence: -1 headerKeys: [] key: lokesh1729 payload: {"message": "lokesh1729 : logged out"}

baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0
isTransactional: false isControl: false position: 495 CreateTime: 1672057327354 size: 121 magic: 2 compresscodec: none crc: 177029556
isvalid: true | offset: 5 CreateTime: 1672057327354 keySize: 10 valueSize: 43 sequence: -1 headerKeys: [] key: lokesh1729 payload: {"message": "lokesh1729 : payment success"}

正如我们从上面的日志中看到的,所有带有键的消息都lokesh1729去了同一个分区,即分区 7。

索引和时间索引文件

让我们使用此脚本生成更多消息并使用上述命令转储数据。

$ bin/kafka-dump-log.sh --files data/kafka/payments-8/00000000000000000000.log,data/kafka/payments-8/00000000000000000000.index --print-data-log
Dumping data/kafka/payments-8/00000000000000000000.index
offset: 33 position: 4482
offset: 68 position: 9213
offset: 100 position: 13572
offset: 142 position: 18800
offset: 175 position: 23042
offset: 214 position: 27777
offset: 248 position: 32165
offset: 279 position: 36665
offset: 313 position: 40872
offset: 344 position: 45005
offset: 389 position: 49849
offset: 422 position: 54287
offset: 448 position: 58402
offset: 485 position: 62533

正如我们从上面的输出中看到的,索引文件存储了偏移量及其在文件中的位置.log。为什么需要它?我们知道消费者是顺序处理消息的。当消费者请求消息时,kafka 需要从日志中获取它,即它需要执行磁盘 I/O。想象一下,kafka 逐行读取每个日志文件以找到偏移量。它需要O(n)(其中 n 是文件中的行数)磁盘 I/O 的时间和延迟。当日志文件达到千兆字节大小时,它将成为瓶颈。因此,为了优化它,kafka 将偏移量存储到文件中的位置映射.index,这样如果消费者要求任意偏移量,它只需.index及时对文件进行二进制搜索O(log n),然后转到.log文件并再次执行二进制搜索。

让我们举个例子,假设消费者正在读取第 190 个偏移量。首先,kafka broker 读取索引文件(参考上面的日志)并进行二分查找,要么找到确切的偏移量,要么找到最接近的偏移量。在这种情况下,它发现偏移量为 175,其位置为 23042。然后,它转到文件.log并再次执行二进制搜索,因为该.log文件是按偏移量升序存储的仅追加数据结构。

现在,让我们看一下.timeindex文件。让我们使用以下命令转储文件。

$ bin/kafka-dump-log.sh --files data/kafka/payments-8/00000000000000000000.timeindex --print-data-log

Dumping data/kafka/payments-8/00000000000000000000.timeindex
timestamp: 1672131856604 offset: 33
timestamp: 1672131856661 offset: 68
timestamp: 1672131856701 offset: 100
timestamp: 1672131856738 offset: 142
timestamp: 1672131856772 offset: 175
timestamp: 1672131856816 offset: 213
timestamp: 1672131856862 offset: 247
timestamp: 1672131856901 offset: 279
timestamp: 1672131856930 offset: 312
timestamp: 1672131856981 offset: 344
timestamp: 1672131857029 offset: 388
timestamp: 1672131857076 offset: 419
timestamp: 1672131857102 offset: 448
timestamp: 1672131857147 offset: 484
timestamp: 1672131857185 offset: 517
timestamp: 1672131857239 offset: 547

从上面的结果我们可以看出,.timeindex文件中存储了纪元时间戳和文件中偏移量的映射关系.index。当消费者想要根据时间戳重放事件时,kafka首先通过对文件进行二分查找找到偏移量.timeindex,找到偏移量,通过对文件进行二分查找找到位置.index

表示 .index 和 .timeindex 文件如何在 kafka 中工作的图像

消费者

让我们使用以下命令启动消费者

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic payments --group payments-consumer --from-beginning
{"message": "lokesh1729 : order placed"}
{"message": "lokeh1729 : logged in"}
{"message": "lokesh1729 : logged out"}
{"message": "lokesh1729 : payment success"}

请注意,--from-beginning参数用于从头开始读取。如果不使用,消费者读取最新的消息,即消费者启动后产生的消息。

现在,让我们看一下文件系统。我们可以观察到将创建名称为 … 的新__consumer_offsets-0文件__consumer_offsets-1__consumer_offsets-49。Kafka 将每个消费者偏移量的状态存储在一个名为__consumer_offsets默认分区大小为 50 的主题中。如果我们查看文件夹中的内容,将会出现与payments我们在上面看到的主题中相同的文件。

一张描绘kafka broker和consumer之间交互的图片

正如我们从上图中看到的,消费者轮询记录并在处理完成时提交偏移量。Kafka 非常灵活,我们可以配置在单个轮询中获取多少条记录、自动提交间隔等…我们将在单独的博客文章中讨论所有这些配置。

当消费者提交偏移量时,它会发送主题名称、分区和偏移量信息。然后,代理使用它来构造键 as<consumer_group_name>, <topic>, <partition>和值 as<offset>,<partition_leader_epoch>,<metadata>,<timestamp>并将其存储在__consumer_offsets主题中。

当消费者崩溃或重启时,它向kafka broker发送请求,broker__consumer_offsets通过doing找到分区hash(<consumer_group_name>, <topic>, <partition> ) % 50并获取最新的偏移量并将其返回给消费者。

[磁盘 I/O 优化

Kafka 使用硬盘作为其主要数据存储。我们知道磁盘 I/O 比主存慢。因此,我们可能想知道 kafka 是如何在高吞吐量下实现低延迟的。让我们深入研究它。

  1. 顺序磁盘读取比随机内存访问更快。现代操作系统提供以多个块的形式从磁盘读取数据的功能。
  2. 现代操作系统使用空闲主内存进行磁盘缓存,并通过此缓存转移磁盘 I/O。
  3. 依赖磁盘缓存比主内存更优化,因为即使服务崩溃或重新启动,磁盘缓存也会保持温暖。
  4. Kafka 使用索引文件来加快访问速度。我们已经在上面讨论过它们。
  5. Kafka 批处理磁盘写入。

以下是文件中的示例日志.log。让我们剖析一下。

baseOffset- 开始的起始偏移量

lastOffset- 不言自明

count- 批次中的消息总数

CreateTime- 创建日期的纪元时间戳

size- 批处理中消息的总大小(以字节为单位)

baseOffset: 1992 lastOffset: 1995 count: 4 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 260309 CreateTime: 1672131859025 size: 474 magic: 2 compresscodec: none crc: 36982599 isvalid: true
| offset: 1992 CreateTime: 1672131859022 keySize: 12 valueSize: 84 sequence: -1 headerKeys: [] key: craigpearson payload: {"username": "craigpearson", "address": "0414 Fischer Rest\nZacharyshire, MN 38196"}
| offset: 1993 CreateTime: 1672131859024 keySize: 11 valueSize: 80 sequence: -1 headerKeys: [] key: gregoryjoel payload: {"username": "gregoryjoel", "address": "827 Nelson Burg\nSherrimouth, OK 49255"}
| offset: 1994 CreateTime: 1672131859025 keySize: 11 valueSize: 83 sequence: -1 headerKeys: [] key: gregoryjoel payload: {"username": "gregoryjoel", "address": "8306 Reed Trail\nFitzgeraldstad, PA 18715"}
| offset: 1995 CreateTime: 1672131859025 keySize: 12 valueSize: 84 sequence

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/25870.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Canvas画布基本功能及实现网页签名功能

canvas 简介 <canvas> 是 HTML5 新增的&#xff0c;一个可以使用脚本(通常为 JavaScript) 在其中绘制图像的 HTML 元素。它可以用来制作照片集或者制作简单(也不是那么简单)的动画&#xff0c;甚至可以进行实时视频处理和渲染。和所有 DOM 元素一样&#xff0c;拥有自己…

Elasticsearch:数据是如何被写入的?

在我之前的文章 “Elasticsearch&#xff1a;索引数据是如何完成的”&#xff0c;我详述了如何索引 Elasticsearch 的数据的。在今天的文章中&#xff0c;我将从另外一个视角来诠释如何写入数据到 Elasticsearch。更多关于 Elasticsearch 数据操作&#xff0c;请阅读文章 “Ela…

实体店引流获客系统模式开发详解

随着互联网的日益发展&#xff0c;实体店的处境变得越来越艰难&#xff0c;获客难和销量差成为了实体店最头疼的两大问题。面对这种情况&#xff0c;一味固步自封是不行的&#xff0c;最好还是顺应潮流&#xff0c;结合一款合适的商业模式&#xff0c;来帮助自己快速引流获客和…

深入理解设计原则之接口隔离原则(ISP)【软件架构设计】

系列文章目录 C高性能优化编程系列 深入理解软件架构设计系列 深入理解设计模式系列 高级C并发线程编程 LSP&#xff1a;接口隔离原则 系列文章目录1、接口隔离原则的定义和解读2、案例解读3、如何判断一个接口是否符合接口隔离原则&#xff1f;小结 1、接口隔离原则的定义和…

canal server 标准化集群搭建(完结)

4.2. 创建 server 所属集群&#xff1a;选择刚才添加的 “集群名称” server 名称&#xff1a; server_1、server_2、server_3 依次类推 server ip&#xff1a;server 的 ip 地址 admin 端口&#xff1a;canal server 与 canal admin 的通信端口&#xff0c;非生产环境从 2…

云南LED、LCD显示屏系统建设,户外、室内广告大屏建设方案

LED大屏幕显示系统是LED高清晰数字显示技术、显示单元无缝拼接技术、多屏图像处理技术、信号切换技术、网络技术等科技手段的应用综合为一体&#xff0c;形成一个拥有高亮度、高清晰度、技术先进、功能强大、使用方便的大屏幕投影显示系统。通过大屏幕显示系统&#xff0c;可以…

3.1 矩阵连乘问题

博主简介&#xff1a;一个爱打游戏的计算机专业学生博主主页&#xff1a; 夏驰和徐策所属专栏&#xff1a;算法设计与分析 学习目标&#xff1a; 如果我要学习动态规划中的矩阵连乘问题&#xff0c;我会采取以下学习方法&#xff1a; 1. **理解问题的背景和目标&#xff1a;首…

【MySQL】如何速通MySQL(2)

&#x1f4cc;前言&#xff1a;本篇博客介绍如何速通MySQL的第二篇&#xff0c;主要介绍Mysql中主要的基础的入门&#xff0c;学习MySQL之前要先安装好MySQL&#xff0c;如果还没有安装的小伙伴可以看看博主前面的博客&#xff0c;里面有详细的安装教程。或者看一下下面这个链接…

SpringMVC第七阶段:SpringMVC的增删改查(01)

SpringMVC的增删改查 1、准备单表的数据库 drop database if exists springmvc;create database springmvc;use springmvc; ##创建图书表 create table t_book(id int(11) primary key auto_increment, ## 主键name varchar(50) not null, ## 书名 author varchar(50) no…

解决高并发

目录 1.4 对比单体系统、分布式系统和微服务系统 1.4.1 单体系统之痛 1、什么是单体系统 2、单体系统面临的问题 1.4.2 高并发系统之分布式架构 1.4.3 高并发系统之微服务架构 1.4 对比单体系统、分布式系统和微服务系统 接下来从企业真实场景出发&#xff0c;对比单体系统…

JavaEE进阶(5/29)SpringMVC

目录 1.复习 2.URL传参PathVariable 3.上传文件RequestPart 4.获取Cookie/Session/header 5.传统/维新获取cookies 6.传统/维信获得Header 7.获取Session&#xff08;非常重要&#xff09; 8.不加ResponseBody 9.如何获取Json数据&#xff0c;RequestBody 10.想…

python:容器:字符串——常用操作

字符串[下标]根据下标索引取出特定位置字符字符串.index(字符串)查找给定字符的第一个匹配项的下标字符串.replace(字符串1,字符串2) 将字符串内的全部字符串1&#xff0c;替换为字符串2 不会修改原字符串&#xff0c;而是得到一个新的 字符串.split(字符串) 按照给定字符串&am…

数据偏度介绍和处理方法

偏度&#xff08;skewness&#xff09;是用来衡量概率分布或数据集中不对称程度的统计量。它描述了数据分布的尾部&#xff08;tail&#xff09;在平均值的哪一侧更重或更长。偏度可以帮助我们了解数据的偏斜性质&#xff0c;即数据相对于平均值的分布情况。 有时&#xff0c;正…

Linux :: 【基础指令篇 :: 文件及目录操作:(2)】::Linux操作系统的文件“框架”、绝对路径与相对路径及路径定位文件对象的解释

前言&#xff1a;本篇是 Linux 基本操作篇章的内容&#xff01; 笔者使用的环境是基于腾讯云服务器&#xff1a;CentOS 7.6 64bit。 学习集&#xff1a; C 入门到入土&#xff01;&#xff01;&#xff01;学习合集Linux 从命令到网络再到内核&#xff01;学习合集 本篇内容&am…

【JavaSE】Java基础语法(二十九):Map集合

文章目录 1. Map集合概述和特点2. Map集合的基本功能3. Map集合的获取功能【应用】4. Map集合的遍历(方式1)5. Map集合的遍历(方式2)6. map四种遍历方式 1. Map集合概述和特点 Map集合概述 interface Map<K,V> K&#xff1a;键的类型&#xff1b;V&#xff1a;值的类型…

笔记本电脑的BIOS是怎么保护安全的?

随着攻防技术的不断演进&#xff0c;像BIOS攻击、高级网络代码攻击等手段层出不穷&#xff0c;“受害者”也不仅限于企业级服务器、存储&#xff0c;很多魔爪也开始伸向了拥有商业机密数据的PC。 BIOS是Basic Input/Output System&#xff08;基本输入/输出系统&#xff09;的…

(八)Spring之IOC控制反转、DI依赖注入介绍和使用(详解)

文章目录 前言SpringSpring IOC 简介BeanIOC 概述IOC 本质理解 Spring IOC 应用IOC xml装配IOC 依赖注入IOC Bean的作用域 IoC 自动装配Bean 的自动装配注解实现自动装配 IoC 使用注解开发模拟实现Spring IoC 前言 “Spring”在不同的上下文中表示不同的事物。它可以用来引用 …

ASP.NET Core

1. 入口文件 一个应用程序总有一个入口文件&#xff0c;是应用启动代码开始执行的地方&#xff0c;这里往往也会涉及到应用的各种配置。当我们接触到一个新框架的时候&#xff0c;可以从入口文件入手&#xff0c;了解入口文件&#xff0c;能够帮助我们更好地理解应用的相关配置…

用户验证FTP实验

用户FTP实验 目录 匿名用户验证&#xff1a; 本地用户验证&#xff1a; 本地用户访问控制&#xff1a; 匿名用户验证&#xff1a; 例&#xff1a;&#xff08;前提配置&#xff0c;防火墙关闭&#xff0c;yum安装&#xff0c;同模式vmware11&#xff09; 现有一台计算机huy…

什么是项目可交付成果?定义、示例及管理工具

项目产生可交付成果&#xff0c;这只是项目活动的结果。项目可交付成果可大可小&#xff0c;其数量也因项目而异。它们是由项目管理团队和利益相关者在项目规划阶段商定的。 换句话说&#xff0c;任何类型的项目都有投入和产出。投入是你投入到项目中的东西&#xff0c;如数据…