Flink 调度源码分析4:Physical Slot 分配过程

Flink 调度源码分析1:拓扑图创建与提交过程
Flink 调度源码分析2:调度过程
Flink 调度源码分析3:Shared Slot 分配策略
Flink 调度源码分析4:Physical Slot 分配过程

1 整体过程

在 SlotSharingExecutionSlotAllocator.allocateSlotsForVertices() 中,会检查共享组是否有 slot,如果没有的话,会在下一步使用 PhysicalSlotProvider 为其分配 slot。

// 检查共享组是否有 slot
Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots =  
        tryAssignExistingSharedSlots(groupsToAssign);  
slots.putAll(assignedSlots);  
groupsToAssign.removeAll(assignedSlots.keySet());  
  
// 对没有 slot 的共享组分配 slot
if (!groupsToAssign.isEmpty()) {  
    Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots =  
            allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever);  
    slots.putAll(allocatedSlots);  
    groupsToAssign.removeAll(allocatedSlots.keySet());  
    // 所有的共享组一定有共享 slot    
    Preconditions.checkState(groupsToAssign.isEmpty());  
}

接下来查看 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 函数(注意这个函数,后面会多次提到这里)。
在这里插入图片描述

2 创建 slot 请求

2.1 获取 slot 配置

在 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 中,会对每一个共享组执行下面代码。

// 使用 SharedSlotProfileRetriever 创建 slot 配置文件  
ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);  
SlotProfile slotProfile =  
        sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);

会得到一个 SlotProfile。SlotProfile 是 task 希望调度的 slot 的配置文件。配置文件包含资源或位置限制等属性,其中一些可能是硬限制,也可能是软限制。它还包含 physical slot 的资源信息,当 shared slot 没有可用的 physical slot 时,可使用这些信息分配 physical slot。可以生成一个 matcher,通过在 SlotContext 中对 SlotProfile 以及其他要求进行匹配,筛选出候选 slot。
SlotProfile 包含下面这些属性。

/** This specifies the desired resource profile for the task slot. */  
private final ResourceProfile taskResourceProfile;  
/** This specifies the desired resource profile for the physical slot to host this task slot. */  
private final ResourceProfile physicalSlotResourceProfile;  
/** This specifies the preferred locations for the slot. */  
private final Collection<TaskManagerLocation> preferredLocations;  
/** This contains desired allocation ids of the slot. */  
private final Collection<AllocationID> preferredAllocations;  
/** This contains all reserved allocation ids from the whole execution graph. */  
private final Set<AllocationID> reservedAllocations;
  • taskResourceProfile 和 physicalSlotResourceProfile 是配置,两个一般是相等的。
  • preferredLocations 表示期望得到哪个 taskmanager 的 slot。
  • preferredAllocations 表示希望得到哪个 AllocationID,reservedAllocations存储了已经被分配的 reservedAllocations。
    AllocationID:JobManager 已分配 physical slot 的唯一标识符。该 ID 在 JobManager 首次请求 slot 时分配,并在重新分配时保持不变。JobManager 和 ResourceManager 使用此 ID 来跟踪和同步哪些 slot 分配给了哪个 TaskManager,哪些是空闲的。与 AllocationID 不同,SlotRequestId 用于任务从 SlotPool 请求 logical slot 时。多个 SlotRequestId 可以映射到一个 AllocationID(由于槽共享)。
    然后看看 sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile) 做了什么。
public SlotProfile getSlotProfile(  
        ExecutionSlotSharingGroup executionSlotSharingGroup,  
        ResourceProfile physicalSlotResourceProfile) {  
    Collection<AllocationID> priorAllocations = new HashSet<>();  
    Collection<TaskManagerLocation> preferredLocations = new ArrayList<>();  
    for (ExecutionVertexID execution : executionSlotSharingGroup.getExecutionVertexIds()) {  
        priorAllocationIdRetriever.apply(execution).ifPresent(priorAllocations::add);  
        preferredLocations.addAll(  
                preferredLocationsRetriever.getPreferredLocations(  
                        execution, producersToIgnore));  
    }  
    // 创建 SlotProfile
    return SlotProfile.priorAllocation(  
            physicalSlotResourceProfile,  
            physicalSlotResourceProfile,  
            preferredLocations,  // 指定 slot 位置的选择
            priorAllocations,  
            reservedAllocationIds);  
}

2.2 slot 优先位置

怎么确定 SlotProfile 中的 preferredLocations 参数的值?
位置的确定涉及两种接口:StateLocationRetriever 和 InputsLocationsRetriever。通过这两种获取优先部署位置。StateLocationRetriever 会获取每个执行节点的状态所在的位置。InputsLocationsRetriever 会获取当前节点的输入的所在位置。这两个逻辑在 SchedulerBase 构造函数中创建:

stateLocationRetriever =  // StateLocationRetriever 是只有一个方法的接口,所以这直接通过lambda函数创建实例
        executionVertexId ->  
                getExecutionVertex(executionVertexId).getPreferredLocationBasedOnState();  
inputsLocationsRetriever =  // 类为 ExecutionGraphToInputsLocationsRetrieverAdapter
        new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);

在 SlotSharingExecutionSlotAllocatorFactory.createInstance() 中制定了优先位置检索器。

SyncPreferredLocationsRetriever preferredLocationsRetriever =  
        new DefaultSyncPreferredLocationsRetriever(context, context);

查看怎么决定优先位置的代码:

MergingSharedSlotProfileRetriever.getSlotProfile()
->  preferredLocationsRetriever.getPreferredLocations(execution, producersToIgnore)
	->  asyncPreferredLocationsRetriever.getPreferredLocations(executionVertexId, producersToIgnore) // 这里虽然写着 async,但其实是同步的,也就是必须这个函数运行成功,才会执行下一步。也就是说这个位置必须是立即可用的,否则就不能用。
		->  getPreferredLocationsBasedOnInputs(executionVertexId, producersToIgnore)

getPreferredLocationsBasedOnInputs() 中的代码如下:

private CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs(  
        final ExecutionVertexID executionVertexId,  
        final Set<ExecutionVertexID> producersToIgnore) {  
  
    CompletableFuture<Collection<TaskManagerLocation>> preferredLocations =  
            CompletableFuture.completedFuture(Collections.emptyList());  
  
    final Collection<ConsumedPartitionGroup> consumedPartitionGroups =  
            inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId);  
    for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) {  
        // 为了避免太过分散,如果上游算子过多,则不获取它们的位置           
        if (consumedPartitionGroup.getConsumerVertexGroup().size()  
                > MAX_DISTINCT_CONSUMERS_TO_CONSIDER) {  
            continue;  
        }  

		// 获取上游节点的位置
        final Collection<CompletableFuture<TaskManagerLocation>> locationsFutures =  
                getInputLocationFutures(  
                        producersToIgnore,  
                        inputsLocationsRetriever.getProducersOfConsumedPartitionGroup(  
                                consumedPartitionGroup));  
  
        preferredLocations = combineLocations(preferredLocations, locationsFutures);  
    }  
    return preferredLocations;  
}

这里返回的 preferredLocations 最终会传递给 SlotProfile。

2.3 slot 请求

下一步需要创建 PhysicalSlotRequest:

PhysicalSlotRequest request =  
        new PhysicalSlotRequest(  
                physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);

PhysicalSlotRequest 包含以下内容:

private final SlotRequestId slotRequestId;  
private final SlotProfile slotProfile;  
private final boolean slotWillBeOccupiedIndefinitely;  // jobType == JobType.STREAMING

3 分配 physical slot

在 allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever) 通过下面的代码分配 slot。

Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocateResult =  
        slotProvider.allocatePhysicalSlots(slotRequests);

这里的 slotProvider 是 PhysicalSlotProvider 类。在 DefaultSchedulerComponents.createPipelinedRegionSchedulerComponents() 中创建了 PhysicalSlotProvider。可以看到它实际是个 PhysicalSlotProviderImpl。

final PhysicalSlotProvider physicalSlotProvider =  
        new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool);

在 slotProvider.allocatePhysicalSlots() 中尝试为每一个 slot 分配请求执行如下代码:

// 尝试从可用的 slots 中,为每个请求分配一个 physicalSlotMap<SlotRequestId, Optional<PhysicalSlot>> availablePhysicalSlots =  
        tryAllocateFromAvailable(physicalSlotRequestsById.values());

在 tryAllocateFromAvailable() 尝试为每一个 slot 请求分配一个 physical slot:

for (PhysicalSlotRequest request : slotRequests) {  
    // 使用 SlotSelectionStrategy 获取 slot    
    Optional<SlotSelectionStrategy.SlotInfoAndLocality> slot =  
            slotSelectionStrategy.selectBestSlotForProfile(  
                    freeSlotInfoTracker, request.getSlotProfile());

这里是根据 slotSelectionStrategy 选择 slot 的。slotSelectionStrategy 的值在 DefaultSchedulerComponents.createPipelinedRegionSchedulerComponents() 中指定:

final SlotSelectionStrategy slotSelectionStrategy =  
        SlotSelectionStrategyUtils.selectSlotSelectionStrategy(  
                jobType, jobMasterConfiguration);

这里是根据配置文件选择到底使用哪个策略。

cluster.evenly-spread-out-slot(EVENLY_SPREAD_OUT_SLOTS_STRATEGY) 为 True:// 默认为 false
	slotSelectionStrategy = EvenlySpreadOutLocationPreferenceSlotSelectionStrategy  // 均匀分布
否则:
	slotSelectionStrategy = DefaultLocationPreferenceSlotSelectionStrategy
  1. DefaultLocationPreferenceSlotSelectionStrategy
    选择 slot 的代码如下:
    protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(  
    		@Nonnull FreeSlotInfoTracker freeSlotInfoTracker,  
    		@Nonnull ResourceProfile resourceProfile) {  
    	for (AllocationID allocationId : freeSlotInfoTracker.getAvailableSlots()) {  
    		SlotInfo candidate = freeSlotInfoTracker.getSlotInfo(allocationId);  
    		if (candidate.getResourceProfile().isMatching(resourceProfile)) {  
    			return Optional.of(SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED));  
    		}  
    	}  
    	return Optional.empty();  
    }
    
    从所有可用的 slot 里顺序选择一个,只有满足资源需求,就直接分配。这样做,容易造成分配的 slot 集中在某几个 TaskManager 上。好处是可以减少不同 TaskManager 之间的通信代价,坏处是不能平衡各个 TaskManager 之间的资源利用率。
  2. EvenlySpreadOutLocationPreferenceSlotSelectionStrategy
    选择 slot 的代码如下:
    protected Optional<SlotInfoAndLocality> selectWithoutLocationPreference(  
            @Nonnull FreeSlotInfoTracker freeSlotInfoTracker,  
            @Nonnull ResourceProfile resourceProfile) {  
        return freeSlotInfoTracker.getAvailableSlots().stream()  
                .map(freeSlotInfoTracker::getSlotInfo)  
                // 过滤掉不满足资源要求的 slot
                .filter(slotInfo -> slotInfo.getResourceProfile().isMatching(resourceProfile))  
                // 获取每个 slot 的资源利用率
                .map(  
                        slot ->  
                                new Tuple2<>(  
                                        slot, freeSlotInfoTracker.getTaskExecutorUtilization(slot)))  
                // 找到资源利用率最小的 slot
                .min(Comparator.comparingDouble(tuple -> tuple.f1))  
                .map(  
                        slotInfoWithTaskExecutorUtilization ->  
                                SlotInfoAndLocality.of(  
                                        slotInfoWithTaskExecutorUtilization.f0,  
                                        Locality.UNCONSTRAINED));  
    }
    
    从所有满足资源要求的 slot,找到资源利用率最小的 slot,并分配该 slot。这样 slot 分配在各个 TaskManager 之间近似平均。好处是能平衡各个 TaskManager 之间的资源利用率,坏处是不同 TaskManager 之间的通信代价可能较大。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/635147.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

清空了电脑回收站,之前的文件还能否恢复?

电脑已成为我们日常生活中不可或缺的一部分。我们在电脑上处理文档、保存图片、下载视频等&#xff0c;而电脑中的回收站则成为我们处理不再需要文件的一个便捷工具&#xff0c;当我们想要删除某些文档的话&#xff0c;它并不是立即从硬盘上消失&#xff0c;而是被系统移动到了…

智能网关和交换机在智慧路灯杆上的用途差别

智慧路灯杆是智能城市建设中的一个重要组成部分&#xff0c;它整合了智能照明、视频监控、交通管理、环境监测、网络覆盖、信息发布、一键告警等多种功能。针对智慧路灯杆的使用场景&#xff0c;智能网关和交换机各自发挥着不同的作用&#xff0c;并且拥有各自的优缺点&#xf…

详解HTML

目录 1.HTML 结构 1.1认识HTML标签 1.2标签层次结构 1.3快速生成代码框架 2.HTML常见标签 2.1注释标签 2.2标题标签&#xff1a;h1-h6 2.3段落标签&#xff1a;p 2.4换行标签&#xff1a;br 2.5格式化标签 2.6图片标签&#xff1a;img 2.7超链接标签 2.8表格标签…

英语学习笔记22——Give me/him/her/us/them a .... Which one?

Give me/him/her/us/them a … Which one? 给我/他/她/我们/他们一个…… 哪一个&#xff1f; 词汇 Vocabulary empty a. 空的&#xff0c;啥也没有的    v. 倒空 例句&#xff1a;这个盒子是空的。    This box is empty.    这是个空盒子。    This is an emp…

VAE-变分自编码器(Variational Autoencoder,VAE)

变分自编码器&#xff08;Variational Autoencoder&#xff0c;VAE&#xff09;是一种生成模型&#xff0c;结合了概率图模型与神经网络技术&#xff0c;广泛应用于数据生成、表示学习和数据压缩等领域。以下是对VAE的详细解释和理解&#xff1a; 基本概念 1. 自编码器&#…

璞公英教学平台同时进驻两大云教育平台,让智慧教育“触手可及”!

近日&#xff0c;璞公英教学平台云上服务版图进一步扩大&#xff0c;在中国电信天翼云甄选商城、宁夏教育资源公共服务平台成功上线&#xff0c;为更多学校更多师生提供精细化服务。借助云平台的强大力量&#xff0c;璞公英教学平台将为用户带来前所未有、超越想象的教学体验。…

Java面试八股之进程和线程的区别

Java进程和线程的区别 定义与作用&#xff1a; 进程&#xff1a;在操作系统中&#xff0c;进程是程序执行的一个实例&#xff0c;是资源分配的最小单位。每个进程都拥有独立的内存空间&#xff0c;包括代码段、数据段、堆空间和栈空间&#xff0c;以及操作系统分配的其他资源…

【HarmonyOS4学习笔记】《HarmonyOS4+NEXT星河版入门到企业级实战教程》课程学习笔记(十一)

课程地址&#xff1a; 黑马程序员HarmonyOS4NEXT星河版入门到企业级实战教程&#xff0c;一套精通鸿蒙应用开发 &#xff08;本篇笔记对应课程第 18 节&#xff09; P18《17.ArkUI-状态管理Observed 和 ObjectLink》 第一件事&#xff1a;嵌套对象的类型上加上 Observed 装饰器…

推荐一个娱乐网站poki

今天&#xff0c;我要向您介绍一个充满乐趣的娱乐网站——Poki。这是一个集合了众多在线小游戏的平台&#xff0c;适合所有年龄段的玩家。无论您是想在工作间隙放松一下&#xff0c;还是寻找适合家庭聚会时的娱乐活动&#xff0c;Poki都能满足您的需求。所有游戏都无需下载或安…

leetcode_2024年5月19日10:51:26

238.除自身以外各元素的乘积 给你一个整数数组nums&#xff0c;返回数组answer&#xff0c;其中answer[i]等于nums中除nums[i]之外其余各元素的乘积。 题目数据保证数组nums之中任意元素的全部前缀元素和后缀的乘积都在32位整数范围内。 请不要使用除法&#xff0c;且在o&am…

使用神经实现路径表示的文本到向量生成

摘要 矢量图形在数字艺术中得到广泛应用&#xff0c;并受到设计师的青睐&#xff0c;因为它们具有可缩放性和分层特性。然而&#xff0c;创建和编辑矢量图形需要创造力和设计专业知识&#xff0c;使其成为一项耗时的任务。最近在文本到矢量&#xff08;T2V&#xff09;生成方面…

大语言模型的工程技巧(二)——混合精度训练

相关说明 这篇文章的大部分内容参考自我的新书《解构大语言模型&#xff1a;从线性回归到通用人工智能》&#xff0c;欢迎有兴趣的读者多多支持。 混合精度训练的示例请参考如下链接&#xff1a;regression2chatgpt/ch11_llm/gpt2_lora_optimum.ipynb 本文将讨论如何利用混合…

vue.js状态管理和服务端渲染

状态管理 vuejs状态管理的几种方式 组件内管理状态&#xff1a;通过data&#xff0c;computed等属性管理组件内部状态 父子组件通信&#xff1a;通过props和自定义事件实现父子组件状态的通信和传递 事件总线eventBus&#xff1a;通过new Vue()实例&#xff0c;实现跨组件通…

个人博客网站开发笔记2

文章目录 前言p2 hexo安装与使用安装 Nodejs安装 GitGit Bash的使用&#xff0c;代码克隆Clone p3 写作一级标题二级标题三级标题四级标题五级标题六级标题 前言 现在继续看教程 p2 hexo安装与使用 link 啊有点难受&#xff0c;开幕就是需要自己先安装Nodejs和Git&#xff…

git使用介绍

一、为什么做版本控制&#xff08;git是版本控制工具&#xff09; 为了保留之前所以的版本&#xff0c;以便回滚和修改 二、点击安装 三、基础操作 1、初步认识 想要让git对一个目录进行版本控制需要以下步骤&#xff1a; 进入要管理的文件夹进行初始化命令 git init管理…

el-table 组件实现 “合并单元格 + N行数据小计” 功能

目录 需求 - 要实现的效果初始代码代码升级&#xff08;可供多个表格使用&#xff09;CommonTable.vue 子组件 使用子组件1 - 父组件 - 图1~图3使用效果展示 使用子组件2 - 父组件 - 图4使用效果展示 注意【代码优化 - 解决bug】 需求 - 要实现的效果 父组件中 info 数据示例 …

Redis篇 浅谈分布式系统

分布式系统 一. 单机架构二.分布式系统引入三.引入更多的应用服务器四.读写分离五.引入缓存服务器六. 将数据库服务器拆分七.微服务架构 一. 单机架构 单机架构,就是用一台服务器,完成所有的工作. 这时候就需要我们引入分布式系统了. 分布式系统是什么含义呢?就是由一台主机服…

MySQL实战——主从异步复制搭建(一主一从)

一、搭建前的准备 主库 192.168.1.76 从库 192.168.1.77 二、搭建 1、编辑配置文件 vi /etc/my.cnf 主库 [mysqld] log-binmysql-bin server-id1 从库 [mysqld] server-id2 2、在主库创建复制用户 create user repl192.168.1.77 identified by repl123; grant replic…

9、QT—SQLite使用小记

前言 开发平台&#xff1a;Win10 64位 开发环境&#xff1a;Qt Creator 13.0.0 构建环境&#xff1a;Qt 5.15.2 MSVC2019 64位 sqlite版本&#xff1a;sqlite3 文章目录 一、Sqlite是什么二、sqlite使用步骤2.1 下载2.2 安装2.3 使用 三、Qt集成sqlite33.1 关键问题3.2 封装sql…

C#, PCANBasicd.dll库读写CAN设备数据

PCAN-Basic是一个简单的 PCAN 系统编程接口。 通过 PCAN-Basic Dll,可以将自己的应用程序连接到设备驱动程序和 PCAN 硬件,以与 CAN 总线进行通信。支持C、C++、C#、Delphi、JAVA、VB、Python等语言。 PCAN-Basic库和驱动下载地址 ​ ​https://www.peak-system.com/filead…