Flink任务如何跑起来之 2.算子 StreamOperator

Flink任务如何跑起来之 2.算子 StreamOperator

前文介绍了Transformation创建过程,大多数情况下通过UDF完成DataStream转换中,生成的Transformation实例中,核心逻辑是封装了SimpleOperatorFactory实例。

UDF场景下,DataStream到Transformationg过程中,SimpleOperatorFactory实例的创建过程大致如下伪代码所示。

// 具体的函数实例
Function function = ;
// 将函数实例封装到算子实例中
AbstractUdfStreamOperator operator = new AbstractUdfStreamOperator(function);
// 通过算子实例得到其SimpleOperatorFactory实例
SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator)

这里的UDF可以简单理解为需要我们自己传入对应Function实现类的操作,如map、filter等。

问题:
StreamOperator是什么?
为什么需要将Function封装到StreamOperator中?

1. Flink算子

在应用程序中通过各种各样的Function完成DataStream转换,但是Function仅表示数据处理逻辑,并不关心数据从哪里来到哪里去。
以MapFunction为例,map方法中仅包含对每一条到来数据的具体处理逻辑,并不清楚map方法何时被调用,结果返回到哪。

一个完整的数据处理逻辑应该是获取数据->处理数据->输出数据,在Flink中这个最小的完整逻辑通过算子表示,顶层抽象接口为StreamOperator

因此Function作为算子的一部分参与后续的数据加工。

算子包含生命周期、状态和容错管理、数据处理3个方面。设计时分为两条线:

  • 生命周期、状态和容错管理,主要是AbstractStreamOperator抽象类及其子类实现,以及未来的AbstractStreamOperatorV2抽象类。
  • 数据处理,主要是OneInputStreamOperatorTwoInputStreamOperatorMultipleInputStreamOperator接口,分别表示单流、双流和多流的数据处理。在接口中定义了数据的处理方法。

StreamOperator完整的顶层抽象如下。

在这里插入图片描述

  • AbstractStreamOperator,所有流运算的基类。提供了生命周期和属性方法的默认实现。
    包含UDF的算子需继承其AbstractUdfStreamOperator子类
    对于其具体实现,还必须实现OneInputStreamOperator或TwoInputStreamOperator其中一个。
    将来将会使用AbstractStreamOperatorV2替换该基类
  • OneInputStreamOperator,支持单流输入的运算符接口,如果要实现自定义运算符,需要使用AbatractUdfStreamOperator作为基类
  • TwoInputStreamOperator,支持双流输入的运算符基类。同样需要和AbstractStreamOperator一起使用。
  • AbstractStreamOperatorV2,所有流运算符的新基类,旨在取代AbatractUdfStreamOperator。
    当前仅仅用于和MultipleInputStreamOperator一起配合使用。

OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator分别对应了Tranformation实现类的OneInputTransformation、TwoInputTransformation和AbstractMultipleInputTransformation。

MultipleInputStreamOperator和AbstractStreamOperatorV2是高版本中才加入的。因此,flink中最初仅支持单流或双流的输入,多流场景下需要拆分成单流或双流进行处理。在支持不同输入的流的实现中,梳理数据的方法分别如下

// 单流输入
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> {
    // 处理数据
    void processElement(StreamRecord<IN> element) throws Exception;
}

// 双流输入
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {

    // 处理双流输入中第一个流上的元素
    void processElement1(StreamRecord<IN1> element) throws Exception;

    // 处理双流输入中第二个流上的元素
    void processElement2(StreamRecord<IN2> element) throws Exception;
}

// 多流输入,这里的Input和单流输入继承的Input父类为同一个
public interface MultipleInputStreamOperator<OUT> extends StreamOperator<OUT> {
    List<Input> getInputs();
}

在AbstractStreamOperator众多子类中,AbstractUdfStreamOperator抽象类中封装了Function接口,并且其中open、close等算子生命周期等方法,实际上就是调用Function实例的对应方法。

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
    // 封装Function
    protected final F userFunction;
    // 通过Function实现进行算子的实例化
    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = requireNonNull(userFunction);
        checkUdfCheckpointingPreconditions();
    }

    // 算子生命周期的相关方法,实际上调用Function的方法
    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, new Configuration());
    }

    @Override
    public void finish() throws Exception {
        super.finish();
        if (userFunction instanceof SinkFunction) {
            ((SinkFunction<?>) userFunction).finish();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        FunctionUtils.closeFunction(userFunction);
    }
}

常用的实现类基本继承自AbstractUdfStreamOperator抽象类。

单流输入,如map、fliter、source、sink等实现类

在这里插入图片描述
sink算子有两个实现类,分别是SinkOperatorStreamSink<IN>。二者的关系为SinkOperatorStreamSink<RowData>的特例。

双流输入,如concat、intervalJoin等实现类

在这里插入图片描述
本文开头提到通过SimpleOperatorFactory.of方式生成SimpleOperatorFactory实例,该方法如下

public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {
    if (operator == null) {
        return null;
    } else if (operator instanceof StreamSource
            && ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {
        // 通过addSoure方法添加的Source方式,且SourceFunction为InputFormatSourceFunction的子类
        return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator);
    } else if (operator instanceof StreamSink
            && ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {
        // 通过addSink方法添加的sink方式,且SinkFunction为OutputFormatSinkFunction的子类
        return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);
    } else if (operator instanceof AbstractUdfStreamOperator) {
        return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);
    } else {
        return new SimpleOperatorFactory<>(operator);
    }
}

得到SimpleOperatorFactory实例后,在实际执行时,通过其createStreamOperator方法得到StreamOperator实例。

1.1. 算子生成示例

上述内容偏概念更多一些,通过map为例实际观察Function->StreamOperator->StreamOperatorFactory->Transformation的过程

// 步骤1,业务代码中使用map操作
DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))

// 步骤2,将业务代码中提供的MapFunction封装成StreamMap
public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    // 将MapFunction封装成StreamMap,StreamMap为AbstractUdfStreamOperator子类
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

// 步骤3,根据StreamMap获取其对应的SimpleOperatorFactory工厂实例
public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    
    // 获取StreamMap对应的StreamOperatorFactory工厂类
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

// 步骤4,将工厂实例传入到Transformation中
protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    // 将StreamOperatorFactory工厂实例,传入到Transformation中
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

在步骤2中,将MapFunction封装成StreamMap,StreamMap是AbstractUdfStreamOperator的子类,并且同时实现了OneInputStreamOperator,进行数据处理逻辑。在处理数据时,实际上是调用MapFunction的map方法完成,即在业务代码中指定的row -> Tuple2.of(row, 1)的逻辑。

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {
    // 以下3个属性从父类继承
    // 函数实例
    protected final F userFunction;
    // 结果输出
    protected transient Output<StreamRecord<OUT>> output;
    // 默认算子链生成策略
    protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        // 实例化StreamMap时,指定ALWAYS的算子链生成策略
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // userFunction即MapFunction处理数据时,实质调用MapFunction的map方法。
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

要在Task中算子才会真正执行,这里仅仅是在逻辑上完成算子的定义。

2. 算子链

Flink中会将多个算子合并到一起,组成算子链从而提高算子的运行效率。同一个算子链意味着将在同一个线程中运行。flink中算子链使用OperatorChain抽象类表示。

算子的合并策略在ChainingStrateg枚举类中定义,详情如下

/**
 * StreamOperator 使用的默认值为 HEAD,这意味着算子不链接到其前身。大多数算子使用 ALWAYS 覆盖此操作,这意味着它们将尽可能链接到前身。 
 */
public enum ChainingStrategy {
    // 尽可能的将和上游算子链接到一起,大多数算子的默认值
    ALWAYS,
    // 当前算子不会上下游算子链接到一起
    NEVER,
    // 不会上游算子连接到一起,但是可以和下游算子链接到一起
    HEAD,
    // 此运算符将运行在链的头部(与 HEAD 类似,但它还会尝试在可能的情况下链接source。这允许将多输入运算符与多个源链接到一个任务中。
    HEAD_WITH_SOURCES;
    public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/707120.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

机器学习python实践——关于ward聚类分层算法的一些个人心得

最近在利用python跟着参考书进行机器学习相关实践&#xff0c;相关案例用到了ward算法&#xff0c;但是我理论部分用的是周志华老师的《西瓜书》&#xff0c;书上没有写关于ward的相关介绍&#xff0c;所以自己网上查了一堆资料&#xff0c;都很难说清楚ward算法&#xff0c;幸…

数据分析常用6种分析思路(下)

作为一名数据分析师&#xff0c;你又没有发现&#xff0c;自己经常碰到一些棘手的问题就没有思路&#xff0c;甚至怀疑自己究竟有没有好好学过分析&#xff1f; 在上篇文章里&#xff0c;我们讲到了数据分析中的流程、分类、对比三大块&#xff0c;今天&#xff0c;我们继续讲…

为Nanopi m1交叉编译opencv

为Nanopi m1交叉编译opencv 一、下载交叉编译器 根据之前的博客进行 二、下载opencv和必要库 sudo apt-get install cmake git libgtk2.0-dev pkg-config libavcodec-dev libavformat-dev libswscale-devgit clone https://github.com/opencv/opencv.git cd opencv三、进行编…

计算机网络实验(15):基于Socket的网络编程(附JAVA源码.txt)

一、实验名称 UDP客户服务器即时通信程序 二、实验目的&#xff1a; 掌握基于SOCKET的网络编程方法。 基于JAVA语言&#xff0c;编写一个SOCKET的即时通信小程序 三、实验内容和要求 实验内容&#xff1a; 基于JAVA语言&#xff0c;编写一个SOCKET的即时通信小程序 实…

docker一些常用命令以及镜像构建完后部署到K8s上

docker一些常用命令以及镜像构建完后部署到K8s上 1.创建文件夹2.删除文件3.复制现有文件内容到新建文件4.打开某个文件5.查看文件列表6.解压文件&#xff08;tar格式&#xff09;7.解压镜像8.查看镜像9.删除镜像10.查看容器11.删除容器12.停止运行容器13.构建镜像14.启动容器15…

Mongodb在UPDATE操作中使用$push向数组中插入数据

学习mongodb&#xff0c;体会mongodb的每一个使用细节&#xff0c;欢迎阅读威赞的文章。这是威赞发布的第69篇mongodb技术文章&#xff0c;欢迎浏览本专栏威赞发布的其他文章。如果您认为我的文章对您有帮助或者解决您的问题&#xff0c;欢迎在文章下面点个赞&#xff0c;或者关…

无需破解,基于AI翻译的Poedit翻译小助手PoeditHelper

背景&#xff1a; 应用在做国际化的时候是一件比较让人头大的事情&#xff0c;需要进行多国语言互译&#xff0c;做国际化的方式有很多&#xff0c;现阶段比较常用的方式是gettext的形式&#xff0c;并输出一个.po文件来做国际化&#xff0c;与之配套的有一款半开源软件叫Poedi…

【PB案例学习笔记】-21小大写金额转换

写在前面 这是PB案例学习笔记系列文章的第21篇&#xff0c;该系列文章适合具有一定PB基础的读者。 通过一个个由浅入深的编程实战案例学习&#xff0c;提高编程技巧&#xff0c;以保证小伙伴们能应付公司的各种开发需求。 文章中设计到的源码&#xff0c;小凡都上传到了gite…

晶振的匹配电容的计算

晶振 等效电路 C0是晶振的静态电容 L1是晶振的等效电感 C1是晶振的等效电容 R1是晶振的等效串联电阻 芯片内部已有反相器和负载电阻 计算公式 参考1 参考2

Blender骨骼创建

骨骼系统 建立 使用Shift A添加骨骼或在添加|骨架中添加一段骨骼 骨骼的三种模式 -物体模式&#xff1a;做动画&#xff0c;摆人物pose时在该模式 -编辑模式&#xff1a;进行骨骼搭建&#xff08;选择一段骨骼&#xff0c;然后按E挤出一段骨骼并进行调整&#xff09; -姿…

matlab 任意二维图像转点云

目录 一、概述二、代码实现三、结果展示本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫。 一、概述 给定任意一张图片,通过代码操作将图片转成点云。图像中包含大量可用信息,其中必不可少的信息为像素坐标和像素值,将像…

【乐吾乐2D可视化组态编辑器】导出HTML,下载离线部署包

乐吾乐2D可视化组态编辑器地址&#xff1a;https://2d.le5le.com/ 使用步骤 1. 从“文件”菜单导出HTML 导出为 HTML 需要一定的开发能力&#xff0c;后续不再维护&#xff0c;即将下线&#xff0c;推荐使用 下载离线部署包&#xff08;html&#xff09; 2. 解压 3. 下载后端…

Intellij IDEA开发Android项目打包生成APK

在 IntelliJ IDEA 左上方中选择 “Build” -> “Generate Signed Bundle / APK…”选择“APK”——“Next”——“Create New…”&#xff08;Password随便填123456即可&#xff09; “Next”——选择release&#xff08;APK生成后默认存放在本项目的release文件夹里&#x…

Leetcode 力扣119. 杨辉三角 II (抖音号:708231408)

给定一个非负索引 rowIndex&#xff0c;返回「杨辉三角」的第 rowIndex 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: rowIndex 3 输出: [1,3,3,1]示例 2: 输入: rowIndex 0 输出: [1]示例 3: 输入: rowIndex 1 输出: [1,1]提示…

Cisco Packet Tracer实验(二)

二、用交换机构建 LAN 构建物件如下&#xff1a; 四个PC 两个交换机 一个Multi Switch多功能拓展控制器 连线必须是这个直线&#xff01;&#xff01;&#xff01;不是虚线 最后实现效果如下&#xff1a; 全部的线是绿的&#xff0c;就表示是通的。 尝试一下&#xff0c;看PC…

SpringBoot系列——使用Spring Cache和Redis实现查询数据缓存

文章目录 1. 前言2. 缓存2.1 什么是缓存2.2 使用缓存的好处2.3 缓存的成本2.4 使用Spring Cache和Redis的优点 3. Spring Cache基础知识3.1 Spring Cache的核心概念3.2 Spring Cache的注解3.2.1 SpEL表达式3.2.2 Cacheable3.2.3 CachePut3.2.4 CacheEvict 4. 实现查询数据缓存4…

量化交易入门——盘口

今天接着上一期讲解开盘定势的种类&#xff0c;在讲之前&#xff0c;科普一下“盘口五档”的成交知识。 每个炒股软件上&#xff0c;都会有某只个股的成交信息&#xff0c;在其中会出现一个五档的行情列表&#xff0c;里面列出了买家和卖家各五个价格及其对应的数量。这五档价…

深入浅出 Go 语言的 GPM 模型(Go1.21)

引言 在现代软件开发中&#xff0c;有效地利用并发是提高应用性能和响应速度的关键。随着多核处理器的普及&#xff0c;编程语言和框架如何高效、简便地支持并发编程&#xff0c;成为了软件工程师们评估和选择工具时的一个重要考量。在这方面&#xff0c;Go 语言凭借其创新的并…

基于51单片机的教室智能照明控制系统

一.硬件方案 本系统以51单片机作为控制模块的核心部件&#xff0c;采用热释红外人体传感器检测人体的存在&#xff0c;采用光敏三极管构成的电路检测环境光的强度&#xff1b;根据教室合理开灯的条件&#xff0c;通过对人体存在信号和环境光信号的识别与判断&#xff0c;完成对…

MySQL的增删查改(CRUD)

目录 一.CRUD 1.什么是CRUD 2.CRUD的特点 二.新增&#xff08;Create&#xff09; 单列插入全行数据 表的复制 额外小知识 三.阅读(Read) 1.全表查询指定列查询 2.查询字段为表达式 3.别名 ​编辑 4.去重 5.排序 1.根据列名进行排序 2.使用表达式及别名进行排序…