【Flink集群RPC通讯机制(四)】集群组件(tm、jm与rm)之间的RPC通信

文章目录

  • 1. 集群内部通讯方法概述
  • 2. TaskManager向ResourceManager注册RPC服务
  • 3. JobMaster向ResourceManager申请Slot计算资源

现在我们已经知道Flink中RPC通信框架的底层设计与实现,接下来通过具体的实例了解集群运行时中组件如何基于RPC通信框架构建相互之间的调用关系。

1. 集群内部通讯方法概述

通过RegisteredRpcConnection进行连接注册与通讯(维护心跳等)

当TaskExecutor启动后,会立即向ResourceManager中注册当前TaskManager的信息。同样,JobMaster组件启动后也立即会向ResourceManager注册JobMaster的信息。这些注册操作实际上就是在构建集群中各个组件之间的RPC连接,这里的注册连接在Flink中被称为RegisteredRpcConnection,集群组件之间的RPC通信都会通过创建RegisteredRpcConnection进行,例如获取RpcEndpoint对应的RpcGateway接口以及维护组件之间的心跳连接等。

如下图,集群运行时中各组件的注册连接主要如下三种RegisteredRpcConnection的实现。

  • JobManagerRegisteredRpcConnection:用于管理TaskManager中与JobManager之间的RPC连接。
  • ResourceManagerConnection:用于管理JobManager中与ResourceManager之间的RPC连接。
  • TaskExecutorToResourceManagerConnection:用于管理TaskExecutor中与ResourceManager之间的RPC连接。

如下图再有:

  1. RegisteredRpcConnection提供了generateRegistration()抽象方法,主要用于生成组件之间的RPC连接,每次调用RegisteredRpcConnection.start()方法启动RegisteredRpcConnection时,都会创建新的RetryingRegistration。

不同RegisteredRpcConnection创建的RetryingRegistration也会有所不同,例如在TaskExecutorToResourceManagerConnection中就会创建ResourceManagerRegistration实例。

  1. 调用rpcService.connect(targetAddress, targetType) ,返回RpcGateway的代理对象,通过RpcGateway连接到目标RpcEndpoint上。
  2. 在RetryingRegistration中会提供invokeRegistration()抽象方法,用于实现子类的RPC注册操作。

例如在ResourceManagerRegistration中会实现invokeRegistration()方法,在方法中调用resourceManager.registerTaskExecutor()将TaskExecutor信息注册到ResourceManager中,这里的ResourceManager就是ResourceManagerGateway接口代理类。

  1. 调用onRegistrationSuccess()方法继续后续操作,例如在JobManagerRegisteredRpcConnection中会向jobLeaderListener添加当前的jobId等信息。
  2. 如果当前组件没有成功到注册至目标组件时,会调用onRegistrationFailure()抽象方法继续后续操作,包括连接重连或停止整个RpcEndpoint对应的服务。

在这里插入图片描述

接着以TaskManager向ResourceManager注册RPC服务为例,介绍整个RPC连接的注册过程。
 

2. TaskManager向ResourceManager注册RPC服务

TaskManager向ResourceManager注册RPC服务的过程如图所示。
在这里插入图片描述

  1. TaskExecutor节点正常启动后,调用RpcEndpoint.onStart()方法初始化并启动TaskExecutor组件的内部服务。
  2. 创建监听服务
  1. TaskExecutor调用resourceManagerLeaderRetriever.start()方法,启动ResourceManager组件领导节点的监听服务并传入ResourceManagerLeaderListener,用于监听ResourceManager的领导节点的变化情况。
  2. 当ResourceManagerLeaderListener接收到来自ResourceManager的leaderAddress以及leaderSessionID的信息后,调用notifyOfNewResourceManagerLeader()方法通知TaskExecutor和新的ResourceManagerLeader建立RPC连接。
  1. 创建与ResourceManager组件的RPC网络连接

a. 调用TaskExecutor.reconnectToResourceManager()内部方法,创建与ResourceManager组件之间的RPC网络连接。
b. 在reconnectToResourceManager()方法中会事先调用closeResourceManagerConnection()方法关闭之前的ResourceManager连接,然后依次调用tryConnectToResourceManager()和connectToResourceManager()方法创建与ResourceManager节点的RPC连接。

  1. 创建TaskExecutorRegistration对象

在connectToResourceManager()方法中会创建TaskExecutorRegistration对象,用于存储TaskManager的注册信息,其中包括taskExecutorAddress、resourceId以及dataPort等连接信息,同时还包含hardwareDescription、defaultSlotResourceProfile以及totalResourceProfile等资源描述信息。

  1. 正式建立网络连接

创建TaskExecutorToResourceManagerConnection实例,正式与ResourceManager建立RPC网络连接,同时调用TaskExecutorToResourceManagerConnection.start()方法启动RPC连接。实际上调用的是RegisteredRpcConnection.start()方法。

  1. 创建新的创建新的Registration与其他组件的RPC连接

在RegisteredRpcConnection中会调用内部方法createNewRegistration()创建新的Registration。而在createNewRegistration()方法中会调用generateRegistration()子类方法,创建与其他组件之间的RPC连接。这里主要调用的是TaskExecutorToResourceManagerConnection.generateRegistration()方法。

  1. 调用RetryingRegistration.startRegistration()方法注册具体的RPC连接,实际上会调用AkkaRpcService.connect()方法创建并获取ResourceManager对应的RpcGateway接口。
  2. 调用ResourceManagerGateway.registerTaskExecutor()方法,最终完成在ResourceManager中注册TaskManager的操作。创建的TaskExecutorRegistration同时会传递给ResourceManager。
  3. 当ResourceManager接收到TaskManager的注册信息后,会在本地维护TaskManager的注册信息,并建立与TaskManager组件之间的心跳连接,至此完成了TaskManager启动后向ResourceManager进行RPC网络连接注册的全部流程。

如代码所示

  • TaskExecutor.connectToResourceManager()方法中首先会创建TaskExecutorRegistration注册信息和TaskExecutorToResourceManagerConnection对象。
  • 然后调用TaskExecutorToResourceManagerConnection.start()方法启动TaskManager和ResourceManager之间的RPC注册连接。
private void connectToResourceManager() {
   assert(resourceManagerAddress != null);
   assert(establishedResourceManagerConnection == null);
   assert(resourceManagerConnection == null);
   log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
   // TaskExecutor注册信息
   final TaskExecutorRegistration taskExecutorRegistration = 
       new TaskExecutorRegistration(
      getAddress(),
      getResourceID(),
      taskManagerLocation.dataPort(),
      hardwareDescription,
      taskManagerConfiguration.getDefaultSlotResourceProfile(),
      taskManagerConfiguration.getTotalResourceProfile()
   );
   resourceManagerConnection =
      new TaskExecutorToResourceManagerConnection(
         log,
         getRpcService(),
         taskManagerConfiguration.getRetryingRegistrationConfiguration(),
         resourceManagerAddress.getAddress(),
         resourceManagerAddress.getResourceManagerId(),
         getMainThreadExecutor(),
         new ResourceManagerRegistrationListener(),
         taskExecutorRegistration);
   resourceManagerConnection.start();
}

接着看RegisteredRpcConnection.start()的代码逻辑,如代码所示。

public void start() {
   checkState(!closed, "The RPC connection is already closed");
   checkState(!isConnected() && pendingRegistration == null, 
              "The RPC connection is already started");
   // 创建RetryingRegistration
   final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
     // 启动RetryingRegistration
   if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
      newRegistration.startRegistration();
   } else {
      // 并行启动后,直接取消当前Registration
      newRegistration.cancel();
   }
}

关注:RetryingRegistration.startRegistration()逻辑。

  1. 根据不同的targetType,选择创建FencedRpcGateway还是普通的RpcGateway。
  2. 创建RpcGateway代理类后,就可以连接到指定的RpcEndpoint了。对于rpcService.connect()方法的定义,我们已经在RPC底层原理中介绍过。
  3. 创建RPC连接后,尝试注册操作。
  4. 如果注册失败,则进行Retry操作,除非接收到取消操作的消息。
public void startRegistration() {
        if (canceled) {
            return;
        }
        try {
            final CompletableFuture<G> rpcGatewayFuture;
            // 根据不同的targetType,选择创建FencedRpcGateway还是普通的RpcGateway
            if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
                rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(
                    targetAddress,
                    fencingToken,
                    targetType.asSubclass(FencedRpcGateway.class));
            } else {
                rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
            }
            // 成功获取网关后,尝试注册操作
            CompletableFuture<Void> rpcGatewayAcceptFuture = 
                rpcGatewayFuture.thenAcceptAsync(
                (G rpcGateway) -> {
                    log.info("Resolved {} address, beginning registration", 
                       targetName);
                    register(rpcGateway, 1, retryingRegistrationConfiguration.
                       getInitialRegistrationTimeoutMillis());
                },
                rpcService.getExecutor());
            // 如果注册失败,则进行Retry操作,除非取消操作
            rpcGatewayAcceptFuture.whenCompleteAsync(
                (Void v, Throwable failure) -> {
                    if (failure != null && !canceled) {
                        final Throwable strippedFailure =
                            ExceptionUtils.stripCompletionException(failure);
                        // 间隔指定时间后再次注册
                        startRegistrationLater(retryingRegistrationConfiguration.
                           getErrorDelayMillis());
                    }
                },
                rpcService.getExecutor());
        }
        catch (Throwable t) {
            completionFuture.completeExceptionally(t);
            cancel();
        }
    }

继续了解ResourceManagerRegistration.invokeRegistration()的具体实现。

该方法会创建和ResourceManagerGateway之间的连接以及注册操作
resourceManager会接收来自TaskExecutor的注册信息,并根据taskExecutorRegistration提供的注册信息,将TaskExecutor信息记录在ResourceManager的本地存储中,然后开启TaskExecutor之间的心跳连接。至此,TaskManager能和ResourceManager进行正常的RPC通信了。

protected CompletableFuture<RegistrationResponse> invokeRegistration(
      ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, 
    long timeoutMillis) throws Exception {
   Time timeout = Time.milliseconds(timeoutMillis);
   return resourceManager.registerTaskExecutor(
      taskExecutorRegistration,
      timeout);
}

对于其他组件之间的RpcConnection注册操作,例如TaskManager与JobMaster之间的RPC连接注册,基本上和ResourceManagerRegistration一样,这里暂不介绍。

接下来我们看JobMaster是如何向ResourceManager申请Slot计算资源的。

 

3. JobMaster向ResourceManager申请Slot计算资源

当JobMaster组件启动后,

  • 会(调用JobMaster.startJobMasterServices())启动JobMaster中的内部服务,其中就包括了SlotPool组件。
  • 同时会创建和启动JobMaster与ResourceManager之间的RPC连接ResourceManagerConnection。创建成功后,会执行包括向ResourceManager发送申请Slot计算资源的RPC请求等后续操作。

如代码所示

//从SlotPoolImpl.connectToResourceManager()可以看出,方法中分别遍历
//waitingForResourceManager集合中的PendingRequest,
//然后就每个PendingRequest调用requestSlotFromResourceManager()方法向
//ResourceManager申请PendingRequest中指定的Slot计算资源。
public void connectToResourceManager(
    @Nonnull ResourceManagerGateway resourceManagerGateway) {
        this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
        for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
            requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
        }
        waitingForResourceManager.clear();
}

继续看SlotPoolImpl.requestSlotFromResourceManager()方法的实现,如下代码所示。

  1. 创建AllocationID并将pendingRequest和AllocationID存储在pendingRequests集合中。
  2. 判断pendingRequest是否出现异常或已经分配了其他AllocationID,如果出现异常或已分配则取消当前pendingRequest中的资源分配请求。
  3. 调用resourceManagerGateway.requestSlot()远程RPC方法向ResourceManager申请Slot计算资源,此时会在方法中创建SlotRequest对象,指定申请Slot计算资源的具体参数。
  4. ResourceManager接收到SlotPool发送的SlotRequest请求后,会将SlotRequest转发给SlotManager进行处理,此时如果能正常分配到Slot资源,则会返回Acknowledge信息。
  5. 调用FutureUtils.whenCompleteAsyncIfNotDone()方法执行返回rmResponse CompletableFuture的对象,此时如果Slot资源申请过程出现异常,则调用slotRequestToResourceManager-Failed()方法进行处理。
private void requestSlotFromResourceManager(
            final ResourceManagerGateway resourceManagerGateway,
            final PendingRequest pendingRequest) {
        checkNotNull(resourceManagerGateway);
        checkNotNull(pendingRequest);
        log.info("Requesting new slot [{}] and profile {} from resource manager.", 
                 pendingRequest.getSlotRequestId(), pendingRequest.
                    getResourceProfile());
        final AllocationID allocationId = new AllocationID();
        pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId,
                            pendingRequest);
        pendingRequest.getAllocatedSlotFuture().whenComplete(
            (AllocatedSlot allocatedSlot, Throwable throwable) -> {
                if (throwable != null 
                    || !allocationId.equals(allocatedSlot.getAllocationId())) {
                    resourceManagerGateway.cancelSlotRequest(allocationId);
                }
            });
        CompletableFuture<Acknowledge> rmResponse = 
            resourceManagerGateway.requestSlot(
            jobMasterId,
            new SlotRequest(jobId, allocationId, 
                            pendingRequest.getResourceProfile(), jobManagerAddress),
            rpcTimeout);
        FutureUtils.whenCompleteAsyncIfNotDone(
            rmResponse,
            componentMainThreadExecutor,
            (Acknowledge ignored, Throwable failure) -> {
                if (failure != null) {
                    slotRequestToResourceManagerFailed(pendingRequest.
                                                     getSlotRequestId(), failure);
                }
            });
}

从以上实例可以看出,集群运行时中各个组件之间都是基于RPC通信框架相互访问的。RpcEndpoint组件会创建与其他RpcEndpoint之间的RegisteredRpcConnection,并通过RpcGateway接口的动态代理类与其他组件进行通信。

需要注意的是,Flink把Akka作为RPC底层的通信框架,但没有使用Akka其他丰富的监督功能,并且未来有去掉Akka依赖的可能。

 
参考:《Flink设计与实现:核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/404538.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

大数据 - Spark系列《十一》- Spark累加器详解

Spark系列文章&#xff1a; 大数据 - Spark系列《一》- 从Hadoop到Spark&#xff1a;大数据计算引擎的演进-CSDN博客 大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客 大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客 大数据 - Spark系列《…

2024/02/23

使用消息队列完成两个进程间相互通信 A.c #include<myhead.h> struct msgbuf {long mtype;char mtext[1024]; }; //定义表示正文内容大小的宏 #define MSGSIZE sizeof(struct msgbuf)-sizeof(long)int main(int argc, const char *argv[]) {//创建一个key值key_t key;ke…

知乎66条高赞回答,句句醍醐灌顶!

-01- 穷人是小心翼翼地大方&#xff0c; 有钱人是大大方方地小气。 ——论如何判断一个人是真有钱还是装有钱 -02- 枕头要常晒&#xff0c; 因为里面装满了心酸的泪和发霉的梦。 ——一切终将随风而逝 -03- 人活得累&#xff0c;一是太认真&#xff0c;二是太想要。 …

第3部分 原理篇2去中心化数字身份标识符(DID)(3)

3.2.2.4. DID文档 (DID Document) 本聪老师&#xff1a;DID标识符和DID URL还都只是ID&#xff0c;必须为它附加一个基本属性才可以证明是该主体独有的。这个就是我们下面介绍的DID文档。 本聪老师&#xff1a;每个DID标识符都唯一对应一个DID文档&#xff0c;也可以说&#x…

计算机功能简介:EC, NVMe, SCSI/ISCSI与块存储接口 RBD,NUMA

一 EC是指Embedded Controller 主要应用于移动计算机系统和嵌入式计算机系统中&#xff0c;为此类计算机提供系统管理功能。EC的主要功能是控制计算机主板上电时序、管理电池充电和放电&#xff0c;提供键盘矩阵接口、智能风扇接口、串口、GPIO、PS/2等常规IO功能&#xff0c;…

docker自定义网络实现容器之间的通信

Background docker原理 docker是一个Client-Server结构的系统&#xff0c;Docker的守护进程运行在主机上。通过Socket从客户端访问。docker核心三大组件&#xff1a;image–镜像、container-容器、 repository-仓库。docker使用的cpu、内存以及系统内核等资源都是直接使用宿主…

A Novel Two-Layer DAG-based Reactive Protocol for IoT Data Reliability in Metaverse

在IOT 场景中&#xff0c;需要保证数据的完整性和可靠性。通常区块链可以用来做这件事&#xff0c;但是IoT 设备的计算能力和贷款都是有限的。 对于PBFT 要求的通信量太大。 本文提出的 two layer directed acycle graph (2LDAG) 是一种被动共识协议&#xff0c;除非有节点主动…

快速构建 Debezium MySQL Example 数据库

博主历时三年精心创作的《大数据平台架构与原型实现&#xff1a;数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行&#xff0c;点击《重磅推荐&#xff1a;建大数据平台太难了&#xff01;给我发个工程原型吧&#xff01;》了解图书详情&#xff0c;…

EXCEL 在列不同单元格之间插入N个空行

1、第一步数据&#xff0c;要求在每个数字之间之间插入3个空格 2、拿数据个数*&#xff08;要插入空格数1&#xff09; 19*4 3、填充 4、复制数据到D列 5、下拉数据&#xff0c;选择复制填充这样1-19就会重复4次 6、全选数据D列排序&#xff0c;这样即完成了插入空格 以…

SQL语法-DQL-测试练习

因篇幅原因&#xff0c;本篇承接此篇->第八篇&#xff1a;SQL语法-DQL-数据查询语言-CSDN博客 本篇是对于SQL语法DQL语句的练习&#xff0c;因水平和精力有限&#xff08;就不像前两篇的DDL&#xff0c;DML那样自出练习了&#xff09;直接照搬了【黑马程序员】在哔哩哔哩的…

基于卷积神经网络的图像去噪

目录 背影 卷积神经网络CNN的原理 卷积神经网络CNN的定义 卷积神经网络CNN的神经元 卷积神经网络CNN的激活函数 卷积神经网络CNN的传递函数 基于卷积神经网络的图像去噪 完整代码:基于卷积神经网络的图像去噪.rar资源-CSDN文库 https://download.csdn.net/download/abc9918351…

如何在java中使用 Excel 动态函数生成依赖列表

前言 在Excel 中&#xff0c;依赖列表或级联下拉列表表示两个或多个列表&#xff0c;其中一个列表的项根据另一个列表而变化。依赖列表通常用于Excel的业务报告&#xff0c;例如学术记分卡中的【班级-学生】列表、区域销售报告中的【区域-国家/地区】列表、人口仪表板中的【年…

vue3 + ts + echart 实现柱形图表

首先封装Echart一个文件 代码如下 <script setup lang"ts"> import { ECharts, EChartsOption, init } from echarts; import { ref, watch, onMounted, onBeforeUnmount } from vue;// 定义props interface Props {width?: string;height?: string;optio…

网工内推 | 信息安全售前,国企、上市公司,补贴福利多

01 中电科网络安全科技有限公司 招聘岗位&#xff1a;信息安全售前工程师 职责描述&#xff1a; 1.负责为客户提供整体信息安全规划、IT治理需求调研、现状分析、蓝图规划与实施路线设计&#xff0c;为客户提供设计方案&#xff1b; 2.承担行业信息安全发展研究、行业业务规划…

vue3 vuex

目录 Vuex 是什么 什么是“状态管理模式”&#xff1f; 什么情况下我应该使用 Vuex&#xff1f; 使用方法&#xff1a; 提交载荷&#xff08;Payload&#xff09; 对象风格的提交方式 使用常量替代 Mutation 事件类型 Mutation 必须是同步函数 在组件中提交 Mutation …

sentinel中监听器的运用--规则管理

sentinel中监听器的运用–规则管理 规则结构 类图关系 类关系图如下 Rule 将规则抽象成一个类, 规则与资源是紧密关联的, 也就是说规则作用于资源。因此, 我们需要将规则表示为一个类, 并包含一个获取资源的方法 这里采用接口的原因就是规则是一个抽象概念而非具体实现。…

导入excel某些数值是0

目录 导入excel某些数值是0数据全部都是0原因解决 部分数据是0原因解决 导入excel某些数值是0 数据全部都是0 有一列“工单本月入库重量”全部的数据都是0 原因 展示的时候&#xff0c;展示的字段和内表需要展示的字段不一致&#xff0c;导致显示的是0。 解决 修改展示的字…

Vue | (四)使用Vue脚手架(上) | 尚硅谷Vue2.0+Vue3.0全套教程

文章目录 &#x1f4da;初始化脚手架&#x1f407;创建初体验&#x1f407;分析脚手架结构&#x1f407;关于render&#x1f407;查看默认配置 &#x1f4da;ref与props&#x1f407;ref属性&#x1f407;props配置项 &#x1f4da;混入&#x1f4da;插件&#x1f4da;scoped样…

DBeaver的下载安装和连接MySQL数据库

DBeaver的下载安装和连接MySQL数据库 1、dbeaver的下载 dbeaver是一款的数据库连接工具&#xff0c;免费&#xff0c;跨平台。 官网&#xff1a;https://dbeaver.io/ 下载地址&#xff1a;https://dbeaver.io/download/ GitHub下载地址&#xff1a;https://github.com/dbeav…

使用向量数据库pinecone构建应用02:检索增强生成RAG

Building Applications with Vector Databases 下面是这门课的学习笔记&#xff1a;https://www.deeplearning.ai/short-courses/building-applications-vector-databases/ Learn to create six exciting applications of vector databases and implement them using Pinecon…