Flink DataStream API 介绍

一、介绍

官网

DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。

二、基础算子

1、Map

Map算子:输入一个元素同时输出一个元素,这里的写法和Java类似,可以使用糖化语法或者实现Function接口

package com.xx.common.study.api.base;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author xiaxing
 * @describe Map算子,输入一个元素同时输出一个元素,这里的写法和Java类似,可以使用糖化语法或者实现Function接口
 * @since 2024/5/17 14:27
 */
public class DataStreamMapApiDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> sourceStream = env.fromElements(1, 2, 3);

        // 糖化语法
        SingleOutputStreamOperator<Integer> multiStream = sourceStream.map(e -> e * 2);
        multiStream.print("数据乘2");

        // 实现Function接口
        SingleOutputStreamOperator<Integer> addStream = sourceStream.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value + 2;
            }
        });

        addStream.print("数据加2");
        env.execute();
    }
}

2、FlatMap

FlatMap算子:输入一个元素同时产生零个、一个或多个元素

package com.xx.common.study.api.base;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

/**
 * @author xiaxing
 * @describe FlatMap算子,输入一个元素同时产生零个、一个或多个元素
 * @since 2024/5/17 14:27
 */
public class DataStreamFlatMapApiDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> sourceStream = env.fromElements("1,2,3");

        // 对source进行加工处理
        sourceStream.flatMap((FlatMapFunction<String, List<String>>) (value, out) -> {
            String[] split = value.split(",");
            out.collect(Arrays.asList(split));
        }).print();

        // 错误写法,和Java写法不用,无法使用这种糖化语法
//        sourceStream.flatMap((k, v) -> {
//            String[] split = k.split(",");
//            v.collect(split);
//        }).print();

        env.execute();
    }
}

3、Filter

Filter算子:为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素

package com.xx.common.study.api.base;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author xiaxing
 * @describe Filter算子,为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素
 * @since 2024/5/17 14:27
 */
public class DataStreamFilterApiDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> sourceStream = env.fromElements(1, 2, 3);
        // 保留整数
        sourceStream.filter(e -> (e % 2) == 0).print("糖化语法保留整数");

        sourceStream.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 == 0;
            }
        }).print("实现Function保留整数");

        env.execute();
    }
}

4、KeyBy

KeyBy算子:在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的

package com.xx.common.study.api.base;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author xiaxing
 * @describe KeyBy算子,在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的
 * @since 2024/5/17 14:27
 */
public class DataStreamKeyByApiDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class keyByDemo {
        private Integer id;
        private Integer count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<keyByDemo> sourceStream = env.fromElements(
                new keyByDemo(1, 1),
                new keyByDemo(2, 2),
                new keyByDemo(3, 3),
                new keyByDemo(1, 4)
        );
        KeyedStream<keyByDemo, Integer> keyByStream = sourceStream.keyBy(keyByDemo::getId);
        keyByStream.print("按照key分组");

        // 使用key分组之后可以使用一些常用的聚合算子
        // positionToSum:可以用于Tuple类型数据传递索引位置,field:传递字段名称
        keyByStream.sum("count").print();
        env.execute();
    }
}

5、Reduce

Reduce算子:在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值

package com.xx.common.study.api.base;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author xiaxing
 * @describe Reduce算子,在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值
 * @since 2024/5/17 14:27
 */
public class DataStreamReduceApiDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class reduceByDemo {
        private String id;
        private Integer count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<reduceByDemo> sourceStream = env.fromElements(
                new reduceByDemo("1", 1),
                new reduceByDemo("2", 2),
                new reduceByDemo("3", 3),
                new reduceByDemo("1", 4)
        );

        sourceStream.keyBy(reduceByDemo::getId).reduce(new ReduceFunction<reduceByDemo>() {
            @Override
            public reduceByDemo reduce(reduceByDemo value1, reduceByDemo value2) throws Exception {
                value1.setCount(value1.getCount() + value2.getCount());
                return value1;
            }
        }).print();
        env.execute();
    }
}

三、窗口

官网地址

3.1、概念

窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。

Flink 中的时间有三种类型:

  • Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。

  • Ingestion Time:是数据进入 Flink 的时间。

  • Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是 Processing Time。

3.2、语法

Keyed Windows

stream
       .keyBy(...)               <-  仅 keyed 窗口需要
       .window(...)              <-  必填项:"assigner"
      [.trigger(...)]            <-  可选项:"trigger" (省略则使用默认 trigger)
      [.evictor(...)]            <-  可选项:"evictor" (省略则不使用 evictor)
      [.allowedLateness(...)]    <-  可选项:"lateness" (省略则为 0)
      [.sideOutputLateData(...)] <-  可选项:"output tag" (省略则不对迟到数据使用 side output)
       .reduce/aggregate/apply()      <-  必填项:"function"
      [.getSideOutput(...)]      <-  可选项:"output tag"

Non-Keyed Windows

stream
       .windowAll(...)           <-  必填项:"assigner"
      [.trigger(...)]            <-  可选项:"trigger" (else default trigger)
      [.evictor(...)]            <-  可选项:"evictor" (else no evictor)
      [.allowedLateness(...)]    <-  可选项:"lateness" (else zero)
      [.sideOutputLateData(...)] <-  可选项:"output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  必填项:"function"
      [.getSideOutput(...)]      <-  可选项:"output tag"

3.3、Window Assigners

Window Assigners为抽象类,Flink默认已经实现了4种窗口

3.3.1、滚动窗口(Tumbling Windows)

滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。 比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。

在这里插入图片描述

DataStream<T> input = ...;

// 滚动 event-time 窗口
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滚动 processing-time 窗口
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

3.3.2、滑动窗口(Sliding Windows)

与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。 因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

在这里插入图片描述

DataStream<T> input = ...;

// 滑动 event-time 窗口
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滑动 processing-time 窗口
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滑动 processing-time 窗口,偏移量为 -8 小时
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

3.3.3、会话窗口(Session Windows)

会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

在这里插入图片描述

DataStream<T> input = ...;

// 设置了固定间隔的 event-time 会话窗口
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// 设置了动态间隔的 event-time 会话窗口
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // 决定并返回会话间隔
    }))
    .<windowed transformation>(<window function>);

// 设置了固定间隔的 processing-time session 窗口
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// 设置了动态间隔的 processing-time 会话窗口
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // 决定并返回会话间隔
    }))
    .<windowed transformation>(<window function>);

3.3.4、全局窗口(Global Windows)

全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。 这样的窗口模式仅在你指定了自定义的 trigger 时有用。 否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

在这里插入图片描述

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);

3.4、窗口函数

定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据

窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。
前两者执行起来更高效,因为 Flink 可以在每条数据到达窗口后 进行增量聚合(incrementally aggregate)。 而 ProcessWindowFunction 会得到能够遍历当前窗口内所有数据的 Iterable,以及关于这个窗口的 meta-information

3.4.1、ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同

package com.xx.common.study.api.windows;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author xiaxing
 * @describe 窗口函数-reduce
 * @since 2024/5/17 14:27
 */
public class DataStreamWindowsReduceApiDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class TumblingWindows {
        private Integer id;
        private Integer count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 7777);
        socketStream
                .map(new MapFunction<String, TumblingWindows>() {
                    @Override
                    public TumblingWindows map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new TumblingWindows(Integer.valueOf(split[0]), Integer.valueOf(split[1]));
                    }
                })
                .keyBy(new KeySelector<TumblingWindows, Integer>() {
                    @Override
                    public Integer getKey(TumblingWindows value) throws Exception {
                        return value.getId();
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5L)))
                .reduce(new ReduceFunction<TumblingWindows>() {
                    @Override
                    public TumblingWindows reduce(TumblingWindows value1, TumblingWindows value2) throws Exception {
                        return new TumblingWindows(value1.getId(), value1.getCount() + value2.getCount());
                    }
                })
                .print();

        env.execute();
    }
}


3.4.2、ReduceFunction糖化语法

使用Lambda糖化语法对代码进行了简化

package com.xx.common.study.api.windows;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author xiaxing
 * @describe 窗口函数-reduce-糖化语法
 * @since 2024/5/17 14:27
 */
public class DataStreamWindowsReduceLambdaApiDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class TumblingWindows {
        private Integer id;
        private Integer count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 7777);

        socketStream
                .map(value -> {
                    String[] split = value.split(",");
                    return new TumblingWindows(Integer.valueOf(split[0]), Integer.valueOf(split[1]));
                })
                .keyBy(TumblingWindows::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5L)))
                .reduce((value1, value2) -> new TumblingWindows(value1.getId(), value1.getCount() + value2.getCount()))
                .print();

        env.execute();
    }
}


3.4.3、AggregateFunction

ReduceFunction 是 AggregateFunction 的特殊情况。 AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。 输入数据的类型是输入流的元素类型,AggregateFunction 接口有如下几个方法: 把每一条元素加进累加器、创建初始累加器、合并两个累加器、从累加器中提取输出(OUT 类型)

package com.xx.common.study.api.windows;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Optional;

/**
 * @author xiaxing
 * @describe 窗口函数-Aggregate
 * @since 2024/5/17 14:27
 */
public class DataStreamWindowsAggregateApiDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class TumblingWindows {
        private Integer id;
        private Integer count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 7777);

        // 求和
        AggregateFunction<TumblingWindows, TumblingWindows, TumblingWindows> aggregateFunction = new AggregateFunction<TumblingWindows, TumblingWindows, TumblingWindows>() {
            @Override
            public TumblingWindows createAccumulator() {
                // 创建累加器,并将其初始化为默认值
                return new TumblingWindows();
            }

            @Override
            public TumblingWindows add(TumblingWindows value, TumblingWindows accumulator) {
                // 将输入的元素添加到累加器,返回更新后的累加器
                Integer count1 = Optional.of(value.getCount()).orElse(0);
                Integer count2 = Optional.ofNullable(accumulator.getCount()).orElse(0);
                return new TumblingWindows(value.getId(), count1 + count2);
            }

            @Override
            public TumblingWindows getResult(TumblingWindows accumulator) {
                // 从累加器中提取操作的结果
                return accumulator;
            }

            @Override
            public TumblingWindows merge(TumblingWindows a, TumblingWindows b) {
                // 将两个累加器合并为一个新的累加器
                return new TumblingWindows(a.getId(), a.getCount() + b.getCount());
            }
        };

        socketStream
                .map(value -> {
                    String[] split = value.split(",");
                    return new TumblingWindows(Integer.valueOf(split[0]), Integer.valueOf(split[1]));
                })
                .keyBy(TumblingWindows::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5L)))
                .aggregate(aggregateFunction)
                .print();

        env.execute();
    }
}

3.4.4、ProcessWindowFunction

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。 这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction` 中获得窗口的元数据。

package com.xx.common.study.api.windows;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author xiaxing
 * @describe 窗口函数-reduce-process
 * @since 2024/5/17 14:27
 */
public class DataStreamWindowsReduceProcessApiDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class TumblingWindows {
        private Integer id;
        private Integer count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 7777);

        socketStream
                .map(value -> {
                    String[] split = value.split(",");
                    return new TumblingWindows(Integer.valueOf(split[0]), Integer.valueOf(split[1]));
                })
                .keyBy(TumblingWindows::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5L)))
                .reduce(new MyReduceFunction(), new MyProcessWindowsFunction())
                .print();

        env.execute();
    }

    private static class MyReduceFunction implements ReduceFunction<TumblingWindows> {
        @Override
        public TumblingWindows reduce(TumblingWindows value1, TumblingWindows value2) throws Exception {
            return new TumblingWindows(value1.getId(), value1.getCount() + value2.getCount());
        }
    }

    private static class MyProcessWindowsFunction extends ProcessWindowFunction<TumblingWindows, TumblingWindows, Integer, TimeWindow> {
        @Override
        public void process(Integer integer, ProcessWindowFunction<TumblingWindows, TumblingWindows, Integer, TimeWindow>.Context context, Iterable<TumblingWindows> elements, Collector<TumblingWindows> out) throws Exception {
            elements.forEach(e -> {
                Integer count = e.getCount();
                // 当count > 10时才数据元素
                if (count > 10) {
                    out.collect(e);
                }
            });
        }
    }
}

3.5、窗口算子

除了窗口函数外,Flink还提供了一些窗口算子,其用法类似于基础算子,这部分内容和上面的窗口存在部分重复,上面主要想说明Flink中涉及哪些类型的窗口,这块演示一下一些窗口算子的基本用法,在后面的Process Function会更加深入的使用窗口函数

3.5.1、Window

可以在已经分区的 KeyedStreams 上定义 Window。Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组

package com.xx.common.study.api.windows.base;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author xiaxing
 * @describe Window算子, 以在已经分区的 KeyedStreams 上定义 Window。Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组
 * @since 2024/5/17 14:27
 */
public class DataStreamWindowsApiDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WindowByDemo {
        private Integer id;
        private Integer count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> sourceStream = env.socketTextStream("localhost", 7777);

        // 将5秒内的数据按照key分组加起来
        sourceStream
                .map((value) -> {
                    String[] split = value.split(",");
                    return new WindowByDemo(Integer.valueOf(split[0]), Integer.valueOf(split[1]));
                })
                .keyBy(WindowByDemo::getId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5L)))
                .sum("count")
                .print();
        env.execute();
    }
}


3.5.1、WindowAll

WindowAll算子,不需要调用keyBy算子

package com.xx.common.study.api.windows.base;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author xiaxing
 * @describe WindowAll算子,不需要调用keyBy算子
 * @since 2024/5/17 14:27
 */
public class DataStreamWindowsAllApiDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WindowAllDemo {
        private Integer id;
        private Integer count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> sourceStream = env.socketTextStream("localhost", 7777);

        // 将5秒内的数据加起来
        sourceStream
                .map((value) -> {
                    String[] split = value.split(",");
                    return new WindowAllDemo(Integer.valueOf(split[0]), Integer.valueOf(split[1]));
                })
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5L))).sum("count")
                .print();
        env.execute();
    }
}

3.5.1、Window Apply

3.5.1、WindowReduce

四、Join

五、Process Function

六、旁路输出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/649513.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

视觉语言模型详解【VLM】

视觉语言模型&#xff08;Visual Language Models&#xff09;是可以同时从图像和文本中学习以处理许多任务的模型&#xff0c;从视觉问答到图像字幕。在这篇文章中&#xff0c;我们将介绍视觉语言模型的主要组成部分&#xff1a;概述&#xff0c;了解它们的工作原理&#xff0…

[自动驾驶技术]-8 Tesla自动驾驶方案之硬件(AI Day 2022)

特斯拉在AI Day 2022先介绍了AI编译器&#xff0c;后面又介绍了Dojo的硬件软件&#xff0c;软件部分和AI编译器有部分重叠&#xff0c;本文介绍还是延用AI Day的思路&#xff0c;分为三部分&#xff1a;AI编译和推理&#xff0c;Dojo硬件&#xff0c;Dojo软件。 特斯拉车道检测…

MATLAB导入导出Excel的方法|读与写Excel的命令|附例程的github下载链接

前言 前段时间遇到一个需求&#xff1a;导出变量到Excel里面&#xff0c;这里给出一些命令&#xff0c;同时给一个示例供大家参考。 MATLAB读/写Excel的命令 在MATLAB中&#xff0c;可以使用以下命令来读写Excel文件&#xff1a; 读取Excel文件&#xff1a; xlsread(filen…

Jlink卡死 JFlash keil 盗版JLINK

现象&#xff1a;用Keil打开Jlink配置页&#xff0c;会卡死。 解决方法&#xff1a;用旧版本的Jlink软件&#xff0c;因为淘宝买的很多JLINK下载器是盗版的&#xff0c;不支持新版本的JLINK软件。到https://www.segger.com/downloads/jlink下载旧版本的软件。 如果必须要用新版…

Pytorch深度学习实践笔记8(b站刘二大人)

&#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;pytorch深度学习 &#x1f380;CSDN主页 发狂的小花 &#x1f304;人生秘诀&#xff1a;学习的本质就是极致重复! 《PyTorch深度学习实践》完结合集_哔哩哔哩_bilibi…

上海市港股通交易佣金手续费最低是多少?万0.8?恒生港股通ETF今起发行!港股通的价值如何?

港股通交易佣金概述 港股通的交易佣金可能会因证券公司和投资者的不同而有所差异。 上海市港股通交易佣金最低可以万分之零点八&#xff08;0.008%&#xff09;&#xff0c;但这需要投资者与证券公司客户经理了解&#xff0c;进行沟通和申请。 一般来说&#xff0c;证券公司…

[CISCN2024]-PWN:orange_cat_diary(glibc2.23.,仅可修改最新堆块,house of orange)

查看保护 查看ida 这里我们仅可以修改最新申请出来的堆块&#xff0c;但是有uaf漏洞。 完整exp&#xff1a; from pwn import* #context(log_leveldebug) pprocess(./orange) free_got0x201F78def alloc(size,content):p.sendlineafter(bPlease input your choice:,b1)p.send…

【Spring Cloud】服务熔断

目录 服务雪崩效应服务雪崩效应形成的原因及应对策略小结 Hystrix介绍Hystrix可以做什么1.资源隔离2.请求熔断3.服务降级 小结 Hystrix实现服务降级方式一&#xff1a;HystrixCommand注解方式1.服务提供者1.1业务接口和业务实现中添加方法hystrixTimeout1.2控制器中处理/provid…

信息安全基础(补充)

&#xff09;的内容主要有数据备份、数据修复、系统恢复等。响应&#xff08;Respons&#xff09;的内容主要有应急策略、应急机制、应急手段、入侵过程分析及安全状态评估等。 面向数据挖掘的隐私保护技术主要解决高层应用中的隐私保护问题&#xff0c;致力于研究如何根据不同…

html5各行各业官网模板源码下载(2)

文章目录 1.来源2.源码模板2.1 HTML5好看的旅行网站模板源码2.2 HTML5自适应医院叫号大屏模板源码2.3 HTML5好看的高科技登录页面模板源码2.4 HTML5宠物美容服务公司网站模板源码2.5 HTML5创意品牌广告设计公司网站模板源码2.6 HTML5实现室内设计模板源码2.7 HTML5黄金首饰网站…

PL5358A 单芯锂离子/聚合物电池保护IC芯片

一般说明 PL5358A系列产品是锂离子/聚合物电池保护的高集成解决方案。PL5358A包含先进 的功率MOSFET&#xff0c;高精度电压检测电路和延迟电路。5358A被放入一个超小的SOT23-5封装&#xff0c;只有一个外部元件&#xff0c;使其成为理想的解决方案&#xff0c;在有限的…

美业SaaS收银系统源码-已过期卡项需要延期怎么操作?美业系统实操

美业SaaS系统 连锁多门店美业收银系统源码 多门店管理 / 会员管理 / 预约管理 / 排班管理 / 商品管理 / 促销活动 PC管理后台、手机APP、iPad APP、微信小程序 1.询问会员手机号和需要延期的卡项 2.PC运营后端-数据导入-修改已售卡项&#xff0c;搜索手机号 3.把需要卡项选…

D - Permutation Subsequence(AtCoder Beginner Contest 352)

题目链接: D - Permutation Subsequence (atcoder.jp) 题目大意&#xff1a; 分析&#xff1a; 相对于是记录一下每个数的位置 然后再长度为k的区间进行移动 然后看最大的pos和最小的pos的最小值是多少 有点类似于滑动窗口 用到了java里面的 TreeSet和Map TreeSet存的是数…

Vue3中如何自定义消息总线

前言 在 Vue 开发中&#xff0c;组件之间的通信是一个常见的需求&#xff0c;无论是父组件向子组件传递数据&#xff0c;还是子组件向父组件传递数据&#xff0c;甚至是兄弟组件之间的数据交换。这些通信需求在构建复杂的 Vue 应用时尤为关键。 Vue 提供了多种组件通信的方式…

Linux系统安装AMH服务器管理面板并实现远程访问管理维护

目录 前言 1. Linux 安装AMH 面板 2. 本地访问AMH 面板 3. Linux安装Cpolar 4. 配置AMH面板公网地址 5. 远程访问AMH面板 6. 固定AMH面板公网地址 1. 部署Docker Registry 2. 本地测试推送镜像 3. Linux 安装cpolar 4. 配置Docker Registry公网访问地址 5. 公网远程…

如何被谷歌收录?

最简单的方法就是提交网站给谷歌&#xff0c;但这种方法可操作空间不大&#xff0c;一天一般也就只有十条左右的链接可以提交&#xff0c;对于一些大网站来说&#xff0c;这种方法显然不适用&#xff0c;这时候GPC爬虫池的好处就体现了&#xff0c;GPC爬虫池对希望提升Google搜…

re:记录下正则的使用方法

1、match pattern r(\d{4})[-\/](\d{1,2})[-\/](\d{1,2}) match re.search(pattern, text) if match:year, month, day match.groups()

SolidWorks教育版 学生使用的优势

在工程技术领域的学习中&#xff0c;计算机辅助设计软件&#xff08;CAD&#xff09;如SolidWorks已经成为学生掌握专业知识和技能的必要工具。SolidWorks教育版作为专为教育机构和学生设计的版本&#xff0c;不仅提供了与商业版相同的强大功能&#xff0c;还为学生带来了诸多独…

TypeScript 语言在不改变算法复杂度前提下,细节上性能优化,运行时性能提升效果明显吗?

有经验的专家写的代码&#xff0c;和无经验的新手写的代码&#xff0c;在运行时性能上大概会有多少差异&#xff1f; 个人感觉&#xff0c;常规业务逻辑代码通常可以差 1 倍&#xff1b;如果算上框架的影响&#xff0c;可以差 2~4 倍。 仅考虑业务代码的话&#xff0c;新手容易…

科普:水冷负载的工作原理

水冷负载是一种利用水作为冷却介质&#xff0c;将电子设备产生的热量传递到外部环境的散热方式。它广泛应用于各种电子设备&#xff0c;如服务器、数据中心、电力设备等&#xff0c;以提高设备的运行效率和稳定性。本文将对水冷负载的工作原理进行简要科普。 水冷负载的工作原理…