Megatron-LM源码系列(七):Distributed-Optimizer分布式优化器实现Part2

1. 使用入口

  • DistributedOptimizer类定义在megatron/optimizer/distrib_optimizer.py文件中。创建的入口是在megatron/optimizer/__init__.py文件中的get_megatron_optimizer函数中。根据传入的args.use_distributed_optimizer参数来判断是用DistributedOptimizer还是Float16OptimizerWithFloat16Params
def get_megatron_optimizer(model,
                           no_weight_decay_cond=None,
                           scale_lr_cond=None,
                           lr_mult=1.0):
        ...
        # Megatron optimizer.
        opt_ty = DistributedOptimizer \
            if args.use_distributed_optimizer else \
            Float16OptimizerWithFloat16Params
        return opt_ty(optimizer,
                      args.clip_grad,
                      args.log_num_zeros_in_grad,
                      params_have_main_grad,
                      args.use_contiguous_buffers_in_local_ddp,
                      args.fp16,
                      args.bf16,
                      args.params_dtype,
                      grad_scaler,
                      model)
  • 相关的Optimizer的使用参考【Megatron-LM源码系列(六):Distributed-Optimizer分布式优化器实现Part1】

2. 初始化init源码说明

在这里插入图片描述

  • 初始化的过程很大程度对应的上图grad buffer分片的实现,对应init函数如下:
    def __init__(self, optimizer, clip_grad, log_num_zeros_in_grad,
                 params_have_main_grad, use_contiguous_buffers_in_local_ddp,
                 fp16, bf16, params_dtype, grad_scaler, models):
  • init时会通过build_model_gbuf_range_map函数先创建grad buffer的范围映射,也就是对应图中的world_index/local_index/param_index三个。这里的self.models是一个list类型,对于使用了interleave流水线方式的训练来说,这里的self.models中会保存多份model, 其余情况list中只有一个元素。
        # Model grad buffer ranges.
        self.model_gbuf_ranges = []
        for model_index, model in enumerate(self.models):
            self.model_gbuf_ranges.append(self.build_model_gbuf_range_map(model))
  • build_model_gbuf_range_map会依次按grad buffer中类型来进行range的初始化build_model_gbuf_range。这里定义了一个单独的Range类。

    @classmethod
    def build_model_gbuf_range_map(cls, model):
        """
        Create param-to-grad-buffer mappings, for grad buffer data types
        within a specific virtual model.
        """
        return {
            dtype : cls.build_model_gbuf_range(model, dtype)
            for dtype in model._grad_buffers
        }
        
class Range:
    """
    A range represents a start and end points for indexing a shard
    from a full tensor.
    """
    def __init__(self, start, end):
        self.start = start
        self.end = end
        self.size = end - start
    def normalize(self, start = 0):
        return Range(start, start + self.size)
    def __str__(self):
        return "%d,%d [%d]" % (self.start, self.end, self.size)
    def __len__(self):
        return self.end - self.start
  • build_model_gbuf_range初始化range的流程如下:
    • 获取DP的rank,计算单个Grad buffer切片的大小
    • 保存当前rank的world range和local range, 分别对应world index和local index
    • 计算param的range范围,对应param index
    • 返回当前rank的相关range范围
    @classmethod
    def build_model_gbuf_range(cls, model, dtype):
        # 获取DP的rank
        data_parallel_rank = mpu.get_data_parallel_rank()
        data_parallel_world_size = mpu.get_data_parallel_world_size()

        # 计算单个Grad buffer切片的大小
        grad_buffer = model._grad_buffers[dtype]
        gbuf_size = grad_buffer.numel
        max_gbuf_range_size = int(math.ceil(gbuf_size / data_parallel_world_size))

        # 跟据DDP的rank总数,分别计算每个rank对应的全局range
        gbuf_world_all_ranges = []
        for r in range(data_parallel_world_size):
            gbuf_world_start = r * max_gbuf_range_size
            gbuf_world_end = min(gbuf_size, gbuf_world_start+max_gbuf_range_size)
            gbuf_world_range = Range(gbuf_world_start, gbuf_world_end)
            gbuf_world_all_ranges.append(gbuf_world_range)
            
        # 保存当前rank的world range和local range
        # Local DP's ranges.
        gbuf_world_range = gbuf_world_all_ranges[data_parallel_rank]
        gbuf_local_range = gbuf_world_range.normalize()
        
        # 计算param的range范围
        param_range_map = cls.build_model_gbuf_param_range_map(model,
                                                               dtype,
                                                               gbuf_world_range)
        
        # Group into dict.
        data = {
            "local" : gbuf_local_range,
            "world" : gbuf_world_range,
            "world_all" : gbuf_world_all_ranges,
            "param_map" : param_range_map,
            "max_range_size" : max_gbuf_range_size,
        }

        return data
  • 接着会根据当前rank相关的Range内容self.model_gbuf_ranges调用build_model_param_gbuf_map函数,主要作用是创建model_gbuf_ranges的逆映射,保存param->(modex_index, type)的映射。
class DistributedOptimizer(MixedPrecisionOptimizer):
    def __init__(...):
        ...
        self.model_param_gbuf_map = \
            self.build_model_param_gbuf_map(self.model_gbuf_ranges)
        ...
            
    def build_model_param_gbuf_map(cls, model_gbuf_ranges):
        """
        Create a reverse of the model_gbuf_ranges, for referencing in
        opposite direction.
        """
        param_gbuf_map = {}
        for model_index, model_gbuf_range_map in enumerate(model_gbuf_ranges):
            for dtype, gbuf_range_map in model_gbuf_range_map.items():
                for param, param_range_map in gbuf_range_map["param_map"].items():
                    param_gbuf_map[param] = (model_index, dtype)
        return param_gbuf_map
  • self.build_model_param_gbuf_map之后是初始化Optimizer对应的local group range,Optimizer原本有param_groups包括多个参数组,这里build_optimizer_group_ranges为了创建param参数到group_index的map映射,也就是<model_parameter:group_index>;self.build_model_param_gbuf_map最后对每个group_range中增加新的orig_grouporig_group_idx两个key,原来group_range初始化的时候只有params一个key
class DistributedOptimizer(MixedPrecisionOptimizer):
    def __init__(...):
        ...
        # Optimizer ranges.
        self.model_param_group_index_map, self.opt_group_ranges = \
            self.build_optimizer_group_ranges(self.optimizer.param_groups,
                                              self.model_gbuf_ranges)
        ...

    def build_optimizer_group_ranges(cls, param_groups, model_gbuf_ranges):
        # 获取param_groups中组的个数
        num_groups = len(param_groups)
        
        # 创建全局的参数到group_index的map映射,也就是<model_parameter:group_index>
        world_param_group_map = {}
        for group_index, group in enumerate(param_groups):
            for param in group["params"]:
                assert param.requires_grad
                world_param_group_map[param] = group_index

        # 创建当前rank的local_param_group_map, local_param_group_map是param与(group_index, group_params_len)的映射, local_param_group_map虽然返回了但后面没用
        local_param_group_map = {}
        group_ranges = [ {"params": []} for _ in param_groups ]
        for model_gbuf_range_map in model_gbuf_ranges:
            for dtype, gbuf_range_map in model_gbuf_range_map.items():
                for param in gbuf_range_map["param_map"]:
                    group_index = world_param_group_map[param]
                    group_range = group_ranges[group_index]
                    group_range["params"].append(param)
                    local_param_group_map[param] = \
                        (group_index, len(group_range["params"]) - 1)
        # Squeeze zero-size group ranges.
        for group_index, group_range in enumerate(group_ranges):
            group_range["orig_group"] = param_groups[group_index]
            group_range["orig_group_idx"] = param_groups[group_index]

        return local_param_group_map, group_ranges
  • 在初始化Optimizer之后,是通过创建self.build_model_and_main_param_groups创建optimizer step要用到的main parameter groups, 这里的group一方面是要进行reduce和gather通信操作,另一方面是被优化器用于梯度的更新操作。
class DistributedOptimizer(MixedPrecisionOptimizer):
    def __init__(...):
        ...
        # Allocate main param shards.
        (
            self.model_float16_groups,
            self.model_fp32_groups,
            self.shard_float16_groups,
            self.shard_fp32_groups,
            self.shard_fp32_from_float16_groups,
        ) = self.build_model_and_main_param_groups(self.model_gbuf_ranges,
                                                   self.model_param_gbuf_map,
                                                   self.opt_group_ranges)
        ...

  • self.build_model_and_main_param_groups的实现主要是关于fp32/fp16/bf16三种类型训练时优化器内的显存分配。
    @classmethod
    def build_model_and_main_param_groups(cls,
                                          model_gbuf_ranges,
                                          param_gbuf_map,
                                          opt_group_ranges):
        ...
        # 保存原本fp16类型param
        model_float16_groups = []
        # 保存原本fp32类型param
        model_fp32_groups = []
        # 保存原本fp16类型param的切片
        shard_float16_groups = []
        # 保存原本fp32类型param的切片
        shard_fp32_groups = []
        # 保存原本fp16类型param的fp32类型param的副本
        shard_fp32_from_float16_groups = []
        
        # 分配每个group的param参数切片
        for group_index, group_range in enumerate(opt_group_ranges):
            for model_param in group_range["params"]:
                if model_param.type() in ['torch.cuda.HalfTensor',
                                          'torch.cuda.BFloat16Tensor']:
                    # 如果是fp16/bf16类型参数,clone为fp32类型的切片.
                    shard_model_param = model_param.detach().view(-1) \
                        [param_range.start:param_range.end]
                    shard_main_param = shard_model_param.clone().float()
                    ...
                    # 添加到group中
                    model_float16_params_this_group.append(model_param)
                    shard_float16_params_this_group.append(shard_model_param)
                    shard_fp32_from_float16_params_this_group.append(shard_main_param)
                elif model_param.type() == 'torch.cuda.FloatTensor':
                    # 如果是fp32类型参数,不进行clone,直接引用
                    shard_model_param = model_param.view(-1) \
                        [param_range.start:param_range.end]
                    model_fp32_params_this_group.append(model_param)
                    shard_fp32_params_this_group.append(shard_model_param)
                    ...
            # 更新优化器的参数
            group_range["orig_group"]["params"] = [
                *shard_fp32_params_this_group,
                *shard_fp32_from_float16_params_this_group,
            ]
        return (
            model_float16_groups,
            model_fp32_groups,
            shard_float16_groups,
            shard_fp32_groups,
            shard_fp32_from_float16_groups,
        )
  • 在Optimizer init中,接下来是初始化self.param_buffers,这里的self.param_buffers是DDP模型的grad buffer的view示图,跟grad buffer共享存储,但是用自己的数据类型;最后更新优化器的param_groups。
class DistributedOptimizer(MixedPrecisionOptimizer):
    def __init__(...):
        ...
        # 初始化self.param_buffers
        self.param_buffers = []
        for model_index, model in enumerate(self.models):
            current_param_buffers = {}
            for dtype, grad_buffer in model._grad_buffers.items():
                # 获取存储,这里是兼容的写法.
                try:
                    storage = grad_buffer.data.storage()._untyped()
                except:
                    storage = grad_buffer.data.storage().untyped()
                # 基于grad_buffer的storage创建param_buffer类型,这里的params_dtype是参数类型; 这里的torch.tensor没有autograd的历史。
                param_buffer = torch.tensor(
                    storage,
                    dtype = params_dtype,
                    device = grad_buffer.data.device)
                param_buffer = param_buffer[:grad_buffer.numel_padded]
                # 这里的dtype是grad_buffer的类型
                current_param_buffers[dtype] = param_buffer
            self.param_buffers.append(current_param_buffers)
        
        # 最后更新优化器的param_groups
        self.optimizer.param_groups = \
            [ g["orig_group"] for g in self.opt_group_ranges ]
        self.optimizer.load_state_dict(self.optimizer.state_dict())

3. 参考

  • Megatron-LM源码系列(六):Distributed-Optimizer分布式优化器实现Part1
  • NVIDIA/Megatron-LM

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/367417.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++初阶】--入门基础(二)

目录 一.C输出与输入 二.缺省参数 1.概念 2.缺省参数分类 (1) 全缺省参数 (2)半缺省参数 三.函数重载 1.概念 2.C支持函数重载的原理--名字修饰 四.引用 1.概念 2.语法 3.引用的特性 (1)引用在定义时必须初始化 (2)引用时不能改变指向 (3)一个变量…

区间时间检索

前端 <el-col :md"6" v-if"advanced"><el-form-item :label"$t(inRecord.column.createTime)"><el-date-pickerstyle"width: 100%;"v-model"daterangeCreateTime"value-format"yyyy-MM-dd"type&qu…

装饰你的APP:使用Lottie-Android创建动画效果

装饰你的APP&#xff1a;使用Lottie-Android创建动画效果 1. Lottie-Android简介 Lottie-Android是一个强大的开源库&#xff0c;由Airbnb开发&#xff0c;旨在帮助开发者轻松地在Android应用中添加高质量的动画效果。它基于Adobe After Effects软件中的Bodymovin插件&#x…

【项目简记】逆向工程裸机内核镜像

本教程将是裸机逆向工程系列的一部分。 自从拆解了几部安卓手机后&#xff0c;我对嵌入式系统的兴趣越来越大。 虽然手机本身并不是嵌入式系统&#xff0c;但我知道手机最终会取代计算机&#xff1b;因此&#xff0c;我想学习更多关于它们的知识。 就在那时&#xff0c;我开始…

Linux 系统开始配置

文章目录 备份源为root 设置密码安装基本工具切换root 用户删除snap从 Ubuntu 移除 Snap 后使用 deb 文件安装软件商店和 Firefox在 Ubuntu 系统恢复到 Snap 软件包总结 删除 vim安装neovim在线安装neovim压缩安装neovim安装lazyvim安装剪切板 安装qt配置 Qt 环境不在sudoers文…

SAP 消息号 FAGL_CLOSING_ACT011

在S4当中&#xff0c;月末外币评估的时候&#xff0c;会出现如下报错&#xff1a; 解决方法是&#xff1a; “创建错误更正和暂记会计核算运行标识的编号范围、 在ECS中创建凭证编号范围” 给以上2个事务&#xff0c;添加号码范围即可。

关于破解IDEA后启动闪退的问题

问题描述&#xff1a;2023.1启动不了&#xff0c;双击桌面图标&#xff0c;没有响应。 解决办法&#xff1a; 打开C:\Users\c\AppData\Roaming\JetBrains\IntelliJIdea2023.1\idea64.exe.vmoptions 这个文件。 内容如下所示&#xff1a; 删除红框的数据以后&#xff0c;再登录…

使用 IDEA 开发一个简单易用的 SDK

目录 一、什么是 SDK 二、为什么要开发 SDK 三、开发 SDK 的详细步骤 四、导入 SDK 进行测试 附&#xff1a;ConfigurationProperties 注解的介绍及使用 一、什么是 SDK 1. 定义&#xff1a;软件开发工具包 Software Development Kit 2. 用于开发特定软件或应用程序的工…

LLM之Agent(十一)| 多智能体框架CrewAI与AutoGen相比

基于LLM构建的Agent中有一个明显的现象就是多智能体体系结构的表现要超越单智能体&#xff0c;即使单智能体使用无可挑剔的提示策略。本文将探索另一个有趣的多智能体框架——CrewAI。 一、CrewAI整体优势 CrewAI可以应用在生成环境中。它在发言人的反应和编排上牺牲了一点灵活…

【BIAI】Lecture 9-Motor system 1

Motor System 专业词汇 skeletal muscle 骨骼肌 smooth muscle 平滑肌 cardiac muscle 心肌 flexor reflex 屈曲反射 central pattern generators 中央模式生成器 bio-inspired bipedal robots 仿生双足机器人 myotatic stretch reflex 肌肉自伸展反射 Cortex optic nerve 视皮…

架构整洁之道-软件架构-概述、独立性、划分边界与边界剖析

6 软件架构 6.1 什么是软件架构 “架构”这个词给人的直观感受就是充满了权力和神秘感&#xff0c;因此谈论架构总让人有一种正在进行责任重大的决策或者深度技术分析的感觉。而软件架构师的工作内容究竟是什么呢&#xff1f; 软件架构师自身需要是程序员&#xff0c;并且必须…

C语言指针学习 之 指针变量

前言&#xff1a; 通过学习我们认识了什么是指针&#xff0c;就让我们一起来分析一个例子。 #include<stdio.h> int main() {int a100;int * hz; hz &a;printf("a%d \n",a);printf("*hz%d \n",*hz);return 0; }a100 *hz100 PS C:\csay\cyuya…

spring boot yaml文件中如何设置duration对象值

Spring Boot对表示持续时间有专门的支持。如果您公开java.time.Duration属性&#xff0c;则应用程序对应Duration类型的属性有以下格式可用: long类型的常规表示(使用毫秒作为默认单位&#xff0c;除非指定了DurationUnit)java.time.Duration 使用的标准ISO-8601格式其中值和单…

【巧用异或】单身狗2题解

✨✨欢迎大家来到Celia的博客✨✨ &#x1f389;&#x1f389;创作不易&#xff0c;请点赞关注&#xff0c;多多支持哦&#x1f389;&#x1f389; 所属专栏&#xff1a;【每日刷题】C语言 个人主页&#xff1a;Celias blog~ 题目 一个数组中只有两个数字是出现一次&#xff0c…

应对手机数据丢失的5大安卓数据恢复软件

我们都去过那里。您的手机上的数据丢失了&#xff0c;现在无法恢复。这尤其令人恐惧&#xff0c;因为我们的手机上都有如此多的信息。从图片、应用程序、个人信息&#xff0c;甚至是来自可能已不复存在的亲人的短信和语音邮件。这种情况确实发生了&#xff0c;而且也不仅仅是An…

【Java程序设计】【C00239】基于Springboot的漫画之家管理系统(有论文)

基于Springboot的漫画之家管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的漫画之家系统 本系统分为系统功能模块、管理员功能模块以及用户功能模块。 系统功能模块&#xff1a;在系统首页可以查看首页&a…

【Tomcat与网络3】Tomcat的整体架构

目录 1.演进1&#xff1a;将连接和处理服务分开 2演进2&#xff1a;Container的演进 3 再论Tomcat的容器结构 4 Tomcat处理请求的过程 5 请求的处理过程与Pipeline-Valve管道 在前面我们介绍了Servlet的基本原理&#xff0c;本文我们结合Tomcat来分析一下如何设计一个大型…

Flutter开发2:安装Flutter

在本篇博客中&#xff0c;我们将详细介绍如何安装Flutter开发环境。安装Flutter是开始使用Flutter进行跨平台移动应用开发的第一步。让我们开始吧&#xff01; 官方安装文档 步骤1&#xff1a;下载Flutter SDK 打开浏览器&#xff0c;访问Flutter官方网站&#xff1a;https://…

latex multirow学习

今天搞了一晚上的这个multirow&#xff0c;总算弄出来了几个比较好的例子&#xff0c;主要是这个multirow的语法我没看懂&#xff0c;这个逻辑我是没理解&#xff0c;就很尴尬&#xff0c;一改就报错&#xff0c;只能先弄几个例子&#xff0c;自己慢慢试 \documentclass{artic…

Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询

1概况 本文展示如何使用 Flink CDC Iceberg Doris 构建实时湖仓一体的联邦查询分析&#xff0c;Doris 1.1版本提供了Iceberg的支持&#xff0c;本文主要展示Doris和Iceberg怎么使用&#xff0c;大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。 2系统架构 我们整…