flink重温笔记(十三): flink 高级特性和新特性(2)——ProcessFunction API 和 双流 join

Flink学习笔记

前言:今天是学习 flink 的第 13 天啦!学习了 flink 高级特性和新特性之ProcessFunction API 和 双流 join,主要是解决大数据领域数据从数据增量聚合的问题,以及快速变化中的流数据拉宽问题,即变化中多个数据源合并在一起的问题,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


文章目录

  • Flink学习笔记
    • 四、Flink 高级特性和新特性
      • 2. Process Function API
        • 2.1 Process Function 分类
        • 2.2 KeyedProcessFunction [重点]
        • 2.3 具有增量聚合的 ProcessWindowFunction
          • 2.3.1 用法概述
          • 2.3.2 使用 ReduceFunction 进行增量窗口聚合
          • 2.3.3 使用 AggerateFunction 进行增量窗口聚合
          • 2.3.4 Using per-window state in ProcessWindowFunction
      • 3. 双流 Join
        • 3.1 面试介绍
        • 3.2 Window Join
          • 3.2.1 Tumbling Window Join
          • 3.2.2 Sliding Window Join
          • 3.2.3 Session Window Join
          • 3.2.3 案例演示
        • 3.3 Interval Join
          • 3.3.1 Interval Join 介绍
          • 3.3.2 案例演示

四、Flink 高级特性和新特性

2. Process Function API

之前的转换算子是无法访问时间戳信息和水位线信息的,但 Process Function 可以访问时间戳,水位线,以及注册定时时间等,Flink SQL 就是使用 Process Function 实现的

2.1 Process Function 分类
  • 1- ProcessFunction 用于 dataStream
  • 2- KeyedProcessFunction 用于 Keyed dataStream
  • 3- CoProcessFunction 用于 connect 连接的流
  • 4- ProcessJoinFunction 用于 join 流操作
  • 5- BroadcastProcessFunction 用于广播
  • 6- KeyedBroadcastProcessFunction 用于 keyed 后的广播
  • 7- ProcessWindowFunction 窗口增量聚合
  • 8- ProcessAllWindowFunction 全窗口聚合

2.2 KeyedProcessFunction [重点]

KeyedProcessFunction 作为 ProcessFunction 的扩展,在其 onTimer(…) 方法中提供对定时器对应key的访问。

所有的 Process Function 都继承自 RichFunction 接口,所以都有:

  • open()
  • close()
  • getRuntimeContext()

KeyedProcessFunction 额外提供了两个方法:

  • processElement,每个元素调用一次
  • onTimer,回调函数,用于定时器

案例:在服务器运维中,需要实时监控服务器机架的温度,如果一定时间内温度超过了一定阈值(100度),且后一次上报的温度超过了前一次上报的温度,需要触发告警(温度持续升高中)

package cn.itcast.day12.process;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import org.apache.commons.collections.IteratorUtils;
import java.text.SimpleDateFormat;

/**
 * @author lql
 * @time 2024-03-08 13:01:05
 * @description TODO:数据结构:(id,温度)
 */
public class SystemMonitorDemo {
    public static void main(String[] args) throws Exception {
        // todo 1) 初始化 flink 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // todo 2) 指定并行度为 1
        env.setParallelism(1);

        // todo 3) 接入数据源
        DataStreamSource<String> socketTextStream = env.socketTextStream("node1", 9999);

        // todo 4) 将获取的数据转化为 tuple
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> tupleDataStream = socketTextStream.map(new MapFunction<String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(String line) throws Exception {
                String[] arrayData = line.split(",");
                return Tuple2.of(Integer.parseInt(arrayData[0]), Integer.parseInt(arrayData[1]));
            }
        });

        // todo 5) 分组操作
        KeyedStream<Tuple2<Integer, Integer>, Integer> tuple2TupleKeyedStream = tupleDataStream.keyBy(t -> t.f0);

        // todo 6) 自定义ProcessFunction对象,继承 KeyedProcessFunction 抽象类
        SingleOutputStreamOperator<String> result = tuple2TupleKeyedStream.process(new MyKeyedProcessFunction());

        // todo 7) 打印输出
        result.printToErr();

        // todo 8) 执行程序
        env.execute();
    }

    private static class MyKeyedProcessFunction extends KeyedProcessFunction<Integer,Tuple2<Integer,Integer>,String> {

        // 定义数据存储对象
        private ListState<Tuple2<Integer,Integer>> listState = null;
        // 定义时间对象
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 定义时间
        private Long timeTS = 0L;

        /**
         * 初始化资源
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 实例化 state 对象
            this.listState = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<Integer, Integer>>(
                    "listState",
                    TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})
            ));
            System.out.println("初始化state对象...");
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        /**
         * 定时器触发方法
         * @param timestamp
         * @param ctx
         * @param out
         * @throws Exception
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            System.out.println("触发了定时服务...");
            // 迭代转化状态到列表中,然后计算个数
            int stateSize = IteratorUtils.toList(this.listState.get().iterator()).size();
            if(stateSize >= 2){
                //返回数据,触发告警
                out.collect("触发了告警");
            }
            //清空历史的状态数据
            this.listState.clear();
        }

        /**
         * 对数据集中的每条数据进行处理
         * @param integerIntegerTuple2
         * @param context
         * @param collector
         * @throws Exception
         */
        @Override
        public void processElement(Tuple2<Integer, Integer> integerIntegerTuple2, Context context, Collector<String> collector) throws Exception {
            //获取状态中存储的历史数据
            Tuple2<Integer, Integer> lastData = null;
            for (Tuple2<Integer, Integer> tuple : listState.get()){
                lastData =tuple;
            }
            // 判断状态中的数据是否为空
            if (lastData==null){
                lastData = Tuple2.of(0,0);
            }
            System.out.println("状态中获取到的数据是:"+lastData);

            if (integerIntegerTuple2.f1 > 100 & integerIntegerTuple2.f1 > lastData.f1){
                System.out.println("温度上升中...注册定时器!");
                //满足了温度大于100,且后一次的温度大于前一次的温度
                //将当前的温度存储起来
                listState.add(Tuple2.of(integerIntegerTuple2.f0,integerIntegerTuple2.f1));

                //注册一个定时器(当前处理的时间+窗口长度=触发计算的时间)
                timeTS = context.timerService().currentProcessingTime() + 10000L;
                context.timerService().registerProcessingTimeTimer(timeTS);
            }else{
                if (integerIntegerTuple2.f1 < lastData.f1){
                    System.out.println("温度下降了...取消定时器!");
                    //取消定时器
                    context.timerService().deleteProcessingTimeTimer(timeTS);
                }
                if (integerIntegerTuple2.f1 < 100){
                    //清除状态存储的数据
                    listState.clear();
                }
            }
        }
    }
}

结果:

输入:
1,100
1,101

输出:
温度上升中...注册定时器!
触发了告警

2.3 具有增量聚合的 ProcessWindowFunction
image-20240308165337455

在 reduce 和 aggregate 中均有可以和 processWindowFunction 结合实现增量聚合的方法(红角星标记)。

原理:对于一个窗口来说,先增量计算,关闭窗口前,增量计算结果发给 ProcessWindowFunction 作为输入再全量处理。

特点:既可以增量聚合,又可以访问窗口的元数据信息(比如开始时间、状态等)。


2.3.1 用法概述
input
        .keyBy(...)
        .timeWindow(...)
        .reduce(
            incrAggregator: ReduceFunction[IN],
            function: ProcessWindowFunction[IN, OUT, K, W])

input
        .keyBy(...)
        .timeWindow(...)
        .aggregate(
            incrAggregator: AggregateFunction[IN, ACC, V],
            windowFunction: ProcessWindowFunction[V, OUT, K, W])

2.3.2 使用 ReduceFunction 进行增量窗口聚合

数据:

{"userID": "user_1", "eventTime": "2020-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:33", "eventType": "browse", "productID": "product_1", "productPrice": 30}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:34", "eventType": "browse", "productID": "product_1", "productPrice": 20}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:36", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:38", "eventType": "browse", "productID": "product_1", "productPrice": 70}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:40", "eventType": "browse", "productID": "product_1", "productPrice": 20}

例子:获取一段时间内(Window Size)每个用户(KeyBy)浏览的商品的最大价值的那条记录(ReduceFunction),并获得Key和Window信息。

package cn.itcast.day12.process;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import java.text.SimpleDateFormat;
import java.time.Duration;
/**
 * @author lql
 * @time 2024-03-08 17:06:59
 * @description TODO
 */
public class ReduceAndProcessFunction {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // todo 3) 将获取的 json 数据解析成 java bean
        lines.process(new SocketProcessFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserActionLog>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserActionLog>() {
                            @Override
                            public long extractTimestamp(UserActionLog userActionLog, long l) {
                                try {
                                    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                    return format.parse(userActionLog.getEventTime()).getTime();
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    return 0L;
                                } }}))
                // 按照用户分组
                .keyBy( (KeySelector<UserActionLog,String>) UserActionLog::getUserID )
                // 构造窗口函数 TimeWindow:滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 窗口函数: 获取这段窗口时间内每个用户浏览的商品的最大价值对应的那条记录
                .reduce(
                        //增量聚合操作
                        new ReduceFunction<UserActionLog>() {
                            @Override
                            public UserActionLog reduce(UserActionLog value1, UserActionLog value2) throws Exception {
                                return value1.getProductPrice() > value2.getProductPrice() ? value1 : value2;
                            }
                        },
                        //窗口函数操作,其中迭代器中的数据只有一条,已经进行了增量聚合
                        new ProcessWindowFunction<UserActionLog, String, String, TimeWindow>() {
                            @Override
                            public void process(String key, Context context, Iterable<UserActionLog> elements, Collector<String> out) throws Exception {
                                UserActionLog max = elements.iterator().next();
                                System.out.println("集合中的数据:"+ IteratorUtils.toList(elements.iterator()).size());

                                String windowStart = new DateTime(context.window().getStart()).toString("yyyy-MM-dd HH:mm:ss");
                                String windowEnd = new DateTime(context.window().getEnd()).toString("yyyy-MM-dd HH:mm:ss");
                                String record = "key:"+key+"\n"+"窗口开始时间:"+windowStart+"\n窗口结束时间:"+windowEnd+"\n浏览的商品最大价值对应的记录:"+max;
                                out.collect(record);
                            }
                        }
                ).print();

        // todo 4) 启动程序
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserActionLog{
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Long productPrice;
    }

    /**
     * 将获取的JSON数据解析成Java Bean
     */
    private static class SocketProcessFunction extends ProcessFunction<String,UserActionLog>{
        /**
         * 每条数据都需要执行的方法
         * @param s
         * @param context
         * @param collector
         * @throws Exception
         */
        @Override
        public void processElement(String s, Context context, Collector<UserActionLog> collector) throws Exception {
            collector.collect(JSON.parseObject( s, UserActionLog.class ));
        }
    }
}

结果:

集合中的数据:1
key:user_1
窗口开始时间:2020-11-09 10:41:30
窗口结束时间:2020-11-09 10:41:35
浏览的商品最大价值对应的记录:ReduceAndProcessFunction.UserActionLog(userID=user_1, eventTime=2020-11-09 10:41:33, eventType=browse, productID=product_1, productPrice=30)
集合中的数据:1
key:user_1
窗口开始时间:2020-11-09 10:41:35
窗口结束时间:2020-11-09 10:41:40
浏览的商品最大价值对应的记录:ReduceAndProcessFunction.UserActionLog(userID=user_1, eventTime=2020-11-09 10:41:38, eventType=browse, productID=product_1, productPrice=70)

总结:

  • 1- 需要先设置并行度为1,便于少量数据观察到结果
  • 2- reduce/aggregate 暂时不需要 RichreduceFunction,报错:ReduceFunction of apply can not be a RichFunction.

2.3.3 使用 AggerateFunction 进行增量窗口聚合

数据:

{"userID": "user_1", "eventTime": "2020-11-09 10:41:32", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:33", "eventType": "browse", "productID": "product_1", "productPrice": 30}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:34", "eventType": "browse", "productID": "product_1", "productPrice": 20}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:36", "eventType": "browse", "productID": "product_1", "productPrice": 10}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:38", "eventType": "browse", "productID": "product_1", "productPrice": 70}
{"userID": "user_1", "eventTime": "2020-11-09 10:41:40", "eventType": "browse", "productID": "product_1", "productPrice": 20}

例子:获取一段时间内(Window Size)每个用户(KeyBy)浏览的平均价值(AggregateFunction),并获得Key和Window信息。

package cn.itcast.day12.process;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.text.SimpleDateFormat;
import java.time.Duration;

/**
 * @author lql
 * @time 2024-03-08 17:59:42
 * @description TODO
 */
public class AggregateAndProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // 将从Kafka获取的JSON数据解析成Java Bean
        lines.process(new KafkaProcessFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserActionLog>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<UserActionLog>() {
                                    @Override
                                    public long extractTimestamp(UserActionLog element, long recordTimestamp) {
                                        try {
                                            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                            return format.parse(element.getEventTime()).getTime();
                                        } catch (Exception e) {
                                            e.printStackTrace();
                                            return 0L;
                                        }
                                    }
                                }))
                // 按用户分组
                .keyBy((KeySelector<UserActionLog, String>) UserActionLog::getUserID)
                // 构造TimeWindow
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        // 窗口函数: 获取这段窗口时间内,每个用户浏览的商品的平均价值,并发出Key和Window信息
                .aggregate(
                        new AggregateFunction<UserActionLog, Tuple2<Long, Long>, Double>() {

                            // 1、初始值
                            // 定义累加器初始值
                            @Override
                            public Tuple2<Long, Long> createAccumulator() {
                                return new Tuple2<>(0L, 0L);
                            }

                            // 2、累加
                            // 定义累加器如何基于输入数据进行累加
                            @Override
                            public Tuple2<Long, Long> add(UserActionLog value, Tuple2<Long, Long> accumulator) {
                                accumulator.f0 += 1;
                                accumulator.f1 += value.getProductPrice();
                                return accumulator;
                            }

                            // 3、合并
                            // 定义累加器如何和State中的累加器进行合并
                            @Override
                            public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
                                acc1.f0 += acc2.f0;
                                acc1.f1 += acc2.f1;
                                return acc1;
                            }

                            @Override
                            public Double getResult(Tuple2<Long, Long> longLongTuple2) {
                                return longLongTuple2.f1 / (longLongTuple2.f0 * 1.0);
                            }
                        },
                        new ProcessWindowFunction<Double, String, String, TimeWindow>() {
                            @Override
                            public void process(String key, Context context, Iterable<Double> elements, Collector<String> out) throws Exception {
                                Double avg = elements.iterator().next();
                                String windowStart = new DateTime(context.window().getStart(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");
                                String windowEnd=new DateTime(context.window().getEnd(), DateTimeZone.forID("+08:00")).toString("yyyy-MM-dd HH:mm:ss");

                                String record="Key: "+key+" 窗口开始时间: "+windowStart+" 窗口结束时间: "+windowEnd+" 浏览的商品的平均价值: "+String.format("%.2f",avg);
                                out.collect(record);

                            }
                        }
                ).print();

        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserActionLog{
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Long productPrice;
    }

    /**
     * 将从Kafka获取的JSON数据解析成Java Bean
     */
    private static class KafkaProcessFunction extends ProcessFunction<String, UserActionLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserActionLog> out) throws Exception {
            out.collect(JSON.parseObject(value, UserActionLog.class));
        }
    }
}

结果:

Key: user_1 窗口开始时间: 2020-11-09 10:41:30 窗口结束时间: 2020-11-09 10:41:35 浏览的商品的平均价值: 20.00
Key: user_1 窗口开始时间: 2020-11-09 10:41:35 窗口结束时间: 2020-11-09 10:41:40 浏览的商品的平均价值: 40.00

总结:

  • 这种方法主要以 aggregate 的累加器思路为重点,processWindowFunction 的方法主要是为了更能输出状态数据等信息。

2.3.4 Using per-window state in ProcessWindowFunction

与 windowFunction 不同,使用 ProcessWindowFunction 不仅仅可以拿到窗口内数据信息,还可以获取两个状态:

  • WindowState:表示窗口的状态,该状态值和窗口绑定的,一旦窗口消亡状态消失。
  • GlobalState:表示窗口的状态,该状态和Key绑定的,可以累计多个窗口的值。

数据:

1000,spark,2
5000,spark,2
6000,spark,3
10000,spark,5

例子:

package cn.itcast.day12.process;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Iterator;

/**
 * @author lql
 * @time 2024-03-08 20:36:12
 * @description TODO
 */

public class WindowStateAndGlobalStateFunctionDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(10000);
        //1000,spark,3
        //1200,spark,5
        //2000,hadoop,2
        //socketTextStream返回的DataStream并行度为1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((line, timestamp) -> Long.parseLong(line.split(",")[0])));


        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = dataWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });

        //调用keyBy
        KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);

        //NonKeyd Window: 不调用KeyBy,然后调用windowAll方法,传入windowAssinger
        // Keyd Window: 先调用KeyBy,然后调用window方法,传入windowAssinger
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed = keyed
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));

        //如果直接调用sum或reduce,只会聚合窗口内的数据,不去跟历史数据进行累加
        //需求:可以在窗口内进行增量聚合,并且还可以与历史数据进行聚合
        SingleOutputStreamOperator<String> result = windowed.aggregate(new MyAggFunc(), new MyWindowFunc());
        result.print();
        env.execute();
    }


    private static class MyAggFunc implements AggregateFunction<Tuple2<String, Integer>, Integer, Integer> {

        //创建一个初始值
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        //数据一条数据,与初始值或中间累加的结果进行聚合
        @Override
        public Integer add(Tuple2<String, Integer> value, Integer accumulator) {
            return value.f1 + accumulator;
        }

        //返回的结果
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        //如果使用的是非SessionWindow,可以不实现
        @Override
        public Integer merge(Integer a, Integer b) {
            return null;
        }
    }


    private static class MyWindowFunc extends ProcessWindowFunction<Integer, String, String, TimeWindow> {
        // 一个是窗口描述器,一个是全局描述器
        private transient ReducingStateDescriptor<Integer> windowStateDescriptor;
        private transient ReducingStateDescriptor<Integer> globalStateDescriptor;

        @Override
        public void open(Configuration parameters) throws Exception {
            windowStateDescriptor = new ReducingStateDescriptor<Integer>(
                    "window",
                    new ReduceFunction<Integer>() {
                @Override
                public Integer reduce(Integer value1, Integer value2) throws Exception {
                    return value1 + value2;
                }
            }, TypeInformation.of(new TypeHint<Integer>() { }));

            globalStateDescriptor = new ReducingStateDescriptor<Integer>(
                    "global",
                    new ReduceFunction<Integer>() {
                @Override
                public Integer reduce(Integer value1, Integer value2) throws Exception {
                    return value1 + value2;
                }
            }, TypeInformation.of(new TypeHint<Integer>() { }));
        }

        @Override
        public void process(String key, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {

            Integer sum = 0;
            Iterator<Integer> iterator = elements.iterator();
            while (iterator.hasNext()){
                sum += iterator.next();
            }
            ReducingState<Integer> windowState = context.windowState().getReducingState(windowStateDescriptor);
            ReducingState<Integer> globalState = context.globalState().getReducingState(globalStateDescriptor);

            // lambda 表达式的遍历,每个元素 t
            elements.forEach(t -> {
                try {
                    windowState.add(t);
                    globalState.add(t);
                } catch (Exception exception) {
                    exception.printStackTrace();
                }
            });
            out.collect(key+",window:"+windowState.get()+",global:"+globalState.get());
        }
    }
}

结果:

1> spark,window:2,global:2
1> spark,window:5,global:7

总结:

  • 1- 注册两个状态描述器之后,需要重写 open 方法;
  • 2- 在 open 方法中,都需要 new 一个 ReducingStateDescriptor,然后重写 reduce 方法进行累加操作;
  • 3- 在 process 方法中,进行元素的迭代求和;
  • 4- 极为主要的是,运用 context.windowState() / globalState(),这个是主要区别!

3. 双流 Join

3.1 面试介绍

Join大体分类只有两种:Window Join 和 Interval Join

  • Window Join 将数据缓存在 Window State 中,窗口触发计算时执行join操作

    • Tumbling Window Join
    • Sliding Window Join
    • Session Widnow Join。
  • interval join

    • 也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理

3.2 Window Join
3.2.1 Tumbling Window Join

执行翻滚窗口联接时,具有公共键和公共翻滚窗口的所有元素将作为成对组合联接,并传递给 JoinFunction 或 FlatJoinFunction。

注意,在翻滚窗口[6,7]中没有发射任何东西,因为绿色流中不存在与橙色元素⑥和⑦结合的元素。

使用模板:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 ...
    DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
            orangeStream.join(greenStream)
            .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
            .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

3.2.2 Sliding Window Join

在执行滑动窗口联接时,具有公共键和公共滑动窗口的所有元素将作为成对组合联接,并传递给 JoinFunction 或 FlatJoinFunction。

注意,在窗口[2,3]中,橙色②与绿色③连接,但在窗口[1,2]中没有与任何对象连接。

使用模板:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
        orangeStream.join(greenStream)
        .where(<KeySelector>)
.equalTo(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
        .apply (new JoinFunction<Integer, Integer, String> (){
    @Override
    public String join(Integer first, Integer second) {
        return first + "," + second;
    }
});

3.2.3 Session Window Join

在执行会话窗口联接时,具有相同键(当“组合”时满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。

注意,在第三个会话中,绿色流中没有元素,所以⑧和⑨没有连接!

使用模板:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
        orangeStream.join(greenStream)
        .where(<KeySelector>)
.equalTo(<KeySelector>)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
        .apply (new JoinFunction<Integer, Integer, String> (){
    @Override
    public String join(Integer first, Integer second) {
        return first + "," + second;
    }
});

3.2.3 案例演示

例子:使用两个指定Source模拟数据,一个Source是订单明细,一个Source是商品数据。我们通过window join,将数据关联到一起。

package cn.itcast.day13.join;

/**
 * @author lql
 * @time 2024-03-09 21:03:00
 * @description TODO:
思路
Window Join首先需要使用where和equalTo指定使用哪个key来进行关联,此处我们通过应用方法,基于GoodsId来关联两个流中的元素。
设置5秒的滚动窗口,流的元素关联都会在这个5秒的窗口中进行关联。
apply方法中实现将两个不同类型的元素关联并生成一个新类型的元素。
 */

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * 来做个案例:
 * 使用两个指定Source模拟数据,一个Source是订单明细,一个Source是商品数据。我们通过window join,将数据关联到一起。
 */
public class JoinDemo01 {
    public static void main(String[] args) throws Exception {
        //todo 1)环境初始化
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //todo 2)设置并行度
        env.setParallelism(1);

        //todo 3)构建数据源
        //构建商品数据流
        // 因为继承的 Richsource 没有指出返回类型,所以这里需要指出了!!!
        SingleOutputStreamOperator<Goods> goodsDataStream = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class))
                .assignTimestampsAndWatermarks(new GoodsWatermark());

        //构建订单明细数据流
        SingleOutputStreamOperator<OrderItem> orderItemDataStream = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderDetailWatermark());

        DataStream<FactOrderItem> result = goodsDataStream.join(orderItemDataStream)
                //第一个流的where
                .where(Goods::getGoodsId)
                //第二个流的where
                .equalTo(OrderItem::getGoodsId)
                //添加窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Goods, OrderItem, FactOrderItem>() {
                    @Override
                    public FactOrderItem join(Goods goods, OrderItem orderItem) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(goods.getGoodsId());
                        factOrderItem.setGoodsName(goods.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(orderItem.getCount()));
                        factOrderItem.setTotalMoney(goods.getGoodsPrice().multiply(new BigDecimal(orderItem.getCount())));
                        return factOrderItem;
                    }
                });

        result.printToErr();
        env.execute();
    }

    //商品类
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static  {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //订单明细类
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //关联结果
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;
        private String itemId;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //构建一个商品Stream源(这个好比就是维表)
    public static class GoodsSource11 extends RichSourceFunction {
        private Boolean isCancel;
        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }
        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while(!isCancel) {
                // steam 可以将列表转化为流
                // lambda 表达式将返回对象逐个进行 collect
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }
        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    //构建订单明细Stream源
    public static class OrderItemSource extends RichSourceFunction {
        private Boolean isCancel;
        private Random r;
        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }
        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while(!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    // 因为这里没有指定是哪一种水印,重写两个方法!

    /**
     * 定义商品水印信息
     */
    private static class GoodsWatermark implements WatermarkStrategy<Goods> {

        // 因为这里看见水印生成器,所以一定要想到有继承方法,参考自定义水印章节
        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>(){ // 继承两个方法

                @Override
                public void onEvent(Goods goods, long eventTimestamp, WatermarkOutput watermarkOutput) {
                    System.out.println("商品数据时间:"+System.currentTimeMillis());
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }

        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            // 在流处理过程中,每个 Goods 元素都将被分配一个当前的时间戳
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
    }

    /**
     * 定义订单明细数据流的水印
     */
    public static class OrderDetailWatermark implements WatermarkStrategy<OrderItem>{
        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    System.out.println("订单明细数据时间:"+System.currentTimeMillis());
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }

        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
    }
}

结果:

订单明细数据时间:1709991872660
商品数据时间:1709991872660
订单明细数据时间:1709991872661
商品数据时间:1709991872661
商品数据时间:1709991872661
商品数据时间:1709991872661
商品数据时间:1709991872661
商品数据时间:1709991872661
订单明细数据时间:1709991873665
商品数据时间:1709991873665
订单明细数据时间:1709991873665
商品数据时间:1709991873665
商品数据时间:1709991873665
商品数据时间:1709991873665
商品数据时间:1709991873665
商品数据时间:1709991873665
订单明细数据时间:1709991874665
商品数据时间:1709991874665
商品数据时间:1709991874665
商品数据时间:1709991874665
商品数据时间:1709991874665
商品数据时间:1709991874665
商品数据时间:1709991874665
订单明细数据时间:1709991874665

{"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
{"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
{"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
{"count":4,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":39200}
{"count":4,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":39200}
{"count":4,"goodsId":"4","goodsName":"Thinkpad X1","totalMoney":39200}
{"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}
{"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}
{"count":1,"goodsId":"2","goodsName":"iphone12","totalMoney":12000}

总结:

  • 1- 注意定义 java bean 类处理流信息的时候
  • 2- 窗口流注意水印操作的生成器方式,发生水印的时间
  • 3- joinFunction 需要重写 join 方法

3.3 Interval Join
3.3.1 Interval Join 介绍
  • interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界(负号),且,流B的元素的时间戳 ≤ 流A的元素时间戳 + 上界(正号)。

这些边界是包含的,但是可以应用 .lowerBoundExclusive().upperBoundExclusive 来更改行为!

使用模板:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
        orangeStream
                .keyBy(<KeySelector>)
        .intervalJoin(greenStream.keyBy(<KeySelector>))
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process (new ProcessJoinFunction<Integer, Integer, String(){

    @Override
    public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
        out.collect(first + "," + second);
    }
});

3.3.2 案例演示

例子:

package cn.itcast.day13.join;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * @author lql
 * @time 2024-03-09 22:27:18
 * @description TODO
 */
public class JoinDemo02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 构建商品数据流
        DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class)).assignTimestampsAndWatermarks(new GoodsWatermark());
        // 构建订单明细数据流
        DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderItemWatermark());

        // 进行关联查询
        SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
                .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                .between(Time.seconds(-1), Time.seconds(0))
                .upperBoundExclusive()
                .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                    @Override
                    public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(right.getGoodsId());
                        factOrderItem.setGoodsName(right.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(left.getCount()));
                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));

                        out.collect(factOrderItem);
                    }
                });

        factOrderItemDS.print();

        env.execute("Interval JOIN");
    }

    //商品类
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        public Goods() {
        }

        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //订单明细类
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //关联结果
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    //构建一个商品Stream源(这个好比就是维表)
    public static class GoodsSource11 extends RichSourceFunction {
        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    //构建订单明细Stream源
    public static class OrderItemSource extends RichSourceFunction {
        private Boolean isCancel;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    //构建水印分配器(此处为了简单),直接使用系统时间了
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {

        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}

结果:

5> {"count":4,"goodsId":"5","goodsName":"MeiZu One","totalMoney":12800}
3> {"count":9,"goodsId":"3","goodsName":"MacBookPro","totalMoney":135000}

总结:

  • 1- connect + broadcast 连接适用于数据几乎不变的情况下
  • 2- BroadcastState 连接适用于数据变化不那么快的情况下
  • 3- 双流 Join 连接适用于流式数据变化很快的情况下(类似于股价)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/443967.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【简写Mybatis】03-Mapper xml的注册和使用

前言 在学习MyBatis源码文章中&#xff0c;斗胆想将其讲明白&#xff1b;故有此文章&#xff0c;如有问题&#xff0c;不吝指教&#xff01; 注意&#xff1a; 学习源码一定一定不要太关注代码的编写&#xff0c;而是注意代码实现思想&#xff1b; 通过设问方式来体现代码中的…

vSphere 8考试认证题库 2024最新(VCP 8.0版本)

VMware VCP-DCV&#xff08;2V0-21.23&#xff09;认证考试题库&#xff0c;已全部更新&#xff0c;答案已经完成校对&#xff0c;完整题库请扫描上方二维码访问。正常考可以考到450分以上&#xff08;满分500分&#xff0c;300分通过&#xff09; An administrator is tasked …

HPE ProLiant MicroServer Gen8驱动程序下载(windows)

记录下&#xff0c;以方便需要重装系统时将驱动更新到最后版本。 共有下面设备有适用的驱动可用&#xff1a; 1、系统管理&#xff1a; iLO 4 Channel Interface Driver for Windows Server 2016 下面这个驱动&#xff0c;安装后不知道有什么用 iLO 3/4 Management Control…

基于springboot+vue实现物资仓储物流管理系统项目【项目源码+论文说明】计算机毕业设计

基于springbootvue实现物资仓储物流管理系统演示 摘要 随着我国经济及产业化结构的持续升级&#xff0c;越来越多的企业借助信息化及互联网平台实现了技术的创新以及竞争力的提升&#xff0c;在电子经济的影响下仓储物流业务也获得了更多的关注度&#xff0c;利用系统平台实现…

【软考】设计模式之享元模式

目录 1. 说明2. 应用场景3. 结构图4. 构成5. 适用性6. java示例 1. 说明 1.享元设计模式&#xff08;Flyweight Design Pattern&#xff09;是一种常见的软件设计模式2.属于结构型设计模式&#xff0c;对象结构型模式3.目的&#xff1a;运用共享技术有效地支持大量细粒度的对象…

【Wireshark傻瓜式安装,Wireshark使用过滤条件】

Wireshark傻瓜式安装&#xff0c;Wireshark使用过滤条件 安装使用wireshark过滤器表达式的规则1.抓包过滤器语法和实例&#xff08;1&#xff09;协议过滤&#xff08;2&#xff09;IP过滤&#xff08;3&#xff09;端口过滤&#xff08;4&#xff09;逻辑运算符&&与、…

后端传给前端的时间字段前端显示不正确

具体问题是什么呢&#xff0c;就比如我后段有一个字段是TimeStamp类型&#xff0c;从数据库中查出数据是下面的样式&#xff1a; 但是前端显示的是下面的格式&#xff1a; 这个的解决方法还是挺多的&#xff0c;那接下来具体来看看吧~ 第一种&#xff1a; 在application.prop…

【数据结构】数组、双链表代码实现

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…

迁移篇 | MatrixOne与MySQL全面对比

Part 1 迁移背景 Skyable 自研了物联网私有云平台用于 IoT 设备的数据上报和协议解析&#xff0c;由于管理设备数量的增加导致设备上报的数据量越来越大&#xff0c;架构中原使用的 MySQL 数据库&#xff08;分库分表&#xff09;的部分业务在对设备上报信息进行相关的查询时&…

ChatGPT 结合实际地图实现问答式地图检索功能基于Function calling

ChatGPT 结合实际地图实现问答式地图检索功能基于Function calling ChatGPT结合实际业务&#xff0c;主要是研发多函数调用&#xff08;Function Calling&#xff09;功能模块&#xff0c;将自定义函数通过ChatGPT 问答结果&#xff0c;实现对应函数执行&#xff0c;再次将结果…

鸿蒙Harmony应用开发—ArkTS声明式开发(通用属性:文本通用)

文本通用属性目前只针对包含文本元素的组件&#xff0c;设置文本样式。 说明&#xff1a; 从API Version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 属性 名称参数类型描述fontColorResourceColor设置字体颜色。 从API version 9开…

VBA更新xlOLELinks链接的值

xlOLELinks是在Excel文档中插入对象的链接&#xff0c;该链接能够显示被插入文档的数据&#xff0c;通常情况下链接的数值会自动更新&#xff0c;但有时更新也会不及时或失效&#xff0c;这时就需要手动更新&#xff0c;如下图&#xff1a; 以插入Word文档为例&#xff0c;使用…

【漏洞复现】Laykefu客服系统任意文件上传

漏洞描述 Laykefu客服系统/admin/users/upavatar.html接口处存在文件上传漏洞,而且当请求中Cookie中的”user_name“不为空时即可绕过登录系统后台,未经身份验证的攻击者可利用此问题,上传后门文件,获取服务器权限。 免责声明 技术文章仅供参考,任何个人和组织使用网络…

js【深度解析】代码的执行顺序

代码的分类 我们将每一句要执行的 js 代码当做一个任务&#xff0c;则 js 代码可以按照其执行方式的不同&#xff0c;按下图分类 同步任务&#xff1a;立即执行的代码异步任务&#xff1a;延迟执行的代码 微任务&#xff1a;被放入微任务队列&#xff08;micro task queue&…

【记录37】VueBaiduMap 踩坑一

截图 错误 Error in callback for watcher “position.lng”: “TypeError: Cannot read properties of undefined (reading ‘setPosition’)” 解释 回调观察程序“content”时出错&#xff1a;“TypeError:无法读取未定义的属性&#xff08;读取’setContent’&#xff09;”…

设计模式-行为型模式-模版方法模式

模板方法模式&#xff0c;定义一个操作中的算法的骨架&#xff0c;而将一些步骤延迟到子类中。模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。[DP] 模板方法模式是通过把不变行为搬移到超类&#xff0c;去除子类中的重复代码来体现它的优势。 //首…

L-2:插松枝(Python)

作者 陈越 单位 浙江大学 人造松枝加工场的工人需要将各种尺寸的塑料松针插到松枝干上&#xff0c;做成大大小小的松枝。他们的工作流程&#xff08;并不&#xff09;是这样的&#xff1a; 每人手边有一只小盒子&#xff0c;初始状态为空。每人面前有用不完的松枝干和一个推送…

《汇编语言》第3版(王爽)实验9

第9章 实验9 编程&#xff1a;在屏幕中间分别显示绿色、绿底红色、白底蓝色的字符串 ‘welcome to masm!’ assume cs:code,ds:datadata segmentdb welcome to masm!,0 data endscode segmentstart:mov ax,data mov ds,ax ;ds指向data段mov ax,0B800H ;显存空间从B800H…

LeetCode_24_中等_两两交换链表中的节点

文章目录 1. 题目2. 思路及代码实现&#xff08;Python&#xff09;2.1 递归2.2 迭代 1. 题目 给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题&#xff08;即&#xff0c;只能进行节点交换…

windows11编译FFmpeg源码完整步骤

1.安装MSYS2 下载并安装MSYS2 安装GCC GCC安装成功 克隆FFmpeg源码 打开MSYS2终端并进入ffmpeg文件夹,然后输入./configure回车开始生成makefile