Flink 1.14.*中flatMap,filter等基本转换函数源码

这里以flatMap,filter为例,介绍Flink如果要实现这些基本转换需要实现哪些接口,Flink运行时调用这些实现类的入口,这些基本转换函数之间的类关系

  • 一、创建基本转换函数需要实现类继承AbstractRichFunction并实现特性接口
    • 1、RichFlatMapFunction
    • 2、RichFilterFunction
  • 二、Flink把实现了flatMap,filter功能的类加入到作业中
  • 三、Flink运行时如何调用flatMap和filter的实现类的
  • 四、类关系图

一、创建基本转换函数需要实现类继承AbstractRichFunction并实现特性接口

1、RichFlatMapFunction

@Public
public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {
    private static final long serialVersionUID = 1L;

    public RichFlatMapFunction() {
    }
    //需要实现下面这个方法
    public abstract void flatMap(IN var1, Collector<OUT> var2) throws Exception;
}

只需要实现类继承了RichFlatMapFunction,实现了flatMap方法就可以

2、RichFilterFunction

@Public
public abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {
    private static final long serialVersionUID = 1L;

    public RichFilterFunction() {
    }
    //需要实现下面这个类
    public abstract boolean filter(T var1) throws Exception;
}

只需要实现类继承了RichFilterFunction,实现了filter方法就可以

二、Flink把实现了flatMap,filter功能的类加入到作业中

一般是通过如下代码

DataStream<Row>  dateStream = 来自source的数据流
dateStream.flatMap(extend RichFlatMapFunction的子类);
dateStream.filter(extend RichFilterFunction的子类);

三、Flink运行时如何调用flatMap和filter的实现类的

那就看一下dateStream.flatMap方法

@Public
public class DataStream<T> {
    protected final Transformation<T> transformation;

    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);
        return this.flatMap(flatMapper, outType);
    }
    
    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
        return this.transform("Flat Map", outputType, (OneInputStreamOperator)(new StreamFlatMap((FlatMapFunction)this.clean(flatMapper))));
    }
}

StreamFlatMap构造时会把实现类当成入参构建OneInputStreamOperator

@Internal
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> {
    private static final long serialVersionUID = 1L;
    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector(this.output);
    }
	
    public void processElement(StreamRecord<IN> element) throws Exception {
        this.collector.setTimestamp(element);
        //这里就是调用的父类的userFunction,即构造函数传入的flatMapper
        ((FlatMapFunction)this.userFunction).flatMap(element.getValue(), this.collector);
    }
}

下面会把userFunction赋值给AbstractUdfStreamOperator的字段,这样子类在调用userFunction时就是调用的这个

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
    private static final long serialVersionUID = 1L;
    protected final F userFunction;

    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = (Function)Objects.requireNonNull(userFunction);
        this.checkUdfCheckpointingPreconditions();
    }
}

这样StreamFlatMapuserFunction的操作,就是对实现了RichFlatMapFunction的子类的操作

像filter也类似,如下

@Internal
public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
    private static final long serialVersionUID = 1L;

    public StreamFilter(FilterFunction<IN> filterFunction) {
        super(filterFunction);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void processElement(StreamRecord<IN> element) throws Exception {
        if (((FilterFunction)this.userFunction).filter(element.getValue())) {
            this.output.collect(element);
        }

    }
}

StreamFilterStreamFlatMap都是继承了AbstractUdfStreamOperator 实现了OneInputStreamOperator接口,
你可以理解StreamFilterStreamFlatMap有共同的父类和接口,

四、类关系图

RichFlatMapFunction
在这里插入图片描述
RichFilterFunction
在这里插入图片描述

通过上面两张图就知道RichFlatMapFunctionRichFilterFunction都是相同的父类扩展下来的

StreamFlatMap
在这里插入图片描述
StreamFilter
在这里插入图片描述
通过上面的也清楚,StreamFlatMapStreamFilter都是相同的父类和接口,只是processElement方法的实现不一样

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/872288.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

批量为某跟空间下的pod添加env(例如标签)

1、修改kube-apiserver配置文件&#xff0c;添加PodNodeSelector参数&#xff1a; –enable-admission-pluginsPodNodeSelector # 多个参数&#xff0c;则用逗号隔开systemctl daemon-reload systemctl restart kube-apiserver 2、修改指定命名空间内容&#xff1a; kubec…

SprinBoot+Vue校园活动报名微信小程序的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 Controller3.2 Service3.3 Dao3.4 application.yml3.5 SpringbootApplication3.5 Vue3.6 uniapp代码 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平…

C# NX二次开发-获取体全部面

使用 UF_MODL_ask_body_faces 或获取一个体的全部面&#xff1a; 代码&#xff1a; theUf.Modl.AskBodyFaces(body.Tag, out var face_list);face_list.Foreach(x > x.NxListing()); 免责声明&#xff1a; 只用于参考&#xff0c;如果有什么问题不要找我呀。

Web开发的艺术:C#开发中实现代码简洁性与规范性的终极指南

一、变量的要求 变量名 1.简短有意义: 合适: student_count&#xff0c;student_ids&#xff0c;student_enable_list, water_price 不合适: numberOfItemsInTheCart, totalPriceOfTheOrderInTheShoppingCart,temp, data,list 2.变量名之间不要太像: 合适: totalAmount, disc…

NetSuite AI 图生代码

去年的ChatGPT热潮期间&#xff0c;我们写过一篇文章说GTP辅助编程的事。 NetSuite GPT的辅助编程实践_如何打开netsuite: html script notes的视图-CSDN博客文章浏览阅读2.2k次&#xff0c;点赞4次&#xff0c;收藏3次。作为GPT综合症的一种表现&#xff0c;我们今朝来探究下…

Kettle 锁表原因及解决办法【源码级分析】

文章目录 背景源码分析锁表场景1:资源库锁表锁表场景2:写日志锁表在哪里配置的kettle_log_table?官方解释自增 SQL 获取 BatchI 原理解决自增 SQL 获取 BatchID背景 Kettle 7.1.0 经常出现锁表的情况,体现为在数据库里有一条锁表 SQL,然后整个 Kettle 都无法运行。😂�…

【Python】简单的数据类型——int、float、bool、str

目录 1. 整数类型 int 2. 浮点数类型 float 3. 布尔类型 bool 4. 字符串 str 5. 类型转换 5.1 隐式类型转换 5.2 显示类型转换 6. 输出 6.1 print函数 6.2 格式化输出 7. 输入 1. 整数类型 int a 10 print(type(a)) print(type(-2))<class int> <class i…

HBase 部署及shell操作

HBase 数据库 一、HBase 概述1.1 HBase 是什么HBase 的特点 二、HBase 模型及架构2.1 HBase 逻辑模型2.2 HBase 数据模型2.3 HBase 物理模型2.3.1 列簇物理模型2.3.2 Rowkey 字段排序2.3.3 Region 存储到不同节点2.3.4 Region 结构 2.4 HBase 基本架构 三、搭建 HBase 分布式集…

ArcGIS栅格裁剪与合并,制作等高线

1、下载高程数据&#xff08;DEM&#xff09; https://mp.weixin.qq.com/s/ewlUUVV0PwdcspPGtSdCog 项目区域对应的卫片&#xff0c;也可以在谷歌地图里看大致经纬度范围 2、项目区域 确定项目区域&#xff0c;例如某个县区范围 3、栅格裁剪与合并 将DEM多个栅格数据合并&#…

DevOps学习笔记

记录以下DevOps学习笔记&#xff0c;这里是笔记的入口汇总&#xff0c;可以直观的看到所有的笔记&#xff0c;还没有入口的部分&#xff0c;在下正在努力编写中。 gitlab jenkins docker 1.docker安装 2.数据卷、挂载 artifactory 1.artifactory安装 2.artifactory使用 …

hyperf json-rpc

安装 安装docker hyperf 安装 hyperf-rpc-server-v8 &#xff08;服务端&#xff09; docker run --name hyperf-rpc-server-v8 \ -v /www/docker/hyperf-rpc-server:/data/project \ -w /data/project \ -p 9508:9501 -it \ --privileged -u root \ --entrypoint /bin/sh \…

使用 streamlink 把 m3u8 转为 mp4

问题描述&#xff0c; 背景&#xff0c; 来源&#xff1a; 下载 m3u8 ts —> 转为mp4, 按照以往的做法&#xff0c; 就是使用 python requests 一步一步地下载 m3u8, ts&#xff0c; 然后转换。 但是个人写的东西&#xff0c;毕竟问题比较多。 而且&#xff0c; 但是&…

2024国赛数学建模备赛|30种常用的算法模型之最优算法-非线性规划

1.1 非线性规划的实例与定义 如果目标函数或约束条件中包含非线性函数&#xff0c;就称这种规划问题为非线性规划问题。一般说来&#xff0c;解非线性规划要比解线性规划问题困难得多。而且&#xff0c;也不象线性规划有 单纯形法这一通用方法&#xff0c;非线性规划目前还没…

C#上位机使用Microsoft.Office.Interop.Excel和EPPlus库对Excel或WPS表格进行写操作

C#上位机使用Microsoft.Office.Interop.Excel和EPPlus库对Excel或WPS表格进行写操作 一、使用Microsoft.Office.Interop.Excel库 1、通过NuGet包管理器添加引用 按照下图中红框所示进行操作。 需要安装Microsoft.Office.Interop.Excel包 添加Microsoft Office 16.0 Object …

[全网首发]2024国赛数学建模ABCE题完整思路+py(matlab)代码+成品论文参考+持续更新

AB题详细思路(含问题一问题二模型) CE题问题一代码思路已经写好[pythonmatlab两种都会更新 需要完整版的看这里&#xff1a; 点击链接加入群聊【2024数学建模国赛资料汇总】&#xff1a;http://qm.qq.com/cgi-bin/qm/qr?_wv1027&klZncBILk30DuPRI1Bd8X-3Djv7ZVZyAv&…

python开发VTK入门

首先用pip命令安装VTK的python库&#xff1b; 需要一些时间&#xff0c;安装完成如下&#xff1b; 基本示例代码&#xff0c; import vtkcube vtk.vtkCubeSource() cube.SetXLength(100.0) cube.SetYLength(200.0) cube.SetZLength(300.0)mapper vtk.vtkPolyDataMapper() ma…

持续集成与持续部署(CI/CD)的深入探讨

在现代软件开发中&#xff0c;持续集成&#xff08;CI&#xff09;和持续部署&#xff08;CD&#xff09;已成为不可或缺的实践。这些方法旨在加快软件交付的速度&#xff0c;同时提高软件的质量和稳定性。通过CI/CD&#xff0c;开发团队可以频繁地将代码更改集成到主分支&…

瑞芯微RK3566鸿蒙开发板Ubuntu虚拟机环境搭建教程,触觉智能Purple Pi OH主板

本文适用于Ubuntu虚拟机环境搭建教程学习&#xff0c;设备为触觉智能开发的瑞芯微RK3566开发板&#xff0c;型号Purple Pi OH。是华为Laval官方社区主荐的一款鸿蒙开发主板。支持Openharmony、安卓Android、Linux的Debian、Ubuntu系统。 该主板主要针对学生党&#xff0c;极客&…

[001-07-001].Redis7缓存双写一致性之更新策略探讨

1、面试题&#xff1a; 1.只要使用缓存&#xff0c;就可能会涉及到redis缓存与数据库双存储双写&#xff0c;只要是双写&#xff0c;就存在数据一致性问题&#xff0c;那么是如何解决数据一致性问题的2.双写一致性&#xff0c;你先动缓存redis还是数据库MySQL&#xff0c;哪一个…

OpenHarmony轻松玩转GIF数据渲染

OpenAtom OpenHarmony&#xff08;以下简称“OpenHarmony”&#xff09;提供了Image组件支持GIF动图的播放&#xff0c;但是缺乏扩展能力&#xff0c;不支持播放控制等。今天介绍一款三方库——ohos-gif-drawable三方组件&#xff0c;带大家一起玩转GIF的数据渲染&#xff0c;搞…