Apache Airflow 系列教程 | 第6课:DAG 解析与处理引擎

news2026/5/8 0:07:17
导读(Introduction)欢迎来到 Apache Airflow 源码深度解析系列的第六课。在前一课中,我们深入剖析了 Scheduler 的核心原理——它如何在循环中创建 DagRun、推进任务状态、将任务入队给 Executor。但 Scheduler 能调度 DAG 的前提是:系统必须先"看到"这些 DAG。这就是 DAG 解析与处理引擎所承担的职责。想象一下:你在dags/目录下新建了一个 Python 文件,定义了一个 DAG。Airflow 是如何发现这个文件、导入其中的 Python 模块、提取 DAG 对象、序列化并写入数据库的?这个从"文件"到"数据库记录"的转换过程,正是本课的核心主题。Airflow 3.x 对 DAG 解析体系做了重大重构,引入了DAG Bundle概念(源自 AIP-66),将 DAG 文件的组织与加载从"单一文件夹扫描"升级为"多源可插拔的 Bundle 管理"。同时,解析过程被设计为独立的子进程,实现了与 Scheduler 主进程的安全隔离——这意味着即使用户编写的 DAG 代码存在 bug 或安全风险,也不会影响调度器的稳定性。本课将带你逐层拆解这个精密的解析引擎,从文件发现到子进程解析,从序列化到数据库持久化,完整理解 Airflow 如何将静态的 Python 文件转化为可调度的工作流实体。学习目标(Learning Objectives)完成本课学习后,你将能够:理解 DAG 文件发现与导入机制——掌握 Airflow 如何扫描目录、过滤文件、检测 DAG 内容掌握 DagBag 容器的工作原理——了解 DAG 的内存管理、验证和收集过程理解 DAG Bundle 概念——掌握 AIP-66 引入的多源 DAG 文件组织与加载体系分析 DagFileProcessorManager 的处理循环——理解并行解析的调度策略和生命周期管理掌握子进程解析的通信协议——了解 Manager 与 Parser 子进程之间的 IPC 机制理解解析结果的持久化路径——从序列化 DAG 到写入 DagModel、SerializedDagModel 的完整链路正文内容(Main Content)1. 整体架构:从文件到数据库的转换链路DAG 解析引擎的核心使命是将用户编写的 Python DAG 文件转化为数据库中的结构化记录。这个过程可以概括为以下流水线:DAG 文件 (.py) │ ▼ ┌─────────────────────────────────────────┐ │ DagFileProcessorManager (管理进程) │ │ - 文件发现 Bundle 刷新 │ │ - 文件队列调度 │ │ - 子进程生命周期管理 │ └────────────────────┬────────────────────┘ │ fork 子进程 ▼ ┌─────────────────────────────────────────┐ │ DagFileProcessorProcess (子进程) │ │ - _parse_file_entrypoint() │ │ - DagBag.process_file() 加载 DAG │ │ - DagSerialization.to_dict() 序列化 │ │ - 通过 IPC 返回 DagFileParsingResult │ └────────────────────┬────────────────────┘ │ 结果回传 ▼ ┌─────────────────────────────────────────┐ │ persist_parsing_result() │ │ - update_dag_parsing_results_in_db() │ │ - 写入 DagModel / SerializedDagModel │ │ - 更新 ParseImportError / DagWarning │ └─────────────────────────────────────────┘让我们逐层深入了解每个组件的实现细节。2. DAG Bundle:多源文件组织体系2.1 Bundle 的设计动机在传统 Airflow 中,所有 DAG 文件必须放在一个统一的dags_folder中。这在大规模团队协作时会产生问题:不同团队的 DAG 混在一起,缺乏隔离无法为不同来源的 DAG 配置不同的刷新频率Git 仓库管理困难,无法支持多仓库结构Airflow 3.x 通过DAG Bundle(AIP-66)解决了这些问题。Bundle 是 DAG 文件的逻辑分组单位,每个 Bundle 可以有不同的来源(本地目录、Git 仓库等)、版本控制和刷新策略。2.2 BaseDagBundle 抽象基类所有 Bundle 实现都继承自BaseDagBundle:# 源码位置:airflow-core/src/airflow/dag_processing/bundles/base.pyclassBaseDagBundle(ABC):""" DAG bundles are used both by the DAG processor and by a worker when running a task. When running a task, we know what version of the bundle we need. The DAG processor uses a bundle to keep the DAGs up to date, always using the latest version. """supports_versioning:bool=Falsedef__init__(self,*,name:str,refresh_interval:int=conf.getint("dag_processor","refresh_interval"),version:str|None=None,view_url_template:str|None=None,)-None:self.name=name self.version=version self.refresh_interval=refresh_interval self.is_initialized:bool=Falseself.base_dir=get_bundle_base_folder(bundle_name=self.name)self.versions_dir=get_bundle_versions_base_folder(bundle_name=self.name)@property@abstractmethoddefpath(self)-Path:"""Airflow will use this path to find/load/execute the DAGs from the bundle."""@abstractmethoddefget_current_version(self)-str|None:"""Retrieve a string that represents the version of the DAG bundle."""@abstractmethoddefrefresh(self)-None:"""Retrieve the latest version of the files in the bundle."""definitialize(self)-None:"""Called before the bundle is used. Safe to call concurrently."""self.is_initialized=True关键设计要点:属性/方法说明supports_versioning是否支持版本控制,影响 Worker 获取特定版本的能力pathDAG 文件所在目录的路径refresh_interval多久检查一次是否需要刷新initialize()延迟初始化,避免昂贵操作过早执行refresh()拉取最新版本,必须支持并发安全get_current_version()获取当前版本标识符2.3 LocalDagBundle:本地目录实现最简单的 Bundle 实现是本地目录:# 源码位置:airflow-core/src/airflow/dag_processing/bundles/local.pyclassLocalDagBundle(BaseDagBundle):"""Local DAG bundle - exposes a local directory as a DAG bundle."""supports_versioning=Falsedef__init__(self,*,path:str|None=None,**kwargs)-None:super().__init__(**kwargs)ifpathisNone:path=settings.DAGS_FOLDER self._path=Path(path)defget_current_version(self)-None:returnNonedefrefresh(self)-None:"""Nothing to refresh - it's just a local directory."""@propertydefpath(self)-Path:returnself._pathLocalDagBundle不支持版本控制(supports_versioning = False),刷新操作为空操作。它就是简单地指向一个本地文件夹。2.4 DagBundlesManager:配置解析与实例化DagBundlesManager负责读取配置并管理所有 Bundle 实例:# 源码位置:airflow-core/src/airflow/dag_processing/bundles/manager.pyclassDagBundlesManager(LoggingMixin):"""Manager for DAG bundles."""def__init__(self):self._bundle_config:dict[str,_InternalBundleConfig]={}self.parse_config()defparse_config(self)-None:"""Get all DAG bundle configurations and store in instance variable."""config_list=conf.getjson("dag_processor","dag_bundle_config_list")# ... 解析 JSON 配置列表 ...forbundle_configinbundle_config_list:class_=import_string(bundle_config.classpath)self._bundle_config[bundle_config.name]=_InternalBundleConfig(bundle_class=class_,kwargs=bundle_config.kwargs,team_name=bundle_config.team_name,)defget_bundle(self,name:str,version:str|None=None)-BaseDagBundle:"""Get a DAG bundle by name."""cfg_bundle=self._bundle_config.get(name)returncfg_bundle.bundle_class(name=name,version=version,**cfg_bundle.kwargs)defsync_bundles_to_db(self,*,session:Session=NEW_SESSION)-None:"""Sync configured DAG bundles to the metadata database."""# 同步 DagBundleModel 记录配置示例(airflow.cfg):[dag_processor] dag_bundle_config_list = [ { "name": "my_team_dags", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/opt/airflow/dags/my_team"}, "team_name": "data_engineering" }, { "name": "ml_pipelines", "classpath": "my_custom.GitDagBundle", "kwargs": {"repo_url": "https://github.com/org/ml-dags.git"} } ]2.5 Bundle 版本管理与 Worker 协作Bundle 在 DAG Processor 和 Worker 中的使用模式不同:┌─────────────────────────────────────────────────────────────┐ │ DAG Processor │ │ - 始终使用最新版本 │ │ - 定期 refresh() 拉取更新 │ │ - 同一时间只有一个版本在使用 │ └─────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────┐ │ Worker │ │ - 使用任务创建时的特定版本 │ │ - 可能同时运行多个版本 │ │ - 通过 BundleVersionLock 锁定版本 │ └─────────────────────────────────────────────────────────────┘BundleUsageTrackingManager负责清理不再使用的旧版本:classBundleUsageTrackingManager:"""Utility helper for removing stale bundles."""defremove_stale_bundle_versions(self):"""Remove bundles not in use and not used for some time. Keeps last N used bundles, and bundles used within X time."""bundles=list(DagBundlesManager().get_all_dag_bundles())forbundleinbundles:ifnotbundle.supports_versioning:continueself._remove_stale_bundle_versions_for_bundle(bundle_name=bundle.name)3. DagBag:DAG 的内存容器3.1 DagBag 的职责DagBag是 Airflow 中经典的核心组件,它充当 DAG 对象的内存容器。其核心职责:扫描目录:发现可能包含 DAG 的 Python 文件导入模块:执行 Python 文件,提取 DAG 对象验证 DAG:检测循环依赖、重复 ID、执行集群策略收集错误:记录导入错误和警告# 源码位置:airflow-core/src/airflow/dag_processing/dagbag.pyclassDagBag(LoggingMixin):""" A dagbag is a collection of dags, parsed out of a folder tree. Makes it easier to run distinct environments for production and development, tests, or different teams or security profiles. """def__init__(self,dag_folder:str|Path|None=None,include_examples:bool|ArgNotSet=NOTSET,safe_mode:bool|ArgNotSet=NOTSET,load_op_links:bool=True,collect_dags:bool=True,known_pools:set[str]|None=None,bundle_path:Path|None=None,bundle_name:str|None=None,):self.dag_folder=dag_folderorsettings.DAGS_FOLDER self.dags:dict[str,DAG]={}self.file_last_changed:dict[str,datetime]={}self.import_errors:dict[str,str]={}self.captured_warnings:dict[str,tuple[str,...]]={}self.known_pools=known_pools self.bundle_path=bundle_path

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2593077.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…