二次开发Flink-coGroup算子支持迟到数据通过测输出流提取

1.背景

coGroup算子开窗到时间关闭之后,迟到数据无法通过测输出流提取,intervalJoin算子提供了api,因为join算子底层就是coGroup算子,所以Join算子也不行。

flink版本 v1.17.1

2.coGroup算子源码分析

2.1完成的coGroup算子调用流程

    input1.coGroup(input2)
    .where(keySelector1)
    .equalTo(keySelector2)
    .window(windowAssigner)
    .trigger(trigger)
    .evictor(evictor)
    .allowedLateness(allowedLateness)
    .apply(cgroupFunction)

通过上述代码可以看到没有sideOutputLateData的相关方法,用来提取窗口关闭之后的迟到数据

2.2coGroup方法入口

其中创建了一个CoGroupedStreams流对象

    /**
     * Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys and
     * window can be specified.
     */
    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
        return new CoGroupedStreams<>(this, otherStream);
    }

2.3 CoGroupedStreams对象分析

他可以理解为构造设计模式的一个Builder类,通过where方法配置第一条流的KeySelector,再返回一个CoGroupedStreams的内部类Where,再通过equalTo方法配置第二条流的KeySelector,再返回EqualTo内部类,window方法配置窗口划分器,返回WithWindow内部类,后续都是窗口的配置 trigger,evictor,allowedLateness配置窗口参数,最后调用apply方法传送用户业务函数

2.4WithWindow内部类分析

WithWindow是最终保存所有配置的内部类包括两条流,窗口配置,key提取器的配置,最终会用户调用apply方法触发CoGroup的业务,在apply方法中通过union联合两条流,然后通过keyby转为KeyedStream,再通过window配置窗口,最终调用窗口函数的apply方法,传入WindowFunction,做CoGroup的业务与用户业务。

具体代码如下已写好备注


    /**
     * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
     * well as a {@link WindowAssigner}.
     *
     * @param <T1> Type of the elements from the first input
     * @param <T2> Type of the elements from the second input
     * @param <KEY> Type of the key. This must be the same for both inputs
     * @param <W> Type of {@link Window} on which the co-group operation works.
     */
    @Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        //第一条流
        private final DataStream<T1> input1;
        //第二条流
        private final DataStream<T2> input2;
        //第一个key提取器
        private final KeySelector<T1, KEY> keySelector1;
        //第二个Key提取器
        private final KeySelector<T2, KEY> keySelector2;
        //Key的类型
        private final TypeInformation<KEY> keyType;
        //窗口分配器
        private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
        //窗口出发计算器
        private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

        private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

        private final Time allowedLateness;

        private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;
        //构造函数给上面对象赋值
        protected WithWindow(
                DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness) {
            this.input1 = input1;
            this.input2 = input2;

            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
            this.keyType = keyType;

            this.windowAssigner = windowAssigner;
            this.trigger = trigger;
            this.evictor = evictor;

            this.allowedLateness = allowedLateness;
        }

        /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p>Note: This method's return type does not support setting an operator-specific
         * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
         * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
         * parallelism.
         */
        public <T> DataStream<T> apply(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);
			//创建合并两个流的公共TypeInfo,UnionTypeInfo最终会将Input1,Input2的数据通过map算子转换为该类型
            UnionTypeInfo<T1, T2> unionType =
                    new UnionTypeInfo<>(input1.getType(), input2.getType());
			//转换成union的KeySelector
            UnionKeySelector<T1, T2, KEY> unionKeySelector =
                    new UnionKeySelector<>(keySelector1, keySelector2);
			//将taggedInput1的数据类容map成UnionTypeInfo<T1, T2>类型
            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                    input1.map(new Input1Tagger<T1, T2>());
            taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
            taggedInput1.returns(unionType);
	       //将taggedInput2的数据类容map成UnionTypeInfo<T1, T2>类型
            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                    input2.map(new Input2Tagger<T1, T2>());
            taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
            taggedInput2.returns(unionType);
			//将两个流进行union
            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
			//keyBy并且开窗
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                    unionStream, unionKeySelector, keyType)
                            .window(windowAssigner);
			//配置窗口触发器
            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
			//配置移除器
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
			//配置allowedLateness
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }
			//创建CoGroupWindowFunction ,并把用户函数传入进去
            return windowedStream.apply(
                    new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

        /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p><b>Note:</b> This is a temporary workaround while the {@link #apply(CoGroupFunction,
         * TypeInformation)} method has the wrong return type and hence does not allow one to set an
         * operator-specific parallelism
         *
         * @deprecated This method will be removed once the {@link #apply(CoGroupFunction,
         *     TypeInformation)} method is fixed in the next major version of Flink (2.0).
         */
        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }
		
        @VisibleForTesting
        Time getAllowedLateness() {
            return allowedLateness;
        }
		//获取窗口包装流,但是标记为VisibleForTesting,用户无法调用,如果可以调用的话可以通过该方法获取包装流之后通过窗口流获取迟到数据的测输出流
        @VisibleForTesting
        WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
            return windowedStream;
        }
    }

2.5CoGroupWindowFunction函数分析

CoGroupWindowFunction也是CoGroupedStreams内部类,负责做CoGroup的业务,最终将数据封装好转发给用户函数(也就是2.1中apply中的cgroupFunction)

   private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
            extends WrappingFunction<CoGroupFunction<T1, T2, T>>
            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

        private static final long serialVersionUID = 1L;

        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
            super(userFunction);
        }

        @Override
        public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
                throws Exception {
			//缓存当前窗口里1号流的数据
            List<T1> oneValues = new ArrayList<>();
			//缓存当前窗口里2号流的数据
            List<T2> twoValues = new ArrayList<>();

            for (TaggedUnion<T1, T2> val : values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
			//传入到用户函数中
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }
    }

3.修改源码支持获取迟到数据测输出流

思路 复制CoGroupedStreams新增一个NewCoGroupedStreams,在WithWindow函数中增加方法sideOutputLateData,让用户传入outputTag,用于提取窗口关闭后的测输出流。

3.1复制CoGroupedStreams

3.2新增WithWindow.sideOutputLateData方法

新增该方法,传入outputTag,下图WithWindow构造方法是3.3新增的

    @PublicEvolving
        public WithWindow<T1, T2, KEY, W> sideOutputLateData(
                OutputTag<TaggedUnion<T1, T2>> outputTag) {
            return new WithWindow<>(
                    input1,
                    input2,
                    keySelector1,
                    keySelector2,
                    keyType,
                    windowAssigner,
                    trigger,
                    evictor,
                    allowedLateness,
                    outputTag
            );
        }

3.3新增WithWindow构造方法

新增属性laterDataOutputTag,用来保存构造函数中传入的laterOutputTag

   protected WithWindow(
                DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness,
                OutputTag<TaggedUnion<T1, T2>> laterOutputTag
        ) {
            this(
                    input1,
                    input2,
                    keySelector1,
                    keySelector2,
                    keyType,
                    windowAssigner,
                    trigger,
                    evictor,
                    allowedLateness);
            this.lateDataOutputTag = laterOutputTag;

        }

3.4修改apply方法

判断lateDataOutputTag 是否为null,如果不为null则调用windowedStream的sideOutputLateData设置迟到数据tag

 /**
         * Completes the co-group operation with the user function that is executed for windowed
         * groups.
         *
         * <p>Note: This method's return type does not support setting an operator-specific
         * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
         * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
         * parallelism.
         */
        public <T> DataStream<T> apply(
                CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            // clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            UnionTypeInfo<T1, T2> unionType =
                    new UnionTypeInfo<>(input1.getType(), input2.getType());
            UnionKeySelector<T1, T2, KEY> unionKeySelector =
                    new UnionKeySelector<>(keySelector1, keySelector2);

            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                    input1.map(new Input1Tagger<T1, T2>());
            taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
            taggedInput1.returns(unionType);

            SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                    input2.map(new Input2Tagger<T1, T2>());
            taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
            taggedInput2.returns(unionType);

            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

            // we explicitly create the keyed stream to manually pass the key type information in
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                            unionStream, unionKeySelector, keyType)
                            .window(windowAssigner);
            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }
            //判断lateDataOutputTag是否为NULL,如果不为NULL,则调用windowedStream
            //的sideOutputLateData方法,传入lateDataOutputTag让迟到数据输出到测输出流中
            if (lateDataOutputTag != null) {
                windowedStream.sideOutputLateData(lateDataOutputTag);
            }
            return windowedStream.apply(
                    new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

3.5开放UnionTypeInfo类的public权限

该类就是union之后的公共类的类型 oneType代表Input1流的数据类型,TwoType代表Input2流的数据类型

3.6编译Flink源码flink-streaming-java模块

进入到flink-streaming-java所在磁盘目录输入以下命令编译

mvn clean install -DskipTests -Dfast

编译成功

3.7项目中查看maven是否已经刷新为最新代码

编译之后,可以看到导入的maven包已经有了新增的NewCoGroupedStreams类了,注意项目中的maven依赖中的flink版本,要与编译源码的版本一致,否则无法引入到。

4.测试

新建两个流,通过new NewCoGroupedStreams创建对象,在allowedLateness之后通过sideOutputLateData设置outputTag,然后通过with方法触发业务,with底层也是调用了apply,只不过他帮我们把返回的流转为了SingleOutputStreamOperator类型,可以用于提取测输出流。最后通过with.getSideOutput(outputTag)提取测输出流,最后通过map转换为 Tuple2<Integer, WaterSensor> 类型进行打印

    OutputTag<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>> outputTag = new OutputTag<>("later",
                new NewCoGroupedStreams.UnionTypeInfo<>(Types.POJO(WaterSensor.class), Types.POJO(WaterSensor.class)));
        
        NewCoGroupedStreams<WaterSensor, WaterSensor> newCgroupStream = new NewCoGroupedStreams<>(ds1, ds2);
        
        SingleOutputStreamOperator<String> with = newCgroupStream.where((x) -> x.getId()).equalTo(x -> x.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(outputTag)
                .with(new RichCoGroupFunction<WaterSensor, WaterSensor, String>() {
                    @Override
                    public void coGroup(Iterable<WaterSensor> first, Iterable<WaterSensor> second, Collector<String> out) throws Exception {
                        out.collect(first.toString() + "======" + second.toString());
                    }
                });
        with.print();
        with.getSideOutput(outputTag).map(new MapFunction<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>, Tuple2<Integer, WaterSensor>>() {
            @Override
            public Tuple2<Integer, WaterSensor> map(NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor> value) throws Exception {
                return value.isOne() ? Tuple2.of(1, value.getOne()) : Tuple2.of(2, value.getTwo());
            }
        }).print();

可以看到下图结果,ts代表时间戳,第一个打印是RichCoGroupFunction打印,代表关闭了1~10s的时间窗,后面我们在输入,WaterSensor{id='a', ts=1, vc=1} 就通过测输出流打印为二元组了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/485028.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

QT(C++)-error LNK2038: 检测到“_ITERATOR_DEBUG_LEVEL”的不匹配项: 值“2”不匹配值“0”

1、项目场景&#xff1a; 在VS中采用QT&#xff08;C&#xff09;调试时&#xff0c;出现error LNK2038: 检测到“_ITERATOR_DEBUG_LEVEL”的不匹配项: 值“2”不匹配值“0”错误 2、解决方案&#xff1a; 在“解决方案资源管理器”中选中出现此类BUG的项目&#xff0c;右键-…

jenkins介绍,帮助你从安装到使用jenkins

Jenkins 概述 官网地址&#xff1a;https://www.jenkins.io/zh/ 什么是 Jenkins Jenkins是一款开源 CI&CD 软件&#xff0c;用于自动化各种任务&#xff0c;包括构建、测试和部署软件。它提供了一个易于使用的图形化界面&#xff0c;可以通过配置简单的任务来实现自动化构…

javaSSM游泳馆日常管理系统IDEA开发mysql数据库web结构计算机java编程maven项目

一、源码特点 IDEA开发SSM游泳馆日常管理系统是一套完善的完整企业内部系统&#xff0c;结合SSM框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用SSM框架&#xff08;MVC模式开发&#xff09;MAVEN方式加载&#xff0c;系统具有完整的源代码和…

Vue 3 里的 onMounted 怎么用?

疑问 最近&#xff0c;一直在学习 Vue 3&#xff0c;此前我不懂前端&#xff0c;也没写过 Vue 2&#xff0c;所以是从 0 开始学习 Vue 3 的。很多对普通人不是疑问的&#xff0c;在我这里也会不太清楚。 我在写项目的时候&#xff0c;常见的一种场景是这样的&#xff1a;页面…

分类预测 | Matlab实现MTF-CNN-Mutilhead-Attention马尔可夫转移场卷积网络多头注意力机制多特征分类预测/故障识别

分类预测 | Matlab实现MTF-CNN-Mutilhead-Attention马尔可夫转移场卷积网络多头注意力机制多特征分类预测/故障识别 目录 分类预测 | Matlab实现MTF-CNN-Mutilhead-Attention马尔可夫转移场卷积网络多头注意力机制多特征分类预测/故障识别分类效果基本介绍模型描述程序设计参考…

基于SSM非遗视域下喀什旅游网站

ssm非遗视域下喀什旅游网站的设计与实现 摘要 我们的生活水平正在不断的提高&#xff0c;然而提高的一个重要的侧面表现就是更加注重我们的娱乐生活。旅行是我们都喜欢的一种娱乐方式&#xff0c;各式各样的旅行经历给我们带来的喜悦也是大不相同的。带来快乐的同时也因为其复…

IntelliJ IDE 插件开发 | (七)PSI 入门及实战(实现 MyBatis 插件的跳转功能)

系列文章 IntelliJ IDE 插件开发 |&#xff08;一&#xff09;快速入门IntelliJ IDE 插件开发 |&#xff08;二&#xff09;UI 界面与数据持久化IntelliJ IDE 插件开发 |&#xff08;三&#xff09;消息通知与事件监听IntelliJ IDE 插件开发 |&#xff08;四&#xff09;来查收…

MongoDB高可用架构涉及常用功能整理

MongoDB高可用架构涉及常用功能整理 1. mongo架构和相关组件1.1. Master-Slave主从模式1.2. Replica Set 副本集模式1.3. Sharding 分片模式 2. Sharding 分片模式2.1. Hashed Sharding方式2.2. Range Sharding方式 3. 事务性4. 疑问和思考4.1. 怎么保证数据的高可靠&#xff1…

常用中间件redis,kafka及其测试方法

常用消息中间件及其测试方法 一、中间件的使用场景引入中间件的目的一般有两个&#xff1a;1、提升性能常用的中间件&#xff1a;1) 高速缓存&#xff1a;redis2) 全文检索&#xff1a;ES3) 存日志&#xff1a;ELK架构4) 流量削峰&#xff1a;kafka 2、提升可用性产品架构中高可…

Web前端—浏览器渲染原理

浏览器渲染原理 浏览器渲染原理渲染时间点渲染流水线1. 解析HTML—Parse HTML2. 样式计算—Recalculate Style3. 布局—Layout4. 分层—Layer5. 绘制—Paint6. 分块—Tiling7. 光栅化—Raster8. 画—Draw完整过程 面试题1. 浏览器是如何渲染页面的&#xff1f;2. 什么是 reflow…

linux apt 速度慢 换源

Ubuntu 20.04.1 LTS已推出,一样的为期5年的服务,感觉不错,安装了一个,但是苦于使用默认源在国内下载太慢,就想着把apt源改为国内源,目前国内比较好的源,有阿里源,清华源,豆瓣源等,下面我以阿里源为例,说下如何修改。 也可以在中科大https://mirrors.ustc.edu.cn/查…

使用amd架构的计算机部署其他架构的虚拟机(如:arm)

1 下载quem模拟器 https://qemu.weilnetz.de/w64/2 QEMU UEFI固件文件下载(引导文件) 推荐使用&#xff1a;https://releases.linaro.org/components/kernel/uefi-linaro/latest/release/qemu64/QEMU_EFI.fd3 QEMU 安装 安装完成之后&#xff0c;需要将安装目录添加到环境变…

福昕阅读器 PDF 文档基本操作

福昕阅读器 PDF 文档基本操作 References 转至 PDF 顶部 快捷键&#xff1a;Home. 转至 PDF 顶部 快捷键&#xff1a;End. 打开超链接 文本选择工具 -> 手形工具 (Hand Tool) -> 点击超链接 福昕阅读器 同时在多个窗口中打开多个文件 文件 -> 偏好设置 -> 文…

数据库导入文件或者运行文件的时候报错误 #1046 - No database selected

如果我们在使用数据库导入文件的时候报错误 #1046 - No database selected该怎么解决 那么小编带我们可以从三个角度去观察 1、这种情况一般是因为你在数据库中没有这个数据库&#xff0c;你新建一个你要导入的数据库名字的数据库&#xff0c;然后选中该数据库&#xff0c;再进…

设计模式-初步认识

目录 &#x1f6fb;1.什么是设计模式 &#x1f69a;2.设计模式的优点 &#x1f68d;3.设计模式6大原则 &#x1f6f4;4.设计模式类型 1.什么是设计模式 设计模式代表了最佳的实践&#xff0c;通常被有经验的面向对象的软件开发人员所采用。设计模式是软件开发人员在软件开…

如何使用PHP和RabbitMQ实现消息队列?

前言 今天我们来做个小试验&#xff0c;用PHP和RabbitMQ实现消息队列功能。 前期准备&#xff0c;需要安装好docker、docker-compose的运行环境。 如何使用docker部署php服务_php如何使用docker发布-CSDN博客 一、安装RabbitMQ 1、创建相关目录&#xff0c;执行如下命令。…

数据分析与挖掘

数据起源&#xff1a; 规模庞大&#xff0c;结构复杂&#xff0c;难以通过现有商业工具和技术在可容忍的时间内获取、管理和处理的数据集。具有5V特性&#xff1a;数量&#xff08;Volume&#xff09;&#xff1a;数据量大、多样性&#xff08;Variety&#xff09;&#xff1a…

neo4j所有关系只显示RELATION,而不显示具体的关系

当看r时&#xff0c;真正的关系在properties中的type里&#xff0c;而type为“RELATION” 造成这个的原因是&#xff1a; 在创建关系时&#xff0c;需要指定关系的类型&#xff0c;这是固定的&#xff0c;不能像属性那样从CSV文件的一个字段动态赋值。标准的Cypher查询语言不支…

Verilog刷题笔记42

题目&#xff1a;Create 16 D flip-flops. It’s sometimes useful to only modify parts of a group of flip-flops. The byte-enable inputs control whether each byte of the 16 registers should be written to on that cycle. byteena[1] controls the upper byte d[15:8…

轻量级 C++ UI 库:快速、可移植、自包含 | 开源日报 No.168

ocornut/imgui Stars: 53.4k License: MIT imgui 是 C 的无臃肿图形用户界面&#xff0c;具有最小的依赖关系。 该项目的主要功能、关键特性、核心优势包括&#xff1a; 为 C 提供了一个轻量级的图形用户界面库输出优化的顶点缓冲区&#xff0c;可在 3D 渲染应用程序中随时呈…