随着模型规模的逐渐增大,需要的算力逐渐增强,但是算力需求增长速度远高于芯片算力增长速度。现在唯一的解决方案只有通过超大规模集群训练大模型。
大集群训练大模型的挑战
内存墙
200B参数量的模型,参数内存占用745GB内存,训练过程需要3500GB内存(参数+激活+梯度+优化器状态),一个模型需要128张卡才能放下。
性能墙
大模型切分到集群后,通信成为了主要的性能瓶颈。模型训练时,主要有几种并行策略:
- 模型并行:模型的不同层或不同部分被分布在不同设备
- Pipeline并行:不同部分的前向计算和反向计算被分配到不同设备
- 数据并行:不同设备处理同一个模型的不同批次
设备通讯主要发生在激活函数或者梯度需要在不同集群间传递(模型、管道并行),参数在GPU间需要同步(数据并行)。
大模型数千亿的参数导致了巨大的激活值和梯度值。受有限带宽的影响,在不同的设备间转移这些数据成为瓶颈。同时,即使带宽足够,小信息间初始化通信的延迟也会显著增加。
效率墙
算法的分布式并行开发一直是一道难题。如何让用户高效编写分布式代码,超大规模AI训练需要怎样的编程范式?
调优墙
在数千节点集群上,要保证计算的正确性、性能、可用性,手工难以全面兼顾。
大集群快速故障恢复
千卡大集群训练时间长,故障不可避免,跨苏故障恢复可减少等待时间。MindSpore解决方法如下:
- 定期保存CKPT(CheckPoint)文件
- 故障检测隔离
- 资源重条度
- 加载故障时刻CKPT
- 恢复训练任务
基于HCCL集合通信库实现分布式训练
HCCL(Huawei Collective Communication Library)基于Ascend芯片的高性能集合通信库,提供单机多卡、多机多卡集合通信原语。
数据并行
左右表示两个Ascend卡各自运行一个进程,每张卡上运行同一个模型、参数、优化器状态,复制n份,每张卡输入不同数据,进行一次反向传播后获得梯度, 随后进行一次allreduce,把每一张卡上得到的梯度聚合至一起,取平均后分发回各张卡。
存在问题:
要求单卡能放下整个模型
多卡训练时内存冗余(多存了n-1份模型参数)
MindSpore中以BERT与训练为例的数据并行
from mindspore.parallel._utils import _get_device_num, _get_gradients_mean
# 6. Pretrain
mean = _get_gradients_mean()
degree = _get_device_num()
grad_reducer = nn.DistributedGradReducer(optimizer.parameters, mean, degree)
def train_step(input_ids, input_mask, masked_lm_ids, masked_lm_positions, masked_lm_weghts, next_sentence_label, segment_ids):
status = init_register()
input_ids = ops.depend(input_ids, status)
(total_loss, masked_lm_loss, next_sentence_loss), grads = grad_fn(input_ids, input_mask, segment_ids, masked_lm_ids, masked_lm_positions, masked_lm_weights, next_sentence_label)
grads = clip_by_global_norm(grads, clip_norm=1.0)
# 进行allreduce操作,分发梯度
grads = grad_reducer(grads)
status = all_finite(grads, status)
if status:
total_loss = loss_scaler.unscale(total_loss)
grads = loss_scaler.unscale(grads)
total_loss = ops.depend(total_loss, optimizer(grads))
total_loss = ops.depend(total_loss, loss_scaler.adjust(status))
return total_loss, masked_lm_loss, next_sentence_loss, status
模型并行
模型并行是算子层面的并行,他利用某些算子的特性将算子拆分到多个设备上进行计算。因此不是网络中的所有算子都可以拆分计算。可以产分的算子需满足如下特性:
- 可以并行计算的算子
- 算子其中一个输入来自parameter
矩阵乘法(全连接层、attention核心)
左侧为一张卡进行矩阵乘法的示意图。
右侧为模型并行在两张卡上的示意图:X保持一样,W参数矩阵分为两份,分别和X进行矩阵乘法。最后将两张卡乘法的输出结果合并到一起。
即X.shape为(m, k),W1.shape为(k, n),拆分后,W1'.shape为(k, n/2),W1''.shape为(k, n/2),相乘输出为(m, n/2),合并后仍为(m, n) 。
但不是所有算子都适合这样的模型并行。
MindSpore算子级并行
MindSpore屏蔽了模型并行的前置和后置工作(通信、算子排布),开发者只需关心数据如何进行切分即可。
- MindSpore对每个算子独立建模,用户可以设置正向网络中每个算子的切分策略(对于未设置的算子,默认按数据并行进行切分)。
- 在构图阶段,框架将遍历正向图,根据算子的切分策略对每个算子及其输入张量进行切分建模,使得该算子的计算逻辑在切分前后保持数学等价。
- 框架内部使用Tensor Layout来表达输入输出张量在集群中的分布状态,Tensor Layout中包含了张量和设备间的映射关系,用户无需感知模型各切片在集群中如何分布,框架将自动调度分配。
- 框架还将遍历相邻算子间张量的Tensor Layout,如果前一个算子输出张量作为下一个算子的输入张量,且前一个算子输出张量的Tensor Layout与下一个算子输入张量的TensorLayout不同,则需要在两个算子之间进行张量重排布(Tensor Redistribution)
- 对于训练网络来说,框架处理完正向算子的分布式切分之后,依靠框架的自动微分能力即能自动完成反向算子的分布式切分。
示例
用户在4卡计算两个连续的二维矩阵乘法Z=(X * W) * V,第一个矩阵Y = X * W,用户想把X按行切4fen(数据并行),第二个矩阵Z = Y * V,用户想把V按列切4份(模型并行)
import mindspore.nn as nn
from mindspore import ops
import mindspore as ms
ms.set_auto_parallel_context(parallel_mode="semi_autoparallel", device_num=4)
class DenseMatMulNet(nn.Cell):
def __init__(self):
super(DenseMatMulNet, self).__init__()
# 切分只需要配置shard接口即可
# 对于matmul1, 接受两个输入,第一个输入,第一维切成四份,第二维不进行切割
# 第二个输入,均不切割
self.matmul1 = ops.MatMul.shard((4, 1), (1, 1))
# 对于matmul2,第二个输入的第二个维度切成四份
self.matmul2 = ops.MatMul.shard((1, 1), (1, 4))
def construct(self, x, w ,v):
y = self.matmul1(x, w)
z = self.matmul2(y, v)
return z
# a simple example to demenstarte auto data parallel and model parallel on Mindspore
import sys
import numpy as np
import mindspore.nn as nn
import mindspore as ms
from mindspore.nn import Cell, Momentum
from mindspore.ops import operations as ops
from mindspore.nn import SoftmaxCrossEntropyWithLogits
from mindspore import train
import mindspore.dataset as ds
import mindspore.communication as D
from mindspore.common.initializer import initializer
# generate fake dataset
step_per_epoch = 4
def get_dataset(*inputs):
def generate():
for _ in range(step_per_epoch):
yield inputs
return generate
# define a simple net which will cut data into pieces for multi-npu training
class Net(Cell):
def __init__(self):
super().__init__()
self.matmul = ops.MatMul().shard(((1, 2), (2, 1)))
self.weight = ms.Parameter(initializer("normal", [32, 16]), "w1")
self.relu = ops.ReLU().shard(((2, 1),))
def construct(self, x):
out = self.matmul(x, self.weight)
out = self.relu(out)
return out
# 设置运行环境的context。
# 在运行程序之前,应配置context。如果没有配置,默认情况下将根据设备目标进行自动设置。
# Compile the graph once and execute it multiple times.
ms.set_context(mode=ms.GRAPH_MODE, device_target="Ascend")
# Initialize distributed training environment
D.init()
# get rank of current device in the whole distributed computation
rank = D.get_rank()
# set semi-auto-parallel
ms.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=2, full_batch=True)
np.random.seed(1)
input_data = np.random.rand(16, 32).astype(np.float32)
label_data = np.random.rand(16, 16).astype(np.float32)
fake_dataset = get_dataset(input_data, label_data)
net = Net()
# define callback function while traing, including loss monitor and checkpoint file save path
callback = [train.LossMonitor(), train.ModelCheckpoint(directory="{}".format(rank))]
# generate dataset and define data column
dataset = ds.GeneratorDataset(fake_dataset, ["input", "label"])
# define loss function
loss = nn.SoftmaxCrossEntropyWithLogits()
lr = 1e-3
momentum=0.1
num_epochs = 5
optimizer = nn.Momentum(net.trainable_params(), lr, momentum)
model = ms.Model(net, loss_fn=loss, optimizer=optimizer)
model.train(num_epochs, dataset, callbacks=callback, dataset_sink_mode=False)
Pipeline并行
受Server间通信带宽低的影响,传统数据并行叠加模型并行的混合模式性能表现欠佳,需要引入流水线并行。流水线并行是将神经网络中的算子切分为多个阶段,再把阶段映射到不同设备上,使得不同设备去计算神经网络的不同部分。相当于每张卡上只放模型的其中基层或一层
如果Pipeline并行每次都知计算一批数据,不能充分利用多卡优势,因为总有卡处于空闲在等待数据。
wile提升效率,流水线并行进一步把mini-barch划分为micro-batch,在微批次中采用流水线式的执行序列,从而提升效率。
1F1B
一次个micro-batch进行正向后,立即进行一次反向计算。使得内存可以更早释放,进而确保内存使用峰值更低。1F1B是现在的主流方式。
示例
MindSpore中,Pipeline的实现方式非常简单。通过调用pipeline_stage接口来指定每个layer要在哪个stage上去执行。pipeline接口的洗礼度微Cell,即只要继承了nn.Cell的实例,就要配置pipeline_stage,并且需要按照网络执行的先后顺序,从小到大进行配置。
...
class ResNet(nn.Cell):
"""ResNet"""
def __init__(self, block, num_classes=100, batch_size=32):
"""init"""
super(ResNet, self).__init__()
self.batch_size = batch_size
self.num_classes = num_classes
self.head = Head()
self.layer1 = MakeLayer0(block, in_channels=64, out_channels=256, stride=1)
self.layer2 = MakeLayer1(block, in_channels=256, out_channels=512, stride=2)
self.layer3 = MakeLayer2(block, in_channels=512, out_channels=1024, stride=2)
self.layer4 = MakeLayer3(block, in_channels=1024, out_channels=2048, stride=2)
self.pool = ops.ReduceMean(keep_dims=True)
self.squeeze = ops.Squeeze(axis=(2, 3))
self.fc = fc_with_initialize(512 * block.expansion, num_classes)
# pipeline parallel config
self.head.pipeline_stage = 0
self.layer1.pipeline_stage = 0
self.layer2.pipeline_stage = 0
self.layer3.pipeline_stage = 1
self.layer4.pipeline_stage = 1
self.fc.pipeline_stage = 1
...
Pipeline并行的结果,只会在一张卡上输出loss,因为loss是集合到最后一张卡上进行输出的,故其他卡没有loss。
内存优化
重计算
计算某些反向算子时,需要一些正向算子的计算结果,导致这些正向算子结果需要驻留在内存中,知道依赖他们的反向算子计算完,这些正向算子的计算结果占用的内存才会被复用。
重计算是一种时间换空间的方法。为了降低内存峰值,重计算技术可以不保存正向计算结果,在反向计算时,重新计算一遍正向结果,使得内存可以被重复利用。
虽然运算速度慢了,但是可以有更多可利用的内存,可以增大batch_size大小。
MindSpore中,重计算接口可以针对单个算子和Cell进行设置。当用户调用Cell的重计算接口时,整个Cell中的所有正向算子都会被设置为重计算。
优化器并行(ZeRO)
进行数据并行训练时,模型的参数更新部份在各个卡之间存在冗余,优化器并行通过将优化器的计算量分散到数据并行维度的卡上,在大规模网络上可以有效减少内存消耗并提升网络性能。
Baseline为传统的数据并行,即每个一卡都存一份参数、梯度和优化器状态。优化器并行就是在每张卡上分发一份需要维护的optimizer status。
MindSpore的优化器并行,全局配置即可
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, enable_parallel_optimizer=True)
MindSpore分布式并行模式
半自动:如数据并行配置shard,Pipeline并行需要配置Pipeline_config仍需要用户自己指定参数配置,所以这是半自动的。
(全)自动并行:框架自动配置一个并行配置策略,适合想要并行训练但是不知道如何配置策略的用户。只需配置ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.AUTO_PARALELL)即可