flink学习之水位线

什么是水位线

在事件时间语义下,我们不依赖系统时间,而是基于数据自带的时间戳去定义了一个时钟,
用来表示当前时间的进展。于是每个并行子任务都会有一个自己的逻辑时钟,它的前进是靠数
据的时间戳来驱动的。
我们可以把时钟也以数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟
的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标
记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以
更新自己的时钟了。在 Flink 中,数据流中用来做时间标记的记号就叫做水位线。
水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,
主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个
数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

水位线的分类

有序的水位线

在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;而在实际应用中,
如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间
戳、插入水位线就做了大量的无用功。所以为了提高效率,一般会每隔一段时间生成一个水位
线,这个水位线的时间戳,就是当前最新数据的时间戳,所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。
在这里插入图片描述

无序的水位线

在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改
变,这就是所谓的“乱序数据”。
在这里插入图片描述
对于连续数据流,我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就
不再生成新的水位线,也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
在这里插入图片描述

如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需
要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新
的水位线,。所以我们可以试着多等几秒,也就是把时钟调得更慢一些。最终的目的,就是要让窗口能够把所有迟到数据都收进来,得到正确的计算结果。对应到水位线上,其实就是要保证,当前时间已经进展到了这个时间戳,在这之后不可能再有迟到数据来了(延迟设的足够长)。
在这里插入图片描述

如何生成水位线

1.水位线的生成时机

水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,可以考虑在生成水位线之前使用。

2.水位线生成策略

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方 法:assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指
示事件时间。

val stream: DataStream[ClickEvent] = env.addSource(new ClickSource())  
val withTimestampsAndWatermarks: DataStream[ClickEvent] = stream.assignTimestampsAndWatermarks(watermarkStrategy)

assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是
所谓的“水位线生成策略”。WatermarkStrategy 中包含了一个“时间戳分配器”TimestampAssigner
和一个“水位线生成器”WatermarkGenerator。

trait WatermarkStrategy[T] extends TimestampAssignerSupplier[T] with WatermarkGeneratorSupplier[T] {  
  def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[T]  
  def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[T]  
}

TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给
元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在
WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,
以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间
为处理时间,可以调用环境配置的 setAutoWatermarkInterval()方法来设置,默认为
200ms。

env.getConfig.setAutoWatermarkInterval(60 * 1000L)
3. flink内置水位线生成器
  1. 有序流
val stream: DataStream[Event] = env.addSource(new ClickSource())  
val withTimestampsAndWatermarks: DataStream[Event] = stream.assignTimestampsAndWatermarks(  
  WatermarkStrategy  
    .forMonotonousTimestamps[Event]()  
    .withTimestampAssigner { (event, timestamp) => event.timestamp }  
)
  1. 无序流
import java.time.Duration  
import org.apache.flink.streaming.api.scala._  
import org.apache.flink.streaming.api.windowing.time.Time  
import org.apache.flink.util.Collector  
  
object OutOfOrdernessTest {  
  def main(args: Array[String]): Unit = {  
    val env = StreamExecutionEnvironment.getExecutionEnvironment  
    val clickSource = new ClickSource()  
    val stream = env.addSource(clickSource)  
  
    // 插入水位线的逻辑  
    val watermarkedStream = stream  
      .assignTimestampsAndWatermarks(  
        WatermarkStrategy  
          .forBoundedOutOfOrderness(Time.seconds(5))  
          .withTimestampAssigner(new SerializableTimestampAssigner[Event] {  
            override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp  
          })  
      )  
  
    watermarkedStream.print()  
    env.execute("OutOfOrdernessTest")  
  }  
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/335827.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux下MySQL用户管理、权限、密码

一、原理 MySQL的用户管理实质上是对用户表的管理,系统中的数据库mysql存在一张用户表(user),所有的用户都在该表内,对用户的管里也就是对该表进行增删查改的操作。 show databases; 如图中的mysql数据库,…

Leetcode 用队列实现栈

题目: 请你仅使用两个队列实现一个后入先出(LIFO)的栈,并支持普通栈的全部四种操作(push、top、pop 和 empty)。 实现 MyStack 类: void push(int x) 将元素 x 压入栈顶。 int pop() 移除并…

2024/1/20 并查集

目录 并查集关键代码 亲戚 村村通 团伙(新知识) 并查集关键代码 返回祖宗节点路径压缩: int find(int x) {if(f[x]!x) f[x]find(f[x]);return f[x]; } 合并: void make(int x,int y) {int f1find(f[x]);int f2find(f[y]);…

69.使用Go标准库compress/gzip压缩数据存入Redis避免BigKey

文章目录 一:简介二:Go标准库compress/gzip包介绍ConstantsVariablestype Headertype Reader 三:代码实践1、压缩与解压工具包2、单元测试3、为何压缩后还要用base64编码 代码地址: https://gitee.com/lymgoforIT/golang-trick/t…

图像分割实战-系列教程15:deeplabV3+ VOC分割实战3-------网络结构1

🍁🍁🍁图像分割实战-系列教程 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Pycharm中进行 本篇文章配套的代码资源已经上传 deeplab系列算法概述 deeplabV3 VOC分割实战1 deeplabV3 VOC分割实战2 deeplabV3 VOC分割实战3 dee…

C#中chart控件

C#中chart控件 图表的5大集合 例子 第一步:创建工程 放入chart控件 series集合 选择图标类型 选择绘制曲线的宽度和颜色。 显示数据标签 Title集合 添加标题 调整标题字体:大小和颜色 CharsArea集合 对坐标轴进行说明 设置间隔 设置刻度…

使用Ultimate-SD-Upscale进行图片高清放大

之前我们介绍过StableSR进行图片高清放大,如果调的参数过大,就会出现内存不足的情况,今天我们介绍另外一个进行图片高清放大的神器Ultimate-SD-Upscale,他可以使用较小的内存对图像进行高清放大。下面我们来看看如何使用进行操作。…

Spark读取kafka(流式和批数据)

spark读取kafka(批数据处理) # 按照偏移量读取kafka数据 from pyspark.sql import SparkSessionss SparkSession.builder.getOrCreate()# spark读取kafka options {# 写kafka配置信息# 指定kafka的连接的broker服务节点信息kafka.bootstrap.servers: n…

无法访问云服务器上部署的Docker容器

说明:记录一次无法访问云服务器上部署的Docker容器的问题。 问题描述 某次,我在云服务器上,使用Docker运行了一个Nginx容器,用公网IP怎么也访问不到。这种情况博主也算有经验,可以从以下几个方面去排查: …

舵机使用总结

文章目录 1 舵机简介2 注意事项3 编写驱动程序3.1 使用STM32作为控制器3.1.1 计算高电平对应程序中的取值范围3.1.2 编写控制程序 1 舵机简介 舵机使用PWM控制,周期为20ms,通过改变高电平占空比来驱动,高电平通常为1~2ms( 或 0.5 …

RabbitMQ系列之入门级

🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是君易--鑨,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的博客专栏《RabbitMQ系列之入门级》。🎯&#x…

YOLOv8全网首发:新一代高效可形变卷积DCNv4如何做二次创新?高效结合SPPF

💡💡💡本文独家改进:DCNv4更快收敛、更高速度、更高性能,与YOLOv8 SPPF高效结合 收录 YOLOv8原创自研 https://blog.csdn.net/m0_63774211/category_12511737.html?spm=1001.2014.3001.5482 💡💡💡全网独家首发创新(原创),适合paper !!! 💡💡💡…

AOI与AVI:在视觉检测中的不同点和相似点

AOI(关注区域)和AVI(视觉感兴趣区域)是视觉检测中常用的两个概念,主要用于识别和分析图像或视频中的特定区域。虽然这两个概念都涉及到注视行为和注意力分配,但它们在定义和实际应用等方面有一些差异。 AOI…

x86-x64汇编语言、反汇编知识和IDA

x86-x64汇编语言 基础知识 x86寄存器: 通用寄存器:EAX, EBX, ECX, EDX, ESI, EDI 栈顶指针寄存器:ESP 栈底指针寄存器:EBP 指令计数器:EIP 段寄存器:CS, DS, ES, FS, GS, SS x86-64寄存器:&a…

2.【C语言】(函数指针||sizeof||笔试题)

0x01.函数指针 void test(const char* str) {printf("%s\n", str); }int main() {void (*pf)(const char*) test;//pf是函数指针变量void (*pfarr[10])(const char*);//pfarr是存放函数指针的数组void (*(*p)[10])(const char*) &pfarr;//p是指向函数指针数组…

Leetcoder Day10|栈与队列part02(栈的应用)

语言:Java/C 目录 20. 有效的括号 1047. 删除字符串中的所有相邻重复项 150. 逆波兰表达式求值 今日总结 20. 有效的括号 给定一个只包括 (,),{,},[,] 的字符串,判断字符串是否有效。 有效字…

WEB接口测试之Jmeter接口测试自动化 (三)(数据驱动测试)

接口测试与数据驱动 1简介 数据驱动测试,即是分离测试逻辑与测试数据,通过如excel表格的形式来保存测试数据,用测试脚本读取并执行测试的过程。 2 数据驱动与jmeter接口测试 我们已经简单介绍了接口测试参数录入及测试执行的过程&#xff0…

Unity3D Pico VR 手势识别物体交互 适配 MRTK3

当前Pico已经支持手势识别了,但是提供的PICO Unity Integration SDK 中是没有手势和物体交互的功能,Unity XR Interaction Toolkit提供的手势识别物体交互对 Quest适配的挺好的,Pico 当前只能用指尖点触还不能对物体进行抓握以及手势控制射线…

数据结构一:算法效率分析(时间复杂度和空间复杂度)-重点

在学习具体的数据结构和算法之前,每一位初学者都要掌握一个技能,即善于运用时间复杂度和空间复杂度来衡量一个算法的运行效率。所谓算法,即解决问题的方法。同一个问题,使用不同的算法,虽然得到的结果相同,…

开发实践8_project

要求: 使用Restful对Chaos模型作基本操作。 结果: post 3 组数据后,get 查询如下: put修改后get: delete pk3之后get: 代码: python manage.py startapp pro8_app 注册 总路由 // path(pr…