Dagster Pipes系列-1:调用外部Python脚本

news2025/5/14 4:36:00

本文是"Dagster Pipes教程"的第一部分,介绍如何通过Dagster资产调用外部Python脚本并集成到数据管道中。首先,创建Dagster资产subprocess_asset,利用PipesSubprocessClient资源执行外部脚本external_code.py,实现跨进程的数据处理。通过dagster dev启动UI,可在Dagster界面中监控子进程的执行状态和日志输出,包括标准输出(stdout)内容。本文详细讲解了资产定义、资源注入及命令执行的完整流程,为后续修改外部代码以支持Dagster Pipes通信奠定基础。此方法适用于需要将现有脚本集成到Dagster数据管道的场景,提升自动化与可观测性。完成本部分后,读者可继续学习第二部分,掌握如何增强外部脚本与Dagster的交互能力。

教程概述

本教程将指导你完成以下步骤:

  1. 创建一个调用外部Python脚本的Dagster资产
  2. 定义必要的Dagster资源(resources)
  3. 在Dagster UI中运行并查看结果
    在这里插入图片描述

前提条件

在开始之前,请确保你已经:

  • 安装了Dagster
  • 创建了一个名为external_code.py的独立Python脚本,内容如下:
import pandas as pd

def main():
    orders_df = pd.DataFrame({
        "order_id": [1, 2],
        "item_id": [432, 878]
    })
    total_orders = len(orders_df)
    print(f"processing total {total_orders} orders")

第一步:定义Dagster资产

首先,在与external_code.py相同的目录下创建一个名为dagster_code.py的新文件。

1.1 创建资产定义

将以下代码复制到dagster_code.py中:

import shutil
import dagster as dg

@dg.asset
def subprocess_asset(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
    cmd = [
        shutil.which("python"),
        dg.file_relative_path(__file__, "external_code.py")
    ]
    return pipes_subprocess_client.run(
        command=cmd,
        context=context
    ).get_materialize_result()

代码解析:

  • 我们创建了一个名为subprocess_asset的资产
  • 使用AssetExecutionContext作为上下文参数,它提供了系统信息如资源、配置和日志记录
  • 指定了PipesSubprocessClient资源
  • 构建了一个命令列表来执行外部脚本
  • 使用pipes_subprocess_client.run()方法在管道会话中同步执行子进程

1.2 从资产调用外部代码

上述代码中的关键部分是:

pipes_subprocess_client.run(
    command=cmd,
    context=context
).get_materialize_result()

这段代码做了什么:

  • PipesSubprocessClient资源暴露了一个run方法
  • 当资产执行时,这个方法会在管道会话中同步执行子进程
  • 返回一个PipesClientCompletedInvocation对象
  • 可以使用get_materialize_result()方法访问子进程报告的MaterializeResult事件

第二步:定义Definitions对象

为了让Dagster工具(如CLI、UI和Dagster+)能够加载和访问资产及子进程资源,我们需要创建一个Definitions对象。

dagster_code.py文件末尾添加以下代码:

from dagster import Definitions

defs = Definitions(
    assets=[subprocess_asset],
    resources={
        "pipes_subprocess_client": dg.PipesSubprocessClient()
    }
)

此时,dagster_code.py文件应该如下所示:

import shutil
import dagster as dg

@dg.asset
def subprocess_asset(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:
    cmd = [
        shutil.which("python"),
        dg.file_relative_path(__file__, "external_code.py")
    ]
    return pipes_subprocess_client.run(
        command=cmd,
        context=context
    ).get_materialize_result()

from dagster import Definitions

defs = Definitions(
    assets=[subprocess_asset],
    resources={
        "pipes_subprocess_client": dg.PipesSubprocessClient()
    }
)

第三步:从Dagster UI运行子进程

现在,让我们在Dagster UI中执行我们创建的子进程资产。

  1. 在新的命令行会话中运行以下命令启动UI:

    dagster dev -f dagster_code.py
    
  2. 点击右上角的"Materialize"按钮来运行你的代码

  3. 导航到"Run details"页面,在这里你可以看到运行的日志

  4. external_code.py中,我们有一个打印语句将输出到stdout。Dagster会在UI的原始计算日志视图中显示这些内容。

  5. 要查看stdout日志,切换日志部分到stdout:

在这里插入图片描述

下一步

到目前为止,你已经创建了一个调用外部Python脚本的Dagster资产,在子进程中执行了代码,并在Dagster UI中查看了结果。接下来,你将学习如何修改外部代码以与Dagster Pipes配合工作,将信息发送回Dagster。

总结

通过本教程的第一部分,我们实现了:

  • 创建了一个Dagster资产来调用外部Python脚本
  • 配置了必要的资源来支持子进程执行
  • 在Dagster UI中成功运行并查看了结果

这个基础设置为你在后续步骤中实现更复杂的管道通信打下了良好的基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2375105.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python shutil 指定文件夹打包文件为 zip 压缩包

python shutil 指定文件夹打包文件为 zip 压缩包,具体代码如下: import shutil# 指定要打包的文件夹路径 src_doc ./test# 指定输出的压缩包文件名(不包含扩展名) output_filename testfromat_ zip# 打包并压缩文件夹为 ZIP …

Webug4.0通关笔记25- 第30关SSRF

目录 一、SSRF简介 1.SSRF原理 2.渗透方法 二、第30关SSRF渗透实战 1.打开靶场 2.渗透实战 (1)Windows靶场修复 (2)Docker靶场修复 (3)获取敏感文件信息 (4)内网端口与服务…

OpenCV 中用于背景分割的一个类cv::bgsegm::BackgroundSubtractorLSBP

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::bgsegm::BackgroundSubtractorLSBP 是 OpenCV 中用于背景分割的一个类,它基于局部样本二进制模式(Local Sample Bina…

MacOS 上构建 gem5

MacOS 中只存在 python3,但是scons 只认 python,不在 系统中创建 软连接,一个是因为比较难操作;另一个是尽量不要更改系统。所以独立构件python 和scons: 1,安装python 下载源代码: Python S…

认识中间件-以及两个简单的示例

认识中间件-以及两个简单的示例 什么是中间件一个响应处理中间件老朋友 nest g如何使用为某个module引入全局引入编写逻辑一个日志中间件nest g mi 生成引入思考代码进度什么是中间件 官方文档 中间件是在路由处理程序之前调用的函数。中间件函数可以访问请求和响应对象,以及…

(五)毛子整洁架构(分布式日志/Redis缓存/OutBox Pattern)

文章目录 项目地址一、结构化日志1.1 使用Serilog1. 安装所需要的包2. 注册服务和配置3. 安装Seq服务 1.2 添加分布式id中间件1. 添加中间件2. 注册服务3. 修改Application的LoggingBehavior 二、Redis缓存2.1 添加缓存1. 创建接口ICaching接口2. 实现ICaching接口3. 注册Cachi…

大模型微调终极方案:LoRA、QLoRA原理详解与LLaMA-Factory、Xtuner实战对比

文章目录 一、微调概述1.1 微调步骤1.2 微调场景 二、微调方法2.1 三种方法2.2 方法对比2.3 关键结论 三、微调技术3.1 微调依据3.2 LoRA3.2.1 原理3.2.2 示例 3.3 QLoRA3.4 适用场景 四、微调框架4.1 LLaMA-Factory4.2 Xtuner4.3 对比 一、微调概述 微调(Fine-tun…

云效 MCP Server:AI 驱动的研发协作新范式

作者:黄博文、李晔彬 云效 MCP Server 是什么? 云效 MCP(Model Context Protocol)是阿里云云效平台推出的模型上下文协议标准化接口系统,作为连接 AI 助手与 DevOps 平台的核心桥梁,通过模型上下文协议将…

Linux常见指令解析(三)

通配符 * *可以匹配任意名称的文件,如: ls * 列出当前目录下的所有非隐藏文件和目录,并展开目录内容 ls *.c 列出当前目录下以.c为结尾的文件 rm -rf * 删除所有非隐藏文件 alias指令 alias指令用于给命令取别名。如: 给ls …

HTTP学习

HTTP知识 01. 经典五层模型 应用层 为应用软件提供了很多服务,构建于协议之上。 传输层 数据的传输都是在这层定义的,数据过大分包,分片。 网络层 为数据在节点之间传输创建逻辑链路 数据链路层 通讯实体间建立数据链路连接 物理层 主要作用…

go语言实现IP归属地查询

效果: 实现代码main.go package mainimport ("encoding/json""fmt""io/ioutil""net/http""os" )type AreaData struct {Continent string json:"continent"Country string json:"country"ZipCode …

Android RxJava框架分析:它的执行流程是如何的?它的线程是如何切换的?如何自定义RxJava操作符?

目录 RxJava是什么?为什么使用。RxJava是如何使用的呢?RxJava如何和Retrofit一起使用。RxJava源码分析。 (1)他执行流程是如何的。(2)map(3)线程的切换。 如何自定义RxJava操作符…

MySQL及线程关于锁的面试题

目录 1.了解过 MySQL 死锁问题吗? 2.什么是线程死锁?死锁相关面试题 2.1 什么是死锁: 2.2 形成死锁的四个必要条件是什么? 2.3 如何避免线程死锁? 3. MySQL 怎么排查死锁问题? 4.Java线上死锁问题如…

【工作记录】crmeb后端项目打开、运行

1、下载代码 1)安装git 不再详述 2)git拉代码 项目地址如下,在vscode-分支中拉代码 # 克隆项目 git clone https://gitee.com/ZhongBangKeJi/crmeb_java/ 截图如下是已经成功拉下来 注意安装对应版本 2、maven配置 安装配置见&#x…

智能手表测试计划文档(软/硬件)

📄 智能手表测试计划文档(软/硬件) 项目名称:Aurora Watch S1 文档编号:AW-S1-QA-TP-001 编制日期:2025-xx-xx 版本:V1.0 编写人:xxx(测试主管) 一、测试目标…

k8s监控方案实践(三):部署与配置Grafana可视化平台

k8s监控方案实践(三):部署与配置Grafana可视化平台 文章目录 k8s监控方案实践(三):部署与配置Grafana可视化平台一、Grafana简介1. 什么是Grafana?2. Grafana与Prometheus的关系3. Grafana应用场…

嵌入式系统架构验证工具:AADL Inspector v1.10 全新升级

软件架构建模与早期验证是嵌入式应用的关键环节。架构分析与设计语言(AADL)是专为应用软件及执行平台架构模型设计的语言,兼具文本与图形化的双重特性。AADL Inspector是一款轻量级的独立工具: 核心处理能力包括 √ 支持处理AA…

STM32-模电

目录 一、MOS管 二、二极管 三、IGBT 四、运算放大器 五、推挽、开漏、上拉电阻 一、MOS管 1. MOS简介 这里以nmos管为例,注意箭头方向。G门极/栅极,D漏极,S源极。 当给G通高电平时,灯泡点亮,给G通低电平时&a…

华为云Flexus+DeepSeek征文|从开通到应用:华为云DeepSeek-V3/R1商用服务深度体验

前言 本文章主要讲述在华为云ModelArts Studio上 开通DeepSeek-V3/R1商用服务的流程,以及开通过程中的经验分享和使用感受帮我更多开发者,在华为云平台快速完成 DeepSeek-V3/R1商用服务的开通以及使用入门注意:避免测试过程中出现部署失败等问…

鸿蒙NEXT开发动画案例5

1.创建空白项目 2.Page文件夹下面新建Spin.ets文件,代码如下: /*** TODO SpinKit动画组件 - Pulse 脉冲动画* author: CSDN—鸿蒙布道师* since: 2024/05/09*/ ComponentV2 export struct SpinFive {// 参数定义Require Param spinSize: number 48;Re…