目录
C++平台PyTorch模型部署流程
1.模型转换
1. 不支持的操作
2. 指定数据类型
2.保存序列化模型
3.C++ load训练好的模型
4. 执行Script Module
PyTorch分布式训练
分布式并行训练概述
Pytorch分布式数据并行
手把手渐进式实战
A. 单机单卡
B. 单机多卡DP
C. 多机多卡DDP
D. Launch / Slurm 调度方式
完整框架 Distribuuuu
Pytorch搭建神经网络
C++平台PyTorch模型部署流程
1.模型转换
libtorch不依赖于python,python训练的模型,需要转换为script model才能由libtorch加载,并进行推理。在这一步官网提供了两种方法:
方法一:Tracing
这种方法操作比较简单,只需要给模型一组输入,走一遍推理网络,然后由torch.ji.trace记录一下路径上的信息并保存即可。示例如下:
import torch
import torchvision
# An instance of your model.
model = torchvision.models.resnet18()
# An example input you would normally provide to your model's forward() method.
example = torch.rand(1, 3, 224, 224)
# Use torch.jit.trace to generate a torch.jit.ScriptModule via tracing.
traced_script_module = torch.jit.trace(model, example)
缺点是如果模型中存在控制流比如if-else语句,一组输入只能遍历一个分支,这种情况下就没办法完整的把模型信息记录下来。
方法二:Scripting
直接在Torch脚本中编写模型并相应地注释模型,通过torch.jit.script
编译模块,将其转换为ScriptModule
。示例如下:
class MyModule(torch.nn.Module):
def __init__(self, N, M):
super(MyModule, self).__init__()
self.weight = torch.nn.Parameter(torch.rand(N, M))
def forward(self, input):
if input.sum() > 0:
output = self.weight.mv(input)
else:
output = self.weight + input
return output
my_module = MyModule(10,20)
sm = torch.jit.script(my_module)
-
forward方法会被默认编译,forward中被调用的方法也会按照被调用的顺序被编译
-
如果想要编译一个forward以外且未被forward调用的方法,可以添加
@torch.jit.export
. -
如果想要方法不被编译,可使用
[@torch.jit.ignore](https://link.zhihu.com/?target=https%3A//pytorch.org/docs/master/generated/torch.jit.ignore.html%23torch.jit.ignore)
或者[@torch.jit.unused](https://link.zhihu.com/?target=https%3A//pytorch.org/docs/master/generated/torch.jit.unused.html%23torch.jit.unused)
# Same behavior as pre-PyTorch 1.2
@torch.jit.script
def some_fn():
return 2
# Marks a function as ignored, if nothing
# ever calls it then this has no effect
@torch.jit.ignore
def some_fn2():
return 2
# As with ignore, if nothing calls it then it has no effect.
# If it is called in script it is replaced with an exception.
@torch.jit.unused
def some_fn3():
import pdb; pdb.set_trace()
return 4
# Doesn't do anything, this function is already
# the main entry point
@torch.jit.export
def some_fn4():
return 2
在这一步遇到好多坑,主要原因可归为一下两点
1. 不支持的操作
TorchScript支持的操作是python的子集,大部分torch中用到的操作都可以找到对应实现,但也存在一些尴尬的不支持操作,详细列表可见unsupported-ops(https://pytorch.org/docs/master/jit_unsupported.html#jit-unsupported),下面列一些我自己遇到的操作:
1)参数/返回值不支持可变个数,例如
def __init__(self, **kwargs):
或者
if output_flag == 0:
return reshape_logits
else:
loss = self.loss(reshape_logits, term_mask, labels_id)
return reshape_logits, loss
2)各种iteration操作
eg1.
layers = [int(a) for a in layers]
报错torch.jit.frontend.UnsupportedNodeError: ListComp aren’t supported
可以改成:
for k in range(len(layers)):
layers[k] = int(layers[k])
eg2.
seq_iter = enumerate(scores)
try:
_, inivalues = seq_iter.__next__()
except:
_, inivalues = seq_iter.next()
eg3.
line = next(infile)
3)不支持的语句
eg1. 不支持continue
torch.jit.frontend.UnsupportedNodeError: continue statements aren’t supported
eg2. 不支持try-catch
torch.jit.frontend.UnsupportedNodeError: try blocks aren’t supported
eg3. 不支持with语句
4)其他常见op/module
eg1. torch.autograd.Variable
解决:使用torch.ones/torch.randn等初始化+.float()/.long()等指定数据类型。
eg2. torch.Tensor/torch.LongTensor etc.
解决:同上
eg3. requires_grad参数只在torch.tensor中支持,torch.ones/torch.zeros等不可用
eg4. tensor.numpy()
eg5. tensor.bool()
解决:tensor.bool()用tensor>0代替
eg6. self.seg_emb(seg_fea_ids).to(embeds.device)
解决:需要转gpu的地方显示调用.cuda()
总之一句话:除了原生python和pytorch以外的库,比如numpy什么的能不用就不用,尽量用pytorch的各种API。
2. 指定数据类型
1)属性,大部分的成员数据类型可以根据值来推断,空的列表/字典则需要预先指定
from typing import Dict
class MyModule(torch.nn.Module):
my_dict: Dict[str, int]
def __init__(self):
super(MyModule, self).__init__()
# This type cannot be inferred and must be specified
self.my_dict = {}
# The attribute type here is inferred to be `int`
self.my_int = 20
def forward(self):
pass
m = torch.jit.script(MyModule())
2)常量,使用_Final_关键字
try:
from typing_extensions import Final
except:
# If you don't have `typing_extensions` installed, you can use a
# polyfill from `torch.jit`.
from torch.jit import Final
class MyModule(torch.nn.Module):
my_constant: Final[int]
def __init__(self):
super(MyModule, self).__init__()
self.my_constant = 2
def forward(self):
pass
m = torch.jit.script(MyModule())
3)变量。默认是tensor类型且不可变,所以非tensor类型必须要指明
def forward(self, batch_size:int, seq_len:int, use_cuda:bool):
方法三:Tracing and Scriptin混合
一种是在trace模型中调用script,适合模型中只有一小部分需要用到控制流的情况,使用实例如下:
import torch
@torch.jit.script
def foo(x, y):
if x.max() > y.max():
r = x
else:
r = y
return r
def bar(x, y, z):
return foo(x, y) + z
traced_bar = torch.jit.trace(bar, (torch.rand(3), torch.rand(3), torch.rand(3)))
另一种情况是在script module中用tracing生成子模块,对于一些存在script module不支持的python feature的layer,就可以把相关layer封装起来,用trace记录相关layer流,其他layer不用修改。使用示例如下:
import torch
import torchvision
class MyScriptModule(torch.nn.Module):
def __init__(self):
super(MyScriptModule, self).__init__()
self.means = torch.nn.Parameter(torch.tensor([103.939, 116.779, 123.68])
.resize_(1, 3, 1, 1))
self.resnet = torch.jit.trace(torchvision.models.resnet18(),
torch.rand(1, 3, 224, 224))
def forward(self, input):
return self.resnet(input - self.means)
my_script_module = torch.jit.script(MyScriptModule())
2.保存序列化模型
如果上一步的坑都踩完,那么模型保存就非常简单了,只需要调用save并传递一个文件名即可,需要注意的是如果想要在gpu上训练模型,在cpu上做inference,一定要在模型save之前转化,再就是记得调用model.eval(),形如
gpu_model.eval()
cpu_model = gpu_model.cpu()
sample_input_cpu = sample_input_gpu.cpu()
traced_cpu = torch.jit.trace(traced_cpu, sample_input_cpu)
torch.jit.save(traced_cpu, "cpu.pth")
traced_gpu = torch.jit.trace(traced_gpu, sample_input_gpu)
torch.jit.save(traced_gpu, "gpu.pth")
3.C++ load训练好的模型
要在C ++中加载序列化的PyTorch模型,必须依赖于PyTorch C ++ API(也称为LibTorch)。libtorch的安装非常简单,只需要在pytorch官网下载对应版本,解压即可。会得到一个结构如下的文件夹。
libtorch/
bin/
include/
lib/
share/
然后就可以构建应用程序了,一个简单的示例目录结构如下:
example-app/
CMakeLists.txt
example-app.cpp
example-app.cpp和CMakeLists.txt的示例代码分别如下:
#include <torch/script.h> // One-stop header.
#include <iostream>
#include <memory>
int main(int argc, const char* argv[]) {
if (argc != 2) {
std::cerr << "usage: example-app <path-to-exported-script-module>\n";
return -1;
}
torch::jit::script::Module module;
try {
// Deserialize the ScriptModule from a file using torch::jit::load().
module = torch::jit::load(argv[1]);
}
catch (const c10::Error& e) {
std::cerr << "error loading the model\n";
return -1;
}
std::cout << "ok\n";
}
cmake_minimum_required(VERSION 3.0 FATAL_ERROR)
project(custom_ops)
find_package(Torch REQUIRED)
add_executable(example-app example-app.cpp)
target_link_libraries(example-app "${TORCH_LIBRARIES}")
set_property(TARGET example-app PROPERTY CXX_STANDARD 14)
至此,就可以运行以下命令从example-app/
文件夹中构建应用程序啦:
mkdir build
cd build
cmake -DCMAKE_PREFIX_PATH=/path/to/libtorch ..
cmake --build . --config Release
其中/path/to/libtorch是之前下载后的libtorch文件夹所在的路径。这一步如果顺利能够看到编译完成100%的提示,下一步运行编译生成的可执行文件,会看到“ok”的输出,可喜可贺!
4. 执行Script Module
终于到最后一步啦!下面只需要按照构建输入传给模型,执行forward就可以得到输出啦。一个简单的示例如下:
// Create a vector of inputs.
std::vector<torch::jit::IValue> inputs;
inputs.push_back(torch::ones({1, 3, 224, 224}));
// Execute the model and turn its output into a tensor.
at::Tensor output = module.forward(inputs).toTensor();
std::cout << output.slice(/*dim=*/1, /*start=*/0, /*end=*/5) << '\n';
前两行创建一个torch::jit::IValue
的向量,并添加单个输入. 使用torch::ones()
创建输入张量,等效于C ++ API中的torch.ones
。然后,运行script::Module
的forward
方法,通过调用toTensor()
将返回的IValue值转换为张量。C++对torch的各种操作还是比较友好的,通过torch::或者后加_的方法都可以找到对应实现,例如
torch::tensor(input_list[j]).to(at::kLong).resize_({batch, 128}).clone()
//torch::tensor对应pytorch的torch.tensor; at::kLong对应torch.int64;resize_对应resize
PyTorch分布式训练
分布式并行训练概述
最常被提起,容易实现且使用最广泛的,莫过于数据并行(Data Parallelism) 技术,其核心思想是将大batch划分为若干小barch分发到不同device并行计算,解决单GPU显存不足的限制。与此同时,当单GPU无法放下整个模型时,我们还需考虑 模型并行(Model / Pipeline Parallelism)。如考虑将模型进行纵向切割,不同的Layers放在不同的device上。或是将某些模块进行横向切割,通过矩阵运算进行加速。当然,还存在一些非并行的技术或者技巧,用于解决训练效率或者训练显存不足等问题。
本文的重点是介绍PyTorch原生的分布式数据并行(DDP) 及其用法,其他的内容,我们后面再聊(如果有机会的话qwq)。
这里我草率地将当前深度学习的大规模分布式训练技术分为如下三类:
-
Data Parallelism (数据并行)
-
Naive:每个worker存储一份model和optimizer,每轮迭代时,将样本分为若干份分发给各个worker,实现并行计算
-
ZeRO: Zero Redundancy Optimizer,微软提出的数据并行内存优化技术,核心思想是保持Naive数据并行通信效率的同时,尽可能降低内存占用(https://arxiv.org/abs/1910.02054)
-
-
Model/Pipeline Parallelism (模型并行)
-
Naive: 纵向切割模型,将不同的layers放到不同的device上,按顺序进行正/反向传播(https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html)
-
GPipe:小批量流水线方式的纵向切割模型并行(https://proceedings.neurips.cc/paper/2019/file/093f65e080a295f8076b1c5722a46aa2-Paper.pdf)
-
Megatron-LM:Tensor-slicing方式的模型并行加速(https://github.com/NVIDIA/Megatron-LM)
-
-
Non-parallelism approach (非并行技术)
-
Gradient Accumulation: 通过梯度累加的方式解决显存不足的问题,常用于模型较大,单卡只能塞下很小的batch的并行训练中(https://www.zhihu.com/question/303070254)
-
CPU Offload: 同时利用 CPU 和 GPU 内存来训练大型模型,即存在GPU-CPU-GPU的 transfers操作(https://www.deepspeed.ai/tutorials/zero-offload/)
-
etc.:还有很多不一一罗列(如Checkpointing, Memory Efficient Optimizer等)
-
不过这里我 强推 一下 DeepSpeed,微软在2020年开源的一个对PyTorch的分布式训练进行优化的库,让训练百亿参数的巨大模型成为可能,其提供的 3D-parallelism (DP+PP+MP)的并行技术组合,能极大程度降低大模型训练的硬件条件以及提高训练的效率
Pytorch分布式数据并行
将时间拨回2017年,我第一次接触深度学习,早期的TensorFlow使用的是PS(Parameter Server)架构,在结点数量线性增长的情况下,带宽瓶颈格外明显。而随后百度将Ring-Allreduce技术运用到深度学习分布式训练,PyTorch1.0之后香起来的原因也是因为在分布式训练方面做了较大改动,适配多种通信后端,使用RingAllReduce架构。
确保你对PyTorch有一定的熟悉程度,此前提下,对如下内容进行学习和了解,基本上就能够handle住大部分的数据并行任务了:
-
DataParallel 和 DistributedDataParallel 的原理和使用
-
进程组 和 torch.distributed.init_process_group 的原理和使用
-
集体通信(Collective Communication) 的原理和使用
关于理论的东西,我写了一大堆,最后又全删掉了。原因是我发现已经有足够多的文章介绍 PS/Ring-AllReduce 和 PyTorch DP/DDP 的原理,给出具有代表性的几篇:
-
PYTORCH DISTRIBUTED OVERVIEW(https://pytorch.org/tutorials/beginner/dist_overview.html)
-
PyTorch 源码解读之 DP & DDP(https://zhuanlan.zhihu.com/p/343951042)
-
Bringing HPC Techniques to Deep Learning(https://andrew.gibiansky.com/blog/machine-learning/baidu-allreduce/)
手把手渐进式实战
那么接下来我们以Step by Step的方式进行实践,你可以直接通过下面的快速索引进行跳转,大部分的解释都包含在代码中,每份代码最后也有使用说明和训练Log记录:
-
单机单卡 [snsc.py] https://github.com/BIGBALLON/distribuuuu/blob/master/tutorial/snsc.py
-
单机多卡 (with DataParallel) [snmc_dp.py] https://github.com/BIGBALLON/distribuuuu/blob/master/tutorial/snmc_dp.py
-
多机多卡 (with DistributedDataParallel)
-
torch.distributed.launch [mnmc_ddp_launch.py] https://github.com/BIGBALLON/distribuuuu/blob/master/tutorial/mnmc_ddp_launch.py
-
torch.multiprocessing [mnmc_ddp_mp.py] https://github.com/BIGBALLON/distribuuuu/blob/master/tutorial/mnmc_ddp_mp.py
-
Slurm Workload Manager [mnmc_ddp_slurm.py] https://github.com/BIGBALLON/distribuuuu/blob/master/tutorial/mnmc_ddp_slurm.py
-
ImageNet training example [imagenet.py] https://github.com/BIGBALLON/distribuuuu/blob/master/tutorial/imagenet.py
-
A. 单机单卡
Single Node Single GPU Card Training, 源码见 snsc.py,后续我们会在此代码上进行修改。简单看一下,单机单卡要做的就是定义网络,定义dataloader,定义loss和optimizer,开训,很简单的几个步骤。
"""(SNSC) Single Node Single GPU Card Training"""import torchimport torch.nn as nnimport torchvisionimport torchvision.transforms as transformsBATCH_SIZE = 256EPOCHS = 5if __name__ == "__main__": # 1. define network device = "cuda" net = torchvision.models.resnet18(num_classes=10) net = net.to(device=device) # 2. define dataloader trainset = torchvision.datasets.CIFAR10( root="./data", train=True, download=True, transform=transforms.Compose( [ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize( (0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010) ), ] ), ) train_loader = torch.utils.data.DataLoader( trainset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True, ) # 3. define loss and optimizer criterion = nn.CrossEntropyLoss() optimizer = torch.optim.SGD( net.parameters(), lr=0.01, momentum=0.9, weight_decay=0.0001, nesterov=True, ) print(" ======= Training ======= \n") # 4. start to train net.train() for ep in range(1, EPOCHS + 1): train_loss = correct = total = 0 for idx, (inputs, targets) in enumerate(train_loader): inputs, targets = inputs.to(device), targets.to(device) outputs = net(inputs) loss = criterion(outputs, targets) optimizer.zero_grad() loss.backward() optimizer.step() train_loss += loss.item() total += targets.size(0) correct += torch.eq(outputs.argmax(dim=1), targets).sum().item() if (idx + 1) % 50 == 0 or (idx + 1) == len(train_loader): print( " == step: [{:3}/{}] [{}/{}] | loss: {:.3f} | acc: {:6.3f}%".format( idx + 1, len(train_loader), ep, EPOCHS, train_loss / (idx + 1), 100.0 * correct / total, ) ) print("\n ======= Training Finished ======= \n")"""usage:>>> python snsc.pyFiles already downloaded and verified ======= Training ======= == step: [ 50/196] [1/5] | loss: 1.959 | acc: 28.633% == step: [100/196] [1/5] | loss: 1.806 | acc: 33.996% == step: [150/196] [1/5] | loss: 1.718 | acc: 36.987% == step: [196/196] [1/5] | loss: 1.658 | acc: 39.198% == step: [ 50/196] [2/5] | loss: 1.393 | acc: 49.578% == step: [100/196] [2/5] | loss: 1.359 | acc: 50.473% == step: [150/196] [2/5] | loss: 1.336 | acc: 51.372% == step: [196/196] [2/5] | loss: 1.317 | acc: 52.200% == step: [ 50/196] [3/5] | loss: 1.205 | acc: 56.102% == step: [100/196] [3/5] | loss: 1.185 | acc: 57.254% == step: [150/196] [3/5] | loss: 1.175 | acc: 57.755% == step: [196/196] [3/5] | loss: 1.165 | acc: 58.072% == step: [ 50/196] [4/5] | loss: 1.067 | acc: 60.914% == step: [100/196] [4/5] | loss: 1.061 | acc: 61.406% == step: [150/196] [4/5] | loss: 1.058 | acc: 61.643% == step: [196/196] [4/5] | loss: 1.054 | acc: 62.022% == step: [ 50/196] [5/5] | loss: 0.988 | acc: 64.852% == step: [100/196] [5/5] | loss: 0.983 | acc: 64.801% == step: [150/196] [5/5] | loss: 0.980 | acc: 65.052% == step: [196/196] [5/5] | loss: 0.977 | acc: 65.076% ======= Training Finished ======= """
B. 单机多卡DP
Single Node Multi-GPU Crads Training (with DataParallel),源码见 snmc_dp.py, 和 snsc.py 对比一下,DP只需要花费最小的代价,既可以使用多卡进行训练(其实就一行???),但是因为GIL锁的限制,DP的性能是低于DDP的。
"""(SNMC) Single Node Multi-GPU Crads Training (with DataParallel)Try to compare with smsc.py and find out the differences."""import torchimport torch.nn as nnimport torchvisionimport torchvision.transforms as transforms
BATCH_SIZE = 256EPOCHS = 5
if __name__ == "__main__":
# 1. define network device = "cuda" net = torchvision.models.resnet18(pretrained=False, num_classes=10) net = net.to(device=device) # Use single-machine multi-GPU DataParallel, # you would like to speed up training with the minimum code change. net = nn.DataParallel(net)
# 2. define dataloader trainset = torchvision.datasets.CIFAR10( root="./data", train=True, download=True, transform=transforms.Compose( [ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize( (0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010) ), ] ), ) train_loader = torch.utils.data.DataLoader( trainset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True, )
# 3. define loss and optimizer criterion = nn.CrossEntropyLoss() optimizer = torch.optim.SGD( net.parameters(), lr=0.01, momentum=0.9, weight_decay=0.0001, nesterov=True, )
print(" ======= Training ======= \n")
# 4. start to train net.train() for ep in range(1, EPOCHS + 1): train_loss = correct = total = 0
for idx, (inputs, targets) in enumerate(train_loader): inputs, targets = inputs.to(device), targets.to(device) outputs = net(inputs)
loss = criterion(outputs, targets) optimizer.zero_grad() loss.backward() optimizer.step()
train_loss += loss.item() total += targets.size(0) correct += torch.eq(outputs.argmax(dim=1), targets).sum().item()
if (idx + 1) % 50 == 0 or (idx + 1) == len(train_loader): print( " == step: [{:3}/{}] [{}/{}] | loss: {:.3f} | acc: {:6.3f}%".format( idx + 1, len(train_loader), ep, EPOCHS, train_loss / (idx + 1), 100.0 * correct / total, ) )
print("\n ======= Training Finished ======= \n")
"""usage: 2GPUs for training>>> CUDA_VISIBLE_DEVICES=0,1 python snmc_dp.py
Files already downloaded and verified ======= Training =======
== step: [ 50/196] [1/5] | loss: 1.992 | acc: 26.633% == step: [100/196] [1/5] | loss: 1.834 | acc: 32.797% == step: [150/196] [1/5] | loss: 1.742 | acc: 36.201% == step: [196/196] [1/5] | loss: 1.680 | acc: 38.578% == step: [ 50/196] [2/5] | loss: 1.398 | acc: 49.062% == step: [100/196] [2/5] | loss: 1.380 | acc: 49.953% == step: [150/196] [2/5] | loss: 1.355 | acc: 50.810% == step: [196/196] [2/5] | loss: 1.338 | acc: 51.428% == step: [ 50/196] [3/5] | loss: 1.242 | acc: 55.727% == step: [100/196] [3/5] | loss: 1.219 | acc: 56.801% == step: [150/196] [3/5] | loss: 1.200 | acc: 57.195% == step: [196/196] [3/5] | loss: 1.193 | acc: 57.328% == step: [ 50/196] [4/5] | loss: 1.105 | acc: 61.102% == step: [100/196] [4/5] | loss: 1.098 | acc: 61.082% == step: [150/196] [4/5] | loss: 1.087 | acc: 61.354% == step: [196/196] [4/5] | loss: 1.086 | acc: 61.426% == step: [ 50/196] [5/5] | loss: 1.002 | acc: 64.039% == step: [100/196] [5/5] | loss: 1.006 | acc: 63.977% == step: [150/196] [5/5] | loss: 1.009 | acc: 63.935% == step: [196/196] [5/5] | loss: 1.005 | acc: 64.024%
======= Training Finished ======= """
C. 多机多卡DDP
Okay, 下面进入正题,来看一下多机多卡怎么做,虽然上面给出的文章都讲得很明白,但有些概念还是有必要提一下:
-
进程组的相关概念
-
GROUP:进程组,大部分情况下DDP的各个进程是在同一个进程组下
-
WORLD_SIZE:总的进程数量 (原则上一个process占用一个GPU是较优的)
-
RANK:当前进程的序号,用于进程间通讯,rank = 0 的主机为 master 节点
-
LOCAL_RANK:当前进程对应的GPU号
-
举个栗子 :4台机器(每台机器8张卡)进行分布式训练
通过 init_process_group() 对进程组进行初始化
初始化后 可以通过 get_world_size() 获取到 world size
在该例中为32, 即有32个进程,其编号为0-31<br/>通过 get_rank() 函数可以进行获取 在每台机器上,local rank均为0-8,这是 local rank 与 rank 的区别, local rank 会对应到实际的 GPU ID 上
(单机多任务的情况下注意CUDA_VISIBLE_DEVICES的使用
控制不同程序可见的GPU devices)
-
DDP的基本用法 (代码编写流程)
-
使用 torch.distributed.init_process_group 初始化进程组
-
使用 torch.nn.parallel.DistributedDataParallel 创建 分布式模型
-
使用 torch.utils.data.distributed.DistributedSampler 创建 DataLoader
-
调整其他必要的地方(tensor放到指定device上,S/L checkpoint,指标计算等)
-
使用 torch.distributed.launch / torch.multiprocessing 或 slurm 开始训练
-
-
集体通信的使用
-
torch.distributed
-
NCCL-Woolley
-
scaled_all_reduce
-
将各卡的信息进行汇总,分发或平均等操作,需要使用集体通讯操作(如算accuracy或者总loss时候需要用到allreduce),可参考:
-
-
不同启动方式的用法
-
torch.distributed.launch:mnmc_ddp_launch.py
-
torch.multiprocessing:mnmc_ddp_mp.py
-
Slurm Workload Manager:mnmc_ddp_slurm.py
-
"""(MNMC) Multiple Nodes Multi-GPU Cards Training with DistributedDataParallel and torch.distributed.launchTry to compare with [snsc.py, snmc_dp.py & mnmc_ddp_mp.py] and find out the differences."""
import os
import torchimport torch.distributed as distimport torch.nn as nnimport torchvisionimport torchvision.transforms as transformsfrom torch.nn.parallel import DistributedDataParallel as DDP
BATCH_SIZE = 256EPOCHS = 5
if __name__ == "__main__":
# 0. set up distributed device rank = int(os.environ["RANK"]) local_rank = int(os.environ["LOCAL_RANK"]) torch.cuda.set_device(rank % torch.cuda.device_count()) dist.init_process_group(backend="nccl") device = torch.device("cuda", local_rank)
print(f"[init] == local rank: {local_rank}, global rank: {rank} ==")
# 1. define network net = torchvision.models.resnet18(pretrained=False, num_classes=10) net = net.to(device) # DistributedDataParallel net = DDP(net, device_ids=[local_rank], output_device=local_rank)
# 2. define dataloader trainset = torchvision.datasets.CIFAR10( root="./data", train=True, download=False, transform=transforms.Compose( [ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize( (0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010) ), ] ), ) # DistributedSampler # we test single Machine with 2 GPUs # so the [batch size] for each process is 256 / 2 = 128 train_sampler = torch.utils.data.distributed.DistributedSampler( trainset, shuffle=True, ) train_loader = torch.utils.data.DataLoader( trainset, batch_size=BATCH_SIZE, num_workers=4, pin_memory=True, sampler=train_sampler, )
# 3. define loss and optimizer criterion = nn.CrossEntropyLoss() optimizer = torch.optim.SGD( net.parameters(), lr=0.01 * 2, momentum=0.9, weight_decay=0.0001, nesterov=True, )
if rank == 0: print(" ======= Training ======= \n")
# 4. start to train net.train() for ep in range(1, EPOCHS + 1): train_loss = correct = total = 0 # set sampler train_loader.sampler.set_epoch(ep)
for idx, (inputs, targets) in enumerate(train_loader): inputs, targets = inputs.to(device), targets.to(device) outputs = net(inputs)
loss = criterion(outputs, targets) optimizer.zero_grad() loss.backward() optimizer.step()
train_loss += loss.item() total += targets.size(0) correct += torch.eq(outputs.argmax(dim=1), targets).sum().item()
if rank == 0 and ((idx + 1) % 25 == 0 or (idx + 1) == len(train_loader)): print( " == step: [{:3}/{}] [{}/{}] | loss: {:.3f} | acc: {:6.3f}%".format( idx + 1, len(train_loader), ep, EPOCHS, train_loss / (idx + 1), 100.0 * correct / total, ) ) if rank == 0: print("\n ======= Training Finished ======= \n")
"""usage:>>> python -m torch.distributed.launch --help
exmaple: 1 node, 4 GPUs per node (4GPUs)>>> python -m torch.distributed.launch \ --nproc_per_node=4 \ --nnodes=1 \ --node_rank=0 \ --master_addr=localhost \ --master_port=22222 \ mnmc_ddp_launch.py
[init] == local rank: 3, global rank: 3 ==[init] == local rank: 1, global rank: 1 ==[init] == local rank: 0, global rank: 0 ==[init] == local rank: 2, global rank: 2 == ======= Training =======
== step: [ 25/49] [0/5] | loss: 1.980 | acc: 27.953% == step: [ 49/49] [0/5] | loss: 1.806 | acc: 33.816% == step: [ 25/49] [1/5] | loss: 1.464 | acc: 47.391% == step: [ 49/49] [1/5] | loss: 1.420 | acc: 48.448% == step: [ 25/49] [2/5] | loss: 1.300 | acc: 52.469% == step: [ 49/49] [2/5] | loss: 1.274 | acc: 53.648% == step: [ 25/49] [3/5] | loss: 1.201 | acc: 56.547% == step: [ 49/49] [3/5] | loss: 1.185 | acc: 57.360% == step: [ 25/49] [4/5] | loss: 1.129 | acc: 59.531% == step: [ 49/49] [4/5] | loss: 1.117 | acc: 59.800%
======= Training Finished =======
exmaple: 1 node, 2tasks, 4 GPUs per task (8GPUs)>>> CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch \ --nproc_per_node=4 \ --nnodes=2 \ --node_rank=0 \ --master_addr="10.198.189.10" \ --master_port=22222 \ mnmc_ddp_launch.py
>>> CUDA_VISIBLE_DEVICES=4,5,6,7 python -m torch.distributed.launch \ --nproc_per_node=4 \ --nnodes=2 \ --node_rank=1 \ --master_addr="10.198.189.10" \ --master_port=22222 \ mnmc_ddp_launch.py
======= Training =======
== step: [ 25/25] [0/5] | loss: 1.932 | acc: 29.088% == step: [ 25/25] [1/5] | loss: 1.546 | acc: 43.088% == step: [ 25/25] [2/5] | loss: 1.424 | acc: 48.032% == step: [ 25/25] [3/5] | loss: 1.335 | acc: 51.440% == step: [ 25/25] [4/5] | loss: 1.243 | acc: 54.672%
======= Training Finished =======
exmaple: 2 node, 8 GPUs per node (16GPUs)>>> python -m torch.distributed.launch \ --nproc_per_node=8 \ --nnodes=2 \ --node_rank=0 \ --master_addr="10.198.189.10" \ --master_port=22222 \ mnmc_ddp_launch.py
>>> python -m torch.distributed.launch \ --nproc_per_node=8 \ --nnodes=2 \ --node_rank=1 \ --master_addr="10.198.189.10" \ --master_port=22222 \ mnmc_ddp_launch.py
[init] == local rank: 5, global rank: 5 ==[init] == local rank: 3, global rank: 3 ==[init] == local rank: 2, global rank: 2 ==[init] == local rank: 4, global rank: 4 ==[init] == local rank: 0, global rank: 0 ==[init] == local rank: 6, global rank: 6 ==[init] == local rank: 7, global rank: 7 ==[init] == local rank: 1, global rank: 1 == ======= Training =======
== step: [ 13/13] [0/5] | loss: 2.056 | acc: 23.776% == step: [ 13/13] [1/5] | loss: 1.688 | acc: 36.736% == step: [ 13/13] [2/5] | loss: 1.508 | acc: 44.544% == step: [ 13/13] [3/5] | loss: 1.462 | acc: 45.472% == step: [ 13/13] [4/5] | loss: 1.357 | acc: 49.344%
======= Training Finished ======= """
D. Launch / Slurm 调度方式
这里单独用代码 imagenet.py 讲一下不同的启动方式,更详细的内容请看源码。
我们来看一下这个 setup_distributed
函数:
-
通过 srun 产生的程序在环境变量中会有 SLURM_JOB_ID, 以判断是否为slurm的调度方式
-
rank 通过 SLURM_PROCID 可以拿到
-
world size 实际上就是进程数, 通过 SLURM_NTASKS 可以拿到
-
IP地址通过
subprocess.getoutput(f"scontrol show hostname {node_list} | head -n1")
巧妙得到,栗子来源于 MMCV -
否则,就使用launch进行调度,直接通过 os.environ["RANK"] 和 os.environ["WORLD_SIZE"] 即可拿到 rank 和 world size
# 此函数可以直接移植到你的程序中,动态获取IP,使用很方便# 默认支持launch 和 srun 两种方式def setup_distributed(backend="nccl", port=None): """Initialize distributed training environment. support both slurm and torch.distributed.launch see torch.distributed.init_process_group() for more details """ num_gpus = torch.cuda.device_count()
if "SLURM_JOB_ID" in os.environ: rank = int(os.environ["SLURM_PROCID"]) world_size = int(os.environ["SLURM_NTASKS"]) node_list = os.environ["SLURM_NODELIST"] addr = subprocess.getoutput(f"scontrol show hostname {node_list} | head -n1") # specify master port if port is not None: os.environ["MASTER_PORT"] = str(port) elif "MASTER_PORT" not in os.environ: os.environ["MASTER_PORT"] = "29500" if "MASTER_ADDR" not in os.environ: os.environ["MASTER_ADDR"] = addr os.environ["WORLD_SIZE"] = str(world_size) os.environ["LOCAL_RANK"] = str(rank % num_gpus) os.environ["RANK"] = str(rank) else: rank = int(os.environ["RANK"]) world_size = int(os.environ["WORLD_SIZE"])
torch.cuda.set_device(rank % num_gpus)
dist.init_process_group( backend=backend, world_size=world_size, rank=rank, )
那提交任务就可以灵活切换,下面给出32卡使用Slurm调度,以及8卡单结点的Launch调度:
# ======== slurm 调度方式 ========# 32张GPU,4个node,每个node8张卡,8192的batch size,32个进程# see:https://github.com/BIGBALLON/distribuuuu/blob/master/tutorial/imagenet.pyslurm example: 32GPUs (batch size: 8192) 128k / (256*32) -> 157 itertaion>>> srun --partition=openai -n32 --gres=gpu:8 --ntasks-per-node=8 --job-name=slrum_test \ python -u imagenet.py[init] == local rank: 7, global rank: 7 ==[init] == local rank: 1, global rank: 1 ==[init] == local rank: 4, global rank: 4 ==[init] == local rank: 2, global rank: 2 ==[init] == local rank: 6, global rank: 6 ==[init] == local rank: 3, global rank: 3 ==[init] == local rank: 5, global rank: 5 ==[init] == local rank: 4, global rank: 12 ==[init] == local rank: 1, global rank: 25 ==[init] == local rank: 5, global rank: 13 ==[init] == local rank: 6, global rank: 14 ==[init] == local rank: 0, global rank: 8 ==[init] == local rank: 1, global rank: 9 ==[init] == local rank: 2, global rank: 10 ==[init] == local rank: 3, global rank: 11 ==[init] == local rank: 7, global rank: 15 ==[init] == local rank: 5, global rank: 29 ==[init] == local rank: 2, global rank: 26 ==[init] == local rank: 3, global rank: 27 ==[init] == local rank: 0, global rank: 24 ==[init] == local rank: 7, global rank: 31 ==[init] == local rank: 6, global rank: 30 ==[init] == local rank: 4, global rank: 28 ==[init] == local rank: 0, global rank: 16 ==[init] == local rank: 5, global rank: 21 ==[init] == local rank: 7, global rank: 23 ==[init] == local rank: 1, global rank: 17 ==[init] == local rank: 6, global rank: 22 ==[init] == local rank: 3, global rank: 19 ==[init] == local rank: 2, global rank: 18 ==[init] == local rank: 4, global rank: 20 ==[init] == local rank: 0, global rank: 0 == ======= Training ======= == step: [ 40/157] [0/1] | loss: 6.781 | acc: 0.703% == step: [ 80/157] [0/1] | loss: 6.536 | acc: 1.260% == step: [120/157] [0/1] | loss: 6.353 | acc: 1.875% == step: [157/157] [0/1] | loss: 6.207 | acc: 2.465%
# ======== launch 调度方式 ========# nproc_per_node: 每个node的卡数# nnodes: node数量# node_rank:node编号,从0开始# see: https://github.com/BIGBALLON/distribuuuu/blob/master/tutorial/mnmc_ddp_launch.pydistributed.launch example: 8GPUs (batch size: 2048) 128k / (256*8) -> 626 itertaion>>> python -m torch.distributed.launch \ --nproc_per_node=8 \ --nnodes=1 \ --node_rank=0 \ --master_addr=localhost \ --master_port=22222 \ imagenet.py[init] == local rank: 0, global rank: 0 ==[init] == local rank: 2, global rank: 2 ==[init] == local rank: 6, global rank: 6 ==[init] == local rank: 5, global rank: 5 ==[init] == local rank: 7, global rank: 7 ==[init] == local rank: 4, global rank: 4 ==[init] == local rank: 3, global rank: 3 ==[init] == local rank: 1, global rank: 1 == ======= Training ======= == step: [ 40/626] [0/1] | loss: 6.821 | acc: 0.498% == step: [ 80/626] [0/1] | loss: 6.616 | acc: 0.869% == step: [120/626] [0/1] | loss: 6.448 | acc: 1.351% == step: [160/626] [0/1] | loss: 6.294 | acc: 1.868% == step: [200/626] [0/1] | loss: 6.167 | acc: 2.443% == step: [240/626] [0/1] | loss: 6.051 | acc: 3.003% == step: [280/626] [0/1] | loss: 5.952 | acc: 3.457% == step: [320/626] [0/1] | loss: 5.860 | acc: 3.983% == step: [360/626] [0/1] | loss: 5.778 | acc: 4.492% == step: [400/626] [0/1] | loss: 5.700 | acc: 4.960% == step: [440/626] [0/1] | loss: 5.627 | acc: 5.488% == step: [480/626] [0/1] | loss: 5.559 | acc: 6.013% == step: [520/626] [0/1] | loss: 5.495 | acc: 6.520% == step: [560/626] [0/1] | loss: 5.429 | acc: 7.117% == step: [600/626] [0/1] | loss: 5.371 | acc: 7.580% == step: [626/626] [0/1] | loss: 5.332 | acc: 7.907%
完整框架 Distribuuuu
Distribuuuu 是我闲(没)来(事)无(找)事(事)写的一个完整的纯DDP分类训练框架,足够精简且足够有效率。支持launch和srun两种启动方式,可以作为新手学习和魔改的样板工程。
# 1 node, 8 GPUspython -m torch.distributed.launch \ --nproc_per_node=8 \ --nnodes=1 \ --node_rank=0 \ --master_addr=localhost \ --master_port=29500 \ train_net.py --cfg config/resnet18.yaml# see srun --help # and https://slurm.schedmd.com/ for details
# example: 64 GPUs# batch size = 64 * 128 = 8192# itertaion = 128k / 8192 = 156 # lr = 64 * 0.1 = 6.4
srun --partition=openai-a100 \ -n 64 \ --gres=gpu:8 \ --ntasks-per-node=8 \ --job-name=Distribuuuu \ python -u train_net.py --cfg config/resnet18.yaml \ TRAIN.BATCH_SIZE 128 \ OUT_DIR ./resnet18_8192bs \ OPTIM.BASE_LR 6.4
下面是用 Distribuuuu 做的一些简单的实验,botnet50 是复现了今年比较火的 Transformer+CNN 的文章 Bottleneck Transformers for Visual 的精度,主要是证明这个框架的可用性, resnet18最后小测了 64卡/16384BS 的训练, 精度尚可。另外稍微强调一下SyncBN不要随便乱用,如果单卡Batch已经足够大的情况下不需要开SyncBN。
Distribuuuu benchmark (ImageNet)
如果是出于学习目的,想进行一些魔改和测试,可以试试我的Distribuuuu(https://github.com/BIGBALLON/distribuuuu),因为足够简单很容易改吖 ,如果你想做research的话推荐用FAIR的 pycls, 有model zoo 而且代码足够优雅。另外,打比赛的话就不建议自己造轮子了,分类可直接魔改 pycls 或 MMClassification, 检测就魔改 MMDetection 和 Detectron2
Pytorch搭建神经网络
pytorch的网络搭建,比tensorflow简单很多。格式很好理解。
如果你想做一个网络,需要先定义一个Class,继承 nn.Module(这个是必须的,所以先import torch.nn as nn,nn是一个工具箱,很好用),我们把class的名字就叫成Net.
Class Net (nn.Module):
这个Class里面主要写两个函数,一个是初始化的__init__函数,另一个是forward函数。我们随便搭一个,如下:
def __init__(self):
super().__init__()
self.conv1=nn.Conv2d(1,6,5)
self.conv2=nn.Conv2d(6,16,5)
def forward(self, x):
x=F.max_pool2d(F.relu(self.conv1(x)),2)
x=F.max_pool2d(F.relu(self.conv2(x)),2)
return x
__init__里面就是定义卷积层,当然先得super()一下,给父类nn.Module初始化一下。
(Python的基础知识)在这个里面主要就是定义卷积层的,比如第一层,我们叫它conv1,把它定义成输入1通道,输出6通道,卷积核5*5的的一个卷积层。conv2同理。
神经网络“深度学习”其实主要就是学习卷积核里的参数,像别的不需要学习和改变的,就不用放进去。
比如激活函数relu(),你非要放进去也行,再给它起个名字叫myrelu,也是可以的。forward里面就是真正执行数据的流动。
比如上面的代码,输入的x先经过定义的conv1(这个名字是你自己起的),再经过激活函数F.relu()(这个就不是自己起的名字了,最开始应该先import torch.nn.functional as F,F.relu()是官方提供的函数。
当然如果你在__init__里面把relu定义成了我上面说的myrelu,那你这里直接第一句话就成了x=F.max_pool2d(myrelu(self.conv1(x)),2)。
下一步的F.max_pool2d池化也是一样的,不多废话了。在一系列流动以后,最后把x返回到外面去。
这个Net的Class定义主要要注意两点。
第一:是注意前后输出通道和输入通道的一致性。不能第一个卷积层输出4通道第二个输入6通道,这样就会报错。
第二:它和我们常规的python的class还有一些不同,发现了没有?我们该怎么用这个Net呢?
先定义一个Net的实例(毕竟Net只是一个类不能直接传参数,output=Net(input)当然不行)
net=Net()
这样我们就可以往里传x了,假设你已经有一个要往神经网络的输入的数据“input"(这个input应该定义成tensor类型,怎么定义tensor那就自己去看看书了。)在传入的时候,是:
output=net(input)
看之前的定义:
def __init__(self):
……
def forward(self, x):
……
有点奇怪。好像常规python一般向class里面传入一个数据x,在class的定义里面,应该是把这个x作为形参传入__init__函数里的,而在上面的定义中,x作为形参是传入forward函数里面的。
其实也不矛盾,因为你定义net的时候,是net=Net(),并没有往里面传入参数。如果你想初始化的时候按需传入,就把需要的传入进去。
只是x是神经网络的输入,但是并非是初始化需要的,初始化一个网络,必须要有输入数据吗?
未必吧。只是在传入网络的时候,会自动认为你这个x是喂给forward里面的。也就是说,先定义一个网络的实例net=Net(), 这时调用output=net(input), 可以理解为等同于调用output=net.forward(input), 这两者可以理解为一码事。
在网络定义好以后,就涉及到传入参数,算误差,反向传播,更新权重…确实很容易记不住这些东西的格式和顺序。
传入的方式上面已经介绍了,相当于一次正向传播,把一路上各层的输入x都算出来了。
想让神经网络输出的output跟你期望的ground truth差不多,那就是不断减小二者间的差异,这个差异是你自己定义的,也就是目标函数(object function)或者就是损失函数。
如果损失函数loss趋近于0,那么自然就达到目的了。
损失函数loss基本上没法达到0,但是希望能让它达到最小值,那么就是希望它能按照梯度进行下降。
梯度下降的公式,大家应该都很熟悉,不熟悉的话,建议去看一下相关的理论。谁喜欢看公式呢?所以我这里不讲。
只是你的输入是由你来决定的,那神经网络能学习和决定什么呢?
自然它只能决定每一层卷积层的权重。所以神经网络只能不停修改权重,比如y=wx+b,x是你给的,它只能改变w,b让最后的输出y尽可能接近你希望的y值,这样损失loss就越来越小。
如果loss对于输入x的偏导数接近0了,不就意味着到达了一个极值吗?
而l在你的loss计算方式已经给定的情况下,loss对于输入x的偏导数的减小,其实只能通过更新参数卷积层参数W来实现(别的它决定不了啊,都是你输入和提供的)。
所以,通过下述方式实现对W的更新:(注意这些编号,下面还要提)
【1】 先算loss对于输入x的偏导,(当然网络好几层,这个x指的是每一层的输入,而不是最开始的输入input)
【2】 对【1】的结果再乘以一个步长(这样就相当于是得到一个对参数W的修改量)
【3】 用W减掉这个修改量,完成一次对参数W的修改。
说的不太严谨,但是大致意思是这样的。这个过程你可以手动实现,但是大规模神经网络怎么手动实现?那是不可能的事情。所以我们要利用框架pytorch和工具箱torch.nn。
所以要定义损失函数,以MSEloss为例:
compute_loss=nn.MSELoss()
明显它也是个类,不能直接传入输入数据,所以直接loss=nn.MSEloss(target,output)是不对的。需要把这个函数赋一个实例,叫成compute_loss。
之后就可以把你的神经网络的输出,和标准答案target传入进去:
loss=compute_loss(target,output)
算出loss,下一步就是反向传播:
loss.backward()
这一步其实就是把【1】给算完了,得到对参数W一步的更新量,算是一次反向传播。
这里就注意了,loss.backward()是啥玩意?如果是自己的定义的loss(比如你就自己定义了个def loss(x,y):return y-x )这样肯定直接backward会出错。所以应当用nn里面提供的函数。
当然搞深度学习不可能只用官方提供的loss函数,所以如果你要想用自己的loss函数。
必须也把loss定义成上面Net的样子(不然你的loss不能反向传播,这点要注意,注:这点是以前写的,很久之前的版本不行,现在都可以了,基本不太需要这样了)。
也是继承nn.Module,把传入的参数放进forward里面,具体的loss在forward里面算,最后return loss。__init__()就空着,写个super().__init__就行了。
在反向传播之后,第【2】和第【3】怎么实现?就是通过优化器来实现。让优化器来自动实现对网络权重W的更新。
所以在Net定义完以后,需要写一个优化器的定义(选SGD方式为例):
from torch import optim
optimizer=optim.SGD(net.parameters(),lr=0.001,momentum=0.9)
同样,优化器也是一个类,先定义一个实例optimizer,然后之后会用。
注意在optimizer定义的时候,需要给SGD传入了net的参数parameters,这样之后优化器就掌握了对网络参数的控制权,就能够对它进行修改了。
传入的时候把学习率lr也传入了。
在每次迭代之前,先把optimizer里存的梯度清零一下(因为W已经更新过的“更新量”下一次就不需要用了)
optimizer.zero_grad()
在loss.backward()反向传播以后,更新参数:
optimizer.step()
所以我们的顺序是:
1.先定义网络:写网络Net的Class,声明网络的实例net=Net(),
2.定义优化器
optimizer=optim.xxx(net.parameters(),lr=xxx),
3.再定义损失函数(自己写class或者直接用官方的,compute_loss=nn.MSELoss()或者其他。
4.在定义完之后,开始一次一次的循环:
①先清空优化器里的梯度信息,optimizer.zero_grad();
②再将input传入,output=net(input) ,正向传播
③算损失,loss=compute_loss(target,output) ##这里target就是参考标准值GT,需要自己准备,和之前传入的input一一对应
④误差反向传播,loss.backward()
⑤更新参数,optimizer.step()