FlinkAPI开发之水位线(Watermark)

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

Flink中的时间语义

在这里插入图片描述

哪种时间语义更重要

从《星球大战》说起

在这里插入图片描述

数据处理系统中的时间语义

在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。

水位线(Watermark)

事件时间和窗口

在这里插入图片描述

什么是水位线

在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

水位线和窗口的工作原理

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
注意:Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。

生成水位线

生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

水位线生成策略

在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。具体使用如下:

DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks = 
stream.assignTimestampsAndWatermarks(<watermark strategy>);

说明:WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”

WatermarkGenerator。
public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>,
            WatermarkGeneratorSupplier<T>{

    // 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    // 主要负责按照既定的方式,基于时间戳生成水位线
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

Flink内置水位线

有序流中内置水位线设置
对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkStrategyDemo {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        // TODO: 2024/1/7 定义时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/11 定义Watermark策略
        WatermarkStrategy<Orders> ordersWatermarkStrategy = WatermarkStrategy.<Orders>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Orders>() {
                    @Override
                    public long extractTimestamp(Orders orders, long l) {
                        System.out.println("返回时间戳"+orders.getOrder_date()+"毫秒13位的");
                        return orders.getOrder_date();
                    }
                });
        // TODO: 2024/1/11  指定 watermark策略
        SingleOutputStreamOperator<Orders> watermarks = ordersDataStreamSource.assignTimestampsAndWatermarks(ordersWatermarkStrategy);
        // TODO: 2024/1/11 进行聚合运算
        SingleOutputStreamOperator<Orders> reduce = watermarks.keyBy(orders -> orders.getProduct_id())
                // TODO: 2024/1/11 定义时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Orders>() {
                    @Override
                    public Orders reduce(Orders orders, Orders t1) throws Exception {
                        Orders orders1 = new Orders(t1.getOrder_id(), t1.getUser_id(), t1.getOrder_date(), t1.getOrder_amount() + orders.getOrder_amount(), t1.getProduct_id(), t1.getOrder_num());
                        return orders1;
                    }
                });
        ordersDataStreamSource.print("聚合前数据");
        reduce.print("聚合数据");
        environment.execute();
    }
}

在这里插入图片描述

乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import javafx.scene.input.DataFormat;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple2;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        // TODO: 2024/1/7 定义时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/11 定义Watermark策略
        WatermarkStrategy<Orders> ordersWatermarkStrategy = WatermarkStrategy.<Orders>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // TODO: 2024/1/11 指定水位线时间戳也可以用 Lambda 表达式
                .withTimestampAssigner((orders, l) -> orders.getOrder_date());
        // TODO: 2024/1/11 指定水位线
        SingleOutputStreamOperator<Orders> operator = ordersDataStreamSource.assignTimestampsAndWatermarks(ordersWatermarkStrategy);
        // TODO: 2024/1/11 分组聚合
        SingleOutputStreamOperator<Object> reduce = operator.keyBy(orders -> orders.getProduct_id())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<Orders, Object, Integer, TimeWindow>() {
                    // TODO: 2024/1/9 参数说明:分组key值,窗口计时器,按照key分组后的数据,收集器
                    @Override
                    public void process(Integer integer, ProcessWindowFunction<Orders, Object, Integer, TimeWindow>.Context context, Iterable<Orders> elements, Collector<Object> collector) throws Exception {

                        // TODO: 2024/1/9 窗口内同一个key包含的数据条数
                        long count = elements.spliterator().estimateSize();
                        // TODO: 2024/1/9 窗口的开始时间
                        long windowStartTs = context.window().getStart();
                        // TODO: 2024/1/9 窗口的结束时间
                        long windowEndTs = context.window().getEnd();
                        String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        // TODO: 2024/1/9 输出收集器
                        collector.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                    }
                });
        ordersDataStreamSource.map(new MapFunction<Orders, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Orders orders) throws Exception {
                //时间格式,HH是24小时制,hh是AM PM12小时制
                SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                //比如timestamp=1449210225945;
                String date_string = sdf.format(new Date(orders.getOrder_date()));
                return new Tuple2<>(date_string,orders.getOrder_amount());
            }
        }).print();
        reduce.print("聚合数据");
        environment.execute();
    }
}

在这里插入图片描述

自定义水位线生成器

周期性水位线生成器(Periodic Generator)

周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
下面是一段自定义周期性生成水位线的代码:

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple2;

import java.text.SimpleDateFormat;
import java.util.Date;

public class CustomPeriodicWatermarkExample {
    public static void main(String[] args) throws Exception {
        //创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        environment.setParallelism(1);
        // TODO: 2024/1/7 定义时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //调用Flink自定义Source
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/12 设置水位线 
        SingleOutputStreamOperator<Orders> operator = ordersDataStreamSource.assignTimestampsAndWatermarks(new CustomWatermarkStrategy());
        // TODO: 2024/1/12 打印数据
        ordersDataStreamSource.map(new MapFunction<Orders, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Orders orders) throws Exception {
                //时间格式,HH是24小时制,hh是AM PM12小时制
                SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                //比如timestamp=1449210225945;
                String date_string = sdf.format(new Date(orders.getOrder_date()));
                return new Tuple2<>(date_string,orders.getOrder_amount());
            }
        }).print();
        // TODO: 2024/1/11 分组聚合
        SingleOutputStreamOperator<Object> reduce = operator.keyBy(orders -> orders.getProduct_id())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<Orders, Object, Integer, TimeWindow>() {
                    // TODO: 2024/1/9 参数说明:分组key值,窗口计时器,按照key分组后的数据,收集器
                    @Override
                    public void process(Integer integer, ProcessWindowFunction<Orders, Object, Integer, TimeWindow>.Context context, Iterable<Orders> elements, Collector<Object> collector) throws Exception {

                        // TODO: 2024/1/9 窗口内同一个key包含的数据条数
                        long count = elements.spliterator().estimateSize();
                        // TODO: 2024/1/9 窗口的开始时间
                        long windowStartTs = context.window().getStart();
                        // TODO: 2024/1/9 窗口的结束时间
                        long windowEndTs = context.window().getEnd();
                        String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        // TODO: 2024/1/9 输出收集器
                        collector.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                    }
                });
        reduce.print("聚合数据");
        environment.execute();
    }

    // TODO: 2024/1/12 自定义水位线生成器
    public static class CustomWatermarkStrategy implements WatermarkStrategy<Orders>{

        @Override
        public WatermarkGenerator<Orders> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Orders>() {
                // TODO: 2024/1/12  延迟时间
                private Long delayTime = 2000L;
                // TODO: 2024/1/12  观察到的最大时间戳
                private Long maxTs = -Long.MAX_VALUE + delayTime + 1L;

                @Override
                public void onEvent(Orders orders, long l, WatermarkOutput watermarkOutput) {
                    // TODO: 2024/1/12   每来一条数据就调用一次 , 更新最大时间戳
                    maxTs = Math.max(orders.getOrder_date(),maxTs);
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // TODO: 2024/1/12   发射水位线,默认200ms调用一次
                    output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
                }
            };
        }

        @Override
        public TimestampAssigner<Orders> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            // TODO: 2024/1/12  告诉程序数据源里的时间戳是哪一个字段 
            return new SerializableTimestampAssigner<Orders>(){

                @Override
                public long extractTimestamp(Orders orders, long l) {
                    return orders.getOrder_date();
                }
            };
        }


    }
}

在这里插入图片描述

断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。

在数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。

env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)

水位线的传递

在这里插入图片描述

在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。

案例:乱序流的watermark,将并行度设为2,观察现象。
在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待。

import com.zxl.bean.Orders;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;


public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(2);
        DataStream<String> socketTextStream = environment.socketTextStream("175.24.186.230", 9999);
        DataStream<Integer> streamOperator = socketTextStream.map(Integer::parseInt);
        //自定义分区
        DataStream<Integer> dataStream = streamOperator.partitionCustom(new Partitioner<Integer>() {
            @Override
            public int partition(Integer integer, int i) {
                if (integer % 2 == 0) {
                    return 0;
                } else {
                    return 1;
                }
            }
        }, new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer integer) throws Exception {
                return integer;
            }
        });
        WatermarkStrategy<Orders> ordersWatermarkStrategy = WatermarkStrategy.<Orders>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((orders, l) -> orders.getOrder_date());

        SingleOutputStreamOperator<Integer> watermarks = dataStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Integer>forMonotonousTimestamps()
                        .withTimestampAssigner((r, ts) -> r * 1000L)
                        // TODO: 2024/1/12 空闲等待5s
                        .withIdleness(Duration.ofSeconds(5)));
        // 分成两组: 奇数一组,偶数一组 , 开10s的事件时间滚动窗口
        watermarks.keyBy(r -> r % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                        long startTs = context.window().getStart();
                        long endTs = context.window().getEnd();
                        String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");

                        long count = elements.spliterator().estimateSize();

                        out.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());

                    }
                })
                .print();
        environment.execute();
    }
}

在这里插入图片描述

迟到数据的处理

推迟水印推进

在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

设置窗口延迟关闭

Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。
以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))

注意:允许迟到只能运用在event time上

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

public class WatermarkLateDemo {
    public static void main(String[] args) throws Exception {
        // TODO: 2024/1/15  创建Flink流处理执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // TODO: 2024/1/15  设置并行度为1
        environment.setParallelism(1);
        // TODO: 2024/1/7 定义时间语义
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // TODO: 2024/1/6 订单数据
        DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());
        // TODO: 2024/1/15 设置水位线
        WatermarkStrategy<Orders> watermarkStrategy = WatermarkStrategy
                // TODO: 2024/1/15 设置最大乱序程度,数据流中乱序数据时间戳的最大差值
                .<Orders>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                // TODO: 2024/1/15 指定水位线中对应的时间字段 
                .withTimestampAssigner((orders, l) -> orders.getOrder_date())
                // TODO: 2024/1/15 设置空闲等待时间,如果某个分区中有长时间无数据产生将放弃此分区的水位线选举权力 
                .withIdleness(Duration.ofSeconds(3));
        // TODO: 2024/1/15 添加水位线
        SingleOutputStreamOperator<Orders> operator = ordersDataStreamSource.assignTimestampsAndWatermarks(watermarkStrategy);
        // TODO: 2024/1/15  定义侧输出流的标签
        OutputTag<Orders> lateData = new OutputTag<>("late_data", Types.POJO(Orders.class));
        // TODO: 2024/1/15 设置分区key
        SingleOutputStreamOperator<Object> process = operator.keyBy(orders -> orders.getProduct_id())
                // TODO: 2024/1/15 定义滚动窗口时间大小
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // TODO: 2024/1/15 窗口延迟关闭时间
                .allowedLateness(Time.seconds(3))
                // TODO: 2024/1/15 迟到数据的测输出流
                .sideOutputLateData(lateData)
                // TODO: 2024/1/15 进行数据聚合
                .process(new ProcessWindowFunction<Orders, Object, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, ProcessWindowFunction<Orders, Object, Integer, TimeWindow>.Context context, Iterable<Orders> elements, Collector<Object> collector) throws Exception {
                        long startTs = context.window().getStart();
                        long endTs = context.window().getEnd();
                        String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS");
                        long count = elements.spliterator().estimateSize();
                        collector.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                    }
                });
        process.print();
        process.getSideOutput(lateData).print("侧输出流");
        environment.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/326075.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CC工具箱使用指南:【获取所有字段信息】

一、简介 这个工具的目的简单易懂&#xff0c;就是获取选定要素图层的所有字段信息。 本身不对要素图层作任何处理&#xff0c;只是一个查看属性的工具。 问我要用在什么地方&#xff0c;我也不知道-_- 二、工具参数介绍 点击【信息获取】组里的【获取所有字段信息】工具&a…

解决flask中jinja2插值变量变成字符串的办法

今天在通过使用{{ variable_name }}这种方式插入html内容时&#xff0c;发现变量内容到了页面中全部变成了字符串&#xff0c; python代码&#xff1a; return render_template(FilePath.file_path_to_page,md_contenthtml_content # 返回html内容 )html代码中插入&#xff1…

【运维杂谈】为什么docker镜像推送至harbor上就变小了?

为什么docker镜像推送至harbor上就变小了&#xff1f;我们以一个游戏镜像为例&#xff0c;在Linux显示295MB。 [rootWorker232 ~]# docker images | grep v0.6 harbor.koten.com/koten-games/games v0.6 30ec3e6e4747 25 hours ago 295MB […

C# Cad2016二次开发HelloWorld(一)

1 新建类库 二 引用 acdbmgd.dll、acmgd.dll、accoremgd.dll 三 HelloWorld代码 public class Class1{/// <summary>/// 程序入口标识/// </summary>[CommandMethod("HelloWorld")]public void HelloWorld(){Document adoc Autodesk.AutoCAD.Applicatio…

presto 支持regexp_count

一、背景 1、查询regexp_count 函数提示未注册 用户想正则查询特定字符出现次数 function regexp_count not registered 二、调研 1、官网地址&#xff1a; Presto Documentation — Presto 0.284 Documentation 2、regexp_extract_all Regular Expression Functions —…

如何为 SEO 进行关键字研究

什么是关键词研究&#xff1f; 关键字研究是查找和分析理想网站访问者输入搜索引擎的关键字的过程。这使您能够在内容策略中定位最有效的关键字。 关键字是人们用来在搜索引擎中查找信息或产品的单词或短语。例如&#xff0c;如果您想为您的小狗购买食物&#xff0c;您可以在…

最简单爱心的解析

首先你需要了解爱心代码在直角坐标系的方程 数学知识&#xff1a;x 属于 -1.5 ~ 1.5 y 属于 -1 ~ 1.5 和 高中所学的线性规划 请看代码 #include <math.h> #include <stdlib.h> #include <Windows.h> #include <stdio.h> int main() { …

迅软科技丨IT企业如何应对数据泄密危机?

随着信息技术的快速发展&#xff0c;软件IT行业面临着前所未有的数据安全挑战。黑客攻击、病毒传播、内部泄密等安全威胁层出不穷&#xff0c;给企业的核心资产和运营带来严重威胁。同时&#xff0c;国家对于数据安全的法律法规也日益严格&#xff0c;要求企业必须采取更加有效…

【转载】MyBatisCodeHelperPro最新版使用教程

在开发中编写生成bean&#xff0c;mapper&#xff0c;mapper.xml费时也费力&#xff0c;可以通过MyBatisCodeHelper-Pro自动生成bean&#xff0c;dao&#xff0c;mapper.xml等文件。 MyBatisCodeHelper-Pro是IDEA下的一个插件&#xff0c;类似于mybatis plugin&#xff0c;但可…

iPhone是国内最畅销的智能手机

据调研机构BCI发布最新数据显示&#xff0c;去年中国一共卖出2.7亿部智能手机&#xff0c;其中&#xff0c;苹果的iPhone系列是国内最畅销的机型。 其中&#xff0c;苹果以17.1%的市场份额占据了第一&#xff0c;而vivo手机和OPPO则以16.7%和16%紧随其后&#xff0c;接着是荣耀…

【C#】面向对象的三大特性,还记得吗,简单代码举例回顾

欢迎来到《小5讲堂》 大家好&#xff0c;我是全栈小5。 这是《C#》序列文章&#xff0c;每篇文章将以博主理解的角度展开讲解&#xff0c; 特别是针对知识点的概念进行叙说&#xff0c;大部分文章将会对这些概念进行实际例子验证&#xff0c;以此达到加深对知识点的理解和掌握。…

论文阅读 Vision Transformer - VIT

文章目录 1 摘要1.1 核心 2 模型架构2.1 概览2.2 对应CV的特定修改和相关理解 3 代码4 总结 1 摘要 1.1 核心 通过将图像切成patch线形层编码成token特征编码的方法&#xff0c;用transformer的encoder来做图像分类 2 模型架构 2.1 概览 2.2 对应CV的特定修改和相关理解 解…

程序员应该学习的 10 件事

程序员应该学习的 10 件事&#xff08;省流版&#xff09; 翻译&#xff1a;10 Things Software Developers Should Learn about Learning 原文&#xff1a;https://cacm.acm.org/magazines/2024/1/278891-10-things-software-developers-should-learn-about-learning/fulltex…

【教学类-43-21】完结篇 16宫格(4*4可算全部数字)

作品展示&#xff1a; 16宫格里面的4*4小格子可以凑满1-16&#xff0c;旁边的7宫格格2份 背景需求&#xff1a; 做完了1-20宫格的A4模板&#xff0c;最后做一个16宫格小格子&#xff08;附加7宫格2套&#xff09;的样式&#xff0c;只有4宫格&#xff08;2*2&#xff09;、9宫…

vue3二次封装element-ui中的table组件

为什么要做这件事 借助封装table组件的过程来巩固一下vue3相关知识点。 组件有哪些配置项 options:表格的配置项data: 表格数据源elementLoadingText&#xff1a;加载文案elementLoadingSpinner&#xff1a;加载图标elementLoadingBackground&#xff1a;背景遮罩的颜色elem…

【RabbitMQ】RabbitMQ高级:死信队列和延迟队列

目录 设置TTL&#xff08;过期时间&#xff09;概述RabbitMQ使用TTL原生API案例springboot案例 死信队列概述原生API案例springboot案例 延迟队列概述插件实现延迟队列安装插件代码 TTL实现延迟队列实现延迟队列优化 设置TTL&#xff08;过期时间&#xff09; 概述 在电商平台…

Netty开篇——NIO章下(五)

SelectionKey 表示 Selector 和网络通道的注册关系&#xff0c;共四种(全是常量): Int OP_ACCEPT:有新的网络连接可以接受&#xff0c;值为 16 &#xff08;1 << 4&#xff09;Int OP_CONNECT: 代表连接已经建立&#xff0c;值为 8 &#xff08;1 << 3&#xff09;…

#AIGC#text2video文生视频,开源DragNUWA:通过集成文本、图像和轨迹对视频生成进行细粒度控制

DragNUWA&#xff1a;通过集成文本、图像和轨迹对视频生成进行细粒度控制 论文地址&#xff1a;https://arxiv.org/abs/2308.08089 DragNUWA 使用户能够直接操纵图像中的背景或对象&#xff0c;模型将这些动作无缝地转换为相机运动或对象运动&#xff0c;生成相应的视频。 Drag…

linux基础学习(3):挂载

挂载可以理解为给磁盘空间一个可访问的入口&#xff0c;那个入口称为挂载点&#xff0c;相当于windows中的盘符。 1.挂载命令mount 1.1直接输入mount 查看系统已挂载的设备 1.2挂载与卸载命令 mount -t 文件系统名 设备文件名 挂载点 | umount 挂载点 或 umount 设…

机器学习——支持向量机SVM

1 摘要&#xff1a; 支持向量机&#xff08;SVM&#xff09;是一种二类分类模型&#xff0c;其基本模型是在特征空间上找到最佳的分离超平面使得训练集上正负样本间隔最大&#xff0c;间隔最大使它有别于感知机&#xff0c;支持向量机也可通过核技巧使它成为非线性分类器。支持…