告别定时任务!用Dagster监听器实现秒级数据响应自动化

news2025/5/14 8:16:32

在数据管道开发中,我们经常面临需要根据外部事件触发计算任务的场景。传统基于时间的调度方式存在资源浪费和时效性不足的问题。本文将通过Dagster的**传感器(Sensor)**功能,演示如何构建事件驱动的数据处理流程。

场景模拟:动态销售报表生成系统

假设业务部门需要实时获取特定产品在指定时间段的销售分析报表。传统方案需要人工手动触发任务,而我们希望通过以下方式实现自动化:

  • 当新的销售请求文件到达时自动触发计算
  • 根据请求参数动态生成报表
  • 仅在检测到有效请求时运行作业

在这里插入图片描述

实现步骤详解

1. 定义事件驱动型资产

首先创建一个接收动态参数的资产,该资产将根据请求参数查询数据仓库生成报表:

from dagster import asset, MaterializeResult, Config
import duckdb

class AdhocRequestConfig(Config):
    """请求参数配置"""
    department: str
    product: str
    start_date: str
    end_date: str

@asset(deps=["joined_data"], compute_kind="Python")
def adhoc_request(
    config: AdhocRequestConfig,
    duckdb: duckdb.DuckDBResource
) -> MaterializeResult:
    """动态销售报表生成"""
    query = f"""
        SELECT 
            department, 
            rep_name, 
            product_name, 
            SUM(dollar_amount) AS total_sales
        FROM joined_data
        WHERE 
            date >= '{config.start_date}' AND 
            date < '{config.end_date}' AND
            department = '{config.department}' AND
            product_name = '{config.product}'
        GROUP BY department, rep_name, product_name
    """
    
    with duckdb.get_connection() as conn:
        preview_df = conn.execute(query).fetchdf()
        
    return MaterializeResult(
        metadata={
            "preview": MaterializeResult.MetadataValue.md(preview_df.to_markdown(index=False))
        }
    )

2. 构建事件监听传感器

使用@sensor装饰器创建传感器,持续监控指定目录下的请求文件:

import os
import json
from dagster import sensor, SensorEvaluationContext, RunRequest

@sensor(job=adhoc_request_job)
def adhoc_request_sensor(context: SensorEvaluationContext):
    """请求文件监听传感器"""
    requests_dir = os.path.join(os.path.dirname(__file__), "../data/requests")
    current_state = {}
    
    for filename in os.listdir(requests_dir):
        if filename.endswith(".json"):
            file_path = os.path.join(requests_dir, filename)
            file_mtime = os.path.getmtime(file_path)
            
            # 检测新文件或修改过的文件
            if filename not in current_state or current_state[filename] != file_mtime:
                with open(file_path) as f:
                    request_config = json.load(f)
                
                # 生成唯一运行标识
                run_key = f"adhoc_request_{filename}_{file_mtime}"
                
                yield RunRequest(
                    run_key=run_key,
                    run_config={
                        "ops": {
                            "adhoc_request": {
                                "config": request_config
                            }
                        }
                    }
                )
                
            current_state[filename] = file_mtime

3. 部署与测试

更新Dagster定义文件并启动服务:

from dagster import Definitions, AssetGroup

defs = Definitions(
    assets=[adhoc_request],
    sensors=[adhoc_request_sensor],
    resources={
        "duckdb": duckdb.DuckDBResource(database="data/mydb.duckdb")
    }
)

操作流程:

  1. 将请求文件放入data/requests目录
  2. 在Dagster UI中启用传感器
  3. 观察自动化触发记录
  4. 查看生成的Markdown格式报表预览

在这里插入图片描述

核心优势

  1. 精准触发:仅在检测到有效事件时运行,避免空跑
  2. 动态配置:通过JSON文件传递参数,支持复杂查询条件
  3. 审计追踪:自动记录每次触发的配置和结果元数据
  4. 幂等性保障:通过run_key防止重复执行

扩展建议

  • 添加文件格式验证(如JSON Schema)
  • 实现请求去重机制
  • 集成Slack通知功能
  • 增加请求优先级队列

通过这种架构,我们可以轻松将传统批处理流程升级为实时事件驱动系统,显著提升数据分析的响应速度和资源利用率。传感器机制使得Dagster在复杂ETL场景中展现出独特的灵活性和扩展能力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2338254.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Unity】打包TextMeshPro的字体

前言 在Unity中&#xff0c;TextMeshPro与常规 Text 组件相比提供了更高级的文本呈现功能&#xff0c;TextMesh Pro 可以处理各种语言&#xff0c;包括中文。我们可以轻松地在 Unity 项目中使用中文&#xff0c;而不必担心字体和布局问题。TextMeshPro需要的字体资源就需要我们…

51单片机实验五:A/D和D/A转换

一、实验环境与实验器材 环境&#xff1a;Keli&#xff0c;STC-ISP烧写软件,Proteus. 器材&#xff1a;TX-1C单片机&#xff08;STC89C52RC&#xff09;、电脑。 二、 实验内容及实验步骤 1.A/D转换 概念&#xff1a;模数转换是将连续的模拟信号转换为离散的数字信…

使用VHD虚拟磁盘安装双系统,避免磁盘分区

前言 很多时候&#xff0c;我们对现在的操作系统不满意,就想要自己安装一个双系统 但是安装双系统又涉及到硬盘分区,非常复杂,容易造成数据问题 虚拟机的话有经常用的不爽,这里其实有一个介于虚拟机和双系统之间的解决方法,就是使用虚拟硬盘文件安装系统. 相当于系统在机上…

Kafka消费者端重平衡流程

重平衡的完整流程需要消费者 端和协调者组件共同参与才能完成。我们先从消费者的视角来审视一下重平衡的流程。在消费者端&#xff0c;重平衡分为两个步骤&#xff1a;分别是加入组和等待领导者消费者&#xff08;Leader Consumer&#xff09;分配方案。这两个步骤分别对应两类…

Django之modelform使用

Django新增修改数据功能优化 目录 1.新增数据功能优化 2.修改数据功能优化 在我们做数据优化处理之前, 我们先回顾下传统的写法, 是如何实现增加修改的。 我们需要在templates里面新建前端的页面, 需要有新增还要删除, 比如说员工数据的新增, 那需要有很多个输入框, 那html…

云轴科技ZStack入选中国人工智能产业发展联盟《大模型应用交付供应商名录》

2025年4月8日至9日&#xff0c;中国人工智能产业发展联盟&#xff08;以下简称AIIA&#xff09;第十四次全体会议暨人工智能赋能新型工业化深度行&#xff08;南京站&#xff09;在南京召开。工业和信息化部科技司副司长杜广达&#xff0c;中国信息通信研究院院长、中国人工智能…

写论文时降AIGC和降重的一些注意事项

‘ 写一些研究成果&#xff0c;英文不是很好&#xff0c;用有道翻译过来句子很简单&#xff0c;句型很单一。那么你会考虑用ai吗&#xff1f; 如果语句太正式&#xff0c;高级&#xff0c;会被误判成aigc &#xff0c;慎重选择ai润色。 有的话就算没有用ai生成&#xff0c;但…

AI 编程工具—如何在 Cursor 中集成使用 MCP工具

AI 编程工具—如何在 Cursor 中集成使用 MCP工具 这里我们给出了常用的MCP 聚合工具,也就是我们可以在这些网站找MCP服务 这是一个MCP Server共享平台,用户可以在上面发布和下载MCP Server配置。在这里可以选择你需要的MCP 服务。 如果你不知道你的mcp 对应的server 名称也不…

《软件设计师》复习笔记(12.2)——成本管理、配置管理

目录 一、项目成本管理 1. 定义 2. 主要过程 3. 成本类型 4. 其他概念 真题示例&#xff1a; 二、软件配置管理 1. 定义 2. 主要活动 3. 配置项 4. 基线&#xff08;Baseline&#xff09; 5. 配置库类型 真题示例&#xff1a; 一、项目成本管理 1. 定义 在批准…

Spring 中的 @Cacheable 缓存注解

1 什么是缓存 第一个问题&#xff0c;首先要搞明白什么是缓存&#xff0c;缓存的意义是什么。 对于普通业务&#xff0c;如果要查询一个数据&#xff0c;一般直接select数据库进行查找。但是在高流量的情况下&#xff0c;直接查找数据库就会成为性能的瓶颈。因为数据库查找的…

settimeout和setinterval区别

1. setTimeout&#xff1a;单次延迟执行 语法&#xff1a; const timeoutId setTimeout(callback, delay, arg1, arg2, ...); 核心功能&#xff1a;在指定的 delay&#xff08;毫秒&#xff09;后&#xff0c;执行一次 callback 函数。 参数&#xff1a; callback&#x…

Kaamel隐私与安全分析报告:Microsoft Recall功能评估与风险控制

本报告对Microsoft最新推出的Recall功能进行了全面隐私与安全分析。Recall是Windows 11 Copilot电脑的专属AI功能&#xff0c;允许用户以自然语言搜索曾在电脑上查看过的内容。该功能在初次发布时因严重隐私和安全问题而备受争议&#xff0c;后经微软全面重新设计。我们的分析表…

Thymeleaf简介

在Java中&#xff0c;模板引擎可以帮助生成文本输出。常见的模板引擎包括FreeMarker、Velocity和Thymeleaf等 Thymeleaf是一个适用于Web和独立环境的现代服务器端Java模板引擎。 Thymeleaf 和 JSP比较&#xff1a; Thymeleaf目前所作的工作和JSP有相似之处&#xff0c;Thyme…

o3和o4-mini的升级有哪些亮点?

ChatGPT是基于OpenAI GPT系列的高性能对话生成AI&#xff0c;经过多代迭代不断提升自然语言理解和生成能力。 在过去的一年中&#xff0c;OpenAI先后发布了GPT-4、GPT‑4.1及多种mini版本&#xff0c;为不同使用场景提供灵活选择。​ 随着用户需求向更高效、更精准的推理和视觉…

MATLAB 控制系统设计与仿真 - 36

鲁棒工具箱定义了个新的对象类ureal,可以定义在某个区间内可变的变量。 函数的调用格式为&#xff1a; p ureal(name,nominalvalue) % name为变量名,nominalValue为标称值&#xff0c;默认变化值为/-1 p ureal(name,nominalvalue,PlusMinus,plusminus) p ureal(name,nomin…

Spring数据访问全解析:ORM整合与JDBC高效实践

目录 一、Spring ORM集成深度剖析 &#x1f31f; ORM模块架构设计 核心集成特性&#xff1a; 整合MyBatis示例配置&#xff1a; 二、Spring JDBC高效实践指南 &#x1f31f; 传统JDBC vs Spring JDBC对比 &#x1f31f; JdbcTemplate核心操作示例 批量操作优化&#xf…

【HCIA】使用Access port实现简易的VLAN间通信

前言 当我们拥有一台三层交换机与两个vlan&#xff0c;我们可以使用简易的Vlanif配置实现VLAN间通信。 文章目录 前言1. 拓扑图2. 配置交换机3. 配置PC1与PC2的网络4. port link-type后记修改记录 1. 拓扑图 2. 配置交换机 <Huawei>system-view [Huawei]undo info-cent…

6.VTK 颜色

文章目录 概念RGB示例HSV示例 概念 RGB颜色系统&#xff1a;通过红(R)、绿(G)、蓝(B)三个颜色分量的组合来定义颜色。每个分量的取值范围是0到1&#xff0c;其中(0, 0, 0)代表黑色&#xff0c;而(1, 1, 1)代表白色。可以使用vtkProperty::SetColor(r, g, b)方法为Actor设置颜色…

shiro使用

shiro是apache提供的一种安全框架。他可以将登录&#xff0c;权限这一方面简单化。 使用shiro需要引入 <dependency><groupId>org.apache.shiro</groupId><artifactId>shiro-core</artifactId><version>1.9.0</version></depend…

光谱相机的成像方式

光谱相机的成像方式决定了其如何获取物体的空间与光谱信息&#xff0c;核心在于分光技术与扫描模式的结合。以下是主要成像方式的分类解析&#xff1a; ‌一、滤光片切换型‌ ‌1. 滤光片轮&#xff08;Filter Wheel&#xff09;‌ ‌原理‌&#xff1a;通过旋转装有多个窄带…