欢迎关注我的CSDN:https://spike.blog.csdn.net/
本文地址:https://blog.csdn.net/caroline_wendy/article/details/137686312

在 PyTorch Lightning 中使用 Horovod 策略,可以在多个 GPU 上并行训练模型。Horovod 是分布式训练框架,通过优化数据传输来提高多 GPU/CPU 训练的效率。要在 PyTorch Lightning 中使用 Horovod,需要在训练命令中指定 Horovod 作为策略。
- PyTorch Lightning 源码:GitHub - pytorch-lightning
- Horovod 策略的具体源码:pytorch_lightning.strategies.horovod
1. 构建 Docker 环境
首先,需要构建支持 MPI 运行的 Docker,安装 PyTorch Lightning 与 Horovod 的安装包,目前而言,PyTorch Lightning 的 2.+ 版本,以上,已经移除 Horovod 策略,需要降级至 1.8.6 版本,才支持 Horovod 策略,即:
pip install pytorch-lightning==1.8.6
pip install cmake==3.24.2 
pip install horovod==0.27.0
注意:horovod 安装之前,需要满足 cmake 版本,需要预先安装 cmake 包,否则报错:
File "/tmp/pip-install-qcugcd1u/horovod_a39ef0ac7a9e4940bc6b5969457a47f4/setup.py", line 88, in get_cmake_bin
	raise RuntimeError("Failed to install temporary CMake. "
RuntimeError: Failed to install temporary CMake. Please update your CMake to 3.13+ or set HOROVOD_CMAKE appropriately.
参考:StackOverflow - How to reinstall the latest cmake version?
验证 PyTorch 与 Horovod 是否安装成功:
python
import torch
print(torch.__version__)  # 1.13.1
print(torch.cuda.is_available())  # True
from horovod.torch import mpi_lib_v2 as mpi_lib
# pass
也可以使用 Horovod 策略补充工程,支持 PyTorch Lightning 的 2.+ 版本,参考 GitHub - lightning-Horovod

启动 Docker:
nvidia-docker run -it --name [your name] -v /pfs_beijing:/pfs_beijing -v /nfs_beijing:/nfs_beijing -v /nfs_beijing_ai:/nfs_beijing_ai [your image]:[version]
上传 Docker 至服务器:
# 提交 Tag
docker ps -l
docker commit 20df5ad955bb [your image]:[version]
# 准备远程 Tag
docker tag [your image]:[version] [remote image]:[version]
docker images | grep [your image]
# 推送至远程
docker push [remote image]:[version]
2. 配置 Horovod 策略
固定随机种子,确保分布式的表现一致:
# 设置 seed 参数
if args.seed is not None:
    seed_everything(args.seed)
    logger.info(f"[CL] Using seed: {args.seed}")
配置 Horovod 环境变量 与 策略,即:
from pytorch_lightning.strategies import HorovodStrategy
os.environ["HOROVOD_FUSION_THRESHOLD"] = "0"
os.environ["HOROVOD_CACHE_CAPACITY"] = "0"
os.environ["OMPI_MCA_btl_vader_single_copy_mechanism"] = "none"
import horovod.torch as hvd
hvd.init()
torch.cuda.set_device(hvd.local_rank())
strategy = HorovodStrategy()
# Horovod 不需要设置,使用默认值
args.num_nodes = 1
args.gpus = None
logger.info(f"[CL] Using HorovodStrategy")
注意:Horovod 策略,在
pl.Trainer中,不需要设置num_nodes和gpus,使用默认值,即 1 和 None。
具体的 pl.Trainer 配置 Horovod 策略,如下:
trainer = pl.Trainer(
    accelerator="gpu",
    # ...
    strategy=strategy,  # 多机多卡配置
    num_nodes=args.num_nodes,  # 节点数
    devices=args.gpus,  # 每个节点 GPU 卡数
)
3. 配置 Horovod 的 all_gather 实例
在 PyTorch Lightning 中,不推荐直接使用 torch.distributed.all_gather_object() 进行分布式数据汇集,建议在 pl.LightningModule 类中,直接调用 self.all_gather() 方法。
- torch.distributed.all_gather_object()的源码,参考 Doc - PyTorch
- LightningModule.all_gather()的源码,参考 Doc - Lighting 1.8.6
- horovod.torch.allgather()的源码,参考 Doc - Horovod
LightningModule 的 all_gather() 调用 Horovod 的 allgather() 函数,源码如下:
def all_gather(self, result: Tensor, group: Optional[Any] = dist_group.WORLD, sync_grads: bool = False) -> Tensor:
        if group is not None and group != dist_group.WORLD:
            raise ValueError("Horovod does not support allgather using a subcommunicator at this time. Unset `group`.")
        if len(result.shape) == 0:
            # Convert scalars to single dimension tensors
            result = result.reshape(1)
        # sync and gather all
        self.join()
        return hvd.allgather(result)
其中,torch.distributed.all_gather_object() 方法,报错如下:
horovod all_gather_object "Default process group has not been initialized, please make sure to call init_process_group.""
原因是,在 LightningModule 中,不推荐直接使用 torch.distributed 的方法,建议直接调用 LightningModule 的内部方法。
其中 all_gather 的源码修改示例,如下:
class ModelWrapper(pl.LightningModule):
  	
    def gather_log(self, log, world_size):
        if world_size == 1:
            return log
        # 异常代码,不建议直接调用 torch.distributed
        # log_list = [None] * world_size
        # torch.distributed.all_gather_object(log_list, log)
        # log = {key: sum([l[key] for l in log_list], []) for key in log}
        log_gather_map = self.all_gather(log)
        # logger.info(f"[CL] log: {log}")
        # logger.info(f"[CL] log_list_map: {log_gather_map}")
        log_parse_map = dict()
        for key in log_gather_map.keys():
            # [sample,num_node],例如 样本 3 个,Node 2个,[[1,2],[3,4],[5,6]]
            tmp_list = log_gather_map[key]
            for item in tmp_list:
                if isinstance(item, torch.Tensor):
                    item_cpu = item.detach().cpu()
                    item_x = item_cpu.numpy().tolist()
                    if key not in log_parse_map.keys():
                        log_parse_map[key] = []
                    # sum([[1,2],[3,4]], []) -> [1, 2, 3, 4]
                    log_parse_map[key] += item_x
                elif isinstance(item, str):
                    # val_name = ['7skh_B', '7vqk_A', '7vrf_A'],all_gather 问题
                    continue
        # logger.info(f"[CL] log_parse_map: {log_parse_map}")
        return log_parse_map
      
	# ...
日志输出,包括2个卡,每个卡的数据,all_gather之后,获得全部数据,如下:
# Worker 0, all_gather 之前:
[worker-0:163] [INFO] [CL] log: 
{
  'val_first_ref_rmsd': [30.974, 21.57, 18.238],
  # ...
}
# Worker 1, all_gather 之前:
[worker-1:163] [INFO] [CL] log: 
{
	'val_first_ref_rmsd': [27.358, 19.888, 32.003],
  # ...
}
# Worker 0, all_gather 之后:
[worker-0:163] [INFO] [CL] log_gather_map:
{
  'val_first_ref_rmsd': [
    tensor([30.9740, 27.4560], device='cuda:0'),
    tensor([21.5700, 19.6400], device='cuda:0'),
    tensor([18.2380, 31.5020], device='cuda:0')
  ],
  # ...
}
# 获得全部的6个样本数据:
[worker-1:163] [INFO] [CL] log_parse_map: 
{
	'val_first_ref_rmsd': [30.9740, 27.4560, 21.5700, 19.6400, 18.2380, 31.5020],
	# ...
}



















