事件时间+时间窗口,最后一个窗口不执行问题踩坑与源码分析

事件时间+时间窗口,最后一个窗口不执行问题踩坑与源码分析

1. 结论

在使用事件时间和时间窗口的过程中,当最后一个事件的事件时间未达到时间窗口的最大时间,窗口不会触发

举例说明,在按小时的滚动窗口中,假设当前时间是12:05点,按照正常预想13:00时窗口会触发执行,但是在12:00到13:00的时间段内,最后一个事件的时间是12:50,之后再未产生新的事件,那么在13:00的时候,窗口并不会触发执行,只有当后续再产生新的事件,并且事件时间大于13:00时,12-13的窗口才会执行。

在实际开发过程中可能会带来一些问题,当事件不是源源不断的产生时,最后一个窗口不执行,影响结果。

示例代码见末尾。

2. 分析原因

源码基于flink 1.14.4

窗口是否执行是由trigger控制,从trigger中逐步寻找原因,在示例代码中使用了TumblingEventTimeWindows,其默认trigger是EventTimeTrigger(通过方法org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows#getDefaultTrigger可知)。

2.1. 窗口触发

EventTimeTrigger实现可知,只有当窗口最大时间小于等于当前水位(每个事件到达时判断) 或 当eventime timer触发时的触发时间戳等于窗口最大时间时才会触发窗口。因此最终决定窗口是否触发的原因是水位值

flink自身提供的其他和事件时间有关的trigger触发时机类似。

// 返回TriggerResult.FIRE时触发计算
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // 窗口最大时间小于等于水位值时,触发
            return TriggerResult.FIRE;
        } else {
            // 大于水位值时,将窗口最大时间注册timer,注册timer后,当水印时间超过参数指定的时间时,会调用下面的onEvenTime方法
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    // time是timer触发时的事件戳
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }
}    

因此再了解下水位值如何更新。

2.2. 事件时间和水位

DataStrema中默认提供了3种分配水位的方式,重点了解下通过WatermarkStrategy实现指定水位的方法。

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            WatermarkStrategy<T> watermarkStrategy) 
@Deprecated
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner)
@Deprecated
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner)

WatermarkStrategy的核心是TimestampAssignerWatermarkGenerator(按照字面意思暂且称之为时间戳分配器和水位生成器),水位生成器根据指定的时间戳来生成水位。同时WatermarkStrategy默认提供了两种快速实现WatermarkStrategy的方法forMonotonousTimestampsforBoundedOutOfOrderness,分别表示顺序递增和允许乱序的水位生成策略。

public interface WatermarkStrategy<T>
        extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {

    /** 重点,需要在实现WatermarkStrategy时,实现此方法 */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);

    @Override
    default TimestampAssigner<T> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        // 提供了默认实现,用于类似于kafka等数据源时
        return new RecordTimestampAssigner<>();
    }

    static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
        return (ctx) -> new AscendingTimestampsWatermarks<>();
    }

    static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
    }
}

waterMark generator继承关系
AscendingTimestampsWatermarksBoundedOutOfOrdernessWatermarks的特例(outOfOrdernessMillis=0)。

由于WatermarkGenerator负责水位的具体生成,因此从该接口中寻找真相,接口内容如下,包含两个方法。具体作用见注释

public interface WatermarkGenerator<T> {

    /**
     * 指定每个事件到达后,如何处理
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期调用,调用时处理水位。
     * 调用时间间隔取决于ExecutionConfig#getAutoWatermarkInterval(),默认值为200ms
     */
    void onPeriodicEmit(WatermarkOutput output);
}

public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    private long maxTimestamp;
    private final long outOfOrdernessMillis;

    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // 默认最大时间是long最小值+允许的延迟时间+1,在允许延迟时间合理时,默认最大时间可以认为时long的最小值
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

从BoundedOutOfOrdernessWatermark的实现中可知,每个事件到达后,仅获取当前最大的时间戳。而水位是通过周期方法来生成的,默认情况下200ms生成一次水位。水位值=当前事件的最大时间戳-允许延迟的事件值-1

到此真相就找到了,水位是根据当前已到事件的最大时间决定的,当后续无更大的时间的事件到之前,水位将暂停。由于水位值无法增加,当窗口到达自然时间点时,无法触发。


示例代码

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = EnvConf.getEnv();
    env.setParallelism(1);

    DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 9999);
    DataStream<SensorRecord> inputStream = source.map(line -> {
        String[] fields = line.split(",");
        return new SensorRecord(fields[0], new Double(fields[1]));
    });

    WatermarkStrategy<SensorRecord> watermarkStrategy = WatermarkStrategy
            // 单调递增的水位生成器
            .<SensorRecord>forMonotonousTimestamps()
            // 时间戳生成器,时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位。
            .withTimestampAssigner((event, ts) -> event.getTs());
    SingleOutputStreamOperator<SensorRecord> input = inputStream.assignTimestampsAndWatermarks(watermarkStrategy);

    SingleOutputStreamOperator<Tuple2<String, Integer>> result = input.keyBy(SensorRecord::getId)
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .aggregate(new AggregateFunction<SensorRecord, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> createAccumulator() {
                    return null;
                }

                @Override
                public Tuple2<String, Integer> add(SensorRecord value, Tuple2<String, Integer> accumulator) {
                    return null;
                }

                @Override
                public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
                    return null;
                }

                @Override
                public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                    return null;
                }
            });

    result.print();
    env.execute();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/521929.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开启虚拟机时出现此主机支持 Intel VT-x,但 Intel VT-x 处于禁用状态怎么解决

问题描述 虚拟机安装完成后&#xff0c;点击开启此虚拟机弹出系统提示 原因分析&#xff1a; Intel VT-x 处于禁用状态&#xff0c;需要开启。 解决方案&#xff1a; 以联系小新笔记本电脑为例&#xff0c;进入BIOS界面&#xff0c;将Intel Virtual Technology设置成Enabl…

STL--迭代器的介绍

一.迭代器介绍&#x1f357; 迭代器是 C 标准模板库&#xff08;STL&#xff09;中的一个重要概念。简单来说&#xff0c;迭代器就像是一个指针&#xff0c;用于访问和遍历容器中的元素&#xff08;比如数组、链表、集合等&#xff09;。迭代器提供了一种统一的方法来访问容器…

力扣1448---统计二叉树中好节点的数量(Java、DFS、中等题)

题目描述&#xff1a; 给你一棵根为 root 的二叉树&#xff0c;请你返回二叉树中好节点的数目。 「好节点」X 定义为&#xff1a;从根到该节点 X 所经过的节点中&#xff0c;没有任何节点的值大于 X 的值。 示例 1&#xff1a; 输入&#xff1a;root [3,1,4,3,null,1,5] 输出…

开启新纪元中凡工业装备邀您参观2024第13届生物发酵展

参展企业介绍 中凡工业是一家专注于螺旋板式换热器以及相关非标设备制造的生产厂家。公司产品主要应用于环保废水、污水处理、可再生能源、食品、药化、焦化、农化、精细化工等行业&#xff0c;为这些行业解决生产工艺中的液体加热&#xff0c;液体冷却&#xff0c;气体冷凝&a…

聚观早报 | 沃尔沃发布一季度全球销量;苹果将举办财报电话会议

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 4月07日消息 沃尔沃发布一季度全球销量 苹果将举办新财报电话会议 荣耀Magic6支持5.5G通信 特斯拉将建最大超级充…

Word技巧之【允许修改受保护文档的部分内容】

给Word文档设置“限制编辑”&#xff0c;可以保护文档不能随意编辑更改&#xff0c;但如果文档中有部分内容&#xff0c;是需要供打开文档的人可以修改&#xff0c;要怎么使这部分内容允许修改呢&#xff1f;下面一起来看看&#xff0c;如何设置Word文档中允许部分内容可修改。…

政安晨:【深度学习神经网络基础】(三)—— 激活函数

目录 线性激活函数 阶跃激活函数 S型激活函数 双曲正切激活函数 修正线性单元 Softmax激活函数 偏置扮演什么角色&#xff1f; 政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: 政安晨的机器学习笔记 希望政安晨的博客能够对您有所裨…

Java Spring IoCDI :探索Java Spring中控制反转和依赖注入的威力,增强灵活性和可维护性

&#x1f493; 博客主页&#xff1a;从零开始的-CodeNinja之路 ⏩ 收录文章&#xff1a;Java Spring IoC&DI :探索Java Spring中控制反转和依赖注入的威力,增强灵活性和可维护性 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 前提小知识:高内…

12-2-CSS 字体图标

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;HTML5和CSS3悦读 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 CSS 字体图标1 字体图标的产生2 字体图标的优点3 字体图标的下载4 字体图标的…

LangChain-10(2) 加餐 编写Agent获取本地Docker运行情况 无技术含量只是思路

可以先查看 上一节内容&#xff0c;会对本节有更好的理解。 安装依赖 pip install langchainhub编写代码 核心代码 tool def get_docker_info(docker_name: str) -> str:"""Get information about a docker pod container info."""result…

隐私计算实训营学习八:隐语SCQL的开发实践

文章目录 一、SCQL使用集成最佳实践1.1 SCQL使用流程1.2 SCQL部署1.3 SCQL使用示例 二、SCQL工作原理三、使用SecretNote上手体验SCQL 一、SCQL使用集成最佳实践 1.1 SCQL使用流程 SCQL使用&#xff1a; SCQL 开放 API 供⽤户使⽤/集成。可以使⽤SCDBClient上⼿体验(类似与My…

归一化技术比较研究:Batch Norm, Layer Norm, Group Norm

归一化层是深度神经网络体系结构中的关键&#xff0c;在训练过程中确保各层的输入分布一致&#xff0c;这对于高效和稳定的学习至关重要。归一化技术的选择&#xff08;Batch, Layer, GroupNormalization&#xff09;会显著影响训练动态和最终的模型性能。每种技术的相对优势并…

CSS - 你实现过宽高自适应的正方形吗

难度 难度级别:中高级及以上 提问概率:80% 宽高自适应的需求并不少见,尤其是在当今流行的大屏系统开发中更是随处可见,很显然已经超越了我们日常将div写死100px这样的范畴,那么如何实现一个宽高自适应的正方形呢?这里提出两种实现方案。…

【Linux】进程初步理解

个人主页 &#xff1a; zxctscl 如有转载请先通知 文章目录 1. 冯诺依曼体系结构1.1 认识冯诺依曼体系结构1.2 存储金字塔 2. 操作系统2.1 概念2.2 结构2.3 操作系统的管理 3. 进程3.1 进程描述3.2 Linux下的PCB 4. task_struct本身内部属性4.1 启动4.2 进程的创建方式4.2.1 父…

JAVA:探索Apache POI 处理利器

请关注微信公众号&#xff1a;拾荒的小海螺 1、简述 Apache POI是Apache软件基金会的顶级项目之一&#xff0c;它允许Java开发人员读取和写入Microsoft Office格式的文档&#xff0c;包括Excel、Word和PowerPoint文件。通过POI&#xff0c;开发人员可以创建、修改和读取Excel…

面试(04)————JavaWeb

1、网络通讯部分 1.1、 TCP 与 UDP 区别&#xff1f; 1.2、什么是 HTTP 协议&#xff1f; 1.3、TCP 的三次握手&#xff0c;为什么&#xff1f; 1.4、HTTP 中重定向和请求转发的区别&#xff1f; 1.5、 Get 和 Post 的区别&#xff1f; 2、cookie 和 session 的区别&am…

加入酷开会员 酷开系统带你一起开启看电视的美好时光!

看电视对孩子和大人来说&#xff0c;都是有好处的。英国的《星期日泰晤士报》曾刊登报道&#xff1a;“看电视可以让小孩增长见闻&#xff0c;学习各种良好的社交和学习技巧&#xff0c;从而为他们今后的学习打下良好的基础。”而对于成年人来说&#xff0c;看电视也是一种娱乐…

linux 安装 pptp 协议

注意&#xff1a;目前iOS已不支持该协议 yum -y install ppp wget https://download-ib01.fedoraproject.org/pub/epel/7/x86_64/Packages/p/pptpd-1.4.0-2.el7.x86_64.rpm yum -y install pptpd-1.4.0-2.el7.x86_64.rpm vi /etc/pptpd.conf 去除 localip 和 remoteip的注释 …

【.Net】Polly

文章目录 概述服务熔断、服务降级、服务限流、流量削峰、错峰、服务雪崩Polly的基本使用超时策略悲观策略乐观策略 重试策略请求异常响应异常 降级策略熔断策略与策略包裹&#xff08;多种策略组合&#xff09; 参考 概述 Polly是一个被.NET基金会支持认可的框架&#xff0c;同…

SAP-MM 新增公司代码 激活物料分类账

1、OMX1 - 激活物料分类账&#xff08;配置环境&#xff09; 2、CKMSTART - 物料分类账的生产开始&#xff08;生产机运行&#xff09; 不激活创建物料时会报错&#xff1a;估价范围还没有生产式的物料账簿 执行后结果&#xff1a; 以上~~