深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解

Flink Program 编程套路回顾

1、获取执行环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2、通过执行环境对象,注册数据源 Source,得到数据抽象
DataStream ds = env.socketTextStream(...)
3、调用数据抽象的各种Transformation执行逻辑计算
DataStream resultDS = ds.flatMap(...).keyBy(...).sum(...);
4、将各种Transformation执行完毕之后得到的计算结果数据抽象注册 Sink
resultDS.addSink(...)
5、提交Job执行
env.execute(...)

Flink Job 提交脚本解析

# Submission to an already running Flink on YARN cluster
./bin/flink run --target yarn-session
# Submission spinning up a Flink on YARN cluster in Per-Job Mode
./bin/flink run --target yarn-per-job
# Submission spinning up Flink on YARN cluster in Application Mode
./bin/flink run-application --target yarn-application

具体可以参考官网:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#advanced-cli

CliFrontend 提交分析

当用户把 Flink 应用程序打成 jar 使用 flink run … 的 shell 命令提交的时候,底层是通过 CliFrontend 来处理。底层的逻辑,就是通过反射来调用用户程序的 main() 方法执行。
需要注意的是,Application 模式下,会通过 YarnClusterDescriptor.deployInternal 方法在 yarn 中部署一个 application 集群,返回 YarnRestClusterClient 对象。yarn 中会启动一个 EmbeddedJobClient,执行 submitJob 方法提交 jobGraph。

ExecutionEnvironment 源码解析

StreamExecutionEnvironment 是 Flink 应用程序的执行入口,提供了一些重要的操作机制:

1、提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对接数据源。
2、提供了 setParallelism() 设置应用程序的并行度。
3、StreamExecutionEnvironment 管理了 ExecutionConfig 对象,该对象负责 Job 执行的一些行为配置管理。还管理了 Configuration 管理一些其他的配置。这个所谓的其他配置,还包含了 Checkpoint 的配置,这个 chekcpoint 的配置参数,会单独解析出来,存储在 CheckpontConfig 中
4、StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations 成员变量,该成员变量,主要用于保存 Job 的各种算子转化得到的 Transformation,把这些 Transformation 按照逻辑拼接起来,就能得到 StreamGragh, 注意转换顺序:
UserFunction ==> StreamOperator ==> Transformation ==> StreamNode
5、StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的参数就是:StreamGraph

Flink on YARN Per-job 模式提交流程分析

入口类:ApplicatoinMaster: YarnJobClusterEntryPoint
在这里插入图片描述
在这里插入图片描述

Job提交流程源码分析

getStreamGraph(jobName) 生成 StreamGraph 解析

// 入口
StreamGraph streamGraph = getStreamGraph(jobName, true){
    // 通过 StreamGraphGenerator 来生成 StreamGraph
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate(){
        streamGraph = new StreamGraph(....)
        for(Transformation<?> transformation : transformations) {
            transform(transformation);
        }
    }
}

transform(transformation){
    // 先递归处理该 Transformation 的输入
    Collection<Integer> inputIds = transform(transform.getInput());
    // 将 Transformation 变成 Operator 设置到 StreamGraph 中,其实就是添加 StreamNode
    streamGraph.addOperator(....);
    // 设置该 StreamNode 的并行度
    streamGraph.setParallelism(transform.getId(), parallelism);
    // 设置该 StreamNode 的入边 SreamEdge
    for(Integer inputId : inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
        // 内部实现
        // 构建 StreamNode 之间的 边(StreamEdge) 对象
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, ...){
            // TODO_MA 注释: 给 上游 StreamNode 设置 出边
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
        }
        // TODO_MA 注释: 给 下游 StreamNode 设置 入边
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}

execute(StreamGraph) 解析

// 入口
JobClient jobClient = executeAsync(streamGraph){
    // 执行一个 SreamGraph
    executorFactory.getExecutor(configuration).execute(streamGraph, configuration){
        // 第一件事:由 StreamGraph 生成 JobGragh
        JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
        // 第二件事:通过 RestClusterClient 提交 JobGraph 到Flink集群
        clusterClient.submitJob(jobGraph)
    }
}

// 通过 RestClusterClient 来提交 JobGraph
RestClusterClient.submitJob(JobGraph jobGraph){
    // 继续提交
    RestClusterClient.sendRetriableRequest(){
        // 通过 RestClient 提交
        RestClient.sendRequest(webMonitorHost, webMonitorPort, ...){
            // 继续提交
            RestClient.submitRequest(targetAddress,targetPort,httpRequest,responseType)
        }
    }
}

最终通过 channel 把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理。

小结

01、用户根据 Flink 应用程序的编写套路,写好应用程序,打成 jar 包,通过 flink run 的命令来执行提交
02、这个命令的底层,其实是执行: CliFrontend 组件来执行提交
03、这个 CliFrontend 的内部,会通过反射的技术,来转交执行到用户自定义应用程序的 main()
04、先获取 StreamExecutionEnvironment 执行环境对象实例
05、执行算子:其实就是从 算子 ---> function ---> StreamOperator ---> Transformation
06、执行 StreamExecutionEnvironment 的 executor 方法来执行提交
07、首先遍历 StreamExecutionEnvironment 的 transformations 这个 list 来生成 StreamGraph,之后会继续被构建成 JobGraph
08、具体的内部的提交是通过 RestClusterClient 来执行提交
09、在通过 RestClusterClient 提交之前,其实还会做一件事:把 SreamGraph 变成 JobGraph,也还会先把 JobGraph 持久化成为一个磁盘文件
10、在这个 RestClusterClient 的内部,其实是通过 RestClient 来提交
11、RestClient 其实在初始化的时候,就初始化了一个 Netty 客户端
12、通过封装一个 HttpRequest 对象,包含了需要提交的 JobGraph 文件和 Jar 包等,通过 Netty 客户端链接服务端,发送请求对象到服务端
13、Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler

在这里插入图片描述

WebMonitorEndpoint 处理 RestClient 的 JobSubmit 请求

最终处理这个请求: Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler。

// JobManager 服务端处理入口
JobSubmitHandler.handleRequest(){
    // 恢复得到 JobGraph
    JobGraph jobGraph = loadJobGraph(requestBody, nameToFile);
    // 通过 Dispatcher 提交 JobGraph
    Dispatcher.submitJob(jobGraph, timeout);
}

JobMaster 启动源码剖析

关键方法: jobMasterServiceFactory.createJobMasterService
核心的工作是:

  • 创建 JobMaster 这个 RpcEndpoint 组件,负责通信。内部会创建一个 DefaultScheduler 调度组件,在初始化该调度组件的时候,会调用 ExecutionGraphFactory 的相关方法,来把 JobGraph 转换成 ExectionGraph
  • JobMaster 启动,跳转到 onStart() 方法。内部的主要工作,就是以下这三:
    • 启动心跳机制,维持和 ResourceManager,和 TaskExecutor 之间的心跳
    • 启动 SlotPoolImpl 这个 slot 管理组件。
    • 从 ZK 获取 ResourceManager 的地址,从而进行 JobMaster 向 ResourceManager 的注册
  • 启动的这个 JobMaster 负责这个 Job 中的所有的 Task 的 slot 的申请和 任务的派发,状态的跟踪,容错,还有 checkpoint等各种操作

JobMaster 和 ResourceManager/TaskExecutor 的心跳

在这里插入图片描述

JobMaster 向 ResourceManager 注册

// 启动 JobMaster
jobMaster.start(){
    JobMaster.onStart(){
        startJobExecution(){
            // 第一件大事:启动 JobMaster 必要的一些工作
            startJobMasterServices(){
                // 第一件事: 启动心跳机制
                this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
                this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices);
                // 第二件事: 启动 SlotPoolImpl
                slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
                // 第三件事: 从 ZK 获取 ResourceManager 的地址
                // 这儿就是 JobMaster 向 ResourceManager 执行注册的入口
                resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
            }
            // 第二件大事:开始调度执行
            startScheduling();
        }
    }
}
ResourceManager.registerJobManager(){
    // ResourceManager 关于 JobMaster 的注册内部实现,重要的事情做了四件
    registerJobMasterInternal(jobMasterGateway, jobId, ....){
        // TODO_MA 马中华 注释: 生成 JobMaster 注册对象
        JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobManagerResourceId, ....);
        // TODO_MA 马中华 注释: 完成注册
        jobManagerRegistrations.put(jobId, jobManagerRegistration);
        jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
        // TODO_MA 马中华 注释: 加入心跳管理
        jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() {});
        // TODO_MA 马中华 注释: 返回 JobMaster 注册成功
        return new JobMasterRegistrationSuccess(getFencingToken(), resourceId);
    }
}

Flink Graph 演变

在这里插入图片描述

StreamGraph 构建和提交源码解析

在这里插入图片描述
关于 StreamNode 的定义:

public class StreamNode {
    private final int id;
    private int parallelism;
    private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
    private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
    private final Class<? extends AbstractInvokable> jobVertexClass;
}

关于 StreamEdge 的定义:

public class StreamEdge implements Serializable {
    private final String edgeId;
    private final int sourceId;
    private final int targetId;
}

JobGraph 构建和提交源码解析

JobGraph: StreamGraph 经过优化后生成了 JobGraph,提交给 Flink 集群的数据结构。它包含的主要抽象概念有:

1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个 JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。
2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。
3、JobEdge:代表了 job graph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。

在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什么样的情况的下 Operator 能chain 在一起呢 ?答案是要满足以下 9 个条件:

// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认是 HEAD)
!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是 ALWAYS)
!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的 shuffle 方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

在这里插入图片描述
构建逻辑的重点代码:

1、在 connect 之间,调用的 createChain() 就是先执行优化,然后再生成 JobVertex
2、然后 调用 connect 之后,是为了组织关系
    1、先生成 IntermediateDataSet 和 JobEdge
    2、把 IntermediateDataSet 和 当前 JobVertex 设置为 JobEdge 的 source 和 target
    3、把 JobEdge 设置为这个 IntermediateDataSet 的消费者

关于 JobVertex 的定义:

public class JobVertex implements java.io.Serializable {
    private final JobVertexID id;
    private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
    private final ArrayList<JobEdge> inputs = new ArrayList<>();
    private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
    private String invokableClassName;
}

关于 IntermediateDataSet 的定义:

public class IntermediateDataSet implements java.io.Serializable {
    private final IntermediateDataSetID id;
    private final JobVertex producer;
    private final List<JobEdge> consumers = new ArrayList<JobEdge>();
}

关于 JobEdge 的定义:

public class JobEdge implements java.io.Serializable {
    private final JobVertex target;
    private IntermediateDataSet source;
    private IntermediateDataSetID sourceId;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/312643.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【RabbitMQ】2 RabbitMQ介绍与架构

目录 简介架构Connection 和Channel关系工作流程生产者发送消息的流程消费者接收消息的过程 RabbitMQ数据存储存储机制 安装和配置RabbitMQRabbitMQ常用操作命令 简介 RabbitMQ&#xff0c;俗称“兔子MQ”&#xff08;可见其轻巧&#xff0c;敏捷&#xff09;&#xff0c;是目…

AI系统ChatGPT网站系统源码AI绘画详细搭建部署教程,支持GPT语音对话+DALL-E3文生图+GPT-4多模态模型识图理解

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…

2019年认证杯SPSSPRO杯数学建模C题(第一阶段)保险业的数字化变革全过程文档及程序

2019年认证杯SPSSPRO杯数学建模 基于 CART 决策树和 SVR 的客户续保概率预测 C题 保险业的数字化变革 原题再现&#xff1a; 车险&#xff0c;即机动车辆保险。保险自身是一种分散风险、消化损失的经济补偿制度&#xff0c;车险即为分散机动车辆在行驶过程中可能发作的未知风…

PPT插件-大珩助手-修改素材名称及搜索功能演示

修改素材名称及搜索功能演示 大珩助手的素材库&#xff0c;支持修改素材的名称&#xff0c;支持动态查找素材&#xff0c;删除素材 移动素材到其他分类 软件介绍 PPT大珩助手是一款全新设计的Office PPT插件&#xff0c;它是一款功能强大且实用的PPT辅助工具&#xff0c;支持…

服务网格 Service Mesh

什么是服务网格&#xff1f; 服务网格是一个软件层&#xff0c;用于处理应用程序中服务之间的所有通信。该层由容器化微服务组成。随着应用程序的扩展和微服务数量的增加&#xff0c;监控服务的性能变得越来越困难。为了管理服务之间的连接&#xff0c;服务网格提供了监控、记…

使用阿里云镜像创建一个Spring Boot项目

由于现在的idea在创建项目时已经不支持Java8版本了&#xff0c;如果我们还想用8版本&#xff0c;可以使用阿里云镜像创建。所以得改变原有的地址为&#xff1a;https://start.aliyun.com springboot版本选择2开头的任意版本的。 1.配置6个依赖 2.改变下载依赖地址 下载依赖默认…

语境化语言表示模型-ELMO、BERT、GPT、XLnet

一.语境化语言表示模型介绍 语境化语言表示模型&#xff08;Contextualized Language Representation Models&#xff09;是一类在自然语言处理领域中取得显著成功的模型&#xff0c;其主要特点是能够根据上下文动态地学习词汇和短语的表示。这些模型利用了上下文信息&#xf…

Linux ----冯诺依曼体系结构与操作系统

目录 前言 一、冯诺依曼体系结构 二、为什么选择冯诺依曼体系结构&#xff1f; 三、使用冯诺依曼结构解释问题 问题1&#xff1a; 问题2: 四、操作系统 1.操作系统是什么 2.为什么需要操作系统 3.操作系统怎样管理的 4.如何给用户提供良好环境 五、我们是怎样调用系…

【冥想X理工科思维】场景6:我被调岗了…

冥想音频合集&#xff1a;职场解压冥想音频 压力场景&#xff1a; 领导把我调换到并不喜欢也不擅长的岗位&#xff0c;如何借助冥想面对职业发展或公司变动时的不确定性和焦虑&#xff1f; 点击看大图&#xff1a; 详细说明&#xff1a; 在面对工作中的挑战时&#xff0c;制定一…

shp文件与数据库(创建shp文件)

前言 前面把shp文件中的内容读取到数据库&#xff0c;接下来就把数据库中的表变成shp文件。 正文 简单的创建一个shp文件 暂时不读取数据库的表&#xff0c;先随机创建一个shp文件。既然是随机的&#xff0c;这就需要使用到faker这个第三方库&#xff0c;代码如下。 impor…

mysql的gtid主从复制,从库误操作更新操作,

一&#xff1a;查看mysql的从库&#xff0c;发现sql进程状态 “no”.提示执行传输过来的binlog日志&#xff0c;执行失败&#xff0c; 二&#xff1a;查看主库对应的二进制日志的gtid地方。插入一些数据。 # mysqlbinlog --base64-outputdecode-rows -v mysql-bin.000001 |gre…

CSS 选择器全攻略:从入门到精通(上)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

HackTheBox-Keeper

OpenVPN连接 连接上HackTheBox&#xff01; 同时找到这个靶机&#xff0c;进行join&#xff01;分配的靶机的地址位10.10.11.227&#xff01; 信息收集 nmap -sT --min-rate 10000 -p- 10.10.11.227 开放端口为22和80端口 服务版本和操作系统信息探测&#xff1a; nmap -s…

6.3、SDN在云计算中的应用

目录 一、SDN概念 1.1、传统网络机制 1.2、SDN网络机制 1.3、二者区别 1.4、SDN架构 二、云数据中心 2.1、公有云环境特点 2.2、两大挑战 2.3、云数据中心引入SDN技术解决两大挑战 三、SDN云计算解决方案 3.1、SDN云计算解决方案之控制平面openflow协议 3.1.…

记录一下Canal的错误,主要是top.javatool.canal.client.util下的StringConvertUtil引起的

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; 由于数据库的一个localdatetime字段是空的&#xff0c; 然后修改数据库数据同步canal的时候报了这个错误&#xff1a; Caused by: java.lang.IllegalArgumentException: Can not set java.time.LocalD…

【PHP】PHP实现与硬件串口交互,接收硬件发送的实时数据

一、前言 目的&#xff1a;借助虚拟串口软件&#xff08;VSPD&#xff09;模拟硬件串口发送数据&#xff0c;使用PHP语言实现接收硬件发送的数据。 我这里的需求是连接天平&#xff0c;把天平的称量数据实时的传送到PHP使用。 使用工具&#xff1a;vspd串口调试工具 使用语…

华为网络设备 通过路由器子接口 Dot1q终结子接口实现跨VLAN通信

(二层交换机直接跳过三层交换价接入路由器时才使用该配置。推荐使用三层交换机建立VLANIF配置更简洁明了。如果VLAN较少可直接配置&#xff1b;路由器接口&#xff0c;一个物理接口一个VLAN) S1配置 vlan batch 2 to 3interface GigabitEthernet0/0/1port link-type trunkpor…

ChatGPT为教育发展带来便利与机遇,但也有伦理风险

2022年11月&#xff0c;美国人工智能研究实验室Open AI推出全新聊天机器人模型ChatGPT。凭借出色的生成语言文本能力&#xff0c;ChatGPT在上线后短短5天内便获得100百万用户&#xff0c;2个月的时间&#xff0c;月用户突破1亿&#xff0c;成为史上增长最快的“现象级”应用。作…

2022-ECCV-Explaining Deepfake Detection by Analysing Image Matching

一、研究背景 1.大量工作将深度伪造检测作为一个二分类任务并取得了良好的性能。 2.理解模型如何在二分类标签的监督下学习伪造相关特征仍难是个艰巨的任务。 3.视觉概念&#xff1a;具有语义的人脸区域&#xff0c;如嘴、鼻子、眼睛。 二、研究目标 1.验证假设&#xff0c;并…

MySQL 按日期流水号 条码 分布式流水号

有这样一个场景&#xff0c;有多台终端&#xff0c;要获取唯一的流水号&#xff0c;流水号格式是 日期0001形式&#xff0c;使用MySQL的存储过程全局锁实现这个需求。 以下是代码示例。 注&#xff1a;所有的终端连接到MySQL服务器获取流水号&#xff0c;如果获取到的是 “-1”…