dagster的etl实现

news2025/5/18 8:02:17

本文展示了如何使用Dagster框架实现一个动态ETL(Extract, Transform, Load)流程。通过定义多个操作(op),包括生成动态任务、处理单个任务、收集结果和汇总结果,构建了一个动态任务处理流程。generate_tasks操作生成多个动态任务,process_task对每个任务进行处理,collect_results收集所有处理结果,summarize_results汇总结果并生成资产。最后,通过Definitions将流程定义为可执行的作业(job),并提供了直接运行流程的示例代码

注意dagster版本

dagster, version 1.10.14

"""
实现dagster的etl实践案例。
"""

from dagster import op, graph, job
from dagster import DynamicOut, DynamicOutput, AssetMaterialization, Out, Output
from dagster import Definitions
from typing import List

@op(out=DynamicOut(int))
def generate_tasks(context):
    """生成动态任务,每个任务对应一个整数值"""
    context.log.info("开始生成动态任务...")
    for i in range(3):
        yield DynamicOutput(value=i, mapping_key=f"task_{i}")

@op
def process_task(context, num: int) -> int:
    """处理单个任务,将输入值乘以2"""
    context.log.info(f"处理任务 {num}")
    return num * 2

@op(out=Out(List[int]))
def collect_results(context, results: List[int]) -> List[int]:
    """收集所有处理结果并返回列表"""
    context.log.info(f"收集到 {len(results)} 个处理结果")
    context.log.info(f"数据细节:{results}")
    return results

@op(out = Out(int))
def summarize_results(context, results: list):
    """汇总处理结果,计算总和"""

    total = sum(results)
    total = int(total)
    context.log.info(f"所有任务处理完成,总和为: {total}")
    yield AssetMaterialization(
        asset_key="final_result",
        description="所有任务处理结果的总和",
        metadata={"total": total}
    )
    yield Output(total, output_name="result")



@graph
def dynamic_pipeline():
    """定义动态任务处理流程"""
    results = generate_tasks().map(process_task)
    collected = collect_results(results.collect())
    summarize_results(collected)

# 将graph转换为可执行的job
@job
def run_dynamic_pipeline():
    dynamic_pipeline()

# 定义可执行实体
defs = Definitions(
    jobs=[run_dynamic_pipeline]
)

# 示例:直接运行pipeline(用于测试)
if __name__ == "__main__":
    result = run_dynamic_pipeline.execute_in_process()
    print("执行结果:", result.success)


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2378356.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python的漫画网站管理系统

目录 技术栈介绍具体实现截图![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/0ed2084038144499a162b3fb731a5f37.png)![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/a76a091066f74a80bf7ac1be489ae8a8.png)系统设计研究方法:设计步骤设计流程核…

源码安装gperftools工具

源码安装gperftools工具 下载gperftools源码 https://github.com/gperftools/gperftools/releases/download/gperftools-2.16/gperftools-2.16.tar.gz 注:需要下载github上release版本,如果直接下载master分支上源码,将可能出现各种编译报错…

前端脚手架开发指南:提高开发效率的核心操作

前端脚手架通过自动化的方式可以提高开发效率并减少重复工作,而最强大的脚手架并不是现成的那些工具而是属于你自己团队量身定制的脚手架!本篇文章将带你了解脚手架开发的基本技巧,帮助你掌握如何构建适合自己需求的工具,并带着你…

搜索引擎工作原理|倒排索引|query改写|CTR点击率预估|爬虫

写在前面 使用搜索引擎是我们经常做的事情,搜索引擎的实现原理。 什么是搜索引擎 搜索引擎是一种在线搜索工具,当用户在搜索框输入关键词时,搜索引擎就会将与该关键词相关的内容展示给用户。比较大型的搜索引擎有谷歌,百度&…

Python实例题:Python自动工资条

目录 Python实例题 题目 python-automatic-payroll-slipPython 自动生成工资条脚本 代码解释 加载文件: 获取表头: 写入表头: 生成工资条: 保存文件: 运行思路 注意事项 Python实例题 题目 Python自动工资…

Function Calling万字实战指南:打造高智能数据分析Agent平台

个人主页:Guiat 归属专栏:科学技术变革创新 文章目录 1. Function Calling:智能交互的新范式1.1 Function Calling 技术概述1.2 核心优势分析 2. 数据分析Agent平台架构设计2.1 系统架构概览2.2 核心组件解析2.2.1 函数注册中心2.2.2 Agent控…

线对板连接器的兼容性问题:为何老旧设计难以满足现代需求?

线对板连接器作为电子设备的核心纽带,正面临前所未有的兼容性挑战。某智能工厂升级生产线时发现,沿用十年的2.54毫米间距连接器,在接入新型工业相机时出现30%的信号丢包率,而切换至0.4毫米超密间距连接器后,数据传输速…

AI517 AI本地部署 docker微调(失败)

本地部署AI 计划使用OLLAMA进行本地部署 修改DNS 访问github 刷新缓存 配置环境变量 OLLAMA安装成功 部署成功 计划使用docker进行微调 下载安装docker 虚拟化已开启 开启上面这些 准备下载ubuntu docker ragflow dify 用git去泡

VR和眼动控制集群机器人的方法

西安建筑科技大学信息与控制工程学院雷小康老师团队联合西北工业大学航海学院彭星光老师团队,基于虚拟现实(VR)和眼动追踪技术实现了人-集群机器人高效、灵活的交互控制。相关研究论文“基于虚拟现实和眼动的人-集群机器人交互方法” 发表于信…

TiDB 中新 Hash Join 的设计与性能优化

原文来源: https://tidb.net/blog/11667c37 本文作者:徐飞 导读 在数据库管理系统(DBMS)中,连接操作(Join)是查询处理的核心环节之一,其性能直接影响到整个系统的响应速度和效率…

1.共享内存(python共享内存实际案例,传输opencv frame)

主进程程序 send.py import cv2 import numpy as np from multiprocessing import shared_memory, resource_trackercap cv2.VideoCapture(0) if not cap.isOpened():print("无法打开 RTSP 流,请检查地址、网络连接或 GStreamer 配置。") else:# 创建共…

网页常见水印实现方式

文章目录 1 明水印技术实现1.1 DOM覆盖方案1.2 Canvas动态渲染1.3 CSS伪元素方案2 暗水印技术解析2.1 空域LSB算法2.2 频域傅里叶变换3 防篡改机制设计3.1 MutationObserver防护3.2 Canvas指纹追踪4 前后端实现对比5 攻防博弈深度分析5.1 常见破解手段5.2 进阶防御策略6 选型近…

【ARM】MDK如何将变量存储到指定内存地址

1、 文档目标 在嵌入式系统开发中,通过MDK(Microcontroller Development Kit)进行工程配置,将指定的变量存储到指定的内存地址上是一项非常重要的技术。这项操作不仅能够满足特定硬件架构的需求,还能优化系统的性能和…

Unity3D仿星露谷物语开发44之收集农作物

1、目标 在土地中挖掘后,洒下种子后逐渐成长,然后使用篮子收集成熟后的农作物,工具栏中也会相应地增加该农作物。 2、修改CropStandard的参数 Assets -> Prefabs -> Crop下的CropStandard,修改其Box Collider 2D的Size(Y…

langchain—chatchat

署部 下载项目 git clone --recursive https://github.com/chatchat-space/Langchain-Chatchat.git 进入目录 cd Langchain-Chatchat anaconda环境准备 创建python环境 conda create -n langchain_env python3.10 -y 激活环境 conda activate langchain_env 验证pyhton环境…

【LeetCode 热题 100】二叉树的最大深度 / 翻转二叉树 / 二叉树的直径 / 验证二叉搜索树

⭐️个人主页:小羊 ⭐️所属专栏:LeetCode 热题 100 很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~ 目录 二叉树的中序遍历二叉树的最大深度翻转二叉树对称二叉树二叉树的直径二叉树的层序遍历将有序数组转换为二叉搜索树验…

关于软件测试开发的一些有趣的知识

文章目录 一、什么是测试?二、为什么要软件测试软件测试三、测试的岗位有哪些四 、软件测试和开发的区别五、走测试岗位为什么还要学开发。4、优秀的测试人员具备的素质我为什么走测试岗位 一、什么是测试? 其实这个问题说简单也不简单,说难…

uni-app 开发HarmonyOS的鸿蒙影视项目分享:从实战案例到开源后台

最近,HBuilderX 新版本发布,带来了令人兴奋的消息——uni-app 现在支持 Harmony Next 平台的 App 开发。这对于开发者来说无疑是一个巨大的福音,意味着使用熟悉的 Vue 3 语法和开发框架,就可以为鸿蒙生态贡献自己的力量。 前言 作…

售前工作.工作流程和工具

第一部分 售前解决方案及技术建议书的制作 售前解决方案编写的标准操作步骤SOP: 售前解决方案写作方法_哔哩哔哩_bilibili 第二部分 投标过程关键活动--商务标技术方案 1. 按项目管理--售前销售项目立项 销售活动和销售线索的跟踪流程和工具 1)拿到标书&#xff…

GPU与NPU异构计算任务划分算法研究:基于强化学习的Transformer负载均衡实践

点击 “AladdinEdu,同学们用得起的【H卡】算力平台”,H卡级别算力,按量计费,灵活弹性,顶级配置,学生专属优惠。 引言 在边缘计算与AI推理场景中,GPU-NPU异构计算架构已成为突破算力瓶颈的关键技…