详解 Flink 的时间语义和 watermark

一、Flink 时间语义类型

在这里插入图片描述

  • Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳
  • Ingestion Time :是数据进入 Flink 的时间
  • Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time

二、EventTime 引入

Flink 默认是按照 ProcessingTime 来处理数据的

/**
	在 Flink 的流式处理中,绝大部分情况推荐使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 Ing estionTime 。使用 EventTime ,需要先引入 EventTime 的时间属性
*/
public class EventTimeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //引入 EvenetTime
        //TimeCharacteristic 是一个枚举类,有 ProcessingTime、IngestionTime 和 EventTime 三个属性
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
    }
}

三、Watermark

1. 数据乱序情况

在这里插入图片描述

  • 正常情况下,Flink 接收到的事件应该要是按照事件的产生时间 (EventTime) 的先后顺序排列的
  • 实际情况下,事件从产生到进入 source 再到触发 operator,其中间是有一个过程和时间的,而且由于网络、分布式等原因会造成 Flink 接收到的事件的先后顺序不是严格按照事件的 EventTime 顺序排列的,即所谓的乱序数据
  • 乱序数据的问题会造成窗口触发关闭的时间混乱,计算不准确
  • Flink 处理乱序数据的机制:Watermark + allowedLateness + sideOutputLateData

2. Watermark 介绍

  • Watermark 是一种使用延迟触发 window 执行来处理乱序数据的机制
  • 原理:当设置 Watermark = t 时 (即延迟时长为 t),则 Flink 每一次都会获取已经到达的数据中的最大的 EventTime,然后判断 maxEventTime - t 是否等于某一个窗口的触发时间,如果相等则认为属于这个窗口的所有数据都已经到达,这个窗口被触发执行关闭,也可能存在数据丢失
  • 在数据有序的流中,相当于 Watermark = 0,即已经到达的数据中的最大的 EventTime 等于某一个窗口的触发时间,则这个窗口被触发执行关闭
  • 一般将 Watermark 设置为乱序数据流中最大的迟到时间差

3. Watermark 特点和行为

  • 水位线 (Watermark) 是作为一个特殊的数据插入到数据流中的一个标记
  • 水位线 (Watermark) 在 Flink 程序中是一个常量类,有一个时间戳属性,用来表示当前事件时间的进展
  • 水位线 (Watermark) 是基于数据的 EventTime 时间戳生成的
  • 水位线 (Watermark) 的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进

4. Watermark 在任务间的传递

任务并行度不为 1;Watermark 设置的位置越靠近 Source 端越好

在这里插入图片描述

  • 一个任务会接收上游多个并行任务的数据,也会向下游多个并行任务发送数据
  • 从上游多个并行任务接收 Watermark:使用 Partition WM 分别存储接收到的不同分区任务的 Watermark,并以其中最小的 Watermark 作为自己当前的事件时间
  • 向下游多个并行任务发送 Watermark:采取广播的分区策略,向下游的每一个任务都发送一份 Watermark,如果后续 Watermark 没有变更则不会重复发送

5. Watermark 引入

5.1 核心代码
/**
	方法签名:
		DataStream.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T>)
		DataStream.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T>)
	
	参数:
		1.AssignerWithPeriodicWatermarks:继承 TimestampAssigner 接口,周期性的生成 watermark,常用实现类为:BoundedOutOfOrdernessTimestampExtractor 和 AscendingTimestampExtractor
		2.AssignerWithPunctuatedWatermarks:继承 TimestampAssigner 接口,间断式地生成 watermark
*/
public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //引入 EvenetTime       
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
        DataStream<String> dataStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> inputStream = dataStream.map(new MapFunction<SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });
        
        //有序数据设置事件时间戳(毫秒数)和watermark
        //不需要传递watermark延迟时间,默认是当前事件时间戳 - 1ms 作为watermark
        inputStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
            @Override
            public long extractAscendingTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
        
        //乱序数据设置事件时间戳(毫秒数)和watermark
        //BoundedOutOfOrdernessTimestampExtractor 构造方法必须传入watermark延迟时间
        //生成的watermark时间戳 = 当前所有事件的最大时间戳 - 延迟时间
        inputStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
        
        env.execute();
        
    }
}
5.2 AssignerWithPeriodicWatermarks

系统会周期性地生成 watermark 并插入到数据流中,默认周期是 200 毫秒

/**
	设置watermark生成周期:env.getConfig.setAutoWatermarkInterval(milliseconds);
	产生watermark的逻辑:每隔 0.2 秒钟,Flink 会调用 AssignerWithPeriodicWatermarks 的 getCurrentWatermark() 方法获取一个时间戳,如果大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的 watermark
	自定义watermark周期生成器:实现 AssignerWithPeriodicWatermarks 接口,并重写 getCurrentWatermark 和 extractTimestamp 方法
*/
public class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading> {
    private Long bound = 60 * 1000L;  // watermark延迟时间
    private Long maxTs = Long.MIN_VALUE;  // 当前最大时间戳
    
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
    	return new Watermark(maxTs - bound);
    }
    
    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
        maxTs = Math.max(maxTs, element.getTimestamp()); //获取当前最大的事件时间戳
        return element.getTimestamp();
    }
}
5.3 AssignerWithPunctuatedWatermarks

间断式地生成 watermark,可以根据需要对每条数据进行条件判断筛选来确定是否生成 watermark

public class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<SensorReading> {
    private Long bound = 60 * 1000L;  // 延迟时间
    
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {
        if(lastElement.getId().equals("sensor_1")) {
        	return new Watermark(extractedTimestamp - bound);
        } else {
        	return null;
        }
    }
    
    @Override
    public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
    	return element.getTimestamp();
    }
}

四、EventTime 的 window 操作

1. 滚动时间窗口操作

/**
	需求:统计 15 秒内的最小温度值,设置 2 秒的延迟
*/
public class TumblingEventTimeWindowTest {
 	public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
        /*
          sensor_1,1547718199,35.8
          sensor_6,1547718201,15.4
          sensor_7,1547718202,6.7
          sensor_10,1547718205,38.1
          sensor_1,1547718207,36.3
          sensor_1,1547718209,32.8
          sensor_1,1547718212,37.1
          ...
        */
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<SensorReading>() {
            @Override
            public SensorReading map(String value) {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
        
        //开窗聚合
       SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id").timeWindow(Time.seconds(15)).minBy("temperature");
        
        minTempStream.print("minTemp");
        
        /**
        	输出的结果分析:
        		1.在接收到 sensor_1,1547718212,37.1 时,触发了一个窗口关闭,此时数据的 EventTime 为 1547718212,由于 watermark 延迟时间设置为 2,所以该窗口触发关闭的时间戳为 1547718212 - 2 = 1547718210,该窗口的范围为 [1547718195,1547718210)
        		2.当前第一个窗口是 [1547718195,1547718210),其起始点的确定规则为:
        			2.1 滚动时间窗口使用的窗口分配器为 TumblingEventTimeWindows 类
        			2.2 TumblingEventTimeWindows 的 assignWindows 方法中调用 getWindowStartWithOffset 方法获取起始点
        			2.3 getWindowStartWithOffset(timestamp, offset, windowSize):方法逻辑为 timestamp - (timestamp - offset + windowSize) % windowSize,默认 offset 为 0,所以最终得到的起始点应该是 windowSize 的整数倍,在本例中的起始点为 1547718199 - (1547718199-0+15)%15 = 1547718195
        		3.偏移量 offset:一般是用来处理不同时区的数据
        */
        
        env.execute();
        
    }   
}

2. 迟到数据处理

/**
	需求:统计 15 秒内的最小温度值,设置 2 秒的延迟,并允许 1 分钟的迟到数据,1 分钟后的数据写入侧输出流
*/
public class TumblingEventTimeWindowDelayTest {
 	public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<SensorReading>() {
            @Override
            public SensorReading map(String value) {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });
        
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late"){};
        
        //开窗聚合
       SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
           .timeWindow(Time.seconds(15))
           .allowedLateness(Time.minutes(1));
           .sideOutputLateData(outputTag)
           .minBy("temperature");
        
        minTempStream.print("minTemp");
        minTempStream.getSideOutput(outputTag).print("late");
        
        /**
        	依次输入数据:
              sensor_1,1547718199,35.8
              sensor_1,1547718206,36.3
              sensor_1,1547718210,34.7
              sensor_1,1547718211,31
              sensor_1,1547718209,34.9
              sensor_1,1547718212,37.1
              sensor_1,1547718213,33
              sensor_1,1547718206,34.2
              sensor_1,1547718202,36
              ...
              sensor_1,1547718272,34
              sensor_1,1547718203,30.6
        
        	输出的结果分析:
        		1.在接收到 sensor_1,1547718212,37.1 时,触发 [1547718195,1547718210) 窗口执行,此时输出数据 sensor_1,1547718209,34.9,此时 2 秒内的延迟数据能被处理  
        		2.在接收到 sensor_1,1547718206,34.2 时,由于设置了允许 1 分钟迟到,所以 [1547718195,1547718210) 窗口仍然没有关闭,此时会更新数据为 sensor_1,1547718206,34.2,此时的系统时间戳为 1547718213 - 2 = 1547718211 - 1547718210 < 60
        		3.在接收到 sensor_1,1547718202,36 时,[1547718195,1547718210) 窗口仍然会更新输出一次数据 sensor_1,1547718206,34.2
        		4.在接收到 sensor_1,1547718272,34 时,属于 [1547718210,1547718225) 窗口的数据会输出 sensor_1,1547718211,31,此时的系统时间戳为 1547718272 - 2 = 1547718270,由于 1547718270 - 1547718210 >= 60,所以 [1547718195,1547718210) 窗口会真正的关闭
        		5.在之后接收到 sensor_1,1547718203,30.6 时,会把数据输出到侧输出流中
        */
        
        env.execute();
        
    }   
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/690864.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

K8s速览

k8s的核心能力 ● 服务发现与负载均衡 ● 服务恢复 ● 服务伸缩 ● 自动发布与回滚 ● 批量执行 架构 server-client两层架构&#xff0c;Master作为中央管控节点&#xff0c;会和每一个Node进行一个连接&#xff1b; 所有UI层&#xff0c;client的操作&#xff0c;只会和Mat…

spark复习

第一章 1.​大数据特点:4V 2.​大数据计算模式 3.​hadoop生态系统 4.​spark提供了内存计算和基于DAG的任务调度机制&#xff0c;遵循一个软件栈满足不同应用场景的理念。 5.​hadoop中MapReduce计算框架的缺点&#xff0c;对应的spark的优点 第二章 1.​spark生态系统 …

shell编程(二)——字符串与数组

本文为shell 编程的第二篇&#xff0c;介绍shell中的字符串和数组相关内容。 一、字符串 shell 字符串可以用单引号 ‘’&#xff0c;也可以用双引号 “”&#xff0c;也可以不用引号。 单引号的特点 单引号里不识别变量单引号里不能出现单独的单引号&#xff08;使用转义符…

[经验] 涠洲岛在广西吗 #职场发展#知识分享#媒体

涠洲岛在广西吗 广西涠洲岛&#xff0c;是中国南海上的一颗闪亮明珠&#xff0c;位于广西北部湾沿海&#xff0c;东经108.71度&#xff0c;北纬21.54度&#xff0c;距离北海市区30公里&#xff0c;是中国最大的海岛之一&#xff0c;风景秀丽&#xff0c;气候温和。岛上山青水秀…

PowerDesigner导入Excel模板生成数据表

PowerDesigner导入Excel模板生成数据表 1.准备好需要导入的Excel表结构数据,模板内容如下图所示 2.打开PowerDesigner,新建一个physical data model文件,填入文件名称,选择数据库类型 3.点击Tools|Execute Commands|Edit/Run Script菜单或按下快捷键Ctrl Shift X打开脚本窗口…

Ansys的电磁场分析和系统电路仿真软件Electronics 2024 R1版本在Windows系统的下载与安装配置

目录 前言一、许可管理工具安装二、许可配置三、EM安装四、MCAD和帮助文件安装&#xff08;可选择&#xff0c;非必要&#xff09;总结 前言 “ ANSYS Electromagnetics Suite或ANSYS Electronics Suite是几个功能强大的程序的集合&#xff0c;用于仿真系统的电磁。ANSYS电磁套…

vscode中jupyter notebook执行bash命令,乱码解决方法

问题描述 使用vscode中使用jupyter notebook执行bash命令时,不管是中文还是英文,输出均是乱码 但是使用vscode的terminal执行同样的命令又没有问题,系统自带的cmd也没有问题。 最终解决后的效果如下: ## 问题分析 默认vscode会选择使用cmd执行shell, 但是通过vscode的设…

docker部署使用本地文件的fastapi项目

项目背景&#xff1a;项目使用python开发&#xff0c;需要使用ubutun系统部署后端api接口&#xff0c;对外使用8901端口。 1:项目结构&#xff1a; 2&#xff1a;项目需要使用的pyhton版本为3.9&#xff0c;dockerfile内容如下&#xff1a; # FROM python:3.9# WORKDIR /co…

SpringBoot: 可执行jar的特殊逻辑

这一篇我们来看看Java代码怎么操作zip文件(jar文件)&#xff0c;然后SpringBoot的特殊处理&#xff0c;文章分为2部分 Zip API解释&#xff0c;看看我们工具箱里有哪些工具能用SpringBoot的特殊处理&#xff0c;看看SpringBoot Jar和普通Jar的不同 1. Zip API解释 1. ZipFil…

【小白专用】C# Task 类异步操作-浅谈

注解 Task类表示不返回值并且通常以异步方式执行的单个操作。 Task 对象是在 .NET Framework 4 中首次引入的 基于任务的异步模式 的中心组件之一。 由于对象执行的工作 Task 通常在线程池线程上异步执行&#xff0c;而不是在主应用程序线程上同步执行&#xff0c;因此可以使用…

Adobe Illustrator 矢量图设计软件下载安装,Illustrator 轻松创建各种矢量图形

Adobe Illustrator&#xff0c;它不仅仅是一个简单的图形编辑工具&#xff0c;更是一个拥有丰富功能和强大性能的设计利器。 在这款软件中&#xff0c;用户可以通过各种精心设计的工具&#xff0c;轻松创建和编辑基于矢量路径的图形文件。这些矢量图形不仅具有高度的可编辑性&a…

检测五个数是否一样的算法

目录 算法算法的输出与打印效果输出输入1输入2 打印打印1打印2 算法的流程图总结 算法 int main() {int arr[5] { 0 };int i 0;int ia 0;for (i 0; i < 5; i) { scanf("%d", &arr[i]); }for (i 1; i < 5; i) {if (arr[0] ! arr[i]) {ia 1;break;} }…

SpringBoot高手之路-springboot原理篇

配置文件优先级 SpringBoot原理篇-多环境配置

Elasticsearch 认证模拟题 - 13

一、题目 集群中有索引 task3&#xff0c;用 oa、OA、Oa、oA 查询结构是 4 条&#xff0c;使用 dingding 的查询结果是 1 条。通过 reindex 索引 task3 为 task3_new&#xff0c;能够使 task3_new 满足以下查询条件。 使用 oa、OA、Oa、oA、0A、dingding 查询都能够返回 6 条…

R语言 | 使用最简单方法添加显著性ggpubr包

本期教程原文&#xff1a;使用最简单方法添加显著性ggsignif包 本期教程 获得本期教程代码和数据&#xff0c;在后台回复关键词&#xff1a;20240605 小杜的生信笔记&#xff0c;自2021年11月开始做的知识分享&#xff0c;主要内容是R语言绘图教程、转录组上游分析、转录组下游…

立创·天空星开发板-GD32F407VE-GPIO

本文以 立创天空星开发板-GD32F407VET6-青春版 作为学习的板子&#xff0c;记录学习笔记。 立创天空星开发板-GD32F407VE-GPIO 基础概念三极管MOS管 GPIO输出模式输出线与GPIO输入模式GPIO点灯 基础概念 GPIO&#xff0c;全称为“通用输入/输出”&#xff08;General Purpose …

Wireshark自定义Lua插件

背景&#xff1a; 常见的抓包工具有tcpdump和wireshark&#xff0c;二者可基于网卡进行抓包&#xff1a;tcpdump用于Linux环境抓包&#xff0c;而wireshark用于windows环境。抓包后需借助包分析工具对数据进行解析&#xff0c;将不可读的二进制数转换为可读的数据结构。 wires…

SpringBoot+Vue实现前后端分离基本的环境搭建

目录 一、Vue项目的搭建 &#xff08;1&#xff09;基于vite创建vue项目 &#xff08;2&#xff09;引入elementplus &#xff08;3&#xff09;启动后端服务&#xff0c;并测试 二、SpringBoot项目的搭建 &#xff08;1&#xff09;通过idea创建SpringBoot项目 &#x…

ipables防火墙

一、Linux防火墙基础 Linux 的防火墙体系主要工作在网络层&#xff0c;针对 TCP/IP 数据包实施过滤和限制&#xff0c;属于典 型的包过滤防火墙&#xff08;或称为网络层防火墙&#xff09;。Linux 系统的防火墙体系基于内核编码实现&#xff0c; 具有非常稳定的性能和高效率&…

AI高考大战,揭秘五大热门模型谁能问鼎数学之巅?

在高考前&#xff0c;我就有想法了&#xff0c;这一次让AI来做做高考题。就用国内的大模型&#xff0c;看哪家的大模型解题最厉害。 第一天考完&#xff0c;就拿到了2024高考数学2卷的电子版&#xff0c;这也是重庆市采用的高考试卷 这次选了5个AI工具&#xff0c;分别是天工&a…