【FLink】水位线(Watermark)

目录

1、关于时间语义

1.1事件时间

1.2处理时间​编辑

2、什么是水位线

2.1 顺序流和乱序流

2.2乱序数据的处理

2.3 水位线的特性

3 、水位线的生成

3.1 生成水位线的总体原则

3.2 水位线生成策略

3.3 Flink内置水位线

3.3.1 有序流中内置水位线设置

3.4.2 断点式水位线生成器(Punctuated Generator)

3.4.3 在数据源中发送水位线

4、水位线的传递

5、迟到数据的处理


1、关于时间语义

1.1事件时间

        一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。

1.2处理时间

2、什么是水位线

在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。说白了就是事件时间戳。

2.1 顺序流和乱序流

有序流就是指数据按照生成的先后顺序,每条数据产生一个有先后顺序的水位线

这是一种理想的状态(数据量较小),而在实际中,我们产生的数据量往往非常庞大,而数据之间的时间间隔非常之小,所以为了提高效率,一般会每隔一段时间生成一个水位线

在实际生产中,由于多服务之间网络传输等的因素,往往我们的数据流,并不是我们所想的顺序结果,而是数据先后错乱,这就是乱序流

2.2乱序数据的处理

由于数据是乱序的,我们无法正确处理“迟到”的数据,为了让窗口能够正确的收集到迟到的数据,我们也可以让窗口等上一段时间,比如2秒。也就是说,我们可以在数据的时间戳基础上加上一些延迟来尽量保证不丢数据。

2.3 水位线的特性

3

3 、水位线的生成

3.1 生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。

所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

3.2 水位线生成策略

在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。

DataStream<Event> stream = env.addSource(new ClickSource());

DataStream<Event> withTimestampsAndWatermarks = 
stream.assignTimestampsAndWatermarks(<watermark strategy>);

WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>,
            WatermarkGeneratorSupplier<T>{

    // 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    // 主要负责按照既定的方式,基于时间戳生成水位线
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

3.3 Flink内置水位线

3.3.1 有序流中内置水位线设置

对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。

public class WatermarkMonoDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:升序的watermark,没有等待时间
                .<WaterSensor>forMonotonousTimestamps()
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        // 返回的时间戳,要 毫秒
                        System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                        return element.getTs() * 1000L;
                    }
                });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);


        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}

3.3.2 乱序流中内置水位线设置

调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。

这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值

public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());


        // TODO 1.定义Watermark策略
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // 1.1 指定watermark生成:乱序的,等待3s
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // 1.2 指定 时间戳分配器,从数据中提取
                .withTimestampAssigner(
                        (element, recordTimestamp) -> {
                            // 返回的时间戳,要 毫秒
                            System.out.println("数据=" + element + ",recordTs=" + recordTimestamp);
                            return element.getTs() * 1000L;
                        });

        // TODO 2. 指定 watermark策略
        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);


        sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                // TODO 3.使用 事件时间语义 的窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                )
                .print();

        env.execute();
    }
}

3.4 自定义水位线生成器

3.4.1 周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。

import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                .addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                .print();

        env.execute();
    }

    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {

        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {

            return new SerializableTimestampAssigner<Event>() {

                @Override
                public long extractTimestamp(Event element,long recordTimestamp) {
                    return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
                }
            };
        }

        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomBoundedOutOfOrdernessGenerator();
        }
    }

    public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {

        private Long delayTime = 5000L; // 延迟时间
        private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳

        @Override
        public void onEvent(Event event,long eventTimestamp,WatermarkOutput output) {
            // 每来一条数据就调用一次
            maxTs = Math.max(event.timestamp,maxTs); // 更新最大时间戳
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 发射水位线,默认200ms调用一次
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}

如果想修改默认周期时间,可以通过下面方法修改。

//修改默认周期为400ms
env.getConfig().setAutoWatermarkInterval(400L);

3.4.2 断点式水位线生成器(Punctuated Generator

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。

3.4.3 在数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。

env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)

4、水位线的传递

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。

水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟

也就是说:水位线的传递是以最小事件时间为准则。

5、迟到数据的处理

5.1 推迟水印推进

在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

5.2 设置窗口延迟关闭

当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

5.3 使用侧流接收迟到的数据

.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)

完整示例:

public class WatermarkLateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> sensorDSwithWatermark = sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);


        OutputTag<WaterSensor> lateTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<String> process = sensorDSwithWatermark.keyBy(sensor -> sensor.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(2)) // 推迟2s关窗
                .sideOutputLateData(lateTag) // 关窗后的迟到数据,放入侧输出流
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                long startTs = context.window().getStart();
                                long endTs = context.window().getEnd();
                                String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                                String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                                long count = elements.spliterator().estimateSize();

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                            }
                        }
                );


        process.print();
        // 从主流获取侧输出流,打印
        process.getSideOutput(lateTag).printToErr("关窗后的迟到数据");

        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/173686.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CANopen权威指南【CAN总线协议】

1这个总线定义是老外发明的。 想要使用&#xff0c;就必须按照协议去配置数据帧。 CIA301和cia402协议&#xff0c;实际就是寄存器地址上某一段的定义。 下载地址&#xff1a; CAN in Automation (CiA): Technical documents 注册下载也是非常快的。【没什么难度】 就是资…

数据结构之二叉树

前言&#xff1a;我们前面已经学习了数据结构的栈和队列&#xff0c;今天我们就来学习一下数据结构中的二叉树&#xff0c;那么作为二叉树我们就得先了解树的一些概念&#xff0c;还有二叉树一些特点。 树的概念&#xff1a; 树是一种非线性的数据结构&#xff0c;它是由n&…

Linux网络驱动

Linux 网络设备驱动结构 网络协议接口层向网络层协议提供统一的数据包收发接口&#xff0c;这一层使得上层协议独立于具体的设备网络设备接口层向协议接口层提供统一的用于描述具体网络设备属性和操作的结构体net_device&#xff0c;该结构体是设备驱动功能层中各函数的容器设备…

【Operating Systems:Three Easy Pieces 操作系统导论 】第28章 插叙:线程 API

【Operating Systems:Three Easy Pieces 操作系统导论 】 第28章 插叙&#xff1a;线程 API pthread 库介绍 线程创建 #include <pthread.h> // 头文件 int pthread_create(pthread_t * thread,const pthread_attr_t * attr,void * (*start_routine)(void*),void *…

wsl安装ubuntu的问题点、处理及连接

WSL安装Ubuntu的参考链接 (41条消息) wsl报错&#xff1a;WslRegisterDistribution failed with error: 0x800701bc_yzpyzp的博客-CSDN博客_0x800701bc wsl (41条消息) 使用Ubuntu安装软件出现Unable to locate package错误解决办法_大灰狼学编程的博客-CSDN博客 手把手教你…

linux:查看文件前100行和后100行

查看文件中的前100行 head -n 100 文件名查看文件中的后100行 tail -n 100 文件名

【Java 进阶篇】JavaScript JSON 语法入门:轻松理解数据的序列化和反序列化

嗨&#xff0c;亲爱的小白们&#xff01;欢迎来到这篇关于 JavaScript 中 JSON&#xff08;JavaScript Object Notation&#xff09;语法的入门指南。JSON 是一种轻量级的数据交换格式&#xff0c;广泛应用于前端开发中。通过这篇博客&#xff0c;我将带你深入了解 JSON 的语法…

【深度学习实验】注意力机制(四):点积注意力与缩放点积注意力之比较

文章目录 一、实验介绍二、实验环境1. 配置虚拟环境2. 库版本介绍 三、实验内容0. 理论介绍a. 认知神经学中的注意力b. 注意力机制 1. 注意力权重矩阵可视化&#xff08;矩阵热图&#xff09;2. 掩码Softmax 操作3. 打分函数——加性注意力模型3. 打分函数——点积注意力与缩放…

【GUI】-- 12 贪吃蛇小游戏之让小蛇动起来

GUI编程 04 贪吃蛇小游戏 4.3 第三步&#xff1a;让小蛇动起来(键盘控制) 首先&#xff0c;在构造器中要获取焦点事件、键盘监听事件并加入定时器(定时器定义需要实现ActionListener接口并重写actionPerformed方法)&#xff1a; //构造器public GamePanel() {init();this.s…

郎酒“掉队”,经销商们能等来春天吗?

文 | 螳螂观察&#xff08;TanglangFin&#xff09; 作者 | 渡过 有“六朵金花”之称的川酒品牌中&#xff0c;五粮液、泸州老窖、舍得、水井坊都已成功上市&#xff0c;只剩下郎酒和剑南春未上市。 与IPO的“掉队”相对应的&#xff0c;是郎酒在冲刺高端、内部管理、渠道管…

【LeetCode刷题-链表】--23.合并K个升序链表

23.合并K个升序链表 方法&#xff1a;顺序合并 在前面已经知道合并两个升序链表的前提下&#xff0c;用一个变量ans来维护以及合并的链表&#xff0c;第i次循环把第i个链表和ans合并&#xff0c;答案保存到ans中 /*** Definition for singly-linked list.* public class List…

SQL基础理论篇(八):视图

文章目录 简介创建视图修改视图删除视图总结参考文献 简介 视图&#xff0c;即VIEW&#xff0c;是SQL中的一个重要概念&#xff0c;它其实是一种虚拟表(非实体数据表&#xff0c;本身不存储数据)。 视图类似于编程中的函数&#xff0c;也可以理解成是一个访问数据的接口。 从…

VPX 插座(VITA46)介绍及应用 (简单介绍)

1. VPX 插座的介绍 VPX是VITA(VME International Trade Association, VME国际贸易协会)组织于2007年在其VME总线基础上提出的新一代高速串行总线标准。VPX总线的基本规范、机械结构和总线信号等具体内容均在ANSI/VITA46系列技术规范中定义。VPX就是基于高速串行总线的新一代总线…

基于Surfer与Voxler数据处理及可视化实践技术应用

Surfer和Voxler分别是美国Golden Software 公司开发的用于二维和三维数据可视化软件&#xff0c;具有强大的数据处理和插值功能&#xff0c;软件主要应用于气象、环境和地质&#xff08;以及生物、医学等&#xff09;等领域。其中Surfer主要用于绘制二维等值线图、三维表面图以…

仪表盘:pyecharts绘制

一、仪表盘 在数据分析中&#xff0c;仪表盘图&#xff08;dashboard&#xff09;的作用是以一种简洁、图表化的方式呈现数据的关键指标和核心信息&#xff0c;以帮助用户快速理解数据的情况&#xff0c;并从中提取关键见解。 仪表盘图通常由多个图表、指标和指示器组成&…

Java —— String类

目录 1. String类的重要性 2. 常用方法 2.1 字符串构造 2.2 String对象的比较 2.3 字符串查找 2.4 转化 1. 数值和字符串转化 2. 大小写转换 3. 字符串转数组 4. 格式化 2.5 字符串替换 2.6 字符串拆分 2.7 字符串截取 2.8 其他操作方法 2.9 字符串常量池 2.9.1 创建对象的思考…

基于安卓android微信小程序的刷题系统

项目介绍 面试刷题系统的开发过程中&#xff0c;采用B / S架构&#xff0c;主要使用jsp技术进行开发&#xff0c;中间件服务器是Tomcat服务器&#xff0c;使用Mysql数据库和Eclipse开发环境。该面试刷题系统包括会员、答题录入员和管理员。其主要功能包括管理员&#xff1a;个…

芯片IO口不加电阻会怎样?

芯片IO口不加电阻会怎样&#xff1f; 可能会导致以下几个后果&#xff1a; 1.高电流问题&#xff0c;IO口没有电阻限流&#xff0c;当与外部设备直接连接时&#xff0c;就可能会导致过大的电流流过IO口&#xff0c;这就可能损坏IO口&#xff0c;引起短路或烧坏其它电路组件。像…

Spring Boot要如何学习?【云驻共创】

Spring Boot 是由 Pivotal 团队提供的全新框架&#xff0c;其设计目的是用来简化新 Spring 应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置&#xff0c;从而使开发人员不再需要定义样板化的配置。我这里会分享一些学习Spring Boot的方法和干货&#xff0c;包括…

【C++】泛型编程 ⑨ ( 类模板的运算符重载 - 函数声明 和 函数实现 写在同一个类中 | 类模板 的 外部友元函数问题 )

文章目录 一、类模板 - 函数声明与函数实现分离1、函数声明与函数实现分离2、代码示例 - 函数声明与函数实现分离3、函数声明与函数实现分离 友元函数引入 二、普通类的运算符重载 - 函数声明 和 函数实现 写在同一个类中三、类模板的运算符重载 - 函数声明 和 函数实现 写在同…