【入门Flink】- 08Flink时间语义和窗口概念

Flink-Windows

是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

注意:Flink 中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。【事件驱动,没有数据到达永远都不会创建窗口】

1)窗口分类

(1)按照驱动类型分

(1)时间窗口

时间窗口以时间点来定义窗口的开始(start)和结束(end),截取出的就是某一时间段的数据。

(2)计数窗口

计数窗口基于元素的个数截取数据,到达固定的个数时就触发计算并关闭窗口。

(2)按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

(1)滚动窗口(Tumbling Windows)

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是
“首尾相接”的状态。这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口

滚动窗口应用非常广泛,可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。

(2)滑动窗口(Sliding Windows)

滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的,而是可以“错开”一定的位置。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率

滚动窗口也可以看作是一种特殊的滑动窗口一一窗口大小等于滑动步长(size=slide)
滑动窗口适合计算结果更新频率非常高的场景。

(3)会话窗口(Session Windows)

会话窗口,是基于“会话”(session)来来对数据进行分组的。会话窗口只能基于时间来定义。
会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到
来的时间间隔(gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果gap大于size,
那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。

会话窗口之间一定是不会重叠的,而且会留有至少为size的间隔(session)

在一些类似保持会话的场景下,可以使用会话窗口来进行数据的处理统计。

(4)全局窗口(Global Windows)

“全局窗口”,这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时侯, 默认是不会做触发计算的,如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

2)窗口 API

(1)按键分区(Keyed)和非按键分区(Non-Keyed)

(1)按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。

stream.keyBy(...)
.window(...)

(2)非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。

stream.windowAll(...)

注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

(2)窗口分配器(Window Assigners)和窗口函数(WindowFunctions)

stream.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(<window function>)

窗口分配器

(1)时间窗口

滚动处理时间窗口

stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(...)

.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和offset。第一个参数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。

滑动处理时间窗口

stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全一致。

处理时间会话窗口

stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

还可以调用 withDynamicGap()方法定义 session gap 的动态提取逻辑。

滚动事件时间窗口

stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)

滑动事件时间窗口

stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

事件时间会话窗口

stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)

(2)计数窗口

滚动计数窗口

stream.keyBy(...)
.countWindow(10)

滑动计数窗口

stream.keyBy(...)
.countWindow(10, 3)

全局窗口

stream.keyBy(...)
.window(GlobalWindows.create());

注意:使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

窗口函数

(1)增量聚合函数(ReduceFunction / AggregateFunction)

归约函数(ReduceFunction)

类似Reduce算子,只不过固定时间才会输出

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> stream = env.socketTextStream("124.222.253.33", 7777);

        stream.map(new WaterSensorMapFunction())
                .keyBy(WaterSensor::getId)
                // 设置滚动事件时间窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        System.out.println("调用reduce 方法,之前的结果:" + value1 + ",现在来的数据:" + value2);
                        return new WaterSensor(value1.getId(), System.currentTimeMillis(), value1.getVc() + value2.getVc());
                    }
                })
                .print();
        env.execute();

聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样

image-20231109192227819

有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。

输入类型IN 就是输入流中元素的数据类型;累加器类型 ACC 是进行聚合的中间状态类型;而输出类型OUT是最终计算结果的类型。

接口中有四个方法:

  • createAccumulator():创建一个累加器,为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

AggregateFunction 的工作原理:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                .map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);

        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> aggregate = sensorWS.aggregate(
                new AggregateFunction<WaterSensor, Integer, String>() {
                    @Override
                    public Integer createAccumulator() {
                        System.out.println("创建累加器");
                        return 0;
                    }

                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        System.out.println(" 调用add方法,value=" + value);
                        return accumulator + value.getVc();
                    }

                    @Override
                    public String getResult(Integer accumulator) {
                        System.out.println("调用getResult方法");
                        return accumulator.toString();
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        System.out.println("调用merge方法");
                        return null;
                    }
                }
        );
        aggregate.print();
        env.execute();

(2)全窗口函数(full window functions)

基于全部的数据计算

全窗口函数有两种:WindowFunction ProcessWindowFunction

窗口函数(WindowFunction)

基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

stream
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());

该类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。

不过 WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用

处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。

时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                .map(new WaterSensorMapFunction());
        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(WaterSensor::getId);
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> process = sensorWS.process(
                new ProcessWindowFunction<WaterSensor,
                        String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                        long count = elements.spliterator().estimateSize();
                        long windowStartTs = context.window().getStart();
                        long windowEndTs = context.window().getEnd();
                        String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ") 包含 " + count + " 条数据===>" + elements);
                    }
                }
        );
        process.print();
        env.execute();

增量聚合和全窗口函数结合使用

// ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,WindowFunction<TRKW>function)
// ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction,ProcessWindowFunction<TRKW> function)
    
// AggregateFunction 与 WindowFunction 结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,WindowFunction<VRKW> windowFunction)
// AggregateFunction 与 ProcessWindowFunction 结合
public <ACCVR> SingleOutputStreamOperator<R> aggregate(AggregateFunction<TACCV> aggFunction,
ProcessWindowFunction<VRKW>     

结合使用

public class WindowAggregateAndProcessDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("124.222.253.33", 7777)
                .map(new WaterSensorMapFunction());

        KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());
        WindowedStream<WaterSensor, String, TimeWindow> sensorWS = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 2. 窗口函数:
    /*
      增量聚合 Aggregate + 全窗口 process
      1、增量聚合函数处理数据: 来一条计算一条
      2、窗口触发时, 增量聚合的结果(只有一条)传递给全窗口函数
      3、经过全窗口函数的处理包装后,输出

      结合两者的优点:
      1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少
      2、全窗口函数: 可以通过 上下文 实现灵活的功能
    */
        // sensorWS.reduce() //也可以传两个
        SingleOutputStreamOperator<String> result = sensorWS.aggregate(
                new MyAgg(),
                new MyProcess()
        );
        result.print();
        env.execute();
    }

    public static class MyAgg implements AggregateFunction

            <WaterSensor, Integer, String> {
        @Override
        public Integer createAccumulator() {
            System.out.println("创建累加器");
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用 add 方法,value=" + value);
            return accumulator + value.getVc();
        }

        @Override
        public String getResult(Integer accumulator) {
            System.out.println("调用 getResult 方法");
            return accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            System.out.println("调用 merge 方法");
            return null;
        }
    }

    // 全窗口函数的输入类型 = 增量聚合函数的输出类型
    public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {
        @Override

        public void process(String s, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            long startTs = context.window().getStart();
            long endTs = context.window().getEnd();
            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(endTs, "yyyyMM-dd HH:mm:ss.SSS");
            long count = elements.spliterator().estimateSize();
            out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);
        }
    }
}

Flink-Time

  • Event Time:事件时间,一个是数据产生的时间(时间戳Timestamp)
  • Processing time:处理时间,数据真正被处理的时间

image-20231108081425604

事件时间在实际应用中更为广泛,从Flink 1.12版本开始,Flink已经将事件时间作为默认的时间语义

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/129548.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

UE5、CesiumForUnreal实现加载GeoJson绘制单面(Polygon)功能(StaticMesh方式)

文章目录 1.实现目标2.实现过程2.1 实现原理2.1.1 数据读取2.1.2 三角剖分2.1.3 创建StaticMesh2.2 应用测试2.2.1 具体代码2.2.2 蓝图应用测试3.参考资料1.实现目标 通过读取本地GeoJson数据,在UE中以StaticMeshComponent的形式绘制出面数据,支持Editor和Runtime环境,GIF动…

JMeter---JSON提取器

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;加入1000人软件测试技术学习交流群&#x1f4e2;资源分享&#xff1a;进了字节跳动之后&#xff0c;才…

传统企业数字化转型都要面临哪些挑战?_数据治理平台_光点科技

数字化转型已经成为传统企业发展的必经之路&#xff0c;但在这个过程中&#xff0c;企业往往会遭遇多方面的挑战。 1.文化和组织惯性 最大的挑战之一是企业文化和组织惯性的阻力。传统企业往往有着深厚的历史和根深蒂固的工作方式&#xff0c;员工和管理层可能对新的数字化工作…

中国电信终端产业联盟5G Inside行业子联盟正式成立!宏电股份作为副理事单位受邀加入

11月9日&#xff0c;中国电信于广州召开“2023中国电信终端生态合作暨中国电信终端产业联盟&#xff08;以下简称CTTA&#xff09;第十四次会员大会”&#xff0c;联盟成员齐聚现场。作为CTTA大会的一个重要环节&#xff0c;中国电信终端产业联盟5G Inside行业子联盟正式成立&a…

爱剪辑如何将视频旋转90度,详细操作流程

爱剪辑是一款电脑端常用的视频剪辑类软件&#xff0c;基本上囊括了视频剪辑所需的所有功能&#xff0c;此处主要介绍&#xff0c;爱剪辑是如何对视频进行旋转操作的&#xff0c;水平旋转或者垂直旋转爱剪辑都是可以操作的&#xff0c;整体操作的详细过程将在下方为大家讲解。 …

希亦ACE和石头m1这两款内衣洗衣机哪一款更好?高性价比内衣洗衣机测评

内衣洗衣机可以说是近两年很火爆的小家电了&#xff0c;给大家带了一种全新的时尚体验&#xff0c;越来越内衣裤也可以用手洗&#xff01;而且还比手洗得干净&#xff01;不过现在市面上关于内衣洗衣机的品牌越来越多&#xff0c;小伙伴们想要挑选一款性价比高的内衣洗衣机看得…

LabVIEW在OPC中使用基金会现场总线

LabVIEW在OPC中使用基金会现场总线 本文讨论了如何使用开放的OPC&#xff08;用于过程控制的OLE&#xff09;接口访问基金会现场总线网络和设备。 NI-FBUS通信管理器随附了一个OPC数据访问服务器。 &#xff08;NI-FBUS Configurator自动包含NI-FBUS通信管理器。&#xff09…

【面试经典150 | 位运算】二进制求和

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;模拟 其他语言c 写在最后 Tag 【二进制】【位运算】 题目来源 67. 二进制求和 题目解读 以二进制字符串的形式返回两个二进制字符串的和。 解题思路 看到这个题目首先想到的方法可能是先把二进制字符转化成 int 型数…

[LeetCode]-225. 用队列实现栈

目录 225. 用队列实现栈 题目 ​思路 代码 225. 用队列实现栈 225. 用队列实现栈 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/implement-stack-using-queues/description/ 题目 请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff0…

基于SSM的网络音乐系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

1. 深度学习——激活函数

机器学习面试题汇总与解析——激活函数 本章讲解知识点 什么是激活函数&#xff1f; 为什么要使用激活函数&#xff1f; 详细讲解激活函数 本专栏适合于Python已经入门的学生或人士&#xff0c;有一定的编程基础。本专栏适合于算法工程师、机器学习、图像处理求职的学生或人…

统一消息分发中心设计

背景 我们核心业务中订单完成时&#xff0c;需要完成后续的连带业务&#xff0c;扣件库存库存、增加积分、通知商家等。 如下图的架构&#xff1a; 这样设计出来导致我们的核心业务和其他业务耦合&#xff0c;每次新增连带业务或者去掉连带业务都需要修改核心业务。 一方面&…

javascript用localStorage存储用户搜索词记录,并在搜索框下展显搜索词记录

//首先是storage的一封装 //storage.js文件 function storage(){//设置storage密钥this.ms"mystorage";}//以下为函数的原型方法//获得localStorage值storage.prototype.getLocalfunction(key){//先检查设置的localStorage的密钥var mydatalocalStorage.getItem(thi…

vue项目如何快速上手echarts

1.安装echarts npm i echarts 2.导入echarts 说明&#xff1a;任意组件页面都可以导入echarts。如果在main.js里面导入&#xff0c;那么只需要导入一次就可以应用于任意页面。 import * as echarts from "echarts" 3.创建容器 说明&#xff1a;它具有一个指定的id属…

Selenium+Python自动化测试环境搭建

selenium python 自动化测试 —— 环境搭建 关于 selenium Selenium 是一个用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中&#xff0c;就像真正的用户在操作一样。支持的浏览器包括IE(7、8、9)、Mozilla Firefox、Mozilla Suite等。 Selenium 框架底层使用JavaS…

Leetcode—637.二叉树的层平均值【简单】

2023每日刷题&#xff08;二十五&#xff09; Leetcode—637.二叉树的层平均值 BFS实现代码 /*** Definition for a binary tree node.* struct TreeNode {* int val;* struct TreeNode *left;* struct TreeNode *right;* };*/ /*** Note: The returned array mu…

Docker+K8s基础(重要知识点总结)

目录 一、Docker的核心1&#xff0c;Docker引擎2&#xff0c;Docker基础命令3&#xff0c;单个容器运行多个服务进程4&#xff0c;多个容器运行多个服务进程5&#xff0c;备份在容器中运行的数据库6&#xff0c;在宿主机和容器之间共享数据7&#xff0c;在容器之间共享数据8&am…

已解决:TypeError: ‘NoneType‘ object is not callable 问题

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页: &#x1f405;&#x1f43e;猫头虎的博客&#x1f390;《面试题大全专栏》 &#x1f995; 文章图文并茂&#x1f996…

二叉树的中序遍历

一、题目。 给定一个二叉树的根节点 root &#xff0c;返回 它的 中序 遍历 。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,3,2] 示例 2&#xff1a; 输入&#xff1a;root [] 输出&#xff1a;[] 示例 3&#xff1a; 输入&#xff1a;…

第三阶段第二章——Python高阶技巧

时间过得很快&#xff0c;这么快就来到了最后一篇Python基础的学习了。话不多说直接进入这最后的学习环节吧&#xff01;&#xff01;&#xff01; 期待有一天 春风得意马蹄疾&#xff0c;一日看尽长安花 o(*&#xffe3;︶&#xffe3;*)o 1.闭包 什么是闭包&#xff1f; 答…