Flink 学习五 Flink 时间语义

Flink 学习五 Flink 时间语义

1.时间语义

在流式计算中.时间是一个影响计算结果非常重要的因素! (窗口函数,定时器等)

在这里插入图片描述

Flink 可以根据不同的时间概念处理数据。

  • 处理时间: process time System.currentTimeMillis()是指执行相应操作的机器系统时间(也称为纪元时间,例如 Java 的时间)。是现实世界的时间,时间是单调递增的
  • 事件时间: event time 是指根据附加到每一行的时间戳处理流式数据。时间戳可以对事件发生的时间进行编码。是业务数据中的时间,时间有可能停滞,但是不会回溯,时间是不可会退的;

2.时间语义API设置

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  //事件时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // 算子到达时间  其实就是处理时间

上述代码在1.12 版本之前, 已TimeCharacteristic.ProcessingTime 作为默认的时间语义,也可以用上述代码设置时间语义;

1.12 版本之后,flink 以 TimeCharacteristic.EventTime 作为时间语义 ,并且Deprecated上面的代码 在使用需要指定时间语义的API 时,在显示的指定对应的时间语义;

        keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(2)));
        keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(2)));

        keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(2)));

也可以禁用 event time 语义

    /**
     * Sets the interval of the automatic watermark emission. Watermarks are used throughout the
     * streaming system to keep track of the progress of time. They are used, for example, for time
     * based windowing.
     *
     * <p>Setting an interval of {@code 0} will disable periodic watermark emission.
     *
     * @param interval The interval between watermarks in milliseconds.
     */
    @PublicEvolving
    public ExecutionConfig setAutoWatermarkInterval(long interval) {
        Preconditions.checkArgument(interval >= 0, "Auto watermark interval must not be negative.");
        this.autoWatermarkInterval = interval;
        return this;
    }

3.watermark 简介

上面说到1.12 版本之后,flink 以 TimeCharacteristic.EventTime 作为时间语义,Flink 收到数据中的事件时间有可能不是有序的,这就导致会收到迟到的数据,其事件时间是属于过去的窗口;

为了能够基于事件时间进行计算,Flink引入了Watermark的概念

  • watermark ,本质上也是flink各个算子之间流转的一种标记数据,flink内部自动产生并插入到数据流里面的
  • 消息流信息中就是时间戳

watermark 的产生源头:

  • watermark 一般是 来源于source 算子
  • source 算子计算出来的watermark 广播到下面

4.watermark 状态

4.1.初始watermark

一般是来源于source 算子

在watermark 产生的源头算子中,subTask 程序会用一个定时器,去周期性的检查收到的数据的时间的最大值,如果超过了之前记录的最大值,就把这个最大值更新为watermark,并下游算子广播(通过API设置数据中那个字段作为事件时间)

在这里插入图片描述

4.2.下游/中间算子的watermark

中间算子收到上游算子广播的watermark ,其算子内部也会有一个定时器去定时的检测收到的所有的上游算子的watermark ,并计算其中最小值作为当前算子的watermark,并下游算子广播

:当其中一个所有算子,不在更新watermark 怎么处理? flink 提供一个机制设置watermark的idletime,意思就是如果在idletime时间内没有收到上游算子广播的watermark,则会自动的往前面推进watermark

在这里插入图片描述

5.watermark生成策略

5.1 生成watermark时机

Flink 1.12 版本之后,watermark 的生产策略是固定频率周期性的产生

  • AssignerWithPeriodicWatermarks 周期性生成
  • AssignerWithPunctuatedWatermarks 指定标志生成,比如数据中的某个属性

5.1 生成watermark的数值

新版API watermark生成策略

  • 紧跟最大事件时间(watermark=周期内最大时间) :WatermarkStrategy.forMonotonousTimestamps();
  • 允许乱序(watermark=周期内最大时间-容错时间) :WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5));
  • 自定义生成策略:WatermarkStrategy.forGenerator()
  • 不生产watermark 禁用了时间推进:WatermarkStrategy.noWatermarks()

6.watermark 示例

简单的测试并发度为1下的算子watermark更新情况


public class _01_Watermark {

    public static void main(String[] args) throws Exception {

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy.<String>forMonotonousTimestamps( )
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String s = element.split(",")[0];
                        long time = 0;
                        try {
                            time = simpleDateFormat.parse(s).getTime(); //解析字符串值转long 作为时间戳
                        } catch (ParseException e) {
                        }
                        return time;
                    }
                });
        SingleOutputStreamOperator<String> streamOperator = dataStreamSource.assignTimestampsAndWatermarks(watermarkStrategy);
        SingleOutputStreamOperator<String> processed = streamOperator.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
                long currentWatermark = ctx.timerService().currentWatermark();
                long processingTime = ctx.timerService().currentProcessingTime();
                System.out.println("currentWatermark:" + simpleDateFormat.format(new Date(currentWatermark)));
                System.out.println("currentWatermark:" + currentWatermark); //打印watermark
                System.out.println("processingTime:" + simpleDateFormat.format(new Date(processingTime)));
                out.collect(value);
            }
        });
        processed.print();
        env.execute ();

    }
}

7.浅析watermark 源码

还是以上面示例为例讲解

7.1 准备

主要是 WatermarkStrategy.forMonotonousTimestamps( )和 WatermarkStrategy.forBoundedOutOfOrderness

两者对应的类是都是一样的 均为BoundedOutOfOrdernessWatermarks,只是 WatermarkStrategy.forMonotonousTimestamps( )对应BoundedOutOfOrdernessWatermarks中的参数 outOfOrdernessMillis = 0,后续主要是看BoundedOutOfOrdernessWatermarks类即可

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    //最大时间戳 
    private long maxTimestamp;

    //可以延迟的毫秒值 
    private final long outOfOrdernessMillis;

    /**设置初始水位
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    //触发水位的更新
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    //周期的触发,
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/30590.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker安全

一、Docker 容器与虚拟机的区别 1、隔离与共享 • 虚拟机通过添加 Hypervisor 层&#xff0c;虚拟出网卡、内存、CPU 等虚拟硬件&#xff0c;再在其上建立虚拟机&#xff0c;每个虚拟机都有自己的系统内核 • Docker容器则是通过隔离的方式&#xff0c;将文件系统、进程、设…

Redis入门 - Lua脚本

原文首更地址&#xff0c;阅读效果更佳&#xff01; Redis入门 - Lua脚本 | CoderMast编程桅杆https://www.codermast.com/database/redis/redis-scription.html Redis 脚本使用 Lua 解释器来执行脚本。 Redis 2.6 版本通过内嵌支持 Lua 环境。执行脚本的常用命令为 EVAL。 …

Spark01-Spark快速上手、运行模式、运行框架、核心概念

1 概述 Spark和Hadoop Hadoop HDFS(GFS:TheGoogleFileSystem)MapReduce总结&#xff1a;性能横向扩展变得容易&#xff0c;横向拓展:增加更多的计算节点来扩展系统的处理能力Hbase&#xff1a;分布式数据库 Spark Spark CoreSpark SQLSQL 方言&#xff08;HQL)Spark Streamin…

光伏电池局部遮阴下三种不同的工况对比MATLAB仿真模型

光伏电池局部遮阴下三种不同的工况对比MATLAB仿真模型及程序资源-CSDN文库https://download.csdn.net/download/weixin_56691527/87910311 模型简介&#xff1a; 建议使用MATLAB21b及以上版本打开&#xff01; 光伏阵列表面被局部遮挡时会产生热斑效应。为了防止太阳电池因热…

【Matter】Matter学习笔记1

文章目录 前言Matter协议架构1.Matter Over IPV62.Matter协议架构3.Matter标准协议架构 Matter网络拓扑结构Mesh组网1.单一网络拓扑2.星形网络拓扑 设备数据模型&#xff08;Date Model&#xff09;1.设备和端点&#xff08;Node、Endpoint&#xff09;2.节点角色&#xff08;N…

ADAudit Plus:保护企业内部IT安全的强大解决方案

随着企业数字化的推进&#xff0c;IT系统和数据安全变得比以往任何时候都更加重要。为了保护企业的机密信息和敏感数据&#xff0c;企业需要一种可靠的IT安全解决方案。在众多选项中&#xff0c;ADAudit Plus是一款备受赞誉的软件&#xff0c;为企业内部的IT安全提供了强大的支…

工业机器人运动学与Matlab正逆解算法学习笔记(用心总结一文全会)(二)

文章目录 机器人逆运动学※ 代数解、几何解&#xff0c;解析解&#xff08;封闭解&#xff09;、数值解的含义与联系○ 代数解求 θ 1 \theta_1 θ1​、 θ 2 \theta_2 θ2​、 θ 3 \theta_3 θ3​※参考资料 求解 θ 1 \theta_1 θ1​ 求解 θ 3 \theta_3 θ3​ 求解 θ 2 \t…

Unity核心7——2D动画

一、序列帧动画 &#xff08;一&#xff09;什么是序列帧动画 ​ 我们最常见的序列帧动画就是我们看的日本动画片&#xff0c;以固定时间间隔按序列切换图片&#xff0c;就是序列帧动画的本质 ​ 当固定时间间隔足够短时&#xff0c;我们肉眼就会认为图片是连续动态的&#…

【Java基础学习打卡11】Path环境变量的配置

目录 前言一、为什么配置环境变量二、如何配置环境变量三、JDK11的环境变量配置总结 前言 本文我们要知道为什么配置环境变量&#xff0c;自己思考不配置环境变量可以吗&#xff1f;JDK 11 如何配置环境变量。 一、为什么配置环境变量 原因很简单&#xff0c;就是方便命令的查…

1.5 掌握Scala内建控制结构(一)

一、条件表达式 &#xff08;一&#xff09;语法格式 if (条件) 值1 else 值2 &#xff08;二&#xff09;执行情况 条件为真&#xff0c;结果是值1&#xff1b;条件为假&#xff0c;结果是值2。如果if和else的返回结果同为某种类型&#xff0c;那么条件表达式结果也是那种…

【STM32MP135 - ST官方源码移植】第二章:TF-A源码移植教程

STM32MP135 TF-A源码移植教程 一、创建build.sh编译脚本&#xff08;1&#xff09;解压tf-a的源码压缩包&#xff08;2&#xff09;打补丁&#xff0c;获取stm32mp135的源码&#xff08;3&#xff09;设计编译脚本build.sh1、进入tf-a源码&#xff1a;2、创建build.sh脚本文件3…

HTTP协议,带你了解HTTP协议

目录 1、HTTP 协议介绍 2、HTTP 协议的工作过程 HTTP 协议的工作过程可以分为以下几个步骤&#xff1a; 3、Fiddler 抓包工具介绍 3.1 抓包工具的使用 3.2 抓包结果 3.3 抓包工具原理 4、HTTP 协议格式总览 5、HTTP 请求&#xff08;Request&#xff09; 5.1 认识 URL…

C#中List<T>的排序相关的使用方法总结

C#中List<>的排序相关的使用方法 list的排序一般使用Sort和LINQ的Orderby方法&#xff0c;本文主要介绍其如何使用。 &#x1f32e;1.Sort和实现Comparable接口 此方式需要类去实现IComparable接口 public class OrderTest {[Test]public void OraderTest(){List<E…

MySQL优化--undo log和redo log的区别

首先我们需要知道两个概念 缓冲池&#xff08;buffer pool&#xff09;:主内存中的一个区域&#xff0c;里面可以缓存磁盘上经常操作的真实数据&#xff0c;在执行增删改查操作时&#xff0c;先操作缓冲池中的数据&#xff08;若缓冲池没有数据&#xff0c;则从磁盘加载并缓存…

(写自己语言的练手级应用)JSON(JavaScript Object Notation) 产生式(BNF)

写自己的开发语言时&#xff0c;很多人都会拿JSON当第一个练习对象 开源net json FJSON 解析工具https://dbrwe.blog.csdn.net/article/details/107611540?spm1001.2014.3001.5502 <json> :: <object> | <array> <object> :: "{" [ <me…

操作系统 复习-计算题

一. 计算题&#xff08;共5题&#xff0c;100分&#xff09; 1.(计算题) 假设有4个进程需要在单CPU上运行&#xff0c;它们的执行时间如下表所示&#xff1a; 进程ID执行时间P18P25P32P44 现在我们需要按照抢占式优先级调度算法来安排这些进程的执行顺序。其中&#xff0c;进…

python代码加密方案

为何要对代码加密&#xff1f; python的解释特性是将py编译为独有的二进制编码pyc 文件&#xff0c;然后对pyc中的指令进行解释执行&#xff0c;但是pyc的反编译却非常简单&#xff0c;可直接反编译为源码&#xff0c;当需要将产品发布到外部环境的时候&#xff0c;源码的保护尤…

基于机器学习的内容推荐算法及其心理学、社会学影响闲谈

基于机器学习的内容推荐算法目前在各类内容类APP中使用的非常普遍。在购物、时尚、新闻咨询、学习等领域&#xff0c;根据用户的喜好&#xff0c;进行较为精准的用户画像与内容推荐。此类算法不但可以较为准确的分析用户的特征&#xff0c;如年龄、性别等&#xff0c;还能通过长…

特征点Features2D类介绍

文章目录 Features2D类介绍1. cv::AgastFeatureDetector2. cv::AKAZE3. cv::BRISK4. cv::FastFeatureDetector5. cv::GFTTDetector6. cv::KAZE7. cv::MSER8. cv::SimpleBlobDetector9. cv::StarDetector10. cv::SIFT11. cv::SURF12. cv::FastFeatureDetector13. cv::AgastFeatu…

最喜爱的编程语言——Python

一、编程语言发展 编程语言&#xff08;programming language&#xff09;可以简单的理解为一种计算机和人都能识别的语言。一种能够让程序员准确地定义计算机所需数据的计算机语言&#xff0c;并精确地定义在不同情况下所应当采取的行动。 编程语言处在不断的发展和变化中&…