说说FLINK细粒度滑动窗口如何处理

分析&回答

Flink的窗口机制是其底层核心之一,也是高效流处理的关键。Flink窗口分配的基类是WindowAssigner抽象类,下面的类图示出了Flink能够提供的所有窗口类型。

Flink窗口分为滚动(tumbling)、滑动(sliding)和会话(session)窗口三大类,本文要说的是滑动窗口。

下图示出一个典型的统计用户访问的滑动窗口,来自官方文档。

假设每两条虚线之间代表1分钟时间差,那么窗口大小(size)就是2分钟,滑动步长(slide)是1分钟。若时间特征为事件时间,代码如下。

dataStream .keyBy("userId") .window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1))); 由图可知,当前滑动窗口与上一个滑动窗口会有重叠。在窗口大小size是步长slide的2倍的情况下,(几乎)每个DataStream元素都会处于2个窗口内。

我们简单参考一下相关的Flink源码,以加深理解。以下是窗口算子WindowOperator的processElement()方法的部分源码。

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);
        boolean isSkippedElement = true;
        final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
        if (windowAssigner instanceof MergingWindowAssigner) {
            // 会话窗口的处理逻辑,略去
        } else {
            for (W window : elementWindows) {
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;
                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());
 
                triggerContext.key = key;
                triggerContext.window = window;
                TriggerResult triggerResult = triggerContext.onElement(element);
 
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }
        // 最后是侧输出迟到数据的逻辑,略去
    }
复制代码

该方法先调用WindowAssigner.assignWindows()方法,根据输入元素的时间戳判断它应该属于哪些窗口。接着遍历所有窗口,将该元素加入对应的窗口状态(即缓存)中,并根据触发器返回的TriggerResult决定是输出(fire)还是清除(purge)窗口的内容,emitWindowContents()方法会调用用户函数。最后,还要调用registerCleanupTimer()方法注册计时器用来在窗口彻底过期时清除窗口状态。

以下是SlidingEventTimeWindows.assignWindows()方法的源码。

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            for (long start = lastStart;
                 start > timestamp - size;
                 start -= slide) {
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
 
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
复制代码

这段代码就不难理解了,先调用getWindowStartWithOffset()方法根据元素的时间戳计算出其窗口的起点时间戳,再逐次循环向后滑动,产生size / slide个窗口。我们可以将size / slide叫做“粒度”,亦即上述代码中返回的Collection集合的大小。粒度越大(“细”),滑动窗口之间的重合也越大。

代码读完了,有一个貌似稀松平常的需求:

以3分钟的频率实时计算App内各个子模块近24小时的PV和UV。

直觉上我们需要用粒度为1440 / 3 = 480的滑动窗口来实现它,但是细粒度的滑动窗口会带来性能问题,有两点:

状态 由代码可知,WindowOperator内维护了窗口本身的内部状态windowState(类型为InternalAppendingState)。对于一个元素,会将其写入对应的(key, window)二元组所圈定的状态中。可见,如果粒度为480,那么每个元素到来,更新windowState时都要遍历480个窗口并写入,开销是非常大的。在采用HDFS/RocksDB作为状态后端时,checkpoint的瓶颈也尤其明显。

定时器 在Flink中,定时器的实际实现是TimerHeapInternalTimer类,并且是用Flink自己实现的优先队列维护在堆内存中的。而在WindowOperator中,每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于决定窗口数据何时输出;二是registerCleanupTimer()方法注册的清理定时器,用于在窗口彻底过期(如allowedLateness过期)之后及时清理掉窗口的内部状态。细粒度滑动窗口会造成维护的定时器增多,内存负担加重。

在官方文档Windows最后一节的最后,也有如下的提醒:

Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the Window Assigners section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.

可能有看官会问:预聚合不能解决细粒度窗口的问题吗?答案是不能。预聚合只是让AggregateFunction/ReduceFunction之后的数据量降低,但是进入WindowOperator的窗口状态的数据还是没变的。换句话说,就算触发器实现为FIRE_AND_PURGE,遍历大量窗口并写入状态的开销也是无法消除的。

扯了这么多,有解决方案吗?

当然是有的,办法总比困难多。我们一般使用 滚动窗口+在线存储+读时聚合 的思路作为workaround。简单来讲就是:

弃用滑动窗口,用长度等于原滑动窗口步长的滚动窗口代替; 每个滚动窗口将其周期内的数据做聚合,打入外部在线存储(内存数据库如Redis,LSM-based NoSQL存储如HBase); 扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。 针对上面的PV/UV问题,如果采用Redis作为在线存储,我们可以将时间戳放在key内,并设定24小时过期时间。用数字字符串存储3分钟周期内的PV量,用HyperLogLog存储3分钟周期内的UV量。近24小时的PV和UV就分别可以通过简单加减和HyperLogLog的pfmerge/pfcount命令得出了。当然,实际操作起来还是要根据需求和服务器资源而定。

喵呜面试助手:一站式解决面试问题,你可以搜索微信小程序 [喵呜面试助手] 或关注 [喵呜刷题] -> 面试助手 免费刷题。如有好的面试知识或技巧期待您的共享!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/98419.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

unity界面上Global 与Local xyz- right up forward

gloabal 如果要沿这个方向移动就比较困难 local下就不一样了

对于需要进行商品价格监控的企业来说,使用淘宝API是一种有效的途径

淘宝是一个广泛使用的在线购物平台&#xff0c;其API可以用于获取各种商品的价格数据。对于需要进行商品价格监控的企业来说&#xff0c;使用淘宝API是一种有效的途径。 以下是使用淘宝API进行商品价格监控的几个步骤&#xff1a; 获取API密钥 在使用淘宝API之前&#xff0c…

Springboot 接口方式硬通知实现 动态刷新配置值,@ConfigurationProperties 、@Value 都可以

前言 看到这个文章标题&#xff0c;也许有的看官就觉得很多余&#xff0c; 因为Nacos 可以设置 NacosValue(value "${XXX}",autoRefreshed true) 实现动态刷新&#xff1b; 又因为cloud config的RefreshScope 实现动态刷新&#xff1b; 还有阿波罗...等 这…

【C++】VS配置OpenCV/Libtorch环境

前言 本文是视频https://www.bilibili.com/video/BV1dp4y177L4的笔记。 OpenCV和Libtorch安装包&#xff1a;https://pan.baidu.com/s/1i3DqTcHFSC1rRDsIgYGCsQ?pwd8888 VS版本&#xff1a;2019 Opencv版本&#xff1a;3.4.1 Libtorch版本&#xff1a;2.0.1cu117 配置Open…

TCP之三次握手四次挥手

在前面的文章中我们了解到http是基于TCP/IP协议的&#xff0c;这篇文章我们来了解一下TCP/IP。 一、TCP与UDP 1、UDP 基于非连接。类似于写信&#xff0c;不能保证对方能不能接收到&#xff0c;接收到的内容是否完整&#xff0c;顺序是否正确。 优缺点&#xff1a;性能损耗小…

el-select实现懒加载

先看一个线上的演示示例&#xff1a;https://code.juejin.cn/pen/7273352811440504889 背景 我们在实际开发中经常遇到这样的需求&#xff1a; el-select实现懒加载&#xff0c;用通俗的话说&#xff0c;为了增加响应速度&#xff0c;就是初始下拉只展示50条数据&#xff0c…

excel 分组排序

excel中会遇到对不同分组数据进行排序&#xff0c;比如对于不同班级里的学生按照分数高低进行升序排序&#xff0c;可以采用如下公式 SUMPRODUCT((A$2:A$12A2)*(C$2:C$12>C2))1 如果需要 进行降序排序&#xff0c;将公式中的大于号替换为小于号即可

Unity UI与粒子 层级问题Camera depth Sorting Layer Order in Layer RenderQueue

Unity游戏开发中&#xff0c;模型、界面、特效等&#xff0c;需要规划好layer的概念&#xff0c;涉及到摄像机&#xff08;Camera&#xff09;、画布&#xff08;Canvas&#xff09;、Shader等相关内容。 在 Unity 中&#xff0c;渲染顺序是由多个因素共同决定的&#xff0c;大…

【暴力DP】2021 icpc上海 I

Problem - I - Codeforces 题意&#xff1a; 思路&#xff1a; 考虑暴力DP即可 设 dp[i][j][k]表示 前 i 个物品&#xff0c;已经翻倍了 j 次&#xff0c;A点数 - B点数为 k 的最大价值和 然后分为这6种决策分类讨论就好了 注意数组里不能有负数&#xff0c;要加个偏移量 P…

百度文心一率先言向全社会开放 应用商店搜“文心一言”可直接下载

8月31日&#xff0c;文心一言率先向全社会全面开放。广大用户可以在应用商店下载“文心一言APP”或登陆“文心一言官网”&#xff08;https://yiyan.baidu.com&#xff09; 体验。同时&#xff0c;企业用户可以直接登录百度智能云千帆大模型平台官网&#xff0c;调用文心一言能…

Kubernetes技术--k8s核心技术 configMap

1.概述 configMap最主要的作用是存储一些不加密的数据到/etcd,让pod以变量或者数据卷(volume)挂载到容器。 应用场景:配置文件、存储信息等 2.使用 -1.创建配置文件。 这里我们需要先编写一个配置文件。使用redis,如下所示:

k8s使用ECK(2.4)形式部署elasticsearch+kibana-http协议

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、准备elasticsearch-cluster.yaml二、部署并测试总结 前言 之前写了eck2.4部署eskibana&#xff0c;默认的话是https协议的&#xff0c;这里写一个使用http…

SpringCloud(十)——ElasticSearch简单了解(三)数据聚合和自动补全

文章目录 1. 数据聚合1.1 聚合介绍1.2 Bucket 聚合1.3 Metrics 聚合1.4 使用 RestClient 进行聚合 2. 自动补全2.1 安装补全包2.2 自定义分词器2.3 自动补全查询2.4 拼音自动补全查询2.5 RestClient 实现自动补全2.5.1 建立索引2.5.2 修改数据定义2.5.3 补全查询2.5.4 解析结果…

面试官如何考察与CAP相关的理论?

在互联网技术面试中&#xff0c;考察分布式技术已经是面试的标配了。很多招聘信息中&#xff0c;你能发现&#xff0c;一线互联网公司在对候选人的要求中都有“分布式系统设计”这一关键词。无论你是程序员&#xff0c;还是架构师&#xff0c;都要掌握分布式系统设计。 案例背…

vue使用打印组件print-js

项目场景&#xff1a; 由于甲方要求&#xff0c;项目需要打印二维码标签&#xff0c;故开发此功能 开发流程 安装包&#xff1a;npm install print-js --saveprint-js的使用 <template><div id"print" ref"print" ><p>打印内容<p&…

WebSocket 协议及其使用案例

文章目录 前言一、初识 WebSocket 协议1.1 什么是 WebSocket 协议1.2 WebSocket 与 HTTP 的关系1.3 WebSocket 握手的过程1.4 WebSocket 解决了什么问题 二、WebSocket 数据帧格式2.1 WebSocket 数据帧格式图示2.2 各字段的详细说明 三、SpringBoot 项目中引入 WebSocket3.1 创…

python中的文件操作

我们平常对文件的基本操作&#xff0c;大概可以分为三个步骤&#xff08;简称文件操作三步走&#xff09;&#xff1a; ① 打开文件 ② 读写文件 ③ 关闭文件 【注意事项】 注意&#xff1a;可以只打开和关闭文件&#xff0c;不进行任何读写 文件打开 open函数&#xff…

Hibernate(Spring Data)抓取策略

文章目录 示例代码放到最后&#xff0c;使用的是Springboot 项目1. 简介2. Hibernate抓取策略分类2.1 即时加载&#xff08;Eager Loading&#xff09;2.2 延迟加载&#xff08;Lazy Loading&#xff09;2.3 子查询加载&#xff08;Subselect Loading&#xff09;2.4 基于批处理…

排序之选择排序

文章目录 前言一、直接选择排序1、直接选择排序基本思想2、直接选择排序代码实现3、直接选择排序的效率 二、堆排序1、堆排序2、堆排序的效率 前言 选择排序的基本思想就是每一次从待排序的数据元素中选出最小(或最大)的一个元素&#xff0c;存放在序列的起始位置&#xff0c;…

pdfh5在线预览pdf文件

前言 pc浏览器和ios的浏览器都可以直接在线显示pdf文件&#xff0c;但是android浏览器不能在线预览pdf文件&#xff0c;如何预览pdf文件&#xff1f; Github: https://github.com/gjTool/pdfh5 Gitee: https://gitee.com/gjTool/pdfh5 使用pdfh5预览pdf 编写预览页面 <…