本文展示了如何使用Dagster框架实现一个动态ETL(Extract, Transform, Load)流程。通过定义多个操作(op),包括生成动态任务、处理单个任务、收集结果和汇总结果,构建了一个动态任务处理流程。generate_tasks操作生成多个动态任务,process_task对每个任务进行处理,collect_results收集所有处理结果,summarize_results汇总结果并生成资产。最后,通过Definitions将流程定义为可执行的作业(job),并提供了直接运行流程的示例代码
注意dagster版本
dagster, version 1.10.14
"""
实现dagster的etl实践案例。
"""
from dagster import op, graph, job
from dagster import DynamicOut, DynamicOutput, AssetMaterialization, Out, Output
from dagster import Definitions
from typing import List
@op(out=DynamicOut(int))
def generate_tasks(context):
"""生成动态任务,每个任务对应一个整数值"""
context.log.info("开始生成动态任务...")
for i in range(3):
yield DynamicOutput(value=i, mapping_key=f"task_{i}")
@op
def process_task(context, num: int) -> int:
"""处理单个任务,将输入值乘以2"""
context.log.info(f"处理任务 {num}")
return num * 2
@op(out=Out(List[int]))
def collect_results(context, results: List[int]) -> List[int]:
"""收集所有处理结果并返回列表"""
context.log.info(f"收集到 {len(results)} 个处理结果")
context.log.info(f"数据细节:{results}")
return results
@op(out = Out(int))
def summarize_results(context, results: list):
"""汇总处理结果,计算总和"""
total = sum(results)
total = int(total)
context.log.info(f"所有任务处理完成,总和为: {total}")
yield AssetMaterialization(
asset_key="final_result",
description="所有任务处理结果的总和",
metadata={"total": total}
)
yield Output(total, output_name="result")
@graph
def dynamic_pipeline():
"""定义动态任务处理流程"""
results = generate_tasks().map(process_task)
collected = collect_results(results.collect())
summarize_results(collected)
# 将graph转换为可执行的job
@job
def run_dynamic_pipeline():
dynamic_pipeline()
# 定义可执行实体
defs = Definitions(
jobs=[run_dynamic_pipeline]
)
# 示例:直接运行pipeline(用于测试)
if __name__ == "__main__":
result = run_dynamic_pipeline.execute_in_process()
print("执行结果:", result.success)