欢迎关注我的CSDN:https://spike.blog.csdn.net/
本文地址:https://blog.csdn.net/caroline_wendy/article/details/137686312
在 PyTorch Lightning 中使用 Horovod 策略,可以在多个 GPU 上并行训练模型。Horovod 是分布式训练框架,通过优化数据传输来提高多 GPU/CPU 训练的效率。要在 PyTorch Lightning 中使用 Horovod,需要在训练命令中指定 Horovod 作为策略。
- PyTorch Lightning 源码:GitHub - pytorch-lightning
- Horovod 策略的具体源码:pytorch_lightning.strategies.horovod
1. 构建 Docker 环境
首先,需要构建支持 MPI 运行的 Docker,安装 PyTorch Lightning 与 Horovod 的安装包,目前而言,PyTorch Lightning 的 2.+ 版本,以上,已经移除 Horovod 策略,需要降级至 1.8.6 版本,才支持 Horovod 策略,即:
pip install pytorch-lightning==1.8.6
pip install cmake==3.24.2
pip install horovod==0.27.0
注意:horovod 安装之前,需要满足 cmake 版本,需要预先安装 cmake 包,否则报错:
File "/tmp/pip-install-qcugcd1u/horovod_a39ef0ac7a9e4940bc6b5969457a47f4/setup.py", line 88, in get_cmake_bin
raise RuntimeError("Failed to install temporary CMake. "
RuntimeError: Failed to install temporary CMake. Please update your CMake to 3.13+ or set HOROVOD_CMAKE appropriately.
参考:StackOverflow - How to reinstall the latest cmake version?
验证 PyTorch 与 Horovod 是否安装成功:
python
import torch
print(torch.__version__) # 1.13.1
print(torch.cuda.is_available()) # True
from horovod.torch import mpi_lib_v2 as mpi_lib
# pass
也可以使用 Horovod 策略补充工程,支持 PyTorch Lightning 的 2.+ 版本,参考 GitHub - lightning-Horovod
启动 Docker:
nvidia-docker run -it --name [your name] -v /pfs_beijing:/pfs_beijing -v /nfs_beijing:/nfs_beijing -v /nfs_beijing_ai:/nfs_beijing_ai [your image]:[version]
上传 Docker 至服务器:
# 提交 Tag
docker ps -l
docker commit 20df5ad955bb [your image]:[version]
# 准备远程 Tag
docker tag [your image]:[version] [remote image]:[version]
docker images | grep [your image]
# 推送至远程
docker push [remote image]:[version]
2. 配置 Horovod 策略
固定随机种子,确保分布式的表现一致:
# 设置 seed 参数
if args.seed is not None:
seed_everything(args.seed)
logger.info(f"[CL] Using seed: {args.seed}")
配置 Horovod 环境变量 与 策略,即:
from pytorch_lightning.strategies import HorovodStrategy
os.environ["HOROVOD_FUSION_THRESHOLD"] = "0"
os.environ["HOROVOD_CACHE_CAPACITY"] = "0"
os.environ["OMPI_MCA_btl_vader_single_copy_mechanism"] = "none"
import horovod.torch as hvd
hvd.init()
torch.cuda.set_device(hvd.local_rank())
strategy = HorovodStrategy()
# Horovod 不需要设置,使用默认值
args.num_nodes = 1
args.gpus = None
logger.info(f"[CL] Using HorovodStrategy")
注意:Horovod 策略,在
pl.Trainer
中,不需要设置num_nodes
和gpus
,使用默认值,即 1 和 None。
具体的 pl.Trainer
配置 Horovod 策略,如下:
trainer = pl.Trainer(
accelerator="gpu",
# ...
strategy=strategy, # 多机多卡配置
num_nodes=args.num_nodes, # 节点数
devices=args.gpus, # 每个节点 GPU 卡数
)
3. 配置 Horovod 的 all_gather 实例
在 PyTorch Lightning 中,不推荐直接使用 torch.distributed.all_gather_object()
进行分布式数据汇集,建议在 pl.LightningModule
类中,直接调用 self.all_gather()
方法。
torch.distributed.all_gather_object()
的源码,参考 Doc - PyTorchLightningModule.all_gather()
的源码,参考 Doc - Lighting 1.8.6horovod.torch.allgather()
的源码,参考 Doc - Horovod
LightningModule 的 all_gather()
调用 Horovod 的 allgather()
函数,源码如下:
def all_gather(self, result: Tensor, group: Optional[Any] = dist_group.WORLD, sync_grads: bool = False) -> Tensor:
if group is not None and group != dist_group.WORLD:
raise ValueError("Horovod does not support allgather using a subcommunicator at this time. Unset `group`.")
if len(result.shape) == 0:
# Convert scalars to single dimension tensors
result = result.reshape(1)
# sync and gather all
self.join()
return hvd.allgather(result)
其中,torch.distributed.all_gather_object()
方法,报错如下:
horovod all_gather_object "Default process group has not been initialized, please make sure to call init_process_group.""
原因是,在 LightningModule 中,不推荐直接使用 torch.distributed
的方法,建议直接调用 LightningModule
的内部方法。
其中 all_gather
的源码修改示例,如下:
class ModelWrapper(pl.LightningModule):
def gather_log(self, log, world_size):
if world_size == 1:
return log
# 异常代码,不建议直接调用 torch.distributed
# log_list = [None] * world_size
# torch.distributed.all_gather_object(log_list, log)
# log = {key: sum([l[key] for l in log_list], []) for key in log}
log_gather_map = self.all_gather(log)
# logger.info(f"[CL] log: {log}")
# logger.info(f"[CL] log_list_map: {log_gather_map}")
log_parse_map = dict()
for key in log_gather_map.keys():
# [sample,num_node],例如 样本 3 个,Node 2个,[[1,2],[3,4],[5,6]]
tmp_list = log_gather_map[key]
for item in tmp_list:
if isinstance(item, torch.Tensor):
item_cpu = item.detach().cpu()
item_x = item_cpu.numpy().tolist()
if key not in log_parse_map.keys():
log_parse_map[key] = []
# sum([[1,2],[3,4]], []) -> [1, 2, 3, 4]
log_parse_map[key] += item_x
elif isinstance(item, str):
# val_name = ['7skh_B', '7vqk_A', '7vrf_A'],all_gather 问题
continue
# logger.info(f"[CL] log_parse_map: {log_parse_map}")
return log_parse_map
# ...
日志输出,包括2个卡,每个卡的数据,all_gather
之后,获得全部数据,如下:
# Worker 0, all_gather 之前:
[worker-0:163] [INFO] [CL] log:
{
'val_first_ref_rmsd': [30.974, 21.57, 18.238],
# ...
}
# Worker 1, all_gather 之前:
[worker-1:163] [INFO] [CL] log:
{
'val_first_ref_rmsd': [27.358, 19.888, 32.003],
# ...
}
# Worker 0, all_gather 之后:
[worker-0:163] [INFO] [CL] log_gather_map:
{
'val_first_ref_rmsd': [
tensor([30.9740, 27.4560], device='cuda:0'),
tensor([21.5700, 19.6400], device='cuda:0'),
tensor([18.2380, 31.5020], device='cuda:0')
],
# ...
}
# 获得全部的6个样本数据:
[worker-1:163] [INFO] [CL] log_parse_map:
{
'val_first_ref_rmsd': [30.9740, 27.4560, 21.5700, 19.6400, 18.2380, 31.5020],
# ...
}