【Flink】Flink 中的时间和窗口之窗口其他API的使用

1. 窗口的其他API简介

对于一个窗口算子而言,窗口分配器和窗口函数是必不可少的。除此之外,Flink 还提供了其他一些可选的 API,可以更加灵活地控制窗口行为。

1.1 触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的"触发计算"本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。

基于WindowsStream调用.trigger()方法,参数传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...)
 .window(...)
 .trigger(new MyTrigger())

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTriggerCountTrigger。所以一般情况下是不需要自定义触发器的。

Trigger是一个抽象类,自定义时必须实现以下四个抽象方法:

  • onElement():窗口中每到来一个元素,都会调用这个方法。
  • onEventTime():当注册的事件时间定时触发时,将调用这个方法。
  • onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。

除了 clear()比较像生命周期方法,其他三个方法其实都是对某种事件的响应。onElement()是对流中数据元素到来的响应;而另两个则是对时间的响应。这些方法参数中都有一个“触发器上下文”(TriggerContext)对象,可以用来注册定时器回调(callback)。对于时间窗口(TimeWindow)而言,就是在窗口的结束时间设定了一个定时器,这样到时间就可以触发窗口的计算输出了。

另外这三个方法的返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型

  • CONTINUE(继续):什么都不做
  • FIRE(触发):触发计算,输出结果
  • PURGE(清除):清空窗口中的所有数据,销毁窗口
  • FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口

Trigger 除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。并且TriggerResult的返回结果可以让计算输出结果和关闭窗口分开执行。

示例:
在日常业务场景中,我们经常会开比较大的窗口来计算每个窗口的pv 或者 uv 等数据。但窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以使用触发器,来隔一段时间触发一次窗口计算。我们在代码中计算了每个 url 在 10 秒滚动窗口的 pv 指标,然后设置了触发器,每隔 1 秒钟触发一次窗口的计算。

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 创建自定义数据源的实时流
        DataStream<Event> stringDataStreamSource = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        })
                );
        stringDataStreamSource.keyBy(Event::getUrl)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .trigger(new MyTrigger())
                .process(new WindowResult())
                .print();

        stringDataStreamSource.print("data");
        env.execute();
    }

    public static class WindowResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {

        @Override
        public void process(String s, ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<UrlViewCount> out) throws Exception {
            out.collect(new UrlViewCount(s, elements.spliterator().getExactSizeIfKnown(), context.window().getStart(), context.window().getEnd()));
        }
    }

    private static class UrlViewCount {
        private String s;
        private long size;
        private long start;
        private long end;

        public UrlViewCount(String s, long size, long start, long end) {
            this.s = s;
            this.size = size;
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "UrlViewCount{" +
                    "s='" + s + '\'' +
                    ", size=" + size +
                    ", start='" + start + '\'' +
                    ", end='" + end + '\'' +
                    '}';
        }
    }

    public static class MyTrigger extends Trigger<Event, TimeWindow> {

        @Override
        public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            ValueState<Boolean> isFirstEvent = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            if (isFirstEvent.value() == null) {
                for (long i = window.getStart(); i < window.getEnd() ; i = i + 1000L) {
                    ctx.registerEventTimeTimer(i);
                }
                isFirstEvent.update(true);
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ValueState<Boolean> isFirstEvent = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            isFirstEvent.clear();
        }
    }

输出结果:
在这里插入图片描述

1.2 移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。

Evictor 接口定义了两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作
  • evictAfter():定义执行窗口函数之后的以处数据操作

默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。

1.3 允许延迟(Allowed Lateness)

在事件时间语义下,窗口会出现迟到的数据。之所以出现迟到数据是因为在乱序流中,水位线并一定能保证时间戳更早的所有数据不会再出现,当水位线已经到达窗口的结束时间时,窗口触发计算并输出结果,这时一般就要销毁窗口了;如果还有本该属于这个窗口的数据到达,默认情况下会被丢弃。

大多数情况下直接丢弃数据会导致统计结果不准,为了解决迟到数据的问题,Flink提供了一个特殊接口,可以为窗口算子设置一个"允许最大延迟"(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。

基于 WindowedStream 调用.allowedLateness()方法,传入一个Time类型的延迟时间,就可以表示允许这段时间内的延迟数据。

stream.keyBy(...)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
 .allowedLateness(Time.minutes(1))

比如上面的代码中,我们定义了 1 小时的滚动窗口,并设置了允许 1 分钟的延迟数据。也就是说,在不考虑水位线延迟的情况下,对于 8 点~9 点的窗口,本来应该是水位线到达 9 点整触发计算并关闭窗口;现在允许延迟 1 分钟,那么 9 点整就只是触发一次计算并输出结果,并不会关窗。后续到达的数据,只要属于 8 点~9 点窗口,依然可以在之前统计的基础上继续叠加,并且再次输出一个更新后的结果。直到水位线到达了 9 点零 1 分,这时就真正清空状态、关闭窗口,之后再来的迟到数据就会被丢弃了。

从这里可以看到,窗口的触发计算(Fire)清除(Purge)操作确实可以分开。不过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,再来的数据就会被丢弃。

1.4 将迟到的数据放入侧输出流

对于处理迟到数据,仅仅提供延迟时间还是会出现迟到的数据,所以Flink提供了另外一种方式处理迟到数据。可以将迟到数据放到侧输出流(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。

基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以OutputTag的类型与流中数据类型相同。

DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)

将迟到数据放入侧输出流之后,还可以将它提取出来。基于窗口处理完成之后的DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。

SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);

这里注意,getSideOutput()SingleOutputStreamOperator的方法,获取到的侧输出流数据
类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/481281.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2核2G服务器阿里云多少钱一年?

阿里云2核2G服务器配置优惠价格61元一年和99元一年&#xff0c;61元是轻量应用服务器2核2G3M带宽、50G高效云盘&#xff1b;99元服务器是ECS云服务器经济型e实例ecs.e-c1m1.large&#xff0c;2核2G、3M固定带宽、40G ESSD entry系统盘&#xff0c;阿里云活动链接 aliyunfuwuqi.…

springboot295基于Mysql的商业辅助决策系统的设计与实现

商业辅助决策系统的设计与实现 摘 要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统收支信息和销售订单信息管…

神奇科技突破:瘫痪男子通过Neuralink脑植入物重新掌控数字世界!

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

电子电器架构 —— 诊断数据DTC起始篇(上)

电子电器架构 —— 诊断数据DTC起始篇(上) 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师(Wechat:gongkenan2013)。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 本就是小人物,输了就是输了,不要在意别人怎么看自己。江湖一碗茶,喝完再…

iStoreOS R4S软路由结合内网穿透实现公网远程本地电脑桌面

文章目录 简介一、配置远程桌面公网地址二、家中使用永久固定地址 访问公司电脑**具体操作方法是&#xff1a;** 简介 软路由是PC的硬件加上路由系统来实现路由器的功能&#xff0c;也可以说是使用软件达成路由功能的路由器。 使用软路由控制局域网内计算机的好处&#xff1a…

1.6 学Python能干什么,Python的应用领域有哪些

Python能干什么&#xff0c;Python的应用领域 Python 作为一种功能强大的编程语言&#xff0c;因其简单易学而受到很多开发者的青睐。那么&#xff0c;Python 的应用领域有哪些呢&#xff1f; Python 有着非广泛的应用&#xff0c;几乎所有大中型互联网公司都在使用 Python&a…

海外问卷调查项目是割韭菜吗?

大家好&#xff0c;我是橙河老师&#xff0c;今天聊一聊国外问卷调查挣钱是真实的吗&#xff1f;海外问卷调查项目是割韭菜吗&#xff1f; 作为问卷行业的老司机&#xff0c;我可以很负责的告诉大家&#xff0c;海外问卷调查这个项目是可以做的&#xff0c;我已经做了4年时间&…

基于SpringBoot+Vue信息化在线教学平台的设计与实现(源码+部署说明+演示视频+源码介绍+lw)

您好&#xff0c;我是码农飞哥&#xff08;wei158556&#xff09;&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。&#x1f4aa;&#x1f3fb; 1. Python基础专栏&#xff0c;基础知识一网打尽&#xff0c;9.9元买不了吃亏&#xff0c;买不了上当。 Python从入门到精通…

全国大学生数学建模大赛备赛——相关系数的求解(皮尔逊(pearson)、斯皮尔曼(spearman)、肯德尔(kendall)相关系数)

相关系数是用来衡量两个变量之间线性相关程度的指标。它的取值范围在-1到1之间&#xff0c;当相关系数为1时表示两个变量完全正相关&#xff08;即一个变大另一个也变大&#xff09;&#xff0c;当相关系数为-1时表示两个变量完全负相关&#xff08;即一个变大另一个变小&#…

linux查看usb是3.0还是2.0

1 作为device cat /sys/devices/platform/10320000.usb30drd/10320000.dwc3/udc/10320000.dwc3/current_speed 如下 high-speed usb2.0 super-speed usb3.0 2 作为host linux下使用以下命令查看 &#xff0c;如果显示 速率为5G, 则为USB 3.0&#xff0c; USB2.0通常显示速率…

STM32CubeIDE 1.15.0 LOAD segment with RWX permissions 警告处理

处理办法&#xff1a; 在"xx_FLASH.ld"文件中&#xff0c;找到并添加上&#xff08;READONLY&#xff09;&#xff0c;即可消除 .ARM.extab (READONLY) :.ARM (READONLY) :.preinit_array (READONLY) :.init_array (READONLY) :.fini_array (READONLY) :

记录C++中,子类同名属性并不能完全覆盖父类属性的问题

问题代码&#xff1a; 首先看一段代码&#xff1a;很简单&#xff0c;就是BBB继承自AAA&#xff0c;然后BBB重写定义了同名属性&#xff0c;然后调用父类AAA的打印函数&#xff1a; #include <iostream> using namespace std;class AAA { public:AAA() {}~AAA() {}void …

QT tableWidget横向纵向设置

横向控件 要设置QTabWidget选项卡的字体方向&#xff0c;可以使用QTabWidget的setTabPosition()方法。通过传递Qt枚举值QTabWidget.east或QTabWidget.west作为参数&#xff0c;可以设置选项卡的字体方向为从左到右或从右到左。 myTabWidget QTabWidget() myTabWidget.setTabP…

判断是否是完全二叉树

题目 题目链接 判断是不是完全二叉树_牛客题霸_牛客网 题目描述 代码实现 #include <queue> class Solution { public:/*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值即可** * param root TreeNode类 * return boo…

虚拟机安装Linux系统,FinalShell远程连接Linux

1.虚拟机安装CentOS系统 2. 查看CentOS系统的ip地址 3. FinalShell远程连接Linux 3.虚拟机快照&#xff08;存档&#xff09; 确保虚拟机关机&#xff0c;找到快照模拟器 恢复快照

【Mysql】面试题汇总

1. 存储引擎 1-1. MySQL 支持哪些存储引擎&#xff1f;默认使用哪个&#xff1f; 答&#xff1a; MySQL 支持的存储引擎包括 InnoDB、MyISAM、Memory 等。 Mysql 5.5 之前默认的是MyISAM&#xff0c;Mysql 5.5 之后默认的是InnoDB。 可以通过 show engines 查看 Mysql 支持…

ES的集群节点发现故障排除指南(2)

本文是ES官方文档关于集群节点发现与互联互通的问题排查指南内容&#xff0c;第二部分。 原文参考及相关内容&#xff1a; 英文原文&#xff08;官网&#xff09; 第一部分-&#xff08;1&#xff09; 已选出主节点但状态不稳定&#xff1f; 当一个节点赢得主节点选举时&…

苹果电脑不能删除移动硬盘文件 苹果电脑移动硬盘只读模式如何更改 移动硬盘文件或目录损坏且无法读取怎么办

当我们将移动硬盘插入苹果电脑后&#xff0c;发现无法对移动硬盘中的文件进行编辑该怎么办&#xff1f;相信有不少网友遇到过这类情况。苹果电脑不能删除移动硬盘文件&#xff0c;或无法拷贝硬盘里的文件。今天我为大家解决苹果电脑移动硬盘只读模式如何更改的问题&#xff0c;…

运维 | 在企业环境中快速安装配置 FreeBSD Unix 服务器操作系统

微信改版了,现在看到我们全凭缘分,为了不错过【全栈工程师修炼指南】重要内容及福利,大家记得按照上方步骤设置「接收文章推送」哦~ 0x01 Unix 服务器系统 FreeBSD Unix FreeBSD 是什么? 描述: FreeBSD 是一种用于为现代服务器、台式机和嵌入式平台供电的操作系统; 三十多…

计算机组成原理 例题集

补码的规格化表示是小数点后一位与符号位不同&#xff1a;数符为0,这个数就是正数,正数补码就是其本身,其最高有效位(阶码使用标准移码的话规格化后尾数最高有效位就是小数点后第一位)必定为1,数符0和最高有效位的1相异.数符为1,这个数就是个负数,求负数的补码有一步叫按位取反…