Kafka 位移

Consumer位移管理机制

Consumer的位移数据作为一条条普通的Kafka消息,提交到__consumer_offsets中。可以这么说,__consumer_offsets的主要作用是保存Kafka消费者的位移信息。使用Kafka主题来保存位移。

消息格式

位移主题就是普通的Kafka主题。也是一个内部主题,但它的消息格式却是Kafka自己定义的KV对(Key和Value分别表示消息的键值和消息体),用户不能修改,Kafka Consumer有API去提交位移,也就是向位移主题写消息。不要自己写个Producer随意向该主题发送消息。

主题消息的Key中应该保存标识Consumer的字段,也就是Consumer GroupGroup ID,标识唯一的Consumer Group,因为Consumer提交位移是在分区层面上进行的,即它提交的是某个或某些分区的位移,那么很显然,Key中还应该保存 Consumer要提交位移的分区

总结:位移主题的Key中应该保存3部分内容:<Group ID,主题名,分区号>

还有2种格式:

        1. 用于保存Consumer Group信息的消息,用来注册Consumer Group

        2. tombstone消息,即墓碑消息,也称delete mark:用于删除Group过期位移甚至是删除Group的消息。

位移主题的创建

当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。

分区数是怎么设置的呢?这就要看Broker端参数offsets.topic.num.partitions的取值了。它的默认值是50,因此Kafka会自动创建一个50分区的位移主题。Broker端另一个参数offsets.topic.replication.factor 控制副本数,默认为3。所以:如果位移主题是Kafka自动创建的,那么该主题的分区数是50,副本数是3。

提交位移(Committing Offsets)

Consumer需要向Kafka汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。当Consumer发生故障重启之后,就能够从Kafka中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍

从用户的角度来说,位移提交分为自动提交手动提交从Consumer端的角度来说,位移提交分为同步提交异步提交

Kafka Consumer提交位移的方式有两种:自动提交位移手动提交位移

手动提交位移

enable.auto.commit 如果值是false,则为手动提交,它能够把控位移提交的时机和频率可以使用Kafka Consumer API的consumer.commitSync等方法,当调用这些方法时,Kafka会向位移主题写入相应的消息。

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}

调用consumer.commitSync()方法的时机,是在处理完了poll()方法返回的所有消息之后。如果过早提交了位移,就可能会出现消费数据丢失的情况。它还也有一个缺陷,就是在调用commitSync()时,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束,影响整个应用程序的TPS。

Kafka社区为手动提交位移提供了另一个API方法:KafkaConsumer#commitAsync() ,这是一个异步操作。调用commitAsync()之后,它会立即返回,不会阻塞,因此不会影响Consumer应用的TPS。由于它是异步的,Kafka提供了回调函数(callback),在实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用commitAsync()的方法:

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}

commitAsync是否能够替代commitSync呢?

        答案是不能。commitAsync的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过 期”或不是最新值了。因此,异步提交的重试其实没有意义,所以commitAsync是不会重试的。 

将commitSync和commitAsync组合使用

try {
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        process(records); // 处理消息
        commitAysnc(); // 使用异步提交规避阻塞
    }
} catch(Exception e) {
    handle(e); // 处理异常
} finally {
    try {
        consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    } finally {
        consumer.close();
    }
}

对于常规性、阶段性的手动提交,我们调用commitAsync()避免程序阻塞,而在Consumer要关闭前,我们调用commitSync()方法执行同步阻塞式的位移提交,以确保Consumer关闭前能够保存正确的位移数据。将两者结合后,既实现了异步无阻塞式的位移管理,也确保了Consumer位移的正确性。

分批处理(细粒度的位移提交)

        commitSync(Map<TopicPartition, OffsetAndMetadata>)

         commitAsync(Map<TopicPartition, OffsetAndMetadata>)

它们的参数是一个Map对象,键就 是TopicPartition,即消费的分区,而值是一个OffsetAndMetadata对象,保存的主要是位移数据。

例如:如何每处理100条消息就提交一次位移呢?以commitAsync为例,展示一段代码,实际上,commitSync的调用方法和它是一模一样的。

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
// 其他操作
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record: records) {
        process(record); // 处理消息
        offsets.put(new TopicPartition(record.topic(), record.partition()) , new OffsetAndMetadata(record.offset() + 1);
        if(count % 100 == 0)
            consumer.commitAsync(offsets, null); // 回调处理逻辑是null
            count++;
        }
    }
}

程序先是创建了一个Map对象,用于保存Consumer消费处理过程中要提交的分区位移,之后开始逐条处理消息,并构造要提交的位移值。要提交下一条消息的位移,这里构造OffsetAndMetadata对象时,使用当前消息位移加1的原因。代码的最后部分是做位移的提交。这里设置了一个计数器,每累计100条消息就统一提交一次位移。与调用无参的 commitAsync不同,这里调用了带Map对象参数的commitAsync进行细粒度的位移提交。这样,这段代码就能够实现每处理100条消息就提交一次位移,不用再受poll方法返回的消息总数的限制了。 

自动提交位移

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Consumer端有个参数叫enable.auto.commit,如果值是true,则Consumer 定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms来控制。但是没法把控Consumer端的位移管理。

 一旦设置了enable.auto.commit为true,Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息。从顺序上来说,poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费。

在默认情况下,Consumer每5秒自动提交一次位移。现在,我们假设提交位移之后的3秒发生了Rebalance操作。在Rebalance之后,所有Consumer从上一次提交的位移处继续消费但该位移已经是3秒前的位移数据了,故在Rebalance发生前3秒消费的所有数据都要重新再消费一次。虽然能够通过减少auto.commit.interval.ms的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。 

自动提交位移问题:

自动提交位移,那么就可能存在一个问题:只要Consumer一直启动着,它就会无限期地向位移主题写入消息。

假设Consumer当前消费到了某个主题的最新一条消息,位移是100,之后该主题没有任何新消息产生,故Consumer无消息可消费了,所以位移永远保持在100。由于是自动提交位移位移主题中会不停地写入位移=100的消息。显然Kafka只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。这就要求Kafka必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。

Kafka使用Compact策略来删除位移主题中的过期消息,避免该主题无限期膨胀。那么应该如何定义Compact策略中的过期呢?对于同一个Key的两条消息M1M2,如果M1的发送时间早于 M2,那么M1就是过期消息。Compact的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起

图中位移为0、2和3的消息的Key都是K1。Compact之后,分区只需要保存位移为3的消息,因为它是最新发送的。 

Kafka提供了专门的后台线程定期地巡检待Compact的主题,看看是否存在满足条件的可删除数据。这个后台线程叫LogCleaner

参考:Kafka 核心技术与实战 (geekbang.org)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/755483.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机组成原理:海明校验

在上图中&#xff0c;对绿色的7比特数据进行海明校验&#xff0c;需要添加紫色的4比特校验位&#xff0c;总共是蓝色的11比特。紫色的校验位pi分布于蓝色的hi的1, 2, 4, 8, 16, 32, 64位&#xff0c;是2i-1位。绿色的数据位bi分布于剩下的位。 在下图中&#xff0c;b1位于h3&a…

高频面试题基本总结回顾2(含笔试高频算法整理)

干货分享&#xff0c;感谢您的阅读&#xff01; &#xff08;暂存篇---后续会删除&#xff0c;完整版和持续更新见高频面试题基本总结回顾&#xff08;含笔试高频算法整理&#xff09;&#xff09; 备注&#xff1a;引用请标注出处&#xff0c;同时存在的问题请在相关博客留言…

kubernetes给指定用户分配调用k8s的api权限

文章目录 概要利用RBAC添加角色权限使用shell命令创建角色权限使用配置文件创建角色权限 调用k8s的api获取k8s账户的token 小结 概要 使用kubernetes部署项目时&#xff0c;有些特殊场景&#xff0c;我们需要在自己创建的pod里面调用k8s的api来管理k8s&#xff0c;但是需要使用…

论文笔记:Spatial-Temporal Interval Aware Sequential POI Recommendation

ICDE 2022 1 intro 1.1 背景 空间&#xff08;Spatial&#xff09;和时间&#xff08;Temporal&#xff09;信息是序列 POI 推荐中两个重要且相辅相成的因素。 空间因素&#xff08;如地理距离间隔&#xff09;可以在用户的历史轨迹呈现空间分簇现象时&#xff0c;细粒度刻画…

Elasticsearch 聚合查询简介

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f4a5;&#x1f4a5;个人主页&#xff1a;奋斗的小羊 &#x1f4a5;&#x1f4a5;所属专栏&#xff1a;C语言 &#x1f680;本系列文章为个人学习…

Pytest集成Allure生成测试报告

# 运行并输出报告在Report文件夹下 查看生成的allure报告 1. 生成allure报告&#xff1a;pycharm terminal中输入命令&#xff1a;产生报告文件夹 pytest -s --alluredir../report 2. pycharm terminal中输入命令&#xff1a;查看生成的allure报告 allure serve ../report …

[数据集][目标检测]婴儿状态睡觉哭泣检测数据集VOC+YOLO格式7109张3类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;7109 标注数量(xml文件个数)&#xff1a;7109 标注数量(txt文件个数)&#xff1a;7109 标注…

abap 类封装Excel转换到内表

文章目录 1.封装思路2.参数2.1.参数解析3.代码4.调用案例5.该类中的其他方法截图1.封装思路 直接复制粘贴激活直接用 首先,需要你在SE11中创建一个和你Excel中的字段相同的结构,然后把这个结构名字以字符串的形式传给方法.几乎可以实现任意扁平结构的Excel转到内表. 2.参数 2…

港科夜闻 | 香港科大学者戴希教授荣获陈嘉庚科学奖及国家自然科学奖一等奖,李卫平教授荣获国家自然科学奖二等奖...

关注并星标 每周阅读港科夜闻 建立新视野 开启新思维 1、香港科大学者戴希教授荣获「陈嘉庚科学奖」及「国家自然科学奖」一等奖&#xff0c;李卫平教授荣获「国家自然科学奖」二等奖。香港科大蒙民伟博士纳米科学教授兼物理系讲座教授戴希及其团队&#xff0c;具有开创性的研究…

Stylized Modular Character (Female)(程式化的模块化角色(女性)“运动型”)

一套程式化的角色模块化部件。 在这样的插槽中定制&#xff1a; 头 躯干 手 裤子 靴子 头发 每个插槽都有 2 到 5 个在 URP 中工作的 PBR 材料的选项。 该项目基于官方 Unity Standard Assets 包中的 Ethan 默认角色。 不包含动画。 皮肤网格的 SSS 是由自发光贴图伪造的。 如果…

i-Health

技术栈&#xff1a;HTMLCSSJavascriptPHP

LabVIEW代码性能优化

优化LabVIEW代码以提高软件性能是确保系统高效运行的关键。通过分析代码结构、数据管理、并行处理、内存使用和硬件资源的有效利用&#xff0c;我们可以从多个角度提升LabVIEW程序的执行速度和稳定性。 代码结构优化 模块化编程 将复杂的程序分解成多个子VI&#xff0c;每个子V…

SpringBoot:使用Spring Batch实现批处理任务

引言 在企业级应用中&#xff0c;批处理任务是不可或缺的一部分。它们通常用于处理大量数据&#xff0c;如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分&#xff0c;专为批处理任务设计&#xff0c;提供了简化的配置和强大的功能。本文将介绍如何使用Spr…

mysql wrnning Difficult to find free blocks in the buffer pool解决方法

mysql [InnoDB] Difficult to find free blocks in the buffer pool (140397 search iterations)! 我使用的是mysql8,。 原因&#xff1a;这种情况&#xff0c;多半出现在别人在非常大的写入&#xff0c;或者百万级的查询中。 解决方式&#xff0c;centos7在线安装的mysql&am…

【Linux进阶】windows和linux文件互传的两种方式

前言 我们在windows电脑上使用ssh工具&#xff08;比如Xshell&#xff09;来远程登录并使用linux云服务器的时候&#xff0c;难免要将我们的文件传输到linux服务器上&#xff0c;或者将linux服务器的文件传输到我们的windows电脑里&#xff0c;那么&#xff0c;我们要怎么来实…

FFmpeg教程-三-播放pcm文件-1

目录 一&#xff0c;下载SDL 二&#xff0c;在Qt中测试 1&#xff0c;在pro文件中加入路径 2&#xff0c;在.cpp文件中加入头文件 3&#xff0c;进行测试 4&#xff0c;显示结果 一&#xff0c;下载SDL 通过编程的方式播放音视频&#xff0c;也是需要用到这2个库: FFmpeg…

电脑数据恢复篇:如何恢复误删除的文件

在清理电脑或优化存储设备时无意中删除重要文件是人类常见的错误。不可否认的是&#xff0c;在批量删除文件时&#xff0c;您经常会同时删​​除垃圾文件和重要文件。后来您发现一堆重要的文档或文件不见了。在这种情况下&#xff0c;您唯一的选择就是寻找恢复已删除文件的方法…

【机器学习300问】135、决策树算法ID3的局限性在哪儿?C4.5算法做出了怎样的改进?

ID3算法是一种用于创建决策树的机器学习算法&#xff0c;该算法基于信息论中的信息增益概念来选择最优属性进行划分。信息增益是原始数据集熵与划分后数据集熵的差值&#xff0c;熵越小表示数据集的纯度越高。有关ID3算法的详细步骤和算法公式在我之前的文章中谈到&#xff0c;…

单调队列优化DP——AcWing 135. 最大子序和

单调队列优化DP 定义 单调队列优化DP是一种在动态规划&#xff08;Dynamic Programming, DP&#xff09;中应用的数据结构优化方法。它利用单调队列&#xff08;Monotonic Queue&#xff09;这一数据结构来高效维护一个区间内的最值&#xff08;通常是最大值或最小值&#xf…

自定义一个背景图片的高度,随着容器高度的变化而变化,小于图片的高度时裁剪,大于时拉伸100%展示

1、通过js创建<image?>标签来获取背景图片的宽高比&#xff1b; 2、当元素的高度大于原有比例计算出来的高度时&#xff0c;背景图片的高度拉伸自适应100%&#xff0c;否则高度为auto&#xff0c;会自动被裁减 3、背景图片容器高度变化时&#xff0c;自动计算背景图片的…