Flink任务如何跑起来之 1.DataStream和Transformation

Flink任务如何跑起来之 1.DataStream和Transformation

1. 滥觞

在使用Flink完成业务功能之余,有必要了解下我们的任务是如何跑起来的。知其然,知其所以然。

既然重点是学习应用程序如何跑起来,那么应用程序的内容不重要,越简单越好。
WordCount示例作为学习数据引擎时hello word程序,再合适不过。接下来便以任务执行顺序为线索开启对源码逐步学习。

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 初始化执行环境
        Configuration configuration = new Configuration();
        configuration.setString("rest.port", "9091");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);

        // 业务逻辑转换
        DataStream<String> text = env.fromCollection(Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan")).name("zl-source");
        DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .name("counter");
        counts.print().name("print-sink");

        // 执行应用程序
        env.execute("WordCount");
    }
}

为了使示例代码足够纯粹(直接复制粘贴后即可跑起来的那种),因此在示例中直接使用List数据作为Source。

最后,计划将自己学习的过程以系列文档的形式作为记录。同时作为自己学习过程的记录,可能存在错误或片面理解,欢迎一起讨论。

2. 头疼的“角色”

在学习源码或查阅资料的同时,以下单词(但不限于)一定会频繁出现,它们或者直接对应flink源码中的接口、类名,或者是一些概念名称。初次看到难免让人抓狂。现在先对这些单词混个脸熟。

Client
JobManager/JobMaster
TaskManager/TaskExecutor
Transformation
StreamOperator
StreamGraph
JobGraph
ExecutionGraph
Task
StreamTask
……

3. 宏观视角

当任务开始执行后,便可以在WebUI上查看其对应的物理执行拓扑,即Task DAG。从我们编写的应用程序代码到Task DAG势必经历了复杂的解析转换操作,这个过程大体如下所示。

在这里插入图片描述

我们编写的应用程序代码首先会转化为Transformation,该实例将作为Flink世界中的起点,开启了之后一系列“旅程”。

4. env.execute()方法做了什么?

在使用DataStream API编写应用程序时,无论业务逻辑如何如何的复杂,但整体结构大致由三部分构成,即

// 1.初始化执行环境
StreamExecutionEnvironment env = ;
// 2.业务逻辑转换,即一系列的DataStream转化
DataStream source = ;
// 3.env.execute()
env.execute();

既然最后必须执行 env.execute()方法,那么首先了解下execute都执行了那些操作。

基于1.16版本的源码,并只保留了源码中的关键逻辑。

// 方法1
public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    // 生成StreamGraph,最终调用方法4,通过StreamGraphGenerator生成StreamGraph
    StreamGraph streamGraph = getStreamGraph();
    // ...
    try {
        // 调用方法2
        return execute(streamGraph);
    } catch (Throwable t) {
        // ...
    }
}
// 方法2
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // 调用方法3,通过StreamGraph最终得到JobClient
    final JobClient jobClient = executeAsync(streamGraph);
    try {
        final JobExecutionResult jobExecutionResult;
        // ...
        jobListeners.forEach(
                jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
        return jobExecutionResult;
    } catch (Throwable t) {
        // ...
    }
}
// 方法3
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    // 根据启动环境,得到对应环境的Executor实现
    // 如miniCluster环境则对应LocalExecutor
    final PipelineExecutor executor = getPipelineExecutor();
    // 在具体的executor.execute方法中,将StreamGraph先转化成JobGraph,在将JobGraph提交到JobManager中
    CompletableFuture<JobClient> jobClientFuture =
            executor.execute(streamGraph, configuration, userClassloader);
    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
        collectIterators.clear();
        return jobClient;
    } catch (ExecutionException executionException) {
        // ...
    }
}
// 方法4
private StreamGraph getStreamGraph(List<Transformation<?>> transformations) {
    synchronizeClusterDatasetStatus();
    // 根据Transformation生成StreamGraph
    return getStreamGraphGenerator(transformations).generate();
}

通过上述源码调用链可以,完成从DataStream API->Transformation->StreamGraph->JobGraph的转化。最后将JobGraph提交到了JobManager中并,执行后续操作。

从上述方法4getStreamGraph(List<Transformation<?>> transformations)可知,StreamGraph由Transformation演变而来,此处不禁会产生一个新的疑问,Transformation又从何而来?

WordCount示例代码中并没有与Transformation直接相关的代码。通过查看getSreamGraph方法的完成调用链可知其入参直接来自是StreamExecutionEnvironment类中的transformations成员属性值。在应用程序第一步便生成了StreamExecutionEnvironment的实例,接下来通过env得到DataStream并进行了一系列的转化操作,而在最后的execute方法中便已直接使用transformations属性值了,那么该属性中一定是前面2个过程中实际赋值的。

protected final List<Transformation<?>> transformations = new ArrayList<>();

5. Transformation何时生成?

从StreamExecutionEnvironment的源码中可知,transformations属性只有addOperator方法会执行集合的add操作,其余地方均为集合的get操作。
然而addOperator方法有诸多调用方,且均为其他类中的调用,继续往上查看调用方有些困难,因此这里暂时记下addOperator方法唯一对transformations集合中执行add操作的结论。

// 该方法不适合用户使用。创建operator的api方法必须调用此方法
@Internal
public void addOperator(Transformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    this.transformations.add(transformation);
}

通过查看StreamExecutionEnvironment实例的创建过程,可以发现在创建过程中并无transformations的add操作,因此是在DataStream转换操作中对transformations执行了add操作。

5.1. DataStream

在Flink中使用DataStream表示数据流。其仅用于表达业务转化逻辑,实际上并没有真正的存储数据。

DataSteam是顶层封装类,其子类如下

在这里插入图片描述
DataStream类中只有两个成员属性,分别是StreamExecutionEnvironment和Transformation,并在构造方法中对其进行初始化。因此实例化DataStream的同时除执行环境外,还必须传入Transformation的实例。

public class DataStream<T> {
    protected final StreamExecutionEnvironment environment;
    protected final Transformation<T> transformation;

    public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {
        this.environment =
                Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
        this.transformation =
                Preconditions.checkNotNull(
                        transformation, "Stream Transformation must not be null.");
    }
    // ...
}

回到WordCount示例代码中,从集合到DataStream的过程,封装示意如下。

在这里插入图片描述

注意,Transformation中并不是直接持有了AbstractUdfStreamOperator的引用,而是对应的工厂。

源码中关键步骤如下

// 步骤1,从List到Function
public <OUT> DataStreamSource<OUT> fromCollection(
        Collection<OUT> data, TypeInformation<OUT> typeInfo) {
    // ...
    // 创建SourceFunction实例,SourceFunction是Function的实现
    SourceFunction<OUT> function = new FromElementsFunction<>(data);
    return addSource(function, "Collection Source", typeInfo, Boundedness.BOUNDED)
            .setParallelism(1);
}

// 步骤2,从Function到StreamOperator
private <OUT> DataStreamSource<OUT> addSource(
        final SourceFunction<OUT> function,
        final String sourceName,
        @Nullable final TypeInformation<OUT> typeInfo,
        final Boundedness boundedness) {
    // ...
    // 创建StreamSource实例,StreamSource是AbstractUdfStreamOperator的子类,Flink中算子的表示
    final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(
            this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}

// 步骤3,从StreamOperator到Transformation,再到DataStream
public DataStreamSource(
        StreamExecutionEnvironment environment,
        TypeInformation<T> outTypeInfo,
        StreamSource<T, ?> operator,
        boolean isParallel,
        String sourceName,
        Boundedness boundedness) {
    super(
            environment,
            // 创建Transformation实例,Transformation是PhysicalTransformation的子类
            new LegacySourceTransformation<>(
                    sourceName,
                    // 将StreamSource封装到Transformation中
                    operator,
                    outTypeInfo,
                    environment.getParallelism(),
                    boundedness));

    // ...
}

继续查看DataStream的map操作可以可以发现,核心流程和上述由集合创建DataStream的过程基本一致:

  • 首先创建Function实例
  • 其次由Function实例创建AbstractUdfStreamOperator实例
  • 然后将AbstractUdfStreamOperator实例封装到Transformation实例中
  • 最后由Transformation和StreamExecutionEnvironment实例创建DataStream实例

不同之处在于,map操作最后将得到的PhysicalTransformation实例添加到StreamExecutionEnvironment实例中的transformations集合中去了。这点差异其实和Transformation实例表示的含义有关,放在文章末尾解释。

protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {
    // ...
    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    // 区别:添加Transformation到StreamExecutionEnvironment中
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

但并不是全部的DataStream转化操作都需要经历上述将Function实例封装成AbstractUdfStreamOperator实例,然后将AbstractUdfStreamOperator实例封装到PhysicalTransformation实例的过程。如示例代码中的keyBy和sum操作。其中keyBy并未直接涉及Function,而sum操作直接将得到的SumAggregator函数实例封装到了ReduceTransformation实例中,然后由ReduceTransformation实例得到DataStream实例。

5.2. Transformation

DataStream面向开发者,而Transformation面向flink内核。
每个DataStream实例中都包含一个Transformation实例,表示当前Datastream从上游的DataStream使用该Transformation而来。而所有DataStream中Transformation又都添加到了StreamExecutionEnvironment实例中的transformations集合中去,用于接下来的StreamGraph实例的生成。
Transformation中记录了上游的数据来源,但其并关心数据的物理来源、序列化、转发等问题。

Transformatio是顶层抽象类,有众多的子类,涵盖了DataStream的所有转换,其直接子类如下,可以分为两大类

  • PhysicalTransformation,将会转换成后续graph中节点信息
  • 非PhysicalTransformation,将会转换成后续graph中的边信息

在这里插入图片描述
Transformation中属性如下所示,其中Optional<SlotSharingGroup>表示共享槽位信息,只有开启了允许共享槽位后,该属性才会被设置值。

其构造方法如下,除name外还需要输出类型和并行度两个参数。

public Transformation(String name, TypeInformation<T> outputType, int parallelism) {
    this.id = getNewNodeId();
    this.name = Preconditions.checkNotNull(name);
    this.outputType = outputType;
    this.parallelism = parallelism;
    this.slotSharingGroup = Optional.empty();
}

PhysicalTransformation仅在其父类的基础上增加了设置ChainingStrategy的方法,用于表示生成算子链的策略。

@Internal
public abstract class PhysicalTransformation<T> extends Transformation<T> {
    PhysicalTransformation(String name, TypeInformation<T> outputType, int parallelism) {
        super(name, outputType, parallelism);
    }

    /** Sets the chaining strategy of this {@code Transformation}. */
    public abstract void setChainingStrategy(ChainingStrategy strategy);
}

PhysicalTransformation中有众多的实现子类,全部子类继承关系如下。

在这里插入图片描述
其中以下几个子类出场频率相对更高一些,其他子类只有我们的业务逻辑比较复杂时才会用到。

  • LegacySourceTransformation 表示Source的Transformation
  • LegacySinkTransformation 表示Sink的Transformation
  • SourceTransformation
  • SinkTransformation
  • OneInputTransformation 表示单个输入流的Transformation,如常见的map、flatMap、fliter等
  • TwoInputTransformation 表示两个输入流的Transformation,如concat

疑问:为什么Source和Sink都各自分别有两个Transformation子类?
通过名称也可以看出一些端倪,新老两种实现。
在1.14版本之前,分别通过env.addSource(SourceFunction)DataStream.addSink(SinkFunction)方法生成source和sink
从1.14版本开始新增了env.fromSource(Source)DataStream.sinkTo(Sink)的方式生成source和sink。
新旧方法中入参类型不同,因此导致了两种不同的Transformation实现,从各自的实现类中也可以体现这一点,如下所示。

public class LegacySourceTransformation<T> extends PhysicalTransformation<T>
        implements WithBoundedness {
    // sourceFunction的引用
    private final StreamOperatorFactory<T> operatorFactory;
    // ...
}

public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
        extends PhysicalTransformation<OUT> implements WithBoundedness {
    // source的引用
    private final Source<OUT, SplitT, EnumChkT> source;
    // ...
}

public class LegacySinkTransformation<T> extends PhysicalTransformation<T> {

    private final Transformation<T> input;
    // sinkFunction的引用
    private final StreamOperatorFactory<Object> operatorFactory;
    // ...
}

public class SinkTransformation<InputT, OutputT> extends PhysicalTransformation<OutputT> {
    private final DataStream<InputT> inputStream;
    // sink的引用
    private final Sink<InputT> sink;
    private final Transformation<InputT> input;
    // ...
}

Source作为整个数据流的头部,不存在上游,因此其Transformation实现中没有上游Transformation的引用,除此之外其余的Transformation子类中,均持有一个表示上游Transformation的引用,如上述sink中的input属性。

最后解释下,前面提到的为什么没有将表示Source的DataStream中的Transformation加入到env中表示Transformation的集合中,而接下来的转化中,将对应的Transformation加入到了env中。因为Source作为数据源的头部,不会存在上游,而Source作为其他DataSteam的上游,一定会加入到其Transformation的input中,因此没必要单独将Source的transformation加入到env中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/686400.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

动手学深度学习——Kaggle小白入门

1. kaggle注册 注册网址&#xff1a;https://www.kaggle.com 注册账号不需要代理&#xff0c;但手机号验证需要代理。如果要使用GPU或TPU&#xff0c;则需要进行手机号验证。 手机号验证位置&#xff1a;右上角头像的settings界面。 手机号验证时会有几个问题&#xff1a; …

【栈】1096. 花括号展开 II

本文涉及知识点 栈 LeetCode 1096. 花括号展开 II 如果你熟悉 Shell 编程&#xff0c;那么一定了解过花括号展开&#xff0c;它可以用来生成任意字符串。 花括号展开的表达式可以看作一个由 花括号、逗号 和 小写英文字母 组成的字符串&#xff0c;定义下面几条语法规则&…

服务器数据恢复—raid5阵列磁盘坏道离线导致数据丢失的数据恢复案例

服务器数据恢复环境&#xff1a; 某品牌x3850 X5服务器&#xff0c;服务器上有一组由5块硬盘组建的raid5阵列&#xff08;包含一块热备盘&#xff09;&#xff0c;安装linux操作系统&#xff0c;运行oracle数据库。 服务器故障&#xff1a; 服务器上raid5阵列中两块硬盘由于未…

【Linux操作系统】进程状态(1)

&#x1f389;博主首页&#xff1a; 有趣的中国人 &#x1f389;专栏首页&#xff1a; Linux &#x1f389;其它专栏&#xff1a; C初阶 | C进阶 | 初阶数据结构 小伙伴们大家好&#xff0c;本片文章将会讲解 Linux操作系统 进程状态 的相关内容。 如果看到最后您觉得这篇文章…

HDFS 之 DataNode 核心知识点

优质博文&#xff1a;IT-BLOG-CN 一、DataNode工作机制 DataNode工作机制&#xff0c;如下所示&#xff1a; 【1】一个数据块在 DataNode上以文件形式存储在磁盘上&#xff0c;包括两个文件&#xff0c;一个是数据本身&#xff0c;一个是元数据包括数据块的长度&#xff0c…

Codeforces Round 951 (Div. 2)(A~C)

目录 A. Guess the Maximum B. XOR Sequences C. Earning on Bets 这次比赛也是打的稀碎了&#xff0c;第二个少个break检查了15分钟才检查出来&#xff0c;第三个符号搞错了&#xff0c;错了两次&#xff0c;道心直接破碎了 A. Guess the Maximum 题意&#xff1a;我们对…

计算机组成原理-cache详解

一、Cache的概念和原理 1、cache原理 2、cache性能分析 一道例题 3、cache和主存数据交换的单位 每次访问到的主存块会立即放入cache中 小结 二、cache和主存之间的映射关系 全相联映射 全相联访存过程 直接映射 组相联映射 小结 三、cache替换算法 在直接映射中&#xff0c…

webgl_framebuffer_texture

ThreeJS 官方案例学习&#xff08;webgl_framebuffer_texture&#xff09; 1.效果图 2.源码 <template><div><div id"container"></div><div id"selection"><div></div></div></div> </templa…

【sklearn】【逻辑回归1】

学习笔记来自&#xff1a; 所用的库和版本大家参考&#xff1a; Python 3.7.1Scikit-learn 0.20.1 Numpy 1.15.4, Pandas 0.23.4, Matplotlib 3.0.2, SciPy 1.1.0 1 概述 1.1 名为“回归”的分类器 在过去的四周中&#xff0c;我们接触了不少带“回归”二字的算法&#xf…

IDEA破解后的配置

以下所有操作都要求进入全局setting而不是某一个项目的setting 进入全局Setting File→close project 进入欢迎页面 低版本 然后点击Setting 关闭自动更新 不关闭有可能会破解失败 Appearance & Behavior->System Settings->Updates下取消Automatically chec…

k8s 对外服务之 Ingress(HTTPS/HTTP 代理访问 以及Nginx 进行 BasicAuth )

目录 一 Ingress HTTP 代理访问虚拟主机 &#xff08;一&#xff09;原理 &#xff08;二&#xff09;实验 1&#xff0c;准备 2&#xff0c;创建虚拟主机1资源 3&#xff0c;创建虚拟主机2资源 4&#xff0c;创建ingress资源 5&#xff0c;查看相关参数 6&#xff0…

ai写作工具哪些好用,分享4种ai智能写作软件

当我们踏入这个充满创意与无限可能的自媒体时代&#xff0c;ai写作工具就如同一盏盏闪耀的明灯&#xff0c;照亮我们的创作之路。那么&#xff0c;市面上的ai写作工具哪些好用呢&#xff1f;对于这个问题&#xff0c;今天&#xff0c;本文就带领大家一同揭开那神秘的面纱&#…

编译等底层知识

目录 一. GCC命令语句大全 二. GCC编译4个阶段 三. makefile的使用 四. CMake 五. GNU工具链开发流程图 六. Keil中的地址段 七. 静态库和动态库 一. GCC命令语句大全 -c只编译源文件&#xff0c;生成目标文件&#xff08;.o 文件&#xff09;&#xff0c;不进行链接。…

如何查询公网IP?

在互联网中&#xff0c;每个设备都有一个唯一的公网IP地址&#xff0c;用于标识设备在全球范围内的位置。查询公网IP是一个常见的需求&#xff0c;无论是用于远程访问、网络配置还是其他目的&#xff0c;了解自己的公网IP地址都是很有必要的。本文将介绍几种常见的方法来查询公…

Java并发编程中Future使用

Future类有什么用 Future类是异步思想的典型应用&#xff0c;主要用在一些需要执行耗时任务的场景&#xff0c;避免程序一直原地等待耗时任务完成&#xff0c;执行效率太低。具体来说&#xff1a;**当我们执行某一个耗时的任务时&#xff0c;可以将这个耗时任务交给一个子线程…

python3 -m http.server 检查打包前端的项目

python3 -m http.server这是 Python 提供的一个内置的简单 HTTP 服务器。当你在终端中运行 python3 -m http.server 命令时(在对应的打包目录比如dist目录)&#xff0c;Python 会启动一个 HTTP 服务器&#xff0c;它会将当前工作目录下的文件作为静态文件提供给浏览器。这个服务…

selenium非全新的方式同时启动多个浏览器又互不影响的一种实现方法,欢迎讨论!

最近在做模拟浏览器批量定时自动点击实现批量操作功能&#xff0c;主要使用selenium&#xff0c;但是发现selenium直接调用本地浏览器&#xff0c;启动的是一个全新的&#xff08;与手动打开的不一致&#xff09;&#xff0c;网站可以检测到&#xff0c;每次都要双重验证(密码登…

UnityXR Interaction Toolkit 如何使用XRHand手部识别

前言 Unity的XR Interaction Toolkit是一个强大的框架,允许开发者快速构建沉浸式的VR和AR体验。随着虚拟现实技术的发展,手部追踪成为了提升用户交互体验的关键技术之一。 本文将介绍如何在Unity中使用XR Interaction Toolkit实现手部识别功能。 准备工作 在开始之前,请…

论文阅读 A Distributional Framework for Data Valuation

本论文解决的问题 量化数据价值&#xff08;机器学习模型训练中各个数据点的贡献&#xff09; 避免数据价值受到其所处数据集的影响&#xff0c;使数据点的估值更加稳定、一致 变量假设 假设 D 表示一个在全集 Z 上的数据分布。对于监督学习问题&#xff0c;我们通常认为 Z…

RapidMiner数据挖掘4 —— 决策树

0. 序章 0.1 文本说明 所有应用程序操作的名称和编程说明都以黄色背景书写&#xff0c;问题以蓝色背景书写&#xff0c;以方便他们在文本中识别。 在整个课程中&#xff0c;请逐步遵循所有说明&#xff0c;并确保获得预期结果&#xff0c;然后再继续下一部分或问题。 通过在Ub…