flink内存管理(三):MemorySegment内存使用场景:托管内存与网络内存

文章目录

  • 一.ManagedMemory(算子)内存的申请与使用
    • 1. tm内存申请与使用大致流程
    • 2. 创建MemoryManager实例
    • 3. 算子使用通过MemoryManager使用内存
    • 4. ManagedMemory内存空间申请流程
  • 二.NetworkBuffer内存申请与使用
    • 1. NetworkBuffer构造器

在Flink内存模型中我们已经知道,Flink会将内存按照使用方式、内存类型分为不同的内存区域,底层会借助MemorySegment对内存块进行管理和访问,MemorySegment的使用场景有很多,本文我们主要看下ManagedMemory和NetworkBuffer是如何申请和使用MemorySegment内存块的。

一.ManagedMemory(算子)内存的申请与使用

1. tm内存申请与使用大致流程

Task使用的物理计算资源主要是TaskSlot提供的,TaskSlot由TaskManager中TaskSlotTable组件创建和管理。

  • 创建MemoryManager:JobManager申请到足够的Slot计算资源后,会在TaskSlotTable中创建相应的TaskSlot,然后对TaskSlot基本环境进行初始化,包括在TaskSlot内部创建MemoryManager组件。最终使用MemoryManager管理当前TaskSlot的内存计算资源。
  • task线程使用内存:当Task线程启动时,会直接从TaskSlot中获取MemoryManager组件申请内存空间。通过MemoryManager对MemorySegment内存空间进行管理,这一步对应内存模型中的ManagedMemory,也被称为托管内存。

 

2. 创建MemoryManager实例

在TaskSlot的构造器中调用createMemoryManager()方法创建MemoryManager实例,管理当前TaskSlot(代表一个线程的资源) 中的内存空间

/**
创建具有**给定**容量和给定页面大小的内存管理器。
这是 MemoryManager 的生产版本,一旦 MemoryManager 
的所有者准备好处置,它就会检查内存泄漏 ( verifyEmpty() )。

参数:
memorySize – 该内存管理器管理的堆外内存的总大小。 
pageSize – 内存管理器分配的页面大小。
**/
private static MemoryManager createMemoryManager(  
        ResourceProfile resourceProfile, int pageSize) {  
    return MemoryManager.create(resourceProfile.getManagedMemory().getBytes(), pageSize);  
}

在TaskSlot.createMemoryManager()方法中,会根据ResourceProfile参数获取内存空间大小,默认设置为非堆ing。其中pageSize参数就是MemorySegment的大小,如下代码默认为32kb。

TaskManagerOptions.
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)  
public static final ConfigOption<MemorySize> MEMORY_SEGMENT_SIZE =  
        key("taskmanager.memory.segment-size")  
                .memoryType()  
                .defaultValue(MemorySize.parse("32kb"))  
                .withDescription(  
                        "Size of memory buffers used by the network stack and the memory manager.");
                    

 

3. 算子使用通过MemoryManager使用内存

MemoryManager创建完毕后,会通过TaskSlot将MemoryManager对象传递给Task,此时Task会通过将MemoryManager封装在Environment变量中,然后传递给算子
算子接收到MemoryManager对象后,通过MemoryManager动态申请内存空间,最终用于算子的具体计算过程。

需要注意的是:并不是所有的算子都会使用MemoryManager申请内存空间,这个步骤主要针对批计算类型的算子,例如HashJoinOperator、SortMergeJoinOperator和SortOperator等,这些算子往往需要借助非常大的内存空间进行数据的排序等操作。

 

4. ManagedMemory内存空间申请流程

申请ManagedMemory内存空间,是调用MemoryManager.allocatePages()方法执行的,见如下逻辑。

  • 1)从AllocationRequest参数中获取MemorySegment的空集合、申请Pages总数量以及资源Owner(与内存关联的所有者:slot?还是算子?)等参数,并对参数进行非空和状态检查;
  • 2)计算申请内存大小,并预留出内存空间;
  • 3)根据page数、pageCleanup、owner等,开始分配内存,将内存以MemorySegment为单位,并维护一个set集合,最终返回给算子使用。
/**
从此内存管理器分配一组内存段。
分配的总内存不会超过构造函数中声明的大小限制。
参数:
owner – 与内存段关联的所有者,用于后备释放。 
target – 将分配的内存页放入其中的列表。 numberOfPages – 要分配的页数。
**/
public void allocatePages(Object owner, Collection<MemorySegment> target, int numberOfPages)  
        throws MemoryAllocationException {  
    // sanity check  
    Preconditions.checkNotNull(owner, "The memory owner must not be null.");  
    Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");  
    Preconditions.checkArgument(  
            numberOfPages <= totalNumberOfPages,  
            "Cannot allocate more segments %s than the max number %s",  
            numberOfPages,  
            totalNumberOfPages);  
  
    // reserve array space, if applicable  
    if (target instanceof ArrayList) {  
        ((ArrayList<MemorySegment>) target).ensureCapacity(numberOfPages);  
    }  
    //计算申请内存大小,并预留空间(以免申请过程中被用掉)
    long memoryToReserve = numberOfPages * pageSize;  
    try {  
        memoryBudget.reserveMemory(memoryToReserve);  
    } catch (MemoryReservationException e) {  
        throw new MemoryAllocationException(  
                String.format("Could not allocate %d pages", numberOfPages), e);  
    }  
    //创建pageCleanup方法用于清理unsafe内存
    Runnable pageCleanup = this::releasePage;  
    allocatedSegments.compute(  
            owner,  
            (o, currentSegmentsForOwner) -> {  
                Set<MemorySegment> segmentsForOwner =  
                        currentSegmentsForOwner == null  
                                ? new HashSet<>(numberOfPages)  
                                : currentSegmentsForOwner;  
                for (long i = numberOfPages; i > 0; i--) {  
                 //分配内存
                    MemorySegment segment =  
                            allocateOffHeapUnsafeMemory(getPageSize(), owner, pageCleanup);  
                    target.add(segment);  
                    segmentsForOwner.add(segment);  
                }  
                return segmentsForOwner;  
            });  
  
    Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");  
}					

如下如下算子会申请内存使用:
在这里插入图片描述

 

二.NetworkBuffer内存申请与使用

在Flink内存模型中,另外一个非常重要的堆外内存使用区域就是Network内存。Network内存主要用于网络传输中Buffer数据的缓冲区。

1. NetworkBuffer构造器

在NetworkBufferPool的构造器中可以看出,创建NetworkBufferPool时会根据用户配置的NetworkBuffer数量,调用MemorySegmentFactory创建相应的MemorySegment内存空间,再通过LocalBufferPool应用到ResultSubPartition或InputChannel组件中。

public NetworkBufferPool(
            int numberOfSegmentsToAllocate, int segmentSize, Duration requestSegmentsTimeout) {
        this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
        this.memorySegmentSize = segmentSize;

        Preconditions.checkNotNull(requestSegmentsTimeout);
        checkArgument(
                requestSegmentsTimeout.toMillis() > 0,
                "The timeout for requesting exclusive buffers should be positive.");
        this.requestSegmentsTimeout = requestSegmentsTimeout;

        final long sizeInLong = (long) segmentSize;

        try {
            this.availableMemorySegments = new ArrayDeque<>(numberOfSegmentsToAllocate);
        } catch (OutOfMemoryError err) {
            throw new OutOfMemoryError(
                    "Could not allocate buffer queue of length "
                            + numberOfSegmentsToAllocate
                            + " - "
                            + err.getMessage());
        }

        try {
            //申请segment内存,并放到availableMemorySegments中。
            for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
                availableMemorySegments.add(
                        MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
            }
        } catch (OutOfMemoryError err) {
        //如果申请过程中失败,则释放已申请的内存,算出缺少多少内存
            int allocated = availableMemorySegments.size();

            // free some memory
            availableMemorySegments.clear();

            long requiredMb = (sizeInLong * numberOfSegmentsToAllocate) >> 20;
            long allocatedMb = (sizeInLong * allocated) >> 20;
            long missingMb = requiredMb - allocatedMb;

            throw new OutOfMemoryError(
                    "Could not allocate enough memory segments for NetworkBufferPool "
                            + "(required (MB): "
                            + requiredMb
                            + ", allocated (MB): "
                            + allocatedMb
                            + ", missing (MB): "
                            + missingMb
                            + "). Cause: "
                            + err.getMessage());
        }

        availabilityHelper.resetAvailable();
        //计算共申请了多少mb:20:为2的20次方
        long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;

        LOG.info(
                "Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
                allocatedMb,
                availableMemorySegments.size(),
                segmentSize);
    }

 
参考:《Flink设计与实现:核心原理与源码解析》- 张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/348194.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Windows11 Copilot助手开启教程(免费GPT-4)

Windows11上开启Copilot助手教程踩坑指南 Copilot介绍Copilot开启步骤1、更新系统2、更改语言和区域3、下载 ViVeTool 工具4、开启Copilot 使用 Copilot介绍 Windows Copilot 是 Windows 11 中的一个新功能&#xff0c;它可以让你与一个智能助理进行对话&#xff0c;获取信息&…

树莓派无显示屏连接

终端命令控制树莓派关机 1&#xff1a;用网线连接树莓派 按照正常的步骤 &#xff0c;搜索控制面板&#xff0c;网络和internet&#xff0c;网络和共享中心&#xff0c;更改适配器设置&#xff0c;右键WIFI&#xff0c;点击属性&#xff0c;点击共享&#xff0c;打勾允许即可&…

5G安卓手机定制_基于天玑900的安卓主板方案

5G安卓手机方案是一款采用联发科MT6877(天玑900)平台的高性能、可运行安卓操作系统的5G智能模块。该手机采用台积电6纳米低功耗工艺&#xff0c;主频高达2.4GHz&#xff0c;内存支持LPDDR5&#xff0c;并支持5G Sub-6GHz全频段和5G双载波聚合技术等多种制式。同时&#xff0c;该…

Typora1.7.6安装、激活、图床设置和使用

1.安装Typora 双击”typora-setup-x64-1.7.6.exe“安装包。 如果之前安装过先卸载&#xff0c;删除原文件夹。 Typora 1.7.6下载 提取码&#xff1a;ix2b 选择“Install for all users”。 图1-1 选择安装模式 选择安装目录&#xff0c;然后选择“Next”。 图1-2 选择安装路…

23111 C++ day2

思维导图 自己封装一个矩形类(Rect)&#xff0c;拥有私有属性:宽度(width)、高度(height),定义公有成员函数: 初始化函数:void init(int w, int h)更改宽度的函数:set_w(int w)更改高度的函数:set_h(int h) 输出该矩形的周长和面积函数:void show() #include <iostream&g…

web安全学习笔记【10】——数据包分析

基础[1] [2] [3] [4] 入门-HTTP数据包&Postman构造&请求方法&请求头修改&状态码判断[5] [6] [7] #知识点&#xff1a; 1、Web常规-系统&中间件&数据库&源码等 2、Web其他-前后端&软件&Docker&分配站等 3、Web拓展-CDN&WAF&OS…

go语言(十三)-----interface

一、Interface 通用万能类型 空接口int&#xff0c;string&#xff0c;float&#xff0c;struct都实现了interface都可以用interface{}类型,引用任意的数据类型 package mainimport "fmt"//interface()是万能数据类型 func myFunc(arg interface{}) {fmt.Println(&…

pycharm中无法使用anaconda虚拟环境

anaconda里创建了虚拟环境&#xff0c;然后在虚拟环境中明明安装了TensorFlow1.12&#xff0c;但是到pycharm中使用anaconda的虚拟环境时&#xff0c;就是没有TensorFlow1.12&#xff0c;注意下面这幅图 里面有一个选项“use conda package manager”&#xff0c;这个默认是勾…

聊聊 程序员裁员潮:技术变革下的职业危机

前几天一则新闻爆料&#xff1a;一对来自中国的工程师夫妻在美身亡&#xff0c;疑因谷歌裁员致悲剧发生。看到后深感可惜&#xff0c;鲜活的生命就因为裁员殒落了&#xff1b;同时我也深有感触&#xff0c;有幸经历过裁员&#xff0c;体会过那一段低迷不振的日子。 但是回首当下…

Aleo项目详细介绍-一个兼顾隐私和可编程性的隐私公链

Aleo上线在即&#xff0c;整理一篇项目的详细介绍&#xff0c;喜欢的收藏。有计划做aleo节点的可交流。 一、项目简介 Aleo 最初是在 2016 年构思的&#xff0c;旨在研究可编程零知识。公司由 Howard Wu、Michael Beller、Collin Chin 和 Raymond Chu 于 2019 年正式成立。 …

Docker 和 Kubernetes:容器化时代的崛起与演变

在过去的十年间&#xff0c;容器化技术彻底改变了软件开发和部署的面貌。 Docker 的登场无疑是这场变革的催化剂&#xff0c;它将应用和服务的打包、分发、部署流程标准化&#xff0c;让开发者的生活变得更加简单。 紧随其后&#xff0c;Kubernetes 作为容器编排的领军者&#…

文旅AI交互数字人,提升景区数字化导览服务体验

随着数字化的普及&#xff0c;文化旅游逐渐走向数字化&#xff0c;通过数字人技术手段对文化旅游资源进行整合与开发。 AI交互数字人可以部署于交互式终端设备和移动端&#xff0c;可以为游客提供“面对面”的语音交互&#xff0c;提供路径规划、游览路线推荐、景点讲解等服务&…

IP被封怎么办?访问网站时IP被阻止?解决IP禁令全方法

相信很多人遇到过IP禁令&#xff1a;比如你在访问社交媒体、搜索引擎或电子商务网站时会被限制访问&#xff0c;又或者你的的账号莫名被封&#xff0c;这些由于网络上的种种限制我们经常会遭遇IP被封的情况&#xff0c;导致无法使用继续进行网络行动。在本文中&#xff0c;我们…

【Leetcode】2859. 计算 K 置位下标对应元素的和

文章目录 题目思路代码结果 题目 题目链接 给你一个下标从 0 开始的整数数组 nums 和一个整数 k 。 请你用整数形式返回 nums 中的特定元素之和 &#xff0c;这些特定元素满足&#xff1a;其对应下标的二进制表示中恰存在 k 个置位。 整数的二进制表示中的 1 就是这个整数的…

数灵通让抖音跳转微信公众号更方便

当提到抖音跳转微信&#xff0c;人们通常指的是在抖音平台上利用一些方法将用户引导到企业的微信公众号。以下是关于如何实现这一目标的一些建议&#xff1a; 1.扫码跳转&#xff1a;在抖音视频或动态中嵌入一个二维码&#xff0c;用户扫描后可自动跳转至微信公众号页面。 2.个…

ERA ·Era Network:Web3.0社交的破局者

在当今数字化环境中&#xff0c;互联网的集中化严重制约了个人对数据的控制权&#xff0c;引发了对数据隐私、所有权和自主权的重大关切。这一问题尤其在社交网络、数据存储和内容传输等关键领域表现得尤为明显&#xff0c;用户常常感到无法充分掌握自己的数字身份和个人数据。…

IDEA 中怎么查看 Maven 依赖关系图

程序员的公众号&#xff1a;源1024&#xff0c;获取更多资料&#xff0c;无加密无套路&#xff01; 最近整理了一份大厂面试资料《史上最全大厂面试题》&#xff0c;Springboot、微服务、算法、数据结构、Zookeeper、Mybatis、Dubbo、linux、Kafka、Elasticsearch、数据库等等 …

【云原生】Docker如何构建镜像

目录 前言 一、基于已有的镜像创建 步骤一&#xff1a;先基于现有的镜像创建一个容器&#xff0c;然后进入容器去完成修改 步骤二&#xff1a;将该容器作为一个模板提交创建为一个新的镜像 步骤三&#xff1a;基于新的镜像&#xff0c;docker run创建一个容器&#xff0c;进…

进阶C语言-自定义类型

为了便于描述复杂的对象,C语言就支持了自定义类型&#xff0c;其中包括了结构体、枚举和联合体&#xff0c;下面将为大家一一介绍。 自定义类型 &#x1f388;1.结构体&#x1f50e;1.1结构的基础知识&#x1f50e;1.2结构的声明&#x1f50e;1.3特殊的声明&#x1f50e;1.4结构…

Maya---多切割

tips&#xff1a;鼠标左键不能松开 捕捉步长设置