PyTorch 分布式训练:DDP vs FSDP
PyTorch 分布式训练DDP vs FSDP核心结论DDP (DistributedDataParallel)基于数据并行的分布式训练方法适用于中小规模模型FSDP (FullyShardedDataParallel)基于模型分片的分布式训练方法适用于超大规模模型性能对比DDP 在中小模型上速度更快FSDP 在大模型上内存效率更高适用场景DDP 适合一般深度学习任务FSDP 适合超大模型训练内存优化FSDP 通过模型分片显著减少内存使用一、分布式训练基础1.1 为什么需要分布式训练模型规模增长现代深度学习模型规模越来越大单个 GPU 内存无法容纳训练速度提升通过多 GPU 并行加速训练过程数据并行将数据分成多个批次同时在多个 GPU 上处理模型并行将模型分成多个部分在不同 GPU 上运行1.2 分布式训练的挑战通信开销GPU 之间的通信成为瓶颈内存管理大模型需要高效的内存管理同步机制确保不同 GPU 上的模型参数保持一致扩展性随着 GPU 数量增加性能是否线性提升1.3 PyTorch 分布式训练组件torch.distributed核心分布式训练库DDP数据并行实现FSDP模型分片实现RPC远程过程调用集体通信all_reduce, broadcast, gather 等操作二、DDP (DistributedDataParallel) 详解2.1 DDP 基本原理数据并行每个 GPU 拥有完整的模型副本梯度同步通过 all_reduce 操作同步梯度批处理每个 GPU 处理不同的数据批次同步更新所有 GPU 完成梯度计算后统一更新参数2.2 DDP 工作流程初始化进程组设置分布式环境模型复制将模型复制到每个 GPU数据分发将数据分发到不同 GPU前向传播每个 GPU 独立执行前向计算反向传播计算梯度梯度同步通过 all_reduce 同步梯度参数更新使用同步后的梯度更新模型参数2.3 代码示例DDP 基本用法import torch import torch.distributed as dist import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader from torch.nn.parallel import DistributedDataParallel as DDP import os import argparse # 解析命令行参数 parser argparse.ArgumentParser() parser.add_argument(--local_rank, typeint, default0) args parser.parse_args() # 初始化进程组 dist.init_process_group(backendnccl) torch.cuda.set_device(args.local_rank) # 定义简单模型 class SimpleModel(nn.Module): def __init__(self): super(SimpleModel, self).__init__() self.fc1 nn.Linear(1000, 512) self.fc2 nn.Linear(512, 256) self.fc3 nn.Linear(256, 10) def forward(self, x): x torch.relu(self.fc1(x)) x torch.relu(self.fc2(x)) x self.fc3(x) return x # 创建模型并移至 GPU model SimpleModel().cuda(args.local_rank) # 包装为 DDP 模型 model DDP(model, device_ids[args.local_rank], output_deviceargs.local_rank) # 定义损失函数和优化器 criterion nn.CrossEntropyLoss() optimizer optim.SGD(model.parameters(), lr0.001) # 创建数据集和数据加载器 class RandomDataset(Dataset): def __len__(self): return 1000 def __getitem__(self, idx): return torch.randn(1000), torch.randint(0, 10, (1,)).item() dataset RandomDataset() sampler torch.utils.data.distributed.DistributedSampler(dataset) dataloader DataLoader(dataset, batch_size32, samplersampler) # 训练循环 epochs 10 for epoch in range(epochs): sampler.set_epoch(epoch) # 确保每个 epoch 数据打乱 running_loss 0.0 for i, (inputs, labels) in enumerate(dataloader): inputs inputs.cuda(args.local_rank) labels labels.cuda(args.local_rank) # 前向传播 outputs model(inputs) loss criterion(outputs, labels) # 反向传播和优化 optimizer.zero_grad() loss.backward() optimizer.step() running_loss loss.item() if i % 10 9 and args.local_rank 0: print(fEpoch [{epoch1}/{epochs}], Step [{i1}/{len(dataloader)}], Loss: {running_loss/10:.4f}) running_loss 0.0 # 清理进程组 dist.destroy_process_group()2.4 DDP 的优缺点优点实现简单易于使用通信开销相对较小适合中小规模模型训练速度快缺点每个 GPU 需要完整的模型副本内存使用效率低不适合超大规模模型三、FSDP (FullyShardedDataParallel) 详解3.1 FSDP 基本原理模型分片将模型参数、梯度和优化器状态分片到多个 GPU内存优化显著减少每个 GPU 的内存使用计算效率通过流水线并行提高计算效率灵活性支持不同的分片策略3.2 FSDP 工作流程初始化进程组设置分布式环境模型分片将模型参数分片到不同 GPU数据分发将数据分发到不同 GPU前向传播按需加载模型参数执行前向计算反向传播计算梯度并更新分片参数参数同步通过通信同步分片参数参数更新使用同步后的梯度更新模型参数3.3 代码示例FSDP 基本用法import torch import torch.distributed as dist import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader from torch.distributed.fsdp import FullyShardedDataParallel as FSDP from torch.distributed.fsdp.fully_sharded_data_parallel import CPUOffload import os import argparse # 解析命令行参数 parser argparse.ArgumentParser() parser.add_argument(--local_rank, typeint, default0) args parser.parse_args() # 初始化进程组 dist.init_process_group(backendnccl) torch.cuda.set_device(args.local_rank) # 定义模型 class LargeModel(nn.Module): def __init__(self): super(LargeModel, self).__init__() self.fc1 nn.Linear(10000, 5000) self.fc2 nn.Linear(5000, 2500) self.fc3 nn.Linear(2500, 1000) self.fc4 nn.Linear(1000, 500) self.fc5 nn.Linear(500, 10) def forward(self, x): x torch.relu(self.fc1(x)) x torch.relu(self.fc2(x)) x torch.relu(self.fc3(x)) x torch.relu(self.fc4(x)) x self.fc5(x) return x # 创建模型 model LargeModel() # 配置 FSDP fsdp_config { full_shard_strategy: FSDP.ShardingStrategy.FULL_SHARD, cpu_offload: CPUOffload(offload_paramsTrue), auto_wrap_policy: lambda module: isinstance(module, nn.Linear), } # 包装为 FSDP 模型 model FSDP( model, **fsdp_config ) # 定义损失函数和优化器 criterion nn.CrossEntropyLoss() optimizer optim.SGD(model.parameters(), lr0.001) # 创建数据集和数据加载器 class RandomDataset(Dataset): def __len__(self): return 1000 def __getitem__(self, idx): return torch.randn(10000), torch.randint(0, 10, (1,)).item() dataset RandomDataset() sampler torch.utils.data.distributed.DistributedSampler(dataset) dataloader DataLoader(dataset, batch_size32, samplersampler) # 训练循环 epochs 10 for epoch in range(epochs): sampler.set_epoch(epoch) running_loss 0.0 for i, (inputs, labels) in enumerate(dataloader): inputs inputs.cuda(args.local_rank) labels labels.cuda(args.local_rank) # 前向传播 outputs model(inputs) loss criterion(outputs, labels) # 反向传播和优化 optimizer.zero_grad() loss.backward() optimizer.step() running_loss loss.item() if i % 10 9 and args.local_rank 0: print(fEpoch [{epoch1}/{epochs}], Step [{i1}/{len(dataloader)}], Loss: {running_loss/10:.4f}) running_loss 0.0 # 清理进程组 dist.destroy_process_group()3.4 FSDP 的优缺点优点内存使用效率高支持超大规模模型灵活的分片策略支持 CPU 卸载缺点实现相对复杂通信开销较大中小模型上可能比 DDP 慢配置参数较多四、DDP 与 FSDP 对比4.1 性能对比模型规模方法GPU 内存使用训练速度扩展性小型模型 (10M 参数)DDP低快好小型模型 (10M 参数)FSDP低中中中型模型 (100M 参数)DDP中快好中型模型 (100M 参数)FSDP低中中大型模型 (1B 参数)DDP高慢差大型模型 (1B 参数)FSDP中中好超大型模型 (10B 参数)DDP不可行--超大型模型 (10B 参数)FSDP高中好4.2 内存使用对比import torch import torch.distributed as dist import torch.nn as nn from torch.nn.parallel import DistributedDataParallel as DDP from torch.distributed.fsdp import FullyShardedDataParallel as FSDP import os import psutil # 初始化进程组 dist.init_process_group(backendgloo) rank dist.get_rank() # 定义不同规模的模型 class SmallModel(nn.Module): def __init__(self): super(SmallModel, self).__init__() self.fc1 nn.Linear(1000, 512) self.fc2 nn.Linear(512, 256) self.fc3 nn.Linear(256, 10) def forward(self, x): x torch.relu(self.fc1(x)) x torch.relu(self.fc2(x)) x self.fc3(x) return x class LargeModel(nn.Module): def __init__(self): super(LargeModel, self).__init__() self.fc1 nn.Linear(10000, 5000) self.fc2 nn.Linear(5000, 2500) self.fc3 nn.Linear(2500, 1000) self.fc4 nn.Linear(1000, 500) self.fc5 nn.Linear(500, 10) def forward(self, x): x torch.relu(self.fc1(x)) x torch.relu(self.fc2(x)) x torch.relu(self.fc3(x)) x torch.relu(self.fc4(x)) x self.fc5(x) return x # 测试内存使用 def test_memory_usage(model_class, method): model model_class() if method DDP: model DDP(model) elif method FSDP: model FSDP(model) # 模拟前向和反向传播 input torch.randn(32, model.fc1.in_features) output model(input) loss output.sum() loss.backward() # 测量内存使用 memory torch.cuda.memory_allocated() / 1024**3 # GB if rank 0: print(f{model_class.__name__} with {method}: {memory:.2f} GB) # 测试 test_memory_usage(SmallModel, DDP) test_memory_usage(SmallModel, FSDP) test_memory_usage(LargeModel, DDP) test_memory_usage(LargeModel, FSDP) # 清理 dist.destroy_process_group()4.3 训练速度对比import torch import torch.distributed as dist import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader from torch.nn.parallel import DistributedDataParallel as DDP from torch.distributed.fsdp import FullyShardedDataParallel as FSDP import time import os # 初始化进程组 dist.init_process_group(backendnccl) rank dist.get_rank() # 定义模型 class Model(nn.Module): def __init__(self, hidden_size): super(Model, self).__init__() self.fc1 nn.Linear(1000, hidden_size) self.fc2 nn.Linear(hidden_size, hidden_size) self.fc3 nn.Linear(hidden_size, 10) def forward(self, x): x torch.relu(self.fc1(x)) x torch.relu(self.fc2(x)) x self.fc3(x) return x # 创建数据集 class RandomDataset(Dataset): def __len__(self): return 1000 def __getitem__(self, idx): return torch.randn(1000), torch.randint(0, 10, (1,)).item() # 测试训练速度 def test_training_speed(hidden_size, method): model Model(hidden_size) if method DDP: model DDP(model) elif method FSDP: model FSDP(model) criterion nn.CrossEntropyLoss() optimizer optim.SGD(model.parameters(), lr0.001) dataset RandomDataset() sampler torch.utils.data.distributed.DistributedSampler(dataset) dataloader DataLoader(dataset, batch_size32, samplersampler) start_time time.time() for epoch in range(5): sampler.set_epoch(epoch) for inputs, labels in dataloader: outputs model(inputs) loss criterion(outputs, labels) optimizer.zero_grad() loss.backward() optimizer.step() end_time time.time() if rank 0: print(fHidden size {hidden_size} with {method}: {end_time - start_time:.2f} seconds) # 测试 test_training_speed(512, DDP) test_training_speed(512, FSDP) test_training_speed(5000, DDP) test_training_speed(5000, FSDP) # 清理 dist.destroy_process_group()五、高级配置与优化5.1 DDP 高级配置进程组配置选择合适的后端 (nccl, gloo, mpi)同步策略设置broadcast_buffers和find_unused_parameters混合精度结合 AMP (Automatic Mixed Precision) 提高性能梯度累积通过accumulate_grad_batches减少内存使用5.2 FSDP 高级配置分片策略选择FULL_SHARD,SHARD_GRAD_OP,NO_SHARDCPU 卸载通过CPUOffload进一步减少 GPU 内存使用自动包装策略自定义auto_wrap_policy控制模型分片状态字典管理使用state_dict_type控制状态字典的保存方式5.3 代码示例FSDP 高级配置from torch.distributed.fsdp import FullyShardedDataParallel as FSDP from torch.distributed.fsdp.fully_sharded_data_parallel import CPUOffload, ShardingStrategy from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy import torch.nn as nn # 定义模型 class TransformerLayer(nn.Module): def __init__(self, d_model): super(TransformerLayer, self).__init__() self.attention nn.Linear(d_model, d_model) self.ffn nn.Linear(d_model, d_model) def forward(self, x): x torch.relu(self.attention(x)) x torch.relu(self.ffn(x)) return x class TransformerModel(nn.Module): def __init__(self, d_model, num_layers): super(TransformerModel, self).__init__() self.layers nn.ModuleList([TransformerLayer(d_model) for _ in range(num_layers)]) self.fc nn.Linear(d_model, 10) def forward(self, x): for layer in self.layers: x layer(x) x self.fc(x) return x # 配置 FSDP fsdp_config { sharding_strategy: ShardingStrategy.FULL_SHARD, cpu_offload: CPUOffload(offload_paramsTrue), auto_wrap_policy: transformer_auto_wrap_policy(TransformerLayer), state_dict_type: FSDP.StateDictType.SHARDED_STATE_DICT, use_orig_params: True, } # 创建模型 model TransformerModel(d_model1024, num_layers10) # 包装为 FSDP 模型 model FSDP(model, **fsdp_config)六、实际应用案例6.1 中小规模模型训练场景图像分类、目标检测等一般深度学习任务推荐方法DDP配置建议使用 nccl 后端结合 AMP示例ResNet, EfficientNet 等模型6.2 大规模模型训练场景大型语言模型、大型推荐模型推荐方法FSDP配置建议使用 FULL_SHARD 策略结合 CPU 卸载示例GPT, BERT 等大型模型6.3 代码示例大规模模型训练import torch import torch.distributed as dist import torch.nn as nn import torch.optim as optim from torch.utils.data import Dataset, DataLoader from torch.distributed.fsdp import FullyShardedDataParallel as FSDP from torch.distributed.fsdp.fully_sharded_data_parallel import CPUOffload, ShardingStrategy import os import argparse # 解析命令行参数 parser argparse.ArgumentParser() parser.add_argument(--local_rank, typeint, default0) args parser.parse_args() # 初始化进程组 dist.init_process_group(backendnccl) torch.cuda.set_device(args.local_rank) # 定义大型模型 class LargeLanguageModel(nn.Module): def __init__(self, vocab_size, hidden_size, num_layers): super(LargeLanguageModel, self).__init__() self.embedding nn.Embedding(vocab_size, hidden_size) self.layers nn.ModuleList([ nn.Sequential( nn.Linear(hidden_size, hidden_size), nn.ReLU(), nn.Linear(hidden_size, hidden_size) ) for _ in range(num_layers) ]) self.fc nn.Linear(hidden_size, vocab_size) def forward(self, x): x self.embedding(x) for layer in self.layers: x layer(x) x self.fc(x) return x # 创建模型 model LargeLanguageModel(vocab_size10000, hidden_size2048, num_layers12) # 配置 FSDP fsdp_config { sharding_strategy: ShardingStrategy.FULL_SHARD, cpu_offload: CPUOffload(offload_paramsTrue), auto_wrap_policy: lambda module: isinstance(module, nn.Sequential), state_dict_type: FSDP.StateDictType.SHARDED_STATE_DICT, } # 包装为 FSDP 模型 model FSDP(model, **fsdp_config) # 定义损失函数和优化器 criterion nn.CrossEntropyLoss() optimizer optim.Adam(model.parameters(), lr1e-4) # 创建数据集 class TextDataset(Dataset): def __len__(self): return 1000 def __getitem__(self, idx): # 模拟文本数据 input_ids torch.randint(0, 10000, (512,)) labels torch.randint(0, 10000, (512,)) return input_ids, labels dataset TextDataset() sampler torch.utils.data.distributed.DistributedSampler(dataset) dataloader DataLoader(dataset, batch_size8, samplersampler) # 训练循环 epochs 3 for epoch in range(epochs): sampler.set_epoch(epoch) running_loss 0.0 for i, (input_ids, labels) in enumerate(dataloader): input_ids input_ids.cuda(args.local_rank) labels labels.cuda(args.local_rank) # 前向传播 outputs model(input_ids) loss criterion(outputs.view(-1, 10000), labels.view(-1)) # 反向传播和优化 optimizer.zero_grad() loss.backward() optimizer.step() running_loss loss.item() if i % 5 4 and args.local_rank 0: print(fEpoch [{epoch1}/{epochs}], Step [{i1}/{len(dataloader)}], Loss: {running_loss/5:.4f}) running_loss 0.0 # 保存模型 if args.local_rank 0: # 保存分片状态字典 model.save_state_dict(model_sharded.pt) # 清理进程组 dist.destroy_process_group()七、性能优化技巧7.1 通信优化使用 NCCL 后端NCCL 是 GPU 间通信的最佳选择调整通信缓冲区设置合适的timeout和init_method批量通信减少通信频率增加通信批量使用混合精度减少通信数据量7.2 内存优化梯度累积通过累积梯度减少内存使用模型分片使用 FSDP 的分片策略CPU 卸载将部分参数卸载到 CPU激活检查点使用checkpoint_activations减少激活内存7.3 计算优化混合精度训练使用torch.cuda.amp提高计算效率自动混合精度结合 FSDP 和 AMP梯度裁剪防止梯度爆炸稳定训练学习率调度使用合适的学习率调度策略7.4 代码示例混合精度训练from torch.cuda.amp import autocast, GradScaler import torch import torch.distributed as dist import torch.nn as nn import torch.optim as optim from torch.nn.parallel import DistributedDataParallel as DDP # 初始化进程组 dist.init_process_group(backendnccl) rank dist.get_rank() # 创建模型 model nn.Sequential( nn.Linear(1000, 5000), nn.ReLU(), nn.Linear(5000, 2500), nn.ReLU(), nn.Linear(2500, 10) ).cuda(rank) # 包装为 DDP 模型 model DDP(model, device_ids[rank], output_devicerank) # 定义损失函数和优化器 criterion nn.CrossEntropyLoss() optimizer optim.SGD(model.parameters(), lr0.001) # 创建梯度缩放器 scaler GradScaler() # 训练循环 for epoch in range(10): for i in range(100): # 生成随机数据 inputs torch.randn(32, 1000).cuda(rank) labels torch.randint(0, 10, (32,)).cuda(rank) # 前向传播自动混合精度 with autocast(): outputs model(inputs) loss criterion(outputs, labels) # 反向传播 optimizer.zero_grad() scaler.scale(loss).backward() scaler.step(optimizer) scaler.update() # 清理 dist.destroy_process_group()八、总结PyTorch 提供了两种主要的分布式训练方法DDP 和 FSDP。DDP 适合中小规模模型通过数据并行实现高效训练FSDP 适合超大规模模型通过模型分片显著减少内存使用。技术演进的内在逻辑从简单的数据并行到复杂的模型分片分布式训练技术的发展反映了对更大规模模型训练的需求。随着模型规模的不断增长FSDP 等内存高效的分布式训练方法将成为未来的主流。在实际应用中应根据模型规模和硬件条件选择合适的分布式训练方法中小模型优先选择 DDP享受更快的训练速度大型模型必须使用 FSDP突破内存限制超大型模型结合 FSDP 和 CPU 卸载实现更大规模的模型训练通过合理配置和优化可以充分发挥分布式训练的优势加速模型训练过程突破单 GPU 内存限制实现更大规模的深度学习模型训练。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2524988.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!