Apache Airflow 系列教程 | 第30课:Deadline 与 SLA 管理
导读(Introduction)在生产环境中运行的数据管道,"按时完成"往往和"正确完成"同样重要。当一个关键的每日报表管道必须在早上 8 点前完成,或者当一个下游系统依赖的数据必须在特定时间窗口内准备就绪时,仅仅依靠"失败后告警"是不够的——我们需要一种前瞻性的超时监控机制。Apache Airflow 3.x 引入了全新的Deadline(截止时间)机制,取代了旧版本中较为简单的 SLA Miss 功能。Deadline 机制提供了一套完整的超时管理框架:从声明式的截止时间定义,到灵活的时间参考点计算,再到可配置的回调通知。它不仅能告诉你"任务超时了",还能基于历史运行时间智能地预测"合理的完成时间应该是多少"。本课将深入分析 Deadline 机制的核心模型设计(Deadline、DeadlineAlert)、SDK 层的声明式接口(DeadlineReference、DeadlineAlert)、Scheduler 层的超时检测逻辑,以及回调系统如何将告警通知传递到外部系统。通过源码分析,你将全面理解这一机制的内部运作,并能够为自己的关键管道配置有效的超时保护。学习目标(Learning Objectives)完成本课学习后,你将能够:理解 Deadline 的设计理念——区分 Deadline 与传统 SLA 的差异,明确其前瞻性超时管理定位掌握 DeadlineAlert 配置模型——深入分析 Reference、Interval、Callback 三要素剖析多种 DeadlineReference 实现——理解 LogicalDate、QueuedAt、FixedDatetime、AverageRuntime 四种参考点的计算逻辑理解 Deadline 生命周期——从创建到检测超时、触发回调、成功清理的完整流程分析 Scheduler 的超时检测机制——FOR UPDATE SKIP LOCKED在 HA 环境下的并发安全处理实践 Deadline 配置——为关键数据管道配置有效的 Deadline 告警策略正文内容(Main Content)1. Deadline 设计理念与架构1.1 从 SLA Miss 到 Deadline在 Airflow 2.x 中,SLA(Service Level Agreement)机制提供了基础的超时检测能力。然而,旧的 SLA 机制存在明显局限:特性Airflow 2.x SLAAirflow 3.x Deadline参考时间点固定使用 execution_date可选多种参考点(logical_date、queued_at、固定时间、平均运行时间)粒度Task 级别DAG Run 级别回调方式邮件通知支持异步/同步回调,可路由到 Triggerer 或 Executor智能预测无基于历史平均运行时间动态计算HA 安全无保障FOR UPDATE SKIP LOCKED 防重复触发清理机制手动DAG Run 成功时自动清理未触发的 Deadline1.2 Deadline 核心概念Deadline 机制围绕三个核心概念构建:┌─────────────────────────────────────────────────────────────┐ │ DeadlineAlert(告警定义) │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │ │ Reference │ │ Interval │ │ Callback │ │ │ │ 参考时间点 │ +│ 偏移量 │ →│ 超时触发的回调 │ │ │ │ │ │ timedelta │ │ (Async/Sync) │ │ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ └─────────────────────────────┬───────────────────────────────┘ │ 每个 DAG Run 创建时 ▼ ┌─────────────────────────────────────────────────────────────┐ │ Deadline(截止时间实例) │ │ │ │ deadline_time = Reference.evaluate() + Interval │ │ missed = False → Scheduler 检测超时后 → missed = True │ │ callback → 触发 TriggererCallback 或 ExecutorCallback │ └─────────────────────────────────────────────────────────────┘三个核心要素的职责:Reference(参考点):确定"从什么时间开始计算"——可以是 DAG Run 的逻辑日期、入队时间、固定时间点,或基于历史的平均运行时间Interval(偏移量):确定"允许多长时间"——一个timedelta加在参考点上得到最终截止时间Callback(回调):确定"超时后做什么"——支持异步回调(运行在 Triggerer)和同步回调(运行在 Executor)1.3 Deadline 生命周期DAG 定义 DeadlineAlert │ ▼ (DAG 序列化) DeadlineAlert 持久化到 deadline_alert 表 │ ▼ (DAG Run 创建时) 评估 Reference + Interval → 计算 deadline_time │ ▼ 创建 Deadline 记录(missed=False) │ ├─── DAG Run 在 deadline_time 前完成 → prune_deadlines() 删除记录 │ └─── deadline_time 到期时 DAG Run 仍在运行 │ ▼ (Scheduler 轮询检测) deadline.handle_miss() → 标记 missed=True → 触发 Callback2. SDK 层:DeadlineAlert 声明式接口2.1 DeadlineAlert 类用户通过 Task SDK 中的DeadlineAlert类为 DAG 配置截止时间。定义在task-sdk/src/airflow/sdk/definitions/deadline.py:# 源码位置:task-sdk/src/airflow/sdk/definitions/deadline.pyclassDeadlineAlert:"""Store Deadline values needed to calculate the need-by timestamp and the callback information."""def__init__(self,reference:DeadlineReferenceType,# 参考时间点interval:timedelta,# 偏移量callback:Callback,# 超时回调name:str|None=None,# 可选名称):self.reference=reference self.interval=interval self.name=name# 验证回调类型ifnotisinstance(callback,(AsyncCallback,SyncCallback)):raiseValueError(f"Callbacks of type{type(callback).__name__}are not currently supported")self.callback=callback关键设计要点:只接受AsyncCallback(异步,运行在 Triggerer)和SyncCallback(同步,运行在 Executor)name是可选字段,用于在 UI 中标识不同的 Deadline 告警实现了__eq__和__hash__,支持去重2.2 DeadlineReference 统一接口DeadlineReference类提供了用户友好的工厂接口:# 源码位置:task-sdk/src/airflow/sdk/definitions/deadline.pyclassDeadlineReference:"""The public interface class for all DeadlineReference options."""# 预定义实例:DAG Run 逻辑日期作为参考点DAGRUN_LOGICAL_DATE:DeadlineReferenceType=DagRunLogicalDateDeadline()# 预定义实例:DAG Run 入队时间作为参考点DAGRUN_QUEUED_AT:DeadlineReferenceType=DagRunQueuedAtDeadline()@classmethoddefAVERAGE_RUNTIME(cls,max_runs:int=0,min_runs:int|None=None)-DeadlineReferenceType:"""基于历史平均运行时间"""ifmax_runs==0:max_runs=AverageRuntimeDeadline.DEFAULT_LIMIT# 默认10次ifmin_runsisNone:min_runs=max_runsreturnAverageRuntimeDeadline(max_runs,min_runs)@classmethoddefFIXED_DATETIME(cls,dt:datetime)-DeadlineReferenceType:"""固定时间点"""returnFixedDatetimeDeadline(dt)TYPES 分类系统:classTYPES:"""Collection of DeadlineReference types for type checking."""# DAG Run 创建时就计算截止时间的类型DAGRUN_CREATED:DeadlineReferenceTypes=(DagRunLogicalDateDeadline,FixedDatetimeDeadline,AverageRuntimeDeadline,)# DAG Run 入队时才计算截止时间的类型DAGRUN_QUEUED:DeadlineReferenceTypes=(DagRunQueuedAtDeadline,)# 所有 DAG Run 相关类型的合集DAGRUN:DeadlineReferenceTypes=DAGRUN_CREATED+DAGRUN_QUEUED这个分类决定了 Deadline 在 DAG Run 生命周期的哪个阶段被创建:DAGRUN_CREATED:DAG Run 创建时立即计算并创建 Deadline(此时logical_date已知)DAGRUN_QUEUED:DAG Run 进入队列时才创建(此时queued_at才确定)2.3 Callback 定义回调系统定义在task-sdk/src/airflow/sdk/definitions/callback.py:# 源码位置:task-sdk/src/airflow/sdk/definitions/callback.pyclassCallback(ABC):""" Base class for Deadline Alert callbacks. Callbacks are used to execute custom logic when a deadline is missed. """path:str# 回调函数的导入路径kwargs:dict# 传递给回调的额外参数def__init__(self,callback_callable:Callable|str,kwargs:dict[str,Any]|None=None):self.path=self.get_callback_path(callback_callable)ifkwargsand"context"inkwargs:raiseValueError("context is a reserved kwarg for this class")self.kwargs=kwargsor{}@classmethoddefget_callback_path(cls,_callback:str|Callable)-str:"""Convert callback to a string path that can be used to import it later."""ifcallable(_callback):cls.verify_callable(_callback)returnf"{_callback.__module__}.{_callback.__qualname__}"# 字符串形式的 dotpathifnotisinstance(_callback,str)ornotis_valid_dotpath(_callback.strip()):raiseImportError(f"`{_callback}` doesn't look like a valid dot path.")return_callback.strip()classAsyncCallback(Callback):"""Asynchronous callback that runs in the triggerer."""@classmethoddefverify_callable(cls,callback:Callable):ifnot(inspect.iscoroutinefunction(callback)orhasattr(callback,"__await__")):raiseAttributeError(f"Provided callback{callback}is not awaitable.")classSyncCallback(Callback):"""Synchronous callback that runs in the specified or default executor."""executor:str|Nonedef__init__(self,callback_callable,kwargs=None,executor:str|None=None):super().__init__(callback_callable=callback_callable,kwargs=kwargs)self.executor=executor两种回调的运行位置:类型运行位置适用场景AsyncCallbackTriggerer 进程发送 HTTP 请求、调用外部 API、非阻塞通知SyncCallbackExecutor(默认或指定)执行耗时操作、需要特定环境的回调3. 核心模型层:Deadline 与 DeadlineAlert3.1 DeadlineAlert 持久化模型当 DAG 被序列化时,DeadlineAlert配置被保存到数据库:# 源码位置:airflow-core/src/airflow/models/deadline_alert.pyclassDeadlineAlert(Base):"""Table containing DeadlineAlert properties."""__tablename__="deadline_alert"id:Mapped[UUID]=mapped_column(Uuid(),primary_key=True,default=uuid6.uuid7)created_at:Mapped[datetime]=mapped_column(UtcDateTime,nullable=False,default=timezone.utcnow)# 关联到序列化的 DAGserialized_dag_id:Mapped[UUID]=mapped_column(Uuid(),ForeignKey("serialized_dag.id",ondelete="CASCADE"),nullable=False)# 告警元数据name:Mapped[str|None]=mapped_column(String(250),nullable=True)description:Mapped[str|None]=mapped_column(Text,nullable=True)# 核心配置(JSON 存储)reference:Mapped[dict]
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2604237.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!