Flink作业执行之 2.算子 StreamOperator

Flink作业执行之 2.算子 StreamOperator

前文介绍了Transformation创建过程,大多数情况下通过UDF完成DataStream转换中,生成的Transformation实例中,核心逻辑是封装了SimpleOperatorFactory实例。

UDF场景下,DataStream到Transformationg过程中,SimpleOperatorFactory实例的创建过程大致如下伪代码所示。

// 具体的函数实例
Function function = ;
// 将函数实例封装到算子实例中
AbstractUdfStreamOperator operator = new AbstractUdfStreamOperator(function);
// 通过算子实例得到其SimpleOperatorFactory实例
SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator)

这里的UDF可以简单理解为需要我们自己传入对应Function实现类的操作,如map、filter等。

问题:
StreamOperator是什么?
为什么需要将Function封装到StreamOperator中?

1. Flink算子

在应用程序中通过各种各样的Function完成DataStream转换,但是Function仅表示数据处理逻辑,并不关心数据从哪里来到哪里去。
以MapFunction为例,map方法中仅包含对每一条到来数据的具体处理逻辑,并不清楚map方法何时被调用,结果返回到哪。

一个完整的数据处理逻辑应该是获取数据->处理数据->输出数据,在Flink中这个最小的完整逻辑通过算子表示,顶层抽象接口为StreamOperator

因此Function作为算子的一部分参与后续的数据加工。

算子包含生命周期、状态和容错管理、数据处理3个方面。设计时分为两条线:

  • 生命周期、状态和容错管理,主要是AbstractStreamOperator抽象类及其子类实现,以及未来的AbstractStreamOperatorV2抽象类。
  • 数据处理,主要是OneInputStreamOperatorTwoInputStreamOperatorMultipleInputStreamOperator接口,分别表示单流、双流和多流的数据处理。在接口中定义了数据的处理方法。

StreamOperator完整的顶层抽象如下。

在这里插入图片描述

  • AbstractStreamOperator,所有流运算的基类。提供了生命周期和属性方法的默认实现。
    包含UDF的算子需继承其AbstractUdfStreamOperator子类
    对于其具体实现,还必须实现OneInputStreamOperator或TwoInputStreamOperator其中一个。
    将来将会使用AbstractStreamOperatorV2替换该基类
  • OneInputStreamOperator,支持单流输入的运算符接口,如果要实现自定义运算符,需要使用AbatractUdfStreamOperator作为基类
  • TwoInputStreamOperator,支持双流输入的运算符基类。同样需要和AbstractStreamOperator一起使用。
  • AbstractStreamOperatorV2,所有流运算符的新基类,旨在取代AbatractUdfStreamOperator。
    当前仅仅用于和MultipleInputStreamOperator一起配合使用。

OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator分别对应了Tranformation实现类的OneInputTransformation、TwoInputTransformation和AbstractMultipleInputTransformation。

MultipleInputStreamOperator和AbstractStreamOperatorV2是高版本中才加入的。因此,flink中最初仅支持单流或双流的输入,多流场景下需要拆分成单流或双流进行处理。在支持不同输入的流的实现中,梳理数据的方法分别如下

// 单流输入
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> {
    // 处理数据
    void processElement(StreamRecord<IN> element) throws Exception;
}

// 双流输入
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {

    // 处理双流输入中第一个流上的元素
    void processElement1(StreamRecord<IN1> element) throws Exception;

    // 处理双流输入中第二个流上的元素
    void processElement2(StreamRecord<IN2> element) throws Exception;
}

// 多流输入,这里的Input和单流输入继承的Input父类为同一个
public interface MultipleInputStreamOperator<OUT> extends StreamOperator<OUT> {
    List<Input> getInputs();
}

在AbstractStreamOperator众多子类中,AbstractUdfStreamOperator抽象类中封装了Function接口,并且其中open、close等算子生命周期等方法,实际上就是调用Function实例的对应方法。

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
    // 封装Function
    protected final F userFunction;
    // 通过Function实现进行算子的实例化
    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = requireNonNull(userFunction);
        checkUdfCheckpointingPreconditions();
    }

    // 算子生命周期的相关方法,实际上调用Function的方法
    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, new Configuration());
    }

    @Override
    public void finish() throws Exception {
        super.finish();
        if (userFunction instanceof SinkFunction) {
            ((SinkFunction<?>) userFunction).finish();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        FunctionUtils.closeFunction(userFunction);
    }
}

常用的实现类基本继承自AbstractUdfStreamOperator抽象类。

单流输入,如map、fliter、source、sink等实现类

在这里插入图片描述
sink算子有两个实现类,分别是SinkOperatorStreamSink<IN>。二者的关系为SinkOperatorStreamSink<RowData>的特例。

双流输入,如concat、intervalJoin等实现类

在这里插入图片描述
本文开头提到通过SimpleOperatorFactory.of方式生成SimpleOperatorFactory实例,该方法如下

public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {
    if (operator == null) {
        return null;
    } else if (operator instanceof StreamSource
            && ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {
        // 通过addSoure方法添加的Source方式,且SourceFunction为InputFormatSourceFunction的子类
        return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator);
    } else if (operator instanceof StreamSink
            && ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {
        // 通过addSink方法添加的sink方式,且SinkFunction为OutputFormatSinkFunction的子类
        return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);
    } else if (operator instanceof AbstractUdfStreamOperator) {
        return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);
    } else {
        return new SimpleOperatorFactory<>(operator);
    }
}

得到SimpleOperatorFactory实例后,在实际执行时,通过其createStreamOperator方法得到StreamOperator实例。

1.1. 算子生成示例

上述内容偏概念更多一些,通过map为例实际观察Function->StreamOperator->StreamOperatorFactory->Transformation的过程

// 步骤1,业务代码中使用map操作
DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))

// 步骤2,将业务代码中提供的MapFunction封装成StreamMap
public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    // 将MapFunction封装成StreamMap,StreamMap为AbstractUdfStreamOperator子类
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

// 步骤3,根据StreamMap获取其对应的SimpleOperatorFactory工厂实例
public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    
    // 获取StreamMap对应的StreamOperatorFactory工厂类
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

// 步骤4,将工厂实例传入到Transformation中
protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    // 将StreamOperatorFactory工厂实例,传入到Transformation中
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

在步骤2中,将MapFunction封装成StreamMap,StreamMap是AbstractUdfStreamOperator的子类,并且同时实现了OneInputStreamOperator,进行数据处理逻辑。在处理数据时,实际上是调用MapFunction的map方法完成,即在业务代码中指定的row -> Tuple2.of(row, 1)的逻辑。

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {
    // 以下3个属性从父类继承
    // 函数实例
    protected final F userFunction;
    // 结果输出
    protected transient Output<StreamRecord<OUT>> output;
    // 默认算子链生成策略
    protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        // 实例化StreamMap时,指定ALWAYS的算子链生成策略
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // userFunction即MapFunction处理数据时,实质调用MapFunction的map方法。
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

要在Task中算子才会真正执行,这里仅仅是在逻辑上完成算子的定义。

2. 算子链

Flink中会将多个算子合并到一起,组成算子链从而提高算子的运行效率。同一个算子链意味着将在同一个线程中运行。flink中算子链使用OperatorChain抽象类表示。

算子的合并策略在ChainingStrateg枚举类中定义,详情如下

/**
 * StreamOperator 使用的默认值为 HEAD,这意味着算子不链接到其前身。大多数算子使用 ALWAYS 覆盖此操作,这意味着它们将尽可能链接到前身。 
 */
public enum ChainingStrategy {
    // 尽可能的将和上游算子链接到一起,大多数算子的默认值
    ALWAYS,
    // 当前算子不会上下游算子链接到一起
    NEVER,
    // 不会上游算子连接到一起,但是可以和下游算子链接到一起
    HEAD,
    // 此运算符将运行在链的头部(与 HEAD 类似,但它还会尝试在可能的情况下链接source。这允许将多输入运算符与多个源链接到一个任务中。
    HEAD_WITH_SOURCES;
    public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/707998.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分布式高性能计算 (HPC)的工作负载管理平台和作业调度程序—— IBM Spectrum® LSF® Suites

IBM Spectrum LSF Suites 是面向分布式高性能计算 (HPC) 的工作负载管理平台和作业调度程序。基于 Terraform 的自动化现已可用&#xff0c;该功能可在 IBM Cloud 上为基于 IBM Spectrum LSF 的集群供应和配置资源。 借助我们针对任务关键型 HPC 环境的集成解决方案&#xff0…

uni-app利用renderjs实现安卓App上jssip+freeswitch+webrtc音视频通话功能

效果图 前置知识 利用renderjs在app端加载for web库 JsSIPFreeSwitchVue实现WebRtc音视频通话 原始模块 <template><viewclass"test-sip":userExtension"userExtension":change:userExtension"JsSIP.handleUserExtenSionChange":tar…

1+x(Java)中级题库易混淆理论题(五)

Java 语言具有许多优点和特点&#xff0c;多线性反映了 Java 程序并行机制的特点 字符流与字节流的区别在于每次读写的字节数不同 如果需要从文件中读取数据&#xff0c;则可以在程序中创建FileInputStream的对象 void 的含义是方法没有返回值 设 x1&#xff0c;y2&#xf…

SpringBoot集成slf4j日志配置

目录 前言 1、slf4j概述 2、pom.xml的日志依赖 3、application.yml的日志配置 4、logback.xml配置文件定义 5、logback.xml配置文件解析 5.1 定义日志的存储路径 5.2 定义日志的输出格式 5.3 定义控制台输出 5.4 定义日志相关参数 5.5 定义日志的输出级别 6、测试日…

AI大模型时代:一线大厂为何竞相高薪招揽AI产品经理?

前言 在当今日新月异的科技浪潮中&#xff0c;人工智能&#xff08;AI&#xff09;技术已经渗透至各行各业&#xff0c;成为推动社会进步的重要力量。在这样的背景下&#xff0c;AI产品经理这一新兴职位逐渐崭露头角&#xff0c;成为各大企业竞相争夺的稀缺人才。那么&#xf…

​带三维重建和还原的PACS源码 医院PACS系统源码 PACS系统源码医院PACS系统源码 C/S架构 带三维重建和还原​

带三维重建和还原的PACS源码 医院PACS系统源码 PACS系统源码医院PACS系统源码 C/S架构 带三维重建和还原 ​ 主要的任务就是把日常产生的各种医学影像&#xff08;包括核磁&#xff0c;CT&#xff0c;超声&#xff0c;各种X光机&#xff0c;各种红外仪、显微仪等设备产生的图…

cleanmymacX和腾讯柠檬到底哪个好用 2024最新使用测评

CleanMyMac X和腾讯柠檬都是Mac系统清理软件&#xff0c;各有其特点和优势&#xff0c;选择哪个更好用取决于用户的具体需求和使用习惯。 经常有新关注的粉丝问&#xff0c;同样做为垃圾清理软件&#xff0c;付费CleanMyMac和免费的柠檬清理哪个更好用&#xff1f;其实&#xf…

【AI绘画】Stable Diffusion 3开源

Open Release of Stable Diffusion 3 Medium 主要内容 Stable Diffusion 3是Stability AI目前为止最先进的文本转图像开放源代码算法。 这款模型的小巧设计使其完美适合用于消费级PC和笔记本电脑&#xff0c;以及企业级图形处理单元上运行。它已经满足了标准化的文字转图像模…

HTML静态网页成品作业(HTML+CSS)—— 家乡山西介绍网页(3个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有6个页面。 二、作品演示 三、代…

使用Python保护或加密Excel文件的7种方法

目录 安装Python Excel库 Python 使用文档打开密码保护 Excel 文件 Python 使用文档修改密码保护 Excel 文件 Python 将 Excel 文件标记为最终版本 Python 保护 Excel 工作表 Python 在保护 Excel 工作表的同时允许编辑某些单元格 Python 锁定 Excel 工作表中的特定单元…

移植fatfs制作内存文件系统

本文目录 1、引言2、环境准备2.1 下载源码2.2 创建一个工程 3、移植3.1 修改配置3.2 修改diskio.c3.3 编写RAM驱动3.4 编写验证代码 4、验证 文章对应视频教程&#xff1a; 暂无&#xff0c;可以关注我的B站账号等待更新。 点击图片或链接访问我的B站主页~~~ 1、引言 在嵌入式…

GaN VCSEL:工艺革新引领精准波长控制新纪元

日本工程师们凭借精湛的技艺&#xff0c;开创了一种革命性的生产工艺&#xff0c;让VCSEL的制造达到了前所未有的高效与精准。这一成果由名城大学与国家先进工业科学技术研究所的精英们联手铸就&#xff0c;将氮化镓基VCSELs的商业化进程推向了新的高峰。它们将有望成为自适应前…

【Effective Web】常见的css居中方式

CSS居中方式 水平居中 text-align:center 适用范围&#xff1a;容器中都是行内元素 缺点&#xff1a;容器内所有元素都会居中&#xff0c;如果是文本描述需要左对齐&#xff0c;需要增加text-align:left覆盖 margin: 0 auto 适用范围&#xff1a;容器宽度固定。子元素宽度…

Linux-黑马程序员

目录 一、前言二、初识Linux1、操作系统&#xff08;1&#xff09;硬件和软件&#xff08;2&#xff09;操作系统 2、Linux3、虚拟机4、FinalShell5、WSL6、虚拟机快照 三、Linux基础命令1、Linux的目录结构2、Linux命令入门&#xff08;1&#xff09;Linux命令基础格式&#x…

优雅谈大模型11:Mistral

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调或者LLM背后的基础模型新阅读。而最新科技&#xff08;Mamba,xLSTM,KAN&#xff09;则提…

tcp协议机制的总结(可靠性,提高性能),基于tcp的应用层协议,用udp如何实现可靠传输

目录 总结 引入 可靠性 ​编辑 分析 三次握手 提高性能 其他 常见的基于tcp应用层协议 用udp实现可靠传输 总结 引入 为什么tcp要比udp复杂的多? 因为它既要保证可靠性,又要兼顾性能 可靠性 分析 其中,序列号不止用来排序,还可以用在重传时去重 确认应答是机制中的…

618有什么值得推荐?2024数码产品推荐,轻松拿捏选购!

随着618购物节即将来临&#xff0c;你是否已被琳琅满目的商品所吸引&#xff0c;难以抉择&#xff1f;团团特意为你筛选出一系列经过亲身试验的优质好物&#xff0c;旨在帮助你在这场购物盛宴中迅速锁定心仪之选。这些推荐不仅走在时尚的前沿&#xff0c;更能满足你日常生活的各…

AUTOSAR学习

文章目录 前言1. 什么是autosar&#xff1f;1.1 AP&#xff08;自适应平台autosar&#xff09;1.2 CP&#xff08;经典平台autosar)1.3 我的疑问 2. 为什么会有autosar3.autosar的架构3.1 CP的架构3.1.1 应用软件层3.1.2 运行时环境3.1.3 基础软件层 3.2 AP的架构 4. 参考资料 …

软件测试分类介绍

大家好&#xff0c;软件测试是确保软件质量的关键环节之一&#xff0c;通过对软件系统的各个方面进行测试&#xff0c;可以发现和解决潜在的问题&#xff0c;提高软件的稳定性、可靠性和用户满意度。在软件测试领域&#xff0c;根据测试的目的、方法和对象的不同&#xff0c;可…

LLM大模型的挑战与未来,挑战大但是机遇更大!

大模型必然是未来很长一段时间我们工作生活的一部分&#xff0c;而对于这样一个与我们生活高度同频互动的“大家伙”&#xff0c;除了性能、效率、成本等问题外&#xff0c;大规模语言模型的安全问题几乎是大模型所面对的所有挑战之中的重中之重&#xff0c;机器幻觉是大模型目…