Flink 窗口触发器(Trigger)(一)

Flink 窗口触发器(Trigger)(一)
Flink 窗口触发器(Trigger)(二)

Flink的窗口触发器(Trigger)是流处理中一个非常关键的概念,它定义了窗口何时被触发并决定触发后的行为(如进行窗口数据的计算或清理)。

一、基本概念

  • 定义:触发器决定了窗口何时被触发以及触发后的行为。在Flink中,窗口的触发是通过设置定时器来实现的。
  • 作用:控制窗口数据的聚合时机,确保数据在适当的时间点被处理和输出。
    在这里插入图片描述

二、类型

Flink提供了多种内置的触发器,以下几种为常用类型:

  1. EventTimeTrigger
  • 工作原理:基于事件时间和水印(Watermark)机制来触发窗口计算。当窗口的最大时间戳小于等于当前的水印时,立即触发窗口计算。
  • 适用场景:适用于需要基于事件时间进行处理的场景,如金融交易、日志分析等。
  1. ProcessingTimeTrigger
  • 工作原理:基于处理时间(即机器的系统时间)来触发窗口计算。当处理时间达到窗口的结束时间时,触发窗口计算。
  • 适用场景:适用于对时间精度要求不高的场景,或者当事件时间无法准确获取时。
  1. CountTrigger
  • 工作原理:根据窗口内元素的数量来触发计算。当窗口内的元素数量达到预设的阈值时,触发窗口计算。
  • 适用场景:适用于需要基于数据量进行处理的场景,如批量数据处理、流量分析等。
  1. ContinuousEventTimeTriggerContinuousProcessingTimeTrigger
  • 工作原理:根据间隔时间周期性触发窗口计算,或者当窗口的结束时间小于当前的时间(事件时间或处理时间)时触发计算。
  • 适用场景:适用于需要周期性处理数据的场景,如实时监控、周期性报表等。
  1. DeltaTrigger
  • 工作原理:根据接入数据计算出的Delta指标是否超过指定的阈值来触发窗口计算。
  • 适用场景:适用于需要基于数据变化量进行处理的场景,如异常检测、趋势分析等。
  1. PurgingTrigger
  • 工作原理:将其他触发器作为参数转换为Purge类型的触发器,在触发计算后清除窗口内的数据。
  • 适用场景:适用于需要在计算完成后立即清除窗口数据的场景,以节省存储空间。

三、关键方法

触发器通常包含以下几个关键方法:

  1. onElement(T element, long timestamp, W window, TriggerContext ctx)
    当元素被添加到窗口时调用,用于注册定时器或更新窗口状态。
  2. onEventTime(long time, W window, TriggerContext ctx)
    当事件时间计时器触发时调用,用于处理事件时间相关的触发逻辑。
  3. onProcessingTime(long time, W window, TriggerContext ctx)
    当处理时间计时器触发时调用,用于处理处理时间相关的触发逻辑。
  4. onMerge(W window, OnMergeContext ctx)(可选)
    当两个窗口合并时调用,用于合并窗口的状态和定时器。
  5. clear(W window, TriggerContext ctx)
    当窗口被删除时调用,用于清理窗口的状态和定时器。

四、行为

触发器在触发时会返回一个TriggerResult枚举值,以决定窗口的后续行为。常见的TriggerResult值包括:

  • CONTINUE:表示不进行任何操作,等待下一个触发条件。
  • FIRE:表示触发窗口计算并输出结果,但窗口状态保持不变。
  • PURGE:表示不触发窗口计算,只清除窗口内的数据和状态。
  • FIRE_AND_PURGE:表示触发窗口计算并输出结果,然后清除窗口内的数据和状态。

Flink的窗口触发器是流处理中非常灵活且强大的工具,它允许开发者根据实际需求定义窗口的触发条件和触发后的行为。通过选择合适的触发器和配置相应的参数,可以实现高效、准确的流数据处理。

五、Trigger

EventTimeTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

ProcessingTimeTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

ProcessingTimeoutTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import java.time.Duration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private final Trigger<T, W> nestedTrigger;
    private final long interval;
    private final boolean resetTimerOnNewRecord;
    private final boolean shouldClearOnTimeout;
    private final ValueStateDescriptor<Long> timeoutStateDesc;

    private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long interval, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        this.nestedTrigger = nestedTrigger;
        this.interval = interval;
        this.resetTimerOnNewRecord = resetTimerOnNewRecord;
        this.shouldClearOnTimeout = shouldClearOnTimeout;
        this.timeoutStateDesc = new ValueStateDescriptor("timeout", LongSerializer.INSTANCE);
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
        if (triggerResult.isFire()) {
            this.clear(window, ctx);
            return triggerResult;
        } else {
            ValueState<Long> timeoutState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
            long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
            Long timeoutTimestamp = (Long)timeoutState.value();
            if (timeoutTimestamp != null && this.resetTimerOnNewRecord) {
                ctx.deleteProcessingTimeTimer(timeoutTimestamp);
                timeoutState.clear();
                timeoutTimestamp = null;
            }

            if (timeoutTimestamp == null) {
                timeoutState.update(nextFireTimestamp);
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
            }

            return triggerResult;
        }
    }

    public TriggerResult onProcessingTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            this.clear(window, ctx);
        }

        return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    public TriggerResult onEventTime(long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onEventTime(timestamp, window, ctx);
        if (this.shouldClearOnTimeout) {
            this.clear(window, ctx);
        }

        return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState<Long> timeoutTimestampState = (ValueState)ctx.getPartitionedState(this.timeoutStateDesc);
        Long timeoutTimestamp = (Long)timeoutTimestampState.value();
        if (timeoutTimestamp != null) {
            ctx.deleteProcessingTimeTimer(timeoutTimestamp);
            timeoutTimestampState.clear();
        }

        this.nestedTrigger.clear(window, ctx);
    }

    public String toString() {
        return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout) {
        return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), false, true);
    }

    public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(Trigger<T, W> nestedTrigger, Duration timeout, boolean resetTimerOnNewRecord, boolean shouldClearOnTimeout) {
        return new ProcessingTimeoutTrigger(nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
    }
}

CountTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc;

    private CountTrigger(long maxCount) {
        this.stateDesc = new ReducingStateDescriptor("count", new Sum(), LongSerializer.INSTANCE);
        this.maxCount = maxCount;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> count = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        count.add(1L);
        if ((Long)count.get() >= this.maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ((ReducingState)ctx.getPartitionedState(this.stateDesc)).clear();
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
    }

    public String toString() {
        return "CountTrigger(" + this.maxCount + ")";
    }

    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Sum() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}

ContinuousEventTimeTrigger和ContinuousProcessingTimeTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc;

    private ContinuousEventTimeTrigger(long interval) {
        this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE);
        this.interval = interval;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
            if (fireTimestampState.get() == null) {
                this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState);
            }

            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE;
        } else {
            ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
            Long fireTimestamp = (Long)fireTimestampState.get();
            if (fireTimestamp != null && fireTimestamp == time) {
                fireTimestampState.clear();
                this.registerNextFireTimestamp(time, window, ctx, fireTimestampState);
                return TriggerResult.FIRE;
            } else {
                return TriggerResult.CONTINUE;
            }
        }
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        Long timestamp = (Long)fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteEventTimeTimer(timestamp);
            fireTimestamp.clear();
        }

    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
        Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get();
        if (nextFireTimestamp != null) {
            ctx.registerEventTimeTimer(nextFireTimestamp);
        }

    }

    public String toString() {
        return "ContinuousEventTimeTrigger(" + this.interval + ")";
    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {
        return new ContinuousEventTimeTrigger(interval.toMilliseconds());
    }

    private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
        long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp());
        fireTimestampState.add(nextFireTimestamp);
        ctx.registerEventTimeTimer(nextFireTimestamp);
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    private final ReducingStateDescriptor<Long> stateDesc;

    private ContinuousProcessingTimeTrigger(long interval) {
        this.stateDesc = new ReducingStateDescriptor("fire-time", new Min(), LongSerializer.INSTANCE);
        this.interval = interval;
    }

    public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        timestamp = ctx.getCurrentProcessingTime();
        if (fireTimestampState.get() == null) {
            this.registerNextFireTimestamp(timestamp - timestamp % this.interval, window, ctx, fireTimestampState);
        }

        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestampState = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        if (((Long)fireTimestampState.get()).equals(time)) {
            fireTimestampState.clear();
            this.registerNextFireTimestamp(time, window, ctx, fireTimestampState);
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = (ReducingState)ctx.getPartitionedState(this.stateDesc);
        Long timestamp = (Long)fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteProcessingTimeTimer(timestamp);
            fireTimestamp.clear();
        }

    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(this.stateDesc);
        Long nextFireTimestamp = (Long)((ReducingState)ctx.getPartitionedState(this.stateDesc)).get();
        if (nextFireTimestamp != null) {
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
        }

    }

    @VisibleForTesting
    public long getInterval() {
        return this.interval;
    }

    public String toString() {
        return "ContinuousProcessingTimeTrigger(" + this.interval + ")";
    }

    public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
        return new ContinuousProcessingTimeTrigger(interval.toMilliseconds());
    }

    private void registerNextFireTimestamp(long time, W window, Trigger.TriggerContext ctx, ReducingState<Long> fireTimestampState) throws Exception {
        long nextFireTimestamp = Math.min(time + this.interval, window.maxTimestamp());
        fireTimestampState.add(nextFireTimestamp);
        ctx.registerProcessingTimeTimer(nextFireTimestamp);
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Min() {
        }

        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}

DeltaTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private final DeltaFunction<T> deltaFunction;
    private final double threshold;
    private final ValueStateDescriptor<T> stateDesc;

    private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        this.deltaFunction = deltaFunction;
        this.threshold = threshold;
        this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);
        if (lastElementState.value() == null) {
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
        } else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            lastElementState.update(element);
            return TriggerResult.FIRE;
        } else {
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        ((ValueState)ctx.getPartitionedState(this.stateDesc)).clear();
    }

    public String toString() {
        return "DeltaTrigger(" + this.deltaFunction + ", " + this.threshold + ")";
    }

    public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
        return new DeltaTrigger(threshold, deltaFunction, stateSerializer);
    }
}

PurgingTrigger

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;
    private Trigger<T, W> nestedTrigger;

    private PurgingTrigger(Trigger<T, W> nestedTrigger) {
        this.nestedTrigger = nestedTrigger;
    }

    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onEventTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(time, window, ctx);
        return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
    }

    public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
        this.nestedTrigger.clear(window, ctx);
    }

    public boolean canMerge() {
        return this.nestedTrigger.canMerge();
    }

    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        this.nestedTrigger.onMerge(window, ctx);
    }

    public String toString() {
        return "PurgingTrigger(" + this.nestedTrigger.toString() + ")";
    }

    public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
        return new PurgingTrigger(nestedTrigger);
    }

    @VisibleForTesting
    public Trigger<T, W> getNestedTrigger() {
        return this.nestedTrigger;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/767998.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AC7801时钟配置流程

一 默认配置 在启动文件中&#xff0c;已经对时钟进行了初始化&#xff0c;默认按外部8M晶振&#xff0c;配置系统时钟为48MHZ&#xff0c;APB为系统时钟的2分频&#xff0c;为24MHZ。在system_ac780x.c文件中&#xff0c;可以找到下面这个系统初始化函数&#xff0c;里面有Se…

力扣hot100-普通数组2

文章目录 题目&#xff1a;轮转数组方法1-使用额外的数组方法2-三次反转数组 除自身以外数组的乘积方法1-用到了除法方法2-前后缀乘积法 题目&#xff1a;轮转数组 原题链接&#xff1a;轮转数组 方法1-使用额外的数组 方法1是自己写出来的。方法2参考的别人的&#xff0c;…

守护创新之魂:源代码防泄漏的终极策略

在信息化快速发展的今天&#xff0c;企业的核心机密数据&#xff0c;尤其是源代码&#xff0c;成为了企业竞争力的关键所在。然而&#xff0c;源代码的泄露风险也随之增加&#xff0c;给企业的安全和发展带来了巨大威胁。在这样的背景下&#xff0c;SDC沙盒作为一种创新的源代码…

C++——stack和queue类用法指南

一、stack的介绍和使用 1.1 stack的介绍 1、stack是一种容器适配器&#xff0c;专门用在具有后进先出操作的上下文环境中&#xff0c;其删除只能从容器的一端进行插入与提取操作 2、stack是作为容器适配器被实现的&#xff0c;容器适配器即是对特定类封装作为其底层的容器&am…

imx6ull/linux应用编程学习(8)PWM应用编程(基于正点)

1.应用层如何操控PWM&#xff1a; 与 LED 设备一样&#xff0c; PWM 同样也是通过 sysfs 方式进行操控&#xff0c;进入到/sys/class/pwm 目录下 这里列举出了 8 个以 pwmchipX&#xff08;X 表示数字 0~7&#xff09;命名的文件夹&#xff0c;这八个文件夹其实就对应了…

同样的APP为何在Android 8以后网络感觉变卡?

前言 在无线网络技术不断发展的今天&#xff0c;Wi-Fi已经成为了我们日常生活中不可或缺的一部分。无论是家庭娱乐、办公还是在线游戏&#xff0c;Wi-Fi都在提供着便捷的互联网接入服务。然而&#xff0c;在安卓8.1后&#xff0c;为了进一步延长安卓设备的待机时间。原生安卓(A…

Ubuntu18.04新安装--无网络连接、重启黑屏解决教程

一、安装Ubuntu Ubuntu安装需要U盘作为启动盘&#xff0c;在目前教新的电脑中选中GPT作为分区&#xff0c;制作启动盘&#xff0c;其中在安装双系统Ubuntu时&#xff0c;以自定义格式作为存储空间。详细安装过程以以及如何分区请参考下列链接&#xff1a;内含详细安装过程&…

不是大厂云用不起,而是五洛云更有性价比

明月代维的一个客户的大厂云境外云服务器再有几天就到期了&#xff0c;续费提醒那是提前一周准时到来&#xff0c;但是看到客户发来的续费价格截图&#xff0c;我是真的没忍住。这不就是在杀熟吗&#xff1f;就这配置续费竟然如此昂贵&#xff1f;说实话这个客户的服务器代维是…

哈哈看到这条消息感觉就像是打开了窗户

在这个信息爆炸的时代&#xff0c;每一条动态可能成为我们情绪的小小触发器。今天&#xff0c;当我无意间滑过那条由杜海涛亲自发布的“自曝式”消息时&#xff0c;不禁心头一颤——如果这是我的另一半&#xff0c;哎呀&#xff0c;那画面&#xff0c;简直比烧烤摊还要“热辣”…

Qt安装配置教程

目录 一、下载Qt二、进行安装1、点击安装包&#xff08;QT6.7版本演示&#xff09;2、注册Qt账号3、选择安装的位置4、选择对应的组件 三、新建项目1、打开Qt Creator2、创建项目3、编辑名称和地址4、选择默认的CMake或切换成qmake构建5、选择自己的编译器&#xff0c;在此选择…

ui.perfetto.dev sql 查询某个事件范围内,某个事件的耗时并降序排列

ui.perfetto.dev sql 查询某个事件范围内,某个事件的耗时并降序排列 1.打开https://ui.perfetto.dev 导入Chrome Trace Json文件2.ParallelMLP.forward下的RowParallelLinear.forward3.点击Query(SQL),在输入框中输入以下内容,按CtrlEnter,显示查询结果4.点击Show timeline,点击…

告别PS修图,设计师都在用的AI抠图工具

引言 大家好&#xff01;如果你是美工或设计师&#xff0c;肯定深知Photoshop修图的繁琐和耗时。现在有一款超方便的工具&#xff0c;让你摆脱这些问题——千鹿设计助手。它不仅是个抠图工具&#xff0c;还能通过先进的AI技术&#xff0c;让抠图变得简单快速&#xff0c;让你专…

jdk动态代理-基于反射的动态代理

JDK动态代理的示例图&#xff0c;下图的绿色箭头表示实现的关系&#xff0c;白色虚线表示依赖关系&#xff0c;target表示被ProxyFactory的target成员表示代理类对象&#xff0c;由ProxyFactory传入的Object参数初始化&#xff0c;接着调用getProxyInstance函数利用反射来返回代…

npm 淘宝镜像证书过期,错误信息 Could not retrieve https://npm.taobao.org/mirrors/node/latest

更换 npm 证书 问题描述报错原因更换步骤1 找到 nvm 安装目录2 发现证书过期3 更换新地址4 保存后&#xff0c;重新安装成功 问题描述 在使用 nvm 安装新版本时&#xff0c;未成功&#xff0c;出现报错&#xff1a; Could not retrieve https://npm.taobao.org/mirrors/node/l…

02:项目1 (按键点灯)

按键点灯 1、嘉立创EDA的简要介绍2、硬件的选型与原理图设计3、PCB设计4、下单啦 1、嘉立创EDA的简要介绍 1、EDA的基础设置 2、新建工程 2、硬件的选型与原理图设计 ①在EDA软件按下 shiftf 打开嘉立创商城&#xff0c;然后在里面选型 ②打开数据手册&#xff0c;能看到基本…

【FDTD 仿真案例解析】

平面结热电子光探测器是一种基于热电子效应的光探测器&#xff0c;其工作原理是利用光子的能量激发金属表面的热电子&#xff0c;从而产生电流。 激发光学Tamm态是指在金属-介质结界面上的电磁场分布出现共振现象&#xff0c;形成一种特殊的表面态。这种表面态具有高度局域化的…

JS(JavaScript)数据校验 表单校验-案例

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

Spark join数据倾斜调优

Spark中常见的两种数据倾斜现象如下 stage部分task执行特别慢 一般情况下是某个task处理的数据量远大于其他task处理的数据量&#xff0c;当然也不排除是程序代码没有冗余&#xff0c;异常数据导致程序运行异常。 作业重试多次某几个task总会失败 常见的退出码143、53、137…

【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【18】认证服务02—微博社交登录

持续学习&持续更新中… 守破离 【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【18】认证服务02—微博社交登录 微博社交登录图示原理前置准备实现流程完整代码 参考 微博社交登录 OAuth&#xff1a; OAuth&#xff08;开放授权&#xff09;是一个开放标准&#xff0…

qt6 通过http查询天气的实现

步骤如下&#xff1a; cmakelist 当中&#xff0c;增加如下配置 引入包 访问远端api 解析返回的数据 cmakelist 当中&#xff0c;增加如下配置&#xff0c;作用是引入Network库。 引入包 3、访问远端api void Form1::on_pushButton_clicked() {//根据URL(http://t.weather.…