Kafka broker

1. zk中存储的kafka信息

/kafka/brokers/ids存储了在线的broker id。

/kafka/brokers/topics/xxx/partitions/n/state存储了Leader是谁以及isr队列

 /kafka/controller辅助Leader选举,每个broker都有一个controller,谁先在zk中注册上,谁就辅助Leader选举。

2. broker总体工作流程

1)每台broker启动后在zk中注册,即/kafka/borkers/ids

2)每台broker去抢占式注册controller,用于后面Leader选举

3)由注册的controller监听/kafka/borkers/ids节点变化

4)开始Leader选举,选举标准是以isr中存活为前提,以AR中排在前面的优先(AR是所有副本的集合,启动时会有一个固定的AR顺序,比如ar[1, 0, 2])

5)controller将选举出来的信息(Leader和isr信息)传到zk中,即/kafka/brokers/topics/xxx/partitions/n/state

6)其他broker的controller会从zk中同步相关信息

Kafka生产者发送数据到broker,数据在底层以Log方式(逻辑概念)存储,实际上是Segment(物理概念),一般1个Segment是1G,包含.log文件和.index文件,.index文件是索引,用于快速查询数据

7)如果Leader挂了,controller监听到节点变化,选举新的Leader,选举标准依然是以isr中存活为前提,以AR中排在前面的优先,最后更新Leader和isr队列信息

3.  新节点服役

新节点服役后,以前的topic所在的分区不会出现在新节点,即新节点不会分摊旧节点的存储压力。如果需要新节点参与进来,就需要进行一种类似于负载均衡的配置。先创建一个topic-to-move.json配置文件:

{
    "topics": [
        {"topic": "first"}
    ],
    "version": 1
}

生成一个负载均衡的计划:

bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

 上面一行是当前的分区分配,下面一行是建议的分区分配计划,创建副本存储计划increase-replication-factor.json,里面内容是上面得分建议计划。最后执行存储计划:

bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

 

还可以验证计划:

bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

查询这个topic的分区详情

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe

4.  退役旧节点

退役旧节点与服役新节点有一些类似,先创建一个topic-to-move.json配置文件,与服役新节点时一样,然后生成一个计划,只不过--broker-list 改为"0,1,2",接着执行计划,验证计划,都与服役新节点一样。

 最后在退役节点关闭kafka服务

bin/kafka-server-stop.sh

5.  Leader选举验证

创建四个分区四个副本的topic并查看:

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu2 --partitions 4 --replications-factor 4

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe  --topic atguigu2

 

把3号broker停掉,那么isr队列中没有3,并且4号分区的Leader变为2

再把2号干掉

 

再恢复3号,发现Leader未变,仅isr队列信息中新增了3号

 

再恢复2号

再干掉1号

 

这样就验证了第二节讲的选举标准: 以isr中存活为前提,以AR中排在前面的优先

6. Leader和Follower故障处理细节

LEO:Log End Offset,每个副本的最后一个offset+1

HW:high watermark,高水位线,所有副本中最小的LEO,消费者能够看到的最大的offset就是HW - 1

1)如果Follower挂了,该Follower会立即被踢出isr,isr中其他Leader和Follower正常接受/同步数据,待该Follower恢复后,会读取上次的HW,将自己高于HW的数据丢弃,从HW开始与Leader同步,等到该Follower的LEO大于等于该Partition的HW,则重新加入isr队列。

2)如果Leader挂了, Leader会立即被踢出isr,并且会选出一个新的Leader,其余的Follower会将高于HW的数据丢弃,然后与新的Leader进行同步。此时只能保证数据的一致性,不能保证数据不丢失。

7. 手动调整分区副本

如果服务器的存储能力不同,希望将数据更多的存储在空间大的服务器上,那么就不应该按照Kafka分区副本的默认均匀分配,而是需要手动调整。创建4个分区,两个副本,都存在0号和1号broker上面。

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic three --partitions 4 --replications-factor 2

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe  --topic three

 

 创建increase-replication-factor.json:

{
    "partitions": [
        {"topic": "three", "partitions": 0, "replicas": [0, 1]},
        {"topic": "three", "partitions": 1, "replicas": [0, 1]},
        {"topic": "three", "partitions": 2, "replicas": [1, 0]},
        {"topic": "three", "partitions": 3, "replicas": [1, 0]}
    ],
    "version": 1
}

执行存储计划:

bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

 最后查看

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe  --topic three

以上是减少副本,增加副本也是类似,先创建一个3个分区,1个副本的topic:

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic four --partitions 3 --replications-factor 1

创建increase-replication-factor.json:

{
    "partitions": [
        {"topic": "four", "partitions": 0, "replicas": [0, 1, 2]},
        {"topic": "four", "partitions": 1, "replicas": [0, 1, 2]},
        {"topic": "four", "partitions": 2, "replicas": [0, 1, 2]}
    ],
    "version": 1
}

执行计划:

bin/kafka-reassign-repartitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

8. Leader Partition自动平衡

在Leader选举验证小节中,如果2号和3号节点都挂了,然后又恢复,则Leader过于集中在0号和1号节点,而Kafka生产者和消费者都是只对Leader操作,所以0号和1号的压力会很大,造成负载不均衡。 未解决该问题,Kafka会自动再平衡,auto.leader.rebalance.enable默认设为true。

什么时机会触发再平衡呢?一个参考指标是broker的不平衡率,leader.imbalance.per.broker.percentage,默认是10%,另一个指标是负载检查的间隔时间,leader.imbalance.check.interval.seconds,默认是300秒。

不平衡率的计算:

实际生产环境中,不一定需要开启再平衡,因为上述例子中其实已经相对平衡了,但是根据规则,需要触发再平衡,因此会需要消耗大量资源。 

9. 文件存储机制

Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应一个log文件,该log文件存储的就是Kafka生产者的数据。生产的数据不断地追加到log文件中,为防止log文件过大导致检索数据慢,Kafka采取了分片和索引的机制:每个partition分为多个segment,每个segment包括.index文件(偏移量索引文件)、.log文件(日志文件)、.timeindex文件(时间戳索引文件)。这些文件位于一个文件夹中,文件夹命名规则:topic名称+分区号。index和log文件的命名是以当前segment的第一条数据的offset来命名。

log文件和index文件详解:

 

10. 文件清除策略

Kafka数据默认保存7天,7天后数据自动删除或者压缩。可通过如下参数修改保存时间(从上到下优先级依次增高):

log.retention.hours

log.retention.minutes

log.retention.ms

默认检查数据是否超期的间隔时间是5分钟,可通过参数log.retention.check.interval.ms进行修改。

如果是删除数据,log.cleanup.policy=delete,基于时间删除是默认打开的,以segment中最大的时间戳作为该文件的时间戳。而基于空间大小进行删除是默认关闭的(log.retention.bytes=-1),即数据超过阈值,删除最早的数据。

如果是压缩数据,log.cleanup.policy=compact,此时对于相同key的不同value值,只保留最新的。(与之前的snappy压缩概念不同)

注意,压缩后的offset可能不是连续的,比如上图没有 offset 6,如果从offset 6开始消费,则会从7开始消费。

11. 高效读写

1)Kafka本身是分布式集群,采用分区,并行度高

2)读数据采用稀疏索引,可以快读定位数据

3)顺序写磁盘,数据以追加的方式写到log文件,这比随机写的速度要快很多,因为省去了大量的磁头寻址时间

4)采用页缓存和零拷贝技术

零拷贝:Kafka的数据加工处理操作交由Kafka生产者和消费者处理。Broker应用层不关心存储的数据,因此就不用走应用层,传输效率高。(传统数据复制方式:从磁盘中读取文件到内核缓冲区,内核读取缓冲区数据复制到用户缓冲区,用户缓冲区的数据复制到socket缓冲区,socket缓冲区数据发送到网卡,再到消费者)

页缓存:Kafka重度依赖Linux提供的页缓存功能。当上层有写操作时,操作系统只是将数据写入页缓存。当读操作发生时,从页缓存中读,如果找不到,再从磁盘中读。页缓存是把尽可能多的空闲内存当做磁盘内存来用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/484845.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PWM实现电机的正反转和调速以及TIM定时器

pwm.c #include "pwm.h"/* PWM --- PA2 --TIM2_CH3 //将电机信号控制一根接GND,一根接在PA2(TIM2_CH3), 输出PWM控制电机快慢 TIM2挂在APB1 定时器频率:84MHZ*/ void Pwm_Init(void) {GPIO_InitTypeDef GPIO_InitStruct;TIM_TimeBaseInitT…

信号处理与分析——matlab记录

一、绘制信号分析频谱 1.代码 % 生成测试信号 Fs 3000; % 采样频率 t 0:1/Fs:1-1/Fs; % 时间向量 x1 1*sin(2*pi*50*t) 1*sin(2*pi*60*t); % 信号1 x2 1*sin(2*pi*150*t)1*sin(2*pi*270*t); % 信号2% 绘制信号图 subplot(2,2,1); plot(t,x1); title(信号x1 1*sin(…

nodejs+vue城市交通管理系统的设计与实现pythonflask-django-php

城市交通管理系统的目的是让使用者可以更方便的将人、设备和场景更立体的连接在一起。能让用户以更科幻的方式使用产品,体验高科技时代带给人们的方便,同时也能让用户体会到与以往常规产品不同的体验风格。 与安卓,iOS相比较起来,…

利用pexpect实现ssh自动登录时命令行无法自动换行问题解决

问题描述 使用python的pexpect模块的pexpect.spawn()进行ssh自动登录时,出现超出一定长度(80个字符)时光标自动切换到本行行首进行覆盖输入的情形 原因 使用spawn时输入窗口大小默认限制为[24,80](可通过spawn类的getwinsize(…

HarmonyOS实战开发-如何使用首选项能力实现一个简单示例。

介绍 本篇Codelab是基于HarmonyOS的首选项能力实现的一个简单示例。实现如下功能: 创建首选项数据文件。将用户输入的水果名称和数量,写入到首选项数据库。读取首选项数据库中的数据。删除首选项数据文件。 最终效果图如下: 相关概念 首选…

人工智能(Educoder)-- 搜索技术 -- 启发式搜索

任务描述 本关任务:八数码问题是在一个33的棋盘上有1−8位数字随机分布,以及一个空格,与空格相连的棋子可以滑动到空格中,问题的解是通过空格滑动,使得棋盘转化为目标状态,如下图所示。 为了简化问题的输…

简单使用Swagger

文章目录 1、介绍2、 使用步骤3、 常用注解 1、介绍 Swagger 是一个规范和完整的框架,用于生成、描述、调用和可视化 RESTful 风格的 Web 服务(https://swagger.io/)。 它的主要作用是: 使得前后端分离开发更加方便,有利于团队协作 接口的文…

数据可视化-ECharts Html项目实战(6)

在之前的文章中,我们学习了如何设置散点图、雷达图。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢数据可视化-ECharts Html项目实战(5&a…

软考96-上午题-【操作系统】-文件目录

一、文件目录 1-1、定义 为了实现“按名存取”,系统必须为每个文件设置用于描述和控制文件的数据结构,它至少要包括:文件名、存放文件的物理地址。 这个数据结构称为:文件控制块(FCB),文件控制块的有序集合称为文件…

flutter3_douyin:基于flutter3+dart3短视频直播实例|Flutter3.x仿抖音

flutter3-dylive 跨平台仿抖音短视频直播app实战项目。 全新原创基于flutter3.19.2dart3.3.0getx等技术开发仿抖音app实战项目。实现了类似抖音整屏丝滑式上下滑动视频、左右滑动切换页面模块,直播间进场/礼物动效,聊天等模块。 运用技术 编辑器&#x…

Web前端Html的表单

表单的关键字: form标签表示一个表单区域 action“后端地址” method“提交数据方式:get/post” input 单行输入框 type“text” 文本 name“定义名称 名字自定义” 向后端提交的键 readonly“readonly” 只读,不可修改,但是可以提交 disab…

Django 三板斧、静态文件、request方法

【一】三板斧 【1】HttpResponse (1)介绍 HttpResponse是Django中的一个类,用于构建HTTP响应对象。它允许创建并返回包含特定内容的HTTP响应。 (2)使用 导入HttpResponse类 from django.http import HttpResponse创…

C++ unordered_set和unordered_map

哈希 1. unordered_set/unordered_map1.1 背景1.2 unordered_set1.2.1 特性1.2.2 常用方法 1.3 unordered_map1.3.1 特性1.3.2 常用方法 2. 哈希2.1概念2.2 哈希冲突2.2.1哈希函数2.2.2 解决哈希冲突2.2.2.1 闭散列2.2.2.2 开散列 1. unordered_set/unordered_map 1.1 背景 之…

Rust并发编程thread多线程和channel消息传递

安全高效的处理并发是 Rust 诞生的目的之一,主要解决的是服务器高负载承受能力。 并发(concurrent)的概念是指程序不同的部分独立执行,这与并行(parallel)的概念容易混淆,并行强调的是"同…

如何理解OSI七层模型?

一、是什么 OSI (Open System Interconnect)模型全称为开放式通信系统互连参考模型,是国际标准化组织 ( ISO ) 提出的一个试图使各种计算机在世界范围内互连为网络的标准框架 OSI将计算机网络体系结构划分为七层,每一层实现各自…

存储随笔原创科普视频首播~

一周之前,存储随笔创建了B站账号。小编利用上个周末休息时间专门研究了B站视频录制的各种方案。发现并没有想象的很容易,先花了很长时间准备了一个PPT,再准备演讲大纲,最终磕磕绊绊完成了首期原创视频录制! 可能不尽如…

PCB布线中晶振电容、电源大小电容、电源电容的设计细节

嵌入式软硬件爱好者 一张手册走天下。嵌入式单片机/Linux/Openwrt/电子电路技术交流分享。//主打一个技术层面的剑走偏锋,直击众人重视和不重视的重点//专注基础,才能走的更远 晶振电容 晶振旁边的电容在电路设计中不是用于滤波的。实际上,…

中国疆域从古至今版图演变,中国历史各个朝代地图大全

一、图片描述 每个朝代都有数十张地图,朝代疆域全图重点区域地图,图片是JPG格式,都是高清地图,行政名称清晰可见,非常适合喜欢历史的朋友。本套历史朝代地图,大小1.32G,18个压缩文件。 二、图…

ShardingSphere水平分表——开发经验(2)

1. 什么场景下分表? 数据量过大或者数据库表对应的磁盘文件过大。 Q:多少数据分表? A:网上有人说1kw,2kw?不准确。 1、一般看字段的数量,有没有包含text类型的字段。我们的主表里面是不允许有t…

C语言数据结构之归并排序

疏雨池塘见 微风襟袖知 目录 归并排序的介绍 基本思想 时间复杂度分析 ⭐归并排序步骤 空间复杂度分析 代码展示 ✨归并排序的非递归 代码展示 总结🔥 归并排序的介绍 归并排序,是创建在归并操作上的一种有效的排序算法。算法是采用分治法&#xff…