flink的窗口

目录

窗口分类

1.按照驱动类型分类

1. 时间窗口(Time window)

2.计数窗口(Count window)

2.按照窗口分配数据的规则分类

窗口API分类

API调用

窗口分配器器:

窗口函数

增量聚合函数:

全窗口函数

flink sql 窗口函数


窗口是flink中重要的概念,为了方便高效的处理无界流,将数据切成有限的数据块进行处理;

窗口 | Apache Flink

窗口分类

1.按照驱动类型分类

1. 时间窗口(Time window)

    时间窗口以时间点定义窗口的开始和结束,因此截取出就是某一段时间的数据。当到达结束时间时窗口不在接受数据,触发计算输出结果,并关闭销毁窗口。

flink有一个专门的类用来表示时间窗口TimeWindow,这个类只有两个私有属性;窗口的方法获取最大时间戳为end-1,因此窗口[start,end)  左开右闭;

@PublicEvolving
public class TimeWindow extends Window {

    private final long start;
    private final long end;

    public TimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }
    @Override
    public long maxTimestamp() {
        return end - 1;
    }
2.计数窗口(Count window)

计数窗口是基于元素个数截取,在到达固定个数是就触发计算并关闭窗口。

3.全局窗口(Global Windows)

是计数窗口的底层实现,窗口分配器由GlobalWindows类提供,需要自定义触发器实现窗口的计算;

 stream.keyBy(data -> true)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
//                .max()
                .aggregate(new AvgPv())
                .print();

查看源代码,windou函数后见windowStrream时获取默认的触发器
@PublicEvolving
    public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {

        this.input = input;

        this.builder =
                new WindowOperatorBuilder<>(
                        windowAssigner,
                        windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),  //湖区触发器
                        input.getExecutionConfig(),
                        input.getType(),
                        input.getKeySelector(),
                        input.getKeyType());
    }

// 计数窗口底层采用全局窗口加计数器来实现

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        return window(GlobalWindows.create())
                .evictor(CountEvictor.of(size))
                .trigger(CountTrigger.of(slide));
    }

2.按照窗口分配数据的规则分类

滚动窗口(Tumbling Window):窗口大小固定,窗口没有重叠;

滑动窗口 (Sliding Window):滑动窗口有重叠,也可以没有重叠,如果窗口size和滑动size相等,等于滚动窗口;

会话窗口 (Session Window):基于会话对窗口进行分组,与其他两个不同的是,会话窗口是借用会话窗口的超时失效机触发窗口计算,当数据到来后会开启一个窗口,如果在超时时间内有数据陆续到来,窗口不会关闭,反之会关闭;极端情况,如果数据总能在窗口超时时间到达前远远不断的到来,该窗口会一直开启不会关闭;

全局窗口 (Global Window):比较通用的窗口,该窗口会把数据分配到一个窗口中,窗口为全局有效,会把相同key的数据分配到同一个窗口中,默认不会触发计算,跟没有窗口一样,需要自定义触发器才能使用;

窗口API分类

窗口大的分类可以分为按键分区和非按键分区两种:按键分需要经过keyby操作,会把数据进行分发,实现负载均分,可以并行处理更大的数据量。而非按键分区窗口,相当于并行度为1,使用上直接调用windowall(),因此一般并不推荐使用;

API调用

窗口操作分为窗口分配器(window Assigners)和窗口函数(window function)两部分,窗口分配器用于构建窗口,确定窗口类型,确定数据划分哪一个窗口,窗口函数制定数据的计算规则;

窗口分配器器:

窗口按照时间可以划分为:滚动、滑动和session,三种类型窗口;

窗口计数划分:滚动和滑动两种类型;

  eventStream.keyBy(data -> data.url)
                        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                        .aggregate();
窗口函数

窗口定义分配器后,知道数据属于哪一个窗口,窗口函数制定数据计算的规则;

增量聚合函数:

数据进入窗口会参与计算,窗口结束前只需要保留一个聚合后的状态值,内存压力小。

1.规约函数(ReduceFunction):数据保存留一个状态,输入类型和输出类型必须一致,来一条数据会处理将数据合并到状态中;

 stream.keyBy(r -> r.f0)
                // 设置滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // 定义累加规则,窗口闭合时,向下游发送累加结果
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print();

sum、max、min等底层都是通过同名AggregateFunction实现(非下面的聚合函数),本质还是实现ReduceFunction结构重写了reduce方法;

2.聚合函数(AggrateFunction):在规约函数基础上进行完善。解决输出和输入类型必须一致的限制问题。实现应用更灵活;

  // 所有数据设置相同的key,发送到同一个分区统计PV和UV,再相除
        stream.keyBy(data -> true)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
                .aggregate(new AvgPv())
                .print();
 public static class AvgPv implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {
        @Override
        public Tuple2<HashSet<String>, Long> createAccumulator() {
            // 创建累加器
            return Tuple2.of(new HashSet<String>(), 0L);
        }

        @Override
        public Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {
            // 属于本窗口的数据来一条累加一次,并返回累加器
            accumulator.f0.add(value.user);
            return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<HashSet<String>, Long> accumulator) {
            // 窗口闭合时,增量聚合结束,将计算结果发送到下游
            return (double) accumulator.f1 / accumulator.f0.size();
        }

        @Override
        public Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {
            return null;
        }
    }
全窗口函数

全窗口函数会将进入窗口的数据先进行缓存,然后在窗口关闭时一起计算,缓存数据会占用内存资源,如果一个窗口数据量太大时,可能出现内存溢出的问题;

全窗口函数可以划分窗口函数(windowFunction)和处理窗口函数(processWindowFunction)两种;

窗口函数(windowFunction):老版本通用窗口接口,window()后调用apply(),传入实现windowFunction接口; 缺点是不能获取上下文信息,也没有更高级的功能。因为在功能上可以被processWindowFunction全覆盖,因此主键被弃用

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());



@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param window The window that is being evaluated.
     * @param input The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

处理窗口函数(processWindowFunction):是窗口API中最底层通用的窗口函数接口,可以获取到上问对象(context),实现为调用process方法传入自定义继承ProcessWindowFunction类;

input
  .keyBy(t -> t.f0)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction 
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}

注意:一般增量窗口函数和全量窗口函数可以一起使用,window().aggregate()方法可以传入两个函数,第一个采用增量聚合函数,第二个传入全量函数,这样数据在进入窗口会触发增量计算,窗口不会缓存数据。当窗口关闭触发计算时,结果数据穿度到全量计算,参数Iterable中一般只有一个数据;

aggregate(acct1,acct2)

flink sql 窗口函数

flink sql 窗口也包含常见的滚动窗口、滑动窗口、session窗口,但还有一种累计窗口。

在flink1.13版本后flinksql支持累计窗口CUMULATE,可以实现没5分钟触发一次计算,输出当天的累计数据,使用样例

SELECT 
    cast(PROCTIME() as timestamp_ltz) as window_end_time,
    manufacturer_name,
    event_id,
    case when state is null then -1 else state end ,
    cast(sum(agg)as string ) as agg
FROM TABLE(CUMULATE(
       TABLE dm_cumulate
       , DESCRIPTOR(ts1)
       , INTERVAL '5' MINUTES
       , INTERVAL '1' DAY(9)))
GROUP BY
          window_end
         ,window_start
         ,manufacturer_name
         ,event_id
         ,case when state is null then -1 else state end

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/747936.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

非极大值抑制算法(Non-Maximum Suppression,NMS)

https://tcnull.github.io/nms/ https://blog.csdn.net/weicao1990/article/details/103857298 目标检测中检测出了许多的候选框&#xff0c;候选框之间是有重叠的&#xff0c;NMS作用重叠的候选框只保留一个 算法&#xff1a; 将所有候选框放入到集和B从B中选出分数S最大的b…

探讨数字化背景下VSM(价值流程图)的挑战和机遇

在信息化、数字化飞速发展的今天&#xff0c;各行各业都面临着前所未有的挑战与机遇。作为源自丰田生产模式的VSM&#xff08;价值流程图&#xff09;&#xff0c;这一曾经引领制造业革命的工具&#xff0c;在数字化背景下又将如何乘风破浪&#xff0c;应对新的市场格局和技术变…

西门子智能电气阀门定位器在冶金生产控制的应用

西门子智能电气阀门定位器在冶金生产控制的应用 1 前 言 在自动化程度越来越高的冶金行业中 ,调节阀起着至关重要的作用,一旦其发生故障, 轻则出现生产事故,停机,停炉影响各级生产指标,生产任务,影响装置的安全运行。重则可能出现人身安全事故,将直接影响家庭的幸福和企…

ubuntu 18 虚拟机安装(1)

ubuntu 18 虚拟机安装 ubuntu 18.04.6 Ubuntu 18.04.6 LTS (Bionic Beaver) https://releases.ubuntu.com/bionic/ 参考&#xff1a; 设置固定IP地址 https://blog.csdn.net/wowocpp/article/details/126160428 https://www.jianshu.com/p/1d133c0dec9d ubuntu-18.04.6-l…

网络安全学习(持续更新中~)

网络安全学习 前言 本目录索引持续更新中&#xff0c;记录网络安全学习过程~ 博客最新更新时间&#xff1a;2024.6.25 学习路线 Windows内网服务 Windows内网服务模块包含了Windows服务器的相关知识。 文章链接包含要点Windows账户相关Windows权限相关Windows日志Windows…

关于 AD21导入电子元器件放置“3D体”STEP模型失去3D纹理贴图 的解决方法

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/139969415 长沙红胖子Qt&#xff08;长沙创微智科&#xff09;博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV…

数据交换的桥梁:深入探索JSON序列化和反序列化

目录 JSON序列化 一、查看JSON文件&#xff0c;设置数据模板类 ​编辑 二、Newtonsoft.Json下载 三、代码理解 1.创建BatteryList的实例 2.初始化Batterys属性 3.添加Battery对象到Batterys列表中 4.完整的代码如下 四、运行结果展示 JSON反序列化 序列化是将对象或…

2024年生物技术与食品科学国际会议(ICBFS 2024)

2024 International Conference on Biotechnology and Food Science 2024年生物技术与食品科学国际会议 【会议信息】 会议简称&#xff1a;ICBFS 2024 大会地点&#xff1a;中国厦门 会议邮箱&#xff1a;icbfssub-paper.com 审稿结果&#xff1a;投稿后3日左右 提交检索&…

fiddle查看请求耗时 设置超时背景

windows 下&#xff0c;打开 fiddler 时直接用 快捷键&#xff1a;CTRL R 打开 或 从路径&#xff1a;Rules -> Customize Rules… 打开 // 显示每行请求的服务端耗时时间 public static BindUIColumn("TimeTaken/ms", 120)function TimeTaken(oS: Session):Stri…

安卓直装植物大战僵尸杂交版V2.1版完美运行

安卓直装植物大战僵尸杂交版V2.1版完美运行 链接&#xff1a;https://pan.baidu.com/s/1SPFouV8T-AV2LnUoZfy6lQ?pwd3gl6 提取码&#xff1a;3gl6

裸机与操做系统区别(RTOS)

声明&#xff1a;该系列笔记是参考韦东山老师的视频&#xff0c;链接放在最后&#xff01;&#xff01;&#xff01; rtos&#xff1a;这种系统只实现了内核功能&#xff0c;比较简单&#xff0c;在嵌入式开发中&#xff0c;某些情况下我们只需要多任务&#xff0c;而不需要文件…

49-3 内网渗透 - MSI安 装策略提权

靶场环境搭建: 这里还是用我们之前的windows2012虚拟机进行搭建 1)打开一些设置让靶场存在漏洞 打开组策略编辑器(gpedit.msc) 使用运行命令打开: 按下 Win + R 组合键来打开运行对话框。输入 gpedit.msc,然后按下 Enter 键。使用搜索打开: 点击任务栏上的搜索框(W…

简单的text/html无法解析解决记录

简单的text/html无法解析解决记录 1. bug发现 我们所有的服务都是微服务&#xff0c;服务间调用都是使用feign接口进行调用&#xff0c;正常调用都没有问题&#xff0c;但是某一天发现部分从esb服务调用过来到我们本地的服务&#xff0c;本地服务再使用feign接口调用其他微服…

Java洗鞋小程序预约系统源码

&#x1f4a5;洗鞋神器来袭&#xff01;轻松预约&#xff0c;让你的鞋子焕然一新&#x1f45f; &#x1f389; 告别洗鞋烦恼&#xff0c;洗鞋预约小程序来啦&#xff01; 你是不是常常为洗鞋而烦恼&#xff1f;手洗太累&#xff0c;送去洗衣店又贵又麻烦。现在&#xff0c;好…

js计算某个时间距离当前时间多少天,少于7天红色展示

效果图 后端返回数据格式 info:{vip_validity:"2027-09-07" }<div>到期时间&#xff1a;{{ info.vip_validity }}, 剩余<span :class"countdownDays(info.vip_validity) < 7 ? surplus : ">{{ !!info.vip_validity ? countdownDays(inf…

【ARM-Linux篇】项目:智能家居

一、项目概述 •项目功能 通过语音控制客厅灯、卧室灯、风扇、人脸识别开门等,可以进行火灾险情监测,可以并且实现Sockect发送指令远程控制各类家电等 •项目描述 全志H616通过串口连接各模块硬件,检测语音的识别结果,分析语音识别的结果来对家电设备进行控制。摄像头拍…

Superagent:一个开源的AI助手框架与API

在人工智能日益普及的今天,如何将AI助手无缝集成到应用中成为了开发者们关注的焦点。今天,我们要介绍的Superagent正是一个为这一需求量身打造的开源框架与API。它结合了LLM、检索增强生成(RAG)和生成式AI技术,为开发者们提供了一个强大而灵活的解决方案。 一、Superagen…

PID原理及控制算法详解

文章目录 1. 概念 1.1 PID框图 1.2 具体示例&#xff1a;无人机高度控制 2. PID原理 3. 常用术语 4. 计算过程 4.1 比例控制&#xff08;Proportional&#xff09; 4.2 积分控制&#xff08;Integral&#xff09; 4.3 微分控制&#xff08;Derivative&#xff09; 5.…

6.18-6.26 旧c语言

第一章 概述 32关键字 9种控制语句 优点&#xff1a;能直接访问物理地址&#xff0c;位操作&#xff0c;代码质量高&#xff0c;执行效率高 可移植性好 面向过程&#xff1a;以事件为中心 面向对象&#xff1a;以实物为中心 printf&#xff1a;系统定义的标准函数 #include&l…

[图解]建模相关的基础知识-19

1 00:00:00,640 --> 00:00:04,900 前面讲了关系的这些范式 2 00:00:06,370 --> 00:00:11,570 对于我们建模思路来说&#xff0c;有什么样的作用 3 00:00:12,660 --> 00:00:15,230 我们建模的话&#xff0c;可以有两个思路 4 00:00:16,790 --> 00:00:20,600 一个…