Flink Watermark和时间语义

Flink 中的时间语义

[点击并拖拽以移动] ​

时间语义: EventTime:事件创建时间;Ingestion Time:数据进入Flink的时间;Processing Time:执行操作算子的本地系统时间,与机器无关。不同的时间语义有不同的应用场合,我们往往更关系事件时间Event Time。数据生成的时候就会自动注入时间戳,Event Time可以从日志数据的时间戳timestamp)中提取。

设置 Event Time

我们可以直接在代码中,对执行环境调用setStreamTimeCharacteristic方法,设置流的时间特性。具体的时间,还需要从数据中提取时间戳timestamp

val env = StreamExecutionEnvironment.getExecutionEnvironment
//从调用时刻开始给 env 创建的每一个 stream 追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

乱序数据的影响

[点击并拖拽以移动] ​

FlinkEvent Time模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子。由于网络、分布式等原因,会导致乱序数据的产生。如上图所示,理想情况与实际情况会存在差异,乱序数据会让窗口计算不准确。解决方案是让窗口等几分钟。

水位线 Watermark

怎么避免乱序数据带来计算不正确?
遇到一个时间戳到达了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口。Watermark是一种衡量Event Time进展的机制,可以设置延迟触发。Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经达到了,因此,window的执行也是由Watermark触发的。Watermark用来让程序自己延迟和结果正确性。

Watermark 的特点: Watermark是一条特殊的数据记录,必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退。Watermark与数据的时间戳有关。
[点击并拖拽以移动] ​

watermark 的传递、引入和设定

watermark的传递: 一个Task输入可以并行多个,如下有4个并行度,输出也可能存在多个并行,如下有3个。每个任务Task内部都有一个事件时钟,且每个分区也维护了对应的WM,如下的Partition WM。当事件流流进Partition时会判断新事件流的WM是否大于当前的Partition WM,当大于时就更新Partition的时间戳WM为新流入的WM(取最大值),如下1->2象限Partition WM的变化。同时,如下Task也维护了一个全局的WM表示事件时钟,该值取分区中最小的WM作为输出的时间戳,如下第二象限的输出选择最小的WM=3向下传递。当第二个(横线)分区Partition WM流进来WM=7的事件流时,就会出现第三象限的情景,但是最小的WM还是=3,因此不更新Task全局的WM。当第三个分区Partition WM流进来WM=6的事件流时,就会出现第四象限的情景,此时分区Partition WM的最小值=4,因此Task全局WM=4
[点击并拖拽以移动] ​

watermark的引入: Event Time的使用一定要指定数据源中的时间戳。对于排好序的数据,只需要指定时间戳就够了,不需要延迟触发。

import org.apache.flink.streaming.api.windowing.time.Time
//同时分配时间戳和水位线
dataStream.assignTimestampsAndWatermarks(
//无序数据       Time.milliseconds(1000)=延迟时间
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
  //提取事件戳 = timestamp * 1000是因为出入的毫秒
  override def extractTimestamp(t: SensorReading): Long = {
    t.timestamp * 1000
  }
})

【1】对于排好序的数据,不需要延迟触发,可以只指定事件戳就行了

dataStream.assignTimestampsAndWatermarks(_.timestamp * 1000)

【2】Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳和生成 watermarkMyAssigner可以有两种类型,都继承自TimestampAssigner

dataStream.assignTimestampsAndWatermarks(new MyAssigner())

TimestampAssigner:定义了抽取时间戳,以及生成watermark的方法,有两种类型:
【1】AssignerWithPeriodicWatermarks 系统会周期性的将Watermark插入到流中。默认周期是200毫秒(如果是processingTimeWatermark = 0 ),可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。升序和前面乱序的处理BoundedOutOfOrderness,都是基于周期性watermark的。举例:如下产生watermark的逻辑:每隔5秒,Flink调用AssignerWithPeriodicWatermarksgetCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的water会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于之前水位的时间戳,则不会产生新的watermark

//方案一:
//EventTime是以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//每隔 5秒产生一个 watermark
env.getConfig.setAutoWatermarkInterval(5000);//方案二
//自定义一个周期性的时间戳
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading]{

  val bound: Long = 60 * 1000 //延时为 1 分钟
  var maxTs: Long = Long.MinValue //观察到的最大时间戳

  //生成水位线
  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  //抽取时间戳的方法
  override def extractTimestamp(t: SensorReading, l: Long): Long = {
    maxTs = maxTs.max(t.timestamp)
    t.timestamp
  }
}

【2】AssignerWithPunctuatedWatermarks 没有时间周期规律,可打断的生成watermark

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading]{
  val bound: Long = 60 * 1000

  //获取水位线,根据数据触发
  override def checkAndGetNextWatermark(t: SensorReading, l: Long): Watermark = {
    if(t.id == "sensor_1"){
      new Watermark(l - bound)
    }else{
      null
    }
  }

  //抽取时间戳的方法
  override def extractTimestamp(t: SensorReading, l: Long): Long = {
    t.timestamp
  }
}

watermark 的设定:
【1】在Flink中,watermark由应用程序开发人员生成,这通常需要对相应的领域有一定的了解。
【2】如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果。
【3】而如果watermark到达得太早,则可能收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/290774.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

光耀未来 第一届能源电子产业创新大赛太阳能光伏赛道决赛在宜宾举行

1月3日,第一届能源电子产业创新大赛太阳能光伏赛道决赛在宜宾盛大举行,本次比赛吸引了全国范围内的光伏行业顶尖人才和创新团队参与。 为深入贯彻《关于推动能源电子产业发展的指导意见》,推动我国能源电子产业升级,工业和信息化部…

CMake入门教程【核心篇】添加依赖(add_dependencies)

😈「CSDN主页」:传送门 😈「Bilibil首页」:传送门 😈「本文的内容」:CMake入门教程 😈「动动你的小手」:点赞👍收藏⭐️评论📝 文章目录 1. 基本用法2. 添加目…

Docker网络模型

Docker容器是虚拟化技术,其使用虚拟化网络技术实现Docker容器主机中运行的容器实例之间的网络互连功能,本文主要描述Docker的网络模型。 Docker架构 如上所示,Docker容器化的架构分为三个部分,其中Docker Host是虚拟化主机系统&a…

JAVA学习专栏

JAVA专栏 Java核心技术 Java核心技术 Java练手算法 Java练手算法 Java数据结构和算法 Java数据结构和算法 Java设计模式 Java设计模式 Java并发编程 Java并发编程 MySQL数据库 MySQL数据库 Java项目管理Maven Java项目管理Maven 项目管理工具gradle 项目管理工具gradle…

leetcode2397. 被列覆盖的最多行数

目录 题目 思路 解题方法 题目 https://leetcode.cn/problems/maximum-rows-covered-by-columns/description/ 给你一个下标从 0 开始、大小为 m x n 的二进制矩阵 matrix ;另给你一个整数 numSelect,表示你必须从 matrix 中选择的 不同 列的数量。 …

助力工业生产“智造”,基于YOLOv8全系列模型【n/s/m/l/x】开发构建纺织生产场景下布匹瑕疵检测识别系统

纯粹的工业制造没有办法有长久的发展过程,转制造为全流程全场景的生产智造才是未来最具竞争力的生产场景,在前面的开发实践中我们已经涉足工业生产场景下进行了很多实地的项目开发,如:PCB电路板缺陷检测、焊接缺陷检测、螺母螺钉缺…

深入浅出Python日志打印

0.引言 在编程过程中,日志记录是一项非常重要的任务,无论是用于调试代码、记录系统运行状态,还是跟踪可能出现的问题,日志都能发挥重要作用。然而,许多开发者习惯使用简单的print语句来记录信息,这种方法虽…

SVN服务端的下载、安装

地址 : Apache Subversion Binary Packages 下载 点击 VisualSVN 安装 都是点击 next 点击next ,即可安装成功

让设备更聪明 |启英泰伦离线自然说,开启智能语音交互新体验!

语音交互按部署方式可以分为两种:离线语音交互和在线语音交互。 在线语音交互是将数据储存在云端,其具备足够大的存储空间和算力,可以实现海量的语音数据处理。 离线语音交互是以语音芯片为载体,语音数据的采集、计算、决策均在…

二叉树链式结构的实现(二叉树的遍历以及各种常用功能函数的实现)

之前也是把堆部分的知识点梳理完毕(即二叉树链式顺序的实现):堆的应用:堆排序和TOP-K问题 那么讲完了二叉树链式结构的实现。今天就进入二叉树链式结构的实现: 文章目录 1.准备工作2.二叉树的遍历2.1前序遍历2.2中序遍…

用通俗易懂的方式讲解:基于扩散模型(Diffusion),文生图 AnyText 的效果太棒了

近年来,随着AIGC的爆火,图片生成技术得到飞速发展,当前AI生成的图片已达到真假难辨的高保真度。不过,当合成图片中出现文字内容时,仍能够使AI露出马脚,因为当前主流方法尚无法在图片中生成准确可读的字符。…

YOLOv8融合改进 更换检测头为Detect_DyHead同时添加C2f-EMSC和C2f-EMSCP模块

一、Detect_DyHead检测头和C2f-EMSC,C2f-EMSCP模块 详细介绍和代码在往期的博客里: Detect_DyHead: (YOLOv8改进检测头Detect为Detect_Dyhead-CSDN博客) C2f-EMSC和C2f-EMSCP: (YOLOv8改进…

Unity中Shader雾效在场景中的调节技巧

文章目录 前言一、修改棋盘格Shader的Cull可以在属性面板控制1、在属性面板定义CullMode2、在SubShader中,使用CullMode3、这样就可以在不同剔除情况下使用棋盘格场景了 二、调节天际线颜色和雾融为一体1、在摄像机设置不渲染天空盒,渲染单一颜色2、采样…

我开发了一个聚合网盘资源搜索引擎-支持阿里云盘与夸克网盘资源

还在为找不到电子书资源而发愁?还在愁没有高清影视剧观看? 来试试我开发的云盘资源搜索引擎吧! 公众号回复关键词: 搜索 ! 就可以获取到网站网址。 这里还有资源分享微信群,不定期分享资源。 关于界面 怎么使用这个引擎&#x…

实验笔记之——下载数据到服务器

开发过程中经常需要把数据传到服务器上,太麻烦了,为此本博文记录采用百度云来传输数据 百度云 使用bypy包。 安装:pip install bypy 配置bypy连接百度网盘: 终端输入bypy info将命令行提示的链接复制到浏览器,并复制…

Github 2024-01-04 开源项目日报 Top10

根据Github Trendings的统计,今日(2024-01-04统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目3C项目2TypeScript项目2Java项目2Jupyter Notebook项目1Go项目1 系统设计指南 创建周期&#xff…

Flutter用GridView实现网格功能(1、item设置一个外边框,2、item背景点击变色,松开恢复原色)

GridView接收如下可选参数属性: scrollDirection:滚动方法padding:内边距resolve:组件反向排序crossAxisSpacing:水平子Widget之间间距mainAxisSpacing:垂直子Widget之间间距crossAxisCount:一…

C#/.NET/.NET Core优秀项目和框架2023年12月简报

前言 公众号每月定期推广和分享的C#/.NET/.NET Core优秀项目和框架(公众号每周至少推荐两个优秀的项目和框架当然节假日除外),公众号推文有项目和框架的介绍、功能特点以及部分功能截图等(打不开或者打开GitHub很慢的同学可以优先…

Wireshark本地回环网络抓包

背景 因为发往本机的数据包是通过回环地址的,即:数据包不会通过真实的网络接口发送,因此我们需要通过设置路由规则来让本来发到虚拟网络接口的数据包发送到真实网络接口即可。 场景描述:在网络程序开发的过程中,有时…

SQL中 Group by Grouping Sets 分组的用法

文章目录 1. 用法2. 语法3. 实际应用3.1 求总和与小计3.2 按多个维度分组3.3 标记小计和总计 1. 用法 将Grouping Sets 运算符添加到Group by 子句中,使用Grouping Set 可以在一个查询中指定数据的多个分组,其结果与针对指定的组执行union all 运算等效…