Flink JobGraph构建过程

文章目录

  • 前言
  • JobGraph创建的过程
  • 总结


前言

StreamGraph构建过程中分析了StreamGraph的构建过程,在StreamGraph构建完毕之后会对StreamGraph进行优化构建JobGraph,然后再提交JobGraph。优化过程中,Flink会尝试将尽可能多的StreamNode聚合在一个JobGraph节点中,通过合并创建JobVertex,并生成JobEdge,以减少数据在不同节点之间流动所产生的序列化、反序列化、网络传输的开销。它包含的主要抽象概念有:

1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是IntermediateDataSet。

2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是JobVertex,consumer 是 JobEdge。

3、JobEdge:代表了job graph中的一条数据传输通道。source是IntermediateDataSet,target是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
在这里插入图片描述


JobGraph创建的过程

AbstractJobClusterExecutor.execute -> PipelineExecutorUtils.getJobGraph  -> 
PipelineTranslator.translateToJobGraph -> StreamGraphTranslator.translateToJobGraph
 -> StreamGraph.getJobGraph ->  StreamingJobGraphGenerator.createJobGraph

createJobGraph()函数

private JobGraph createJobGraph() {
        preValidate();
        jobGraph.setJobType(streamGraph.getJobType());

        jobGraph.enableApproximateLocalRecovery(
                streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

        // 为节点生成确定性哈希,以便在提交时识别它们(如果它们没有更改)。.
        Map<Integer, byte[]> hashes =
                defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        setChaining(hashes, legacyHashes);

        setPhysicalEdges();

        markContainsSourcesOrSinks();

        setSlotSharingAndCoLocation();

        setManagedMemoryFraction(
                Collections.unmodifiableMap(jobVertices),
                Collections.unmodifiableMap(vertexConfigs),
                Collections.unmodifiableMap(chainedConfigs),
                id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
                id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

        configureCheckpointing();

        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

        final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
                JobGraphUtils.prepareUserArtifactEntries(
                        streamGraph.getUserArtifacts().stream()
                                .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                        jobGraph.getJobID());

        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                distributedCacheEntries.entrySet()) {
            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
        }

        // 在最后完成ExecutionConfig时设置它
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
        }

        jobGraph.setChangelogStateBackendEnabled(streamGraph.isChangelogStateBackendEnabled());

        addVertexIndexPrefixInVertexName();

        setVertexDescription();

        // Wait for the serialization of operator coordinators and stream config.
        try {
            FutureUtils.combineAll(
                            vertexConfigs.values().stream()
                                    .map(
                                            config ->
                                                    config.triggerSerializationAndReturnFuture(
                                                            serializationExecutor))
                                    .collect(Collectors.toList()))
                    .get();

            waitForSerializationFuturesAndUpdateJobVertices();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error in serialization.", e);
        }

        if (!streamGraph.getJobStatusHooks().isEmpty()) {
            jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
        }

        return jobGraph;
    }

在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什
么样的情况的下 Operator 能chain 在一起呢?

// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认HEAD!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是
ALWAYS!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的shuffle方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

构造边

private void connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output) {

        physicalEdgesInOrder.add(edge);

        Integer downStreamVertexID = edge.getTargetId();

        JobVertex headVertex = jobVertices.get(headOfChain);
        JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);

        StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

        downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);

        StreamPartitioner<?> partitioner = output.getPartitioner();
        ResultPartitionType resultPartitionType = output.getPartitionType();

        if (resultPartitionType == ResultPartitionType.HYBRID_FULL
                || resultPartitionType == ResultPartitionType.HYBRID_SELECTIVE) {
            hasHybridResultPartition = true;
        }

        checkBufferTimeout(resultPartitionType, edge);

        JobEdge jobEdge;
        if (partitioner.isPointwise()) {
            jobEdge =
                    downStreamVertex.connectNewDataSetAsInput(
                            headVertex,
                            DistributionPattern.POINTWISE,
                            resultPartitionType,
                            opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),
                            partitioner.isBroadcast());
        } else {
            jobEdge =
                    downStreamVertex.connectNewDataSetAsInput(
                            headVertex,
                            DistributionPattern.ALL_TO_ALL,
                            resultPartitionType,
                            opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),
                            partitioner.isBroadcast());
        }

        // set strategy name so that web interface can show it.
        jobEdge.setShipStrategyName(partitioner.toString());
        jobEdge.setForward(partitioner instanceof ForwardPartitioner);
        jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
        jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "CONNECTED: {} - {} -> {}",
                    partitioner.getClass().getSimpleName(),
                    headOfChain,
                    downStreamVertexID);
        }
    }

总结

1、在StreamGraph构建完毕之后会开始构建JobGraph,然后再提交JobGraph。

2、StreamingJobGraphGenerator.createJobGraph()是构建JobGraph的核心实现,实现中首先会广度优先遍历StreamGraph,为其中的每个StreamNode生成一个Hash值,如果用户设置了operator的uid,那么就根据uid来生成Hash值,否则系统会自己为每个StreamNode生成一个Hash值。如果用户自己为operator提供了Hash值,也会拿来用。生成Hash值的作用主要应用在从checkpoint中的数据恢复

3、在生成Hash值之后,会调用setChaining()方法,创建operator chain、构建JobGraph顶点JobVertex、边JobEdge、中间结果集IntermediateDataSet的核心方法。

1)、创建StreamNode chain(operator chain)

从source开始,处理出边StreamEdge和target节点(edge的下游节点),递归的向下处理StreamEdge上和target StreamNode,直到找到那条过渡边,即不能再进行chain的那条边为止。那么这中间的StreamNode可以作为一个chain。这种递归向下的方式使得程序先chain完StreamGraph后面的节点,再处理头结点,类似于后序递归遍历。

2)、创建顶点JobVertex

顶点的创建在创建StreamNode chain的过程中,当已经完成了一个StreamNode chain的创建,在处理这个chain的头结点时会创建顶点JobVertex,顶点的JobVertexID根据头结点的Hash值而决定。同时JobVertex持有了chain上的所有operatorID。因为是后续遍历,所有JobVertex的创建过程是从后往前进行创建,即从sink端到source端

3)、创建边JobEdge和IntermediateDataSet

JobEdge的创建是在完成一个StreamNode chain,在处理头结点并创建完顶点JobVertex之后、根据头结点和过渡边进行connect操作时进行的,连接的是当前的JobVertex和下游的JobVertex,因为JobVertex的创建是由下至上的。

根据头结点和边从jobVertices中找到对应的JobGraph的上下游顶点JobVertex,获取过渡边的分区器,创建对应的中间结果集IntermediateDataSet和JobEdge。IntermediateDataSet由上游的顶点JobVertex创建,上游顶点JobVertex作为它的生产者producer,IntermediateDataSet作为上游顶点的输出。JobEdge中持有了中间结果集IntermediateDataSet和下游的顶点JobVertex的引用, JobEdge作为中间结果集IntermediateDataSet的消费者,JobEdge作为下游顶点JobVertex的input。整个过程就是
上游JobVertex——>IntermediateDataSet——>JobEdge——>下游JobVertex

4、接下来就是为顶点设置共享solt组、设置checkpoint配置等操作了,最后返回JobGraph,JobGraph的构建就完毕了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/433719.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

资深测试总结,接口自动化测试常用配置文件(超细整理)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、常用的配置文件…

经典查找算法

经典的查找算法有几种&#xff0c;它们适用于不同的场景和数据结构。以下是一些常见的经典查找算法&#xff1a; 1. **线性查找&#xff08;Linear Search&#xff09;**&#xff1a;线性查找是一种简单直观的查找算法&#xff0c;它按顺序检查数组或列表中的每个元素&#xf…

javascript基础入门

1.第一个javascript程序 javascript程序不能够独立的运行&#xff0c;必须依赖于HTML文件&#xff0c;type属性值用来说明脚本的类型&#xff0c;这里 是指使用javascript编写的文本文件&#xff1b; 2.alert警告框 alert&#xff08;&#xff09;函数显示一条指定的信息&am…

07 外键和表关联关系

文章目录 外键约束表关联关系E-R模型图表关联查询 外键约束 约束 : 约束是一种限制&#xff0c;它通过对表的行或列的数据做出限制&#xff0c;来确保表的数据的完整性、关联性foreign key 功能 : 建立表与表之间的某种约束的关系&#xff0c;由于这种关系的存在&#xff0c;能…

论文翻译:一种基于强化学习的车辆队列控制策略,用于减少交通振荡中的能量消耗

A Reinforcement Learning-Based Vehicle Platoon Control Strategy for Reducing Energy Consumption in Traffic Oscillations 一种基于强化学习的车辆队列控制策略&#xff0c;用于减少交通振荡中的能量消耗 文章目录 A Reinforcement Learning-Based Vehicle Platoon Cont…

基础50刷题之一(交替合并字符串)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、题目二、力扣官方题解&#xff08;双指针&#xff09;三、文心一言解释总结 前言 刚上研一&#xff0c;有人劝我好好学C&#xff0c;当时用的不多就没学&a…

快速上手:剧本杀dm预约平台小程序的制作流程

在当今的娱乐市场中&#xff0c;剧本杀已经成为一种备受欢迎的娱乐方式。为了给玩家提供更好的服务和体验&#xff0c;开发一个剧本杀DM预约平台小程序是至关重要的。下面&#xff0c;我们将详细介绍如何使用乔拓云第三方平台开发这样一个预约平台。 首先&#xff0c;打开乔拓云…

HarmonyOS 数据持久化之首选项 preferences

接下来 我们来说说数据持久化的问题 我们平时处理的数据都是在内存中处理的 我们应用一点重启 所有数据都消失了 肯恩是不符合一些场景的 harmonyos中 提供了比较多的解决方案 最多的两种是 用户首选项 关系型数据库 本文的话 我们就来看看 用户首选项 首先&#xff0c;什么…

Matlab在同一张图中如何加入多个图例

根据代码最终画出的图片如下&#xff1a; 其实原理很简单&#xff0c;就是在一张figure中画多个坐标轴&#xff0c;每个坐标轴都有对应的图例&#xff0c;之后再将多余坐标轴隐藏&#xff0c;只保留一个即可。 代码如下&#xff1a; clear all; close all;dd_linewidth 1;a …

VR全景数字工厂,制造业企业线上营销新助手

VR全景技术逐渐渗透到各行各业&#xff0c;其中&#xff0c;很多实体工厂的线上营销宣传也借助720云VR全景技术也迎来了新的变革。 一、VR全景技术的独特魅力 VR全景技术是一种基于虚拟现实技术的全新视觉呈现方式&#xff0c;能够为用户带来身临其境的沉浸式体验。通过VR全景…

Docker容器化解决方案(镜像,容器的操作管理)

Docker镜像管理 搜索官方仓库镜像 docker search [rootlocalhost ~]# docker search nginx NAME DESCRIPTION STARS OFFICIAL nginx Official build of Nginx. …

计算机mfc140.dll文件缺失的修复方法分析,一键修复mfc140.dll

电脑显示mfc140.dll文件缺失信息时&#xff0c;不必担心&#xff0c;这通常是个容易解决的小问题。接下来让我们详细探究并解决mfc140.dll文件缺失的状况。以下将详述相应的解决方案&#xff0c;从而帮助您轻松克服这一技术难题。通过几个简单步骤&#xff0c;即可恢复正常使用…

C语言项目实战——贪吃蛇

C语言实现贪吃蛇 前言一、 游戏背景二、游戏效果演示三、课程目标四、项目定位五、技术要点六、Win32 API介绍6.1 Win32 API6.2 控制台程序6.3 控制台屏幕上的坐标COORD6.4 GetStdHandle6.5 GetConsoleCursorInfo6.5.1 CONSOLE_CURSOR_INFO 6.6 SetConsoleCursorInfo6.7 SetCon…

LeetCode每日一题只 快乐数

目录 题目介绍&#xff1a; 算法原理&#xff1a; 鸽巢原理&#xff1a; 如何找到环里元素&#xff1a; 代码实现&#xff1a; 题目介绍&#xff1a; 题目链接&#xff1a;. - 力扣&#xff08;LeetCode&#xff09; 算法原理&#xff1a; 我先简单举两个例子&#xff…

python大数据分析游戏行业中的 Apache Kafka:用例 + 架构!

python大数据分析游戏行业中的 Apache Kafka&#xff1a;用例 架构&#xff01; 这篇博文探讨了使用 Apache Kafka 的事件流如何提供可扩展、可靠且高效的基础设施&#xff0c;让游戏玩家开心并让游戏公司取得成功。讨论了游戏行业中的各种用例和架构&#xff0c;包括在线和移…

day04-SpringBootWeb入门

一、SpringBootWeb快速入门 1 需求 需求&#xff1a;基于 SpringBoot 的方式开发一个 web 应用&#xff0c;浏览器发起请求 /hello后&#xff0c;给浏览器返回字符串“Hello World ~”。 2 开发步骤 第1步&#xff1a;创建 SpringBoot 工程项目 第2步&#xff1a;定义 HelloC…

2024年k8s最新版本安装教程

k8s安装教程 1 k8s介绍2 环境搭建2.1 主机准备2.2 主机初始化2.2.1 安装wget2.2.2 更换yum源2.2.3 常用软件安装2.2.4 关闭防火墙2.2.5 关闭selinux2.2.6 关闭 swap2.2.7 同步时间2.2.8 修改Linux内核参数2.2.9 配置ipvs功能 2.3 容器安装2.3.1 设置软件yum源2.3.2 安装docker软…

Claude3、Gemini、Sora VS GPT-4:AI技术如何助力科研与产业发展?

【最新增加Claude3、Gemini、Sora、GPTs讲解及AI领域中的集中大模型的最新技术】 2023年随着OpenAI开发者大会的召开&#xff0c;最重磅更新当属GPTs&#xff0c;多模态API&#xff0c;未来自定义专属的GPT。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义&#xff0c;不亚…

【Java EE】文件内容的读写⸺数据流

目录 &#x1f334;数据流的概念&#x1f338;数据流分类 &#x1f333;字节流的读写&#x1f338;InputStream&#xff08;从文件中读取字节内容)&#x1f33b;示例1&#x1f33b;示例2&#x1f33b;利用 Scanner 进行字符读取 &#x1f338;OutputStream(向文件中写内容&…

阿里云重新更新系统导致秘钥失效

报错解决方案&#xff1a; ssh-keygen -f "/Users/pengzhanliang/.ssh/known_hosts" -R "39.105.149.49"这个命令会从~/.ssh/known_hosts文件中移除与IP地址39.105.149.49相关的所有条目 再次尝试连接到远程服务器。这次&#xff0c;SSH将提示您接受新的主…