【分布式】大模型分布式训练入门与实践 - 04

大模型分布式训练

  • 数据并行-Distributed Data Parallel
    • 1.1 背景
    • 1.2 PyTorch DDP
      • 1) DDP训练流程
      • 2)DistributedSampler
      • 3)DataLoader: Parallelizing data loading
      • 4)Data-parallel(DP)
      • 5)DDP原理解析
    • 2 模型并行Model Parallel
      • 1)tensor parallel
      • 2) Pipeline parallel
  • 参考

数据并行-Distributed Data Parallel

1.1 背景

数据并行(Distributed Data Parallel)是一种用于加快深度学习模型训练速度的技术。在过去,训练大型模型往往受限于单卡训练的瓶颈,尤其是当处理大规模数据集时。数据并行通过在多个处理单元上同时训练模型,并通过增加BatchSize来提高并行度,有效地减少了训练时间。这种技术在加快深度学习模型训练速度的同时,还提高了模型处理大规模数据集的能力,为解决现实世界中的复杂问题提供了强有力的支持。

1.2 PyTorch DDP

1) DDP训练流程

这里我们通过一段代码过一下DDP的训练流

import torch
import torchvision
import argparse

# Step 1: import distributed
import torch.distributed as dist
parser = argparse.ArgumentParser()

# Step 2: torch.distributed.launchlocal_rank
parser.add_argument("--local_rank", default=-1)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank
torch.cuda.set_device(local_rank)

# Step 3: DDP backend
dist.init_process_group(backend='nccl') # ncclgloompi
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True)

# Step 4: DistributedSamplerDataLoaderbatch_sizebatch_sizebatch_sizebatch_size)
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)
device = torch.device("cuda", local_rank)
model = nn.Linear(batch_size*32*32, 10).to(device)

# Step 5: DDP wrapper model
model = DDP(model, device_ids=[local_rank], output_device=local_rank)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

for epoch in range(num_epochs):
 # Step 6: sampler epoch and shuffle
 train_loader.sampler.set_epoch(epoch)
 for step, (input, label) in enumerate(train_loader):
 	optimizer.zero_grad()
 	input, label = input.to(device), label.to(device)
	output = model(input) 
	loss = loss_fn(output, label)
	loss.backward()
	optimizer.step()

运行测试一下

## Bash
# 5torch.distributed.launchDDP
# main.pylocal_rank":local_rank"
python -m torch.distributed.launch --nproc_per_node 4 main.py

2)DistributedSampler

在这里插入图片描述
Worldsize: num Hosts x num GPUs per host (= 2x3 = 6)
每个进程都知道本端的进程序号(local rank)和节点序号(host number),可以根据这一点计算全局序号(global rank)
global rank = local rank + num GPUs per host x host number
在这里插入图片描述

在distributed.py的源码中:

# DistributedSampler
def __iter__(self):
	# deterministically shuffle based on epoch
	g = torch.Generator()
	g.manual_seed(self.epoch)
	indices = torch.randperm(len(self.dataset), generator=g).tolist()
	
	# add extra samples to make it evenly divisible
	indices += indices[: self.total_size - len(indices))]
	assert len(indices) == self.total_size
	
	# subsample
	indices = indices(self.rank:self.total_size:self.num_replicas]
	assert len(indeices) == self.num_samples
	print("len_indices {0}".format(len(indices)))
	return iter(indices)

由于每次epoch我们都会随机shuffle数据集,

  • 不同进程之间要怎么保持shuffle后数据集的一致性?
  • 每次epoch怎么保证取到的数据集顺序不一样呢?

原理很简单,就是给不同进程分配数据集的不重叠、不交叉部分。

DistributedSampler使用set_epoch方法当前epoch作为随机数种子,保证每个epoch的随机数种子不一样,从而使得不同epoch下有不同的shuffle结果。

3)DataLoader: Parallelizing data loading

在这里插入图片描述
Dataloader在每个batch的加载过程如下:

  1. 从磁盘加载数据到主机(host)。
  2. 将数据从可页内存(pageable memory)转移到主机上的固定内存(pinnedmemory)
  3. 将数据从固定内存传输到GPU。
  4. 在GPU上运行前向(forward)和反向(backward)传递过程。

当设置多个num_workers时,多个进程同时加载数据,通过预取(prefetch)减少了IO带来的影响。

4)Data-parallel(DP)

model = torch.nn.DataParallel(model)

在这里插入图片描述
在旧版pytorch1.0之前使用DP实现数据并行。整个过程如图,没有主GPU。

  1. 从硬盘加载数据到host端页锁定内存(Page-locked Memory)。用多个工作进程实现数据并行加载。
  2. 把minibatch数据从页锁定内存(Page-locked Memory)并行转移到每个GPU上。不需要广播数据。每个GPU都拥有一个独立的模型拷贝,所以也不需要广播模型。
  3. 每个GPU进行前向计算,输出结果(Output)
  4. 计算loss,计算反向梯度。并利用allreduce并行计算梯度
  5. 更新模型参数。因为每个GPU在一开始就有独立的模型拷贝,而且梯度已经全部完成更新,也不需要模型同步。

在DP模式中,总共只有一个进程,所以存在很多缺点:

  • 冗余的数据拷贝:master GPU拿到数据后需要分发给其他GPU
  • 在forward之前其他GPU需要做一次模型拷贝:这是由于模型参数是在master GPU上更新的,所以要每次forward将更新好的参数同步给其他GPU
  • 存在不必需要的output gather
  • 不均衡的GPU利用率:master GPU占用率很高,CPU要完成数据分发,gather output,loss计算,梯度聚合以及参数更新等。

与DP相比,DDP流程明显就少了很多:

  • 每个GPU基于DistributedSampler load自己的那份数据,而不像DP那样由master GPU load,然后分发给其他GPU
  • master GPU不需要在做额外的那些冗余工作,GPU利用率是均衡的。通过allreduce完成梯度更新后,获得所有GPU的梯度,然后由同步后的梯度更新参数,保证参数一致性。不像DP那样master GPU完成梯度计算和参数更新,然后再同步给其他GPU

5)DDP原理解析

在使用分布式数据并行(DDP)时,我们需要处理的是模型的参数以及梯度。
DDP的核心在于同步各进程间的参数变化量,而不是直接同步参数本身。因此,确保各进程的模型状态一致性变得至关重要。模型的状态主要由模型的权重参数以及梯度组成。
也就是要关注模型的状态一致性,同时需要一个简单的逻辑构成

Parameter和buffer

在PyTorch中,所有的模型都会继承module类。可以说,一个模型就是由一系列module组合而成的。要了解模型,就必须从module下手。
module的基本要素分为两种:1. 状态 2. 钩子(hooks)

当一个模型的网络结构被定义后,其状态就是由parameter和buffer的迭代组合表示的。当我们保存模型,调用model.state_dict()的时候,我们同时会得到模型的parameter和buffer

也就是说,在DDP中,如果我们要在不同进程中维持相同的状态,我们不光要传递parameter的梯度,也要传递buffer。事实上,DDP就是这么做的。当每次网络传播开始前,其都会把master节点上的buffer广播给其他节点,维持状态的统一。

同时再利用钩子(hooks),提供插入接口。
主程序提供的这样的钩子定义,用户实现钩子函数,而这个函数会在实际运行中插入执行。
这个在tf和早期的mxnet中经常见到,就不做赘述。

通过这一系列定义和工具,我们可以完成对模型状态的统一,也可以利用钩子函数来实现整个逻辑结构

具体实现

第一步初始化:

操作具体实现
准备环境 init_process_group如果进程数量不足(小于worldsize),进程会一直等待。
DDP初始化model=DDP(model)parameter和buffer从master节点传到其他节点,使得所有进程状态一致
(这一步过后不可修改模型的任何参数)。
然后根据指定的size进行分组,每一组称为一个bucket(默认25M为一组)。
最后创建reducer管理器,给每个parameter注册平均的hook

正式训练:

操作具体实现
1. 采样数据从dataloader得到一个batch的数据,用于当前计算(for input, lable in dataloader); DistributedSampler会使各个进程之间的数据不重复。
2.forward同步各进程状态(即各进程之间的buffer和parameters。
执行forward过程
(可选)当DDP参数find_unused_parameter为true时,其会在forward结束时,启动一个回溯,标记出所有没被用到的parameter,提前把这些设定为ready。但这个配置默认是关闭的,会影响速度
3.backwardreducer外面:各个进程各自开始从后往前反向计算梯度,即最后面的参数反而是最先得到梯度的。
reducer外面:当某个parameter的梯度计算好了的时候,其之前注册的grad hook就会被触发,在bucket里把这个parameter的状态标记为ready。
reducer里面:当某个bucket的所有parameter都是ready状态时,reducer会开始对这个bucket的所有parameter都开始一个异步的allreduce梯度平均操作。
4.优化器optimizer应用gradient,更新参数这一部分为了保证各个进程参数一直,需要确保optimizer的初始状态和每次step()时的梯度相同。这个在model初始化时就会做,因此optimeizer的初始化必须放在DDP模型创建后。如果是自己实现的optimizer,则需要自己利用接口来保证更新时的一致性。

2 模型并行Model Parallel

背景:数据并行下分布式训练的显存占用
早在AlexNet诞生之时就有模型并行了。
已知训练时占用显存的只有4个部分,parameters,gradients,activation和optimizer states
parameter、gradients和optimizer_states所占的显存训练全程都会存在,activation则只占用前向阶段。
由于大模型的parameters和buffer动则上百G,所以现在的模型并行,是为了保证有限显存情况下,Transformer结构大模型的训练和推理。

1)tensor parallel

tensor parallel是把层的权重(即tensor)切分到不同的 tensor parallel process group所在的rank上。
以Megatron FFN为例:
tensor parallel

def forward(self, hidden_states):
	 # [b,s, h]->[b, s, 4h/tp]
	 intermediate_parallel = self.dense_h_to_4h(hidden_states) # ColumnParallel
	 intermediate_parallel = F.Gelu(intermediate_parallel + bias_parallel)
	 #[b, s, 4h/tp]->[b, s, h]
	 output = self.dense_4h_to_h(intermediate_parallel) # RowParallelLinear
	 return output

column parallel(纵向切分)
在这里插入图片描述
column parallel

def forward(self, input_):
	 # Set up backprop all-reduce. h4h/pgpu
	 identityall-reducef
	 input_parallel = copy_to_tensor_model_parallel_region(input_) 
	 bias = self.bias if not self.skip_bias_add else None 
	 # X * A_i^T = (b, s, h) * (h, 4h/tp) = (b, s, 4h/tp)
	 output_parallel = F.linear(input_parallel, self.weight, bias) 
	 return output_parallel

row parallel(横向)
row parallel

def forward(self, input_):
	 # Matrix multiply. X_i=(b, s, 4h/tp) * A_i^T=(4h/tp, h) -> (b, s, h)
	 output_parallel = F.linear(input_, self.weight) 
	 # All-reduce across all the partitionsg.
	 output_ = reduce_from_tensor_model_parallel_region(output_parallel) 
	 if not self.skip_bias_add:
	 output = output_ + self.bias if self.bias is not None else output_ 
	 else:
	 output = output_
	 return output

疑问:为什么第1个linear层是列切而不是行切?
如果第一个linear层使用行切,relu或者bert中的gelu都是非线性函数,那么我们无法先执行这个非线性函数,然后再相加,即gelu(X1A1)+gelu(X2A2) != gelu(X1A1+X2A2)。所以满足等价性,需要在gelu之前加一个allreduce同步一下,这种做法性能不是最优的。

2) Pipeline parallel

pipeline parallel是把整个模型的层数切分到不同的 pipeline parallel process group所在的rank上,每个pipeline stage拿到的层是不同的,如下图:
在这里插入图片描述
在这里插入图片描述

这样做的话,大部分GPU都处在一个等待状态,所以利用率并不高。
假设有K个GPU,每个mini-batch切分成M个micro-batches,Bubble率为
K − 1 / M + K − 1 K-1/M+K-1 K1/M+K1

GPipe的论文中提到当M大于四倍K的时候,基本上Bubble就可以忽略不计了(还有一部分原因是在计算梯度的时候,作者在前面的层提前计算了activation,而不用等到后面层的梯度计算完,这样能一定程度上实现并行)。

G-pipe
G-pipe
可以看到,G-pipe将1个mini-batch切分成多个micro-batch(上图是8),前向时每个micro-batch从device1流向device4,反向时在device4上算出梯度后并更新device4对应层的参数,然后将梯度send给device3,device3做梯度计算和参数更新后传给device2,以此类推直至device1参数更新完后,第一个mini-batch就结束了。

在G-pipe里,micro-batch切的数量越多,pipeline bubble就越少,但会带来很高的显存占用,因为每份micro-batch都要存储activation为反向计算梯度使用。

为了缓解显存问题,NV提出了1F1B(one forward one backward)方法。
1F1B
与G-pipe不同,1F1B在每算完一个micro-batch的前向后就会立即算反向,反向完成后就可以释放该micro-batch对应的activation。
在step中间稳定阶段,形成1前向1反向的形式,该阶段每个device上只需要保存1份micro-batch的激活。相比于G-pipe要保存micro-batch数量的激活值,无论是显存占用还是pipeline bubble率均减少了很多。
Interleaved 1F1B
为了进一步降低Bubble率,Megatron-LM的第二篇论文里面在PipeDream的基础上做了进一步改进。
Interleaved 1F1B
interleaved-1F1B在 device 数量不变的情况下,对micro-batch进一步切分,分出了更多的 pipeline stage(称为virtual pipeline stage),以更多的通信量,换取pipeline bubble比率降低。

那么它是怎么解决计算负载不均衡呢?假设网络共16层(编号 0-15),4个Device,前述 G-pipe 和 1F1B是分成 4 个stage, 按编号 0-3 层放 Device1,4-7层放 Device2 以此类推。

interleaved-1F1B则是按virtual_pipeline_stage 概念减小切分粒度,以 virtual_pipeline_stage=2 为例,将 0-1 层放 Device1、2-3 层放在 Device2,…,6-7 层放到 Device4,8-9 层继续放在 Device1,
10-11 层放在 Device2,…,14-15 层放在 Device4。

不过如果本身在PipeDream里面每个设备就只执行一层网络的话,就没有办法进一步通过Interleaved 1F1B进行优化了。

Megatron-LM 1F1B核心代码

def forward_backward_pipelining_without_interleaving(...):
 for i in range(num_microbatches_remaining):
 	output_tensor = forward_step(...)
 	if forward_only:
 		p2p_communication.send_forward(output_tensor, timers)
 	else:
 		output_tensor_grad = p2p_communication.send_forward_recv_backward(output_tensor, timers)
 
 	# Add input_tensor and output_tensor to end of list, then pop from the
 	# start of the list for backward pass.
 	input_tensors.append(input_tensor)
 	output_tensors.append(output_tensor)
 
 	if forward_only:
 		if not last_iteration:
 			input_tensor = p2p_communication.recv_forward(timers)
 		else:
 			input_tensor, output_tensor = input_tensors.pop(0), output_tensors.pop(0)
 			input_tensor_grad = backward_step(...)
 			if last_iteration:
 				input_tensor = None
 				p2p_communication.send_backward(input_tensor_grad, timers)
 			else:
 				input_tensor = p2p_communication.send_backward_recv_forward(input_tensor_grad, timers)

参考

  1. How PyTorch implements DataParallel?
  2. PyTorch2 doc:DATAPARALLEL
  3. PyTorch distributed: experiences on accelerating data parallel training
  4. GPU-内存拷贝
  5. https://arxiv.org/pdf/2001.08361.pdf
  6. https://www.usenix.org/conference/osdi20/presentation/jiang

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/103584.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CrossOver 23.6 让Mac可以运行Windows程序的工具

在当今数字化时代,虚拟机技术被广泛应用于软件开发、系统测试、网络安全等领域。虚拟机提供了一个隔离的虚拟环境,使得我们能够在一台物理计算机上同时运行多个操作系统和应用程序。下面我们就来看虚拟机软件怎么安装,虚拟机怎么使用吧&#…

【Linux】TCP协议

文章目录 📖 前言1. TCP协议格式2. 确认应答机制3. 16位窗口大小4. 6个标记位4.1 URG紧急指针标记位: 5. 超时重传机制:6. 连接管理机制6.1 TCP三次握手(重点):6.1 - 1 三次握手的原因6.1 - 2 RST复位标志位…

javaEE -6(10000详解文件操作)

一:认识文件 我们先来认识狭义上的文件(file)。针对硬盘这种持久化存储的I/O设备,当我们想要进行数据保存时,往往不是保存成一个整体,而是独立成一个个的单位进行保存,这个独立的单位就被抽象成文件的概念&#xff0c…

机关事务管理局数字化平台,让数据纵向直报,业务横向打通

机关事务管理局的核心职能是实现对机关事务的管理、保障、服务,是面向政府机关部门的“后勤服务”部门。 主要职能有:推进国有资产管理、公务用车管理、办公用房管理、公共机构节能管理、后勤管理等。党和政府“过紧日子”的要求为机关事务工作提出了更…

光影之梦2:动画渲染前后对比,揭示视觉艺术的惊人转变!

动画渲染是影视艺术中不可或缺的一环,它赋予了角色和场景鲜活的生命。渲染过程中的光影、色彩、材质等元素,像是画家的调色板,将平淡无奇的线条和形状转化为充满韵味与情感的画面。动画角色仿佛拥有了自己的灵魂,无论是一颦一笑&a…

集成学习方法之随机森林-入门

1、 什么是集成学习方法 集成学习通过建立几个模型组合的来解决单一预测问题。它的工作原理是生成多个分类器/模型,各自独立地学习和作出预测。这些预测最后结合成组合预测,因此优于任何一个单分类的做出预测。 2、 什么是随机森林 在机器学习中&…

Parallels Client for Mac:改变您远程控制体验的革命性软件

在当今数字化的世界中,远程控制软件已经成为我们日常生活和工作中不可或缺的一部分。在众多远程控制软件中,Parallels Client for Mac以其独特的功能和出色的性能脱颖而出,让远程控制变得更加简单、高效和灵活。 Parallels Client for Mac是…

Python 面向对象编程:类、对象、初始化和方法详解

Python 是一种面向对象的编程语言。在 Python 中,几乎所有东西都是对象,都具有其属性和方法。 类似于对象构造函数或用于创建对象的“蓝图”的类。 创建一个类 要创建一个类,请使用关键字 class: 示例,创建一个名为…

深度学习——图像分类(CIFAR-10)

深度学习——图像分类(CIFAR-10) 文章目录 前言一、实现图像分类1.1. 获取并组织数据集1.2. 划分训练集、验证集1.3. 图像增广1.4. 引入数据集1.5. 定义模型1.6. 定义训练函数1.7. 训练模型并保存模型参数 二、生成一个桌面小程序2.1. 使用QT设计师设计界…

公立医院绩效考核系统源码,能适应医院多种绩效核算方式,技术架构:springboot、mybaits +avue +MySQL

医院绩效考核系统源码 ,绩效核算系统全套成品源码(有医院项目应用案例)可适应医院多种绩效核算方式。 系统概述: 医院绩效考核管理系统是采用B/S架构模式设计、使用JAVA语言开发、后台使用MySql数据库进行管理的一整套计算机应用…

Python 深度学习入门之CNN

CNN 前言一、CNN简介1、简介2、结构 二、CNN简介1、输出层2、卷积层3、池化层4、全连接层5、输出层 前言 1024快乐!1024快乐!今天开新坑,学点深度学习相关的,说下比较火的CNN。 一、CNN简介 1、简介 CNN的全称是Convolutiona…

网络协议--ARP:地址解析协议

4.1 引言 本章我们要讨论的问题是只对TCP/IP协议簇有意义的IP地址。数据链路如以太网或令牌环网都有自己的寻址机制(常常为48 bit地址),这是使用数据链路的任何网络层都必须遵从的。一个网络如以太网可以同时被不同的网络层使用。例如&#…

【数据结构】数组和字符串(三):特殊矩阵的压缩存储:三角矩阵、对称矩阵——一维数组

文章目录 4.2.1 矩阵的数组表示4.2.2 特殊矩阵的压缩存储a. 对角矩阵的压缩存储b. 三角矩阵的压缩存储结构体初始化元素设置元素获取打印矩阵主函数输出结果代码整合 c. 对称矩阵的压缩存储元素设置元素获取主函数输出结果代码整合 4.2.1 矩阵的数组表示 【数据结构】数组和字…

GEE案例——一个完整的火灾监测案例dNBR差异化归一化烧毁指数

差异化归一化烧毁指数 dNBR是"差异化归一化烧毁指数"的缩写。它是一种用于评估卫星图像中烧毁区域严重程度的遥感指数。dNBR值通过将火灾前的归一化烧毁指数(NBR)减去火灾后的NBR来计算得出。该指数常用于野火监测和评估。 dNBR(差异化归一化烧毁指数)是一种用…

EMC简述01

电磁兼容性(EMC:Electromagnetic Compatibility) 电磁兼容性(EMC)主要分为两种 一种是设备本身的电磁噪声对其他设备或人体带来的影响(电磁干扰,EMI:Electromagnetic Interference…

计算机算法分析与设计(20)---回溯法(0-1背包问题)

文章目录 1. 题目描述2. 算法思路3. 例题分析4. 代码编写 1. 题目描述 对于给定的 n n n 个物品,第 i i i 个物品的重量为 W i W_i Wi​,价值为 V i V_i Vi​,对于一个最多能装重量 c c c 的背包,应该如何选择放入包中的物品…

轻量级导出 Excel 标准格式

一般业务系统中都有导出到 Excel 功能,其实质就是把数据库里面一条条记录转换到 Excel 文件上。Java 常用的第三方类库有 Apache POI 和阿里巴巴开源的 EasyExcel 等。另外也有通过 Web 模板技术渲染 Excel 文件导出,这实质是 MVC 模式的延伸&#xff0c…

React环境初始化

环境初始化 学习目标: 能够独立使用React脚手架创建一个React项目 1.使用脚手架创建项目 官方文档:(https://create-react-app.bootcss.com/)    - 打开命令行窗口    - 执行命令      npx create-react-app projectName    说明&#xff1a…

智安网络|探索语音合成技术的未来:揭秘人工智能配音技术的背后

随着人工智能技术的迅猛发展,配音行业也迎来了人工智能配音技术的崭新时代。人工智能配音技术通过语音合成和自然语言处理等技术手段,实现了逼真的语音合成,为影视、广告和游戏等领域带来了新的可能性。 第一部分:语音合成技术的…

天锐绿盾加密软件——企业数据透明加密、防泄露系统

天锐绿盾是一种企业级数据透明加密、防泄密系统,旨在保护企业的核心数据,防止数据泄露和恶意攻击。它采用内核级透明加密技术,可以在不影响员工正常工作的前提下,对需要保护的数据进行加密操作。 PC访问地址: https:/…