开源巴西金融市场数据引擎:从数据管道到REST API的量化投资实践

开源巴西金融市场数据引擎:从数据管道到REST API的量化投资实践 1. 项目概述从零构建一个巴西金融市场数据引擎如果你也研究过巴西的股票和房地产投资信托基金你肯定和我一样被获取结构化数据这件事折磨过。市面上要么是付费API价格不菲要么是零散的CSV文件数据清洗和整合的工作量巨大。这个项目b3-data-fetcher就是在这种“受够了”的情绪下诞生的。它的核心目标很简单打造一个完全开源、可以本地部署的数据管道和API服务让你能自由、透明地分析和筛选巴西B3交易所的资产。简单来说它干了两件事第一作为数据管道它自动抓取并处理来自巴西证券交易委员会和雅虎财经的官方数据第二作为一个计算引擎它基于一套自定义的投资启发式算法对资产进行评分和排序。最终它会通过一个REST API把这些处理好的、富含洞见的数据提供给你。整个过程不依赖任何外部付费服务数据源完全公开透明你甚至可以自己验证每一个计算步骤。这不仅仅是另一个数据抓取脚本而是一个旨在成为个人量化投资分析基石的“数据引擎”。2. 核心架构与设计思路拆解2.1 为什么选择“数据管道 API”的模式在项目初期我评估过几种方案。最简单的是写一个一次性脚本跑完输出CSV。但这样每次分析都要重新运行无法实时交互。另一种是直接做一个带界面的桌面应用但这限制了自动化集成和二次开发的可能性。最终选择“数据管道 REST API”的架构主要基于以下几点考量解耦与复用性数据获取、清洗、计算逻辑管道部分与数据呈现、服务接口API部分分离。这意味着你可以只使用管道部分将处理好的数据导入你自己的数据库或分析工具也可以直接调用API快速构建前端仪表盘或移动应用。自动化与可扩展性管道可以配置为定时任务例如每天收盘后自动更新确保数据新鲜度。API层则可以方便地添加缓存、认证、限流等企业级功能。生态友好REST API是当前服务间通信的事实标准几乎任何编程语言或工具都能轻松调用极大降低了使用门槛。这个架构的核心思想是将脏活、累活数据清洗、映射、计算在后台一次性做好并通过一个干净、稳定的接口提供价值。2.2 技术栈选型背后的逻辑项目当前的技术栈是PythonPoetryFastAPI并规划了PostgreSQL和Redis。每一个选择都有其具体原因Python在金融数据分析和原型快速开发领域Python拥有无与伦比的生态优势。pandas用于数据清洗和计算yfinance用于获取股价requests处理网络请求这些库成熟且高效。Poetry相比传统的requirements.txt和virtualenvPoetry提供了更强大的依赖管理和打包能力。它能精确锁定依赖版本确保项目在任何机器上都能以完全一致的环境运行这对于数据项目的可复现性至关重要。FastAPI作为API框架FastAPI的性能与NodeJS和Go的框架相当且自动生成交互式API文档OpenAPI的特性对于项目的可用性和社区推广有巨大帮助。其基于Python类型提示的声明式接口编写方式也让代码更清晰、错误更少。SQLite → PostgreSQL开发初期使用SQLite因为它无需额外服务开箱即用适合原型验证。但SQLite在高并发写入和复杂连接查询上存在瓶颈。因此路线图中规划迁移到PostgreSQL这是一个功能强大、稳定可靠的开源关系数据库完全能满足生产环境的需求。Redis金融数据虽然变化但在分钟甚至小时级别内是相对静态的。将处理后的资产列表、指标计算结果缓存到Redis中可以极大减轻数据库压力和计算开销将API响应时间从秒级降低到毫秒级。这个技术栈组合在开发效率、运行性能、可维护性和社区支持上取得了很好的平衡。3. 数据管道核心细节与实现解析数据管道是整个项目的心脏它负责从原始、杂乱的CSV文件到干净、可分析的结构化数据的全过程。这个过程远比一个简单的pd.read_csv复杂。3.1 多源数据整合的挑战与解决方案项目需要处理两个主要数据源CVM巴西证券交易委员会的官方基金报表和雅虎财经的实时股价。最大的挑战在于数据关联CVM数据通过CNPJ巴西的公司税号标识基金而雅虎财经使用股票代码。两者之间没有直接的、稳定的公共映射表。我的解决方案是引入一个自定义的映射文件(ativos.txt)。这个文件的格式很简单PVBI11, 12.345.678/0001-90 HGLG11, 98.765.432/0001-21 ...第一列是代码第二列是CNPJ。这个文件需要手动维护和更新虽然听起来有点“糙”但在当前阶段是最可靠、最可控的方式。未来可以考虑从B3官网或其他可靠来源定期爬取这个映射关系来实现自动化。实操心得在构建数据管道时不要追求一步到位的全自动化。优先保证核心流程数据获取、清洗、计算的畅通对于像“代码-CNPJ映射”这类边界但关键的问题可以采用“半自动人工校验”的过渡方案。一个可工作的、部分手动的系统远比一个设计完美但无法运行的“全自动”系统有价值。3.2 核心指标的计算逻辑与财务意义管道不只是合并数据更重要的是根据投资逻辑计算衍生指标。当前实现的启发式算法围绕几个核心指标展开每股净资产值直接从CVM的fii_complemento等CSV中提取并计算。它代表了每份基金份额对应的底层资产净值是评估基金价值的基础。市净率计算公式为当前股价 / 每股净资产值。这是判断资产是否“便宜”的关键指标。P/VP 1 通常意味着股价低于其净资产可能被低估反之则可能被高估。股息率这里计算的是基于净资产值的月度股息率。首先从CVM数据中获取月度分红金额然后计算月度分红 / 每股净资产值。这个指标反映了基金以其净资产为基础产生现金流的能力。理论月度股息计算公式为每股净资产值 × 月度股息率。这个值可以理解为在当前净资产水平下基金“应该”能分出的红利金额是一个理论参考值。机会分数这是整个启发式算法的核心。一个简单的版本可以是(股息率 * 权重A) - (P/VP * 权重B)。通过调整权重你可以表达自己的投资偏好是更看重高股息还是更看重低估值这个分数用于对所有资产进行排序分数越高代表综合投资机会越好。# 简化的计算示例 (非完整代码) import pandas as pd def calculate_metrics(df_cvm, df_yahoo, mapping_dict): df_cvm: 包含CNPJ, vp_cota (每股净资产), dy_mes (月度股息率) 的DataFrame df_yahoo: 包含ticker, price (当前价) 的DataFrame mapping_dict: {ticker: cnpj} 的映射字典 # 1. 通过映射字典合并数据 df_yahoo[cnpj] df_yahoo[ticker].map(mapping_dict) df_merged pd.merge(df_cvm, df_yahoo, oncnpj, howinner) # 2. 计算核心指标 df_merged[p_vp] df_merged[price] / df_merged[vp_cota] df_merged[dividend_r] df_merged[vp_cota] * df_merged[dy_mes] # 3. 计算简单的机会分数 (示例更看重低P/VP和高DY) # 对P/VP进行归一化值越小越好对DY进行归一化值越大越好 df_merged[p_vp_norm] (df_merged[p_vp].max() - df_merged[p_vp]) / (df_merged[p_vp].max() - df_merged[p_vp].min()) df_merged[dy_norm] (df_merged[dy_mes] - df_merged[dy_mes].min()) / (df_merged[dy_mes].max() - df_merged[dy_mes].min()) df_merged[opportunity_score] 0.6 * df_merged[dy_norm] 0.4 * df_merged[p_vp_norm] # 4. 排序 df_merged df_merged.sort_values(byopportunity_score, ascendingFalse) return df_merged[[ticker, vp_cota, dy_mes, price, p_vp, dividend_r, opportunity_score]]3.3 缓存策略平衡数据新鲜度与性能金融数据并非每秒都在剧烈变化。频繁地从雅虎财经拉取数据可能触发限流而重复计算所有指标也是一种浪费。因此我实现了一个简单的文件缓存系统 (cache.mouras)。缓存逻辑如下检查缓存程序运行时首先检查缓存文件是否存在以及其创建时间。判断时效通过环境变量CACHE_TTL_MINUTES例如设为30设置缓存有效时间。如果缓存文件存在且未过期则直接加载缓存数据。更新缓存如果缓存不存在或已过期则执行完整的数据获取、处理和计算流程并将结果序列化如用JSON或Pickle格式保存到缓存文件中同时更新文件时间戳。这个策略确保了在缓存有效期内API的响应是瞬时的而一旦数据“变旧”系统会自动更新。未来迁移到Redis后这个机制会更高效并可以支持更复杂的缓存失效策略。4. 从脚本到服务FastAPI 的实现与演进当前项目是一个命令行脚本输出结果到终端。下一步的核心演进就是将其封装成HTTP API服务让数据能够被远程、结构化地访问。4.1 构建最小可行API使用FastAPI构建一个基础端点非常简单。首先我们需要将数据处理逻辑模块化。假设我们有一个data_processor.py模块里面有一个get_processed_assets()函数它返回我们之前计算好的DataFrame或字典列表。# main.py (演进为FastAPI应用入口) from fastapi import FastAPI, Query from typing import Optional, List import data_processor app FastAPI(titleB3 Data Fetcher API, description开源巴西金融市场数据API) app.get(/assets) async def get_assets( asset_type: Optional[str] Query(None, description过滤资产类型如 FII), max_p_vp: Optional[float] Query(None, descriptionP/VP最大值例如 1.0), min_dy: Optional[float] Query(None, description股息率最小值例如 0.005 (0.5%)) ): 获取处理后的资产列表支持基础过滤。 # 调用数据处理模块 assets data_processor.get_processed_assets() # 应用过滤器 filtered_assets assets if asset_type: # 假设数据中有 type 字段 filtered_assets [a for a in filtered_assets if a.get(type) asset_type] if max_p_vp is not None: filtered_assets [a for a in filtered_assets if a.get(p_vp, float(inf)) max_p_vp] if min_dy is not None: filtered_assets [a for a in filtered_assets if a.get(dy_mes, 0) min_dy] return {count: len(filtered_assets), assets: filtered_assets}这个/assets端点已经具备了实用价值。通过查询参数前端可以轻松实现“只显示P/VP小于1的FII”这样的筛选功能。FastAPI会自动为这个端点生成交互式API文档通常位于/docs极大方便了API的测试和使用。4.2 数据结构化与数据库集成目前数据可能存在于内存或临时文件中。为了持久化、历史查询和更高效的数据操作引入数据库是必然的。路线图规划了从SQLite到PostgreSQL的迁移。数据库模型设计思路资产表存储资产基本信息如代码、CNPJ、名称、类型。这是维度表。数据快照表这是核心事实表。每天或每次数据更新为每个资产插入一条记录包含当时的时间戳、股价、净资产值、股息率、计算出的P/VP、机会分数等所有指标。映射表独立存储代码-CNPJ映射关系方便维护更新。这样的设计允许你查询任意资产在某个时间点的所有数据。计算指标的历史趋势例如P/VP在过去一年的变化。高效地进行复杂的跨资产、跨时间段的筛选和聚合查询。当API收到请求时它将不再直接调用data_processor进行实时计算而是转换为对数据库的查询。数据处理管道则作为一个独立的后台进程或定时任务定期从源头获取新数据计算后写入数据库。4.3 性能优化引入缓存层即使有了数据库一些高频、静态的查询比如“当前机会分数最高的10个FII”仍然值得缓存。这就是Redis的用武之地。集成Redis后的API流程会变成这样收到请求如GET /assets?typeFIIlimit10sort-opportunity_score。根据请求参数生成一个唯一的缓存键例如assets:FII:top10:score。首先检查Redis中是否存在这个键。如果存在且未过期直接返回缓存的值。如果Redis中不存在则查询数据库获取结果。将查询结果序列化后存入Redis并设置一个合理的过期时间例如60秒。返回结果。这个简单的“旁路缓存”策略对于读多写少的金融数据查询场景能带来数量级的性能提升。FastAPI社区有成熟的库如aioredis可以方便地集成这一功能。5. 部署、运维与未来扩展5.1 本地开发与生产部署项目使用Poetry管理依赖这使得环境一致性得到保障。对于生产部署一个常见的做法是使用Docker容器化。一个简单的Dockerfile可能如下所示FROM python:3.11-slim WORKDIR /app # 复制依赖定义文件 COPY pyproject.toml poetry.lock ./ # 安装Poetry和项目依赖 RUN pip install poetry poetry config virtualenvs.create false poetry install --no-dev --no-interaction --no-ansi # 复制应用代码 COPY . . # 运行FastAPI应用 CMD [uvicorn, main:app, --host, 0.0.0.0, --port, 8000]结合docker-compose.yml可以轻松定义并启动包含API服务、PostgreSQL数据库和Redis缓存在内的完整应用栈。云部署平台如Render、Railway或Google Cloud Run都原生支持Docker部署过程会非常顺畅。5.2 监控、日志与错误处理一个健壮的服务离不开可观测性。在FastAPI中可以集成像structlog这样的结构化日志库将不同级别的日志INFO、WARNING、ERROR输出到控制台或日志文件并包含请求ID、用户标识等上下文信息便于问题追踪。对于错误处理FastAPI有强大的异常处理机制。你需要为可能出现的错误定义清晰的HTTP状态码和错误信息例如422 Unprocessable Entity请求参数验证失败。503 Service Unavailable上游数据源如雅虎财经暂时不可用或数据处理管道出错。404 Not Found请求的特定资产不存在。全局的异常处理器可以确保任何未捕获的异常都不会直接暴露给用户而是返回一个通用的500 Internal Server Error和相关的请求ID方便后台排查。5.3 项目的未来可能性这个项目的边界远不止于当前的功能。它的开源和模块化设计为社区贡献和个性化扩展留下了巨大空间更多数据源除了CVM和雅虎财经可以集成B3的官方数据接口、宏观经济指标、甚至社交媒体情绪数据。更复杂的策略当前是单一的启发式算法。未来可以设计成“策略插件”模式允许用户选择或上传自己的评分算法例如加入动量因子、质量因子等。实时通知结合消息队列如Celery RabbitMQ当某个资产的指标达到用户预设的阈值时如P/VP跌破0.8自动发送邮件或App推送。前端仪表盘基于React或Vue.js构建一个轻量级前端可视化展示资产排名、历史趋势对比、投资组合模拟等。社区化数据维护建立一个机制让社区用户共同维护和校验“代码-CNPJ”映射文件甚至贡献数据清洗规则。这个项目的终极愿景是成为一个由社区驱动、透明、可信赖的巴西金融市场数据基础设施。它始于一个简单的个人需求但通过清晰的架构和开源协作完全有潜力成长为一个对更多投资者、开发者和研究者都有价值的工具。