一、什么是 Apache AirflowApache Airflow 是一个由 Airbnb 于 2014 年开源、2016 年进入 Apache 孵化器的工作流编排平台。它的核心理念可以用一句话概括用 Python 代码定义、调度和监控你的工作流。与 shell 脚本或 crontab 定时任务不同Airflow 将工作流抽象为有向无环图DAG提供了任务间的依赖管理、失败重试、可视化监控等一整套生产能力。核心概念速览Airflow 的架构围绕四个基础概念构建概念含义类比DAG(有向无环图)工作流的完整定义由 dag 装饰器或 DAG() 构造函数创建一次项目的蓝图Task(任务)DAG 中的一个最小执行单元蓝图里的一个步骤Operator(操作器)定义 Task 具体做什么——可以是 Bash 命令、Python 函数、SQL 查询、Spark 作业等每个步骤的动作模板Scheduler(调度器)持续轮询 DAG 目录解析 DAG 文件将到期 Task 放入执行队列整个系统的大脑除上述核心概念外Airflow 的架构还包括Executor决定 Task 以何种方式运行如本地进程、Celery 分布式或多容器 Kubernetes、Web Server提供 UI 监控与交互以及MetastorePostgreSQL / MySQL存储所有元数据与运行状态。一个最小的 Airflow DAG 长这样from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime # 定义一个 DAG每天凌晨 2 点运行带有基础重试机制 with DAG( dag_idhello_airflow, start_datedatetime(2025, 1, 1), schedule_interval0 2 * * *, catchupFalse, tags[demo], ) as dag: task_say_hello BashOperator( task_idsay_hello, bash_commandecho Hello from Airflow 2.x!, )这就是 Airflow 的魅力工作流就是代码可版本控制、可代码审查、可单元测试。二、使用优点1. Python 原生定义零学习曲线Airflow DAG 文件就是标准的 Python 脚本。你可以在 DAG 定义中使用任何 Python 语法——for 循环动态生成 Task、从配置文件读取参数、用 Jinja 模板注入变量。这意味着无 DSL 学习成本数据工程师的 Python 技能直接复用。无黑盒配置所有逻辑都在代码中显式呈现CR 一目了然。动态 DAG 生成可以从 YAML / JSON 配置文件批量生成几十甚至上百个结构相似的 DAG。# 动态生成多个 Task 的示例 task_list [] for table in [users, orders, products]: task_list.append( BashOperator( task_idfbackup_{table}, bash_commandfpg_dump -t {table} /backup/{table}.sql, ) ) # 按索引建立顺序依赖 for i in range(len(task_list) - 1): task_list[i] task_list[i 1]2. 丰富且活跃的 Provider 生态Airflow 2.x 引入了Provider 包机制将各类第三方集成从核心仓库中解耦独立发布和升级。目前已有超过 80 个官方 Provider云平台AWS、Azure、GCP、阿里云数据库PostgreSQL、MySQL、Snowflake、BigQuery、ClickHouse计算引擎Spark、Kubernetes、Docker、Databricks消息队列Kafka、RabbitMQ、SQS一个 Provider 包的安装可以极度精简pip install apache-airflow-providers-amazon即可在 DAG 中直接使用 S3Hook、EmrOperator 等组件。这套机制同时解决了旧版本装一个 Airflow 附带 500 个依赖的痛点。3. 强大的可视化监控Airflow Web UI 是业界公认的金牌体验2.x 版本中新增的Grid View更是将监控效率提升了一个档次Tree / Graph / Grid View从不同维度观察 DAG 运行拓扑与历史状态。甘特图一眼定位哪个 Task 是性能瓶颈。Landing Time追踪数据实际到达时间与期望时间的延迟。Task 级别操作直接在 UI 上 Clear、Retry、Mark Success / Failed无需登录服务器。在 2025 年的版本路线中Airflow 正在向Data-Aware Scheduling (AIP-48)迈进——UI 将不仅展示 Task 状态还会显示本次运行是由哪个上游数据资产触发进一步打通可观测性闭环。4. 灵活到极致的调度能力Airflow 支持多种调度触发方式远超传统 cron 表达式的范畴调度方式说明示例Cron 表达式最经典的时间调度0 6 * * 1-5工作日早 6 点Timedelta固定间隔从 start_date 累加datetime.timedelta(hours4)Dataset (AIP-48)数据驱动的调度——当某表有更新时触发schedule[Dataset(s3://bucket/sales/)]External Trigger由 API / CLI / 上游 DAG 主动触发airflow dags trigger my_dagSensor等待外部条件满足后继续ExternalTaskSensor、S3KeySensorDataset 机制是 Airflow 2.4 最大亮点之一它让跨 DAG 的依赖从时间耦合变成了数据耦合。两个独立团队分别维护的 DAG只要声明对同一个 Dataset 的产消关系Airflow 就能自动串起整个链路。5. 弹性可扩展的架构设计Airflow 的 Executor 模型支持从单机到超大规模集群的平滑演进本地开发: SequentialExecutor / LocalExecutor单机多进程 ↓ 中等规模: CeleryExecutor多机分布式Redis/RabbitMQ 做消息队列 ↓ 超大规模: KubernetesExecutor每个 Task 跑在独立 Pod 中资源完全隔离 ↓ 混合架构: 2025 年路线图中的 Edge Worker (AIP-72) 可实现跨 VPC / 跨云的远程任务执行关键点在于切换 Executor 只需修改 airflow.cfg 一个配置项DAG 代码完全不用改。这意味着团队可以从单机起步等业务增长后再无缝迁到分布式架构。三、使用场景场景 1数据 ETL / ELT 管道这是 Airflow 最经典的主场。假设电商平台每天需要从 MySQL 抽取前一日的订单数据在 Spark 中做聚合计算将结果写入 ClickHouse 供 BI 查询数据写入成功后通知下游报表系统整条链路可以组织为一个 DAG依赖关系和错误处理完全自动化。场景 2机器学习 Pipeline模型训练不是一步到位的而是数据拉取 → 特征工程 → 训练 → 评估 → 部署的级联任务。Airflow 可以将这些步骤编排在一起并利用 BranchPythonOperator 实现条件分支——例如评估指标不达标时自动走回退到旧模型的分支。结合 KubernetesPodOperator每个训练步骤运行在独立的 GPU Pod 中训练完即释放资源成本可控。场景 3报表自动化某金融机构每天需要生成上百份客户持仓报表PDF 格式通过邮件分发。传统做法是用 shell 脚本跑一堆 R / Python 脚本出错了靠人工排查。迁移到 Airflow 后每个报告生成步骤是一个 Task失败自动重试并通知。使用 EmailOperator 在 DAG 末尾统一发送。耗时统计直接看甘特图优化有据可依。场景 4DevOps 运维自动化Airflow 不仅可以编排数据任务也能编排基础设施操作定时执行数据库备份 → 上传到 S3 → 清理过期备份。每月自动生成 SSL 证书过期清单通知运维团队。大促前批量扩容 K8s 集群节点大促结束后缩容。场景 5数据质量监控结合 SQLCheckOperator 或 Great Expectations可以构建数据质量监控 DAG每日凌晨 4 点 → 检查核心表行数是否 0 → 检查关键字段空值率是否 阈值 → 检查数值字段分布是否偏离历史基准 → 任一检查失败 → 阻断下游 DAG 发送告警四、具体使用方式4.1 安装推荐使用 pip 配合约束文件安装避免依赖冲突# 设置 Airflow 版本 AIRFLOW_VERSION2.9.3 PYTHON_VERSION$(python --version | cut -d -f 2 | cut -d . -f 1-2) CONSTRAINT_URLhttps://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt pip install apache-airflow${AIRFLOW_VERSION} --constraint ${CONSTRAINT_URL}初始化数据库并创建管理员用户airflow db init airflow users create \ --username admin \ --firstname Admin \ --lastname User \ --role Admin \ --email adminexample.com启动所有组件开发环境airflow standalone # 一键启动 WebServer Scheduler 初始化生产环境建议将 WebServer、Scheduler、Worker 拆分部署并使用 PostgreSQL 替代默认的 SQLite。4.2 实战构建一个完整的数据管道 DAG以下代码展示了一个典型的API 拉取 → 数据清洗 → 入库 → 质量检查 → 通知管道from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator from airflow.providers.postgres.operators.postgres import PostgresOperator from airflow.utils.dates import days_ago from datetime import timedelta import requests import json # ---------- 业务逻辑函数 ---------- def fetch_api_data(**context): 从公开 API 拉取数据并写入临时文件 response requests.get(https://jsonplaceholder.typicode.com/posts) response.raise_for_status() posts response.json() # 将数据作为 XCom 传递给下游 Task file_path /tmp/posts.json with open(file_path, w) as f: json.dump(posts, f) context[task_instance].xcom_push(keydata_file, valuefile_path) print(fFetched {len(posts)} posts.) def clean_data(**context): 读取原始数据清洗后写回 file_path context[task_instance].xcom_pull( keydata_file, task_idsfetch_data ) with open(file_path, r) as f: raw json.load(f) cleaned [ { id: p[id], user_id: p[userId], title: p[title].strip(), body_length: len(p[body]), } for p in raw ] clean_path /tmp/posts_clean.json with open(clean_path, w) as f: json.dump(cleaned, f) print(fCleaned {len(cleaned)} records.) # ---------- DAG 定义 ---------- default_args { owner: data-team, retries: 3, retry_delay: timedelta(minutes5), email_on_failure: True, email: [data-alertcompany.com], } with DAG( dag_iddata_pipeline_demo, default_argsdefault_args, descriptionAn end-to-end data pipeline: fetch - clean - load - check, schedule_interval0 5 * * *, # 每天凌晨 5 点 start_datedays_ago(1), catchupFalse, tags[production, etl], ) as dag: start BashOperator( task_idstart_pipeline, bash_commandecho Pipeline started at $(date), ) fetch_data PythonOperator( task_idfetch_data, python_callablefetch_api_data, ) clean PythonOperator( task_idclean_data, python_callableclean_data, ) # 注意这里仅展示语法实际需要目标表提前建好 create_table PostgresOperator( task_idcreate_table_if_not_exists, postgres_conn_idmy_postgres, sql CREATE TABLE IF NOT EXISTS public.posts ( id INT PRIMARY KEY, user_id INT, title TEXT, body_length INT ); , ) load_data BashOperator( task_idload_to_postgres, bash_command echo Data loading simulation: $(wc -c /tmp/posts_clean.json) bytes ready , ) quality_check BashOperator( task_idquality_check, bash_command count$(python -c import json; print(len(json.load(open(/tmp/posts_clean.json))))) if [ $count -lt 10 ]; then echo ERROR: Too few records! exit 1 fi echo Quality check passed: $count records , ) notify BashOperator( task_idnotify_success, bash_commandecho Pipeline completed successfully!, ) # 声明 Task 之间的依赖关系 start fetch_data clean [create_table, quality_check] create_table load_data [load_data, quality_check] notify这个 DAG 展示了 Airflow 2.x 中最常用的几种模式PythonOperator运行自定义 Python 函数适合灵活的业务逻辑。PostgresOperator原生 SQL 执行支持 postgres_conn_id 引用 Connection 配置。XComTask 间轻量数据传递xcom_push / xcom_pull适合传文件路径、ID 等小数据。大数据量请使用外部存储S3、共享文件系统。依赖声明 运算符语法糖清晰表达上下游关系[a, b] c 表示 a 和 b 都成功后触发 c。4.3 TaskFlow API更 Pythonic 的写法Airflow 2.0 引入的TaskFlow API将 Python 函数直接映射为 Task消除了 XCom 的显式 push/pull 模板代码from airflow.decorators import dag, task from datetime import datetime dag( schedule_intervalNone, start_datedatetime(2025, 1, 1), catchupFalse, tags[taskflow], ) def taskflow_demo(): task def extract(): 从多个数据源拉取返回一个列表 return [100, 200, 300, 400] task def transform(raw_numbers: list): 对每个元素做计算 return [x * 1.2 for x in raw_numbers] task def load(processed: list): 写入目标系统 print(fLoading {len(processed)} records: {processed}) # 数据流向即依赖关系 raw extract() transformed transform(raw) load(transformed) taskflow_demo()TaskFlow API 通过 Python 类型注解和函数返回值自动完成数据流转让 DAG 代码简洁到几乎看不出框架痕迹。对于数据科学团队来说这种风格的学习成本接近零。4.4 生产部署架构建议一个经过验证的中等规模部署方案┌─────────────────────────────────────────────────┐ │ Nginx (HTTPS) │ ├─────────────────┬───────────────────────────────┤ │ WebServer × 2 │ Scheduler × 2 (HA) │ ├─────────────────┴───────────────────────────────┤ │ PostgreSQL (Metastore) │ ├─────────────────────────────────────────────────┤ │ Redis / RabbitMQ (Broker) │ ├─────────────────────────────────────────────────┤ │ Celery Workers × N (Task 执行节点) │ └─────────────────────────────────────────────────┘WebServer和Scheduler各部署 2 个实例实现高可用。Metastore使用托管的 PostgreSQLRDS / Cloud SQL定期备份。Broker使用 Redis Sentinel 或 RabbitMQ 集群承载 Task 消息。Worker按需水平扩展配置 worker_autoscale 动态调节并发数。五、与其他方案对比Airflow vs Prefect vs Dagster vs Luigi维度Airflow 2.xPrefect 2.xDagsterLuigi核心哲学工作流编排现代 Python 编排数据资产管理任务依赖管理动态工作流较弱需变通原生支持运行时动态良好不支持数据血缘通过 OpenLineage 外挂基础标签级别原生一等公民无本地开发需要 Scheduler DB极简flow.run()优秀dagster dev一般生态丰富度最多80 Provider中等深度整合 dbt/Spark较少UI 体验成熟、功能全现代、清爽资产视角独特基础社区规模最大GitHub 36k stars中等17k stars快速增长11k stars较小最佳场景传统 ETL、批量调度Python 原生快速原型分析工程、数据湖管理简单链式任务选型建议选 Airflow团队已有一定规模需要稳定、成熟的调度方案生态要求高。选 Prefect数据科学团队为主重视开发体验需要运行时动态生成工作流。选 Dagster以数据资产为核心的管理视角重度使用 dbt重视数据血缘。Luigi轻量级场景、不想引入 Redis/RabbitMQ 等中间件但功能天花板较低。六、实践建议与避坑指南1. start_date 与 catchup 的配合这是 Airflow 新人踩坑率最高的配置。核心规则start_date 不是首次运行日期而是调度周期的逻辑起点。Airflow 默认会在首次激活时回填catchup从 start_date 到当前的所有历史周期。设置 catchupFalse 只运行当前及未来的周期。# 正确做法不想回填历史时显式关闭 with DAG(..., catchupFalse) as dag: ...2. 保持 DAG 文件轻量Scheduler 每隔 min_file_process_interval默认 30 秒会重新解析所有 DAG 文件。如果一个 DAG 文件顶部写了耗时的网络请求或数据库查询会严重拖慢整个调度循环。正确做法将业务逻辑放在 PythonOperator 的 python_callable 函数内部而非 DAG 文件的顶层代码。# ❌ 错误顶层执行 http 请求——Scheduler 每次解析都跑一次 import requests config requests.get(https://api.internal/config).json() # ✅ 正确将请求封装在 Task 函数内 def load_config(**context): config requests.get(https://api.internal/config).json() ...3. 管理 XCom 数据量XCom 默认存储在 Metastore 数据库中适合传递少量元数据Task ID、文件路径、短字符串。如果你试图通过 XCom 传递一份 50MB 的 DataFrame既拖慢执行也拖垮数据库。替代方案将大数据写入 S3 / GCS / NFS通过 XCom 只传递存储路径。4. 合理使用 SensorSensor 本质上是轮询外部条件的死循环默认 poke_interval 为 60 秒。当系统中有多个 Sensor 同时运行时会占用 Worker 槽位却不做实质计算。为 Sensor 设置合理的 timeout避免无限等待。使用 Smart SensorAirflow 2.2合并同类 Sensor 的轮询逻辑减少资源消耗。考虑用 Deferrable Operator异步模式替代同步 Sensor在等待期间释放 Worker 槽位。5. 版本与依赖管理始终使用约束文件constraints安装否则 pip 可能拉取不兼容版本的依赖库导致诡异报错。Provider 包应固定版本号apache-airflow-providers-amazon8.20.0而非 方式避免 CI 环境与生产环境版本漂移。升级 Airflow 前先用 airflow db upgrade --dry-run 预览数据库迁移脚本。6. 监控与告警自带的邮件告警适合小团队但对于生产环境建议将 Airflow 日志接入 ELK / Loki 做集中采集分析。配置 Prometheus Exporter 采集 Scheduler 心跳延迟、Task 队列积压数等关键指标。对 SLA Miss、DAG 解析失败率设置 Grafana 告警规则。写在最后Apache Airflow 2.x 已经从一个灵活版 crontab进化为成熟的工作流编排平台。它的 Provider 生态、数据感知调度能力以及向 Edge 架构的演进方向都表明它仍在积极适应现代数据栈的需求变化。如果你正在为团队选型工作流调度引擎Airflow 2.x 的 Python 原生亲和力、超大规模社区以及完善的托管服务GCP Cloud Composer、AWS MWAA、Astronomer将成为你最稳妥的选择之一。
Apache Airflow 2.x 深度指南:用 Python 编排一切的现代化工作流引擎
一、什么是 Apache AirflowApache Airflow 是一个由 Airbnb 于 2014 年开源、2016 年进入 Apache 孵化器的工作流编排平台。它的核心理念可以用一句话概括用 Python 代码定义、调度和监控你的工作流。与 shell 脚本或 crontab 定时任务不同Airflow 将工作流抽象为有向无环图DAG提供了任务间的依赖管理、失败重试、可视化监控等一整套生产能力。核心概念速览Airflow 的架构围绕四个基础概念构建概念含义类比DAG(有向无环图)工作流的完整定义由 dag 装饰器或 DAG() 构造函数创建一次项目的蓝图Task(任务)DAG 中的一个最小执行单元蓝图里的一个步骤Operator(操作器)定义 Task 具体做什么——可以是 Bash 命令、Python 函数、SQL 查询、Spark 作业等每个步骤的动作模板Scheduler(调度器)持续轮询 DAG 目录解析 DAG 文件将到期 Task 放入执行队列整个系统的大脑除上述核心概念外Airflow 的架构还包括Executor决定 Task 以何种方式运行如本地进程、Celery 分布式或多容器 Kubernetes、Web Server提供 UI 监控与交互以及MetastorePostgreSQL / MySQL存储所有元数据与运行状态。一个最小的 Airflow DAG 长这样from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime # 定义一个 DAG每天凌晨 2 点运行带有基础重试机制 with DAG( dag_idhello_airflow, start_datedatetime(2025, 1, 1), schedule_interval0 2 * * *, catchupFalse, tags[demo], ) as dag: task_say_hello BashOperator( task_idsay_hello, bash_commandecho Hello from Airflow 2.x!, )这就是 Airflow 的魅力工作流就是代码可版本控制、可代码审查、可单元测试。二、使用优点1. Python 原生定义零学习曲线Airflow DAG 文件就是标准的 Python 脚本。你可以在 DAG 定义中使用任何 Python 语法——for 循环动态生成 Task、从配置文件读取参数、用 Jinja 模板注入变量。这意味着无 DSL 学习成本数据工程师的 Python 技能直接复用。无黑盒配置所有逻辑都在代码中显式呈现CR 一目了然。动态 DAG 生成可以从 YAML / JSON 配置文件批量生成几十甚至上百个结构相似的 DAG。# 动态生成多个 Task 的示例 task_list [] for table in [users, orders, products]: task_list.append( BashOperator( task_idfbackup_{table}, bash_commandfpg_dump -t {table} /backup/{table}.sql, ) ) # 按索引建立顺序依赖 for i in range(len(task_list) - 1): task_list[i] task_list[i 1]2. 丰富且活跃的 Provider 生态Airflow 2.x 引入了Provider 包机制将各类第三方集成从核心仓库中解耦独立发布和升级。目前已有超过 80 个官方 Provider云平台AWS、Azure、GCP、阿里云数据库PostgreSQL、MySQL、Snowflake、BigQuery、ClickHouse计算引擎Spark、Kubernetes、Docker、Databricks消息队列Kafka、RabbitMQ、SQS一个 Provider 包的安装可以极度精简pip install apache-airflow-providers-amazon即可在 DAG 中直接使用 S3Hook、EmrOperator 等组件。这套机制同时解决了旧版本装一个 Airflow 附带 500 个依赖的痛点。3. 强大的可视化监控Airflow Web UI 是业界公认的金牌体验2.x 版本中新增的Grid View更是将监控效率提升了一个档次Tree / Graph / Grid View从不同维度观察 DAG 运行拓扑与历史状态。甘特图一眼定位哪个 Task 是性能瓶颈。Landing Time追踪数据实际到达时间与期望时间的延迟。Task 级别操作直接在 UI 上 Clear、Retry、Mark Success / Failed无需登录服务器。在 2025 年的版本路线中Airflow 正在向Data-Aware Scheduling (AIP-48)迈进——UI 将不仅展示 Task 状态还会显示本次运行是由哪个上游数据资产触发进一步打通可观测性闭环。4. 灵活到极致的调度能力Airflow 支持多种调度触发方式远超传统 cron 表达式的范畴调度方式说明示例Cron 表达式最经典的时间调度0 6 * * 1-5工作日早 6 点Timedelta固定间隔从 start_date 累加datetime.timedelta(hours4)Dataset (AIP-48)数据驱动的调度——当某表有更新时触发schedule[Dataset(s3://bucket/sales/)]External Trigger由 API / CLI / 上游 DAG 主动触发airflow dags trigger my_dagSensor等待外部条件满足后继续ExternalTaskSensor、S3KeySensorDataset 机制是 Airflow 2.4 最大亮点之一它让跨 DAG 的依赖从时间耦合变成了数据耦合。两个独立团队分别维护的 DAG只要声明对同一个 Dataset 的产消关系Airflow 就能自动串起整个链路。5. 弹性可扩展的架构设计Airflow 的 Executor 模型支持从单机到超大规模集群的平滑演进本地开发: SequentialExecutor / LocalExecutor单机多进程 ↓ 中等规模: CeleryExecutor多机分布式Redis/RabbitMQ 做消息队列 ↓ 超大规模: KubernetesExecutor每个 Task 跑在独立 Pod 中资源完全隔离 ↓ 混合架构: 2025 年路线图中的 Edge Worker (AIP-72) 可实现跨 VPC / 跨云的远程任务执行关键点在于切换 Executor 只需修改 airflow.cfg 一个配置项DAG 代码完全不用改。这意味着团队可以从单机起步等业务增长后再无缝迁到分布式架构。三、使用场景场景 1数据 ETL / ELT 管道这是 Airflow 最经典的主场。假设电商平台每天需要从 MySQL 抽取前一日的订单数据在 Spark 中做聚合计算将结果写入 ClickHouse 供 BI 查询数据写入成功后通知下游报表系统整条链路可以组织为一个 DAG依赖关系和错误处理完全自动化。场景 2机器学习 Pipeline模型训练不是一步到位的而是数据拉取 → 特征工程 → 训练 → 评估 → 部署的级联任务。Airflow 可以将这些步骤编排在一起并利用 BranchPythonOperator 实现条件分支——例如评估指标不达标时自动走回退到旧模型的分支。结合 KubernetesPodOperator每个训练步骤运行在独立的 GPU Pod 中训练完即释放资源成本可控。场景 3报表自动化某金融机构每天需要生成上百份客户持仓报表PDF 格式通过邮件分发。传统做法是用 shell 脚本跑一堆 R / Python 脚本出错了靠人工排查。迁移到 Airflow 后每个报告生成步骤是一个 Task失败自动重试并通知。使用 EmailOperator 在 DAG 末尾统一发送。耗时统计直接看甘特图优化有据可依。场景 4DevOps 运维自动化Airflow 不仅可以编排数据任务也能编排基础设施操作定时执行数据库备份 → 上传到 S3 → 清理过期备份。每月自动生成 SSL 证书过期清单通知运维团队。大促前批量扩容 K8s 集群节点大促结束后缩容。场景 5数据质量监控结合 SQLCheckOperator 或 Great Expectations可以构建数据质量监控 DAG每日凌晨 4 点 → 检查核心表行数是否 0 → 检查关键字段空值率是否 阈值 → 检查数值字段分布是否偏离历史基准 → 任一检查失败 → 阻断下游 DAG 发送告警四、具体使用方式4.1 安装推荐使用 pip 配合约束文件安装避免依赖冲突# 设置 Airflow 版本 AIRFLOW_VERSION2.9.3 PYTHON_VERSION$(python --version | cut -d -f 2 | cut -d . -f 1-2) CONSTRAINT_URLhttps://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt pip install apache-airflow${AIRFLOW_VERSION} --constraint ${CONSTRAINT_URL}初始化数据库并创建管理员用户airflow db init airflow users create \ --username admin \ --firstname Admin \ --lastname User \ --role Admin \ --email adminexample.com启动所有组件开发环境airflow standalone # 一键启动 WebServer Scheduler 初始化生产环境建议将 WebServer、Scheduler、Worker 拆分部署并使用 PostgreSQL 替代默认的 SQLite。4.2 实战构建一个完整的数据管道 DAG以下代码展示了一个典型的API 拉取 → 数据清洗 → 入库 → 质量检查 → 通知管道from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator from airflow.providers.postgres.operators.postgres import PostgresOperator from airflow.utils.dates import days_ago from datetime import timedelta import requests import json # ---------- 业务逻辑函数 ---------- def fetch_api_data(**context): 从公开 API 拉取数据并写入临时文件 response requests.get(https://jsonplaceholder.typicode.com/posts) response.raise_for_status() posts response.json() # 将数据作为 XCom 传递给下游 Task file_path /tmp/posts.json with open(file_path, w) as f: json.dump(posts, f) context[task_instance].xcom_push(keydata_file, valuefile_path) print(fFetched {len(posts)} posts.) def clean_data(**context): 读取原始数据清洗后写回 file_path context[task_instance].xcom_pull( keydata_file, task_idsfetch_data ) with open(file_path, r) as f: raw json.load(f) cleaned [ { id: p[id], user_id: p[userId], title: p[title].strip(), body_length: len(p[body]), } for p in raw ] clean_path /tmp/posts_clean.json with open(clean_path, w) as f: json.dump(cleaned, f) print(fCleaned {len(cleaned)} records.) # ---------- DAG 定义 ---------- default_args { owner: data-team, retries: 3, retry_delay: timedelta(minutes5), email_on_failure: True, email: [data-alertcompany.com], } with DAG( dag_iddata_pipeline_demo, default_argsdefault_args, descriptionAn end-to-end data pipeline: fetch - clean - load - check, schedule_interval0 5 * * *, # 每天凌晨 5 点 start_datedays_ago(1), catchupFalse, tags[production, etl], ) as dag: start BashOperator( task_idstart_pipeline, bash_commandecho Pipeline started at $(date), ) fetch_data PythonOperator( task_idfetch_data, python_callablefetch_api_data, ) clean PythonOperator( task_idclean_data, python_callableclean_data, ) # 注意这里仅展示语法实际需要目标表提前建好 create_table PostgresOperator( task_idcreate_table_if_not_exists, postgres_conn_idmy_postgres, sql CREATE TABLE IF NOT EXISTS public.posts ( id INT PRIMARY KEY, user_id INT, title TEXT, body_length INT ); , ) load_data BashOperator( task_idload_to_postgres, bash_command echo Data loading simulation: $(wc -c /tmp/posts_clean.json) bytes ready , ) quality_check BashOperator( task_idquality_check, bash_command count$(python -c import json; print(len(json.load(open(/tmp/posts_clean.json))))) if [ $count -lt 10 ]; then echo ERROR: Too few records! exit 1 fi echo Quality check passed: $count records , ) notify BashOperator( task_idnotify_success, bash_commandecho Pipeline completed successfully!, ) # 声明 Task 之间的依赖关系 start fetch_data clean [create_table, quality_check] create_table load_data [load_data, quality_check] notify这个 DAG 展示了 Airflow 2.x 中最常用的几种模式PythonOperator运行自定义 Python 函数适合灵活的业务逻辑。PostgresOperator原生 SQL 执行支持 postgres_conn_id 引用 Connection 配置。XComTask 间轻量数据传递xcom_push / xcom_pull适合传文件路径、ID 等小数据。大数据量请使用外部存储S3、共享文件系统。依赖声明 运算符语法糖清晰表达上下游关系[a, b] c 表示 a 和 b 都成功后触发 c。4.3 TaskFlow API更 Pythonic 的写法Airflow 2.0 引入的TaskFlow API将 Python 函数直接映射为 Task消除了 XCom 的显式 push/pull 模板代码from airflow.decorators import dag, task from datetime import datetime dag( schedule_intervalNone, start_datedatetime(2025, 1, 1), catchupFalse, tags[taskflow], ) def taskflow_demo(): task def extract(): 从多个数据源拉取返回一个列表 return [100, 200, 300, 400] task def transform(raw_numbers: list): 对每个元素做计算 return [x * 1.2 for x in raw_numbers] task def load(processed: list): 写入目标系统 print(fLoading {len(processed)} records: {processed}) # 数据流向即依赖关系 raw extract() transformed transform(raw) load(transformed) taskflow_demo()TaskFlow API 通过 Python 类型注解和函数返回值自动完成数据流转让 DAG 代码简洁到几乎看不出框架痕迹。对于数据科学团队来说这种风格的学习成本接近零。4.4 生产部署架构建议一个经过验证的中等规模部署方案┌─────────────────────────────────────────────────┐ │ Nginx (HTTPS) │ ├─────────────────┬───────────────────────────────┤ │ WebServer × 2 │ Scheduler × 2 (HA) │ ├─────────────────┴───────────────────────────────┤ │ PostgreSQL (Metastore) │ ├─────────────────────────────────────────────────┤ │ Redis / RabbitMQ (Broker) │ ├─────────────────────────────────────────────────┤ │ Celery Workers × N (Task 执行节点) │ └─────────────────────────────────────────────────┘WebServer和Scheduler各部署 2 个实例实现高可用。Metastore使用托管的 PostgreSQLRDS / Cloud SQL定期备份。Broker使用 Redis Sentinel 或 RabbitMQ 集群承载 Task 消息。Worker按需水平扩展配置 worker_autoscale 动态调节并发数。五、与其他方案对比Airflow vs Prefect vs Dagster vs Luigi维度Airflow 2.xPrefect 2.xDagsterLuigi核心哲学工作流编排现代 Python 编排数据资产管理任务依赖管理动态工作流较弱需变通原生支持运行时动态良好不支持数据血缘通过 OpenLineage 外挂基础标签级别原生一等公民无本地开发需要 Scheduler DB极简flow.run()优秀dagster dev一般生态丰富度最多80 Provider中等深度整合 dbt/Spark较少UI 体验成熟、功能全现代、清爽资产视角独特基础社区规模最大GitHub 36k stars中等17k stars快速增长11k stars较小最佳场景传统 ETL、批量调度Python 原生快速原型分析工程、数据湖管理简单链式任务选型建议选 Airflow团队已有一定规模需要稳定、成熟的调度方案生态要求高。选 Prefect数据科学团队为主重视开发体验需要运行时动态生成工作流。选 Dagster以数据资产为核心的管理视角重度使用 dbt重视数据血缘。Luigi轻量级场景、不想引入 Redis/RabbitMQ 等中间件但功能天花板较低。六、实践建议与避坑指南1. start_date 与 catchup 的配合这是 Airflow 新人踩坑率最高的配置。核心规则start_date 不是首次运行日期而是调度周期的逻辑起点。Airflow 默认会在首次激活时回填catchup从 start_date 到当前的所有历史周期。设置 catchupFalse 只运行当前及未来的周期。# 正确做法不想回填历史时显式关闭 with DAG(..., catchupFalse) as dag: ...2. 保持 DAG 文件轻量Scheduler 每隔 min_file_process_interval默认 30 秒会重新解析所有 DAG 文件。如果一个 DAG 文件顶部写了耗时的网络请求或数据库查询会严重拖慢整个调度循环。正确做法将业务逻辑放在 PythonOperator 的 python_callable 函数内部而非 DAG 文件的顶层代码。# ❌ 错误顶层执行 http 请求——Scheduler 每次解析都跑一次 import requests config requests.get(https://api.internal/config).json() # ✅ 正确将请求封装在 Task 函数内 def load_config(**context): config requests.get(https://api.internal/config).json() ...3. 管理 XCom 数据量XCom 默认存储在 Metastore 数据库中适合传递少量元数据Task ID、文件路径、短字符串。如果你试图通过 XCom 传递一份 50MB 的 DataFrame既拖慢执行也拖垮数据库。替代方案将大数据写入 S3 / GCS / NFS通过 XCom 只传递存储路径。4. 合理使用 SensorSensor 本质上是轮询外部条件的死循环默认 poke_interval 为 60 秒。当系统中有多个 Sensor 同时运行时会占用 Worker 槽位却不做实质计算。为 Sensor 设置合理的 timeout避免无限等待。使用 Smart SensorAirflow 2.2合并同类 Sensor 的轮询逻辑减少资源消耗。考虑用 Deferrable Operator异步模式替代同步 Sensor在等待期间释放 Worker 槽位。5. 版本与依赖管理始终使用约束文件constraints安装否则 pip 可能拉取不兼容版本的依赖库导致诡异报错。Provider 包应固定版本号apache-airflow-providers-amazon8.20.0而非 方式避免 CI 环境与生产环境版本漂移。升级 Airflow 前先用 airflow db upgrade --dry-run 预览数据库迁移脚本。6. 监控与告警自带的邮件告警适合小团队但对于生产环境建议将 Airflow 日志接入 ELK / Loki 做集中采集分析。配置 Prometheus Exporter 采集 Scheduler 心跳延迟、Task 队列积压数等关键指标。对 SLA Miss、DAG 解析失败率设置 Grafana 告警规则。写在最后Apache Airflow 2.x 已经从一个灵活版 crontab进化为成熟的工作流编排平台。它的 Provider 生态、数据感知调度能力以及向 Edge 架构的演进方向都表明它仍在积极适应现代数据栈的需求变化。如果你正在为团队选型工作流调度引擎Airflow 2.x 的 Python 原生亲和力、超大规模社区以及完善的托管服务GCP Cloud Composer、AWS MWAA、Astronomer将成为你最稳妥的选择之一。