PyTorch数据并行(DP/DDP)浅析

一直以来都是用的单机单卡训练模型,虽然很多情况下已经足够了,但总有一些情况得上分布式训练:

  • 模型大到一张卡放不下;
  • 单张卡batch size不敢设太大,训练速度慢;
  • 当你有好几张卡,不想浪费;
  • 展示一下技术

由于还没遇到过一张显卡放不下整个模型的情况,本文的分布式训练仅限数据并行。主要从数据并行的原理和一些简单的实践例子进行说明。

文章目录

    • 原理介绍
    • DataParallel
      • 小样
    • DistributedDataParallel
      • 小样
      • 一些概念
    • DDP与DP的区别
    • 参考

原理介绍

与每个step一个batch数据相比,数据并行是指每个step用更多的数据(多个batch)进行计算——即多个batch的数据并行进行前向计算。既然是并行,那么就涉及到多张卡一起计算。单卡和多卡训练过程如下图1所示,主要有三个过程:

  • 各卡分别计算损失和梯度,即图中红线部分;
  • 所以梯度整合到主device,即图中蓝线部分;
  • 主device进行参数更新,并将新模型拷贝到其他device上,即图中绿线部分。
../_images/ps.svg
左图是单GPU训练;右图是多GPU训练的一个变体:(1)计算损失和梯度,(2)所有梯度聚合在一个GPU上,(3)发生参数更新,并将参数重新广播给所有GPU

如果不使用数据并行,在显存足够的情况下,我们可以将batch_size设大,这和数据并行的区别在哪呢?如果只将batch_size设大,计算还是在一张卡上完成,速度相对来说是不如将数据均分后放在不同卡上并行计算的。当然,考虑到卡之间的通信问题,要发挥多卡并行的力量需要进行一定权衡。

torch中主要有两种数据并行方式:DP和DDP。

DataParallel

DP是较简单的一种数据并行方式,直接将模型复制到多个GPU上并行计算,每个GPU计算batch中的一部分数据,各自完成前向和反向后,将梯度汇总到主GPU上。其基本流程:

  1. 加载模型、数据至内存;
  2. 创建DP模型;
  3. DP模型的forward过程:
    1. 一个batch的数据均分到不同device上;
    2. 为每个device复制一份模型;
    3. 至此,每个device上有模型和一份数据,并行进行前向传播;
    4. 收集各个device上的输出;
  4. 每个device上的模型反向传播后,收集梯度到主device上,更新主device上的模型,将模型广播到其他device上;
  5. 3-4循环。

在DP中,只有一个主进程,主进程下有多个线程,每个线程管理一个device的训练。因此,DP中内存中只存在一份数据,各个线程间是共享这份数据的。DP和Parameter Server的方式很像。

小样

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

# 假设我们有一个简单的数据集类
class SimpleDataset(Dataset):
    def __init__(self, data, target):
        self.data = data
        self.target = target

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.target[idx]

# 假设我们有一个简单的神经网络模型
class SimpleModel(nn.Module):
    def __init__(self, input_dim):
        super(SimpleModel, self).__init__()
        self.fc = nn.Linear(input_dim, 1)

    def forward(self, x):
        return torch.sigmoid(self.fc(x))

# 假设我们有一些数据
n_sample = 100
n_dim = 10
batch_size = 10
X = torch.randn(n_sample, n_dim)
Y = torch.randint(0, 2, (n_sample, )).float()

dataset = SimpleDataset(X, Y)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# ===== 注意:刚创建的模型是在 cpu 上的 ===== #
device_ids = [0, 1, 2]
model = SimpleModel(n_dim).to(device_ids[0])
model = nn.DataParallel(model, device_ids=device_ids)


optimizer = optim.SGD(model.parameters(), lr=0.01)

for epoch in range(10):
    for batch_idx, (inputs, targets) in enumerate(data_loader):
        inputs, targets = inputs.to('cuda'), targets.to('cuda')
        outputs = model(inputs)
        
        loss = nn.BCELoss()(outputs, targets.unsqueeze(1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}')

其中最重要的一行便是:

model = nn.DataParallel(model, device_ids=device_ids)

注意,模型的参数和缓冲区都要放在device_ids[0]上。在执行forward函数时,模型会被复制到各个GPU上,对模型的属性进行更新并不会产生效果,因为前向完后各个卡上的模型就被销毁了。只有在device_ids[0]上对模型的参数或者buffer进行的更新才会生效!2

DistributedDataParallel

DDP,顾名思义,即分布式的数据并行,每个进程独立进行训练,每个进程会加载完整的数据,但是读取不重叠的数据。DDP执行流程3

  • 准备阶段

    • 环境初始化
      • 在各张卡上初始化进程并建立进程间通信,对应代码:init_process_group
    • 模型广播
      • 将模型parameter、buffer广播到各节点,对应代码:model = DDP(model).to(local_rank)
    • 创建管理器reducer,给每个参数注册梯度平均hook。
  • 准备数据

    • 加载数据集,创建适用于分布式场景的数据采样器,以防不同节点使用的数据不重叠。
  • 训练阶段

    • 前向传播
      • 同步各进程状态(parameter和buffer);
      • 当DDP参数find_unused_parametertrue时,其会在forward结束时,启动一个回溯,标记未用到的参数,提前将这些设置为ready
    • 计算梯度
      • reducer外面:各进程各自开始反向计算梯度;
      • reducer外面:当某个参数的梯度计算好了,其之前注册的grad hook就会触发,在reducer里把这个参数的状态标记为ready
      • reducer里面:当某个bucket的所有参数都是ready时,reducer开始对这个bucket的所有参数开始一个异步的all-reduce梯度平均操作;
      • reducer里面:当所有bucket的梯度平均都结束后,reducer把得到的平均梯度正式写入到parameter.grad里。
    • 优化器应用梯度更新参数。

小样

import argparse
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# 1. 基础模块 ### 
class SimpleModel(nn.Module):
    def __init__(self, input_dim):
        super(SimpleModel, self).__init__()
        self.fc = nn.Linear(input_dim, 1)
        cnt = torch.tensor(0)
        self.register_buffer('cnt', cnt)

    def forward(self, x):
        self.cnt += 1
        # print("In forward: ", self.cnt, "Rank: ", self.fc.weight.device)
        return torch.sigmoid(self.fc(x))

class SimpleDataset(Dataset):
    def __init__(self, data, target):
        self.data = data
        self.target = target

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.target[idx]
    
# 2. 初始化我们的模型、数据、各种配置  ####
## DDP:从外部得到local_rank参数。从外面得到local_rank参数,在调用DDP的时候,其会自动给出这个参数
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1, type=int)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank

## DDP:DDP backend初始化
torch.cuda.set_device(local_rank)
dist.init_process_group(backend='nccl')

## 假设我们有一些数据
n_sample = 100
n_dim = 10
batch_size = 25
X = torch.randn(n_sample, n_dim)  # 100个样本,每个样本有10个特征
Y = torch.randint(0, 2, (n_sample, )).float()

dataset = SimpleDataset(X, Y)
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
data_loader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)

## 构造模型
model = SimpleModel(n_dim).to(local_rank)
## DDP: Load模型要在构造DDP模型之前,且只需要在master上加载就行了。
ckpt_path = None
if dist.get_rank() == 0 and ckpt_path is not None:
    model.load_state_dict(torch.load(ckpt_path))

## DDP: 构造DDP model —————— 必须在 init_process_group 之后才可以调用 DDP
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

## DDP: 要在构造DDP model之后,才能用model初始化optimizer。
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
loss_func = nn.BCELoss().to(local_rank)

# 3. 网络训练  ###
model.train()
num_epoch = 100
iterator = tqdm(range(100))
for epoch in iterator:
    # DDP:设置sampler的epoch,
    # DistributedSampler需要这个来指定shuffle方式,
    # 通过维持各个进程之间的相同随机数种子使不同进程能获得同样的shuffle效果。
    data_loader.sampler.set_epoch(epoch)
    # 后面这部分,则与原来完全一致了。
    for data, label in data_loader:
        data, label = data.to(local_rank), label.to(local_rank)
        optimizer.zero_grad()
        prediction = model(data)
        loss = loss_func(prediction, label.unsqueeze(1))
        loss.backward()
        iterator.desc = "loss = %0.3f" % loss
        optimizer.step()

    # DDP:
    # 1. save模型的时候,和DP模式一样,有一个需要注意的点:保存的是model.module而不是model。
    #    因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。
    # 2. 只需要在进程0上保存一次就行了,避免多次保存重复的东西。
    if dist.get_rank() == 0 and epoch == num_epoch - 1:
        torch.save(model.module.state_dict(), "%d.ckpt" % epoch)

结合上面的代码,一个简化版的DDP流程:

  1. 读取DDP相关的配置,其中最关键的就是:local_rank
  2. DDP后端初始化:dist.init_process_group
  3. 创建DDP模型,以及数据加载器。注意要为加载器创建分布式采样器(DistributedSampler);
  4. 训练。

DDP的通常启动方式:

CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 ddp.py

一些概念

以上过程中涉及到一些陌生的概念,其实走一遍DDP的过程就会很好理解:每个进程是一个独立的训练流程,不同进程之间共享同一份数据。为了避免不同进程使用重复的数据训练,以及训练后同步梯度,进程间需要同步。因此,其中一个重点就是每个进程序号,或者说使用的GPU的序号。

  • node:节点,可以是物理主机,也可以是容器;
  • ranklocal_rank:都表示进程在整个分布式任务中的编号。rank是进程在全局的编号,local_rank是进程在所在节点上的编号。显然,如果只有一个节点,那么二者是相等的。在启动脚本中的--nproc_per_node即指定一个节点上有多少进程;
  • world_size:即整个分布式任务中进程的数量。

DDP与DP的区别

  • DP是单进程多线程的,只能在单机上工作;DDP是多进程的,可以在多级多卡上工作。DP通常比DDP慢,主要原因有:1)DP是单进程的,受到GIL的限制;2)DP每个step都需要拷贝模型,以及划分数据和收集输出;
  • DDP可以与模型并行相结合;
  • DP的通信成本随着卡数线性增长,DDP支持Ring-AllReduce,通信成本是固定的。

本文利用pytorch进行数据并行训练进行了一个粗浅的介绍。包括DP和DDP的基本原理,以及简单的例子。实际在分布式过程中涉及到的东西还是挺多的,比如DP/DDP中梯度的回收是如何进行的,DDP中数据采样的细节,DDP中的数据同步操作等。更多的还是要基于真实的需求出发才能真的体会得到。

参考


  1. 参数服务器-动手学深度学习2.0. ↩︎

  2. dataparallel ↩︎

  3. Pytorch Distributed Data Parallal. ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/292990.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

设计模式 七大原则

1.单一职责原则 单一职责原则(SRP:Single responsibility principle)又称单一功能原则 核心:解耦和增强内聚性(高内聚,低耦合)。 描述: 类被修改的几率很大,因此应该专注…

Python处理音频

从video中抽取audio from moviepy.editor import VideoFileClip from pydub import AudioSegmentvideo_path /opt/audio/audios/video1.mp4 audio_path /opt/audio/audios/video1.wav # 提取的音频保存路径# 加载视频文件 video VideoFileClip(video_path)# 提取音频 audi…

华为DriveONE电机控制器拆解实拍

如果说之前的问界M5、M7,华为让我们看到其在智能化上确实拥有遥遥领先的能力,那么在智界S7上,则让我们看到华为在动力、底盘这些硬件执行层面,竟然也有不输给很多车企的实力。1、华为电驱,全球第一?在智界S…

如何使用Cloudreve+Cpolar搭建个人PHP云盘系统并发布公网可访问

文章目录 1、前言2、本地网站搭建2.1 环境使用2.2 支持组件选择2.3 网页安装2.4 测试和使用2.5 问题解决 3、本地网页发布3.1 cpolar云端设置3.2 cpolar本地设置 4、公网访问测试5、结语 1、前言 自云存储概念兴起已经有段时间了,各互联网大厂也纷纷加入战局&#…

微信小程序:flex常用布局

在我们平时微信小程序开发过程中为了页面能达到设计小伙伴的预期,追求还原度,那我们肯定会使用很多常用的布局方式,那我们今天就介绍一下微信小程序中常用的一些flex布局 1、常用flex布局 /** 水平垂直居中 **/ .flex-center {display: fle…

轻量化网络-MobileNet系列

整理备忘 目录 1. MobileNetV1 1.1 论文 1.2 网络结构 1.3 深度可分离卷积 1.4 计算量下降了 1.5 参数量下降了 2. MobileNetV2 2.1 论文 2.2 网络结构 2.3 效果 3. MobileNetV3 3.1 论文 3.2 网络结构 3.3 效果 1. MobileNetV1 1.1 论文 https://arxiv.org/a…

从0开始python学习-39.requsts库

目录 HTTP协议 1. 请求 2. 响应 Requests库 1. 安装 2. 请求方式 2.1 requests.请求方式(参数) 2.2 requests.request() 2.3 requests.session().request() 2.4 三种方式之间的关联 3. 请求参数 3.1 params:查询字符串参数 3.2 data:Form表单…

出现 No such instance field: ‘XXXX‘ 的解决方法

目录 1. 问题所示2. 原理分析3. 解决方法1. 问题所示 作为一个全栈的开发玩家,需要调试前后端的数据传输,方便发现问题所在! 在debug整个项目的时候,检查传输数据的时候,发现前端可以传输,但是后端一直拿不到 出现如下问题:No such instance field: parentModel 截图…

【git使用】了解三种git commit合并的使用场景(rebase、merge、cherry-pick)

参考 【Git学习笔记】逃不掉的merge和rebase-腾讯云开发者社区-腾讯云git merge 和 git rebase - 知乎git cherry-pick 教程 - 阮一峰的网络日志 简单理解各种合并的方法 线性合并,使用 rebase —— feature 分支开发,提交前拉取 master 最新改动进行…

Java Review - MapStruct_使用 Intellij 和 Maven Debug 分析MapStruct实现原理

文章目录 Java动态编译、JSR 269 和 MapStructJSR 269JSR 269的工作原理MapStruct示例 MappingProcessor调试编译期生成的代码 Java动态编译、JSR 269 和 MapStruct Java动态编译是指在运行时动态地将Java源代码编译成字节码并加载到Java虚拟机中执行。 JSR 269 是Java规范请求…

使用Kafka与Spark Streaming进行流数据集成

在当今的大数据时代,实时数据处理和分析已经变得至关重要。为了实现实时数据集成和分析,组合使用Apache Kafka和Apache Spark Streaming是一种常见的做法。本文将深入探讨如何使用Kafka与Spark Streaming进行流数据集成,以及如何构建强大的实…

Java面试题之集合篇

前言 本篇主要总结JAVA面试中关于集合相关的高频面试题。本篇的面试题基于网络整理以及自己的总结编辑。在不断的完善补充哦。欢迎小伙伴们在评论区发表留言哦! 1、基础 1.1、Java 集合框架有哪些? Java 集合框架,大家可以看看 《Java 集…

读算法霸权笔记11_微目标

1. 脸书 1.1. 一份请愿书属于脸书了,而社交网络的算法会对如何最大限度地利用这份请愿书做出判断 1.1.1. 脸书的算法在决定谁能看到我的请愿书时会把所有因素都考虑在内 1.2. 通过改变信息推送的方式,脸书研究了我们…

MATLAB全局最优搜索函数:GlobalSearch函数

摘要:本文介绍了 GlobalSearch 函数的使用句式(一)、三个运行案例(二)、以及 GlobalSearch 函数的参数设置(三、四)。详细介绍如下: 一、函数句法 Syntax gs GlobalSearch gs Glo…

java每日一题——输出星星塔(答案及编程思路)

前言: 打好基础,daydayup! 题目:请编写输出如下图的星星塔 编程思路:1,计算要输入几行;2,计算每行的⭐数量,及空格的数量;计算相应的关系; 如图:假…

【中小型企业网络实战案例 八】配置映射内网服务器和公网多出口、业务测试和保存配置

相关学习文章: 【中小型企业网络实战案例 一】规划、需求和基本配置 【中小型企业网络实战案例 二】配置网络互连互通【中小型企业网络实战案例 三】配置DHCP动态分配地址 【中小型企业网络实战案例 四】配置OSPF动态路由协议【中小型企业网络实战案例 五】配置可…

react+AntDesign 之 pc端项目案例

1.环境搭建以及初始化目录 CRA是一个底层基于webpack快速创建React项目的脚手架工具 # 使用npx创建项目 npx create-react-app react-jike# 进入到项 cd react-jike# 启动项目 npm start2.安装SCSS SASS 是一种预编译的 CSS,支持一些比较高级的语法,…

优化企业运营,深入探索SAP库存管理解决方案

SAP库存管理是SAP提供的一款领先的企业库存管理解决方案。它致力于帮助企业实现对库存的全面掌控,优化供应链管理,降低库存成本,提高客户满意度。这个功能强大的系统为企业提供了丰富的仓储管理功能,如库存盘点、物料追踪、供应商…

C# 使用Microsoft消息队列(MSMQ)

写在前面 Microsoft Message Queuing (MSMQ) 是在多个不同的应用之间实现相互通信的一种异步传输模式,相互通信的应用可以分布于同一台机器上,也可以分布于相连的网络空间中的任一位置。 使用消息队列可以实现异步通讯,无需关心接收端是否在…

CCNP课程实验-06-EIGRP-Trouble-Shooting

目录 实验条件网络拓朴 环境配置开始排错错误1:没有配置IP地址,IP地址宣告有误错误2:R3配置了与R1不同的K值报错了。错误3:R4上的AS号配置错,不是1234错误4:R2上配置的Key-chain的R4上配置的Key-chain不一致…