kafka个人笔记

大部分内容源于https://segmentfault.com/a/1190000038173886, 本人手敲一边加强印象方便复习

消息系统的作用

解耦
冗余
扩展性
灵活性(峰值处理
可恢复
顺序保证
缓冲
异步

  • 解耦:扩展两边处理过程,只需要让他们遵守约束即可
  • 冗余:持久化数据:规避丢失风险。采用 插入-获取-删除范式明确指出消息被处理完毕
  • 扩展性:解耦处理过程,容易扩展处理过程增大消息处理频率
  • 灵活性(峰值处理:访问激增情况不常见,无需投入过多标准资源。使用消息队列顶住访问压力
  • 可恢复:系统失效时仍可保证队列消息在系统恢复后处理
  • 顺序保证:kafka保证partition内消息有序
  • 缓冲:控制和优化 数据经过系统的速度,解决生产、消费速度不一致的问题
  • 异步:允许用户把一个或若干个消息放入队列,且不立即被处理

架构

在这里插入图片描述

  1. producer,消息生产者
  2. broker:kafka集群的服务器
  3. topic:消息的类别
  4. partition:kafka分配单位,一个topic包含一个或多个partition
  5. consumer:消息消费者,终端或服务
  6. comsumer group:
    high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
  7. replica:partition副本
  8. leader:特殊的replica,producer和consumer只和leader交互
  9. follower:除了leader的replica都为follwer,复制数据
  10. controller:服务器:用于leader选举和failover
  11. zookepper,存储集群meta信息

发布消息

producer用push发布到broker,消息被append到partition,顺序写磁盘

消息路由

//构造函数
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
     if (topic == null)
          throw new IllegalArgumentException("Topic cannot be null");
     if (timestamp != null && timestamp < 0)
          throw new IllegalArgumentException("Invalid timestamp " + timestamp);
     this.topic = topic;
     this.partition = partition;
     this.key = key;
     this.value = value;
     this.timestamp = timestamp;
}


private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
     Integer partition = record.partition();
     if (partition != null) {//指定了 partition 则直接使用
          List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
          int lastPartition = partitions.size() - 1;
          if (partition < 0 || partition > lastPartition) {
               throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
          }
          return partition;
     }//否则使用 key 计算
     return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
     int numPartitions = partitions.size();
     if (keyBytes == null) {//轮询
          int nextValue = counter.getAndIncrement();
          List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
          if (availablePartitions.size() > 0) {
               int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
               return availablePartitions.get(part).partition();
          } else {
               return DefaultPartitioner.toPositive(nextValue) % numPartitions;
          }
     } else {
          //对 keyBytes 进行 hash 选出一个 patition
          return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
     }
}

  1. 指定partition直接用
  2. 未指定partition但指定了key,对key进行hash得到partition
  3. 都未指定,使用轮询

写入流程

在这里插入图片描述

  1. producer从zk的/brokers/…/stateleader
  2. producer发消息给leader
  3. leader把消息写入log
  4. follower从leader拉取消息写入log后发送ACK给leader
  5. leader收到所有replica的ACK后,增加high watermark(位置信息,即位移(offset))给producer发送ack

投递保证

    ① At most once 消息可能会丢,但绝不会重复传递

    ② At least one  消息绝不会丢,但可能会重复传递

    ③ Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户想要的

默认 at least one

接收消息的行为

  1. comsumer从broker读取消息后,可以选择commit或处理消息
    1. 如果commit
      1. zookeeper存在comsumer在partition下读取消息的offset
      2. comsumer下次读取partition从下一条开始读取
    2. 未commit
      1. 下次读取位置和上次commit后开始位置相同

at most once

读完消息先commit再处理消息。
若commit后未处理消息系统崩坏,下次重新开始工作无法读到已提交但未处理的消息

At least once

读完消息先处理再commit消费状态(保存offset)
若处理消息后未commit系统崩坏,重新工作的时候会处理未commit的消息(处理两次)

Exactly once 两阶段提交

协调offset和实际操作的输出。但由于许多输出系统不支持两阶段提交,更为通用的方式是将offset和操作输入存在同一个地方

  1. consumer拿到数据后可能把数据放到HDFS
  2. 最新的offset和数据一起写到HDFS
  3. 保证offset更新和数据输出同时完成

(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)。

消息保存

topic分为多个partition,每个partition对应一个文件夹

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据

  • 基于时间:log.retention.hours=168
  • 基于大小:log.retention.bytes=1073741824
log.cleanup.policy=delete启用删除策略
直接删除,删除后的消息不可恢复。可配置以下两个策略:

清理超过指定时间清理: 
log.retention.hours=16

超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824

请添加图片描述

topic的创造

  1. controller在ZK的/brokers/topics 节点上注册 watcher
    ,topic被创建的时候,controller 会通过 watch 得到该 topic 的 partition/replica 分配
  2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
    1. 分配给partition的所有replica(称为AR)任选一个可用的broker作为leader并将AR设置为ISR
    2. 新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
  3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

请添加图片描述
删除 topic 的序列

  1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher
  2. topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配
  3. 若 delete.topic.enable=false,结束;反之controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest

kafka HA 高可用性

replica

同一个 partition 可能会有多个 replica —— erver.properties 配置中的 default.replication.factor=N

若没有replica,broker死机

  • patition 的数据都不可被消费
  • producer 也不能再将数据存于其上的 patition

引入replica,需要选取leader,leader与producer和consumer交互,其他replica与leader复制数据

分配规则

  1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
  2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
  3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

leader failover

partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader

新的 leader 必须拥有旧 leader commit 过的所有消息

zookeeper 中(/brokers/…/state)动态维护了一个 ISR(in-sync replicas)。只有 ISR 里面的成员才能选为 leader。若有f个replica,partition可以保证f-1个replica失效情况下消息不丢失

failover方案

  • 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
  • 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短
    多用第二种方式

broker failover

在这里插入图片描述

  1. controller在zookeeper的/brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
  2. controller从/brokers/ids 节点读取可用broker
  3. controller决定set_p,集合包含死机broker上所有partition
  4. 对set_p所有partition进行: 1. 读取/brokers/ids 节点读取可用broker的ISR 2. 决定新leader, 新leader ISR controller_epoch和leader_epoch信息写入state结点
  5. 通过RPC给broker发送 leaderAndISRRequest 命令

controller failover

controller 宕机时会触发 controller failover

  1. broker在zookeeper的controller节点注册watcher
  2. controller宕机时,zookeeper临时节点消失
  3. 所有存活broker收到fire通知
  4. 每个broker尝试创建新的controller path,其中一个竞选成功为controller
  5. 当选成功触发KafkaController.onControllerFailover
1. 读取并增加 Controller Epoch。
2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
8. 启动 replicaStateMachine 和 partitionStateMachine。
9. 将 brokerState 状态设置为 RunningAsController。
10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

消费

kafka 提供了两套 consumer API:

The high-level Consumer API
The SimpleConsumer API

consumer API

high-level提供kafka消费数据的抽象

  1. 提供了 consumer group 的语义
  2. 消息只能被group内一个consumer消费
  3. 消费的时候不关注offset
  4. 最后一个offset由zookeeper保存

使用high-level consumer API可以是多线程应用

if(消费线程 > partition){
	部分线程收不到消息
}
if(消费线程 < partition){
	有些线程收到多个partition消息
}

if(一个线程消费多个 patition){
	无法保证收到消息的顺序
}

** SimpleConsumer API**

适用以下情况

  • 多次读取一个消息
  • 只消费一个 patition 中的部分消息
  • 使用事务来保证一个消息仅被消费一次

partition, offset, broker, leader不透明,需要自己管理

  • 追踪offset确定下一条消费的信息
  • 找出每个partition的follower
  • 处理leader变更

流程如下

  1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader
  2. 找到partition的follower
  3. 定义好请求,该请求应该能描述应用程序需要哪些数据
  4. fetch数据
  5. 识别leader变化并做出响应

consumer group
kafka分配单位是partition,consumer属于一个group
一个partition被一个group内的一个consumer消费(但是多个group可以同时消费这个partition)

实现离线处理与实时处理

  • spark 实时处理
  • hadoop 离线处理

消费方法

consumer用pull模式从broker读数据

push 模式很难适应消费速率不同的消费者

  • 消息发送速率是由 broker 决定的
  • 尽可能以最快速度传递消息
  • 容易造成 consumer 来不及处理消息(拒绝服务、网络拥塞

pull模式,consumer根据自己的能力消费信息

pull的优点

  • 简化broker设计
  • consumer自主控制消费速率
  • consumer自主控制消费方式 —— 批量/逐条
  • 选择不同提交方式

消费者递送保证

consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit(Exactly once

实际使用过程中,并不是consumer读完消息就结束了,还需要进一步处理。
处理和commit顺序决定了 consumer delivery guarantee

  • 先commit,后处理消息(At most once
    • consumer 在 commit 后还没来得及处理消息就 crash
    • 重新开始工作后就无法读到刚刚已提交而未处理的消息
  • 先处理再commit( At least once
    • 处理完消息之后 commit 之前 consumer crash
    • 恢复工作:处理刚刚未 commit 的消息
  • 两阶段提交
    (offset 和操作输入存在同一个地方,会更简洁和通用)
    (若不支持,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once) —— high-level API里面offset存于zookeeper中,无法存于HDFS,simple可以存于HDFS

consumer rebalance

触发机制

  • consumer加入退出
  • partition改变(broker 加入退出

算法如下

  1. 目标topic的partition排序,存于PT
  2. 选择consumer group下所有consumer排序, 存于CG
  3. N = ⌈ s i z e ( P T ) / s i z e ( C G ) ⌉ N = \lceil size(PT)/size(CG)\rceil N=size(PT)/size(CG)⌉
  4. 对group内原本的分配partition解除关系
  5. 然后每N个partition分配给一个consumer

consumer调整了单个partition后,为了保证一致性,group内其他consumer也应触发balance

导致以下问题

herd effect

  • broker,comsumer增减触发rebalance

split brain

  • 每个consumer单独通过zk判断broker和consumer宕机,不同的consumer同时从zookeeper看到的view可能不一致 —— 导致不正确的rebalance
  • 所有consumer不知道其他consumer的rebalance是否成功,导致kafka工作状态不正确
  • 因此0.9开始使用中心coordinator空值rebalance,计划在consumer客户端分配方案

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/153116.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络层协议 ——— IP协议

文章目录 IP协议基本概念IP协议格式分片与组装网段划分特殊的IP地址IP地址的数量限制私网IP地址和公网IP地址路由路由表生成算法 IP协议 IP协议全称为“网际互连协议&#xff08;Internet Protocol&#xff09;”&#xff0c;IP协议是TCP/IP体系中的网络层协议。 基本概念 网…

只使用JS怎么给静态页面网站添加站内全局搜索功能?

&#x1f482; 个人网站:【 海拥】【神级代码资源网站】【办公神器】&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交流的小伙伴&#xff0c;请点击【全栈技术交流群】 背景 静态页面通常由HTML、CSS 和 JavaScript…

荣誉榜再度添彩!热烈祝贺旭帆科技荣获安徽省大数据企业!

2023年11月3日&#xff0c;安徽省数据资源管理局网站发布《关于2023年度安徽省大数据企业名单的公示》&#xff0c;经企业申报、各市初审推荐、专家评审、审查认定等程序&#xff0c;安徽旭帆信息科技有限公司&#xff08;以下简称“旭帆科技”&#xff09;凭借在视频大数据应用…

【Unity地编】地形系统搭建入门详解

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;UI_…

系列十、堆参数调优

一、堆内存调优参数 -Xms堆空间的最小值&#xff0c;默认为物理内存的1/64-Xmx堆空间的最大值&#xff0c;默认为物理内存的1/4-XX:PrintGCDetails输出详细的GC处理日志 二、获取堆内存的默认物理内存 /*** Author : 一叶浮萍归大海* Date: 2023/11/16 14:50* Description: 获…

在Linux上安装Oracle 数据库 11g

好久没碰11g了&#xff0c;今天&#xff08;2023年11月16日&#xff09;因为有个需求又装了一遍。 在OCI上安装了一个Oracle Linux 6实例&#xff1a; $ uname -a Linux instance-20231116-1239-db11g 4.1.12-124.80.1.el6uek.x86_64 #2 SMP Mon Oct 9 02:32:10 PDT 2023 x86…

ATE测试设备功能、原理、特点详解

ATE(Automatic Test Equipment)自动测试设备是用于检测电子产品、电气设备的自动化测试系统&#xff0c;是电测行业首选的一种测试方式&#xff0c;被广泛应用于通信、消费电子、汽车电子、智能家居、半导体、电源模块、医疗电子、航天航空等领域。ATE测试设备在电子设计、研发…

【自动化测试】基于Selenium + Python的web自动化框架!

一、什么是Selenium&#xff1f; Selenium是一个基于浏览器的自动化工具&#xff0c;她提供了一种跨平台、跨浏览器的端到端的web自动化解决方案。Selenium主要包括三部分&#xff1a;Selenium IDE、Selenium WebDriver 和Selenium Grid&#xff1a;  1、Selenium IDE&…

网站使用什么协议比较好

网站协议大多数使用HTTP和HTTPS HTTP协议&#xff0c;超文本传输协议&#xff08;Hypertext Transfer Protocol&#xff0c;HTTP&#xff09;是一个简单的请求-响应协议。 HTTP是应用层协议&#xff0c;同其他应用层协议一样&#xff0c;是为了实现某一类具体应用的协议&…

前台页面从数据库中获取下拉框值

后端&#xff1a;查询所有信息 前台&#xff1a;elementUI <el-select v-model"searchData.stationName" clearable> <el-option :label"item.stationName" :value"item.stationName" v-for"item in stationNameList&quo…

我这些年对于自动化测试的理解

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;欢迎加入我们一起学习&#xff01;&#x1f4e2;资源分享&#xff1a;耗时200小时精选的「软件测试」资…

MySQL/SQLServer判断字符是纯数字或者是其它字符

如下是MySQL表结构设计&#xff08;演示所用&#xff09;&#xff1a; MySQL表关联数据如下所示&#xff1a; 【场景&#xff1a;查询所有数字&#xff0c;包含小数点】&#xff0c;SQL如下所示&#xff1a; SELECT * FROM data WHERE message not REGEXP [^0-9].[^0-9] My…

centos虚拟机使用docker下载镜像太慢的解决办法

虚拟环境&#xff1a; 1、VMware Workstation 16 Pro 2、CentOS 7&#xff08;CentOS Linux release 7.9.2009 (Core)&#xff0c;内核版本3.10.0-1160.el7.x86_64&#xff09; 问题描述&#xff1a; 虚拟机可以与物理主机互相Ping通&#xff0c;也可以Ping通百度&#xff0…

MariaDB安装配置、使用、授权、增删改查以及数据库备份与恢复

目录 1 MariaDB安装 1.1 MariaDB源配置 1.2 清空缓存 1.3 安装MariaDB 2 MariaDB的基本配置 2.1 启动MariaDB 2.2 MariaDB进程查看 2.3 MariaDB数据库初始化 2.3.1 数据库初始化 2.3.2 初始化测试登录 3 MariaDB的使用 3.1 查看数据库 3.2 修改密码 3.3 创建数据库test 3…

《C++避坑神器·十七》找到程序崩溃Bug的一个实用方法:dump调试

在检查程序报错除了断点调试&#xff0c;生成log日志&#xff0c;还有种直接的方法&#xff0c;调试dump文件&#xff0c;该调试方法可以在运行exe程序崩溃时进行调试。文章末尾有下载链接。 头文件 #include "crashdump.h"在mainWindow或主程序最开始处加下面代码…

WebGoat环境搭建

首先安装jdk&#xff0c;此步骤省略…验证 直接打开cmd&#xff0c;输入以下命令&#xff1a; java -version &#xff08;可以查看安装的JDK版本。&#xff09; javac &#xff08;查看java文件编译成的class文件&#xff09; WebGoat下载 WebGoat的下载地址&#xff1a;Relea…

Nerf相关、公式

在3D重建领域&#xff0c;这幅图怎么理解 这张图展示的是“体素剪枝&#xff08;Voxel Pruning&#xff09;”在3D重建中的应用&#xff0c;这是一种利用稀疏性&#xff08;Sparsity&#xff09;来优化3D数据存储和处理的技术。体素剪枝的目的是为了降低存储需求和提高计算效率…

不可思议!中国人民大学与加拿大女王大学金融硕士还能解决金融职场的倦怠期!

职业倦怠期是指在职业生涯中&#xff0c;个体对工作产生的一种疲惫、厌倦和失去兴趣的状态。在这个阶段&#xff0c;人们可能会感到无法集中精力、缺乏动力和创造力&#xff0c;工作效率下降&#xff0c;甚至出现情绪波动和身体健康问题。职业倦怠期是一种常见的心理现象&#…

Python从 0 到 1 系统学习的全面详细内容

这里为大家梳理了一些Python从 0 到 1 系统学习的全面详细内容&#xff0c;想要系统的自学Python&#xff0c;希望我们可以提供一个框架&#xff0c;方便作为参考&#xff0c;学习Python。 1、为什么要学习Python&#xff1f; Python是一种功能强大的编程语言&#xff0c;它具…

【带头学C++】----- 六、结构体 ---- 6.7 共用体以及枚举类型

6.7 共用体以及枚举类型 结构体:结构体用于组合不同类型的数据&#xff0c;每个字段占用独立的内存空间。 共用体:共用体也组合不同类型的数据&#xff0c;但所有字段共享同一块内存。 因此&#xff0c;结构体适合表示具有多个属性的对象&#xff0c;而共用体适合表示可以具…