谈谈Flink消费kafka的偏移量

offset配置:

flinkKafkaConsumer.setStartFromEarliest():从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromLatest():从topic的最新offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromTimestamp(…):从指定的时间戳(毫秒)开始消费数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromGroupOffsets():默认的设置。根据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者offset位置消费数据。如果没有找到对应的消费者组的位置,那么将按照auto.offset.reset设置的策略读取offset。

消费者offset提交配置:

配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。5000单位毫秒,代表每5秒进行依次checkpoint

关闭checkpoint:如何禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。
开启checkpoint:如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。
总结:Flink提供了消费kafka数据的offset如何提交给Kafka或者zookeeper(kafka0.8之前,因为0.8之前offset是维护在zookeeper中的)的配置 ;关闭checkpoint的话,flink消费kafka数据 offset取决于kafka客户端的配置;开启checkpoint的话,flink消费kafka offset由jobmanager中的checkpoint维护,并同步到kafka中保持一置,注意,Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况。

使用checkpoint + 两阶段提交来保证仅消费一次kafka中的数据:
Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。

当谈及“exactly-once semantics”仅一次处理数据时,指的是每条数据只会影响最终结果一次。Flink可以保证当机器出现故障或者程序出现错误时,也没有重复的数据或者未被处理的数据出现,实现仅一次处理的语义。

checkpoint中包含:

当前应用的状态;
当前消费流数据的位置;
注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;

使用两阶段提交协议保证flink连接外部系统数据仅一次处理

Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。

当谈及“exactly-once semantics”仅一次处理数据时,指的是每条数据只会影响最终结果一次。Flink可以保证当机器出现故障或者程序出现错误时,也没有重复的数据或者未被处理的数据出现,实现仅一次处理的语义。

checkpoint中包含:

当前应用的状态;
当前消费流数据的位置;
注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;

使用两阶段提交协议保证flink连接外部系统数据仅一次处理;
当Flink处理完的数据需要写入外部系统时,不保证仅一次处理数据。为了提供端到端的仅一次处理数据,在将数据写入外部系统时也要保证仅一次处理数据,这些外部系统必须提供一种手段来允许程序提交或者回滚写入操作,同时还要保证与Flink的checkpoint机制协调使用,在分布式系统中协调提交和回滚的常见方法就是两阶段提交协议。下面给出一个实例了解Flink如何使用两阶段提交协议来实现数据仅一次处理语义。

该实例是从kafka中读取数据,经过处理数据之后将结果再写回kafka。kafka0.11版本之后支持事务,这也是Flink与kafka交互时仅一次处理的必要条件。【注意:当Flink处理完的数据写入kafka时,即当sink为kafka时,自动封装了两阶段提交协议】。Flink支持仅一次处理数据不仅仅限于和Kafka的结合,只要sink提供了必要的两阶段协调实现,可以对任何sink都能实现仅一次处理数据语义。

其原理如下:

上图Flink程序包含以下组件:

一个从kafka中读取数据的source
一个窗口聚合操作
一个将结果写往kafka的sink。
要使sink支持仅一次处理数据语义,必须以事务的方式将数据写往kafka,将两次checkpoint之间的操作当做一个事务提交,确保出现故障时操作能够被回滚。假设出现故障,在分布式多并发执行sink的应用程序中,仅仅执行单次提交或回滚事务是不够的,因为分布式中的各个sink程序都必须对这些提交或者回滚达成共识,这样才能保证两次checkpoint之间的数据得到一个一致性的结果。Flink使用两阶段提交协议(pre-commit+commit)来实现这个问题。

Filnk checkpointing开始时就进入到pre-commit阶段,具体来说,一旦checkpoint开始,Flink的JobManager向输入流中写入一个checkpoint barrier将流中所有消息分隔成属于本次checkpoint的消息以及属于下次checkpoint的消息,barrier也会在操作算子间流转,对于每个operator来说,该barrier会触发operator的State Backend来为当前的operator来打快照。如下图示:

Flink DataSource中存储着Kafka消费的offset,当完成快照保存后,将chechkpoint barrier传递给下一个operator。这种方式只有在Flink内部状态的场景是可行的,内部状态指的是由Flink的State Backend管理状态,例如上面的window的状态就是内部状态管理。只有当内部状态时,pre-commit阶段无需执行额外的操作,仅仅是写入一些定义好的状态变量即可,checkpoint成功时Flink负责提交这些状态写入,否则就不写入当前状态。

但是,一旦operator操作包含外部状态,事情就不一样了。我们不能像处理内部状态一样处理外部状态,因为外部状态涉及到与外部系统的交互。这种情况下,外部系统必须要支持可以与两阶段提交协议绑定的事务才能保证仅一次处理数据。

本例中的data sink是将数据写往kafka,因为写往kafka是有外部状态的,这种情况下,pre-commit阶段下data sink 在保存状态到State Backend的同时,还必须pre-commit外部的事务。如下图:

当checkpoint barrier在所有的operator都传递一遍切对应的快照都成功完成之后,pre-commit阶段才算完成。这个过程中所有创建的快照都被视为checkpoint的一部分,checkpoint中保存着整个应用的全局状态,当然也包含pre-commit阶段提交的外部状态。当程序出现崩溃时,我们可以回滚状态到最新已经完成快照的时间点。

下一步就是通知所有的operator,告诉它们checkpoint已经完成,这便是两阶段提交的第二个阶段:commit阶段。这个阶段中JobManager会为应用中的每个operator发起checkpoint已经完成的回调逻辑。本例中,DataSource和Winow操作都没有外部状态,因此在该阶段,这两个operator无需执行任何逻辑,但是Data Sink是有外部状态的,因此此时我们需要提交外部事务。如下图示:

汇总以上信息,总结得出:

一旦所有的operator完成各自的pre-commit,他们会发起一个commit操作。
如果一个operator的pre-commit失败,所有其他的operator 的pre-commit必须被终止,并且Flink会回滚到最近成功完成的checkpoint位置。
一旦pre-commit完成,必须要确保commit也要成功,内部的operator和外部的系统都要对此进行保证。假设commit失败【网络故障原因】,Flink程序就会崩溃,然后根据用户重启策略执行重启逻辑,重启之后会再次commit。
因此,所有的operator必须对checkpoint最终结果达成共识,即所有的operator都必须认定数据提交要么成功执行,要么被终止然后回滚。

参考:https://blog.51cto.com/u_16213620/7923389

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/750948.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

全自动封箱机技术革新:效率优化新篇章

在日新月异的物流行业中,全自动封箱机以其高效、精准的特性,成为了不可或缺的关键设备。然而,随着市场竞争的加剧和客户需求的不断升级,如何进一步优化全自动封箱机的效率,成为了行业内外关注的焦点。 一、全自动封箱机…

俯视LLM的灵魂:一文搞懂稀疏自动编码器

实时了解业内动态,论文是最好的桥梁,专栏精选论文重点解读热点论文,围绕着行业实践和工程量产。若在某个环节出现卡点,可以回到大模型必备腔调或者LLM背后的基础模型重新阅读。而最新科技(Mamba,xLSTM,KAN)…

鸿蒙开发HarmonyOS NEXT (一) 入门

最近总听见大家讨论鸿蒙,前端转型的好方向?先入门学习下 目前官方版本和文档持续更新中 一、开发环境 提示:要占用的空间比较多,建议安装在剩余空间多的盘 1、下载:官网最新工具 - 下载中心 - 华为开发者联盟 (huaw…

滑动窗口1

1. 长度最小的子数组(209) 题目描述: 算法原理: 这题使用滑动窗口的方法来进行解决,而滑动窗口实际上就是通过定义同向双指针的方式来实现的,两个指针就是窗口的左右边界,然后结合这样的思想来给出这题的…

华为---配置基本的访问控制列表(ACL)

11、访问控制列表(ACL) 11.1 配置基本的访问控制列表 11.1.1 原理概述 访问控制列表ACL(Access Control List)是由permit或deny语句组成的一系列有顺序的规则集合,这些规则根据数据包的源地址、目的地址、源端口、目的端口等信息来描述。A…

MySQL数据库简介和安装

文章目录 一、数据库原理目前情况数据库的发展史RDBMS关系型数据库关系型数据库理论 二、MySQL历史发展历程关系型数据库和非关系型数据库 三、安装mysql及优化yum安装编译安装mysql二进制安装优化操作 四、 安装mycli插件客户端工具 一、数据库原理 目前情况 我们正处于一个…

Servlet中请求转发【 Forward】与重定向【Redirection】的区别

在Servlet中,请求转发(Request Forwarding)和重定向(Redirection)是用于控制请求流程的两种不同机制。它们的主要区别如下: 一、请求转发 服务器内部操作:转发是在服务器内部进行的操作&#xf…

Datax学习

DataX学习 DataX3.0概览 DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。学习可见官网资料(https://github.com/alibaba/DataX)。 设计理念&#…

MySQL之可扩展性(六)

可扩展性 向外扩展 12.重新均衡分片数据 如有必要,可以通过在分片间移动数据来达到负载均衡。举个例子,许多读者可能听一些大型图片分享网站或流行社区网站的开发者提到过用于分片间移动用户数据的工具。在分片间移动数据的好处很明显。例如&#xff…

【JavaScript】流程控制和函数

目录 一、分支语句 1、if语句: 2、switch语句: 二、循环语句 1、while循环语句 2、for循环语句 三、函数声明 1、function 函数名(形参列表){ 函数体 } 2、var 函数名function(形参列表){函数体} 一、分支语句 1、if语句: if(表达式){ }else …

高考志愿不知道怎么填?教你1招,用这款AI工具,立省4位数

高中的岁月,就像一本厚厚的书,我们一页页翻过,现在,终于翻到了最后一页。但这不是结束,这是新的开始,是人生的新篇章。 高考落幕,学子们在短暂的放松后,又迎来了紧张的志愿填报。 “…

达梦数据库的系统视图v$locked_object

达梦数据库的系统视图v$locked_object 在达梦数据库(Dameng Database)中,V$LOCKED_OBJECT 视图提供了与数据库中被锁定对象相关的信息。这通常用于监控和诊断数据库中的锁定问题,帮助管理员了解哪些对象被锁定了,以及…

如何在Windows 11上设置默认麦克风和相机?这里有详细步骤

如果你的Windows 11计算机上连接了多个麦克风或网络摄像头,并且希望自动使用特定设备,而不必每次都在设置中乱动,则必须将首选设备设置为默认设备。我们将向你展示如何做到这一点。 如何在Windows 11上更改默认麦克风 有两种方法可以将麦克…

工商银行:低息差下的挣扎

时隔四年,市值再度超越贵州茅台成为A股“股王”。 今天要说的就是“宇宙行”——中国工商银行 虽然茅台的信仰开始崩塌,但各大银行股巨头们今年也不好过。2024年一季度六大行业绩集体受挫,息差普遍收窄超过20个基点。其中,包括工…

【Web3】Web3.js 启动!并解决Web3 is not a constructor报错

苏泽 大家好 这里是苏泽 一个钟爱区块链技术的后端开发者 本篇专栏 ←持续记录本人自学智能合约学习笔记和经验总结 如果喜欢拜托三连支持~ 本节教大家如何启动Web3.js 目录 Web3 启动! 于是很愉快的报错 创建实例! 出来了 Web3:模块…

代码随想录——跳跃游戏Ⅱ(Leetcode 45)

题目链接 贪心 class Solution {public int jump(int[] nums) {if(nums.length 1){return 0;}int count 0;// 当前覆盖最远距离下标int curDistance 0;// 下一步覆盖距离最远下标int nextDistance 0;for(int i 0; i < nums.length; i){nextDistance Math.max(nums[…

指针并不是用来存储数据的,而是用来存储数据在内存中地址(内存操作/函数指针/指针函数)

推荐&#xff1a;1、4、5号书籍 1. 基本概念 首先&#xff0c;让小明了解指针的基本概念&#xff1a; 指针的定义&#xff1a;指针是一个变量&#xff0c;它存储的是另一个变量的地址。指针的声明&#xff1a;例如&#xff0c;int *p表示一个指向整数的指针变量p。 2. 形象…

Mac 微信能上网但浏览器打不开网页

文章目录 推荐 DNSMac 设置 DNS 推荐 DNS 名称首选 DNS备用 DNSGoogle8.8.8.88.8.4.4114 DNS114.114.114.114114.114.115.115阿里223.5.5.5百度180.76.76.76腾讯119.29.29.29电信101.226.4.6联通123.125.81.6移动101.226.4.6铁通101.226.4.68福建电信218.85.152.99218.85.157.…

基于elastic stack的docker-compose部署的ELK与LDAP集成

说明&#xff1a; ldap信息配置到es配置文件上&#xff0c;然后kibana读取es的配置信息 用户与角色的关系通过role_mapping.yml文件配置获取 角色与权限的关系通过elastic stack提供的DevTools或API进行维护 一、前置条件&#xff1a; 1.1 es已开启xpack&#xff08;已开启…

QQ等级评估源码+软件

今天&#xff0c;我将和大家探讨一个与直播、撸礼物相关的主题&#xff0c;它涉及到的是一种特殊的软件及其源码——QQ等级评估工具。在我们的生活中&#xff0c;直播已经成为了一种越来越流行的娱乐方式。不论是音乐会、电子竞技&#xff0c;还是日常生活分享&#xff0c;你都…