pytorch分布式训练

1 基本概念

  • rank:进程号,在多进程上下文中,我们通常假定rank 0是第一个进程或者主进程,其它进程分别具有1,2,3不同rank号,这样总共具有4个进程

  • node:物理节点,可以是一个容器也可以是一台机器,节点内部可以有多个GPU;nnodes指物理节点数量, nproc_per_node指每个物理节点上面进程的数量

  • local_rank:指在一个node上进程的相对序号,local_rank在node之间相互独立

  • WORLD_SIZE:全局进程总个数,即在一个分布式任务中rank的数量

  • Group:进程组,一个分布式任务对应了一个进程组。只有用户需要创立多个进程组时才会用到group来管理,默认情况下只有一个group

如下图所示,共有3个节点(机器),每个节点上有4个GPU,每台机器上起4个进程,每个进程占一块GPU,那么图中一共有12个rank,nproc_per_node=4,nnodes=3,每个节点都一个对应的node_rank。

「注意」 rank与GPU之间没有必然的对应关系,一个rank可以包含多个GPU;一个GPU也可以为多个rank服务(多进程共享GPU),在torch的分布式训练中习惯默认一个rank对应着一个GPU,因此local_rank可以当作GPU号

  • backend 通信后端,可选的包括:nccl(NVIDIA推出)、gloo(Facebook推出)、mpi(OpenMPI)。一般建议GPU训练选择nccl,CPU训练选择gloo

  • master_addr与master_port 主节点的地址以及端口,供init_method 的tcp方式使用。 因为pytorch中网络通信建立是从机去连接主机,运行ddp只需要指定主节点的IP与端口,其它节点的IP不需要填写。 这个两个参数可以通过环境变量或者init_method传入

# 方式1:
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group("nccl", 
                        rank=rank, 
                        world_size=world_size)
# 方式2:
dist.init_process_group("nccl", 
                        init_method="tcp://localhost:12355",
                        rank=rank, 
                        world_size=world_size)

2. 使用分布式训练模型

使用DDP分布式训练,一共就如下个步骤:

  • 初始化进程组 dist.init_process_group

  • 设置分布式采样器 DistributedSampler

  • 使用DistributedDataParallel封装模型

  • 使用torchrun 或者 mp.spawn 启动分布式训练

2.1 初始化进程组

进程组初始化如下:

torch.distributed.init_process_group(backend, 
         init_method=None, 
         world_size=-1, 
         rank=-1, 
         store=None,
         ...)
  • backend: 指定分布式的后端,torch提供了NCCL, GLOO,MPI三种可用的后端,通常CPU的分布式训练选择GLOO, GPU的分布式训练就用NCCL即可
  • init_method:初始化方法,可以是TCP连接、File共享文件系统、ENV环境变量三种方式
    •  init_method='tcp://ip:port': 通过指定rank 0(即:MASTER进程)的IP和端口,各个进程进行信息交换。 需指定 rank 和 world_size 这两个参数
    • init_method='file://path':通过所有进程都可以访问共享文件系统来进行信息共享。需要指定rank和world_size参数
    • init_method=env://:从环境变量中读取分布式的信息(os.environ),主要包括 MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE。 其中,rank和world_size可以选择手动指定,否则从环境变量读取 

tcp和env两种方式比较类似(其实env就是对tcp的一层封装),都是通过网络地址的方式进行通信,也是最常用的初始化方法

「case 1」

import os, argparse
import torch
import torch.distributed as dist

parse = argparse.ArgumentParser()
parse.add_argument('--init_method', type=str)
parse.add_argument('--rank', type=int)
parse.add_argument('--ws', type=int)
args = parse.parse_args()

if args.init_method == 'TCP':
 dist.init_process_group('nccl', init_method='tcp://127.0.0.1:28765', rank=args.rank, world_size=args.ws)
elif args.init_method == 'ENV':
    dist.init_process_group('nccl', init_method='env://')

rank = dist.get_rank()
print(f"rank = {rank} is initialized")
# 单机多卡情况下,localrank = rank. 严谨应该是local_rank来设置device
torch.cuda.set_device(rank)
tensor = torch.tensor([1, 2, 3, 4]).cuda()
print(tensor)

 假设单机双卡的机器上运行,则「开两个终端」,同时运行下面的命令

# TCP方法
python3 test_ddp.py --init_method=TCP --rank=0 --ws=2
python3 test_ddp.py --init_method=TCP --rank=1 --ws=2
# ENV方法
MASTER_ADDR='localhost' MASTER_PORT=28765 RANK=0 WORLD_SIZE=2 python3 test_gpu.py --init_method=ENV
MASTER_ADDR='localhost' MASTER_PORT=28765 RANK=1 WORLD_SIZE=2 python3 test_gpu.py --init_method=ENV

如果开启的进程未达到 word_size 的数量,则所有进程会一直等待,直到都开始运行,可以得到输出如下:

# rank0 的终端:
rank 0 is initialized
tensor([1, 2, 3, 4], device='cuda:0')
# rank1的终端
rank 1 is initialized
tensor([1, 2, 3, 4], device='cuda:1')

在初始化DDP的时候,能够给后端提供主进程的地址端口、本身的RANK,以及进程数量即可。初始化完成后,就可以执行很多分布式的函数了,比如dist.get_rank, dist.all_gather等等

2.2 分布式训练数据加载

DistributedSampler把所有数据分成N份(N为worldsize), 并能正确的分发到不同的进程中,每个进程可以拿到一个数据的子集,不重叠,不交叉

torch.utils.data.distributed.DistributedSampler(
    dataset,
    num_replicas=None, 
    rank=None, 
    shuffle=True, 
    seed=0, 
    drop_last=False)
  • dataset: 需要加载的完整数据集

  • num_replicas: 把数据集分成多少份,默认是当前dist的world_size

  • rank: 当前进程的id,默认dist的rank

  • shuffle:是否打乱

  • drop_last: 如果数据长度不能被world_size整除,可以考虑是否将剩下的扔掉

  • seed:随机数种子。这里需要注意,从源码中可以看出,真正的种子其实是 self.seed+self.epoch 这样的好处是,不同的epoch每个进程拿到的数据是不一样,因此需要在每个epoch开始前设置下:sampler.set_epoch(epoch)

「case 2」

sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, sampler=sampler)
for epoch in range(start_epoch, n_epochs):
  sampler.set_epoch(epoch) # 设置epoch 更新种子
  train(loader)

2.3 模型分布式封装

将单机模型使用torch.nn.parallel.DistributedDataParallel 进行封装

torch.cuda.set_device(local_rank)
model = Model().cuda()
model = DistributedDataParallel(model, device_ids=[local_rank])

「注意」 要调用model内的函数或者属性,使用model.module.xxxx

这样在多卡训练时,每个进程有一个model副本和optimizer,使用自己的数据进行训练,之后反向传播计算完梯度的时候,所有进程的梯度会进行all-reduce操作进行同步,进而保证每个卡上的模型更新梯度是一样的,模型参数也是一致的。

在save和load模型时候,为了减小所有进程同时读写磁盘,一般处理方法是以主进程为主

「case 3」

model = DistributedDataParallel(model, device_ids=[local_rank])
CHECKPOINT_PATH ="./model.checkpoint"
if rank == 0:
  torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# barrier()其他保证rank 0保存完成
dist.barrier()
map_location = {"cuda:0": f"cuda:{local_rank}"}
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=map_location))
# 后面正常训练代码
optimizer = xxx
for epoch:
  for data in Dataloader:
      model(data)
      xxx
    # 训练完成 只需要保存rank 0上的即可
    # 不需要dist.barrior(), all_reduce 操作保证了同步性
  if rank == 0:
     torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

2.4 启动分布式训练

如case1所示我们手动运行多个程序,相对繁琐。实际上本身DDP就是一个python 的多进程,因此完全可以直接通过多进程的方式来启动分布式程序。 torch提供了以下两种启动工具来更加方便的运行torch的DDP程序。

2.4.1 mp.spawn

使用torch.multiprocessing(python的multiprocessing的封装类) 来自动生成多个进程

mp.spawn(fn, args=(), nprocs=1, join=True, daemon=False)
  • fn: 进程的入口函数,该函数的第一个参数会被默认自动加入当前进*程的rank, 即实际调用: fn(rank, *args)

  • nprocs: 进程数量,即:world_size

  • args: 函数fn的其他常规参数以tuple的形式传递

「case 4」

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def fn(rank, ws, nums):
    dist.init_process_group('nccl', init_method='tcp://127.0.0.1:28765',
                            rank=rank, world_size=ws)
    rank = dist.get_rank()
    print(f"rank = {rank} is initialized")
    torch.cuda.set_device(rank)
    tensor = torch.tensor(nums).cuda()
    print(tensor)

if __name__ == "__main__":
    ws = 2
    mp.spawn(fn, nprocs=ws, args=(ws, [1, 2, 3, 4]))

直接执行一次命令 python3 test_ddp.py 即可,输出如下:

rank = 0 is initialized
rank = 1 is initialized
tensor([1, 2, 3, 4], device='cuda:1')
tensor([1, 2, 3, 4], device='cuda:0')

这种方式同时适用于TCP和ENV初始化

2.4.2 launch/run

使用torch提供的 torch.distributed.launch工具,可以以模块的形式直接执行

python3 -m torch.distributed.launch --配置 train.py --args参数

常用配置有:

  • --nnodes: 使用的机器数量,单机的话,就默认是1了

  • --nproc_per_node: 单机的进程数,即单机的worldsize

  • --master_addr/port: 使用的主进程rank0的地址和端口

  • --node_rank: 当前的进程rank

在单机情况下, 只有--nproc_per_node 是必须指定的,--master_addr/portnode_rank都是可以由launch通过环境自动配置

「case5 test_dist.py」

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import os

dist.init_process_group('nccl', init_method='env://')

rank = dist.get_rank()
local_rank = os.environ['LOCAL_RANK']
master_addr = os.environ['MASTER_ADDR']
master_port = os.environ['MASTER_PORT']
print(f"rank = {rank} is initialized in {master_addr}:{master_port}; local_rank = {local_rank}")
torch.cuda.set_device(rank)
tensor = torch.tensor([1, 2, 3, 4]).cuda()
print(tensor)

输入如下命令

python3 -m torch.distribued.launch --nproc_per_node=2 test_dist.py

得到如下输出

rank = 0 is initialized in 127.0.0.1:29500; local_rank = 0
rank = 1 is initialized in 127.0.0.1:29500; local_rank = 1
tensor([1, 2, 3, 4], device='cuda:1')
tensor([1, 2, 3, 4], device='cuda:0')

注意:torch1.10开始用终端命令torchrun来代替torch.distributed.launch,具体来说,torchrun实现了launch的一个超集,不同的地方在于:

  • 完全使用环境变量配置各类参数,如RANK,LOCAL_RANK, WORLD_SIZE等,尤其是local_rank不再支持用命令行隐式传递的方式

  • 能够更加优雅的处理某个worker失败的情况,重启worker。需要代码中有load_checkpoint(path)和save_checkpoint(path) 这样有worker失败的话,可以通过load最新的模型,重启所有的worker接着训练。具体参考 imagenet-torchrun

  • 训练的节点数目可以弹性变化

上面的命令可以写成如下

torchrun --nproc_per_node=2 test_dist.py

torchrun或者launch对上面ENV的初始化方法支持最完善,TCP初始化方法的可能会出现问题,因此尽量使用env来初始化dist

3. 分布式做evaluation

分布式做evaluation的时候,一般需要先所有进程的输出结果进行gather,再进行指标的计算,两个常用的函数:

  • dist.all_gather(tensor_list, tensor) : 将所有进程的tensor进行收集并拼接成新的tensorlist返回

  • dist.all_reduce(tensor, op) 这是对tensor的in-place的操作, 对所有进程的某个tensor进行合并操作,op可以是求和等

「case 6 test_ddp.py」

import torch
import torch.distributed as dist

dist.init_process_group('nccl', init_method='env://')
rank = dist.get_rank()
torch.cuda.set_device(rank)

tensor = torch.arange(2) + 1 + 2 * rank
tensor = tensor.cuda()
print(f"rank {rank}: {tensor}")

tensor_list = [torch.zeros_like(tensor).cuda() for _ in range(2)]
dist.all_gather(tensor_list, tensor)
print(f"after gather, rank {rank}: tensor_list: {tensor_list}")

dist.barrier()
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
print(f"after reduce, rank {rank}: tensor: {tensor}")

通过torchrun --nproc_per_node=2 test_ddp.py 输出结果如下: 

rank 1: tensor([3, 4], device='cuda:1')
rank 0: tensor([1, 2], device='cuda:0')
after gather, rank 1: tensor_list: [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')]
after gather, rank 0: tensor_list: [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')]
after reduce, rank 0: tensor: tensor([4, 6], device='cuda:0')
after reduce, rank 1: tensor: tensor([4, 6], device='cuda:1')

在evaluation的时候,可以拿到所有进程中模型的输出,最后统一计算指标,基本流程如下

pred_list = []
for data in Dataloader:
    pred = model(data)
    batch_pred = [torch.zeros_like(label) for _ in range(world_size)]
    dist.all_gather(batch_pred, pred)
    pred_list.extend(batch_pred)
pred_list = torch.cat(pred_list, 1)
# 所有进程pred_list是一致的,保存所有数据模型预测的值

4. 常用函数

  • torch.distributed.get_rank(group=None) 获取当前进程的rank

  • torch.distributed.get_backend(group=None) 获取当前任务(或者指定group)的后端

  • data_loader_train = torch.utils.data.DataLoader(dataset=data_set, batch_size=32,num_workers=16,pin_memory=True)

    • num_workers: 加载数据的进程数量,默认只有1个,增加该数量能够提升数据的读入速度。(注意:该参数>1,在低版本的pytorch可能会触发python的内存溢出)
    • pin_memory: 锁页内存,加快数据在内存上的传递速度。 若数据加载成为训练速度的瓶颈,可以考虑将这两个参数加上

进程内指定显卡,很多场景下使用分布式都是默认一张卡对应一个进程,所以通常,我们会设置进程能够看到卡数

# 方式1:在进程内部设置可见的device
torch.cuda.set_device(args.local_rank)
# 方式2:通过ddp里面的device_ids指定
ddp_model = DDP(model, device_ids=[rank]) 
# 方式3:通过在进程内修改环境变量
os.environ['CUDA_VISIBLE_DEVICES'] = loac_rank

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/189296.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分布式篇---第七篇

系列文章目录 文章目录 系列文章目录前言一、如何将长链接转换成短链接,并发送短信?二、长链接和短链接如何互相转换?三、长链接和短链接的对应关系如何存储?四、如何提高系统的并发能力?前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一…

Docker Swarm总结+CI/CD Devops、gitlab、sonarqube以及harbor的安装集成配置(3/4)

博主介绍:Java领域优质创作者,博客之星城市赛道TOP20、专注于前端流行技术框架、Java后端技术领域、项目实战运维以及GIS地理信息领域。 🍅文末获取源码下载地址🍅 👇🏻 精彩专栏推荐订阅👇🏻…

Guitar Pro8.0.2吉他编曲软件 吉他打谱软件 吉他作曲软件

Guitar Pro8.0.2在音乐的大舞台上,谁不想成为一位吉他弹奏大师呢?但在现实中,学吉他并非一蹴而就,许多小伙伴都因为吉他的上手难度而被浇灭学习的热情。然而,这里有一款神奇的软件,叫做,它就像是…

bugku 渗透测试

场景1 查看源代码 场景2 用dirsearch扫描一下看看 ok看到登录的照应了第一个提示 进去看看 不出所料 随便试试admin/admin进去了 在基本设置里面看到falg 场景3 确实是没啥想法了 找到php在线运行 检查网络,我们发现这个php在线运行会写入文件 那我们是不是写…

【数据结构】树与二叉树(廿五):树搜索指定数据域的结点(算法FindTarget)

文章目录 5.3.1 树的存储结构5. 左儿子右兄弟链接结构 5.3.2 获取结点的算法1. 获取大儿子、大兄弟结点2. 搜索给定结点的父亲3. 搜索指定数据域的结点a. 算法FindTargetb. 算法解析c. 代码实现a. 使用指向指针的指针b. 直接返回找到的节点 4. 代码整合 5.3.1 树的存储结构 5.…

Visual Studio 使用MFC 单文档工程绘制单一颜色直线和绘制渐变颜色的直线(实例分析)

Visual Studio 使用MFC 单文档工程从创建到实现绘制单一颜色直线和绘制渐变颜色的直线 本文主要从零开始创建一个MFC单文档工程然后逐步实现添加按键(事件响应函数),最后实现单一颜色直线的绘制与渐变色直线的绘制o( ̄▽&#xffe…

利用chart.js来完成动态网页显示拆线图的效果

<% page language"java" contentType"text/html; charsetUTF-8"pageEncoding"UTF-8"%><%! String list"[一月份, 二月份, 三月份,四月份, 五月份, 六月份, 七月]"; String label"我的一个折线图"; String data &qu…

RabbitMQ之MQ的可靠性

文章目录 前言一、数据持久化交换机持久化队列持久化消息持久化 二、LazyQueue控制台配置Lazy模式代码配置Lazy模式更新已有队列为lazy模式 总结 前言 消息到达MQ以后&#xff0c;如果MQ不能及时保存&#xff0c;也会导致消息丢失&#xff0c;所以MQ的可靠性也非常重要。 一、…

Scrapy爬虫异步框架(一篇文章齐全)

1、Scrapy框架初识 2、Scrapy框架持久化存储&#xff08;点击前往查阅&#xff09; 3、Scrapy框架内置管道&#xff08;点击前往查阅&#xff09; 4、Scrapy框架中间件&#xff08;点击前往查阅&#xff09; Scrapy 是一个开源的、基于Python的爬虫框架&#xff0c;它提供了…

SQL Server秘籍:数据分隔解密,数据库处理新境界!

点击上方蓝字关注我 在数据数据过程中经常会遇到数据按照一定字符进行拆分&#xff0c;而在不同版本的SQL SERVER数据库中由于包含的函数不同&#xff0c;处理的方式也不一样。本文将列举2个版本的数据库中不同的处理方法。 1. 使用 XML 方法 在SQL SERVER 2016版本之前&#x…

C#,数值计算——有理函数插值和外推(Rational_interp)的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { /// <summary> /// 有理函数插值和外推 /// Rational Function Interpolation and Extrapolation /// Given a value x, and using pointers to data xx and yy, this routine returns …

2.1 总线问题

同一时间只能有一个去控制总线,因此需要一个输出开关去确保总线不出错 一旦同时开启输出开关,下面的锁存器还会被上面的数据修改如果上下同时开启可写,且同时开启可输出, 则短路

从零开始的RISC-V模拟器开发(一)环境搭建

前言 博主这系列文章是跟随中科院吴伟老师的b站公开课&#xff1a;[完结]从零开始的RISC-V模拟器开发第一季2021春季_哔哩哔哩_bilibili 记录的笔记。仅供学习使用&#xff0c;侵删&#xff01; 苦逼的博主现在自己毕设也是要设计类似的东西。哎。我需要做的是给一个现成的 R…

歌曲《难不难》由歌手荆涛演唱:面对挑战,勇敢前行

在人生的旅途中&#xff0c;我们都会遭遇种种困难和挑战。有时&#xff0c;一个看似简单的创意或想法&#xff0c;想要实现它却需要经历无数次的实践和辛酸。歌曲《难不难》由歌手荆涛演唱&#xff0c;以平实的语言和流畅的旋律&#xff0c;表达了面对困难和挑战时&#xff0c;…

01 _ 高并发系统:它的通用设计方法是什么?

我们知道&#xff0c;高并发代表着大流量&#xff0c;高并发系统设计的魅力就在于我们能够凭借自己的聪明才智设计巧妙的方案&#xff0c;从而抵抗巨大流量的冲击&#xff0c;带给用户更好的使用体验。这些方案好似能操纵流量&#xff0c;让流量更加平稳地被系统中的服务和组件…

用友NC word.docx接口存在任意文件读取漏洞 附POC

@[toc] 用友NC word.docx接口存在任意文件读取漏洞 附POC 免责声明:请勿利用文章内的相关技术从事非法测试,由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失,均由使用者本人负责,所产生的一切不良后果与文章作者无关。该文章仅供学习用途使…

多线程详解(未完结)

文章目录 ⭐️写在前面的话⭐️一、线程简介1.1 进程1.2 线程1.3 多线程和多进程的区别1.4 总结 二、继承实现2.1 继承Thread类例子&#xff1a;网图下载 2.2 实现Runnable接口 (推荐)案例&#xff1a;火车站买票问题案例&#xff1a;龟兔赛跑 2.3 实现Callable接口 (了解即可)…

虚拟机可ping树莓派树莓派无法ping虚拟机 的解决办法

问题描述 在学习交叉编译的过程中&#xff0c;发现了树莓派无法ping通虚拟机的问题。所以我尝试了各种ping&#xff0c;发现&#xff1a; 虚拟机可以ping通树莓派和主机树莓派可以ping通主机主机可以ping通树莓派和虚拟机唯独树莓派没法ping通虚拟机 尝试各种方法后找到一种…

突破技术障碍:软件工程师如何应对项目中的难题?

在软件开发项目中&#xff0c;工程师常常会遇到各种技术难题。这些难题可能涉及到复杂的算法、不兼容的系统、难以预见的软件行为&#xff0c;或者其他许多方面。 以下是一些策略和方法&#xff0c;可以帮助软件工程师有效地应对这些挑战&#xff1a; 1、理解问题&#xff1a;…

王道p150 14.假设二叉树采用二叉链表存储结构,设计一个算法,求非空二叉树 b的宽度(即具有结点数最多的那一层的结点个数) (c语言代码实现)

采用层次遍历的方法求出所有结点的层次&#xff0c;并将所有结点和对应的层次放在一个队列中。然后通过扫描队列求出各层的结点总数&#xff0c;最大的层结点总数即为二叉树的宽度。 /* A B C D E F …