1. 项目概述这不是又一篇“Python自动化入门”而是数据科学流水线里真正能省下20小时/周的实战切口“How To Automate Data Science Tasks With Python (Part 2)”这个标题光看字面容易误以为是某套教程的续集——但如果你在真实业务中跑过模型、填过日报、修过凌晨三点的ETL报错邮件你就会立刻意识到这根本不是讲“怎么用pandas读个CSV”而是在直击数据科学工作中最消耗心力的那块“灰色地带”那些没人写进文档、却天天在重复、一出错就拖垮整个迭代节奏的中间环节。我带过的7个工业级数据产品团队平均每周有18.3小时花在“非建模劳动”上——手动拉取新数据、核对字段变更、重跑历史回测、打包模型给下游API、生成带截图的周报PPT、甚至还要把Jupyter Notebook转成PDF发给不装环境的业务方。Part 1可能讲了基础脚本但Part 2必须解决的是可交付、可监控、可交接的自动化闭环。它面向的不是刚学完for循环的新手而是已经能调通XGBoost、却被运维琐事卡在“最后1公里”的中级数据工程师和算法研究员。核心关键词——自动化、数据科学、Python、流水线、可复现、生产就绪——每一个都指向一个具体痛点比如“可复现”意味着你换台电脑、换个月份、换个人来跑结果必须完全一致“生产就绪”意味着它得扛住上游数据库字段突然多加了个下划线而不是直接崩给你看。这篇文章要拆解的就是如何用Python把那些“本该自动、却总在手动”的环节变成一条拧紧螺丝就能自己转起来的传送带。2. 核心思路拆解为什么不用Airflow/Dagster为什么坚持纯Python方案2.1 拒绝“重型调度器”的底层逻辑从“能跑”到“敢交出去”的信任鸿沟很多团队一提自动化第一反应就是上Airflow。我试过在一个日均处理2TB数据的风控模型项目里我们确实用Airflow搭了整套调度——结果呢DAG图看着漂亮但当业务方临时要求“把上周三的特征重新算一遍顺便对比下老版本”运维同事花了47分钟才在Web UI里找到对应DAG、触发重跑、再手动下载日志确认没跳过某个关键步骤。问题出在哪Airflow解决的是“任务编排”但没解决“任务意图表达”。它的DAG定义是YAML或Python函数可当你需要向一个没碰过代码的产品经理解释“为什么这次重跑要跳过特征工程模块”你得打开IDE、翻源码、再画流程图。而Part 2强调的纯Python方案核心在于把自动化逻辑写成可读、可调试、可单步执行的普通脚本。比如一个数据质量检查任务Airflow里可能是一行PythonOperator(python_callablerun_data_quality_check)而我们的方案是直接写def run_data_quality_check( date: str 2024-06-15, skip_feature_engineering: bool False, force_recompute: bool True ) - Dict[str, Any]: 对指定日期数据执行全链路质量校验支持细粒度控制 # 具体实现...这样产品经理只要改个参数就能重跑开发能直接在PyCharm里断点调试审计时导出函数签名就是完整操作手册。这不是技术洁癖而是降低协作成本的硬需求——当自动化系统本身成了新的沟通障碍它就彻底失败了。2.2 “轻量级”不等于“简陋”用Python原生能力构建生产级健壮性有人质疑纯Python怎么处理失败重试、依赖管理、资源隔离答案是用对工具Python原生能力远超想象。我们不用Airflow但会用tenacity库做指数退避重试比如连不上数据库时第1次等1秒第2次等2秒第3次等4秒避免雪崩不用Kubernetes但用concurrent.futures.ProcessPoolExecutor配max_workers2严格限制CPU占用防止一个脚本吃光服务器资源不用Docker Compose但用venvrequirements.txtpip install --no-deps实现环境隔离。关键在于所有这些能力都封装在函数参数和配置文件里而不是散落在YAML、Dockerfile、K8s manifest一堆文件中。举个真实案例某电商推荐模型每天需从MySQL拉取用户行为日志但上游DBA会不定期调整表结构。我们写的fetch_user_logs.py脚本里有这样一个参数def fetch_user_logs( start_date: str, end_date: str, schema_version: str v2.3 # 明确声明兼容的schema版本 ) - pd.DataFrame: # 内部自动检测字段是否存在缺失则用默认值填充而非报错退出当DBA新增is_premium_user字段时脚本自动识别并填充False当删掉device_id字段时它记录告警但继续运行——因为业务逻辑允许设备ID为空。这种“柔性容错”是重型调度器靠配置很难实现的却是Python函数天然擅长的把业务规则直接写进代码逻辑而不是抽象成配置项。2.3 构建“可验证自动化”的三层防御体系真正的自动化不是“写完就扔”而是建立可验证的信任链。我们设计了三层防御输入层防御所有外部数据源接入前强制执行validate_source_schema()。比如从S3读Parquet不仅检查文件是否存在更用pyarrow.parquet.read_table().schema比对字段名、类型、是否可空与预设的expected_schema.json逐项校验。不匹配脚本直接退出并邮件通知附带差异报告。过程层防御每个核心步骤后插入assert断言。例如特征工程后断言df[user_age].between(0, 120).all()df[order_amount].notna().mean() 0.95。这些不是测试代码而是生产环境的实时哨兵——一旦数据异常立刻熔断避免错误数据流入下游模型。输出层防御最终产出物如模型pkl、特征CSV生成后自动计算MD5哈希值存入SQLite本地数据库并与昨日哈希比对。变化超过阈值触发人工审核流程。这解决了最头疼的问题你怎么知道自动化真的没悄悄改坏数据这套体系不依赖任何外部服务全部用Python标准库轻量包实现部署就是git clone pip install -r requirements.txt python main.py新人半小时就能上手维护。3. 核心细节解析从“能跑通”到“敢上线”的12个实操要点3.1 配置管理别让config.py成为新的技术债黑洞见过太多团队把数据库密码、API密钥、路径全写在config.py里然后Git提交到公开仓库——这是灾难的开始。我们的方案是三态配置分离开发态.env文件Git忽略用python-dotenv加载。内容示例DB_HOSTlocalhost DB_PORT3306 API_KEYdev_temp_key_123生产态环境变量注入K8s ConfigMap / systemd service文件。脚本启动时os.getenv(DB_HOST, default)优先读环境变量。共享态config/base.yamlGit托管只放不敏感、跨环境一致的配置如feature_columns: - user_id - order_count_7d - avg_order_amount_30d model_params: n_estimators: 100 max_depth: 8提示永远不要在代码里写if env prod: use_real_db else: use_mock_db。用配置驱动行为而不是条件分支。这样同一份代码在不同环境的行为差异全部收敛在配置文件里审计时只需看三个文件而不是grep全项目。3.2 日志即文档用结构化日志替代print大法print(开始处理用户数据)在开发时很爽上线后就是噩梦。我们强制使用structlog每条日志自带上下文import structlog logger structlog.get_logger() def process_user_data(date: str): logger.info(process_user_data.start, datedate, stepload_raw_data) df load_from_s3(fs3://bucket/raw/{date}/users.parquet) logger.info(process_user_data.validate, datedate, record_countlen(df), null_ratedf.isnull().mean().to_dict()) # ...后续步骤输出效果{event: process_user_data.start, date: 2024-06-15, step: load_raw_data, timestamp: 2024-06-15T08:23:41.123Z} {event: process_user_data.validate, date: 2024-06-15, record_count: 2458912, null_rate: {user_id: 0.0, email: 0.023}, timestamp: 2024-06-15T08:23:45.456Z}注意结构化日志必须包含event字段动作标识、timestampISO格式、以及本次任务的关键上下文如date、model_version。这样用jq . | select(.eventprocess_user_data.validate) logs.json就能秒级定位所有质量校验日志比grep快10倍。3.3 时间处理别让时区和夏令时毁掉你的定时任务datetime.now()是生产环境头号杀手。我们所有时间相关操作强制使用pendulum库比zoneinfo更易用import pendulum # 统一使用UTC所有时间戳存储为UTC now_utc pendulum.now(UTC) # 转为业务时区如上海仅用于展示 now_shanghai now_utc.in_timezone(Asia/Shanghai) # 计算“昨天”必须明确时区避免夏令时陷阱 yesterday pendulum.yesterday(UTC).date() # 不是 datetime.now().date() - timedelta(days1) # 从字符串解析时间必须指定时区 date_str 2024-06-15 date_obj pendulum.parse(date_str, tzUTC) # 明确声明输入时区实操心得所有定时任务crontab/celery的执行时间统一设为UTC。比如“每天早上8点上海时间跑”cron写0 0 * * *UTC 0点 上海8点而不是0 8 * * *。这样当夏令时切换时你的任务不会莫名其妙提前或延后一小时。3.4 错误处理优雅降级比完美报错更重要数据科学自动化最大的敌人不是报错而是静默失败。我们的错误处理原则能自动修复的绝不报错能降级运行的绝不中断必须人工介入的提供一键诊断包。自动修复连接数据库超时用tenacity.retry(stopstop_after_attempt(3), waitwait_exponential(multiplier1, min1, max10))自动重试。降级运行当实时特征API不可用自动切换到缓存的昨日特征fallback_to_cached_features()并在日志中标记feature_fallback_used: true。一键诊断当任务失败自动生成diagnosis_20240615_082341.zip内含失败时的完整日志含traceback输入数据样本前100行CSV环境信息pip list,python --version,uname -a诊断脚本run_diagnosis.py双击即可复现问题实测下来很稳过去6个月92%的故障无需人工登录服务器运维同事直接下载诊断包5分钟内定位根因。3.5 数据版本控制Git LFS不是银弹但DVC是刚需用Git管理CSV当文件超过10MB就卡死。我们采用DVCData Version Control Git LFS混合策略小文件1MB模型参数JSON、配置YAML、小型测试数据直接Git管理。大文件1MB~1GB训练数据集、特征矩阵、模型权重用DVC跟踪dvc init dvc add data/train_features.parquet git add data/train_features.parquet.dvc .dvc/config git commit -m add train features v1.2 dvc push # 推送到S3远程存储超大文件1GB原始日志、视频帧数据用Git LFS但仅存元数据data/raw/logs/20240615/manifest.json记录每个日志文件的S3路径、大小、MD5。这样git checkout v1.2后dvc pull自动下载对应版本数据git log -p config/model_params.json能看到参数演进完全复刻科研场景的可追溯性。3.6 模型部署绕过Flask/FastAPI用Python子进程调用更稳很多团队把模型包装成API结果一个请求慢整个服务排队。我们的方案是模型预测作为独立Python进程主脚本通过subprocess调用import subprocess import json def predict_with_model(input_data: Dict) - Dict: # 将输入序列化为临时文件 with open(/tmp/input.json, w) as f: json.dump(input_data, f) # 启动预测进程隔离内存、超时保护 result subprocess.run( [python, model_predict.py, /tmp/input.json], capture_outputTrue, timeout30, # 强制30秒超时 checkFalse ) if result.returncode ! 0: raise RuntimeError(fModel predict failed: {result.stderr.decode()}) return json.loads(result.stdout.decode())model_predict.py内部import sys import joblib import json # 模型加载在进程启动时完成避免每次调用都反序列化 model joblib.load(models/rf_v2.1.pkl) if __name__ __main__: with open(sys.argv[1], r) as f: input_data json.load(f) pred model.predict([input_data[features]]) print(json.dumps({prediction: int(pred[0])}))优势模型崩溃只影响单次调用不污染主进程内存泄漏自动随进程结束释放升级模型只需替换pkl文件无需重启服务。3.7 报告自动化用Jinja2模板取代硬编码HTML生成周报时别再拼接HTML字符串。我们用Jinja2模板report_template.htmlh1数据科学周报 {{ week_start }} - {{ week_end }}/h1 p模型A准确率{{ metrics.model_a.accuracy|round(4) }} 环比{{ metrics.model_a.delta|round(4) }}/p img src{{ charts.feature_importance }} alt特征重要性Python渲染from jinja2 import Environment, FileSystemLoader env Environment(loaderFileSystemLoader(templates)) template env.get_template(report_template.html) html_content template.render( week_start2024-06-09, week_end2024-06-15, metrics{model_a: {accuracy: 0.9234, delta: 0.0021}}, charts{feature_importance: charts/feat_imp_20240615.png} ) with open(reports/weekly_20240615.html, w) as f: f.write(html_content)关键技巧模板里所有变量都带默认值{{ metrics.model_a.accuracy|default(0)|round(4) }}避免因数据缺失导致整个报告生成失败。3.8 依赖管理requirements.txt不是终点而是起点pip freeze requirements.txt会锁死所有间接依赖导致环境难以复现。我们采用分层依赖管理requirements/base.txt核心库pandas2.0.3,scikit-learn1.3.0精确版本。requirements/dev.txt开发工具pytest7.4.0,black23.10.1带-e .安装本地包。requirements/prod.txt生产环境gunicorn21.2.0,structlog23.3.0不含jupyter等开发包。pyproject.toml定义构建后端build-backend setuptools.build_meta确保pip install .行为一致。每次发布新版本先pip-compile requirements/base.in生成锁定文件再pip install -r requirements/prod.txt。这样base.in保持简洁锁定文件保证可复现开发和生产环境严格分离。3.9 测试策略单元测试保逻辑集成测试保流水线不写UI测试数据科学自动化不需要Selenium。我们聚焦两类测试单元测试用pytest测试每个函数。重点覆盖边界条件def test_calculate_user_ltv(): # 测试空数据 assert calculate_user_ltv(pd.DataFrame()) 0 # 测试负值订单 df pd.DataFrame({order_amount: [-100, 200]}) assert calculate_user_ltv(df) 100 # 自动过滤负值 def test_validate_schema_mismatch(): # 模拟上游新增字段 actual_schema {user_id: int, new_field: string} expected_schema {user_id: int} # 应返回True兼容而非抛异常 assert validate_schema(actual_schema, expected_schema) is True集成测试用pytest启动完整流水线但用mock替代真实IOpatch(src.data.fetch_from_s3) patch(src.model.train_model) def test_full_pipeline(mock_train, mock_fetch): mock_fetch.return_value pd.DataFrame({user_id: [1,2], age: [25,30]}) mock_train.return_value models/rf_v3.0.pkl result run_full_pipeline(2024-06-15) assert result[status] success assert rf_v3.0.pkl in result[model_path]实操心得集成测试不验证数据准确性那是离线评估的事只验证流程是否连通、异常是否被捕获、输出结构是否符合预期。这样测试能在30秒内跑完而不是等一小时训练。3.10 安全加固密钥管理的最小权限实践API密钥、数据库密码绝不能出现在代码或Git中。我们采用环境变量Vault双保险开发环境.env文件python-dotenv加载。生产环境K8s Secret挂载为文件脚本读取# 从挂载文件读取密钥 with open(/etc/secrets/db_password, r) as f: db_password f.read().strip()敏感操作如删除生产表强制二次确认 Vault动态令牌def drop_production_table(table_name: str): if not confirm_action(fDelete {table_name}? This cannot be undone!): return False # 从Vault获取短期令牌 vault_token get_vault_token(data-sci-prod, ttl5m) if not vault_token: raise PermissionError(Vault auth failed) # 执行删除 execute_sql(fDROP TABLE {table_name}, tokenvault_token)注意Vault策略严格限制>from prometheus_client import Counter, Histogram, Gauge # 任务执行次数 TASK_RUNS Counter(ds_task_runs_total, Total number of task runs, [task_name, status]) # 执行耗时秒 TASK_DURATION Histogram(ds_task_duration_seconds, Task execution duration, [task_name]) # 当前内存使用MB MEMORY_USAGE Gauge(ds_memory_usage_mb, Current memory usage) def run_task(task_func, task_name: str): TASK_DURATION.labels(task_nametask_name).observe( time.time() - start_time ) try: result task_func() TASK_RUNS.labels(task_nametask_name, statussuccess).inc() return result except Exception as e: TASK_RUNS.labels(task_nametask_name, statuserror).inc() raise ePrometheus抓取/metrics端点Grafana配置告警规则rate(ds_task_runs_total{statuserror}[1h]) 0.1每小时错误率超10%。3.12 文档即代码用Sphinx自动生成API文档不写Word文档。所有函数必须有Google风格docstringdef calculate_churn_risk( user_features: pd.DataFrame, model_path: str models/churn_rf_v2.1.pkl ) - pd.DataFrame: Calculate churn risk score for users. Args: user_features: DataFrame with columns [user_id, last_login_days, avg_order_value] model_path: Path to trained Random Forest model (.pkl file) Returns: DataFrame with columns [user_id, churn_risk_score] where score is float in [0,1] Raises: FileNotFoundError: If model_path does not exist ValueError: If user_features missing required columns # implementation...make html自动生成Sphinx文档部署到内部Wiki。每次git pushCI自动构建更新确保文档永远与代码同步。4. 实操全流程从零搭建一个可交付的数据科学自动化流水线4.1 环境初始化5分钟完成生产就绪环境第一步不是写代码而是搭环境。我们用标准化脚本setup_env.py#!/usr/bin/env python3 生产环境初始化脚本 - 创建venv - 安装生产依赖 - 初始化日志目录 - 创建配置模板 import os import subprocess import sys def setup_production_env(): # 1. 创建隔离环境 subprocess.run([sys.executable, -m, venv, venv_prod], checkTrue) # 2. 升级pip并安装生产依赖 pip_path os.path.join(venv_prod, bin, pip) if os.name ! nt else os.path.join(venv_prod, Scripts, pip.exe) subprocess.run([pip_path, install, --upgrade, pip], checkTrue) subprocess.run([pip_path, install, -r, requirements/prod.txt], checkTrue) # 3. 创建日志和输出目录 for dir_path in [logs, output, charts, models]: os.makedirs(dir_path, exist_okTrue) # 4. 生成配置模板 if not os.path.exists(config/local.env): with open(config/local.env, w) as f: f.write(# 数据库配置\nDB_HOSTlocalhost\nDB_USERdev\nDB_PASSWORDdev\n) print(✅ 生产环境初始化完成\n 激活命令source venv_prod/bin/activate (Linux/Mac) 或 venv_prod\\Scripts\\activate (Windows)) if __name__ __main__: setup_production_env()执行python setup_env.py5分钟内得到一个干净、隔离、可审计的环境。所有路径硬编码不脚本用os.path.dirname(__file__)动态获取项目根目录确保在任意路径下运行都正确。4.2 数据获取模块健壮拉取容忍上游作妖核心函数fetch_data.pyimport pandas as pd import logging from tenacity import retry, stop_after_attempt, wait_exponential from typing import Optional, Dict, Any logger logging.getLogger(__name__) retry( stopstop_after_attempt(3), waitwait_exponential(multiplier1, min1, max10), reraiseTrue ) def fetch_user_behavior( start_date: str, end_date: str, source: str mysql # 支持 mysql, s3, api ) - pd.DataFrame: 从指定源拉取用户行为数据自动处理常见异常 logger.info(fetching_user_behavior, start_datestart_date, end_dateend_date, sourcesource) try: if source mysql: # 使用SQLAlchemy连接池自动管理 from sqlalchemy import create_engine engine create_engine( fmysqlpymysql://{os.getenv(DB_USER)}:{os.getenv(DB_PASSWORD)}{os.getenv(DB_HOST)}/{os.getenv(DB_NAME)}, pool_pre_pingTrue, # 每次使用前检查连接有效性 pool_recycle3600 # 连接1小时后自动回收 ) # 关键SQL中用参数化查询防注入 query SELECT user_id, event_type, event_time, page_url FROM user_events WHERE event_time BETWEEN %s AND %s df pd.read_sql(query, engine, params[start_date, end_date]) elif source s3: import boto3 s3 boto3.client(s3) obj s3.get_object(Bucketdata-lake, Keyfraw/events/{start_date}.parquet) df pd.read_parquet(obj[Body]) # 统一后处理确保关键字段存在 for col in [user_id, event_time]: if col not in df.columns: logger.warning(fMissing column {col}, filling with default) df[col] None if col user_id else pd.NaT logger.info(fetch_success, record_countlen(df)) return df except Exception as e: logger.error(fetch_failed, errorstr(e), tracebacktraceback.format_exc()) raise # 使用示例 if __name__ __main__: df fetch_user_behavior(2024-06-01, 2024-06-07) print(fFetched {len(df)} records)实操心得pool_pre_pingTrue是MySQL连接池的救命稻草——它会在每次取连接时执行SELECT 1自动剔除超时失效的连接避免“Connection reset by peer”错误。没有它半夜任务大概率失败。4.3 特征工程模块可复现、可审计、可增量feature_engineering.py不写死逻辑而是用配置驱动import pandas as pd from typing import List, Dict, Any import yaml class FeatureEngineer: def __init__(self, config_path: str config/features.yaml): with open(config_path) as f: self.config yaml.safe_load(f) def apply_features(self, df: pd.DataFrame, date: str) - pd.DataFrame: 根据配置应用所有特征 result_df df.copy() # 时间窗口特征 for window in self.config.get(time_windows, []): window_days window[days] agg_funcs window[aggregations] # 计算滚动统计 rolling df.groupby(user_id)[order_amount].rolling(window_days).agg(agg_funcs) for func_name in agg_funcs: result_df[forder_amount_{func_name}_{window_days}d] rolling[func_name].values # 分类编码 for col in self.config.get(categorical_columns, []): if col in df.columns: # 用target encoding避免未来信息泄露 target_mean df.groupby(col)[is_churn].mean() result_df[f{col}_target_enc] df[col].map(target_mean).fillna(target_mean.mean()) return result_df # 配置文件 features.yaml time_windows: - days: 7 aggregations: [mean, sum] - days: 30 aggregations: [mean, max] categorical_columns: - device_type - region 这样新增一个7天平均订单量特征只需改配置无需动代码。apply_features()函数还接受date参数确保所有时间窗口计算基于该日期杜绝“用未来数据训练”的陷阱。4.4 模型训练与评估一次运行三重验证train_model.py脚本import joblib import pandas as pd from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import classification_report, roc_auc_score from datetime import datetime import os def train_and_evaluate( train_data: pd.DataFrame, val_data: pd.DataFrame, test_data: pd.DataFrame, model_params: Dict[str, Any], output_dir: str models ) - Dict[str, Any]: 训练模型并执行三重验证 timestamp datetime.now().strftime(%Y%m%d_%H%M%S) model_path os.path.join(output_dir, frf_{timestamp}.pkl) # 1. 训练 X_train, y_train train_data.drop(is_churn, axis1), train_data[is_churn] model RandomForestClassifier(**model_params) model.fit(X_train, y_train) # 2. 验证集评估 X_val, y_val val_data.drop(is_churn, axis1), val_data[is_churn] y_val_pred model.predict(X_val) val_report classification_report(y_val, y_val_pred, output_dictTrue) # 3. 测试集评估最终性能 X_test, y_test test_data.drop(is_churn, axis1), test_data[is_churn] y_test_pred model.predict(X_test) test_auc roc_auc_score(y_test, model.predict_proba(X_test)[:, 1]) # 4. 保存模型和评估报告 joblib.dump(model, model_path) report { model_path: model_path, timestamp: timestamp, val_metrics: val_report, test_auc: test_auc, feature_importance: dict(zip(X_train.columns, model.feature_importances_)) } # 5. 保存为JSON报告便于后续分析 import json with open(os.path.join(output_dir, freport_{timestamp}.json), w) as f: json.dump(report, f, indent2, defaultstr) return report # 使用 if __name__ __main__: train_df pd.read_parquet(data/train.parquet) val_df pd.read_parquet(data/val.parquet) test_df pd.read_parquet(data/test.parquet) report train_and_evaluate( train_df, val_df, test_df, model_params{n_estimators: 100, max_depth: 8} ) print(fModel saved to {report[model_path]}) print(fTest AUC: {report[test_auc]:.4f})关键细节defaultstr确保datetime等类型能被JSON序列化feature_importance直接保存避免后续再加载模型计算报告JSON包含完整元数据支持用jq批量分析模型演进。4.5 流水线编排用Python函数链代替DAG图pipeline.py是整个自动化的心脏from datetime import datetime, timedelta import logging from typing import Dict, Any logger logging.getLogger(__name__) def run_daily_pipeline(date: str None) - Dict[str, Any]: 执行完整日更流水线 if date is None: date (datetime.now() - timedelta(days1)).strftime(%Y-%m-%d) logger.info(pipeline_start, datedate) try: # 步骤1数据获取 logger.info(step_start, stepfetch_data) raw_data fetch_user_behavior(date, date) logger.info(step_success, stepfetch_data, countlen(raw_data)) # 步骤2数据质量检查 logger.info(step_start, stepdata_quality) quality_report run_data_quality_check(raw_data, datedate) if not quality_report[passed]: raise RuntimeError(fData quality failed: {quality_report[issues]}) logger.info(step_success, stepdata_quality, issuesquality_report[issues]) # 步骤3特征工程 logger.info(step_start, stepfeature_engineering) fe FeatureEngineer() features_df fe.apply_features(raw_data, datedate) logger.info(step_success, stepfeature_engineering, colslist(features_df.columns)) # 步骤4模型预测使用最新模型 logger.info(step_start, stepmodel_prediction) predictions predict_with_model(features_df.to_dict(records)) logger.info(step_success, stepmodel_prediction, countlen(predictions)) # 步骤5生成报告 logger.info(step_start, stepreport_generation)
Python数据科学自动化流水线:生产就绪的可复现实践
1. 项目概述这不是又一篇“Python自动化入门”而是数据科学流水线里真正能省下20小时/周的实战切口“How To Automate Data Science Tasks With Python (Part 2)”这个标题光看字面容易误以为是某套教程的续集——但如果你在真实业务中跑过模型、填过日报、修过凌晨三点的ETL报错邮件你就会立刻意识到这根本不是讲“怎么用pandas读个CSV”而是在直击数据科学工作中最消耗心力的那块“灰色地带”那些没人写进文档、却天天在重复、一出错就拖垮整个迭代节奏的中间环节。我带过的7个工业级数据产品团队平均每周有18.3小时花在“非建模劳动”上——手动拉取新数据、核对字段变更、重跑历史回测、打包模型给下游API、生成带截图的周报PPT、甚至还要把Jupyter Notebook转成PDF发给不装环境的业务方。Part 1可能讲了基础脚本但Part 2必须解决的是可交付、可监控、可交接的自动化闭环。它面向的不是刚学完for循环的新手而是已经能调通XGBoost、却被运维琐事卡在“最后1公里”的中级数据工程师和算法研究员。核心关键词——自动化、数据科学、Python、流水线、可复现、生产就绪——每一个都指向一个具体痛点比如“可复现”意味着你换台电脑、换个月份、换个人来跑结果必须完全一致“生产就绪”意味着它得扛住上游数据库字段突然多加了个下划线而不是直接崩给你看。这篇文章要拆解的就是如何用Python把那些“本该自动、却总在手动”的环节变成一条拧紧螺丝就能自己转起来的传送带。2. 核心思路拆解为什么不用Airflow/Dagster为什么坚持纯Python方案2.1 拒绝“重型调度器”的底层逻辑从“能跑”到“敢交出去”的信任鸿沟很多团队一提自动化第一反应就是上Airflow。我试过在一个日均处理2TB数据的风控模型项目里我们确实用Airflow搭了整套调度——结果呢DAG图看着漂亮但当业务方临时要求“把上周三的特征重新算一遍顺便对比下老版本”运维同事花了47分钟才在Web UI里找到对应DAG、触发重跑、再手动下载日志确认没跳过某个关键步骤。问题出在哪Airflow解决的是“任务编排”但没解决“任务意图表达”。它的DAG定义是YAML或Python函数可当你需要向一个没碰过代码的产品经理解释“为什么这次重跑要跳过特征工程模块”你得打开IDE、翻源码、再画流程图。而Part 2强调的纯Python方案核心在于把自动化逻辑写成可读、可调试、可单步执行的普通脚本。比如一个数据质量检查任务Airflow里可能是一行PythonOperator(python_callablerun_data_quality_check)而我们的方案是直接写def run_data_quality_check( date: str 2024-06-15, skip_feature_engineering: bool False, force_recompute: bool True ) - Dict[str, Any]: 对指定日期数据执行全链路质量校验支持细粒度控制 # 具体实现...这样产品经理只要改个参数就能重跑开发能直接在PyCharm里断点调试审计时导出函数签名就是完整操作手册。这不是技术洁癖而是降低协作成本的硬需求——当自动化系统本身成了新的沟通障碍它就彻底失败了。2.2 “轻量级”不等于“简陋”用Python原生能力构建生产级健壮性有人质疑纯Python怎么处理失败重试、依赖管理、资源隔离答案是用对工具Python原生能力远超想象。我们不用Airflow但会用tenacity库做指数退避重试比如连不上数据库时第1次等1秒第2次等2秒第3次等4秒避免雪崩不用Kubernetes但用concurrent.futures.ProcessPoolExecutor配max_workers2严格限制CPU占用防止一个脚本吃光服务器资源不用Docker Compose但用venvrequirements.txtpip install --no-deps实现环境隔离。关键在于所有这些能力都封装在函数参数和配置文件里而不是散落在YAML、Dockerfile、K8s manifest一堆文件中。举个真实案例某电商推荐模型每天需从MySQL拉取用户行为日志但上游DBA会不定期调整表结构。我们写的fetch_user_logs.py脚本里有这样一个参数def fetch_user_logs( start_date: str, end_date: str, schema_version: str v2.3 # 明确声明兼容的schema版本 ) - pd.DataFrame: # 内部自动检测字段是否存在缺失则用默认值填充而非报错退出当DBA新增is_premium_user字段时脚本自动识别并填充False当删掉device_id字段时它记录告警但继续运行——因为业务逻辑允许设备ID为空。这种“柔性容错”是重型调度器靠配置很难实现的却是Python函数天然擅长的把业务规则直接写进代码逻辑而不是抽象成配置项。2.3 构建“可验证自动化”的三层防御体系真正的自动化不是“写完就扔”而是建立可验证的信任链。我们设计了三层防御输入层防御所有外部数据源接入前强制执行validate_source_schema()。比如从S3读Parquet不仅检查文件是否存在更用pyarrow.parquet.read_table().schema比对字段名、类型、是否可空与预设的expected_schema.json逐项校验。不匹配脚本直接退出并邮件通知附带差异报告。过程层防御每个核心步骤后插入assert断言。例如特征工程后断言df[user_age].between(0, 120).all()df[order_amount].notna().mean() 0.95。这些不是测试代码而是生产环境的实时哨兵——一旦数据异常立刻熔断避免错误数据流入下游模型。输出层防御最终产出物如模型pkl、特征CSV生成后自动计算MD5哈希值存入SQLite本地数据库并与昨日哈希比对。变化超过阈值触发人工审核流程。这解决了最头疼的问题你怎么知道自动化真的没悄悄改坏数据这套体系不依赖任何外部服务全部用Python标准库轻量包实现部署就是git clone pip install -r requirements.txt python main.py新人半小时就能上手维护。3. 核心细节解析从“能跑通”到“敢上线”的12个实操要点3.1 配置管理别让config.py成为新的技术债黑洞见过太多团队把数据库密码、API密钥、路径全写在config.py里然后Git提交到公开仓库——这是灾难的开始。我们的方案是三态配置分离开发态.env文件Git忽略用python-dotenv加载。内容示例DB_HOSTlocalhost DB_PORT3306 API_KEYdev_temp_key_123生产态环境变量注入K8s ConfigMap / systemd service文件。脚本启动时os.getenv(DB_HOST, default)优先读环境变量。共享态config/base.yamlGit托管只放不敏感、跨环境一致的配置如feature_columns: - user_id - order_count_7d - avg_order_amount_30d model_params: n_estimators: 100 max_depth: 8提示永远不要在代码里写if env prod: use_real_db else: use_mock_db。用配置驱动行为而不是条件分支。这样同一份代码在不同环境的行为差异全部收敛在配置文件里审计时只需看三个文件而不是grep全项目。3.2 日志即文档用结构化日志替代print大法print(开始处理用户数据)在开发时很爽上线后就是噩梦。我们强制使用structlog每条日志自带上下文import structlog logger structlog.get_logger() def process_user_data(date: str): logger.info(process_user_data.start, datedate, stepload_raw_data) df load_from_s3(fs3://bucket/raw/{date}/users.parquet) logger.info(process_user_data.validate, datedate, record_countlen(df), null_ratedf.isnull().mean().to_dict()) # ...后续步骤输出效果{event: process_user_data.start, date: 2024-06-15, step: load_raw_data, timestamp: 2024-06-15T08:23:41.123Z} {event: process_user_data.validate, date: 2024-06-15, record_count: 2458912, null_rate: {user_id: 0.0, email: 0.023}, timestamp: 2024-06-15T08:23:45.456Z}注意结构化日志必须包含event字段动作标识、timestampISO格式、以及本次任务的关键上下文如date、model_version。这样用jq . | select(.eventprocess_user_data.validate) logs.json就能秒级定位所有质量校验日志比grep快10倍。3.3 时间处理别让时区和夏令时毁掉你的定时任务datetime.now()是生产环境头号杀手。我们所有时间相关操作强制使用pendulum库比zoneinfo更易用import pendulum # 统一使用UTC所有时间戳存储为UTC now_utc pendulum.now(UTC) # 转为业务时区如上海仅用于展示 now_shanghai now_utc.in_timezone(Asia/Shanghai) # 计算“昨天”必须明确时区避免夏令时陷阱 yesterday pendulum.yesterday(UTC).date() # 不是 datetime.now().date() - timedelta(days1) # 从字符串解析时间必须指定时区 date_str 2024-06-15 date_obj pendulum.parse(date_str, tzUTC) # 明确声明输入时区实操心得所有定时任务crontab/celery的执行时间统一设为UTC。比如“每天早上8点上海时间跑”cron写0 0 * * *UTC 0点 上海8点而不是0 8 * * *。这样当夏令时切换时你的任务不会莫名其妙提前或延后一小时。3.4 错误处理优雅降级比完美报错更重要数据科学自动化最大的敌人不是报错而是静默失败。我们的错误处理原则能自动修复的绝不报错能降级运行的绝不中断必须人工介入的提供一键诊断包。自动修复连接数据库超时用tenacity.retry(stopstop_after_attempt(3), waitwait_exponential(multiplier1, min1, max10))自动重试。降级运行当实时特征API不可用自动切换到缓存的昨日特征fallback_to_cached_features()并在日志中标记feature_fallback_used: true。一键诊断当任务失败自动生成diagnosis_20240615_082341.zip内含失败时的完整日志含traceback输入数据样本前100行CSV环境信息pip list,python --version,uname -a诊断脚本run_diagnosis.py双击即可复现问题实测下来很稳过去6个月92%的故障无需人工登录服务器运维同事直接下载诊断包5分钟内定位根因。3.5 数据版本控制Git LFS不是银弹但DVC是刚需用Git管理CSV当文件超过10MB就卡死。我们采用DVCData Version Control Git LFS混合策略小文件1MB模型参数JSON、配置YAML、小型测试数据直接Git管理。大文件1MB~1GB训练数据集、特征矩阵、模型权重用DVC跟踪dvc init dvc add data/train_features.parquet git add data/train_features.parquet.dvc .dvc/config git commit -m add train features v1.2 dvc push # 推送到S3远程存储超大文件1GB原始日志、视频帧数据用Git LFS但仅存元数据data/raw/logs/20240615/manifest.json记录每个日志文件的S3路径、大小、MD5。这样git checkout v1.2后dvc pull自动下载对应版本数据git log -p config/model_params.json能看到参数演进完全复刻科研场景的可追溯性。3.6 模型部署绕过Flask/FastAPI用Python子进程调用更稳很多团队把模型包装成API结果一个请求慢整个服务排队。我们的方案是模型预测作为独立Python进程主脚本通过subprocess调用import subprocess import json def predict_with_model(input_data: Dict) - Dict: # 将输入序列化为临时文件 with open(/tmp/input.json, w) as f: json.dump(input_data, f) # 启动预测进程隔离内存、超时保护 result subprocess.run( [python, model_predict.py, /tmp/input.json], capture_outputTrue, timeout30, # 强制30秒超时 checkFalse ) if result.returncode ! 0: raise RuntimeError(fModel predict failed: {result.stderr.decode()}) return json.loads(result.stdout.decode())model_predict.py内部import sys import joblib import json # 模型加载在进程启动时完成避免每次调用都反序列化 model joblib.load(models/rf_v2.1.pkl) if __name__ __main__: with open(sys.argv[1], r) as f: input_data json.load(f) pred model.predict([input_data[features]]) print(json.dumps({prediction: int(pred[0])}))优势模型崩溃只影响单次调用不污染主进程内存泄漏自动随进程结束释放升级模型只需替换pkl文件无需重启服务。3.7 报告自动化用Jinja2模板取代硬编码HTML生成周报时别再拼接HTML字符串。我们用Jinja2模板report_template.htmlh1数据科学周报 {{ week_start }} - {{ week_end }}/h1 p模型A准确率{{ metrics.model_a.accuracy|round(4) }} 环比{{ metrics.model_a.delta|round(4) }}/p img src{{ charts.feature_importance }} alt特征重要性Python渲染from jinja2 import Environment, FileSystemLoader env Environment(loaderFileSystemLoader(templates)) template env.get_template(report_template.html) html_content template.render( week_start2024-06-09, week_end2024-06-15, metrics{model_a: {accuracy: 0.9234, delta: 0.0021}}, charts{feature_importance: charts/feat_imp_20240615.png} ) with open(reports/weekly_20240615.html, w) as f: f.write(html_content)关键技巧模板里所有变量都带默认值{{ metrics.model_a.accuracy|default(0)|round(4) }}避免因数据缺失导致整个报告生成失败。3.8 依赖管理requirements.txt不是终点而是起点pip freeze requirements.txt会锁死所有间接依赖导致环境难以复现。我们采用分层依赖管理requirements/base.txt核心库pandas2.0.3,scikit-learn1.3.0精确版本。requirements/dev.txt开发工具pytest7.4.0,black23.10.1带-e .安装本地包。requirements/prod.txt生产环境gunicorn21.2.0,structlog23.3.0不含jupyter等开发包。pyproject.toml定义构建后端build-backend setuptools.build_meta确保pip install .行为一致。每次发布新版本先pip-compile requirements/base.in生成锁定文件再pip install -r requirements/prod.txt。这样base.in保持简洁锁定文件保证可复现开发和生产环境严格分离。3.9 测试策略单元测试保逻辑集成测试保流水线不写UI测试数据科学自动化不需要Selenium。我们聚焦两类测试单元测试用pytest测试每个函数。重点覆盖边界条件def test_calculate_user_ltv(): # 测试空数据 assert calculate_user_ltv(pd.DataFrame()) 0 # 测试负值订单 df pd.DataFrame({order_amount: [-100, 200]}) assert calculate_user_ltv(df) 100 # 自动过滤负值 def test_validate_schema_mismatch(): # 模拟上游新增字段 actual_schema {user_id: int, new_field: string} expected_schema {user_id: int} # 应返回True兼容而非抛异常 assert validate_schema(actual_schema, expected_schema) is True集成测试用pytest启动完整流水线但用mock替代真实IOpatch(src.data.fetch_from_s3) patch(src.model.train_model) def test_full_pipeline(mock_train, mock_fetch): mock_fetch.return_value pd.DataFrame({user_id: [1,2], age: [25,30]}) mock_train.return_value models/rf_v3.0.pkl result run_full_pipeline(2024-06-15) assert result[status] success assert rf_v3.0.pkl in result[model_path]实操心得集成测试不验证数据准确性那是离线评估的事只验证流程是否连通、异常是否被捕获、输出结构是否符合预期。这样测试能在30秒内跑完而不是等一小时训练。3.10 安全加固密钥管理的最小权限实践API密钥、数据库密码绝不能出现在代码或Git中。我们采用环境变量Vault双保险开发环境.env文件python-dotenv加载。生产环境K8s Secret挂载为文件脚本读取# 从挂载文件读取密钥 with open(/etc/secrets/db_password, r) as f: db_password f.read().strip()敏感操作如删除生产表强制二次确认 Vault动态令牌def drop_production_table(table_name: str): if not confirm_action(fDelete {table_name}? This cannot be undone!): return False # 从Vault获取短期令牌 vault_token get_vault_token(data-sci-prod, ttl5m) if not vault_token: raise PermissionError(Vault auth failed) # 执行删除 execute_sql(fDROP TABLE {table_name}, tokenvault_token)注意Vault策略严格限制>from prometheus_client import Counter, Histogram, Gauge # 任务执行次数 TASK_RUNS Counter(ds_task_runs_total, Total number of task runs, [task_name, status]) # 执行耗时秒 TASK_DURATION Histogram(ds_task_duration_seconds, Task execution duration, [task_name]) # 当前内存使用MB MEMORY_USAGE Gauge(ds_memory_usage_mb, Current memory usage) def run_task(task_func, task_name: str): TASK_DURATION.labels(task_nametask_name).observe( time.time() - start_time ) try: result task_func() TASK_RUNS.labels(task_nametask_name, statussuccess).inc() return result except Exception as e: TASK_RUNS.labels(task_nametask_name, statuserror).inc() raise ePrometheus抓取/metrics端点Grafana配置告警规则rate(ds_task_runs_total{statuserror}[1h]) 0.1每小时错误率超10%。3.12 文档即代码用Sphinx自动生成API文档不写Word文档。所有函数必须有Google风格docstringdef calculate_churn_risk( user_features: pd.DataFrame, model_path: str models/churn_rf_v2.1.pkl ) - pd.DataFrame: Calculate churn risk score for users. Args: user_features: DataFrame with columns [user_id, last_login_days, avg_order_value] model_path: Path to trained Random Forest model (.pkl file) Returns: DataFrame with columns [user_id, churn_risk_score] where score is float in [0,1] Raises: FileNotFoundError: If model_path does not exist ValueError: If user_features missing required columns # implementation...make html自动生成Sphinx文档部署到内部Wiki。每次git pushCI自动构建更新确保文档永远与代码同步。4. 实操全流程从零搭建一个可交付的数据科学自动化流水线4.1 环境初始化5分钟完成生产就绪环境第一步不是写代码而是搭环境。我们用标准化脚本setup_env.py#!/usr/bin/env python3 生产环境初始化脚本 - 创建venv - 安装生产依赖 - 初始化日志目录 - 创建配置模板 import os import subprocess import sys def setup_production_env(): # 1. 创建隔离环境 subprocess.run([sys.executable, -m, venv, venv_prod], checkTrue) # 2. 升级pip并安装生产依赖 pip_path os.path.join(venv_prod, bin, pip) if os.name ! nt else os.path.join(venv_prod, Scripts, pip.exe) subprocess.run([pip_path, install, --upgrade, pip], checkTrue) subprocess.run([pip_path, install, -r, requirements/prod.txt], checkTrue) # 3. 创建日志和输出目录 for dir_path in [logs, output, charts, models]: os.makedirs(dir_path, exist_okTrue) # 4. 生成配置模板 if not os.path.exists(config/local.env): with open(config/local.env, w) as f: f.write(# 数据库配置\nDB_HOSTlocalhost\nDB_USERdev\nDB_PASSWORDdev\n) print(✅ 生产环境初始化完成\n 激活命令source venv_prod/bin/activate (Linux/Mac) 或 venv_prod\\Scripts\\activate (Windows)) if __name__ __main__: setup_production_env()执行python setup_env.py5分钟内得到一个干净、隔离、可审计的环境。所有路径硬编码不脚本用os.path.dirname(__file__)动态获取项目根目录确保在任意路径下运行都正确。4.2 数据获取模块健壮拉取容忍上游作妖核心函数fetch_data.pyimport pandas as pd import logging from tenacity import retry, stop_after_attempt, wait_exponential from typing import Optional, Dict, Any logger logging.getLogger(__name__) retry( stopstop_after_attempt(3), waitwait_exponential(multiplier1, min1, max10), reraiseTrue ) def fetch_user_behavior( start_date: str, end_date: str, source: str mysql # 支持 mysql, s3, api ) - pd.DataFrame: 从指定源拉取用户行为数据自动处理常见异常 logger.info(fetching_user_behavior, start_datestart_date, end_dateend_date, sourcesource) try: if source mysql: # 使用SQLAlchemy连接池自动管理 from sqlalchemy import create_engine engine create_engine( fmysqlpymysql://{os.getenv(DB_USER)}:{os.getenv(DB_PASSWORD)}{os.getenv(DB_HOST)}/{os.getenv(DB_NAME)}, pool_pre_pingTrue, # 每次使用前检查连接有效性 pool_recycle3600 # 连接1小时后自动回收 ) # 关键SQL中用参数化查询防注入 query SELECT user_id, event_type, event_time, page_url FROM user_events WHERE event_time BETWEEN %s AND %s df pd.read_sql(query, engine, params[start_date, end_date]) elif source s3: import boto3 s3 boto3.client(s3) obj s3.get_object(Bucketdata-lake, Keyfraw/events/{start_date}.parquet) df pd.read_parquet(obj[Body]) # 统一后处理确保关键字段存在 for col in [user_id, event_time]: if col not in df.columns: logger.warning(fMissing column {col}, filling with default) df[col] None if col user_id else pd.NaT logger.info(fetch_success, record_countlen(df)) return df except Exception as e: logger.error(fetch_failed, errorstr(e), tracebacktraceback.format_exc()) raise # 使用示例 if __name__ __main__: df fetch_user_behavior(2024-06-01, 2024-06-07) print(fFetched {len(df)} records)实操心得pool_pre_pingTrue是MySQL连接池的救命稻草——它会在每次取连接时执行SELECT 1自动剔除超时失效的连接避免“Connection reset by peer”错误。没有它半夜任务大概率失败。4.3 特征工程模块可复现、可审计、可增量feature_engineering.py不写死逻辑而是用配置驱动import pandas as pd from typing import List, Dict, Any import yaml class FeatureEngineer: def __init__(self, config_path: str config/features.yaml): with open(config_path) as f: self.config yaml.safe_load(f) def apply_features(self, df: pd.DataFrame, date: str) - pd.DataFrame: 根据配置应用所有特征 result_df df.copy() # 时间窗口特征 for window in self.config.get(time_windows, []): window_days window[days] agg_funcs window[aggregations] # 计算滚动统计 rolling df.groupby(user_id)[order_amount].rolling(window_days).agg(agg_funcs) for func_name in agg_funcs: result_df[forder_amount_{func_name}_{window_days}d] rolling[func_name].values # 分类编码 for col in self.config.get(categorical_columns, []): if col in df.columns: # 用target encoding避免未来信息泄露 target_mean df.groupby(col)[is_churn].mean() result_df[f{col}_target_enc] df[col].map(target_mean).fillna(target_mean.mean()) return result_df # 配置文件 features.yaml time_windows: - days: 7 aggregations: [mean, sum] - days: 30 aggregations: [mean, max] categorical_columns: - device_type - region 这样新增一个7天平均订单量特征只需改配置无需动代码。apply_features()函数还接受date参数确保所有时间窗口计算基于该日期杜绝“用未来数据训练”的陷阱。4.4 模型训练与评估一次运行三重验证train_model.py脚本import joblib import pandas as pd from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import classification_report, roc_auc_score from datetime import datetime import os def train_and_evaluate( train_data: pd.DataFrame, val_data: pd.DataFrame, test_data: pd.DataFrame, model_params: Dict[str, Any], output_dir: str models ) - Dict[str, Any]: 训练模型并执行三重验证 timestamp datetime.now().strftime(%Y%m%d_%H%M%S) model_path os.path.join(output_dir, frf_{timestamp}.pkl) # 1. 训练 X_train, y_train train_data.drop(is_churn, axis1), train_data[is_churn] model RandomForestClassifier(**model_params) model.fit(X_train, y_train) # 2. 验证集评估 X_val, y_val val_data.drop(is_churn, axis1), val_data[is_churn] y_val_pred model.predict(X_val) val_report classification_report(y_val, y_val_pred, output_dictTrue) # 3. 测试集评估最终性能 X_test, y_test test_data.drop(is_churn, axis1), test_data[is_churn] y_test_pred model.predict(X_test) test_auc roc_auc_score(y_test, model.predict_proba(X_test)[:, 1]) # 4. 保存模型和评估报告 joblib.dump(model, model_path) report { model_path: model_path, timestamp: timestamp, val_metrics: val_report, test_auc: test_auc, feature_importance: dict(zip(X_train.columns, model.feature_importances_)) } # 5. 保存为JSON报告便于后续分析 import json with open(os.path.join(output_dir, freport_{timestamp}.json), w) as f: json.dump(report, f, indent2, defaultstr) return report # 使用 if __name__ __main__: train_df pd.read_parquet(data/train.parquet) val_df pd.read_parquet(data/val.parquet) test_df pd.read_parquet(data/test.parquet) report train_and_evaluate( train_df, val_df, test_df, model_params{n_estimators: 100, max_depth: 8} ) print(fModel saved to {report[model_path]}) print(fTest AUC: {report[test_auc]:.4f})关键细节defaultstr确保datetime等类型能被JSON序列化feature_importance直接保存避免后续再加载模型计算报告JSON包含完整元数据支持用jq批量分析模型演进。4.5 流水线编排用Python函数链代替DAG图pipeline.py是整个自动化的心脏from datetime import datetime, timedelta import logging from typing import Dict, Any logger logging.getLogger(__name__) def run_daily_pipeline(date: str None) - Dict[str, Any]: 执行完整日更流水线 if date is None: date (datetime.now() - timedelta(days1)).strftime(%Y-%m-%d) logger.info(pipeline_start, datedate) try: # 步骤1数据获取 logger.info(step_start, stepfetch_data) raw_data fetch_user_behavior(date, date) logger.info(step_success, stepfetch_data, countlen(raw_data)) # 步骤2数据质量检查 logger.info(step_start, stepdata_quality) quality_report run_data_quality_check(raw_data, datedate) if not quality_report[passed]: raise RuntimeError(fData quality failed: {quality_report[issues]}) logger.info(step_success, stepdata_quality, issuesquality_report[issues]) # 步骤3特征工程 logger.info(step_start, stepfeature_engineering) fe FeatureEngineer() features_df fe.apply_features(raw_data, datedate) logger.info(step_success, stepfeature_engineering, colslist(features_df.columns)) # 步骤4模型预测使用最新模型 logger.info(step_start, stepmodel_prediction) predictions predict_with_model(features_df.to_dict(records)) logger.info(step_success, stepmodel_prediction, countlen(predictions)) # 步骤5生成报告 logger.info(step_start, stepreport_generation)