Flink 的时间属性及原理解析

FlinkAPI大体上可以划分为三个层次:处于最底层的ProcessFunction中间一层的DataStream API最上层的SQL/Table API,这三层中的每一层都非常依赖于时间属性。时间在Flink中的地位如下图所示:
[点击并拖拽以移动] ​

时间属性是流处理中最重要的一个方面,是流处理系统的基石之一,贯穿这三层API。在DataStream API这一层中因为封装方面的原因,我们能够接触到时间的地方不是很多,所以我们将重点放在底层的ProcessFunction和最上层的SQL/Table API

Flink 时间语义

不同的应用场景拥有不同的时间语义,Flink作为一个先进的分布式流处理引擎,它本身支持不同的时间语义。其核心是Processing Time(窗口时间即处理时间) 和Event TimeRow Time,事件发生时间),这两类时间主要的不同点如下表所示:

Processing TimeEvent Time
真实世界的时间数据世界的时间
处理数据节点的本地时间记录携带的 Timestamp
处理简单处理复杂
结果不确定(无法重现)结果确定(可重现)

Processing Time是来模拟我们真实世界的时间 ,其实就算是处理数据的节点本地时间也不一定是完完全全的真实世界的时间,所以说它是用来模拟真实世界的时间。而 Event Time是数据世界的时间,即我们要处理的数据流世界里的时间。关于他们的获取方式,Process Time是通过直接去调用本地机器的时间,而Event Time则是根据每一条处理记录所携带的时间戳来判定。
这两种时间在Flink内部的处理以及用户的实际使用方面,难易程度都是不同的。相对而言的Processing Time处理起来更加的简单,而 Event Time要更麻烦一些。而在使用Processing Time的时候,我们得到的处理结果(或者说流处理应用的内部状态)是不确定的。而因为在Flink 内部对Event Time做了各种保障,使用Event Time的情况下,无论重放数据多少次,都能得到一个相对确定可重现的结果。

因此在判断应该使用Processing Time还是Event Time的时候,可以遵循一个原则:当你的应用遇到某些问题要从上一个checkpoint或者 savepoint进行重放,是不是希望结果完全相同。如果希望结果完全相同,就只能用Event Time;如果接受结果不同,则可以用Processing TimeProcessing Time的一个常见的用途是,根据现实时间来统计整个系统的吞吐,比如要计算现实时间一个小时处理了多少条数据,这种情况只能使用Processing Time

时间的特性

Processing Time:递增;
Event Time:一定程度的乱序;

时间的一个重要特性是:时间只能递增,不会来回穿越。 在使用时间的时候我们要充分利用这个特性。假设我们有这么一些记录,然后我们来分别看一下Processing Time还有Event Time对于时间的处理。
【1】对于Processing Time,因为我们是使用的是本地节点的时间(假设这个节点的时钟同步没有问题),我们每一次取到的Processing Time肯定都是递增的,递增就代表着有序,所以说我们相当于拿到的是一个有序的数据流。
【2】而在用Event Time的时候因为时间是绑定在每一条的记录上的,由于网络延迟、程序内部逻辑、或者其他一些分布式系统的原因,数据的时间可能会存在一定程度的乱序,比如下图的例子。在Event Time场景下,我们把每一个记录所包含的时间称作Record Timestamp。如果Record Timestamp所得到的时间序列存在乱序,我们就需要去处理这种情况。
[点击并拖拽以移动] ​

如果单条数据之间是乱序,我们就考虑对于整个序列进行更大程度的离散化。简单地讲,就是把数据按照一定的条数组成一些小批次,但这里的小批次并不是攒够多少条就要去处理,而是为了对他们进行时间上的划分。经过这种更高层次的离散化之后,我们会发现最右边方框里的时间就是一定会小于中间方框里的时间,中间框里的时间也一定会小于最左边方框里的时间。
[点击并拖拽以移动] ​

这个时候我们在整个时间序列里插入一些类似于标志位的特殊的处理数据,这些特殊的处理数据叫做watermark。一个watermark本质上就是一个timestamp数值,表示后到来的数据再也没有小于或等于这个时间watermark的了

Timestamp 和 Watermark 行为概览

[点击并拖拽以移动] ​

接下来我们重点看一下Event Time里的Record Timestamp(简写成timestamp)和watermark的一些基本信息。绝大多数的分布式流计算引擎对于数据都是进行了 DAG 图的抽象,它有自己的数据源,有处理算子,还有一些数据汇。数据在不同的逻辑算子之间进行流动。watermarktimestamp有自己的生命周期,主要分为watermarktimestamp的产生、他们在不同的节点之间的传播、以及在每一个节点上的处理

Timestamp分配和Watermark生成: Flink支持两种watermark生成方式。第一种是在SourceFunction中产生,相当于把整个的timestamp分配和watermark生成的逻辑放在流处理应用的源头。我们可以在SourceFunction里面通过这两个方法产生watermark
【1】通过collectWithTimestamp方法发送一条数据,其中第一个参数就是我们要发送的数据,第二个参数就是这个数据所对应的时间戳;也可以调用emitWatermark去产生一条watermark,表示接下来不会再有时间戳小于等于这个数值记录。
【2】另外,有时候我们不想在SourceFunction里生成timestamp或者watermark,或者说使用的SourceFunction本身不支持,我们还可以在使用DataStreamAPI的时候指定,调用的DataStream.assignTimestampsAndWatermarks这个方法,能够接收不同的timestampwatermark的生成器。

总体上而言生成器可以分为两类: 第一类是定期生成器;第二类是根据一些在流处理数据流中遇到的一些特殊记录生成的。

定期生成根据特殊记录生成
现实时间驱动数据驱动
没个一段时间调用生成方法分一次分配Timestamp都会调用生成的方法
实现AssignerWithPeriodicWatermarks实现 AssignerWithPunctuatedWatermarks

两者的区别主要有三个方面,首先定期生成是现实时间驱动的,这里的定期生成主要是指watermark(因为timestamp是每一条数据都需要有的),即定期会调用生成逻辑去产生一个watermark。而根据特殊记录生成是数据驱动的,即是否生成watermark不是由现实时间来决定,而是当看到一些特殊的记录就表示接下来可能不会有符合条件的数据再发过来了,这个时候相当于每一次分配Timestamp之后都会调用用户实现的watermark生成方法,用户需要在生成方法中去实现watermark的生成逻辑。

在分配timestamp和生成watermark的过程中,虽然在SourceFunctionDataStream中都可以指定,但是还是建议生成的工作越靠近 DataSource越好。这样会方便让程序逻辑里面更多的 operator 去判断某些数据是否乱序。Flink 内部提供了很好的机制去保证这些timestampwatermark被正确地传递到下游的节点。

Watermark 传播

具体的传播策略基本上遵循这三点:
【1】watermark会以广播的形式在算子之间进行传播。比如说上游的算子连接了三个下游的任务,它会把自己当前的收到的watermark以广播的形式传到下游。

广播特点: 主机之间“一对所有”的通讯模式,网络对其中每一台主机发出的信号都进行无条件复制并转发,所有主机都可以接收到所有信息(不管你是否需要)

【2】如果在程序里面收到了一个Long.MAX_VALUE这个数值的watermark,就表示对应的那一条流的一个部分不会再有数据发过来了,它相当于就是一个终止的标志。
【3】对于单流而言,这个策略比较好理解,而对于有多个输入的算子,watermark的计算就有讲究了,一个原则是:单输入取其大,多输入取小

[点击并拖拽以移动] ​

举个例子,上图蓝色代表一个算子的一个任务,然后它有三个输入,分别是W1W2W3,这三个输入可以理解成任何输入,这三个输入可能是属于同一个流,也可能是属于不同的流。然后在计算watermark的时候,对于单个输入而言是取他们的最大值,因为我们都知道 watermark应该遵循一个单调递增的一个原则。对于多输入,它要统计整个算子任务的watermark时,就会取这三个计算出来的watermark的最小值。即一个多个输入的任务,它的watermark受制于最慢的那条输入流。这一点类似于木桶效应,整个木桶中装的水会受制于最矮的那块板。

watermark在传播的时候有一个特点是,它的传播是幂等的。多次收到相同的watermark,甚至收到之前的watermark都不会对最后的数值产生影响,因为对于单个输入永远是取最大的,而对于整个任务永远是取一个最小的。同时我们可以注意到这种设计其实有一个局限,具体体现在它没有区分你这个输入是一条流多个partition还是来自于不同的逻辑上的流的JOIN。对于同一个流的不同partition,我们对他做这种强制的时钟同步是没有问题的,因为一开始就把一条流拆散成不同的部分,但每一个部分之间共享相同的时钟。但是如果算子的任务是在做类似于JOIN操作,那么要求两个输入的时钟强制同步其实没有什么道理的,因为完全有可能是把一条离现在时间很近的数据流和一个离当前时间很远的数据流进行JOIN,这个时候对于快的那条流,因为它要等慢的那条流,所以说它可能就要在状态中去缓存非常多的数据,这对于整个集群来说是一个很大的性能开销。

ProcessFunction

在了解watermark的处理之前,先简单了解ProcessFunction,因为watermark在任务里的处理逻辑分为内部逻辑外部逻辑。外部逻辑其实就是通过ProcessFunction来体现的,需要使用 Flink提供的时间相关的API的话就只能写在ProcessFunction里。

ProcessFunction和时间相关的功能主要有三点:
【1】根据你当前系统使用的时间语义不同,你可以去获取当前你正在处理这条记录的Record Timestamp,或者当前的Processing Time
【2】它可以获取当前算子的时间,可以把它理解成当前的watermark
【3】为了在 ProcessFunction 中去实现一些相对复杂的功能,允许注册一些timer(定时器)。比如说在watermark达到某一个时间点的时候就触发定时器,所有的这些回调逻辑也都是由用户来提供,涉及到如下三个方法,registerEventTimeTimerregisterProcessingTimeTimeronTimer。在onTimer方法中就需要去实现自己的回调逻辑,当条件满足时回调逻辑就会被触发。

一个简单的应用是,我们在做一些时间相关的处理的时候,可能需要缓存一部分数据,但这些数据不能一直去缓存下去,所以需要有一些过期的机制,我们可以通过timer去设定这么一个时间,指定某一些数据可能在将来的某一个时间点过期,从而把它从状态里删除掉。所有的这些和时间相关的逻辑在Flink内部都是由自己的Time Service(时间服务)完成的。

Watermark 处理

一个算子的实例在收到watermark的时候,首先要更新当前的算子时间,这样的话在ProcessFunction里方法查询这个算子时间的时候,就能获取到最新的时间。第二步它会遍历计时器队列,这个计时器队列就是我们刚刚说到的timer,你可以同时注册很多timerFlink会把这些Timer按照触发时间放到一个优先队列中。第三步Flink得到一个时间之后就会遍历计时器的队列,然后逐一触发用户的回调逻辑。通过这种方式,Flink的某一个任务就会将当前的watermark发送到下游的其他任务实例上,从而完成整个watermark的传播,从而形成一个闭环。
[点击并拖拽以移动] ​

Table API 中的时间

下面看一看Table/SQL API中的时间。为了让时间参与到Table/SQL这一层的运算中,我们需提前把时间属性放到表的schema中,这样的话我们才能够在SQL语句或者Table的逻辑表达式里面使用时间去完成需求。

Table中指定时间列: 其实之前社区就怎么在Table/SQL中去使用时间这个问题做过一定的讨论,是把获取当前Processing Time的方法是作为一个特殊的UDF,还是把这一个列物化到整个的schema里面,最终采用了后者。我们这里就分开来讲一讲Processing TimeEvent Time在使用的时候怎么在Table中指定。

从DataStream转化通过TableSource 转化
Processing TimetEnv.fromDataStream(stream,“f1,f2,f3.proctime”)TableSource实现DefinedProctimeAttributes接口
Event Time原始 DataStream 必须有 timestamp 及 watermark数据中存在类型为 long或 timestamp的时间字段
Event TimetEnv.fromDataStream(stream,“f1,f2,f3.rowtime”) tEnv.fromDataStream(stream,“f1.rowtime,f2,f3”)TableSource实现DefinedProctimeAttributes接口

对于Processing Time,我们知道要得到一个Table对象(或者注册一个Table)有两种手段:
● 可以从一个DataStream转化成一个Table
● 直接通过TableSource去生成这么一个Table
对于第一种方法而言,我们只需要在你已有的这些列中(例子中f1f2就是两个已有的列),在最后用“列名.proctime”这种写法就可以把最后的这一列注册为一个Processing Time,以后在写查询的时候就可以去直接使用这一列。如果Table是通过TableSource生成的,就可以通过实现这一个DefinedRowtimeAttributes接口,然后就会自动根据你提供的逻辑去生成对应的Processing Time
相对而言,在使用EventTime时则有一个限制,因为EventTime不像Processing Time那样是随拿随用。如果要从DataStream去转化得到一个Table,必须要提前保证原始的DataStream里面已经存在了RecordTimestampwatermark。如果想通过TableSource生成的,也一定要保证要接入的数据里面存在一个类型为long或者timestamp的时间字段。具体来说,如果要从DataStream去注册一个表,和proctime类似,只需要加上“列名.rowtime”就可以。需要注意,如果要用Processing Time,必须保证要新加的字段是整个schema中的最后一个字段,而Event Time的时候其实可以去替换某一个已有的列,然后Flink会自动的把这一列转化成需要的rowtime这个类型。

如果是通过TableSource生成的,只需要实现DefinedRowtimeAttributes接口就可以了。需要说明的一点是,在DataStream API这一侧其实不支持同时存在多个Event Time(rowtime),但是在Table这一层理论上可以同时存在多个rowtime。因为DefinedRowtimeAttributes接口的返回值是一个对于rowtime描述的List,即其实可以同时存在多个rowtime列,在将来可能会进行一些其他的改进,或者基于去做一些相应的优化。

时间列和 Table 操作

指定完了时间列之后,当我们要真正去查询时就会涉及到一些具体的操作。这里我列举的这些操作都是和时间列紧密相关,或者说必须在这个时间列上才能进行的。比如说 Over窗口聚合Group by窗口聚合 这两种窗口聚合,在写SQL提供参数的时候只能允许你在这个时间列上进行这种聚合。第三个就是时间窗口聚合,你在写条件的时候只支持对应的时间列。最后就是排序,我们知道在一个无尽的数据流上对数据做排序几乎是不可能的事情,但因为这个数据本身到来的顺序已经是按照时间属性来进行排序,所以说如果要对一个 DataStream转化成Table进行排序的话,只能是按照时间列进行排序,当然同时也可以指定一些其他的列,但是时间列这个是必须的,并且必须放在第一位。
[点击并拖拽以移动] ​

为什么说这些操作只能在时间列上进行?
因为我们有的时候可以把到来的数据流就看成是一张按照时间排列好的一张表,而我们任何对于表的操作,其实都是必须在对它进行一次顺序扫描的前提下完成的。大家都知道数据流的特性之一就是一过性,某一条数据处理过去之后,将来其实不太好去访问它。当然因为 Flink中内部提供了一些状态机制,我们可以在一定程度上去弱化这个特性,但是最终还是不能超越的,限制状态不能太大。所有这些操作为什么只能在时间列上进行,因为这个时间列能够保证我们内部产生的状态不会无限的增长下去,这是一个最终的前提。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/455175.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

cms垃圾回收

cms垃圾回收 CMS概述CMS收集器整体流程初始标记并发标记重新标记并发清除 CMS卡表什么是卡表(card table)什么是mod-union table CMS概述 CMS(Concurrent Mark Sweep)收集器是Java虚拟机中的一种老年代(old Generation)垃圾收集器,他主要目标是减少垃圾收集时的应用…

数据库基础知识超详细解析~‍(进阶/复习版)

文章目录 前言一、数据库的操作1.登入数据库2.创建数据库3.显示当前数据库4.使用数据库5.删除数据库 二、常用数据类型三、数据库的约束1约束类型2NULL约束3UNIQUE:唯一约束4DEFAULT:默认值约束5 PRIMARY KEY:主键约束6 FOREIGN KEY:外键约束…

STM32第十课:串口发送

一、usart串口 1.1 USART串口协议 串口通讯(Serial Communication) 是一种设备间非常常用的串行通讯方式,因为它简单便捷,因此大部分电子设备都支持该通讯方式,电子工程师在调试设备时也经常使用该通讯方式输出调试信息。在计算机科学里&…

2024年云仓酒庄:店中店增项新模式,开启葡萄酒文化新篇章

2024云仓酒庄:店中店增项新模式,开启葡萄酒文化新篇章 在葡萄酒行业蓬勃发展的今天,云仓酒庄以其独特的经营模式和创新思维,在市场中脱颖而出。2024年,云仓酒庄继续深化其战略布局,不仅在多地开设酒庄实体…

【论文笔记合集】Transformers in Time Series A Survey综述总结

本文作者: slience_me 文章目录 Transformers in Time Series A Survey综述总结1 Introduction2 Transformer的组成Preliminaries of the Transformer2.1 Vanilla Transformer2.2 输入编码和位置编码 Input Encoding and Positional Encoding绝对位置编码 Absolute …

Redis到底是单线程还是多线程!,【工作感悟】

无论你是做 Python,PHP,JAVA,Go 还是 C#,Ruby 开发的,都离不开使用 Redis。 大部分程序员同学工作中都有用到 Redis,但是只限于会简单的使用,对Redis缺乏整体的认知。 无论是在大厂还是在中小…

pkav之当php懈垢windows通用上传缺陷

环境&#xff1a; Windowsnginxphp 一、php源码 <?php //U-Mail demo ... if(isset($_POST[submit])){$filename $_POST[filename];$filename preg_replace("/[^\w]/i", "", $filename);$upfile $_FILES[file][name];$upfile str_replace(;,&qu…

01-java入门了解--cmd命令、jdk、java的认识

cmd常用命令 java入门需要安装的环境 jdk。&#xff08;下载好jdk&#xff0c;并配置好环境&#xff09;idea。&#xff08;或者其他的编程工具&#xff09; jdk安装目录介绍 第一步&#xff1a;编写程序&#xff08;程序员写.java后缀的文件&#xff09; 第二步&#xff1a;…

【MMDetection3D实战(1)】:环境安装

1.介绍 MMDetection3D首次发布于2018年10月&#xff0c;是面向3D 场景中检测和分割的工具包&#xff0c;可以基于MMDetection3D实现基于点云、图像和多模态数据的3D检测与分割。github仓库地址:https://github.com/open-mmlab/mmdetection3d 目前MMDetection3D支持20多种不同的…

FastWiki v0.1.0发布!新增超多功能

FastWiki 发布 v0.1.0 https://github.com/239573049/fast-wiki/releases/tag/v0.1.0 更新日志 兼容OpenAI接口格式删除Blazor版本UI删除useEffect,解决可能存在问题的bug修复对话可以看到所有对话Merge branch ‘master’ of https://gitee.com/hejiale010426/fast-wiki更新…

【测试】1. 概念 + 基础篇

概念篇 测试相较于开发岗位而言&#xff0c;如果同学们的编程能力稍微弱一些&#xff0c;可以尝试测试方向&#xff08;更简单&#xff09; 1. 什么是软件测试 最常见的理解是&#xff1a;软件测试就是找BUG&#xff0c;发现缺陷。 早期&#xff0c;人们更多的将测试看成是对…

STM32第七节:GPIO输入——按键检测(包含带参宏)

目录 前言 STM32第七节&#xff1a;GPIO输入——按键检测&#xff08;包含带参宏&#xff09; 带参宏 代码替换展示 定义带参宏 GPIO输入——按键检测 硬件部分 端口输入数据寄存器&#xff08;GPIOx_IDR&#xff09; 编写程序 配置以及编写bsp_key文件 main函数编程…

用虚拟机安装win10超详细教程。

前言&#xff1a;安装中有任何疑问&#xff0c;可以在评论区提问&#xff0c;博主身经百战会快速解答小伙伴们的疑问 BT、迅雷下载win10镜像&#xff08;首先要下载win10的镜像&#xff09;&#xff1a;ed2k://|file|cn_windows_10_business_editions_version_1903_updated_sep…

项目中日志采集实践:技术、工具与最佳实践

✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; 目录 引言 一. 选择合适的日志框架 二. 配置日志框架 三. 使用…

web 课程

文章目录 格式图片超链接书签链接表格例子横跨束跨 格式 <br /> <br/> #换行图片 <img> 标签是用于在网页中嵌入图像的 HTML 标签&#xff0c;它有一些属性可以用来控制图像的加载、显示和交互。以下是对 <img> 标签常用属性的详细介绍&#xff1a;…

SpringCloud Gateway 新一代网关

一、前言 接下来是开展一系列的 SpringCloud 的学习之旅&#xff0c;从传统的模块之间调用&#xff0c;一步步的升级为 SpringCloud 模块之间的调用&#xff0c;此篇文章为第六篇&#xff0c;即介绍 Gateway 新一代网关。 二、概述 2.1 Gateway 是什么 Gateway 是在 Spring 生…

在域控的Users目录下批量创建用户组,名称来自Excel

对于CSV文件&#xff0c;PowerShell可以直接读取并处理&#xff0c;无需额外安装模块。假设你的CSV文件中&#xff0c;用户组名称在第一列&#xff0c;文件名为"groups.csv"&#xff0c;可以使用以下PowerShell脚本来批量创建&#xff1a; # 读取CSV文件中的数据 $g…

学生时期学习资源同步-1 第一学期结业考试题2

原创作者&#xff1a;田超凡&#xff08;程序员田宝宝&#xff09; 版权所有&#xff0c;引用请注明原作者&#xff0c;严禁复制转载

【Python】科研代码学习:十 evaluate (metrics,Evaluator)

【Python】科研代码学习&#xff1a;十 evaluate Evaluate评估类型简单使用教程如何寻找想要的 metric使用 Evaluator与 transformers.trainer 配合使用疑问与下节预告 Evaluate 【HF官网-Doc-Evaluate&#xff1a;API】 看名字就可以知道&#xff0c;Evaluate 是 HF 提供的便…

中国湿地沼泽分类分布数据集

数据下载链接&#xff1a;百度云下载链接 引言 随着经济社会的快速发展和城市化进程的加速推进&#xff0c;农业发生功能性转变&#xff0c;从单一生产功能向生产、生活、生态多功能服务首都经济社会发展转变。湿地与农田、草地、森林三大生态系统整合形成完整的现代农业生态服…