flink-触发器Trigger和移除器Evictor

Trigger 触发器

触发器作用:控制窗口什么时候除法计算。即执行窗口函数;基于WindowStream调用trigger()方法,传入自定义触发器(trigger);

每一个窗口分配器(windowAssigner) 都会对应一个默认的触发器;

 源码样例

  
  @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(
            WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<>(this, assigner);
    }
  

  @PublicEvolving
    public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {

        this.input = input;

        this.builder =
                new WindowOperatorBuilder<>(
                        windowAssigner,
                        windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),
                        input.getExecutionConfig(),
                        input.getType(),
                        input.getKeySelector(),
                        input.getKeyType());
    }

==============默认触发器===
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

Triger类有4个方法

  1.  onElement:窗口中每来一个元素调用该方法。
    onProcessingTime:当注册的处理时间定时器触发时,将调用这个方法。
     onEventTime:当注时的事件时间定时器触发时,将调用这个方法。
     clear:窗口关闭冰销毁时调用这个方法,一般用来清除自定义状态。
    
    onElement() ,onProcessingTime(),onEventTime()方法的返回类型都是 TriggerResult;TriggerResult中包含四个枚举值:
    CONTINUE:表示对窗口不执行任何操作。
    FIRE:触发计算并输出结果。注意计算完成后,窗口中的数据并不会被清除,将会被保留。
    PURGE:表示将窗口中的数据和窗口清除。
    FIRE_AND_PURGE:表示先将数据进行计算,输出结果,然后将窗口中的数据和窗口进行清除。
    

源码

/** No action is taken on the window. */
CONTINUE(false, false),
/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
FIRE_AND_PURGE(true, true),
/**
 * On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,
 * though, all elements are retained.
 */
FIRE(true, false),
/**
 * All elements in the window are cleared and the window is discarded, without evaluating the
 * window function or emitting any elements.
 */
PURGE(false, true);

flink提供的触发器 

flink提供触发器

ProcessingTimeoutTrigger

  • EventTimeTrigger通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessingTimeoutTrigger:当内置触发器满足设置的超时时间时,触发窗口的计算。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算,全局窗口触发器,

原文链接:https://blog.csdn.net/qq_37555071/article/details/122514061

水印触发一般是窗口关闭时间

flink提供的触发器是与窗口对应,当有水印时,如果水印时间大于等于窗口结束时间会触发计算;window.maxTimestamp()获取的是窗口end-1; EventTimeTrigger 的源码可以很明确可以看到注册时注册了触发时间为window.maxTimestamp(),这也是窗口关闭的触发机制。

如果在窗口关闭前触发计算设置多个触发计算时间,这样实现一些特定的需求。例如,每10s输出一次当天的累计数据;

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
       // 限定触发条件为窗口关闭时间,否则就继续窗口 
       return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }
.....
 

自定义触发器

继承Triger,重写抽象方法,案例

.window(TumblingEventTimeWindows.of(Time.hours(24)))
                .trigger(new MyTrigger())
                .process(new WindowResult())
                .print();

窗口长24小时,每十秒触发一次计算
===================

    public static class MyTrigger extends Trigger<Event, TimeWindow> {
        @Override
        public TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
             //定义状态,记录该状态 触发器第一个元素进来时注册全部的触发器
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
                    new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
            );
            //第一次注册,右面全部跳过
            if (isFirstEvent.value() == null) {
                for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 10000L) {
                    //注册触发器  间隔10s
                    triggerContext.registerEventTimeTimer(i);
                }
                isFirstEvent.update(true);
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
           //使用的事件时间,因此触发窗口的计算
            return TriggerResult.FIRE;
        }

        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(
                    new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN)
            );
            isFirstEvent.clear();
        }
    }

 

移除器Evictor

作用:主要用来定义移除某些数据的逻辑。基于windowedStream调用evictor()方法,就可以传入一个自定义得移除器(Evictor)。不同窗口类型都有各自预测实现的移除器。

stream.keyby().window().evictor(new MyEvictor)

evictBefore():定义窗口执行函数之前移除的数据操作,移除后的数据不参与窗口计算;

evictAfter():定义执行窗口函数后移除数据的操作;

默认情况下预实现的移出弃都是在执行窗口函数之前移除数据

注意:如果在evict中使用了iterable.iterator(),后面再次使用时不能直接使用

 .keyBy(r -> r.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)));
        window.evictor(new Evictor<Event, TimeWindow>() {
                    @Override
                    public void evictBefore(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
                        Iterator<TimestampedValue<Event>> iterator = elements.iterator();
                        while (iterator.hasNext()){
                            TimestampedValue<Event> next = iterator.next();
                            if(next.getValue().url.equals("./prod?id=1")){
                                iterator.remove();
                            }
                        }
                    }

                    @Override
                    public void evictAfter(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
                        return;
                    }
                })
                .process(new ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<UrlViewCount> out) throws Exception {
                        AtomicInteger i= new AtomicInteger();
                        elements.forEach(v-> i.getAndIncrement());
                        out.collect(
                                new UrlViewCount(
                                        s+"====",
                                        // 获取迭代器中的元素个数  
 不能再使用iterable.spliterator().getExactSizeIfKnown(),否侧获取数据一一直为-1
                                        i.longValue(),
                                        context.window().getStart(),
                                        context.window().getEnd()
                                ));
                    } })
                .print();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/747987.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何以智能方式安装 Python

Python易于使用&#xff0c;对初学者友好&#xff0c;功能强大&#xff0c;几乎可以为任何应用程序创建强大的软件。 但与任何其他软件一样&#xff0c;Python 的设置和管理可能很复杂。 在本文中&#xff0c;我们将介绍如何正确设置 Python。 您将学习如何选择合适的版本、…

JavaScript中常用数据类型做布尔值(Boolean)转换

一、前言 二、示例 1、String转Boolean 2、Number转Boolean 3、NaN、Null、undefined 转Boolean 4、Object转Boolean 5、Array转Boolean 6、Symbol转Boolean 三、总结 四、思考 一、前言 JavaScript中&#xff0c;经常需要对一些值进行boolean判断&#xff0c;根据判…

three.js 第六节 - 纹理以及贴图【.hdr文件(hdr贴图)】- 色彩空间

素材 这是素材 更多素材、案例、项目 好几个G一共&#xff0c;加我q178373168&#xff0c;60大洋拿走 源码 源码 // ts-nocheck // 引入three.js import * as THREE from three // 导入轨道控制器 import { OrbitControls } from three/examples/jsm/controls/OrbitControls…

【考研数学】李林《880》25版主要变化汇总

25版李林880拿到手后对比&#xff0c;发现和24几乎没有太大差别&#xff08;高数前两章20多道题目增删&#xff09;&#xff0c;然后24又和23版本一模一样 所以有24版本or23版本的880都可以&#xff0c;不用一定追求25年的&#xff01; 880算是比较经典的习题了&#xff0c;搭…

UE引擎实现ShadowMap、体积光(C++)

前言 整体上参考了YivanLee大佬的这两篇文&#xff1a; 虚幻4渲染编程&#xff08;灯光篇&#xff09;【第一卷&#xff1a;各种ShadowMap】 虚幻4渲染编程&#xff08;灯光篇&#xff09;【第二卷&#xff1a;体积光】 正文 1、ShadowMap &#xff08;1&#xff09;创建工…

上下文管理器在Python中的妙用

更多Python学习内容&#xff1a;ipengtao.com Python上下文管理器是一个非常强大的工具&#xff0c;它能够帮助开发者在特定代码块前后自动执行特定的操作&#xff0c;常用于资源管理&#xff0c;如文件操作、数据库连接和锁定等。本文将详细介绍Python上下文管理器的概念、使用…

django学习入门系列之第三点《案例 商品推荐部分》

文章目录 划分区域搭建骨架完整代码小结往期回顾 划分区域 搭建骨架 /*商品图片&#xff0c;父级设置*/ .slider .sd-img{display: block;width: 1226px;height: 460px; }<!-- 商品推荐部分 --> <!--搭建出一个骨架--> <div class"slider"><di…

云计算基础技术

云计算基础技术概览 计算类产品主要提供算力&#xff0c;支持业务运行&#xff0c;例如网站、办公软件、数据分析等计算能力&#xff0c;目前典型的产品主要是虚拟化和容器&#xff0c;在公有云上的云主机本质也是虚拟机。网络类产品主要满足资源的网络连通性和隔离&#xff0c…

鸿蒙NEXT开发:工具常用命令—install

安装三方库。 命令格式 ohpm install [options] [[<group>/]<pkg>[<version> | tag:<tag>]] ... ohpm install [options] <folder> ohpm install [options] <har file> alias: i 说明 group&#xff1a;三方库的命名空间&#xff0c;可…

告别数据线!轻松实现iOS和安卓设备间的文件共享

用 AirDroid 的附近传输功能&#xff0c;完全免费&#xff0c;几十个G的文件也可以相互传输。不限制iPhone和iPad数量&#xff0c;多个设备同时登录也不会强迫下线。 当你要在苹果手机和安卓手机之间传输文件&#xff0c;请将AirDroid安装到两台手机上&#xff0c;然后登录同一…

Open3D(C++) 删除点云中重复的点

目录 一、算法原理1、重叠点2、主要函数二、代码实现三、结果展示本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT。 一、算法原理 1、重叠点 原始点云克隆一份   构造重叠区域   合并点云获得重叠点 2、主要…

【Mysql】多表、外键约束

多表 1.1 多表简述 实际开发中&#xff0c;一个项目通常需要很多张表才能完成。 例如一个商城项目的数据库,需要有很多张表&#xff1a;用户表、分类表、商品表、订单表… 1.2 单表的缺点 1.2.1 数据准备 创建一个数据库 db3 CREATE DATABASE db3 CHARACTER SET utf8;数据库…

Kompas AI数据分析与预测功能对比

一、引言 在现代商业环境中&#xff0c;数据分析与预测是企业制定战略决策的关键工具。通过对大量数据的分析&#xff0c;企业能够识别趋势、预测未来变化&#xff0c;并做出更为明智的决策。本文将对比Kompas AI与其他主要AI产品在数据分析与预测方面的能力&#xff0c;展示K…

【Linux】gdb调试器

一、gdb调试器背景 程序的发布方式有两种&#xff0c;debug模式和release模式 Linux gcc/g出来的二进制程序&#xff0c;默认是release模式 要使用gdb调试&#xff0c;必须在源代码生成二进制程序的时候, 加上 -g 选项 二、安装gdb yum install gdb三、使用gdb 在Linux当中g…

btrace使用记录

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、商业变现、人工智能等&#xff0c;希望大家多多支持。 未经允许不得转载 目录 一、导读二、使用三、 推荐阅读 一、导…

【LLM Transparency Tool】用于深入分析和理解大型语言模型(LLM)工作原理的工具

背景 LLM Transparency Tool 是一个用于深入分析和理解大型语言模型&#xff08;LLM&#xff09;工作原理的工具&#xff0c;旨在增加这些复杂系统的透明度。它提供了一个交互式界面&#xff0c;用户可以通过它观察、分析模型对特定输入&#xff08;prompts&#xff09;的反应…

IPD流程开发阶段模板及表单

目录 简介 内容brief&#xff08;部分截图&#xff09; 作者简介 简介 最近一段时间因为公司这边需要规范化管理。 就顺便集中整理了一下各类资料。 部分资料呢&#xff0c;就按照类别逐步分享了出来。 正常来讲&#xff0c;每个公司都应该有一个部门&#xff0c; 来专…

CSS Grid网格布局

一、前言 二、Grid布局 1、基本介绍 2、核心概念 &#xff08;1&#xff09;网格容器 &#xff08;2&#xff09;网格元素 &#xff08;3&#xff09;网格列 &#xff08;4&#xff09;网格行 &#xff08;5&#xff09;网格间距 &#xff08;6&#xff09;网格线 三…

C语言 | Leetcode C++题解之第199题二叉树的右视图

题目&#xff1a; 题解&#xff1a; #define MAX_NODE_NUM 100 int* rightSideView(struct TreeNode* root, int* returnSize){if (root NULL) {*returnSize 0;return NULL;}int *res (int *)malloc(sizeof(int) * MAX_NODE_NUM);int cnt 0;struct TreeNode **record (st…

如何使用ossutil工具迁移本地文件到oss(最快速迁移方法)

1&#xff1a;下载ossutil工具&#xff0c;https://help.aliyun.com/zh/oss/developer-reference/install-ossutil&#xff08;注&#xff1a;根据不同的版本去下载&#xff09; 2&#xff1a;解压ossutil工具&#xff0c;并双击运行ossutil.bat文件。 3&#xff1a;输入配置命…