1. Introduction
As LLMs grow larger, a single GPU can no longer hold an entire model. Take Qwen-14B-Chat as an example: its weights take roughly 28 GB, while a single NVIDIA A10 has only 24 GB of GPU memory. To deploy Qwen-14B-Chat on A10s, we have to shard the model across two A10 machines, with each A10 card loading half of the model. This approach is called distributed inference.
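A rough back-of-the-envelope calculation (an illustrative sketch, assuming fp16/bf16 weights at 2 bytes per parameter and ignoring KV cache and activation memory) shows why a single 24 GB card is not enough:

```python
# Back-of-envelope estimate (assumption: fp16/bf16 weights, 2 bytes per parameter;
# KV cache and activations are ignored).
num_params = 14e9                      # ~14B parameters for Qwen-14B-Chat
bytes_per_param = 2                    # fp16 / bf16
total_gb = num_params * bytes_per_param / 1024**3
per_gpu_gb = total_gb / 2              # tensor parallelism across 2 A10 GPUs
print(f"weights ≈ {total_gb:.1f} GB in total, ≈ {per_gpu_gb:.1f} GB per GPU")
# weights ≈ 26.1 GB in total, ≈ 13.0 GB per GPU
# This roughly matches the ~13.28 GB per worker reported in the logs later in this article.
```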
Many frameworks that support distributed inference have emerged in the community, such as vLLM, DeepSpeed-MII, and rtp-llm. This article picks vLLM and analyzes, from a source-code perspective, how vLLM + Ray implement distributed inference for LLMs.
2. Deploying a Distributed vLLM Inference Application on K8s
2.1 Preparing the model
Download Qwen-14B-Chat into OSS and create the corresponding PV and PVC in the cluster. The PVC is named llm-model.
```bash
kubectl apply -f- << EOF
apiVersion: v1
kind: Secret
metadata:
  name: oss-secret
stringData:
  akId: ${your-accesskey-id}         # AccessKey ID used to access OSS
  akSecret: ${your-accesskey-secret} # AccessKey Secret used to access OSS
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: llm-model
  labels:
    alicloud-pvname: llm-model
spec:
  capacity:
    storage: 30Gi
  accessModes:
    - ReadOnlyMany
  persistentVolumeReclaimPolicy: Retain
  csi:
    driver: ossplugin.csi.alibabacloud.com
    volumeHandle: model-oss
    nodePublishSecretRef:
      name: oss-secret
      namespace: default
    volumeAttributes:
      bucket: ${your-bucket-name}
      url: ${your-bucket-endpoint} # e.g. oss-cn-hangzhou.aliyuncs.com
      otherOpts: "-o umask=022 -o max_stat_cache_size=0 -o allow_other"
      path: "/"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: llm-model
spec:
  accessModes:
    - ReadOnlyMany
  resources:
    requests:
      storage: 30Gi
  selector:
    matchLabels:
      alicloud-pvname: llm-model
EOF
```
2.2 Deploying the distributed vLLM application
1. Run the following command to deploy the vLLM application:
```bash
kubectl apply -f- <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
  name: vllm
  labels:
    app: vllm
spec:
  replicas: 2
  selector:
    matchLabels:
      app: vllm
  template:
    metadata:
      labels:
        app: vllm
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - vllm
            topologyKey: kubernetes.io/hostname
      volumes:
      - name: model
        persistentVolumeClaim:
          claimName: llm-model
      containers:
      - name: vllm
        image: kube-ai-registry.cn-shanghai.cr.aliyuncs.com/kube-ai/vllm:0.4.1
        command:
        - "sh"
        - "-c"
        - "sleep 7d"
        ports:
        - containerPort: 8080
        readinessProbe:
          tcpSocket:
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 30
        resources:
          limits:
            nvidia.com/gpu: "1"
          requests:
            cpu: 4
            memory: 8Gi
            nvidia.com/gpu: "1"
        volumeMounts:
        - mountPath: /mnt/models
          name: model
EOF
```
2. Run the following commands to start the vLLM application:
- Start Ray.

  On Pod1, run:

  ```bash
  ray start --head
  # After startup, the ray-head-address is printed in the logs
  ```

  On Pod2, run:

  ```bash
  # Set <ray-head-address> to the ray-head-address printed in Pod1's logs
  ray start --address=<ray-head-address>
  ```
- Run the following command to initialize the local model on Pod2:

  ```bash
  python3 model_init.py
  ```

  ```python
  # model_init.py
  from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig

  config = AutoConfig.from_pretrained("/mnt/models/Qwen-14B-Chat", trust_remote_code=True)
  tokenizer = AutoTokenizer.from_pretrained("/mnt/models/Qwen-14B-Chat", trust_remote_code=True)
  ```
- On Pod1, run the following command to start the qwen model:

  ```bash
  python3 -m vllm.entrypoints.openai.api_server \
      --port 8080 \
      --trust-remote-code \
      --served-model-name qwen \
      --model /mnt/models/Qwen-14B-Chat \
      --gpu-memory-utilization 0.95 \
      --tensor-parallel-size 2
  ```
- Log in to Pod1 and send a request to the application:

  ```bash
  kubectl -n <your-namespace> exec -it <pod1-name> bash
  curl -H "Content-Type: application/json" \
       http://localhost:8080/v1/chat/completions -X POST \
       -d '{"model": "qwen", "messages": [{"role": "user", "content": "你好"}], "max_tokens": 512, "temperature": 0.7, "top_p": 0.9, "seed": 10, "stop":["<|endoftext|>", "<|im_end|>", "<|im_start|>"]}'
  ```
3. Overall Flow of Distributed Inference
1. Entry point: vllm/entrypoints/openai/api_server.py main
```python
if __name__ == "__main__":
    # Build the engine args
    engine_args = AsyncEngineArgs.from_cli_args(args)
    # Build the engine
    engine = AsyncLLMEngine.from_engine_args(
        engine_args, usage_context=UsageContext.OPENAI_API_SERVER)
    openai_serving_chat = OpenAIServingChat(engine, served_model_names,
                                            args.response_role,
                                            args.lora_modules,
                                            args.chat_template)
    openai_serving_completion = OpenAIServingCompletion(
        engine, served_model_names, args.lora_modules)
    app.root_path = args.root_path
    uvicorn.run(app)
```
2. Building the LLM engine
```python
engine = AsyncLLMEngine.from_engine_args(
    engine_args, usage_context=UsageContext.OPENAI_API_SERVER)

def from_engine_args():
    """Creates an async LLM engine from the engine arguments."""
    # Create the engine configs.
    engine_config = engine_args.create_engine_config()
    # Ray cluster initialization:
    # 1. ray.init()
    # 2. Set the Ray placement strategy based on the number of GPUs in the
    #    cluster and the tensor parallel degree
    initialize_ray_cluster(engine_config.parallel_config)
    from vllm.executor.ray_gpu_executor import RayGPUExecutorAsync
    executor_class = RayGPUExecutorAsync
    # Create the async LLM engine.
    engine = cls(...)  # create an AsyncLLMEngine instance
    # AsyncLLMEngine.__init__ -> self._init_engine -> _AsyncLLMEngine.__init__
    #   -> LLMEngine.__init__ -> executor_class(), i.e. RayGPUExecutorAsync.__init__
```
3. Initializing the Ray cluster
This step covers both Ray cluster initialization and Ray worker initialization; the model is loaded in a distributed fashion during Ray worker initialization.
```python
# RayGPUExecutorAsync inherits from RayGPUExecutor and ExecutorAsyncBase;
# its initialization calls RayGPUExecutor's self._init_executor method
def _init_executor(self) -> None:
    # Create the parallel GPU workers. Core code for initializing the workers
    self._init_workers_ray(placement_group)

def _init_workers_ray():
    # Define the worker: the Worker class from the vllm.worker.worker module;
    # the actor itself is a RayWorkerWrapper
    worker = ray.remote(
        num_cpus=0,
        num_gpus=num_gpus,
        scheduling_strategy=scheduling_strategy,
        **ray_remote_kwargs,
    )(RayWorkerWrapper).remote(
        worker_module_name="vllm.worker.worker",
        worker_class_name="Worker",
        trust_remote_code=self.model_config.trust_remote_code,
    )
    # Run the following methods on the Ray workers, in order
    self._run_workers("get_node_and_gpu_ids", use_dummy_driver=True)
    self._run_workers("update_environment_variables",
                      all_args=all_args_to_update_environment_variables)
    self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs)
    self._run_workers("init_device")
    self._run_workers(
        "load_model",
        max_concurrent_workers=self.parallel_config.max_parallel_loading_workers,
    )

def _run_workers():
    # Start the ray workers first.
    ray_worker_outputs = [
        # worker is the RayWorkerWrapper actor defined above; this actually calls
        # RayWorkerWrapper.execute_method and runs `method` on the remote instance
        worker.execute_method.remote(method, *worker_args, **worker_kwargs)
        for (worker, worker_args, worker_kwargs) in zip(
            self.workers, all_worker_args, all_worker_kwargs)
    ]

def init_worker():
    # worker_module_name is vllm.worker.worker, as passed in _init_workers_ray
    mod = importlib.import_module(self.worker_module_name)
    # Worker
    worker_class = getattr(mod, self.worker_class_name)
    self.worker = worker_class(*args, **kwargs)
    # Worker.__init__ -> ModelRunner.__init__

def init_device():
    # Set up the machine information for distributed inference
    """Initialize the distributed environment."""
    init_distributed_environment(parallel_config.world_size, rank,
                                 distributed_init_method, local_rank)

def load_model():
    self.model_runner.load_model()
    # ModelRunner.load_model() -> vllm.model_executor.model_loader.loader.load_model
```
The expected log output after load_model() completes is shown below. There are two pods, each loading 13.2845 GB, i.e. half of the model.
```text
INFO 04-26 09:39:46 model_runner.py:173] Loading model weights took 13.2845 GB
(RayWorkerWrapper pid=3327, ip=192.168.12.132) INFO 04-26 09:39:51 model_runner.py:173] Loading model weights took 13.2845 GB
```
4. Serving requests
Create the OpenAIServingChat and OpenAIServingCompletion instances, then start uvicorn to serve external requests.
```python
# Routes served by the API server:
#   @app.post("/v1/chat/completions") -> openai_serving_chat
#   @app.post("/v1/completions")      -> openai_serving_completion
openai_serving_chat = OpenAIServingChat(engine, served_model_names,
                                        args.response_role,
                                        args.lora_modules,
                                        args.chat_template)
openai_serving_completion = OpenAIServingCompletion(
    engine, served_model_names, args.lora_modules)
app.root_path = args.root_path
uvicorn.run(app)
```
3.1 The distributed inference process
When the startup argument --tensor-parallel-size > 1, Ray-based distributed deployment is triggered automatically.
1. The Ray cluster is initialized while the LLM engine is being built
```python
# Ray cluster initialization
initialize_ray_cluster(engine_config.parallel_config)
```
The parallel_config is as follows: pp=1, tp=2, world_size=2.
```python
{'pipeline_parallel_size': 1, 'tensor_parallel_size': 2, 'worker_use_ray': True, 'max_parallel_loading_workers': None, 'disable_custom_all_reduce': False, 'tokenizer_pool_config': None, 'ray_workers_use_nsight': False, 'placement_group': None, 'world_size': 2}
```
During initialization, a placement_group is created for the worker processes:

1) Get the total number of GPUs in the Ray cluster.

2) Request GPUs according to the world size: placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size).

3) Create the placement_group; Ray then launches the actors on the corresponding nodes based on the placement_group (see the sketch after this list).
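Below is a minimal sketch (not vLLM's exact code) of what these three steps amount to with the Ray API, assuming world_size=2 and a cluster already started with `ray start` as in section 2.2:

```python
# A minimal sketch, not vLLM's exact code: what the three steps above amount to
# with the Ray API, assuming world_size = 2 as in this deployment.
import ray
from ray.util.placement_group import placement_group

ray.init(address="auto")                 # join the Ray cluster started by `ray start`
world_size = 2                           # pipeline_parallel_size * tensor_parallel_size
bundles = [{"GPU": 1}] * world_size      # placement_group_specs from step 2)
pg = placement_group(bundles)            # one GPU bundle per rank
ray.get(pg.ready())                      # block until Ray has reserved one GPU per bundle
```

The real initialize_ray_cluster additionally validates that the cluster has enough GPUs (step 1) before creating the group.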
2. Run the get_node_and_gpu_ids method on each worker
```python
# Get the node and the GPUs allocated on that node
def get_node_and_gpu_ids(self) -> Tuple[str, List[int]]:
    node_id = ray.get_runtime_context().get_node_id()
    gpu_ids = ray.get_gpu_ids()
    return node_id, gpu_ids
```
3. Run update_environment_variables on each worker
```python
# The worker nodes and GPU info obtained in step 2
worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids",
                                            use_dummy_driver=True)
# Set environment variables for the driver and workers.
all_args_to_update_environment_variables = [({
    "CUDA_VISIBLE_DEVICES":
    ",".join(map(str, node_gpus[node_id])),
    "VLLM_INSTANCE_ID":
    VLLM_INSTANCE_ID,
    "VLLM_TRACE_FUNCTION":
    os.getenv("VLLM_TRACE_FUNCTION", "0"),
}, ) for (node_id, _) in worker_node_and_gpu_ids]
```
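To make the comprehension above concrete, here is what the resulting arguments would look like in this two-pod, one-GPU-per-pod setup (illustrative only; the node IDs and instance ID are made-up placeholders):

```python
# Illustrative only: mimic the comprehension above for two nodes with one GPU each.
worker_node_and_gpu_ids = [("node-a", [0]), ("node-b", [0])]   # hypothetical node IDs
node_gpus = {"node-a": [0], "node-b": [0]}

all_args = [({
    "CUDA_VISIBLE_DEVICES": ",".join(map(str, node_gpus[node_id])),
    "VLLM_INSTANCE_ID": "vllm-demo",          # placeholder value
    "VLLM_TRACE_FUNCTION": "0",
}, ) for (node_id, _) in worker_node_and_gpu_ids]

print(all_args)
# [({'CUDA_VISIBLE_DEVICES': '0', 'VLLM_INSTANCE_ID': 'vllm-demo', 'VLLM_TRACE_FUNCTION': '0'},),
#  ({'CUDA_VISIBLE_DEVICES': '0', 'VLLM_INSTANCE_ID': 'vllm-demo', 'VLLM_TRACE_FUNCTION': '0'},)]
```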
4. Run the init_device method on each worker
```python
# Startup arguments for the workers
init_worker_all_kwargs = []
# worker_node_and_gpu_ids is the per-worker GPU info obtained in step 2
for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids):
    local_rank = node_workers[node_id].index(rank)
    init_worker_all_kwargs.append(
        collect_arg_helper_func(
            model_config=self.model_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
            device_config=self.device_config,
            cache_config=self.cache_config,
            load_config=self.load_config,
            local_rank=local_rank,
            rank=rank,
            distributed_init_method=distributed_init_method,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            is_driver_worker=rank == 0,
        ))

def init_device(self) -> None:
    if self.device_config.device.type == "cuda":
        # torch.distributed.all_reduce does not free the input tensor until
        # the synchronization point. This causes the memory usage to grow
        # as the number of all_reduce calls increases. This env var disables
        # this behavior.
        # Related issue:
        # https://discuss.pytorch.org/t/cuda-allocation-lifetime-for-inputs-to-distributed-all-reduce/191573
        os.environ["TORCH_NCCL_AVOID_RECORD_STREAMS"] = "1"

        # This env var set by Ray causes exceptions with graph building.
        os.environ.pop("NCCL_ASYNC_ERROR_HANDLING", None)
        self.device = torch.device(f"cuda:{self.local_rank}")
        torch.cuda.set_device(self.device)

        _check_if_gpu_supports_dtype(self.model_config.dtype)
        torch.cuda.empty_cache()
        self.init_gpu_memory = torch.cuda.mem_get_info()[0]
    else:
        raise RuntimeError(
            f"Not support device type: {self.device_config.device}")
    # Initialize the distributed environment.
    init_worker_distributed_environment(self.parallel_config, self.rank,
                                        self.distributed_init_method,
                                        self.local_rank)
    # Set random seed.
    set_random_seed(self.model_config.seed)
```
The core method init_worker_distributed_environment builds the world info of the distributed cluster, similar to the world info in the horovod and deepspeed frameworks.
Its arguments are as follows:
```text
worker1: self.rank=0, self.local_rank=0, self.distributed_init_method="tcp://192.168.12.120:42167" (ray master)

{'pipeline_parallel_size': 1, 'tensor_parallel_size': 2, 'worker_use_ray': True, 'max_parallel_loading_workers': None, 'disable_custom_all_reduce': False, 'tokenizer_pool_config': None, 'ray_workers_use_nsight': False, 'placement_group': <ray.util.placement_group.PlacementGroup object at 0x7fdeaa896ce0>, 'world_size': 2}, {'id': PlacementGroupID(51489eb26a9335f31ed1bdb4eace04000000), 'bundle_cache': [{'GPU': 1.0}, {'GPU': 1.0}]}, self.rank=0, tcp://192.168.12.120:42167, self.local_rank=0

worker2: self.rank=1, self.local_rank=0, self.distributed_init_method="tcp://192.168.12.120:42167"

{'pipeline_parallel_size': 1, 'tensor_parallel_size': 2, 'worker_use_ray': True, 'max_parallel_loading_workers': None, 'disable_custom_all_reduce': False, 'tokenizer_pool_config': None, 'ray_workers_use_nsight': False, 'world_size': 2}, self.rank=1, tcp://192.168.12.120:42167, self.local_rank=0
```
self.rank increases globally across all workers, while self.local_rank is the index of the GPU within a single pod.
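For this two-pod setup, the distributed environment that init_worker_distributed_environment ends up establishing can be sketched as a standard torch.distributed process-group initialization (a simplified sketch, not the exact vLLM code):

```python
# Simplified sketch, not the exact vLLM code: for this 2-pod setup,
# init_worker_distributed_environment boils down to a standard torch.distributed setup.
import torch.distributed as dist

rank = 1                                                 # 0 on pod1 (driver), 1 on pod2
local_rank = 0                                           # each pod has a single GPU, so always 0 here
distributed_init_method = "tcp://192.168.12.120:42167"   # the address shown in the arguments above

dist.init_process_group(
    backend="nccl",                      # NCCL backend for GPU collectives
    init_method=distributed_init_method,
    world_size=2,
    rank=rank,
)
```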
5. Run the load_model method on each worker
load_model loads the model in a distributed fashion. It is fairly involved, so it is covered separately in the next section.
3.2 The distributed model loading flow
Each worker executes the load_model method:
```python
def load_model():
    self.model_runner.load_model()
    # ModelRunner.load_model() -> vllm.model_executor.model_loader.loader.load_model

def load_model(self) -> None:
    with CudaMemoryProfiler() as m:
        # get_model fetches the model
        self.model = get_model(
            model_config=self.model_config,
            device_config=self.device_config,
            load_config=self.load_config,
            lora_config=self.lora_config,
            vision_language_config=self.vision_language_config,
            parallel_config=self.parallel_config,
            scheduler_config=self.scheduler_config,
        )
    self.model_memory_usage = m.consumed_memory
    logger.info(f"Loading model weights took "
                f"{self.model_memory_usage / float(2**30):.4f} GB")

# get_model -> loader.load_model -> DefaultModelLoader.load_model
def load_model(self, *, model_config: ModelConfig,
               device_config: DeviceConfig,
               lora_config: Optional[LoRAConfig],
               vision_language_config: Optional[VisionLanguageConfig],
               parallel_config: ParallelConfig,
               scheduler_config: SchedulerConfig) -> nn.Module:
    with set_default_torch_dtype(model_config.dtype):
        with torch.device(device_config.device):
            """Initialize a model with the given configurations."""
            # Initialize the model
            model = _initialize_model(model_config, self.load_config,
                                      lora_config, vision_language_config)
        # Call the model's load_weights method
        model.load_weights(
            self._get_weights_iterator(model_config.model,
                                       model_config.revision,
                                       fall_back_to_pt=getattr(
                                           model,
                                           "fall_back_to_pt_during_load",
                                           True)), )
        for _, module in model.named_modules():
            linear_method = getattr(module, "linear_method", None)
            if linear_method is not None:
                linear_method.process_weights_after_loading(module)
            if hasattr(module, "process_weights_after_loading"):
                module.process_weights_after_loading()
    return model.eval()

# Resolve the concrete model class from the model config
def _initialize_model(model_config: ModelConfig, load_config: LoadConfig,
                      lora_config: Optional[LoRAConfig],
                      vision_language_config: Optional[VisionLanguageConfig]) -> nn.Module:
    """Initialize a model with the given configurations."""
    # Determined by the architecture field in Qwen-14B-Chat/config.json
    model_class = get_model_architecture(model_config)[0]
    linear_method = _get_linear_method(model_config, load_config)
    return model_class(config=model_config.hf_config,
                       linear_method=linear_method,
                       **_get_model_initialization_kwargs(
                           model_class, lora_config, vision_language_config))
# model_class is <class 'vllm.model_executor.models.qwen.QWenLMHeadModel'>
```
model.load_weights calls QWenLMHeadModel's load_weights method:
```python
# QWenLMHeadModel.load_weights
def load_weights(self, weights: Iterable[Tuple[str, torch.Tensor]]):
    stacked_params_mapping = [
        # (param_name, shard_name, shard_id)
        ("gate_up_proj", "w2", 0),
        ("gate_up_proj", "w1", 1),
    ]
    # The model's per-layer weights and their names;
    # self.named_parameters is model.named_parameters()
    params_dict = dict(self.named_parameters())
    for name, loaded_weight in weights:
        # name: transformer.h.27.mlp.c_proj.weight
        # loaded_weight: tensor(xxx)
        if "rotary_emb.inv_freq" in name:
            continue
        for (param_name, weight_name, shard_id) in stacked_params_mapping:
            if weight_name not in name:
                continue
            # If the weight is in stacked_params_mapping, replace shard_name with param_name,
            # e.g. transformer.h.0.mlp.w1.weight becomes transformer.h.0.mlp.gate_up_proj.weight
            name = name.replace(weight_name, param_name)
            # Skip loading extra bias for GPTQ models.
            if name.endswith(".bias") and name not in params_dict:
                continue
            param = params_dict[name]
            weight_loader = param.weight_loader
            weight_loader(param, loaded_weight, shard_id)
            break
        else:
            # Python's for-else: reaching here means the loop never hit `break`
            # Skip loading extra bias for GPTQ models.
            if name.endswith(".bias") and name not in params_dict:
                continue
            param = params_dict[name]
            # Find the weight_loader method for this param
            weight_loader = getattr(param, "weight_loader",
                                    default_weight_loader)
            weight_loader(param, loaded_weight)
```
The model's layer weights and their weight_loader methods:
```text
# param, weight_loader
lm_head.weight, weight_loader <bound method VocabParallelEmbedding.weight_loader of ParallelLMHead()>
transformer.h.0.attn.c_attn.weight, weight_loader <bound method QKVParallelLinear.weight_loader of QKVParallelLinear()>
transformer.h.0.attn.c_proj.weight, weight_loader <bound method RowParallelLinear.weight_loader of RowParallelLinear()>
transformer.h.0.ln_1.weight, weight_loader <function default_weight_loader at 0x7f66201ee0e0>
transformer.h.0.ln_2.weight, weight_loader <function default_weight_loader at 0x7f66201ee0e0>
transformer.h.0.mlp.c_proj.weight, weight_loader <bound method RowParallelLinear.weight_loader of RowParallelLinear()>
transformer.h.0.mlp.gate_up_proj.weight, weight_loader <bound method MergedColumnParallelLinear.weight_loader of MergedColumnParallelLinear()>
transformer.ln_f.weight, weight_loader <function default_weight_loader at 0x7f66201ee0e0>
transformer.wte.weight, weight_loader <bound method VocabParallelEmbedding.weight_loader of VocabParallelEmbedding()>
```
Each layer of the model has its own distributed loading method. For example, the weight transformer.h.0.attn.c_proj.weight is loaded with RowParallelLinear.weight_loader.
```python
class RowParallelLinear(torch.nn.Module):

    def weight_loader(self, param: Parameter, loaded_weight: torch.Tensor):
        # Get this worker's tp_rank and use it to compute which slice of the weight to load
        tp_rank = get_tensor_model_parallel_rank()
        input_dim = getattr(param, "input_dim", None)
        param_data = param.data
        if input_dim is not None:
            shard_size = param_data.shape[input_dim]
            start_idx = tp_rank * shard_size
            loaded_weight = loaded_weight.narrow(input_dim, start_idx,
                                                 shard_size)
        assert param_data.shape == loaded_weight.shape
        param_data.copy_(loaded_weight)
```
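To see what the narrow() call does, here is a small standalone illustration (hypothetical shapes, not vLLM code) of how a weight is sliced along its input dimension when tp_size=2:

```python
# Illustrative only (hypothetical shapes, not vLLM code): how narrow() slices a
# RowParallelLinear weight along the input dimension when tp_size = 2.
import torch

full_weight = torch.arange(24, dtype=torch.float32).reshape(4, 6)  # [output_size, input_size]
tp_size = 2
input_dim = 1
shard_size = full_weight.shape[input_dim] // tp_size                # 3 input columns per rank

for tp_rank in range(tp_size):
    start_idx = tp_rank * shard_size
    shard = full_weight.narrow(input_dim, start_idx, shard_size)
    print(f"tp_rank={tp_rank} loads input columns {start_idx}..{start_idx + shard_size - 1}, "
          f"shape={tuple(shard.shape)}")
# tp_rank=0 loads input columns 0..2, shape=(4, 3)
# tp_rank=1 loads input columns 3..5, shape=(4, 3)
```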
The model is sharded using the Megatron-LM algorithm; see the paper in the references [04] for details.
4. The Megatron-LM Model Sharding Algorithm
4.1 Communication between distributed nodes: AllReduce
https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/collectives.html#
1) Reduce: aggregate the results computed on every GPU onto one specific GPU.
2) Broadcast: send data from one GPU to all GPUs.
3)AllReduce = Reduce + Broadcast
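The following is a minimal runnable sketch (gloo backend, two CPU processes standing in for two GPUs) showing that after all_reduce every rank holds the same summed tensor:

```python
# Minimal runnable sketch of AllReduce: two CPU processes (gloo backend) stand in for
# two GPUs; after all_reduce every rank holds the same summed tensor.
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank: int, world_size: int):
    dist.init_process_group("gloo", init_method="tcp://127.0.0.1:29500",
                            rank=rank, world_size=world_size)
    t = torch.tensor([float(rank + 1)] * 3)     # rank 0 holds [1,1,1], rank 1 holds [2,2,2]
    dist.all_reduce(t, op=dist.ReduceOp.SUM)    # Reduce + Broadcast in one collective
    print(f"rank {rank}: {t.tolist()}")         # both ranks print [3.0, 3.0, 3.0]
    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2)
```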
4.2 Sharding the Transformer
A Transformer layer consists of a self-attention block followed by a two-layer multi-layer perceptron (MLP).
MLP
As shown in the figure, the MLP consists of two GEMMs: $Y = \mathrm{GeLU}(XA)$ followed by $Z = \mathrm{Dropout}(YB)$. Because GeLU is a nonlinear function,

$$\mathrm{GeLU}(X_1 A_1 + X_2 A_2) \neq \mathrm{GeLU}(X_1 A_1) + \mathrm{GeLU}(X_2 A_2),$$

so $A$ cannot be split by rows (row parallelism); it must be split by columns (column parallelism), which lets GeLU be applied to each partition independently.
In that case, $B$ must be split by rows. If $B$ were split by columns instead, an extra synchronization would be required between the two GEMMs.
Dropout randomly zeroes activations at a given rate, so a single all-reduce must be performed before Dropout to sum the partial results.
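A quick numerical check of this argument (illustrative shapes only): splitting A by columns and B by rows reproduces the single-GPU result once the two partial outputs are summed, which is exactly what the all-reduce before Dropout does.

```python
# Numerical check of the argument above (illustrative shapes): splitting A by columns and
# B by rows reproduces the single-GPU MLP output once the partial results are summed.
import torch
import torch.nn.functional as F

torch.manual_seed(0)
X = torch.randn(4, 8)        # input activations
A = torch.randn(8, 16)       # first MLP weight
B = torch.randn(16, 8)       # second MLP weight

# Single-GPU reference: Z = GeLU(X @ A) @ B
Z_ref = F.gelu(X @ A) @ B

# Tensor parallel over 2 ranks: A split by columns, B split by rows.
A1, A2 = A.chunk(2, dim=1)   # column parallelism: GeLU applies independently to each shard
B1, B2 = B.chunk(2, dim=0)   # row parallelism: partial outputs must be summed (all-reduce)
Z_tp = F.gelu(X @ A1) @ B1 + F.gelu(X @ A2) @ B2

print(torch.allclose(Z_ref, Z_tp, atol=1e-5))   # True
```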
Self-Attention
In multi-head attention, every head has its own independent Q, K, V matrices, so each GPU only needs to compute a subset of the heads. This requires the number of attention heads to be divisible by tp_size; otherwise an error like the following is raised (Qwen-14B with tp=3):

```text
ValueError: Total number of attention heads (40) must be divisible by tensor parallel size (3).
```
Likewise, an all-reduce operation is required before Dropout.
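For intuition, here is a tiny sketch (illustrative only) of how Qwen-14B's 40 attention heads are partitioned across tensor parallel ranks, and why tp=3 trips the divisibility check quoted above:

```python
# Illustrative only: partitioning Qwen-14B's 40 attention heads across tensor parallel ranks.
total_num_heads = 40

for tensor_parallel_size in (2, 3):
    if total_num_heads % tensor_parallel_size != 0:
        print(f"tp={tensor_parallel_size}: ValueError, "
              f"{total_num_heads} heads not divisible by {tensor_parallel_size}")
    else:
        print(f"tp={tensor_parallel_size}: each GPU computes "
              f"{total_num_heads // tensor_parallel_size} heads")
```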
Therefore, a single Transformer layer requires 2 all-reduce operations during inference. Qwen-14B has 40 Transformer layers, so one forward pass performs about 2 × 40 = 80 all-reduce operations. When the inference service is deployed across nodes, this network communication becomes a significant overhead.
This article focuses on how vLLM implements distributed inference; for details of vLLM's inference process itself, see [01] in the references below.
References
[01] Analysis of the vLLM inference process
https://zhuanlan.zhihu.com/p/649974825
[02] [Deep Learning][Distributed Training] Making sense of 100B-parameter model training: pipeline parallelism, tensor parallelism, and 3D parallelism
https://zhuanlan.zhihu.com/p/617087561
[03] Hugging Face efficient training techniques, part 4: multi-GPU distributed training (DP, PP, TP, ZeRO)
https://blog.csdn.net/qq_56591814/article/details/134099476
[04] Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism
https://arxiv.org/pdf/1909.08053