Flink源码之JobManager启动流程

从启动命令flink-daemon.sh中可以看出StandaloneSession入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint, 从该类的main方法会进入ClusterEntrypoint::runCluster中, 该方法中会创建出主要服务和组件。

StandaloneSessionClusterEntrypoint::main
ClusterEntrypoint::runClusterEntrypoint
ClusterEntrypoint::startCluster
ClusterEntrypoint::runCluster

private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager); //初始化服务

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);
		//创建核心组件
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);
	...ignore code
    }
}

可以看出关键代码是调用initializeServices以及创建Cluster Component。

protected void initializeServices(Configuration configuration, PluginManager pluginManager)
        throws Exception {

    LOG.info("Initializing cluster services.");

    synchronized (lock) {
        rpcSystem = RpcSystem.load(configuration);

        commonRpcService =
                RpcUtils.createRemoteRpcService(
                        rpcSystem,
                        configuration,
                        configuration.getString(JobManagerOptions.ADDRESS),
                        getRPCPortRange(configuration),
                        configuration.getString(JobManagerOptions.BIND_HOST),
                        configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

        // update the configuration used to create the high availability services
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        ioExecutor =
                Executors.newFixedThreadPool(
                        ClusterEntrypointUtils.getPoolSize(configuration),
                        new ExecutorThreadFactory("cluster-io"));
        haServices = createHaServices(configuration, ioExecutor, rpcSystem);
        blobServer = new BlobServer(configuration, haServices.createBlobStore());
        blobServer.start();
        heartbeatServices = createHeartbeatServices(configuration);
        metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);

        final RpcService metricQueryServiceRpcService =
                MetricUtils.startRemoteMetricsRpcService(
                        configuration, commonRpcService.getAddress(), rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, null);

        final String hostname = RpcUtils.getHostname(commonRpcService);

        processMetricGroup =
                MetricUtils.instantiateProcessMetricGroup(
                        metricRegistry,
                        hostname,
                        ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                configuration));

        executionGraphInfoStore =
                createSerializableExecutionGraphStore(
                        configuration, commonRpcService.getScheduledExecutor());
    }
}

在initializeServices中首先创建commonRpcService,这个RPCService实例是JobManager提供RPC服务的核心,可以看出它会有个地址和监听端口号,commonRpcService可将继承自Gateway的服务实例包装成AkkaActor对外提供RPC服务,比如ResourceManager、Dispatcher。此外还创建了其他服务:

haService: 可通过HAService获取ResourceManager/Dispatcher/RestEndpoint的地址,同时也提供选主服务,组件启动时需向HAService注册,如果被选主成功,则会调用监听器的grandLeadership回调函数
BlobServer: 可用来提供存储大对象存储服务
heartbeatServices:为组件间传递心跳信息
metricRegistry:提供metric上报和查询服务,监听端口不同,新建了一个RpcService专为Metric服务
processMetricGroup:注册系统运行状态信息的Metric,比如GC/Memory/Network运行时状况,添加Metric都是通过一个MetricGroup添加
executionGraphInfoStore:缓存Job执行时信息,比如ExecutionGrap

初始化服务创建完成后,通过DefaultDispatcherResourceManagerComponentFactory:create创建JobManager的三大核心组件:Dispacher/ResourceManager/RestEndpointServer, 都是通过工厂方法创建:

DefaultDispatcherRunnerFactory
StandaloneResourceManagerFactory
SessionRestEndpointFactory

这些组件是JobManager向HAService注册获取leadership后,被ElectionService回调grantLeadership函数中创建出具体组件实例。

RestServer

RestServer并不是一个RPCServer,没有继承RpcGateway,只提供HTTP接口服务,然后将请求转交给Dispatcher处理,它的生成启动流程如下:

SessionRestEndpointFactory::createRestEndpoint
DispatcherRestEndpoint::new
RestServerEndpoint::start //通过Netty启动Rest服务
DispatcherRestEndpoint::initializeHandlers //JobSubmitHeaders、JobSubmitHandler处理客户端提交Job
WebMonitorEndpoint::initializeHandlers //关联Rest请求的Header和Handler
WebMonitorEndpoint::startInternal //竞选leader

ResourceManager

RM生成启动过程是ResourceManagerServiceImpl先竞选leader成功后再创建出具体的ResourceManager

ResourceManagerServiceImpl::start
ResourceManagerServiceImpl::grantLeadership
ResourceManagerServiceImpl::startNewLeaderResourceManager
ResourceManagerServiceImpl::startResourceManagerIfIsLeader//调用start方法
StandaloneResourceManagerFactory::createResourceManager
StandaloneResourceManager::new
StandaloneResourceManager::start

Dispatcher

Dispacher生成启动过程是DefaultDispatcherRunner选主后再创建出具体实例

DefaultDispatcherRunnerFactory::createDispatcherRunner
DefaultDispatcherRunner::create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
DefaultDispatcherRunner::grantLeadership //
DefaultDispatcherRunner::startNewDispatcherLeaderProcess//创建SessionDispatcherLeaderProcess并调用其start方法
DefaultDispatcherRunner::createNewDispatcherLeaderProcess
SessionDispatcherLeaderProcessFactoryFactory::createFactory
SessionDispatcherLeaderProcessFactory::create
SessionDispatcherLeaderProcess::create
SessionDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::startInternal
SessionDispatcherLeaderProcess:onstart
SessionDispatcherLeaderProcess::createDispatcherIfRunning
SessionDispatcherLeaderProcess::createDispatcher
DefaultDispatcherGatewayServiceFactory::create//创建Dispatcher并调用其start方法
SessionDispatcherFactory::createDispatcher
StandaloneDispatcher::new
StandaloneDispatcher::start
Dispatcher::onstart

总结

在这里插入图片描述
JobManager的启动过程就是创建三大组件RestServer/RM/Dispacher实例初始化的过程,RestSever通过Netty启动HTTP服务,RM/Dispacher被AkkaRpcService包装成AkkaActor提供本地或远程RPC服务,RestServer仅仅是接受请求解析消息后由具体Handler处理,JobGrap提交执行会转发给Dispatcher处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/66832.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Maven进阶1 -- 分模块开发、依赖管理、聚合与继承、属性、版本管理、多环境开发、跳过测试

目录 1.分模块开发 将原始模块按照功能拆分成若干个子模块&#xff0c;方便模块间的相互调用&#xff0c;接口共享。 案例&#xff1a;拆分一下这个SSM整合案例 ①创建maven模块 demo项目下的pom.xml文件&#xff08;主要看一下依赖&#xff09; <dependencies><!…

黑马头条项目学习--Day2: app端文章查看,静态化freemarker,分布式文件系统minIO

app端文章 Day02: app端文章查看&#xff0c;静态化freemarker,分布式文件系统minIOa. app端文章列表查询1) 需求分析2) 实现思路 b. app端文章详细1) 需求分析2) Freemarker概述a) 基础语法种类b) 集合指令&#xff08;List和Map&#xff09;c) if指令d) 运算符e) 空值处理f) …

GIS在地质灾害危险性评估与灾后重建中的应用教程

详情点击链接&#xff1a;GIS在地质灾害危险性评估与灾后重建中的实践技术应用 前言 地质灾害是指全球地壳自然地质演化过程中&#xff0c;由于地球内动力、外动力或者人为地质动力作用下导致的自然地质和人类的自然灾害突发事件。由于降水、地震等自然作用下&#xff0c;地质…

《合成孔径雷达成像算法与实现》Figure3.7

代码复现如下&#xff1a; clc clear all close all%参数设置 TBP 100; %时间带宽积 T 10e-6; %脉冲持续时间%参数计算 B TBP/T; …

MySQL — InnoDB介绍

文章目录 InnoDB 主要特点InnoDB 架构In-Memory StructuresBuffer PoolChange BufferAdaptive Hash IndexLog Buffer On-Disk StructuresSystem TablespaceFile-Per-Table TablespacesGeneral TablespacesUndo TablespacesTemporary TablespacesDoublewrite BufferRedo LogUndo…

半导体器件||的学习

电子管的介绍&#xff1a; 到底什么是电子管&#xff08;真空管&#xff09;&#xff1f; - 知乎 芯片破壁者&#xff08;一&#xff09;&#xff1a;从电子管到晶体管“奇迹”寻踪 - 知乎 晶体管&#xff1a; 什么是晶体管&#xff1f;它有什么作用&#xff1f; - 知乎 改…

多个配置WebMvcConfigurationSupport失效问题

最近在项目中用类继承WebMvcConfigurationSupport实现拦截器 Configuration RequiredArgsConstructor public class SpringWebSupport extends WebMvcConfigurationSupport {private final ProjectInterceptor projectInterceptor;// 拦截器 //设置拦截器对象和拦截请求Ove…

java 企业工程管理系统软件源码+Spring Cloud + Spring Boot +二次开发+ MybatisPlus + Redis em

​ 鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内…

5G RedCap

5G RedCap指的是3GPP所提出的5G标准。与之前发布的5G标准相比&#xff0c;功能更加精简。5G RedCap于2019年6月首次被纳入3GPP R17研究项目。 把一些不必要的功能去掉就可以带来模组价格的降低。背后的基本想法是&#xff1a;为物联网应用定义一种新的、不那么复杂的NR设备。 …

php实现登录的例子

界面&#xff1a; 登录界面login.html代码&#xff1a; <!DOCUMENT html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns"http://www.w3.org/1999/xhtml"…

Java中String方法魔性学习

这里写目录标题 先进行专栏介绍String详解常用构造方法代码演示常用成员方法代码示例总结 先进行专栏介绍 本专栏是自己学Java的旅途&#xff0c;纯手敲的代码&#xff0c;自己跟着黑马课程学习的&#xff0c;并加入一些自己的理解&#xff0c;对代码和笔记 进行适当修改。希望…

循环队列详解

1. 循环队列 1.1 概念及结构 循环队列是一种特殊类型的队列数据结构&#xff0c;也被称为”唤醒缓冲器“。它在数组的基础上实现了循环利用空间的功能。在循环队列中&#xff0c;队尾和队头之间形成了一个循环&#xff0c;当队尾指针“追上”队头指针时&#xff0c;队列不再继…

OpenAI允许网站阻止其网络爬虫;谷歌推出类似Grammarly的语法检查功能

&#x1f989; AI新闻 &#x1f680; OpenAI推出新功能&#xff0c;允许网站阻止其网络爬虫抓取数据训练GPT模型 摘要&#xff1a;OpenAI最近推出了一个新功能&#xff0c;允许网站阻止其网络爬虫从其网站上抓取数据训练GPT模型。该功能通过在网站的Robots.txt文件中禁止GPTB…

【前端 | CSS】flex布局

基本概念 Flexible模型&#xff0c;通常被称为 flexbox&#xff0c;是一种一维的布局模型。它给 flexbox 的子元素之间提供了强大的空间分布和对齐能力 我们说 flexbox 是一种一维的布局&#xff0c;是因为一个 flexbox 一次只能处理一个维度上的元素布局&#xff0c;一行或者…

vscode运行python报错:ModuleNotFoundError:No module named ‘xxx‘

在乌班图上使用pycharm的时候&#xff0c;pycharm总是莫名其妙卡死&#xff0c;又说是搜狗输入法的锅&#xff0c;又说别的原因&#xff0c;一气之下不用pycharm,转到vscode上&#xff0c;没想到出现了如下报错。 就是vscode在运行python的时候&#xff0c;自定义模块的调用无…

03.利用Redis实现缓存功能---解决缓存穿透版

学习目标&#xff1a; 提示&#xff1a;学习如何利用Redis实现添加缓存功能解决缓存穿透版 学习产出&#xff1a; 缓存穿透讲解图&#xff1a; 解决方案&#xff1a; 采用缓存空对象采用布隆过滤器 解决方案流程图&#xff1a; 1. 准备pom环境 <dependency><gro…

Redis储存结构

Redis怎么储存的 这个redisDb是数据库对象 里面的其他字段忽略了 然后里面有个dict列表(字典列表) 我们随便来看一个redisObject 区分一下子啊 他这个dict里面没有存redisObject的对象 也没有存dict对象 它只是存了个数据指针 你看那个redis每个底层编码 抠搜的 这块要是再保存…

gin和gorm框架安装

理论上只要这两句命令 go get -u gorm.io/gorm go get -u github.com/gin-gonic/gin然而却出现了问题 貌似是代理问题&#xff0c;加上一条命令 go env -w GOPROXYhttps://goproxy.cn,direct 可以成功安装 安装gorm的数据库驱动程序 go get -u gorm.io/driver/mysql

完整版:TCP、UDP报文格式

目录 TCP报文格式 报文格式 报文示例 UDP报文格式 报文格式 报文示例 TCP报文格式 报文格式 图1 TCP首部格式 字段长度含义Source Port16比特源端口&#xff0c;标识哪个应用程序发送。Destination Port16比特目的端口&#xff0c;标识哪个应用程序接收。Sequence Numb…

【Linux 网络】NAT技术——缓解IPv4地址不足

NAT技术 NAT 技术背景NAT IP转换过程NAPTNAT 技术的缺陷 NAT&#xff08;Network Address Translation&#xff0c;网络地址转换&#xff09;技术&#xff0c;是解决IP地址不足的主要手段&#xff0c;并且能够有效地避免来自网络外部的攻击&#xff0c;隐藏并保护网络内部的计算…