Kafka KRaft模式探索

1.概述

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。其核心组件包含Producer、Broker、Consumer,以及依赖的Zookeeper集群。其中Zookeeper集群是Kafka用来负责集群元数据的管理、控制器的选举等。

2.内容

目前,Kafka在使用的过程当中,会出现一些问题。由于重度依赖Zookeeper集群,当Zookeeper集群性能发生抖动时,Kafka的性能也会收到很大的影响。因此,在Kafka发展的过程当中,为了解决这个问题,提供KRaft模式,来取消Kafka对Zookeeper的依赖。

 上图是在未使用KRaft模式时,Kafka的一个架构,在做元数据管理、Controller的选举等都需要依赖Zookeeper集群。

 在Kafka引入KRaft新内部功能后,对Zookeeper的依赖将会被取消。在 KRaft 中,一部分 broker 被指定为控制器,这些控制器提供过去由 ZooKeeper 提供的共识服务。所有集群元数据都将存储在 Kafka 主题中并在内部进行管理。

 2.1 KRaft模式的优势

  • 更简单的部署和管理——通过只安装和管理一个应用程序,Kafka 现在的运营足迹要小得多。这也使得在边缘的小型设备中更容易利用 Kafka;
  • 提高可扩展性——KRaft 的恢复时间比 ZooKeeper 快一个数量级。这使我们能够有效地扩展到单个集群中的数百万个分区。ZooKeeper 的有效限制是数万;
  • 更有效的元数据传播——基于日志、事件驱动的元数据传播可以提高 Kafka 的许多核心功能的性能。
1. KRaft集群节点角色

 在 KRaft 模式下,Kafka 集群可以以专用或共享模式运行。在专用模式下,一些节点将其process.roles配置设置为controller,而其余节点将其设置为broker。对于共享模式,一些节点将process.roles设置为controller, broker并且这些节点将执行双重任务。采用哪种方式取决于集群的大小。

2. KRaft模式控制器

 在 KRaft 模式集群中充当控制器的代理列在controller.quorum.voters每个代理上设置的配置属性中。这允许所有代理与控制器进行通信。这些控制器代理之一将是活动控制器,它将处理与其他代理通信对元数据的更改。

所有控制器代理都维护一个保持最新的内存元数据缓存,以便任何控制器都可以在需要时接管作为活动控制器。这是 KRaft 的特性之一,使其比基于 ZooKeeper 的控制平面高效得多。

3. KRaft集群元数据

 KRaft 基于 Raft 共识协议,该协议作为 KIP-500 的一部分引入 Kafka,并在其他相关 KIP 中定义了更多细节。在 KRaft 模式下,集群元数据(反映所有控制器管理资源的当前状态)存储在名为__cluster_metadata. KRaft 使用这个主题在控制器和代理节点之间同步集群状态更改。

活动控制器是这个内部元数据主题的单个分区的领导者。其他控制器是副本追随者。经纪人是副本观察者。因此,不是控制器将元数据更改广播给其他控制器或代理,而是它们各自获取更改。这使得保持所有控制器和代理同步非常有效,并且还缩短了代理和控制器的重启时间。

4. KRaft元数据复制

由于集群元数据存储在 Kafka 主题中,因此该数据的复制与我们在数据平面复制模块中看到的非常相似。活动控制器是元数据主题的单个分区的领导者,它将接收所有写入。其他控制器是跟随者,将获取这些更改。我们仍然使用与数据平面相同的偏移量和领导者时期。但是,当需要选举领导者时,这是通过仲裁完成的,而不是同步副本集。因此,元数据复制不涉及 ISR。另一个区别是元数据记录在写入每个节点的本地日志时会立即刷新到磁盘。

5. Leader选举

当集群启动时以及当前领导者停止时,无论是作为滚动升级的一部分还是由于故障,都需要进行控制器领导者选举。现在让我们看一下 KRaft 领导人选举所涉及的步骤。

  • 投票请求:
    • 当需要选举leader控制器时,其他控制器将参与选举新的leader。一个控制器,通常是第一个意识到需要新领导者VoteRequest的控制器,将向其他控制器发送一个。该请求将包括候选者的最后一个偏移量以及与该偏移量关联的时期。它还将增加该时期并将其作为候选时期传递。候选控制器也将为该时期投票给自己;
  • 投票响应:
    • 当跟随者控制器接收到 aVoteRequest时,它将检查它是否看到了比候选者传入的时期更高的时期。如果它有,或者如果它已经在同一时期投票给了不同的候选人,它将拒绝该请求。否则,它将查看候选人传递的最新偏移量,如果它与自己的相同或更高,它将授予投票。该候选控制器现在有两票:它自己的票和刚刚被授予的票。第一个获得多数票的控制器成为新的领导者。
  • 完成:
    •  一旦候选人获得了多数票,它将认为自己是领导者,但它仍然需要将此通知其他控制者。为此,新领导者将向BeginQuorumEpoch其他控制器发送包括新纪元在内的请求。现在选举已经完成。当旧的leader控制器重新上线时,它将在新的epoch跟随新的leader,并将自己的元数据日志与leader同步。

3.KRaft 集群元数据快照

 没有明确的点我们知道不再需要集群元数据,但我们不希望元数据日志无休止地增长。此要求的解决方案是元数据快照。每个控制器和代理都会定期对其内存中的元数据缓存进行快照。这个快照被保存到一个用结束偏移和控制器纪元标识的文件中。现在我们知道元数据日志中所有早于该偏移量和纪元的数据都已安全存储,并且可以将日志截断到该点。快照连同元数据日志中的剩余数据仍然会为我们提供整个集群的完整元数据。

3.1 读取快照

元数据快照的两个主要用途是代理重新启动和新代理上线。

当现有代理重新启动时,它 (1) 将其最近的快照加载到内存中。然后EndOffset从其快照开始,它 (2) 从其本地__cluster_metadata日志中添加可用记录。然后它 (3) 开始从活动控制器中获取记录。如果获取的记录偏移量小于活动控制器LogStartOffset,则控制器响应包括其最新快照的快照 ID。然后代理 (4) 获取此快照并将其加载到内存中,然后再次继续从__cluster_metadata分区领导者(活动控制器)获取记录。

当一个新的代理启动时,它 (3) 第一次开始从活动控制器中获取记录。通常,此偏移量将小于活动控制器LogStartOffset,并且控制器响应将包括其最新快照的快照 ID。代理 (4) 获取此快照并将其加载到内存中,然后再次继续从__cluster_metadata分区领导者(活动控制器)获取记录。

__cluster_metadata主题将snapshot作为cleanup.policy。Kafka 控制器和元数据缓存将在内存中表示复制的日志,最多可达高水位线。在执行快照时,Kafka 控制器和元数据缓存会将这个内存状态序列化到磁盘。磁盘上的此快照文件由已包含的复制日志的结束偏移量和纪元描述。

Kafka 控制器和元数据缓存将在 Kafka Raft 客户端完成生成新快照时通知它。将日志的前缀截断到最新的快照是安全的。主题分区将__cluster_metadata拥有最新的快照和零个或多个旧快照。这些额外的快照必须被删除,这在“何时删除快照”中有描述。 

3.2 快照设计

Kafka Raft 主题分区如下所示:

复制代码

Kafka Replicated Log:

LogStartOffset  --             high-watermark --           LEO --
                 V                             V                V
               -----------------------------------------------
       offset: | x | ... | y - 1 | y |  ...  |   |  ...  |   |
       epoch:  | b | ... | c     | d |  ...  |   |  ...  |   |
               -----------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/x-a.checkpoint
<topic_name>-<partition_index>/y-c.checkpoint

复制代码

需要注意的是,checkpoint将使用扩展名,因为 Kafka 已经有一个带有snapshot扩展名的文件。

  • LEO - 日志结束偏移量 - 要写入磁盘的下一个偏移量。

  • high-watermark - 已复制到 N/2 + 1 个副本的最大偏移量和 epoch。

  • LogStartOffset - 日志开始偏移量 - 复制日志中的最小偏移量。

3.3 快照格式

Kafka 控制器和元数据缓存负责快照的内容。每个快照都由一个唯一标识SnapshotId,即快照中包含的复制日志中记录的纪元和结束偏移量。快照将存储在主题分区目录中,名称为<SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint. 例如,对于主题 __cluster_metadata、分区 0、快照结束偏移 5120793 和快照 epoch 2,完整文件名将是__cluster_metadata-0/00000000000005120793-00000000000000000002.checkpoint.

快照时期将在订购快照时使用,更重要的LastFetchedEpoch是在 Fetch 请求中设置字段时使用。追随者可能有快照和空日志。在这种情况下,follower 将LastFetchEpoch在 Fetch 请求中设置时使用快照的纪元。

快照文件的磁盘格式将与日志格式的版本 2 相同。这是版本 2 的日志格式供参考:

复制代码

RecordBatch => BatchHeader [Record]

BatchHeader
   BaseOffset => Int64
   Length => Int32
   PartitionLeaderEpoch => Int32
   Magic => Int8
   CRC => Uint32
   Attributes => Int16
   LastOffsetDelta => Int32 // also serves as LastSequenceDelta
   FirstTimestamp => Int64
   MaxTimestamp => Int64
   ProducerId => Int64
   ProducerEpoch => Int16
   BaseSequence => Int32

Record =>
   Length => Varint
   Attributes => Int8
   TimestampDelta => Varlong
   OffsetDelta => Varint
   Key => Bytes
   Value => Bytes
   Headers => [HeaderKey HeaderValue]
     HeaderKey => String
     HeaderValue => Bytes

复制代码

使用日志格式的版本 2 将允许 Kafka 控制器和元数据缓存压缩记录并识别快照中的损坏记录。即使快照使用日志格式存储此状态,也没有要求:

  • 在and中分别使用有效的BaseOffset和。OffsetDeltaBatchHeaderRecord
  • 使快照中的记录与复制日志中的记录相匹配。

3.4 快照记录

为了允许 KRaft 实现在不影响 Kafka 控制器和元数据缓存的情况下包含有关快照的附加信息,快照将包含两个控制记录批次。控制记录批次SnapshotHeaderRecord  将始终是快照中的第一个记录批次。控制记录批次SnapshotFooterRecord  将是快照中的最后一个记录批次。这两条记录将具有以下架构。

1.快照头架构

复制代码

{
  "type": "data",
  "name": "SnapshotHeaderRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {"name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the snapshot header record"},
    { "name": "LastContainedLogTimestamp", "type": "int64", "versions": "0+",
      "about": "The append time of the last record from the log contained in this snapshot" }
  ]
}

复制代码

2.快照脚架构

复制代码

{
  "type": "data",
  "name": "SnapshotFooterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the snapshot footer record" }
  ]
}

复制代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/105139.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Flutter extended_image库设置内存缓存区大小与缓存图片数

ExtendedImage ExtendedImage 是一个Flutter库&#xff0c;用于提供高级图片加载和显示功能。这个库使用了 image 包来进行图片的加载和缓存。如果你想修改缓存大小&#xff0c;你可以通过修改ImageCache的配置来实现。 1. 获取ImageCache实例: 你可以通过PaintingBinding…

css 雷达扫描图

html 代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>css 雷达扫描</title><style>* {margin: 0;padding: 0;}body {background: #000000;height: 100vh;display: flex;align-items…

Java基础总结

0、Java语言 1.java和c 2.编译和解释 3.jre和jdk&#xff0c;jvm 简单来说&#xff0c;编译型语言是指编译器针对特定的操作系统将源代码一次性翻译成可被该平台执行的机器码&#xff1b;解释型语言是指解释器对源程序逐行解释成特定平台的机器码并立即执行。 Java 语言既具…

【单片机学习笔记】Windows+Vscode+STM32F4+freeRTOS+FatFs gcc环境搭建

为摒弃在接受keil邮件&#xff0c;研究了下gun编译&#xff0c;以STM32F407为例&#xff0c;简单记录 1. 软件包准备 Git 选择对应版本直接安装即可https://git-scm.com/download/winmakegcc ​ 1&#xff09;将上述软件包放置于C盘根目录 2&#xff09;添加环境变量 3&am…

[SpringCloud] Eureka 与 Ribbon 简介

目录 一、服务拆分 1、案例一&#xff1a;多端口微服务 2、案例二&#xff1a;服务远程调用 二、Eureka 1、Eureka 原理分析 2、Eureka 服务搭建&#xff08;注册 eureka 服务&#xff09; 3、Eureka 服务注册&#xff08;注册其他服务&#xff09; 4、Eureka 服务发现…

鸿鹄工程项目管理系统em Spring Cloud+Spring Boot+前后端分离构建工程项目管理系统

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管…

【组合计数】CF1866 H

Problem - H - Codeforces 题意 思路 不知道这种trick叫什么&#xff0c;昨天VP刚遇到过 设 f[x] 为恰好有一个最大值为 x 的方案数&#xff0c;我们要求这个&#xff0c;那就设 g[x] 为 至少有一个最大值为 x 的方案数&#xff0c;那么答案就是 f[x] g[x] - g[x - 1] 这里…

FL Studio21版无限破解版下载 软件内置破解补丁

FL Studio是一款非常好用方便的音频媒体制作工具&#xff0c;它的功能是非常的强大全面的&#xff0c;想必那些喜欢音乐创作的朋友们应该都知道这款软件是多么的好用吧&#xff0c;它还能够给用户们带来更多的创作灵感&#xff0c;进一步加强提升我们的音乐制作能力。该软件还有…

浅谈uniapp中开发安卓原生插件

其实官方文档介绍的比较清楚而且详细,但是有时候他太墨迹,你一下子找不到自己想要的,所以我总结了一下开发的提纲,也是为了自己方便下次使用。 1.第一步,下载官方提供的Android的示例工程,然后倒入UniPlugin-Hello-AS工程请在App离线SDK中查找,之后Android studio,编译运行项目…

Android Studio Gradle中没有Task任务,没有Assemble任务,不能方便导出aar包

Gradle中&#xff0c;没有Assemble任务 1. 在编译aar包或者编译module的时候&#xff0c;没有release包&#xff0c;我们一般都是通过assemble进行编译。 如果在Gradle中找不到task。 可以通过设置File->setting -->Experimental→取消勾选“Do not build Gradle task …

wordpress数据库迁移Invalid default value for ‘comment_date‘

问题说明 最近在往新的电脑上迁移一个wordpress网站&#xff0c;在往新电脑上的mysql数据库中导入数据时&#xff0c;报错&#xff1a;1067 - Invalid default value for comment_date。 异常分析 这个错误的字面意思就是字段‘comment_date’的默认值是无效的&#xff0c;于…

Flink将数据写入MySQL(JDBC)

一、写在前面 在实际的生产环境中&#xff0c;我们经常会把Flink处理的数据写入MySQL、Doris等数据库中&#xff0c;下面以MySQL为例&#xff0c;使用JDBC的方式将Flink的数据实时数据写入MySQL。 二、代码示例 2.1 版本说明 <flink.version>1.14.6</flink.version…

UE5 Blueprint发送http请求

一、下载插件HttpBlueprint、Json Blueprint Utilities两个插件是互相依赖的&#xff0c;启用&#xff0c;重启项目 目前两个是Beta的状态&#xff0c;如果你使用的平台支持就可以使用&#xff0c;我们的项目因为需要取Header的值&#xff0c;所有没法使用这两个插件&#xff0…

单链表的定义(数据结构与算法)

单链表的定义 单链表是一种常见的数据结构&#xff0c;用于存储元素的序列。它由一系列节点组成&#xff0c;每个节点包含一个数据元素和一个指向下一个节点的引用&#xff08;指针&#xff09;。单链表中的节点之间通过指针连接起来&#xff0c;形成一个线性结构。单链表是一种…

【Elasticsearch】es脚本编程使用详解

目录 一、es脚本语言介绍 1.1 什么是es脚本 1.2 es脚本支持的语言 1.3 es脚本语言特点 1.4 es脚本使用场景 二、环境准备 2.1 docker搭建es过程 2.1.1 拉取es镜像 2.1.2 启动容器 2.1.3 配置es参数 2.1.4 重启es容器并访问 2.2 docker搭建kibana过程 2.2.1 拉取ki…

Git的远程仓库

Git的远程仓库 添加远程仓库从远程库克隆 添加远程仓库 你在本地创建了一个Git仓库后&#xff0c;又想在GitHub创建一个Git仓库&#xff0c;并且让这两个仓库进行远程同步&#xff0c;这样&#xff0c;GitHub上的仓库既可以作为备份&#xff0c;又可以让其他人通过该仓库来协作…

如何在VScode中让printf输出中文

如何在VScode中让printf输出中文&#xff1f; 1、在“Visual Studio Code”图标上右击&#xff0c;弹出对话框。见下图&#xff1a; 2、点击“以管理员身份运行”&#xff0c;得到下图&#xff1a; 3、点击“UTF-8”按钮&#xff0c;得到下图&#xff1a; 4、点击“通过编码重…

Python---练习:使用for循环嵌套实现打印九九乘法表

思考&#xff1a; 外层循环主要用于控制循环的行数&#xff0c;内层循环用于控制列数。 基本语法&#xff1a; # 外层循环 for i in 序列1:# 内层循环for j in 序列2:循环体 序列1 序列2 &#xff0c;就可以是range(1, 10) -----也就是从1&#xff0c;到9。 参考while循环…

【vector题解】杨辉三角 | 删除有序数组中的重复项 | 只出现一次的数字Ⅱ

杨辉三角 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1…