Kafka学习笔记

目录

    • 常见术语
    • 如何解决数据重复和数据乱序
      • 幂等性
      • 幂等性的缺陷
      • 事务
      • 事务原子性
      • 事务原理流程图
    • 如何解决数据积压
      • 解决方法
    • Kafka的高水位(HW)和Leader Epoch
      • 副本同步机制解析
      • Leader Epoch是如何解决消息丢失和消息不一致的问题的

常见术语

Kafka的三层消息架构:
第一层是主题层,每个主题可以配置M个分区,而每个分区又可以配置N个副本。
第二层是分区层,每个分区的N个副本中只能有一个充当领导者角色,对外提供服务;其他N-1个副本是追随者副本,只是提供数据冗余之用。(客户端程序只与分区的领导者副本进行交互
第三层是消息层,分区中包含若干条消息,每条消息的位移从0开始,依次递增。

在Kafka中,发布订阅的对象是主题(Topic),每个业务、每类数据都能创建对应的主题。

向主题发布消息的客户端应用程序称为生产者(Producer)

订阅这些主题消息的客户端应用程序就被称为消费者(Consumer)

生产者和消费者统称为客户端(Clients)。

Kafka的服务器端由被称为Broker的服务进程构成,即一个Kafka集群由多个Broker组成,Broker负责接收和处理客户端发送过来的请求,以及对消息进行持久化。

为了保证高可用,可以采取多副本进行数据同步。Kafka定义了两类副本:领导者副本(Leader Replica)和追随者副本(Follower Replica)。

生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。

为了保证系统的伸缩性,Kafka中的分区机制指的是将每个主题划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区0中,要么在分区1中。

副本是在分区这个层级定义的。生产者向分区写入消息,每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。

每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset)。

如何解决数据重复和数据乱序

Kafka默认提供的交付可靠性保障是“至少一次”,即消息不会丢失,但有可能被重复发送。

只有Broker成功“提交”消息且Producer接到Broker的应答才会认为该消息成功发送。不过倘若消息成功“提交”,但Broker的应答没有成功发送回Producer端(比如网络出现瞬时抖动),那么Producer就无法确定消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是Kafka默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。

对于涉及金钱交易的系统,我们更希望交付方式是“精确一次”,即消息既不会丢失,也不会被重复处理。Kafka通过两种机制:幂等性(Idempotence)和事务(Transaction)保证了“精确一次”的交付。

幂等性

Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重,Kafka为了实现幂等性,在0.11.0.0之后加入的该新功能,它在底层设计架构中引入了ProducerID和SequenceNumber。

  • ProducerID:每个生产者启动时,Kafka 都会给它分配一个 ID,ProducerID 是生产者的唯一标识,需要注意的是,Kafka 重启也会重新分配 PID
  • SequenceNumber :对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。

如果Broker在发送Ack信号给Producer时出现网络异常,导致发送失败。异常情况如下图所示:
在这里插入图片描述
每个新的Producer在初始化的时候会被分配一个唯一的PID(凡是开启幂等性都是需要生成PID,只不过未开启事务的PID可以在任意broker生成,而开启事务只能在TransactionCoordinator节点生成),该PID对用户完全透明而不会暴露给用户。Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其消息序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,否则将其丢弃:

  • 如果消息序号比Broker维护的序号大于1以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
  • 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber

上述设计解决了数据重复和数据乱序的问题。

幂等性的缺陷

只能保证 Producer 在单个会话内不丟不重 ,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);另外幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性 ,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。

如果需要跨会话、跨多个 topic-partition 的情况,需要使用 Kafka 的事务性来实现。

事务

为了实现有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。应用程序必须提供一个稳定的(重启后不变)唯一的 ID,也即Transaction ID。Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。

另外,为了保证新的 Producer 启动后,旧的具有相同Transaction ID的 Producer 即失效,每次 Producer 通过Transaction ID拿到 PID 的同时,还会获取一个单调递增的 epoch。由于旧的 Producer 的 epoch 比新 Producer 的 epoch 小,Kafka 可以很容易识别出该 Producer 是老的 Producer 并拒绝其请求。

有了Transaction ID和epoch后,Kafka 可保证:

  • 跨 Session 的数据幂等发送。当具有相同Transaction ID的新的 Producer 实例被创建且工作时,旧的且拥有相同Transaction ID的 Producer 将不再工作。
  • 跨 Session 的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么 Commit 要么 Abort,使得新实例从一个正常状态开始工作。

事务原子性

事务原子性是指 Producer 将多条消息作为一个事务批量发送,要么全部成功要么全部失败。 引入了一个服务器端的模块,名为Transaction Coordinator,用于管理 Producer 发送的消息的事务性。

该Transaction Coordinator维护Transaction Log,该 log 存于一个内部的 Topic 内。由于 Topic 数据具有持久性,因此事务的状态也具有持久性。

Producer 并不直接读写Transaction Log,它与Transaction Coordinator通信,然后由Transaction Coordinator将该事务的状态插入相应的Transaction Log。

Transaction Log的设计与Offset Log用于保存 Consumer 的 Offset 类似。

在Kafka Stream 应用中同时包含 Consumer 和 Producer(即Consumer-Transform-Producer),前者负责从 Kafka 中获取消息,后者负责将处理完的数据写回 Kafka 的其它 Topic 中。

为了实现该场景下的事务的原子性,Kafka 需要保证对 Consumer Offset 的 Commit 与 Producer 对发送消息的 Commit 包含在同一个事务中。否则,如果在二者 Commit 中间发生异常,根据二者 Commit 的顺序可能会造成数据丢失和数据重复:

  • 如果先 Commit Producer 发送数据的事务再 Commit Consumer 的 Offset,即At Least Once语义,可能造成数据重复。
  • 如果先 Commit Consumer 的 Offset,再 Commit Producer 数据发送事务,即At Most Once语义,可能造成数据丢失。

事务原理流程图

在这里插入图片描述

如何解决数据积压

导致突然积压的原因肯定是多种多样的,不同的系统、不同的情况有不同的原因,不能一概而论。但能导致积压突然增加,最粗粒度的原因,只有两种:1、要么是发送变快了,2、要么是消费变慢了。

解决方法

1、生产者producer:

  • batch.memory修改缓冲区大小

    设置发送消息的缓冲区,默认值是33554432,就是32MB

    如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住。

  • compression.type压缩格式

    默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销。

  • batch.size批次大小

    设置merge batch合并批次消息的大小

    如果 batch 批次太小,会导致频繁网络请求,吞吐量下降;

    如果batch批次太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里。

    默认值是:16384,就是16kb,也就是一个batch批次满了16kb就发送出去,一般在实际生产环境,这个batch批次的值可以增大一些来提升吞吐量,可以自己压测一下。

  • linger.ms等待时长

    这个值默认是0,意思就是消息必须立即被发送,但是这是不对的。

    一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch批次,如果100毫秒内,这个batch批次满了16kb,自然就会发送出去。

    但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。

2、消费者consum :

  • 提升消费者组中的消费者数以及Topic中的分区数,让二者相等,假设设置为3个分区 = 3CPU。

  • 提高消费者拉取数据的能力,比如Flume每次拉取的数据可以由1000条改为3000条、Spark中将限流的参数增大、Flink中保证数据的处理效率等。

Kafka的高水位(HW)和Leader Epoch

Kafka在保证数据的可靠性上使用的是‘数据冗余’的方式,即将一个分区下的数据保存到多个副本中,起到备份数据的作用。这样如果leader挂了,会重新选举follower作为leader继续工作,那么Kafka中leader副本是如何将数据同步到follower副本中的呢。

在这里插入图片描述

  • 什么是高水位?
    在Kafka中,高水位是一个位置信息标记,它是用消息位移来表征的,比如某个副本中HW=4,就是这个副本的高水位在offset=4那个位置上。

HW的作用
在Kafka中,高水位的作用主要有2个

1.定义消息的可见性,即用来标识分区下的哪些消息是可以被消费的,比如某个分区的HW(leader的HW)是8,那么这个分区只有 < 8 这些位置上消息可以被消费。即高水位之前的消息才被认为是已提交的消息,才可以被消费。

2.帮助Kafka完成副本同步。

需要注意的是,位移值等于高水位的消息也属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的

还有一个日志末端位移的概念,Log End Offset,缩写是LEO。它表示副本写入下一条消息的位移值。上图中LEO是15,即下一条新消息的位移是15,8-14这些位置上的消息就是未提交消息。同一个副本对象,其高水位值不会大于LEO值。

分区的高水位就是其leader副本的高水位。

副本同步机制解析

follower向leader副本发送fetch同步数据请求,此时他们的HW和LEO都是0。
在这里插入图片描述
当生产者向leader发送一条消息,且提交成功后,leader的LEO更新为1,这个时候follower发现有消息可以拉取了,于是follower的LEO也更新为1,但是此时leader和follower的HW都为0,它们需要在下一轮的拉取中被更新。
在这里插入图片描述
在新一轮的拉取请求中,因为之前位移值是0的消息已经拉取成功了,所以follower这次请求拉取的是位移值=1的消息。leader接收到这个请求后,将远程副本LEO更新为1,然后更新leader的HW=1,最后将HW=1发送给follower副本,follower收到后将自己的高水位值更新成1。至此,一次完整的消息同步周期就结束了。Kafka就是利用这样的机制,实现了leader和follower之前的同步。
在这里插入图片描述
依托于高水位,Kafka既界定了消息的对外可见性,又实现了异步的副本同步机制。但是这里面还存在一些问题。刚才我们了解到的Kafka副本同步的过程中,follower副本的高水位更新需要一轮额外的拉取请求才能实现。如果有多个follower副本,情况可能更糟,就需要多轮拉取请求。也就是说,leader副本的高水位更新和follower副本高水位更新在时间上是存在一定延迟的,这样会导致数据丢失或者数据不一致。数据丢失主要是由于副本重启后,LEO和HW不一致导致日志截断造成的。

另外上图中副本A和B,其中副本A中有两条消息,LEO是2、HW是2,B的LEO和HW都是1,假设在同一时刻,A和B都宕机了,然后B先醒过来,那么B成了新的leader,然后他收到生产者发来的m3消息,然后B的LEO和HW都更新成了2。
当A醒过来后,会先根据HW判断是否需要进行日记截断,这里HW和LEO相等,发现不需要进行日志截断,然后跟B进行同步,这个时候A和B的LEO都是2,这样A中的消息是<m1,m2>,B中的消息是<m1,m3>,出现了消息不一致的情况。

Kafka中采用了Leader Epoch来解决这个问题。

Leader Epoch是如何解决消息丢失和消息不一致的问题的

Leader Epoch 可以认为是leader的版本,它由两部分数据组成。

  • Epoch,一个单调增加的版本号。每当副本领导权发生变更后,都会增加该版本号。小版本号的leader被认定是过期leader。
  • 起始位移,leader副本在该Epoch值上写入的首条消息的位移。

在这里插入图片描述
还是刚才那个场景,现在有了Leader Epoch机制的介入,当副本B重启回来后,会向A发送一个请求去获取leader中的LEO,发现A的LEO=2,不比它自己的LEO值小,而且缓存中没有保存任何起始位移值 > 2的Epoch条目,这样B就不需要执行日志截断操作了。
然后副本A宕机了,B成为了leader,B的Leader Epoch由原来的<0, 0> 更新成了 <1, 2>,意思说是B成为了新的leader,版本号+1,这个leader的起始消息位移值为2。
A重启回来后会向B发送请求获取B的LEO,发现等于2,和自己相同,并且缓存中的Leader Epoch的起始位移值是2,也不需要进行日志截断。这样就不会出现消息丢失的问题了。

有了Leader Epoch机制的加入,当B变为leader后,producer发送m3消息到B中,数据保存到磁盘上,Leader Epoch会更新为<1,1>,然后A醒过来后,会先发送请求知道B的LEO值为2和自己一样,然后通过缓存的Leader Epoch值,得知下一条要写入的消息是1的位置,然后就会进行日志截断,将原先的m2删除,再将m3写入。由此解决了消息不一致的问题。

有了Leader Epoch机制的加入,当B变为leader后,producer发送m3消息到B中,数据保存到磁盘上,Leader Epoch会更新为<1,1>,然后A醒过来后,会先发送请求知道B的LEO值为2和自己一样,然后通过缓存的Leader Epoch值,得知下一条要写入的消息是1的位置,然后就会进行日志截断,将原先的m2删除,再将m3写入。由此解决了消息不一致的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/93222.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Web安全测试(三):SQL注入漏洞

一、前言 结合内部资料&#xff0c;与安全渗透部门同事合力整理的安全测试相关资料教程&#xff0c;全方位涵盖电商、支付、金融、网络、数据库等领域的安全测试&#xff0c;覆盖Web、APP、中间件、内外网、Linux、Windows多个平台。学完后一定能成为安全大佬&#xff01; 全部…

简述docker映射(Mapping)和挂载(Mounting)

映射的概念&#xff1a; 将容器内的端口映射到主机的端口上&#xff0c;这样就可以通过主机的网络接口与容器内部进行通信。主机上对应端口的请求会被转发到容器内部&#xff0c;从而实现对容器内部程序的通信访问&#xff08;注意&#xff01;这里提到的容器内部的端口并不一定…

arcgis的MapServer服务查询出来的结果geometry坐标点带*的问题

不知道小伙伴使用arcgis server服务做查询的时候&#xff0c;有没有遇到下面的问题 原因是查询结果中出现*字符 这个问题一直困扰了我很久&#xff1a;因为从数据库查询的坐标点是没有问题的。 一开始有同事遇到过&#xff0c;说重新插入下就好了&#xff0c;有时候确实能解决…

Qt 获取文件图标、类型 QFileIconProvider

Qt中获取系统图标、类型是通过QFileIconProvider来实现的&#xff0c;具体如下&#xff1a; 一、Qt获取系统文件图标1、获取文件夹图标QFileIconProvider icon_provider;QIcon icon icon_provider.icon(QFileIconProvider::Folder);2、获取指定文件图标QFileInfo file_info(n…

最详细jdk安装以及配置环境(保姆级教程)

一.进入oracle官网&#xff0c;下载jdk oracle官网&#xff1a;Oracle | Cloud Applications and Cloud Platform ps:不同的浏览器&#xff0c;可能进入oracle官网&#xff0c;会只显示部分内容&#xff0c;所以建议使用google Chrome浏览器 在下载之前&#xff0c;首先需要去…

UML四大关系

文章目录 引言UML的定义和作用UML四大关系的重要性和应用场景关联关系继承关系聚合关系组合关系 UML四大关系的进一步讨论UML四大关系的实际应用软件开发中的应用其他领域的应用 总结 引言 在软件开发中&#xff0c;统一建模语言&#xff08;Unified Modeling Language&#x…

7 集群基本测试

1. 上传小文件到集群 在hadoop路径下执行命令创建一个文件夹用于存放即将上传的文件&#xff1a; [atguiguhadoop102 ~]$ hadoop fs -mkdir /input上传&#xff1a; [atguiguhadoop102 hadoop-3.1.3]$ hadoop fs -put wcinput/work.txt /input2.上传大文件 [atguiguhadoop1…

Mybatis-Plus快速入门

目录 一、基础工程 1、创建一个数据库&#xff1a;mp 2、添加数据 3、创建初始工程 4、添加依赖 二、Mybatis Mybatis-Plus 1、创建子工程&#xff1a;mybatis-plus-simple 2、在子工程下添加配置 2.1Mybatis实现查询User 2.1.1、编写User实体对象 2.1.2、编写UserMa…

pdf.js构建时,报Cannot read property ‘createChildCompiler‘ of undefined #177的解决方法

在本地和CI工具进行构建时&#xff0c;报如下错误。 Cannot read property createChildCompiler of undefined #177解决方法&#xff1a; 找到vue.config.js&#xff0c;在 module.exports {parallel: false, //新增的一行chainWebpack(config) {....config.module.rule(&…

微信小程序 基于Android的美容理发师预约管理系统

&#xff0c;本系统主要根据管理员、用户及理发师的实际需要&#xff0c;方便用户利用互联网实现对商品信息进行立即订购&#xff0c;同时让管理者可以通过这个系统对用户实际需求以及各信息进行管理。设计该系统主要目的是为了方便用户、理发师可以有一个非常好的平台体验&…

Qt/C++编写视频监控系统80-远程回放视频流

一、前言 远程回放NVR或者服务器上的视频文件&#xff0c;一般有三种方式&#xff0c;第一种是调用厂家的SDK&#xff0c;这个功能最全&#xff0c;但是缺点明显就是每个厂家的设备都有自己的SDK&#xff0c;只兼容自家的设备&#xff0c;如果你的软件需要接入多个厂家的&…

利用LLM模型微调的短课程;钉钉宣布开放智能化底座能力

&#x1f989; AI新闻 &#x1f680; 钉钉宣布开放智能化底座能力AI PaaS&#xff0c;推动企业数智化转型发展 摘要&#xff1a;钉钉在生态大会上宣布开放智能化底座能力AI PaaS&#xff0c;与生态伙伴探寻企业服务的新发展道路。AI PaaS结合5G、云计算和人工智能技术的普及和…

前端如何走通后端接口

0 写在前面 现在基本都是前后端分离的项目了&#xff0c;那么前端小伙伴如何获取后端小伙伴接口呢&#xff1f; 1 条件 同一WiFi下&#xff0c;让后端小伙伴分享出自己的ip地址&#xff1a; 步骤1:winr调出运行界面 步骤2&#xff1a;cmd调出命令行窗口 步骤3&#xff1a;…

接口多态 面试题及习题

基础题目 第一题&#xff1a;概念辨析 什么是接口&#xff0c;如何定义接口&#xff1f; 接口&#xff0c;是Java语言中一种引用类型&#xff0c;是方法的集合。使用interface关键定义接口&#xff0c;其中可以定义抽象方法&#xff0c;默认方法&#xff0c;私有方法&#xf…

ESP32应用教程(0)— PMW3901MB光流传感器

文章目录 前言 1 传感器介绍 1.1 关键特征 1.2 关键参数 2 硬件概述 2.1 信号引脚 2.2 参考电路图 3 寄存器 3.1 寄存器列表 3.2 性能优化寄存器 4 代码说明 4.1 结构体说明 4.2 编译说明 5 波形分析 前言 本文介绍了在 ESP32 DEVKIT V1 开发板上开发 PMW3901MB…

VR防地质灾害安全教育:增强自然灾害知识,提高自我保护意识

VR防地质灾害安全教育系统是一种虚拟仿真技术&#xff0c;可以通过虚拟现实技术模拟地震、泥石流、滑坡等地质灾害的发生和应对过程&#xff0c;帮助人们提高应对突发自然灾害的能力。这种系统的优势在于可以增强自然灾害知识&#xff0c;提高自我保护意识&#xff0c;锻炼人们…

4. 池化层相关概念

4.1 池化层原理 ① 最大池化层有时也被称为下采样。 ② dilation为空洞卷积&#xff0c;如下图所示。 ③ Ceil_model为当超出区域时&#xff0c;只取最左上角的值。 ④ 池化使得数据由5 * 5 变为3 * 3,甚至1 * 1的&#xff0c;这样导致计算的参数会大大减小。例如1080P的电…

R语言实现网状Meta分析(1)

#R语言实现网状Meta library(gemtc) help(package"gemtc") data<-gemtc::smoking #注意按照实例格式编写数据 net<-mtc.network(data$data.ab) #网状图 plot(net,mode"circle",displaylabelsT,boxed.labelF) summary(net) #网状model model<-mtc…

wazuh环境配置和漏洞复现

1.wazuh配置 虚拟机 &#xff08;OVA&#xff09; - 替代安装 (wazuh.com)在官方网页安装ova文件 打开VMware选择打开虚拟机&#xff0c;把下载好的ova文件放入在设置网络改为NAT模式 账号:wazuh-user 密码:wazuh ip a 查看ip 启动小皮 远程连接 账号admin …

温故知新之:代理模式,静态代理和动态代理(JDK动态代理)

0、前言 代理模式可以在不修改被代理对象的基础上&#xff0c;通过扩展代理类&#xff0c;进行一些功能的附加与增强。 1、静态代理 静态代理是一种代理模式的实现方式&#xff0c;它在编译期间就已经确定了代理对象&#xff0c;需要为每一个被代理对象创建一个代理类。静态代…