文章目录
- 一、概述
- 1.1 PyTorch DDP
- 1.2 Accelerate 分布式训练简介
- 1.2.1 实例化Accelerator类
- 1.2.2 将所有训练相关 PyTorch 对象传递给 `prepare()`方法
- 1.2.3 启用 `accelerator.backward(loss)`
- 1.3 Accelerate 分布式评估
- 1.4 accelerate launch
- 二、Accelerate 进阶
- 2.1 notebook_launcher
- 2.2 TPU训练
- 2.3 单进程执行
- 2.4 在分布式设置中保存/加载模型
- 2.4.1 wait_for_everyone()同步所有进程
- 2.4.2 常规方式保存和加载模型
- 2.4.3 以分片检查点或 safetensors 格式保存和加载模型
- 2.4.4 Transformers模型的保存和加载
- 2.5 保存/加载所有状态
- 2.6 混合精度训练
- 2.6.1 在autocast()上下文管理器内进行混合精度训练
- 2.6.2 梯度更新注意事项
- 2.6.3 启用梯度裁剪(gradient clipping)
- 2.7 使用使用梯度累积( gradient accumulation)
- 2.8 其它API
- 2.8.1 accelerate.Accelerator
- 2.8.2 accelerator.free_memory
- 2.8.3 accelerator.save
- Accelerate GitHub,、HF文档、基础示例、复杂示例:每个文件夹都包含一个利用 Accelerate 库的 run_task_no_trainer.py !
- 《从 PyTorch DDP 到 Accelerate 到 Trainer,轻松掌握分布式训练》
一、概述
本章参考HF博客《Introducing Accelerate》、Accelerate文档《Quick tour》第一部分
1.1 PyTorch DDP
PyTorch DDP
(Distributed Data Parallel)是PyTorch实现分布式深度学习训练的一个重要组件,相比单GPU训练,使用它进行分布式训练需要做如下改动:
+ import os
import torch
import torch.nn.functional as F
from datasets import load_dataset
+ from torch.utils.data import DistributedSampler
+ from torch.nn.parallel import DistributedDataParallel
+ local_rank = int(os.environ.get("LOCAL_RANK", -1))
- device = 'cpu'
+ device = device = torch.device("cuda", local_rank)
model = torch.nn.Transformer().to(device)
+ model = DistributedDataParallel(model)
optim = torch.optim.Adam(model.parameters())
dataset = load_dataset('my_dataset')
+ sampler = DistributedSampler(dataset)
- data = torch.utils.data.DataLoader(dataset, shuffle=True)
+ data = torch.utils.data.DataLoader(dataset, sampler=sampler)
model.train()
for epoch in range(10):
+ sampler.set_epoch(epoch)
for source, targets in data:
source = source.to(device)
targets = targets.to(device)
optimizer.zero_grad()
output = model(source)
loss = F.cross_entropy(output, targets)
loss.backward()
optimizer.step()
在这段代码中:
- 导入
DistributedSampler
和DistributedDataParallel
,这是DDP训练中需要的组件。 - 根据本地RANK初始化了device,使每个进程BIND到一块特定的GPU上。
- 使用
DistributedSampler
替换默认的随机采样器来加载数据,这可以使不同进程采样到不同的数据(分布式采样)。 - 在循环每轮epoch时调用DistributedSampler的
set_epoch
方法。 - 使用
DistributedDataParallel
封装模型,进行同步梯度聚合。 - 各进程仅加载本地RANK对应的那部分数据。
可以看到,使用这种PyTorch原生API进行分布式训练,需要编写大量的分布式训练的样板代码,如分布式采样器、数据并行封装模块等,还需要用户自己实现同步批标准化,梯度聚合等操作,而且调试和错误追踪也很困难。
另外,这份代码无法在单GPU或者CPU上运行;如果想改成TPU,则需要更改不同的代码行,混合精度训练也是如此,所以非常的麻烦。
有关数据并行策略DataParallel(DP)、DistributedDataParallel(DDP)、ZeRO Data Parallelism可参考我的另一篇博客《Hugging Face高效训练技术四:多GPU分布式训练(DP、PP、TP 、ZeRO)》
1.2 Accelerate 分布式训练简介
Accelerate 是在 torch_xla
和 torch.distributed
之上构建的,它提供了一个简单的 API,将与多 GPU 、 TPU 、混合精度训练相关的样板代码抽离了出来,让用户可以在不同设备上轻松地进行分布式训练和混合精度训练,而无需重复编写大量样板代码。另外Accelerate还提供了很多性能优化的功能,使得大规模训练和推理变得简单、高效且适应性强。
只需向任何标准 PyTorch 训练脚本添加四行代码,您现在就可以在任何类型的分布式设置上运行脚本,无论是否具有混合精度。Accelerate还可以为您自动处理设备放置,无论是单节点还是多节点(省去书写.to(device)
等代码),所以上述代码可以简化为:
import torch
import torch.nn.functional as F
from datasets import load_dataset
+ from accelerate import Accelerator
+ accelerator = Accelerator()
model = torch.nn.Transformer()
optim = torch.optim.Adam(model.parameters())
dataset = load_dataset('my_dataset')
data = torch.utils.data.DataLoader(dataset, shuffle=True)
+ model, optim, data = accelerator.prepare(model, optim, data)
model.train()
for epoch in range(10):
for source, targets in data:
optimizer.zero_grad()
output = model(source)
loss = F.cross_entropy(output, targets)
+ accelerator.backward(loss)
+ optimizer.step()
下面我们逐一解析添加的这几行代码。
1.2.1 实例化Accelerator类
from accelerate import Accelerator
accelerator = Accelerator()
在训练脚本的开头需要添加此内容,是因为它将初始化分布式训练所需的所有内容。您无需指明您所在的环境类型(单个GPU、单节点多GPU,多节点多GPU 或 TPU等),库将自动检测到这一点。
用户还可以通过向这个init
函数传入cpu=True
或者fp16=True
来强制使用CPU训练或者混合精度训练。这两个选项也可以通过启动训练脚本的启动器来设置。
另外,你也不需要再书写模型和输入数据的 .to(device)
或 .cuda()
等调用,该accelerator对象将为您在正确的设备上放置这些对象。如果一定要保留.to(device)
等语句,请确保使用 accelerator 对象提供的设备: accelerator.device
。
1.2.2 将所有训练相关 PyTorch 对象传递给 prepare()
方法
model, optim, data = accelerator.prepare(model, optim, data)
这是 API 的主体,将准备三种主要类型的对象:models
(torch.nn.Module)、optimizers
(torch.optim.Optimizer)、dataloaders
(torch.data.dataloader.DataLoader)。此外,与训练相关 PyTorch 对象还可以包括学习率调度器scheduler
,如果需要用到,也要一起传递给 prepare()方法。
不需要记住特定的顺,只需要按照传递给 prepare 方法的相同顺序拆包对象即可。
- 模型
模型的准备包括将其包装在适当的容器(例如 DistributedDataParallel)中,然后将其放置在适当的设备上。与普通分布式训练一样,进行保存或访问其特定的方法时,需要先通过accelerator.unwrap_model(model)
解开模型。
封装模型是为了训练方便,但有时需要解封装回原始模型,比如保存模型和直接访问内部方法时。解封装模型(
unwrap model
)指的是将封装后的模型剥离外部的容器或包装,恢复成原始的内部模型结构。之所以需要解封装模型,主要有以下两个原因:
- 保存模型时需要解封装
在分布式训练中,我们通常会用类似 torch.nn.DistributedDataParallel 这样的容器封装模型,用于进行分布式同步等操作。但是保存模型时,需要保存原始的模型权重,而不是保存封装后的容器对象。因此需要先解封装才能保存模型。- 访问内部方法时需要解封装
封装后的模型外部的容器可能会阻塞或者修改一些内部模型的方法,比如forward函数的输入和输出会被修改。如果想直接访问原始模型的某些方法,需要先解封装到内部模型然后再调用方法。
-
优化器
优化器也被包装在一个特殊的容器中,它将执行必要的操作以使混合精度工作(如缩放梯度、损失放大等)。如果状态字典(state dict)是非空的或从检查点加载的,它会准确地处理状态字典的设备放置,确保状态可以被正确加载到相应设备上。 -
数据加载器:将 DataLoader 传递给 prepare() 可以将数据划分到不同的 GPU/TPU 内核上,实现数据并行。
Accelerate库中最关键和复杂的实现隐藏在采样器和数据加载器的处理里。如示例所示,该库并不依赖DistributedSampler
,它会自动处理任何类型的采样器,让不同进程采样到不同的数据。另外它可以与你传入dataloader的任何采样器一起工作,如果你曾经不得不为自定义采样器编写分布式版本,现在则不需要这样做了。
另外还有一些注意事项:
- 如果有用到
scheduler
,那么也要传递给prepare(),除非不想在每次优化步骤中调用学习率调度程序,那么可以传递step_with_optimizer=False
来禁用。 - 如果要进行分布式评估,也需要把
validation dataloader
传递给prepare()
。分布式验证有一些细节需要注意,具体可以参考下一节。 - 任何使用
training dataloader
长度的操作(比如记录总训练步数),都应该在调用prepare()
之后进行。 - 如果启用
shuffle=True
或任何类型的随机采样器,所有进程的随机状态在每个迭代的数据加载器开始时都会同步,以确保以相同的方式洗牌数据。 - 由于数据是自动实现分布式并行,准备后的实际 batch size 会是原始 batch size 乘以使用的设备数量。如果希望实际 batch size保持不变,则可以在创建和初始化 Accelerator 时使用
split_batches=True
选项。同样的道理,如果在N
个 GPU 上运行,每个GPU 上的training dataloader
长度实际上是原来的1/N
(数据分片),除非你设置了split_batches=True
。
1.2.3 启用 accelerator.backward(loss)
accelerator.backward(loss)
此行代码为向后传递添加了必要的步骤来提高混合精度,但对于其他集成则需要进行一些自定义。
1.3 Accelerate 分布式评估
首先,如果你不想进行分布式评估,可以在训练脚本中进行常规的评估,这种情况下需要手动将输入数据放到 accelerator.device
上。
其次,如果要进行分布式评估,需要把validation dataloader
传递给 prepare()
:
validation_dataloader = accelerator.prepare(validation_dataloader)
和training dataloader
一样,如果在多设备上运行,每个设备只会看到部分验证数据。这意味着需要汇总不同设备上的预测,这可以通过 gather_for_metrics()方法来实现。数据集末尾可能会有重复的数据,此方法还可以在汇总时自动移除重复数据,以提供更准确的指标。
for source, targets in validation_dataloader:
predictions = model(source)
# 汇总所有的predictions和targets
all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets))
# 结合Datasets.Metric实例来使用
metric.add_batch(all_predictions, all_targets)
- 与
training dataloader
类似,通过prepare()
可能会改变validation dataloader
的长度,除非设置了split_batches=True
。
如果不希望自动做这件事,可以用 gather()来简单地汇总不同进程的数据:
+ eval_dataloader = accelerator.prepare(eval_dataloader)
predictions, labels = [], []
for source, targets in eval_dataloader:
with torch.no_grad():
output = model(source)
- predictions.append(output.cpu().numpy())
- labels.append(targets.cpu().numpy())
+ predictions.append(accelerator.gather(output).cpu().numpy())
+ labels.append(accelerator.gather(targets).cpu().numpy())
predictions = np.concatenate(predictions)
labels = np.concatenate(labels)
+ predictions = predictions[:len(eval_dataloader.dataset)]
+ labels = label[:len(eval_dataloader.dataset)]
metric_compute(predictions, labels)
最后添加那行会将预测和标签截断为数据集中的示例数,因为准备好的评估数据加载器会返回更多元素,以确保每个进程上的批次大小相同。
gather()
和gather_for_metrics()
要求每个进程上的张量大小相同。如果每个进程上的张量大小不一样(例如动态 padding 到最大长度),可以用pad_across_processes()
将张量 padding 到最大尺寸。
此评估默认在所有进程上运行,如果某些情况下,你只想在主进程上运行,可以使用test方法:
if accelerator.is_main_process:
# Evaluation loop
仅在主进程上仅运行评估的做法不常见,通常有以下几个考虑:
- 简化评估代码的编写,不需要考虑不同进程的数据聚合等问题
- 减少资源占用,避免全部进程参与评估导致的计算资源浪费。
- 评估数据集较小:小数据集分布到所有进程上batch size可能会很小,这时单进程评估更高效。
- 模型加载考虑:有些情况下只在主进程上保存或加载模型,如果评估也在主进程进行,可以避免其他进程加载模型。
- 调试方便:只在主进程评估更易于调试,可以方便地打印日志等。
1.4 accelerate launch
Accelerate训练代码完全兼容传统的启动器,比如torch.distributed.launch
,所以可以使用常规命令来启动分布式训练:torch.distributed.run PyTorch
。但是其参数设置比较麻烦。所以除了上述API,Accelerate 还提供了一个 统一的命令行启动工具 accelerate launch。
-
环境配置
启动前,需要先运行以下命令并回答提示来完成做一个快速的配置设置,,这将在缓存文件夹中生成一个默认配置文件default_config.yaml
。accelerate config
-
环境测试
- 配置设置完成后,您可以通过运行
accelerate test
命令来测试您的设置,这将启动一个简短的脚本来测试分布式环境。如果运行没有问题,您就可以进行下一步了!
accelerate test
- 如果不使用此默认的配置文件,需要用
--config_file
参数指明所使用的配置文件的位置。且在测试时,还需要传递此参数。
accelerate test --config_file path_to_config.yaml
- 配置设置完成后,您可以通过运行
-
启动训练
测试完成,就可以 用accelerate launch 加上脚本路径和参数来启动训练脚本。accelerate launch my_script.py --args_to_my_script
如果您将配置文件存储在非默认位置,则可以将其指示给启动器:
accelerate launch --config_file path_to_config.yaml path_to_script.py --args_for_the_script
另外,accelerate launch 可以覆盖配置文件中的默认参数。要看所有的可修改参数可以用
accelerate launch -h
。
有关启动脚本的更多信息,请查看 Launch tutorial 。
二、Accelerate 进阶
本章参考Accelerate文档《Quick tour》第二部分以及文档《Main Accelerator Class》
2.1 notebook_launcher
参考《Launching Multi-GPU Training from a Jupyter Environment》
您还可以使用notebook_launcher 模块直接在 Jupyter Notebook 中运行分布式代码。该启动器支持在 Colab 或 Kaggle 上启动 TPU 训练,以及多GPU 训练,伪代码如下:
from accelerate import notebook_launcher
notebook_launcher(training_function)
Accelerator 对象只能在训练函数内定义,这是因为初始化只能在启动器内部完成。
下面是《Launching Multi-GPU Training from a Jupyter Environment》教程中 ,使用resnet50d进行动物图片分类的示例。
-
配置环境
在执行任何训练之前,通过在终端中运行以下命令并回答提示来准备配置文件accelerate config
如果是启用默认设置,且设备不是TPU,Accelerate 有一个utils.write_basic_config() 方法,可以快速将 GPU 配置写入配置文件。
import os from accelerate.utils import write_basic_config write_basic_config() # Write a config file os._exit(00) # Restart the notebook
以上代码将在写入配置后会重新启动 Jupyter,因为这调用了 CUDA 代码来执行此操作。而CUDA 不能在多 GPU 系统上初始化多次,所以在最终训练前,需要执行完整的清理和重启工作。
-
准备数据和模型,略
-
定义优化器和调度器,略
-
编写训练函数
- 在训练循环中尽早设置种子并创建 Accelerator 对象。
def training_loop(mixed_precision="fp16", seed: int = 42, batch_size: int = 64): set_seed(seed) accelerator = Accelerator(mixed_precision=mixed_precision)
- 构建数据加载器并创建模型,以便种子也控制新的权重初始化
- 执行分布式评估时,需要通过
Gather()
传递预测和标签,以便所有数据在当前设备上可用,并且可以实现正确计算的指标。 - 其它,略
- 在训练循环中尽早设置种子并创建 Accelerator 对象。
以下是完整的训练代码:
def training_loop(mixed_precision="fp16", seed: int = 42, batch_size: int = 64):
set_seed(seed)
# 初始化 accelerator
accelerator = Accelerator(mixed_precision=mixed_precision)
# 构建dataloaders,具体步骤省略
train_dataloader, eval_dataloader = get_dataloaders(batch_size)
# 实例化模型(在这里构建模型,以便种子也可以控制权重初始化)
model = create_model("resnet50d", pretrained=True, num_classes=len(label_to_id))
# 冻结基础模型
for param in model.parameters():
param.requires_grad = False
for param in model.get_classifier().parameters():
param.requires_grad = True
# 对图像批处理进行标准化,以加速训练。为了在活动设备上使用这些常量,将它们设置为加速器设备
mean = torch.tensor(model.default_cfg["mean"])[None, :, None, None]
std = torch.tensor(model.default_cfg["std"])[None, :, None, None]
mean = mean.to(accelerator.device)
std = std.to(accelerator.device)
# 实例化optimizer和scheduler
optimizer = torch.optim.Adam(params=model.parameters(), lr=3e-2 / 25)
lr_scheduler = OneCycleLR(optimizer=optimizer, max_lr=3e-2, epochs=5, steps_per_epoch=len(train_dataloader))
# 准备工作
# There is no specific order to remember, you just need to unpack the objects in the same order you gave them to the
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
)
# 开始训练
for epoch in range(5):
model.train()
for batch in train_dataloader:
inputs = (batch["image"] - mean) / std
outputs = model(inputs)
loss = torch.nn.functional.cross_entropy(outputs, batch["label"])
accelerator.backward(loss)
optimizer.step()
lr_scheduler.step()
optimizer.zero_grad()
model.eval()
# 传递的元素数量以及每个批次的总体准确率将添加到两个常量中
accurate = 0
num_elems = 0
for batch in eval_dataloader:
inputs = (batch["image"] - mean) / std
with torch.no_grad():
outputs = model(inputs)
predictions = outputs.argmax(dim=-1)
accurate_preds = accelerator.gather(predictions) == accelerator.gather(batch["label"])
num_elems += accurate_preds.shape[0]
accurate += accurate_preds.long().sum()
eval_metric = accurate.item() / num_elems
# 使用 accelerator.print 只在主进程上打印
accelerator.print(f"epoch {epoch}: {100 * eval_metric:.2f}")
-
单节点多GPU训练: 使用notebook_launcher()启动此训练。你需要传递训练函数、参数(元组类型)以及要训练的进程数。
from accelerate import notebook_launcher args = ("fp16", 42, 64) notebook_launcher(training_loop, args, num_processes=2)
-
多节点多GPU训练: 如果在多个节点上运行,则需要在每个节点上设置 Jupyter 会话并同时运行启动单元。
-
对于包含 2 个节点(计算机)、每个节点有 8 个 GPU 且主计算机的 IP 地址为“172.31.43.8”的环境,启动代码为:
notebook_launcher(training_loop, args, master_addr="172.31.43.8", node_rank=0, num_nodes=2, num_processes=8)
-
在另一台机器上的第二个 Jupyter 会话中,启动代码为(注意 node_rank 的变化):
notebook_launcher(training_loop, args, master_addr="172.31.43.8", node_rank=1, num_nodes=2, num_processes=8)
-
-
TPU训练: 如果在 TPU 上进行训练,您的训练循环应将模型作为参数,并且应在训练循环函数之外对其进行实例化。具体原因请参阅 TPU 最佳实践。
model = create_model("resnet50d", pretrained=True, num_classes=len(label_to_id)) args = (model, "fp16", 42, 64) notebook_launcher(training_loop, args, num_processes=8)
以上训练正常运行时,它将打印运行的设备数量及进度(本示例使用两个 GPU 运行):
Launching training on 2 GPUs.
epoch 0: 88.12
epoch 1: 91.73
epoch 2: 92.58
epoch 3: 93.90
epoch 4: 94.71
- Debugging
notebook_launcher 运行时最常遇见的问题是遇到 CUDA已初始化的问题(CUDA has already been initialized ),这通常是笔记本中导入或调用了torch.cuda导致CUDA提前初始化。你可以通过在环境变量中设置:ACCELERATE_DEBUG_MODE=yes来进行调试:- 在环境变量中设置:
export ACCELERATE_DEBUG_MODE=yes
- 运行notebook_launcher启动分布式训练,此时notebook_launcher会额外检查一次CUDA的初始化情况
- 如果检查发现CUDA已经被初始化了,会打印出详细的错误信息,指出是在哪个代码行初始化了CUDA。
- 检查信息并更正,再次用ACCELERATE_DEBUG_MODE=yes启动notebook_launcher,直至正常训练。
- 在环境变量中设置:
2.2 TPU训练
TPU教程见《Training on TPUs with 🤗 Accelerate》
TPU训练有一些需要注意的事项。
- 使用static shape
由于TPUs具有高度并行的架构,训练时为了提高性能,会对计算图进行优化和编译,所以TPU训练的第一步总是会非常长,因为构建和编译这个图以进行优化需要一些时间。不过,这个编译过程将被缓存,所以第二步和之后的步骤会快得多。但是,它只适用于所有步骤执行完全相同的操作的情况,这意味着:
- 所有批次中的所有张量长度相同(比如,NLP任务中不能使用动态填充)
- 拥有静态代码(不能使用具有for循环的layer,因为长度可能在步骤之间改变,比如lstm)
如果以上任何一项被改变,都会触发新的编译,训练将非常缓慢。
在训练神经网络时,有两个主要的计算图阶段:构建计算图和执行计算图。
- 构建阶段:神经网络的结构被定义并转化为计算图。这个过程包括定义输入、权重、操作(如前向传播、损失计算、反向传播等),以及它们之间的依赖关系。这个计算图描述了模型的整体结构和计算流程。
- 执行阶段:实际的数据被传递进入计算图,然后计算图中的操作按照依赖关系执行。这包括前向传播,计算损失,然后通过反向传播计算梯度以进行权重更新。
你可以通过检查加速器的distributed_type,以便在启用TPU训练时进行特殊控制:
from accelerate import DistributedType
if accelerator.distributed_type == DistributedType.TPU:
# 执行静态形状的操作
else:
# 可以灵活使用
- 权重绑定
如果你的模型有绑定权重,将这个模型移动到TPU上(无论是自己手动还是在传递模型给prepare()之后),将会打破这种绑定。你需要在之后重新绑定这些权重,示例见run_clm_no_trainer。
有关在TPU上进行训练的更多信息,请查看TPU教程《Training on TPUs with 🤗 Accelerate》。
2.3 单进程执行
在1.3 分布式评估中,我们提到过进行单进程评估的方法是:
if accelerator.is_main_process:
# Evaluation loop
分布式训练中,有一些操作是只需要单进程执行的,比如数据下载、日志语句、以及显示进度条等。
-
if accelerator.is_local_main_process
:local
表示每台机器。分布式训练中如果有多台机器,每个机器有多个GPU,那么这个条件就会在每台机器的主进程上执行一次,比如数据下载、日志记录、显示进度条等操作。# 为了避免输出中出现多个进度条,您应该只在本地主进程上显示一个 from tqdm.auto import tqdm progress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)
-
if accelerator.is_main_process
:只在主机器的主进程上执行一次。这适用于在整个训练过程中只需要执行一次的操作,比如上传最终模型到模型中心。 -
accelerator中,应该使用
accelerator.print()
替代print()
,以便只在每个机器上只打印一次。
2.4 在分布式设置中保存/加载模型
2.4.1 wait_for_everyone()同步所有进程
在整个分布式训练环境中保存模型之前,需要先确保所有进程都已完成训练,因为多GPU上同时运行脚本时,不同的进程可能以不同的速度执行指令。
使用 accelerator.wait_for_everyone()
这个指令,可以阻塞所有先到达的进程,直到所有其他进程都达到了相同的点,然后再执行后续指令。这对于需要协调多个进程的操作是非常有用的。而如果你只在单个 GPU 或 CPU 上运行脚本,此指令不会产生任何效果。
# 假设有两个GPU进程
import time
from accelerate import Accelerator
accelerator = Accelerator()
if accelerator.is_main_process:
time.sleep(2)
else:
print("I'm waiting for the main process to finish its sleep...")
accelerator.wait_for_everyone()
# 所有进程会同时打印
print("Everyone is here")
上述代码中,主进程会休眠两秒,其它进程会进行等待。两秒后,所有进程会同时打印"Everyone is here"。通过wait_for_everyone(),可以在分布式环境下轻松实现多进程同步。
2.4.2 常规方式保存和加载模型
首先需要等待所有进程都训练完成,以确保同步。然后使用accelerator.save_model()
而不是 torch.save()
来保存模型。它将删除分布式过程中添加的所有模型包装器,获取模型的 state_dict 并保存它。 state_dict 将与正在训练的模型具有相同的精度。
accelerator.wait_for_everyone()
accelerator.save_model(model, save_directory)
如果是加载检查点,加载时建议在展开(取消包装)的模型上加载权重。这确保了加载的权重与原始模型相匹配。
unwrapped_model = accelerator.unwrap_model(model) # 取消包装模型
path_to_checkpoint = os.path.join(save_directory, "pytorch_model.bin")
unwrapped_model.load_state_dict(torch.load(path_to_checkpoint)) # 加载检查点
accelerator.unwrap_model的作用是展开模型,以下是一个示例:
# 假设有两个GPU进程
from torch.nn.parallel import DistributedDataParallel
from accelerate import Accelerator
accelerator = Accelerator()
model = accelerator.prepare(MyModel())
print(model.__class__.__name__)
model = accelerator.unwrap_model(model)
print(model.__class__.__name__)
DistributedDataParallel
MyModel
2.4.3 以分片检查点或 safetensors 格式保存和加载模型
save_model
方法还可以保存模型为分片检查点或 safetensors 格式,提供更灵活的保存选项。
accelerator.wait_for_everyone()
accelerator.save_model(model, save_directory, max_shard_size="1GB", safe_serialization=True)
加载时,推荐使用 load_checkpoint_in_model
函数将其加载在特定设备上。
load_checkpoint_in_model(unwrapped_model, save_directory, device_map={"": device})
2.4.4 Transformers模型的保存和加载
如果您使用 🤗 Transformers 库中的模型,则可以使用 .save_pretrained() 方法保存模型,这将确保模型与其他 🤗 Transformers 功能(例如 .from_pretrained()
方法)保持兼容。
from transformers import AutoModel
model = AutoModel.from_pretrained("bert-base-cased")
model = accelerator.prepare(model)
# ...fine-tune with PyTorch...
unwrapped_model = accelerator.unwrap_model(model)
unwrapped_model.save_pretrained(
"path/to/my_model_directory",
is_main_process=accelerator.is_main_process,
save_function=accelerator.save,
)
加载模型:
from transformers import AutoModel
model = AutoModel.from_pretrained("path/to/my_model_directory")
2.5 保存/加载所有状态
-
常规保存
在训练模型过程中,你可能需要保存整个模型的当前状态,包括model、optimizer、random generators(随机生成器)以及schedulers,以便在同一脚本中进行还原。为了实现这一目的,可以使用 save_state() 和 load_state()方法。from accelerate import Accelerator accelerator = Accelerator() model, optimizer, lr_scheduler = ... model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler) accelerator.save_state(output_dir="my_checkpoint") accelerator.load_state("my_checkpoint") # 只能与 Accelerator.save_state() 结合使用
-
自定义保存
-
save_state()
保存状态时的位置和方式可以被进一步的自定义,这可以通过 ProjectConfiguration类来实现。例如,如果启用了automatic_checkpoint_naming
,那么每个保存的检查点将被放置在Accelerator.project_dir/checkpoints/checkpoint_{checkpoint_number}
的位置。 -
另外,只要是通过
register_for_checkpointing()
方法注册的任何其他需要存储的具有状态的项,都将在保存或加载时被处理。这种方式允许你在模型训练过程中保存和加载更多的训练相关的状态信息。注意:任何通过
register_for_checkpointing()
注册的对象必须具有load_state_dict
和state_dict
函数,以便能够正确存储和加载其状态。from accelerate import Accelerator accelerator = Accelerator() # Assume `CustomObject` has a `state_dict` and `load_state_dict` function. obj = CustomObject() accelerator.register_for_checkpointing(obj) accelerator.save_state("checkpoint.pt")
-
2.6 混合精度训练
有关混合精度训练的原理,可参考我的博文《Hugging Face高效训练技术一:单 GPU 高效训练》
2.6.1 在autocast()上下文管理器内进行混合精度训练
在创建 Accelerator 实例时,通过 mixed_precision="fp16"
可以指定使用混合精度训练。此时,应使启用autocast()进行训练。在这个上下文管理器中,模型的计算图会自动转换为混合精度(FP16),以加速训练过程。上下文管理器结束后,计算图会恢复为默认的全精度(FP32)。
from accelerate import Accelerator
accelerator = Accelerator(mixed_precision="fp16")
with accelerator.autocast():
train()
具体来说,在 accelerator.autocast()
上下文管理器会发生以下变化(来自claude):
- 数据类型转换: 将计算图中的浮点32位数据类型转换为浮点16位。这有助于减少内存占用和提高计算速度,尤其是在现代深度学习模型中,其中大部分计算可以在较低的精度下进行而不损失太多信息。
- Loss Scaling: 为了补偿使用浮点16位数据类型可能引入的数值精度损失,会对损失进行一个缩放,通常是使用一个定值(例如2^15)。
- Down Cast: 在需要精度敏感的运算之前(如 softmax、loss计算等),会添加浮点16位到浮点32位的转型操作,以提高这些计算的精度。
- 使用主权重进行权重更新: 在更新模型权重时,会使用浮点32位的主权重(Master Weights)来累积参数更新,然后再将其转换为浮点16位,从而更新模型。
- 溢出检测与补偿逻辑: 检测梯度是否发生溢出,如果发生溢出,则会缩小 Loss Scale,以避免在下一步再次溢出。
通过这些手段,autocast
上下文管理器实现了混合精度训练,平衡了计算速度和数值稳定性。这种技术在训练大型深度学习模型时非常有用。
2.6.2 梯度更新注意事项
混合精度训练中的一个问题是,由于动态损失缩放策略,训练过程中可能会跳过一些梯度更新。这是因为在训练过程中存在梯度溢出的点,为了避免下一步再次发生这种情况,降低了损失缩放因子。这可能导致在没有梯度更新的情况下更新了你的scheduler。
通常情况下,这样做没有问题。但是当你的训练数据很少,或者scheduler的前几个学习率非常重要时,可能会产生影响。在这种情况下,你可以通过以下方式在optimizer step未执行时跳过scheduler的更新:
if not accelerator.optimizer_step_was_skipped:
lr_scheduler.step()
accelerator.optimizer_step_was_skipped 表示 optimizer 在当前步骤是否被跳过了更新,这样可以避免在optimizer被跳过更新的情况下错误地更新scheduler。
2.6.3 启用梯度裁剪(gradient clipping)
在深度学习中,梯度裁剪通过限制梯度的范数或值来缓解梯度爆炸问题。具体而言:
torch.nn.utils.clip_grad_norm_
: :按照指定的范数对梯度进行裁剪。它计算所有梯度的范数,然后根据指定的最大范数对梯度进行缩放。torch.nn.utils.clip_grad_value_
: :按照指定的值对梯度进行裁剪。它会将梯度中的任何元素的绝对值大于指定值的元素截断为指定值。
在使用混合精度训练时,上述两个函数可能会出现一些问题,因此如果你的脚本中使用了梯度裁剪(比如有使用梯度累积),建议将上述两个函数替换为 clip_grad_norm()和 clip_grad_value(),它们提供了更好的混合精度支持。以下是简单示例:
from accelerate import Accelerator
accelerator = Accelerator(gradient_accumulation_steps=2)
dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
for input, target in dataloader:
optimizer.zero_grad()
output = model(input)
loss = loss_func(output, target)
accelerator.backward(loss)
if accelerator.sync_gradients:
accelerator.clip_grad_norm_(model.parameters(), max_grad_norm)
# 或者是:
# accelerator.clip_grad_value_(model.parameters(), clip_value)
optimizer.step()
2.7 使用使用梯度累积( gradient accumulation)
为了实现梯度累积(gradient accumulation),可以在实例化Accelerator时指定 gradient_accumulation_steps,然后使用 accumulate()上下文管理器,这将会处理一些自动化的任务:
- 多设备梯度同步/非同步
多GPU或TPU训练中,可以选择梯度聚合方式是同步(synced)还是异步(unsynced)。accumulate()会根据训练设备数自动管理梯度聚合方式 - 检查梯度更新步骤是否应该执行
在梯度累积时,只有累积到指定步数时才执行参数更新。accumulate()会自动检查是否达到累积步数,简化了梯度累积逻辑的实现 - loss自动缩放
梯度累积时,损失会累加,需要平均除以累积步数才是实际的loss。accumulate()会自动平均累积的loss,简化操作。
accelerator = Accelerator(gradient_accumulation_steps=2)
model, optimizer, training_dataloader = accelerator.prepare(model, optimizer, training_dataloader)
for input, label in training_dataloader:
with accelerator.accumulate(model):
predictions = model(input)
loss = loss_function(predictions, label)
accelerator.backward(loss)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
2.8 其它API
参考《Main Accelerator Class》
2.8.1 accelerate.Accelerator
Accelerator源代码
accelerate.Accelerator(
device_placement: bool = True,
split_batches: bool = False,
mixed_precision: PrecisionType | str | None = None,
gradient_accumulation_steps: int = 1,
cpu: bool = False,
deepspeed_plugin: DeepSpeedPlugin | None = None,
fsdp_plugin: FullyShardedDataParallelPlugin | None = None,
megatron_lm_plugin: MegatronLMPlugin | None = None,
rng_types: list[str | RNGType] | None = None,
log_with: str | LoggerType | GeneralTracker | list[str | LoggerType | GeneralTracker] | None = None,
project_dir: str | os.PathLike | None = None,
project_config: ProjectConfiguration | None = None,
gradient_accumulation_plugin: GradientAccumulationPlugin | None = None,
dispatch_batches: bool | None = None,
even_batches: bool = Truestep_scheduler_with_optimizer: bool = True,
kwargs_handlers: list[KwargsHandler] | None = None,
dynamo_backend: DynamoBackend | str | None = None
)
以下是列出还没讲到但觉得会用到的参数:
- split_batches((bool,可选):是否应该在设备之间分割数据加载器产生的批次,详见本文1.2.2 。
- mixed_precision(str , 可选):是否使用混合精度训练。值为“no”、“fp16”、“bf16”或“fp8”
- cpu( bool ,可选): — 是否强制脚本在 CPU 上执行
- deepspeed_plugin、fsdp_plugin、megatron_lm_plugin:启用DeepSpeed、FSDP、MegatronLM分布式训练策略
- log_with (str、LoggerType或GeneralTracker的列表,可选):设置的日志记录器列表,用于实验跟踪,应为以下一种或几种
- “all”
- “tensorboard”
- “wandb”
- “comet_ml”
属性:
- device (torch.device) — 要使用的设备。
- distributed_type (DistributedType) — 分布式训练配置。
- local_process_index (int) — 当前机器上的进程索引。
- mixed_precision (str) — 配置的混合精度模式。
- num_processes (int) — 用于训练的进程总数。
- optimizer_step_was_skipped (bool) — 由于混合精度中的梯度溢出而跳过优化器更新(在这种情况下,不应更改学习率)。
- process_index (int) — 所有进程中当前进程的总体索引。
- state (AcceleratorState) — 分布式设置状态。
- sync_gradients (bool) — 当前是否正在进行梯度同步。这个属性可以用于判断当前进度,例如在梯度同步期间不能进行其他计算,需要等待同步完成。或者用于日志/Debug的时候判断当前是同步阶段还是计算阶段。
- use_distributed (bool) — 当前配置是否用于分布式训练。
2.8.2 accelerator.free_memory
该方法释放对内部存储的所有对象的引用,并调用垃圾收集器。建议在两次使用不同模型或优化器进行训练之间调用此方法。
from accelerate import Accelerator
accelerator = Accelerator()
model, optimizer, scheduler = ...
model, optimizer, scheduler = accelerator.prepare(model, optimizer, scheduler)
accelerator.free_memory()
del model, optimizer, scheduler
2.8.3 accelerator.save
accelerator.save(obj, f,safe_serialization = False)
- obj ( object ) — 要保存的对象。
- f ( str 或 os.PathLike ) — obj 内容的保存位置。
- safe_serialization ( bool ,可选,默认为 False ) — 是否使用 safetensors 保存 obj
accelerator.save默认在每个机器上保存一份对象到磁盘,用于替代 torch.save。
from accelerate import Accelerator
accelerator = Accelerator()
arr = [0, 1, 2, 3]
accelerator.save(arr, "array.pkl")