Flink Job 执行流程

Flink On Yarn 模式

在这里插入图片描述

基于Yarn层面的架构类似 Spark on Yarn模式,都是由Client提交AppRM上面去运行,然后 RM分配第一个container去运行AM,然后由AM去负责资源的监督和管理。需要说明的是,FlinkYarn模式更加类似Spark on Yarncluster模式,在cluster模式中,dirver将作为AM中的一个线程去运行。Flink on Yarn模式也是会将JobManager启动在container里面,去做个driver类似的任务调度和分配,Yarn AMFlink JobManager在同一个Container,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在Yarn集群上,Flink Yarn Client就可以提交Flink JobFlink JobManager,并进行后续的映射、调度和计算处理。

Fink on Yarn 的缺陷

【1】资源分配是静态的,一个作业需要在启动时获取所需的资源并且在它的生命周期里一直持有这些资源。这导致了作业不能随负载变化而动态调整,在负载下降时无法归还空闲的资源,在负载上升时也无法动态扩展。
【2】On-Yarn模式下,所有的container都是固定大小的,导致无法根据作业需求来调整container的结构。譬如CPU密集的作业或需要更多的核,但不需要太多内存,固定结构的container会导致内存被浪费。
【3】与容器管理基础设施的交互比较笨拙,需要两个步骤来启动Flink作业:1.启动Flink守护进程;2.提交作业。如果作业被容器化并且将作业部署作为容器部署的一部分,那么将不再需要步骤2。
【4】On-Yarn模式下,作业管理页面会在作业完成后消失不可访问。
【5】Flink推荐 per job clusters 的部署方式,但是又支持可以在一个集群上运行多个作业的session模式,令人疑惑。

Flink版本1.5中引入了DispatcherDispatcher是在新设计里引入的一个新概念。Dispatcher会从Client端接受作业提交请求并代表它在集群管理器上启动作业。引入Dispatcher的原因主要有两点:
【1】一些集群管理器需要一个中心化的作业生成和监控实例;
【2】能够实现Standalone模式下JobManager的角色,且等待作业提交。在一些案例中,Dispatcher是可选的Yarn或者不兼容的kubernetes

资源调度模型重构下的 Flink On Yarn 模式

[点击并拖拽以移动] ​

客户端提交JobGraph以及依赖jar包到YarnResourceManager,接着Yarn ResourceManager分配第一个container以此来启动AppMasterApplication Master中会启动一个FlinkResourceManager以及JobManagerJobManager会根据JobGraph生成的ExecutionGraph以及物理执行计划向FlinkResourceManager申请slotFlinkResoourceManager会管理这些slot以及请求,如果没有可用slot就向YarnResourceManager申请containercontainer启动以后会注册到FlinkResourceManager,最后JobManager会将subTask deploy到对应containerslot中去。
[点击并拖拽以移动] ​

在有Dispatcher的模式下:会增加一个过程,就是Client会直接通过HTTP Server的方式,然后用Dispatcher将这个任务提交到Yarn ResourceManager中。

新框架具有四大优势,详情如下:
【1】client直接在Yarn上启动作业,而不需要先启动一个集群然后再提交作业到集群。因此client再提交作业后可以马上返回。
【2】所有的用户依赖库和配置文件都被直接放在应用的classpath,而不是用动态的用户代码classloader去加载。
【3】container在需要时才请求,不再使用时会被释放。
【4】“需要时申请”的container分配方式允许不同算子使用不同profile (CPU和内存结构)的container

新的资源调度框架下 single cluster job on Yarn 流程介绍

[点击并拖拽以移动] ​

single cluster job on Yarn模式涉及三个实例对象:
【1】clifrontend Invoke App code;生成StreamGraph,然后转化为JobGraph
【2】YarnJobClusterEntrypoint(Master) 依次启动YarnResourceManagerMinDispatcherJobManagerRunner三者都服从分布式协同一致的策略;JobManagerRunnerJobGraph转化为ExecutionGraph,然后转化为物理执行任务Execution,然后进行deploydeploy过程会向 YarnResourceManager请求slot,如果有直接deploy到对应的YarnTaskExecutiontorslot里面,没有则向YarnResourceManager申请,带container启动以后deploy
【3】YarnTaskExecutorRunner (slave) 负责接收subTask,并运行。

整个任务运行代码调用流程如下图

[点击并拖拽以移动] ​

subTask在执行时是怎么运行的?

调用StreamTaskinvoke方法,执行步骤如下:
【1】initializeState()operatorinitializeState()
【2】openAllOperators()operatoropen()方法;
【3】最后调用run方法来进行真正的任务处理;

我们来看下flatMap对应的OneInputStreamTaskrun方法具体是怎么处理的。

@Override
protected void run() throws Exception {
    // 在堆栈上缓存处理器引用,使代码更易于JIT
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

    while (running && inputProcessor.processInput()) {
        // 所有的工作都发生在“processInput”方法中
    }
}

最终是调用StreamInputProcessorprocessInput()做数据的处理,这里面包含用户的处理逻辑。

public boolean processInput() throws Exception {
    if (isFinished) {
        return false;
    }
    if (numRecordsIn == null) {
        try {
            numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
           numRecordsIn = new SimpleCounter();
       }
   }
   while (true) {
       if (currentRecordDeserializer != null) {
           DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
           if (result.isBufferConsumed()) {
               currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
               currentRecordDeserializer = null;
           }
           if (result.isFullRecord()) {
               StreamElement recordOrMark = deserializationDelegate.getInstance();
               //处理watermark
               if (recordOrMark.isWatermark()) {
                   // handle watermark
                   //watermark处理逻辑,这里可能引起timer的trigger
                   statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                   continue;
               } else if (recordOrMark.isStreamStatus()) {
                   // handle stream status
                   statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                   continue;
                   //处理latency watermark
               } else if (recordOrMark.isLatencyMarker()) {
                   // handle latency marker
                   synchronized (lock) {
                       streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                   }
                   continue;
               } else {
                   //用户的真正的代码逻辑
                   // now we can do the actual processing
                   StreamRecord<IN> record = recordOrMark.asRecord();
                   synchronized (lock) {
                       numRecordsIn.inc();
                       streamOperator.setKeyContextElement1(record);
                       //处理数据
                       streamOperator.processElement(record);
                   }
                   return true;
               }
           }
       }
            
       //这里会进行checkpoint barrier的判断和对齐,以及不同partition 里面checkpoint barrier不一致时候的,数据buffer,
       final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
       if (bufferOrEvent != null) {
           if (bufferOrEvent.isBuffer()) {
               currentChannel = bufferOrEvent.getChannelIndex();
               currentRecordDeserializer = recordDeserializers[currentChannel];
               currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
           }
           else {
               // Event received
               final AbstractEvent event = bufferOrEvent.getEvent();
               if (event.getClass() != EndOfPartitionEvent.class) {
                   throw new IOException("Unexpected event: " + event);
               }
           }
       }
       else {
           isFinished = true;
           if (!barrierHandler.isEmpty()) {
               throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
           }
           return false;
       }
   }
}

streamOperator.processElement(record)最终会调用用户的代码处理逻辑,假如operatorStreamFlatMap的话。

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    collector.setTimestamp(element);
    userFunction.flatMap(element.getValue(), collector);//用户代码
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/274430.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C语言 linux文件操作(一)

一、linux文件权限 字符表示法 二进制 十进制 说明 r - - 100 4 仅可读 - w - 010 2 仅可写 - - x 001 1 仅可执行 r w - 110 6 可读可写 r - x 101 5 可读可执行 - w x 011 …

卷积神经网络 反向传播

误差的计算 softmax 经过softmax处理后所有输出节点概率和为1 损失&#xff08;激活函数&#xff09; 多分类问题&#xff1a;输出只可能归于某一个类别&#xff0c;不可能同时归于多个类别。 误差的反向传播 求w的误差梯度 权值的更新 首先是更新输出层和隐藏层之间的权重…

oracle下载

前言&#xff1a; 官网上提供都是最新的什么19c 21c这些版本&#xff0c;我要的是 11g 12c 或者更老的 8i 9i 这些版本。 准备下载一个oracle12c 版本&#xff0c;但是找了很久&#xff0c;最终…详情请看下面 oracle 数据库版本介绍 Oracle数据库有多个长期支持版本&#x…

模式识别与机器学习-SVM(带软间隔的支持向量机)

SVM&#xff08;带软间隔的支持向量机&#xff09; 软间隔思想的由来软间隔的引入 谨以此博客作为复习期间的记录。 软间隔思想的由来 在上一篇博客中&#xff0c;回顾了线性可分的支持向量机,但在实际情况中&#xff0c;很少有完全线性可分的情况&#xff0c;大部分线性可分…

OpenHarmony城市技术论坛武汉站:探索大模型时代的终端操作系统创新

2023年12月23日下午,OpenHarmony城市技术论坛(以下简称“技术论坛”)——第6期(武汉站)于华中科技大学梧桐语问学中心明德报告厅圆满举办。本次技术论坛聚焦“大模型时代的系统软件”,旨在探索AI大模型在终端操作系统领域的创新趋势和挑战。论坛从“终端操作系统十大技术挑战”…

事务管理解析:掌握Spring事务的必备技能!

AOP事务管理 1.1 Spring事务简介1.1.1 相关概念介绍1.1.2 转账案例-需求分析1.1.3 转账案例-环境搭建步骤1:准备数据库表步骤2:创建项目导入jar包步骤3:根据表创建模型类步骤4:创建Dao接口步骤5:创建Service接口和实现类步骤6:添加jdbc.properties文件步骤7:创建JdbcConfig配置…

相机内参标定理论篇------相机模型选择

相机种类&#xff1a; 当拿到一款需要标定内参的相机时&#xff0c;第一个问题就是选择那种的相机模型。工程上相机类型的划分并不是十分严格&#xff0c;一般来说根据相机FOV可以把相机大概分为以下几类&#xff1a; 长焦相机&#xff1a;< 标准相机&#xff1a;~&…

某验第四代滑块逆向快速破解

本期地址如下&#xff0c;使用base64解码获得网址 aHR0cHM6Ly9ndDQuZ2VldGVzdC5jb20v 破解某验&#xff0c;某盾已经是司空见惯的事情了&#xff0c;网上也有很多资料查阅&#xff0c;但是大多数都是繁琐、冗长&#xff0c;本文以最直接快速理解的方法讲解&#xff0c;稍微认真…

想要学会JVM调优,先掌握JVM内存模型和JVM运行原理

1、前言 今天将和你一起探讨Java虚拟机&#xff08;JVM&#xff09;的性能调优。 JVM算是面试中的高频问题了&#xff0c;通常情况下总会有人问到&#xff1a;请你讲解下 JVM 的内存模型&#xff0c;JVM 的 性能调优做过&#xff1f; 2、为什么 JVM 在 Java 中如此重要 首…

IT安全:实时网络安全监控

了解庞大而复杂的网络环境并非易事&#xff0c;它需要持续观察、深入分析&#xff0c;并对任何违规行为做出快速反应。这就是为什么实时网络安全监控工具是任何组织 IT 安全战略的一个重要方面。 网络攻击和合规性法规是 IT 安全的两个主要驱动因素。同时&#xff0c;数据泄露…

LaTeX论文排版

LaTeX论文排版 LaTeX 简介与使用为什么选择使用LaTeX进行论文排版&#xff1f;LaTeX下载与安装LaTeX环境安装——TeX Live(Windows、Linux)安装IDE——TeXstudio LaTeX软件界面 BIT-thesis模板BIT-Thesis&#xff1a;主控文件demo.tex&#xff1a; 公式、图片、表格的排版使用L…

c语言用四种方式求解成绩之中最高分和最低分的差值

文章目录 一&#xff0c;题目二&#xff0c;方法1&#xff0c;方法一2&#xff0c;方法二3&#xff0c;方法三4&#xff0c;方法四 三&#xff0c;示例结果 一&#xff0c;题目 最高分最低分之差 输入n个成绩&#xff0c;换行输出n个成绩中最高分数和最低分数的差 输入 : 两行…

安防视频监控系统EasyCVR实现H.265视频在3秒内起播的注意事项

可视化云监控平台/安防视频监控系统EasyCVR视频综合管理平台&#xff0c;采用了开放式的网络结构&#xff0c;可以提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联、磁盘阵列存储、视频集中存储、云存储等丰富的视频能力&#xff0c;同时…

Java中XML的解析

1.采用第三方开元工具dom4j完成 使用步骤 1.导包dom4j的jar包 2.add as lib.... 3.创建核心对象, 读取xml得到Document对象 SAXReader sr new SAXReader(); Document doc sr.read(String path); 4.根据Document获取根元素对象 Element root doc.getRootElement(); …

Bean 生命周期 和 SpringMVC 执行过程

这里简单记录下 Bean 生命周期的过程&#xff0c;方便自己日后面试用。源码部分还没看懂&#xff0c;这里先贴上结论 源码 结论

Spring Boot 中的虚拟线程

在本文中&#xff0c;我将讨论 Spring Boot 中的虚拟线程。 什么是虚拟线程&#xff1f; 虚拟线程作为 Java 中的一项功能引入&#xff0c;旨在简化并发性。 Virtual threads 是 轻量级的线程&#xff0c;由 Java Virtual Machine 而不是操作系统管理。它们被设计为易于使用且…

Apache Flink连载(十八):Flink On Yarn运行原理及环境准备

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录 1. Flink On Yarn运行原理…

Final Cut 视频剪辑快速入门,小白上手视频课的制作

本文是一个快速入门教程&#xff0c;如果您是0视频处理基础&#xff0c;又想录制网课或是一些对效果要求不高的视频那么这篇教程足够使用了。 本文主要用Final Cut处理视频课&#xff0c;本文是笔者在制作视频课过程中逐渐摸索的&#xff0c;如果您想制作一些比较专业的视频&a…

docker学习(二十一、network使用示例container、自定义)

文章目录 一、container应用示例1.需要共用同一个端口的服务&#xff0c;不适用container方式2.可用示例3.停掉共享源的容器&#xff0c;其他容器只有本地回环lo地址 总结 二、自定义网络应用示例默认bridge&#xff0c;容器间ip通信默认bridge&#xff0c;容器间服务名不通 自…

SpringBoot 3.2.0 基于Logback定制日志框架

依赖版本 JDK 17 Spring Boot 3.2.0 工程源码&#xff1a;Gitee 日志门面和日志实现 日志门面&#xff08;如Slf4j&#xff09;就是一个标准&#xff0c;同JDBC一样来制定“规则”&#xff0c;把不同的日志系统的实现进行了具体的抽象化&#xff0c;只提供了统一的日志使用接…