「Kafka」Broker篇

「Kafka」Broker篇

主要讲解的是在 Kafka 中是怎么存储数据的,以及 Kafka 和 Zookeeper 之间如何进行数据沟通的。

Kafka Broker 总体工作流程

Zookeeper 存储的 Kafka 信息

  • 启动 Zookeeper 客户端:

    [atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
    
  • 通过 ls 命令可以查看 kafka 相关信息:

    [zk: localhost:2181(CONNECTED) 2] ls /kafka
    

    image-20240110143832837

image-20231229163857940

Kafka Broker 总体工作流程

image-20231229163930666

模拟 Kafka 上下线,Zookeeper 中数据变化:

  1. 查看 /kafka/brokers/ids 路径上的节点:

    image-20231229164453868

  2. 查看 /kafka/controller 路径上的数据:

    image-20231229164440799

  3. 查看 /kafka/brokers/topics/first/partitions/0/state 路径上的数据:

    image-20231229164521363

  4. 停止 hadoop104 上的 kafka: image-20240110142636596

  5. 再次查看 /kafka/brokers/ids 路径上的节点

    image-20240110142623644

  6. 再次查看 /kafka/controller 路径上的数据

    image-20240110142702080

  7. 再次查看 /kafka/brokers/topics/first/partitions/0/state 路径上的数据

    image-20240110142724094

  8. 启动 hadoop104 上的 kafka

    image-20240110142742279

  9. 再次观察 1、2、3 步骤中的内容。

Broker 重要参数

image-20231229164041782

image-20231229164056634

image-20231229164110883

生产经验—节点服役和退役

服役新节点

新节点准备

  1. 关闭 hadoop104,并右键执行克隆操作

  2. 开启 hadoop105,并修改 IP 地址

    image-20240110150510473

  3. 在 hadoop105 上,修改主机名称为 hadoop105

    image-20240110150536171

  4. 重新启动 hadoop104、hadoop105

  5. 修改 haodoop105 中 kafka 的 broker.id 为 3保证唯一

    [atguigu@hadoop105 config]$ vim server.properties
    

    image-20240110155843127

  6. 删除 hadoop105 中 kafka 下的 datas 和 logs

    [atguigu@hadoop105 kafka]$ rm -rf datas/* logs/*
    
  7. 启动 hadoop102、hadoop103、hadoop104 上的 kafka 集群

    [atguigu@hadoop102 ~]$ zk.sh start
    [atguigu@hadoop102 ~]$ kf.sh start
    
  8. 单独启动 hadoop105 中的 kafka

    [atguigu@hadoop105 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
    

我们先来看一下 first 主题的信息:

image-20240110160935007

目前 first 主题的信息仍然只存在 broker0、1、2上,但 broker3 并没有帮助分担历史数据,所以我们需要负载均衡的操作。

执行负载均衡操作

  1. 创建一个要均衡的主题:

    [atguigu@hadoop102 kafka]$ vim topics-to-move.json
    
    {
    	"topics": [
    		{"topic": "first"}
    	],
    	"version": 1
    }
    
  2. 生成一个负载均衡的计划

    image-20240110160653990

  3. 创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)

    [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
    

    输入以下内容(刚生成的计划):

    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
    
  4. 执行副本存储计划:

    image-20240110161449511

  5. 验证副本存储计划:

    image-20240110161658340

    image-20240110161609204

退役旧节点

执行负载均衡操作

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡

把要退役节点的数据导入到其他节点上。

  1. 创建一个要均衡的主题

    [atguigu@hadoop102 kafka]$ vim topics-to-move.json
    
    {
    	"topics": [
    		{"topic": "first"}
    	],
        "version": 1
    }
    
  2. 创建执行计划

    image-20240110162052104

  3. 创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)

    [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
    
    {"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
    
  4. 执行副本存储计划

    [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
    
  5. 验证副本存储计划

    [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server  hadoop102:9092  --reassignment-json-file increase-replication-factor.json --verify
    
    Status of partition reassignment:
    Reassignment of partition first-0 is complete.
    Reassignment of partition first-1 is complete.
    Reassignment of partition first-2 is complete.
    Clearing broker-level throttles on brokers 0,1,2,3
    Clearing topic-level throttles on topic first
    

    image-20240110162329053

执行停止命令

在 hadoop105 上执行停止命令即可:

[atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh

Kafka 副本

副本基本信息

  • Kafka 副本作用:提高数据可靠性。

  • Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;

    • 太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  • Kafka 中副本分为:Leader 和 Follower。

    • Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
  • Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

A R = I S R + O S R AR = ISR + OSR AR=ISR+OSR

I S R ISR ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。

O S R OSR OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。

Leader 选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker 的上下线,所有 topic 的分区副本分配 Leader 选举等工作。

Controller 的信息同步工作是依赖于 Zookeeper 的。

image-20240110153554112

Leader 选举会按照 AR 的顺序进行选取,就是下图中的 Replicas 顺序:

image-20240110153908376

image-20240110153923789

image-20240110153939706

Leader 和 Follower 故障处理细节

Follower 故障处理细节

消费者可见的数据最大 offset 就是 4, H W − 1 HW - 1 HW1

该 Follower 先被踢出 ISR 队列,然后其余的 Leader、Follower继续接受数据。如果该 Follower 恢复了,会读取本地磁盘上次记录的 HW,并裁剪掉 高于 HW 的数据,从 HW 开始向 Leader 进行同步数据。

image-20240111145337546

待该 Follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上了 Leader,

image-20240111145207846

Leader 故障处理细节

broker0 一开始是 Leader,然后挂掉了,选举 broker1 为新的 Leader,然后其余的 Follower 会把各自 log 文件高于 HW 的部分裁剪掉,然后从新的 Leader 同步数据。

image-20240110154045978

分区副本分配

如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka 底层如何分配存储副本呢?

创建 16 分区,3 个副本

  1. 创建一个新的 topic,名称为 second

    image-20240110154238901

  2. 查看分区和副本情况:

    image-20240110154302249

依次错开,让每一个副本负载均衡,均匀分配,也可以保证数据的可靠性。

image-20240110154314929

生产经验—手动调整分区副本存储

在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。

需求:创建一个新的topic,4个分区,两个副本,名称为 three。将该 topic 的所有副本都存储到 broker0 和 broker1 两台服务器上。

image-20240110154440464

手动调整分区副本存储的步骤如下:

image-20240110154459796

image-20240110154534569

image-20240111164010567

生产经验—Leader Partition 负载平衡

image-20240110154628926

image-20240110154640522

真正生产环境建议关闭,或设置 percentage 为 20%、30%,不要频繁的触发自平衡,浪费集群大量性能。

生产经验—增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

  1. 创建 topic

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four
    
  2. 手动增加副本存储

    1. 创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)

      [atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
      

      输入如下内容:

      {"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}
      
    2. 执行副本存储计划

      [atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
      

文件存储

文件存储机制

Topic 数据的存储机制

image-20240116153508193

kafka 中默认数据保存 7 天,通过 .timeindex 文件判断日志保存多久,过期会定时清理对应的数据,详情参考下方的 - 文件清理策略。

思考:Topic 数据到底存储在什么位置?

image-20240111204450098
在这里插入图片描述
image-20240111204513026

index 文件和 log 文件详解

image-20240111204551632

说明:日志存储参数配置

image-20240111204615225

文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间:

  • log.retention.hours,最低优先级,小时,默认 7 天。
  • log.retention.minutes,分钟。
  • log.retention.ms,最高优先级,毫秒。
  • log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。

那么日志一旦超过了设置的时间,怎么处理呢?

Kafka 中提供的日志清理策略有 deletecompact 两种。

1)delete 日志删除:将过期数据删除
  • log.cleanup.policy = delete 所有数据启用删除策略(默认)

    1. 基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。

    2. 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。

      log.retention.bytes,默认等于 -1,表示无穷大,其实就是关闭掉了。

思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?

image-20240116154803754

以 segment 中所有记录中的最大时间戳作为该文件时间戳,进行删除。

也就是只要这个 segment 中有数据还未过期,就不进行删除操作。

2)compact 日志压缩

image-20240116154918725

高效读写数据

分布式集群

Kafka 本身是分布式集群,可以采用分区技术,并行度高。

稀疏索引

读数据采用稀疏索引,可以快速定位要消费的数据。

顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

image-20240116160711136

页缓存 + 零拷贝技术

image-20240116160741389

image-20240116160751069

笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/336542.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

go语言(一)----声明变量

package mainimport ("fmt""time" )func main() {fmt.Print("hello go!")time.Sleep(1 * time.Second)}运行后,结果如下: 1、golang表达式中,加;和不加;都可以 2、函数的{和函数名一…

范围运算between...and和空判断

目录 between...and 空判断 Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 between...and between...and的主要功能是用户进行范围查询,语法如下: select 字段 | 数值 between 最小值 and 最大值; 1.查询工资在 1500 ~ 3000 的所…

【Qt】信号和槽

需要云服务器等云产品来学习Linux的同学可以移步/-->腾讯云<--/-->阿里云<--/-->华为云<--/官网&#xff0c;轻量型云服务器低至112元/年&#xff0c;新用户首次下单享超低折扣。 目录 一、Qt中的信号和槽 1、信号 2、槽 3、Q_OBJECT 二、Qt中的connect函…

【Go面试向】实现map稳定的有序遍历的方式

问题 大家好 我是寸铁&#x1f44a; 总结了一篇实现map稳定的有序遍历的方式探讨的文章✨ 喜欢的小伙伴可以点点关注 &#x1f49d; 你对 map 了解多少&#xff1f;如果要实现第一个稳定的有序遍历有哪些方式&#xff1f; 回答 你对 map 了解多少&#xff1f; 我对map有一定的…

RHCSA上课笔记(前半部分)

第一部分 网络服务 第一章 例行性工作 1.单一执行的例行性工作 单一执行的例行性工作&#xff08;就像某一个时间点 的闹钟&#xff09;&#xff1a;仅处理执行一次 1.1 at命令&#xff1a;定时任务信息 [rhellocalhost ~]$ rpm -qa |grep -w at at-spi2-core-2.40.3-1.el9.x…

Qt文件和目录相关操作

1.相关说明 QCoreApplication类、QFile类、QDir、QTemporaryDir类、QTemporaryFile类、QFileSystemWatcher类的相关函数 2.相关界面 3.相关代码 #include "dialog.h" #include "ui_dialog.h" #include <QFileDialog> #include <QTemporaryDir>…

【JavaEE】网络原理:网络中的一些基本概念

目录 1. 网络通信基础 1.1 IP地址 1.2 端口号 1.3 认识协议 1.4 五元组 1.5 协议分层 什么是协议分层 分层的作用 OSI七层模型 TCP/IP五层&#xff08;或四层&#xff09;模型 网络设备所在分层 网络分层对应 封装和分用 1. 网络通信基础 1.1 IP地址 概念:IP地址…

AIGC语言大模型涌现能力是幻觉吗?

Look&#xff01;&#x1f440;我们的大模型商业化落地产品&#x1f4d6;更多AI资讯请&#x1f449;&#x1f3fe;关注Free三天集训营助教在线为您火热答疑&#x1f469;&#x1f3fc;‍&#x1f3eb; 在自然界中&#xff0c;涌现现象无处不在&#xff0c;从鸟群的和谐飞翔到生…

【C++】unordered_map,unordered_set模拟实现

unordered_map&#xff0c;unordered_set模拟实现 插入普通迭代器const迭代器unordered_map的[ ]接口实现查找修改哈希桶完整代码unordered_map完整代码unordered_set完整代码 喜欢的点赞&#xff0c;收藏&#xff0c;关注一下把&#xff01; 上一篇文章我们把unordered_map和u…

基于JavaWeb+SSM+Vue基于微信小程序的网上商城系统的设计和实现

基于JavaWebSSMVue基于微信小程序的网上商城系统的设计和实现 滑到文末获取源码Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 滑到文末获取源码 Lun文目录 目录 1系统概述 1 1.1 研究背景 1 1.2研究目的 1 1.3系统设计思想…

深度剖析跨境商城源码架构,助你把握行业动向

跨境电商作为当今电商行业的热点&#xff0c;其源码架构备受关注。作为专家&#xff0c;我将深度剖析跨境商城源码架构&#xff0c;帮助你把握行业动向。 跨境商城源码架构的基本组成 跨境商城源码架构一般包括前台系统、后台管理系统、数据管理系统和安全系统四大模块。前台…

LabVIEW振动数据采集与分析系统

在这个项目中&#xff0c; LabVIEW软件配合精确的硬件组件&#xff0c;以实现高效的振动数据采集和复杂信号分析。硬件方面&#xff0c;系统采用了PCB振动加速度传感器的高灵敏度传感器&#xff0c;以及NI9234型号的数据采集卡&#xff0c;确保了数据采集的高精度和可靠性。软件…

实用干货:最全的Loading动画合集网站!复制即用

大家好&#xff0c;我是大澈&#xff01; 本文约1000字&#xff0c;整篇阅读大约需要2分钟。 感谢关注微信公众号&#xff1a;“程序员大澈”&#xff0c;免费领取"面试礼包"一份&#xff0c;然后免费加入问答群&#xff0c;从此让解决问题的你不再孤单&#xff01…

OpenHarmony 应用开发入门 (二、应用程序包结构理解及Ability的跳转,与Android的对比)

在进行应用开发前&#xff0c;对程序的目录及包结构的理解是有必要的。如果之前有过android开发经验的&#xff0c;会发现OpenHarmony的应用开发也很简单&#xff0c;有很多概念是相似的。下面对比android分析总结下鸿蒙的应用程序包结构&#xff0c;以及鸿蒙对比android的诸多…

Dobbo --- HelloWorld项目搭建

Dobbo-HelloWorld 1. demo -- spring方式集成1.1 实现步骤 2. demo -- springboot方式集成2.1 实现provider2.2 实现consumer2.3 项目测试 1. demo – spring方式集成 dubbo官方文档 提供一个可被调用的接口 提供方&#xff1a;实现接口的方法逻辑&#xff0c;启动应用程序&a…

python实操之网络爬虫介绍

一、什么是网络爬虫 网络爬虫&#xff0c;也可以叫做网络数据采集更容易理解。它是指通过编程向网络服务器&#xff08;web&#xff09;请求数据&#xff08;HTML表单&#xff09;&#xff0c;然后解析HTML&#xff0c;提取出自己想要的数据。 它包括了根据url获取HTML数据、解…

介绍几个免费的国内chatgpt网站

概述&#xff1a;水点文章。 第一&#xff1a;chataa网站 chataa (chat778.com) 进去之后注册一下&#xff0c;即可免费使用。 第二&#xff1a;AlchatOS网站 AIchatOS 第三&#xff1a;ChatGPT在线聊天 ChatGPT在线聊天 (zxf7460.cn) 第四&#xff1a;说我真帅&#xff0…

【Python】--- 基础语法(1)

目录 1.变量和表达式2.变量和类型2.1变量是什么2.2变量的语法2.3变量的类型2.3.1整数2.3.2浮点数&#xff08;小数&#xff09;2.3.3字符串2.3.4布尔2.3.5其他 2.4为什么要有这么多类型2.5动态类型特征 3.注释3.1注释的语法3.2注释的规范 结语 1.变量和表达式 对python的学习就…

麒麟V10挂载iso,配置yum源

本文介绍yum 如何挂载本地镜像源 1) 拷贝镜像到本地 2) 执行以下命令&#xff1a; # mount -o loop 镜像路径及镜像名字 /mnt&#xff08;或 media&#xff09; 挂载前 挂载后 3) 进入/etc/yum.repos.d&#xff08;yum.repos.d 是一个目录&#xff0c;该目录是分析 RPM 软件…

kafka(一)——简介

简介 Kafka 是一种分布式、支持分区、多副本的消息中间件&#xff0c;支持发布-订阅模式&#xff0c;多用于实时处理大量数据缓存的场景&#xff0c;类似于一个“缓存池”。 架构 Producer&#xff1a;消息生产者&#xff1b;Consumer&#xff1a;消息消费者&#xff1b;Brok…