08-MLOps与工程落地——工作流编排:Kubeflow
# 工作流编排 Kubeflow:Kubernetes 原生 ML 流水线、组件化、分布式训练
#
# NOTE(review): this file was recovered from a garbled web extraction; all
# assignment operators, quotes and decorators were reconstructed from context.
# The printed `code` strings are reference snippets, not executed here.

# 一、Kubeflow概述
# 1.1 什么是Kubeflow
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, FancyBboxPatch
import warnings

warnings.filterwarnings("ignore")

print("=" * 60)
print("Kubeflow:Kubernetes原生ML平台")
print("=" * 60)

# Kubeflow组件架构图
fig, ax = plt.subplots(figsize=(12, 10))
ax.axis("off")

# 核心组件及其在画布上的坐标 (x, y)
components = {
    "Kubeflow\nPipelines": (0.2, 0.8),
    "Katib\n(HPO)": (0.5, 0.8),
    "KFServing": (0.8, 0.8),
    "Notebooks": (0.2, 0.55),
    "Training\n(TFJob/PyTorchJob)": (0.5, 0.55),
    "Multi-Tenancy": (0.8, 0.55),
    "Istio": (0.2, 0.3),
    "Argo": (0.5, 0.3),
    "Kubernetes": (0.8, 0.3),
}
for name, (x, y) in components.items():
    circle = plt.Circle((x, y), 0.1, color="lightblue", ec="black")
    ax.add_patch(circle)
    ax.text(x, y, name, ha="center", va="center", fontsize=7)
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_title("Kubeflow组件架构", fontsize=14)
plt.tight_layout()
plt.show()

print("\n Kubeflow核心优势:")
print("  - Kubernetes原生,云原生架构")
print("  - 端到端ML工作流")
print("  - 支持分布式训练")
print("  - 可扩展的组件化设计")
print("  - 多框架支持(TensorFlow, PyTorch, MXNet)")


# 二、Kubeflow安装
# 2.1 安装配置
def kubeflow_installation():
    """Kubeflow安装:打印 kfctl / Minikube / Kind 安装与卸载的参考命令。"""
    print("\n" + "=" * 60)
    print("Kubeflow安装")
    print("=" * 60)

    code = '''
# 1. 使用kfctl安装
export KF_NAME=kubeflow
export BASE_DIR=/opt/kubeflow
export KF_DIR=${BASE_DIR}/${KF_NAME}

# 下载kfctl
wget https://github.com/kubeflow/kfctl/releases/download/v1.7.0/kfctl_v1.7.0-0-g0e3e3a4_linux.tar.gz
tar -xvf kfctl_v1.7.0-0-g0e3e3a4_linux.tar.gz

# 部署
${KF_DIR}/kfctl apply -V -f ${CONFIG_URI}

# 2. 使用Minikube本地测试
# minikube start --cpus 4 --memory 8192 --disk-size 50g
# kubectl create ns kubeflow
# kfctl apply -V -f ${CONFIG_URI}

# 3. 使用Kind
# kind create cluster --name kubeflow --config kind-config.yaml
# kubectl create ns kubeflow
# kfctl apply -V -f ${CONFIG_URI}

# 4. 访问Kubeflow UI
# kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80
# 浏览器打开 http://localhost:8080

# 5. 查看Pod状态
# kubectl get pods -n kubeflow

# 6. 卸载
# ${KF_DIR}/kfctl delete -V -f ${CONFIG_URI}
'''
    print(code)


kubeflow_installation()


# 三、Kubeflow Pipelines
# 3.1 组件定义
def kubeflow_pipelines():
    """Kubeflow Pipelines组件:展示 @component 装饰器与 ContainerOp 两种组件写法。"""
    print("\n" + "=" * 60)
    print("Kubeflow Pipelines组件")
    print("=" * 60)

    code = '''
import kfp
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model
from typing import NamedTuple

# 1. 使用@component装饰器定义组件
@component(
    packages_to_install=["pandas", "numpy", "scikit-learn"],
    base_image="python:3.9",
)
def preprocess_op(
    input_data: Input[Dataset],
    output_train: Output[Dataset],
    output_test: Output[Dataset],
    test_size: float = 0.2,
):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    data = pd.read_csv(input_data.path)
    X = data.drop("target", axis=1)
    y = data["target"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42
    )
    train_data = pd.concat([X_train, y_train], axis=1)
    test_data = pd.concat([X_test, y_test], axis=1)
    train_data.to_csv(output_train.path, index=False)
    test_data.to_csv(output_test.path, index=False)

# 2. 返回多个值的组件
@component(
    packages_to_install=["scikit-learn"],
    base_image="python:3.9",
)
def train_op(
    train_data: Input[Dataset],
    model: Output[Model],
    n_estimators: int = 100,
    max_depth: int = 10,
) -> NamedTuple("Outputs", [("accuracy", float), ("model_path", str)]):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib
    from collections import namedtuple

    data = pd.read_csv(train_data.path)
    X = data.drop("target", axis=1)
    y = data["target"]
    clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
    clf.fit(X, y)
    model_path = model.path + "/model.joblib"
    joblib.dump(clf, model_path)
    accuracy = clf.score(X, y)
    Outputs = namedtuple("Outputs", ["accuracy", "model_path"])
    return Outputs(accuracy, model_path)

# 3. 使用ContainerOp底层API
def train_op_container(train_data_path, n_estimators=100):
    return dsl.ContainerOp(
        name="train",
        image="python:3.9",
        command=["python", "-c"],
        arguments=[f"""
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib
data = pd.read_csv("{train_data_path}")
X = data.drop("target", axis=1)
y = data["target"]
model = RandomForestClassifier(n_estimators={n_estimators})
model.fit(X, y)
joblib.dump(model, "/model/model.pkl")
print("Model saved")
"""],
        file_outputs={"model": "/model"},
    )
'''
    print(code)


kubeflow_pipelines()


# 3.2 Pipeline定义
def pipeline_definition():
    """Pipeline定义:条件分支、循环并行、资源请求、编译与提交运行。"""
    print("\n" + "=" * 60)
    print("Pipeline定义")
    print("=" * 60)

    code = '''
# 1. 定义Pipeline
@dsl.pipeline(
    name="ML Training Pipeline",
    description="End-to-end machine learning pipeline",
    pipeline_root="gs://my-bucket/pipeline-root",
)
def ml_pipeline(
    data_path: str = "gs://bucket/data.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
):
    # 数据加载
    load_task = load_data_op(data_path)

    # 数据预处理
    preprocess_task = preprocess_op(
        input_data=load_task.outputs["data"],
        test_size=test_size,
    )

    # 模型训练
    train_task = train_op(
        train_data=preprocess_task.outputs["output_train"],
        n_estimators=n_estimators,
        max_depth=max_depth,
    )

    # 模型评估
    evaluate_task = evaluate_op(
        test_data=preprocess_task.outputs["output_test"],
        model=train_task.outputs["model"],
    )

    # 条件部署:准确率达标才部署
    with dsl.Condition(evaluate_task.outputs["accuracy"] > 0.85):
        deploy_task = deploy_op(model=train_task.outputs["model"])

# 2. 带循环的Pipeline
@dsl.pipeline(name="Hyperparameter Tuning Pipeline")
def hp_tuning_pipeline(
    data_path: str = "gs://bucket/data.csv",
    n_estimators_list: list = [50, 100, 150, 200],
):
    load_task = load_data_op(data_path)
    preprocess_task = preprocess_op(load_task.outputs["data"])

    # 并行训练多个模型
    train_tasks = []
    for n_estimators in n_estimators_list:
        train_task = train_op(
            train_data=preprocess_task.outputs["output_train"],
            n_estimators=n_estimators,
        )
        train_tasks.append(train_task)

    # 选择最佳模型
    best_model_task = select_best_model_op(train_tasks)

# 3. 带资源的Pipeline
@dsl.pipeline(name="Resource-aware Pipeline")
def resource_pipeline(data_path: str = "gs://bucket/data.csv"):
    load_task = load_data_op(data_path).set_cpu_request("1").set_memory_request("2Gi")
    preprocess_task = preprocess_op(load_task.outputs["data"]).set_gpu_limit(0)
    train_task = train_op(
        preprocess_task.outputs["output_train"]
    ).set_cpu_request("4").set_memory_request("8Gi").set_gpu_limit(1)

    # 设置重试策略
    train_task = train_task.set_retry(3)

# 4. 编译Pipeline
# kfp.compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

# 5. 运行Pipeline
import kfp
client = kfp.Client()
run = client.create_run_from_pipeline_func(
    ml_pipeline,
    arguments={
        "data_path": "gs://bucket/data.csv",
        "test_size": 0.2,
        "n_estimators": 100,
    },
    experiment_name="ml_experiment",
)
'''
    print(code)


pipeline_definition()


# 四、分布式训练
# 4.1 TensorFlow分布式训练
def distributed_training():
    """分布式训练:TFJob / PyTorchJob / MPIJob 的 Kubernetes 资源定义示例。"""
    print("\n" + "=" * 60)
    print("分布式训练")
    print("=" * 60)

    code = '''
# 1. TFJob定义
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: distributed-tfjob
spec:
  tfReplicaSpecs:
    Chief:
      replicas: 1
      template:
        spec:
          containers:
            - name: tensorflow
              image: tensorflow/tensorflow:2.13.0-gpu
              command:
                - python
                - /app/distributed_train.py
              resources:
                limits:
                  nvidia.com/gpu: 1
    Worker:
      replicas: 2
      template:
        spec:
          containers:
            - name: tensorflow
              image: tensorflow/tensorflow:2.13.0-gpu
              command:
                - python
                - /app/distributed_train.py
              resources:
                limits:
                  nvidia.com/gpu: 1
    ParameterServer:
      replicas: 1
      template:
        spec:
          containers:
            - name: tensorflow
              image: tensorflow/tensorflow:2.13.0
              command:
                - python
                - /app/parameter_server.py

# 2. PyTorchJob定义
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: distributed-pytorchjob
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: pytorch/pytorch:2.0.0-cuda11.7
              command:
                - python
                - -m
                - torch.distributed.run
                - --nnodes=3
                - --nproc_per_node=1
                - --rdzv_endpoint=$(MASTER_ADDR):29500
                - distributed_train.py
              resources:
                limits:
                  nvidia.com/gpu: 1
    Worker:
      replicas: 2
      template:
        spec:
          containers:
            - name: pytorch
              image: pytorch/pytorch:2.0.0-cuda11.7
              command:
                - python
                - -m
                - torch.distributed.run
                - --nnodes=3
                - --nproc_per_node=1
                - --rdzv_endpoint=$(MASTER_ADDR):29500
                - distributed_train.py
              resources:
                limits:
                  nvidia.com/gpu: 1

# 3. MPIJob定义
apiVersion: kubeflow.org/v1
kind: MPIJob
metadata:
  name: distributed-mpijob
spec:
  slotsPerWorker: 1
  runPolicy:
    cleanPodPolicy: Running
  mpiReplicaSpecs:
    Launcher:
      replicas: 1
      template:
        spec:
          containers:
            - name: mpi-launcher
              image: mpioperator/tensorflow-benchmarks:latest
              command:
                - mpirun
                - --allow-run-as-root
                - -np 4
                - --hostfile /etc/mpi/hostfile
                - python
                - distributed_train.py
    Worker:
      replicas: 2
      template:
        spec:
          containers:
            - name: mpi-worker
              image: mpioperator/tensorflow-benchmarks:latest
              command:
                - /usr/sbin/sshd
                - -De
'''
    print(code)


distributed_training()


# 五、Katib超参数调优
# 5.1 超参数搜索
def katib_hpo():
    """Katib超参数调优:Experiment 定义、kubectl 操作与 Python SDK 用法。"""
    print("\n" + "=" * 60)
    print("Katib超参数调优")
    print("=" * 60)

    code = '''
# 1. Katib Experiment定义
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: random-forest-tuning
spec:
  objective:
    type: maximize
    goal: 0.95
    objectiveMetricName: accuracy
  algorithm:
    algorithmName: bayesianoptimization
  parallelTrialCount: 3
  maxTrialCount: 12
  maxFailedTrialCount: 3
  parameters:
    - name: n_estimators
      parameterType: int
      feasibleSpace:
        min: "50"
        max: "300"
    - name: max_depth
      parameterType: int
      feasibleSpace:
        min: "5"
        max: "20"
    - name: min_samples_split
      parameterType: int
      feasibleSpace:
        min: "2"
        max: "10"
    - name: max_features
      parameterType: categorical
      feasibleSpace:
        list:
          - sqrt
          - log2
  trialTemplate:
    primaryContainerName: training-container
    trialParameters:
      - name: n_estimators
        description: Number of trees
        reference: n_estimators
      - name: max_depth
        description: Max depth
        reference: max_depth
      - name: min_samples_split
        description: Min samples split
        reference: min_samples_split
      - name: max_features
        description: Max features
        reference: max_features
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            containers:
              - name: training-container
                image: training-image:latest
                command:
                  - python
                  - train.py
                  - --n_estimators=${trialParameters.n_estimators}
                  - --max_depth=${trialParameters.max_depth}
                  - --min_samples_split=${trialParameters.min_samples_split}
                  - --max_features=${trialParameters.max_features}

# 2. 创建Experiment
# kubectl apply -f experiment.yaml

# 3. 查看Experiment状态
# kubectl get experiments
# kubectl describe experiment random-forest-tuning

# 4. 查看最佳试验
# kubectl get trials -l experiment=random-forest-tuning

# 5. 使用Python SDK
from kubeflow.katib import KatibClient

client = KatibClient()
client.create_experiment("experiment.yaml")
experiments = client.list_experiments(namespace="kubeflow")
best_trial = client.get_optimal_hyperparameters("random-forest-tuning")
print(f"Best parameters: {best_trial}")
'''
    print(code)


katib_hpo()


# 六、KFServing模型部署
# 6.1 模型服务
def kfserving():
    """KFServing模型部署:InferenceService 定义、自定义模型、金丝雀发布、请求示例。"""
    print("\n" + "=" * 60)
    print("KFServing模型部署")
    print("=" * 60)

    code = '''
# 1. InferenceService定义
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: sklearn-model
spec:
  predictor:
    sklearn:
      storageUri: gs://kfserving-examples/models/sklearn/iris
      resources:
        limits:
          cpu: 100m
          memory: 256Mi
        requests:
          cpu: 100m
          memory: 256Mi

# 2. PyTorch模型部署
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: pytorch-model
spec:
  predictor:
    pytorch:
      storageUri: gs://kfserving-examples/models/pytorch/cifar10
      resources:
        limits:
          nvidia.com/gpu: 1

# 3. TensorFlow模型部署
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: tensorflow-model
spec:
  predictor:
    tensorflow:
      storageUri: gs://kfserving-examples/models/tensorflow/mnist
      resources:
        limits:
          cpu: 2
          memory: 4Gi

# 4. 自定义模型
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: custom-model
spec:
  predictor:
    containers:
      - name: custom-container
        image: custom-model:latest
        command:
          - python
          - -m
          - model_server
        args:
          - --model_name=custom
          - --model_dir=/mnt/models
        resources:
          limits:
            cpu: 2
            memory: 4Gi
    storageUri: gs://my-bucket/models

# 5. 金丝雀发布
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: canary-model
spec:
  predictor:
    canary:
      trafficPercent: 10
      model:
        sklearn:
          storageUri: gs://kfserving-examples/models/sklearn/iris-v2

# 6. 请求示例
# curl -X POST http://sklearn-model.default.example.com/v1/models/sklearn-model:predict \\
#   -H "Content-Type: application/json" \\
#   -d \'{"instances": [[6.8, 2.8, 4.8, 1.4]]}\'
'''
    print(code)


kfserving()


# 七、完整Pipeline示例
# 7.1 端到端Pipeline
def complete_pipeline():
    """完整Pipeline示例:加载、预处理、训练、评估、条件部署的端到端流水线。"""
    print("\n" + "=" * 60)
    print("完整Kubeflow Pipeline")
    print("=" * 60)

    code = '''
import kfp
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model, Metrics

# 定义所有组件
@component(
    packages_to_install=["pandas", "numpy", "scikit-learn"],
    base_image="python:3.9",
)
def data_loader_op(
    data_url: str,
    output_data: Output[Dataset],
):
    import pandas as pd

    data = pd.read_csv(data_url)
    data.to_csv(output_data.path, index=False)

@component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.9",
)
def preprocessor_op(
    input_data: Input[Dataset],
    output_train: Output[Dataset],
    output_test: Output[Dataset],
    test_size: float = 0.2,
):
    import pandas as pd
    from sklearn.model_selection import train_test_split

    data = pd.read_csv(input_data.path)
    X = data.drop("target", axis=1)
    y = data["target"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42
    )
    train_data = pd.concat([X_train, y_train], axis=1)
    test_data = pd.concat([X_test, y_test], axis=1)
    train_data.to_csv(output_train.path, index=False)
    test_data.to_csv(output_test.path, index=False)

@component(
    packages_to_install=["scikit-learn", "joblib"],
    base_image="python:3.9",
)
def trainer_op(
    train_data: Input[Dataset],
    model: Output[Model],
    n_estimators: int = 100,
    max_depth: int = 10,
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib

    data = pd.read_csv(train_data.path)
    X = data.drop("target", axis=1)
    y = data["target"]
    clf = RandomForestClassifier(
        n_estimators=n_estimators, max_depth=max_depth, random_state=42
    )
    clf.fit(X, y)
    model_path = model.path + "/model.joblib"
    joblib.dump(clf, model_path)

@component(
    packages_to_install=["scikit-learn", "joblib", "pandas"],
    base_image="python:3.9",
)
def evaluator_op(
    test_data: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics],
):
    import pandas as pd
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    import joblib

    data = pd.read_csv(test_data.path)
    X = data.drop("target", axis=1)
    y = data["target"]
    model_path = model.path + "/model.joblib"
    clf = joblib.load(model_path)
    y_pred = clf.predict(X)
    accuracy = accuracy_score(y, y_pred)
    precision = precision_score(y, y_pred, average="weighted")
    recall = recall_score(y, y_pred, average="weighted")
    f1 = f1_score(y, y_pred, average="weighted")
    metrics.log_metric("accuracy", accuracy)
    metrics.log_metric("precision", precision)
    metrics.log_metric("recall", recall)
    metrics.log_metric("f1", f1)

@component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.9",
)
def deployer_op(
    model: Input[Model],
    model_name: str,
    bucket: str,
):
    from google.cloud import storage
    import os

    client = storage.Client()
    bucket_obj = client.bucket(bucket)
    model_path = model.path + "/model.joblib"
    blob = bucket_obj.blob(f"models/{model_name}/model.joblib")
    blob.upload_from_filename(model_path)
    print(f"Model deployed to gs://{bucket}/models/{model_name}/model.joblib")

@dsl.pipeline(
    name="Complete ML Pipeline",
    description="End-to-end machine learning pipeline on Kubeflow",
    pipeline_root="gs://my-bucket/pipeline-root",
)
def complete_ml_pipeline(
    data_url: str = "gs://bucket/data.csv",
    test_size: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    model_name: str = "random_forest_v1",
    deploy_bucket: str = "my-model-bucket",
):
    # 数据加载
    load_task = data_loader_op(data_url=data_url)

    # 数据预处理
    preprocess_task = preprocessor_op(
        input_data=load_task.outputs["output_data"],
        test_size=test_size,
    )

    # 模型训练
    train_task = trainer_op(
        train_data=preprocess_task.outputs["output_train"],
        n_estimators=n_estimators,
        max_depth=max_depth,
    )

    # 模型评估
    evaluate_task = evaluator_op(
        test_data=preprocess_task.outputs["output_test"],
        model=train_task.outputs["model"],
    )

    # 条件部署
    with dsl.Condition(evaluate_task.outputs["metrics"]["accuracy"] > 0.85):
        deploy_task = deployer_op(
            model=train_task.outputs["model"],
            model_name=model_name,
            bucket=deploy_bucket,
        )

# 编译并运行
if __name__ == "__main__":
    kfp.compiler.Compiler().compile(complete_ml_pipeline, "pipeline.yaml")
    client = kfp.Client()
    run = client.create_run_from_pipeline_func(
        complete_ml_pipeline,
        arguments={
            "data_url": "gs://bucket/data.csv",
            "test_size": 0.2,
            "n_estimators": 100,
            "max_depth": 10,
        },
        experiment_name="production_experiment",
    )
'''
    print(code)


complete_pipeline()


# 八、总结
print("\n" + "=" * 60)
print("总结")
print("=" * 60)
print("组件                  功能          适用场景")
print("Pipelines             工作流编排     ML流水线")
print("Katib                 超参数调优     模型优化")
print("KFServing             模型部署       生产推理")
print("TFJob/PyTorchJob      分布式训练     大规模训练")
print("Notebooks             开发环境       交互式开发")
print("\nKubeflow vs Airflow对比:")
print("  Kubeflow: Kubernetes原生,适合大规模ML工作负载")
print("  Airflow:  通用工作流,适合数据ETL和调度")
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2591018.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!