SEO Data Pipeline: Building an Automated Workflow with Airflow
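Running SEO scripts by hand is painful. I built an automated data pipeline on Apache Airflow that collects, analyzes, and reports every day. This post shares the Airflow DAG design and the code.

1. Why Airflow

Airflow's advantages:

- Visualization: the DAG graph shows task dependencies at a glance
- Scheduling: cron expressions control execution time precisely
- Retries: failed tasks are retried automatically
- Monitoring: the Web UI shows task status
- Extensibility: new tasks are easy to add

2. Core DAG Design

2.1 Daily SEO pipeline

    # dags/daily_seo_pipeline.py
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.email import EmailOperator
    from datetime import datetime, timedelta

    default_args = {
        "owner": "seo-team",
        "depends_on_past": False,
        "email": ["seo@company.com"],
        "email_on_failure": True,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    }

    with DAG(
        "daily_seo_pipeline",
        default_args=default_args,
        description="Daily SEO data collection and reporting",
        schedule_interval="0 6 * * *",  # every day at 06:00
        start_date=datetime(2026, 1, 1),
        catchup=False,
        tags=["seo", "daily"],
    ) as dag:
        # Task 1: collect SERP data
        collect_serp = PythonOperator(
            task_id="collect_serp_data",
            python_callable=collect_serp_task,
            op_kwargs={
                "keywords": "{{ var.value.seo_keywords }}",
                "api_key": "{{ conn.serpbase.password }}",
            },
        )

        # Task 2: collect competitor data
        collect_competitors = PythonOperator(
            task_id="collect_competitor_data",
            python_callable=collect_competitor_task,
        )

        # Task 3: analyze the data
        analyze = PythonOperator(
            task_id="analyze_data",
            python_callable=analyze_task,
        )

        # Task 4: generate the report
        generate_report = PythonOperator(
            task_id="generate_report",
            python_callable=generate_report_task,
        )

        # Task 5: send the email
        send_email = EmailOperator(
            task_id="send_email",
            to=["team@company.com"],
            subject="Daily SEO Report - {{ ds }}",
            html_content="""
            <h3>Daily SEO Report - {{ ds }}</h3>
            <p>Report generated. Please check the dashboard.</p>
            """,
        )

        # Dependencies: both collectors run in parallel, then the rest in sequence
        [collect_serp, collect_competitors] >> analyze >> generate_report >> send_email

2.2 Task functions

These callables must be defined (or imported) above the "with DAG(...)" block in the same file. Helpers such as store_serp_data, track_competitor, and generate_html_report are project code that this post does not show.

    import requests
    from airflow.models import Variable

    def collect_serp_task(keywords: str, api_key: str):
        """Collect SERP data for a comma-separated keyword string."""
        keyword_list = [k.strip() for k in keywords.split(",") if k.strip()]
        for keyword in keyword_list:
            headers = {"X-API-Key": api_key, "Content-Type": "application/json"}
            body = {"q": keyword, "hl": "en", "gl": "us", "page": 1}
            r = requests.post(
                "https://api.serpbase.dev/google/search",
                headers=headers,
                json=body,
                timeout=30,
            )
            # Raise on HTTP errors so Airflow marks the task failed and retries it
            r.raise_for_status()
            # Store in the database
            store_serp_data(keyword, r.json())
        return f"Collected {len(keyword_list)} keywords"

    def collect_competitor_task():
        """Collect competitor data."""
        raw = Variable.get("seo_competitors", default_var="")
        competitors = [c.strip() for c in raw.split(",") if c.strip()]
        for competitor in competitors:
            track_competitor(competitor)
        return f"Tracked {len(competitors)} competitors"

    def analyze_task():
        """Analyze the collected data."""
        # Compute ranking changes
        ranking_changes = calculate_ranking_changes()
        # Detect anomalies
        anomalies = detect_anomalies()
        # Generate insights
        insights = generate_insights()
        # Store the analysis results
        store_analysis(ranking_changes, anomalies, insights)
        return "Analysis complete"

    def generate_report_task():
        """Generate the report."""
        # Build the HTML report
        report_html = generate_html_report()
        # Save it to a file
        with open(f"/reports/seo_report_{datetime.now().strftime('%Y%m%d')}.html", "w") as f:
            f.write(report_html)
        return "Report generated"

3. Advanced Features

3.1 Dynamic task generation

    from airflow.operators.python import BranchPythonOperator, PythonOperator

    def create_dynamic_tasks(dag):
        """Create one collection task per keyword.

        Call this while the DAG file is being parsed, inside the
        "with DAG(...)" block. Tasks added from inside a running task
        are not picked up by the scheduler.
        """
        keywords = Variable.get("seo_keywords", default_var="").split(",")
        tasks = []
        for keyword in keywords:
            if keyword.strip():
                task = PythonOperator(
                    task_id=f"collect_{keyword.strip().replace(' ', '_')}",
                    python_callable=collect_single_keyword,
                    op_kwargs={"keyword": keyword.strip()},
                    dag=dag,  # attach the task to the DAG
                )
                tasks.append(task)
        return tasks

    # Use BranchPythonOperator for conditional branching
    def branch_on_anomaly(**context):
        """Choose a branch depending on whether an anomaly exists."""
        has_anomaly = check_if_anomaly_exists()
        if has_anomaly:
            return "send_alert_task"
        else:
            return "skip_alert_task"

    branch_task = BranchPythonOperator(
        task_id="branch_on_anomaly",
        python_callable=branch_on_anomaly,
    )

3.2 Monitoring and alerting

    def check_task_health(**context):
        """Check task health.

        Run this task with trigger_rule="all_done" so that it still
        executes after an upstream failure.
        """
        ti = context["ti"]
        # Get every task instance in the current DAG run
        task_instances = ti.get_dagrun().get_task_instances()
        failed_tasks = [t for t in task_instances if t.state == "failed"]
        if failed_tasks:
            send_alert(f"Tasks failed: {[t.task_id for t in failed_tasks]}")
        return "Health check complete"

4. Deployment

    # docker-compose.yml
    version: "3.8"
    services:
      airflow-webserver:
        image: apache/airflow:2.8.0
        command: webserver
        ports:
          - "8080:8080"
        volumes:
          - ./dags:/opt/airflow/dags
          - ./logs:/opt/airflow/logs
        environment:
          - AIRFLOW__CORE__EXECUTOR=LocalExecutor
          - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      airflow-scheduler:
        image: apache/airflow:2.8.0
        command: scheduler
        volumes:
          - ./dags:/opt/airflow/dags
          - ./logs:/opt/airflow/logs
        # The scheduler needs the same executor and database settings as the webserver
        environment:
          - AIRFLOW__CORE__EXECUTOR=LocalExecutor
          - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      postgres:
        image: postgres:15
        environment:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow

Before the first start, initialize the metadata database once (airflow db migrate) and create an admin user (airflow users create).

Airflow turned SEO automation from a collection of scripts into an engineered system. The DAG graph lets the whole team understand the flow, automatic retries reduce manual intervention, and monitoring and alerting make sure problems are caught promptly. As for deployment cost, a server with 2 cores and 4 GB of RAM is enough to run it.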
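The analyze_task function in section 2.2 calls calculate_ranking_changes() and detect_anomalies() without showing them. The sketch below is one possible shape for that logic, not the pipeline's actual implementation. It assumes rankings are available as plain dictionaries mapping keyword to position; the function arguments, the threshold parameter, and the "moved by N positions" rule are all assumptions made for illustration.

```python
def calculate_ranking_changes(previous, current):
    """Return {keyword: previous_position - current_position}.

    Positive values mean the keyword moved up (a smaller position number).
    Keywords missing from either snapshot are skipped.
    Hypothetical signature: the article's version takes no arguments.
    """
    return {
        keyword: previous[keyword] - current[keyword]
        for keyword in current
        if keyword in previous
    }


def detect_anomalies(changes, threshold=10):
    """Return the keywords that moved by at least `threshold` positions.

    A fixed threshold is an assumption; it stands in for whatever
    anomaly rule the real pipeline uses.
    """
    return sorted(
        keyword
        for keyword, delta in changes.items()
        if abs(delta) >= threshold
    )


if __name__ == "__main__":
    yesterday = {"seo tools": 3, "rank tracker": 12, "serp api": 8}
    today = {"seo tools": 2, "rank tracker": 25, "serp api": 8, "new kw": 4}
    changes = calculate_ranking_changes(yesterday, today)
    print(changes)
    print(detect_anomalies(changes))
```

Keeping these as pure functions of their inputs makes them easy to unit-test outside Airflow; the task callable then only has to load the two snapshots and store the result.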
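Section 3.1 builds task ids with keyword.strip().replace(" ", "_"), which only handles spaces. Airflow validates task ids and rejects characters such as "+" or "#", so a keyword like "c++ tutorial" would fail. The sketch below shows one conservative way to clean the keyword list and derive ids; parse_keywords and keyword_to_task_id are hypothetical helper names, and restricting ids to ASCII letters, digits, dots, dashes, and underscores is a deliberately cautious assumption.

```python
import re


def parse_keywords(raw):
    """Split a comma-separated string, dropping blanks and duplicates.

    The original order of first occurrences is preserved.
    """
    seen = set()
    result = []
    for part in raw.split(","):
        keyword = part.strip()
        if keyword and keyword not in seen:
            seen.add(keyword)
            result.append(keyword)
    return result


def keyword_to_task_id(keyword, prefix="collect_"):
    """Build a conservative ASCII task id from a keyword.

    Every run of characters outside [A-Za-z0-9_.-] becomes a single
    underscore; an empty result falls back to the word "keyword".
    """
    slug = re.sub(r"[^A-Za-z0-9_.-]+", "_", keyword.strip()).strip("_")
    return f"{prefix}{slug or 'keyword'}"


if __name__ == "__main__":
    for kw in parse_keywords(" seo tools, ,seo tools,c++ tutorial,"):
        print(kw, "->", keyword_to_task_id(kw))
```

Different keywords can still collapse to the same id (for example "c++ tutorial" and "c# tutorial"), so a real DAG would need to detect collisions, for instance by appending a counter.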