用qstock构建量化投研系统:从实时行情到策略回测的完整解决方案

用qstock构建量化投研系统:从实时行情到策略回测的完整解决方案 用qstock构建量化投研系统从实时行情到策略回测的完整解决方案【免费下载链接】qstockqstock由“Python金融量化”公众号开发试图打造成个人量化投研分析包目前包括数据获取data、可视化(plot)、选股(stock)和量化回测策略backtest模块。 qstock将为用户提供简洁的数据接口和规整化后的金融市场数据。可视化模块为用户提供基于web的交互图形的简单接口 选股模块提供了同花顺的选股数据和自定义选股包括RPS、MM趋势、财务指标、资金流模型等 回测模块为大家提供向量化基于pandas和基于事件驱动的基本框架和模型。 关注“Python金融量化“微信公众号获取更多应用信息。项目地址: https://gitcode.com/gh_mirrors/qs/qstock在金融量化分析领域数据获取往往是开发者面临的首要难题。传统的数据获取方式要么需要付费订阅商业数据源要么需要自行爬取和维护复杂的API接口这对于个人开发者和小型团队来说成本高昂且技术门槛不低。qstock作为一个开源的Python金融量化库正是为了解决这一痛点而生它为个人量化投研提供了一套完整的数据获取、可视化、选股和回测解决方案。实时行情数据的多维度应用场景对于量化交易者来说实时行情数据是策略执行的基础。qstock通过简洁的API设计让开发者能够轻松获取各类市场的实时数据。场景一监控市场整体态势在进行资产配置决策时我们需要了解不同市场的整体表现。qstock的realtime_data函数支持多达20多种市场类型的实时数据获取从沪深A股到可转债、期货、美股、港股等覆盖了主要的金融交易市场。import qstock as qs # 获取多市场实时行情对比分析 market_types [沪深A, 可转债, 创业板, 行业板块, 概念板块] market_data {} for market in market_types: try: df qs.realtime_data(marketmarket) # 计算市场整体涨跌情况 avg_change df[涨幅].mean() market_data[market] { total_stocks: len(df), avg_change: avg_change, rising_ratio: (df[涨幅] 0).sum() / len(df) } print(f{market}: 平均涨幅{avg_change:.2%}, 上涨比例{market_data[market][rising_ratio]:.2%}) except Exception as e: print(f获取{market}数据失败: {e}) # 找出当日表现最佳的市场 best_market max(market_data.items(), keylambda x: x[1][avg_change]) print(f\n今日表现最佳市场: {best_market[0]}, 平均涨幅: {best_market[1][avg_change]:.2%})这种多市场监控能力对于跨市场套利策略和资产轮动策略至关重要。通过实时跟踪不同市场的表现交易者可以及时调整仓位配置。场景二构建自定义股票池监控系统在量化交易中我们通常需要监控一组特定的股票。qstock允许我们灵活地创建和管理自定义股票池。import qstock as qs import pandas as pd from datetime import datetime class StockMonitor: def __init__(self, watchlist): 初始化股票监控器 watchlist: 监控股票列表可以是代码或名称 self.watchlist watchlist self.history_data {} def get_realtime_status(self): 获取实时状态 try: df qs.realtime_data(codeself.watchlist) return df except Exception as e: print(f获取实时数据失败: {e}) return None def check_price_alert(self, threshold_pct5): 检查价格预警 df self.get_realtime_status() if df is not None: alerts [] for _, row in df.iterrows(): change_pct row[涨幅] if abs(change_pct) threshold_pct: alerts.append({ 股票: row[名称], 代码: row[代码], 当前价: row[最新价], 涨跌幅: f{change_pct:.2%}, 时间: datetime.now().strftime(%H:%M:%S) }) return alerts return [] def calculate_technical_indicators(self, period60d): 计算技术指标 indicators {} for stock in self.watchlist: try: # 获取历史数据 hist_data qs.get_data(stock, startperiod) if not hist_data.empty: # 计算移动平均线 ma5 hist_data[close].rolling(5).mean().iloc[-1] ma20 hist_data[close].rolling(20).mean().iloc[-1] # 计算RSI delta hist_data[close].diff() gain (delta.where(delta 0, 0)).rolling(14).mean() loss (-delta.where(delta 0, 0)).rolling(14).mean() rs gain / loss rsi 100 - (100 / (1 rs)).iloc[-1] indicators[stock] { MA5: ma5, MA20: ma20, RSI: rsi, 趋势: 上涨 if ma5 ma20 else 下跌 } except Exception as e: print(f计算{stock}技术指标失败: {e}) return indicators # 使用示例 monitor StockMonitor([中国平安, 贵州茅台, 宁德时代, 东方财富]) alerts monitor.check_price_alert(threshold_pct3) if alerts: print(发现价格预警:) for alert in alerts: print(f{alert[股票]}({alert[代码]}): {alert[涨跌幅]}) indicators monitor.calculate_technical_indicators() print(\n技术指标分析:) for stock, ind in indicators.items(): print(f{stock}: MA5{ind[MA5]:.2f}, MA20{ind[MA20]:.2f}, RSI{ind[RSI]:.1f}, 趋势{ind[趋势]})高级数据集成基本面与技术面结合分析qstock的强大之处在于它不仅提供行情数据还集成了基本面数据、资金流向、财务指标等多维度信息为量化分析提供了全面的数据支持。基本面数据深度挖掘基本面分析是价值投资的核心qstock提供了丰富的财务数据接口import qstock as qs import pandas as pd def analyze_fundamentals(stock_list): 综合分析股票基本面 results [] for stock in stock_list: try: # 获取基本财务指标 basics qs.stock_basics(stock) if not basics.empty: # 获取详细财务指标 indicators qs.stock_indicator(stock) # 获取股东信息 holders qs.stock_holder_top10(stock, n1) # 获取主营业务构成 business qs.main_business(stock) stock_data { 股票: stock, 市盈率: basics.iloc[0][市盈率] if 市盈率 in basics.columns else N/A, 市净率: basics.iloc[0][市净率] if 市净率 in basics.columns else N/A, ROE: basics.iloc[0][ROE] if ROE in basics.columns else N/A, 毛利率: basics.iloc[0][毛利率] if 毛利率 in basics.columns else N/A, 股东数: len(holders) if holders is not None else 0, 主营业务: business[主营业务收入_主营构成][0] if not business.empty else N/A } results.append(stock_data) except Exception as e: print(f分析{stock}基本面失败: {e}) return pd.DataFrame(results) # 分析一组股票的基本面 stocks [中国平安, 贵州茅台, 招商银行, 宁德时代] fundamental_df analyze_fundamentals(stocks) print(基本面分析结果:) print(fundamental_df) # 筛选优质股票 def screen_quality_stocks(df, pe_threshold30, pb_threshold5, roe_threshold15): 筛选优质股票 quality_stocks df[ (df[市盈率] pe_threshold) (df[市净率] pb_threshold) (df[ROE] roe_threshold) ] return quality_stocks quality_stocks screen_quality_stocks(fundamental_df) print(\n优质股票筛选结果:) print(quality_stocks)资金流向分析与市场情绪判断资金流向是判断市场情绪的重要指标qstock提供了多层次资金流数据def analyze_money_flow(stock, days20): 分析股票资金流向 try: # 获取历史资金流向 hist_money qs.hist_money(stock) # 获取多周期资金流 multi_period qs.stock_money(stock, ndays[3, 5, 10, 20]) # 获取同花顺资金流数据 ths_money qs.ths_money(个股, ndays) analysis { 股票: stock, 近期净流入: hist_money[净流入].iloc[-5:].sum() if not hist_money.empty else 0, 主力净流入: multi_period[主力净流入_20日].iloc[-1] if not multi_period.empty else 0, 散户净流入: multi_period[散户净流入_20日].iloc[-1] if not multi_period.empty else 0, 资金强度: ths_money[ths_money[代码] stock][净额].iloc[0] if not ths_money.empty else 0 } # 判断资金流向趋势 if analysis[近期净流入] 0 and analysis[主力净流入] 0: analysis[趋势判断] 资金持续流入 elif analysis[近期净流入] 0 and analysis[主力净流入] 0: analysis[趋势判断] 资金持续流出 else: analysis[趋势判断] 资金分歧 return analysis except Exception as e: print(f分析{stock}资金流向失败: {e}) return None # 分析多只股票的资金流向 stocks_to_analyze [中国平安, 贵州茅台, 东方财富] for stock in stocks_to_analyze: result analyze_money_flow(stock) if result: print(f\n{stock}资金流向分析:) for key, value in result.items(): if key ! 股票: print(f {key}: {value})可视化分析从数据到洞察qstock内置的可视化模块基于pyecharts提供了丰富的图表类型帮助用户直观理解市场数据。K线图与趋势分析from qstock import plot import qstock as qs def analyze_stock_trend(stock_code, period90d): 分析股票趋势并可视化 # 获取历史数据 df qs.get_data(stock_code, startperiod) if df.empty: print(获取数据失败) return # 计算技术指标 df[MA5] df[close].rolling(5).mean() df[MA20] df[close].rolling(20).mean() df[MA60] df[close].rolling(60).mean() # 创建K线图 kline_chart plot.kline( df, mas5, # 短期均线 mal20, # 长期均线 titlef{stock_code} K线图与均线系统, notebookFalse # 保存为HTML文件 ) # 保存图表 kline_chart.render(f{stock_code}_kline.html) # 分析趋势信号 latest_close df[close].iloc[-1] ma5 df[MA5].iloc[-1] ma20 df[MA20].iloc[-1] ma60 df[MA60].iloc[-1] signals [] if latest_close ma5 ma20 ma60: signals.append(多头排列 - 强势上涨趋势) elif latest_close ma5 ma20 ma60: signals.append(空头排列 - 下跌趋势) elif ma5 ma20 and latest_close ma5: signals.append(短期看涨) elif ma5 ma20 and latest_close ma5: signals.append(短期看跌) return { 当前价格: latest_close, MA5: ma5, MA20: ma20, MA60: ma60, 技术信号: signals, 图表文件: f{stock_code}_kline.html } # 分析股票趋势 analysis analyze_stock_trend(000001) print(趋势分析结果:) for key, value in analysis.items(): print(f{key}: {value})行业板块热力图分析def analyze_industry_heatmap(): 分析行业板块热度 # 获取行业板块数据 industry_data qs.realtime_data(market行业板块) if industry_data.empty: print(获取行业数据失败) return # 筛选涨幅前20的行业 top_industries industry_data.nlargest(20, 涨幅)[[名称, 涨幅, 涨跌额]] # 创建热力图数据 heat_data [] for _, row in top_industries.iterrows(): # 根据涨幅大小确定颜色强度 if row[涨幅] 5: color_intensity 5 elif row[涨幅] 3: color_intensity 4 elif row[涨幅] 1: color_intensity 3 elif row[涨幅] 0: color_intensity 2 else: color_intensity 1 heat_data.append([row[名称], row[涨幅], color_intensity]) # 创建DataFrame heat_df pd.DataFrame(heat_data, columns[行业, 涨幅, 热度]) # 分析热点行业特征 hot_industries heat_df[heat_df[热度] 4][行业].tolist() return { 热点行业: hot_industries, 行业数据: heat_df, 平均涨幅: heat_df[涨幅].mean(), 上涨行业数: (heat_df[涨幅] 0).sum(), 下跌行业数: (heat_df[涨幅] 0).sum() } # 执行行业分析 industry_analysis analyze_industry_heatmap() print(行业热度分析:) print(f热点行业: {industry_analysis[热点行业]}) print(f平均涨幅: {industry_analysis[平均涨幅]:.2%}) print(f上涨行业数: {industry_analysis[上涨行业数]}) print(f下跌行业数: {industry_analysis[下跌行业数]})量化选股策略实现qstock的选股模块提供了多种选股模型结合技术指标和基本面数据可以构建复杂的选股策略。RPS相对强度选股def rps_screening(market沪深A, top_n50, period120d): RPS相对强度选股 # 获取市场所有股票 all_stocks qs.get_code(market) rps_scores {} for stock in tqdm(all_stocks[:100], desc计算RPS): # 限制数量用于演示 try: # 获取历史数据 df qs.get_data(stock, startperiod) if df.empty or len(df) 60: continue # 计算收益率 returns df[close].pct_change() # 计算20日、60日、120日收益率 ret_20d (df[close].iloc[-1] / df[close].iloc[-20] - 1) * 100 ret_60d (df[close].iloc[-1] / df[close].iloc[-60] - 1) * 100 ret_120d (df[close].iloc[-1] / df[close].iloc[-120] - 1) * 100 # 计算RPS分数加权平均 rps_score ret_20d * 0.5 ret_60d * 0.3 ret_120d * 0.2 # 获取股票名称 name qs.realtime_data(codestock)[名称].iloc[0] if not qs.realtime_data(codestock).empty else stock rps_scores[name] { 代码: stock, RPS: rps_score, 20日涨幅: ret_20d, 60日涨幅: ret_60d, 120日涨幅: ret_120d } except Exception as e: continue # 按RPS排序 sorted_stocks sorted(rps_scores.items(), keylambda x: x[1][RPS], reverseTrue) return sorted_stocks[:top_n] # 执行RPS选股 top_rps_stocks rps_screening(top_n20) print(RPS选股结果:) for i, (name, data) in enumerate(top_rps_stocks, 1): print(f{i:2d}. {name}({data[代码]}): RPS{data[RPS]:.1f}, 20日{data[20日涨幅]:.1f}%)多因子综合选股策略def multi_factor_screening(market沪深A, min_pe0, max_pe50, min_roe10, min_growth15): 多因子综合选股 qualified_stocks [] # 获取市场股票 market_stocks qs.get_code(market)[:200] # 限制数量 for stock in tqdm(market_stocks, desc多因子筛选): try: # 获取基本面数据 basics qs.stock_basics(stock) if basics.empty: continue # 获取实时数据 realtime qs.realtime_data(codestock) if realtime.empty: continue # 因子条件判断 pe basics.iloc[0][市盈率] if 市盈率 in basics.columns else float(inf) roe basics.iloc[0][ROE] if ROE in basics.columns else 0 pb basics.iloc[0][市净率] if 市净率 in basics.columns else float(inf) # 技术面因子 hist_data qs.get_data(stock, start60d) if hist_data.empty or len(hist_data) 20: continue ma20 hist_data[close].rolling(20).mean().iloc[-1] current_price realtime[最新价].iloc[0] price_vs_ma (current_price - ma20) / ma20 * 100 # 筛选条件 if (min_pe pe max_pe and roe min_roe and pb 5 and # 市净率小于5 price_vs_ma -10): # 价格在20日均线附近 stock_info { 名称: realtime[名称].iloc[0], 代码: stock, 市盈率: pe, ROE: roe, 市净率: pb, 价格相对MA20: f{price_vs_ma:.1f}%, 当前价格: current_price, 综合得分: (20/pe if pe 0 else 0) roe/10 (100 - pb*10) } qualified_stocks.append(stock_info) except Exception as e: continue # 按综合得分排序 qualified_stocks.sort(keylambda x: x[综合得分], reverseTrue) return qualified_stocks[:50] # 执行多因子选股 selected_stocks multi_factor_screening() print(f筛选出{len(selected_stocks)}只符合条件的股票:) for i, stock in enumerate(selected_stocks[:10], 1): print(f{i:2d}. {stock[名称]}({stock[代码]}): fPE{stock[市盈率]:.1f}, ROE{stock[ROE]:.1f}%, fPB{stock[市净率]:.1f}, 得分{stock[综合得分]:.1f})回测框架应用策略验证与优化qstock的回测模块提供了向量化和事件驱动两种回测框架帮助用户验证交易策略的有效性。基于海龟交易法则的回测from qstock.backtest import turtle def backtest_turtle_strategy(stock_code, start_date2023-01-01, end_date2023-12-31, initial_capital100000, position_ratio0.1): 海龟交易策略回测 # 获取历史数据 df qs.get_data(stock_code, startstart_date, endend_date) if df.empty: print(获取数据失败) return None # 准备数据格式 data df[[open, high, low, close, volume]].copy() data.columns [open, high, low, close, volume] # 创建海龟交易策略实例 strategy turtle.TurtleStrategy( datadata, initial_capitalinitial_capital, position_ratioposition_ratio, atr_period20, # ATR周期 entry_period20, # 入场信号周期 exit_period10 # 出场信号周期 ) # 运行回测 results strategy.run_backtest() # 分析回测结果 performance { 最终净值: results[total_value].iloc[-1], 总收益率: (results[total_value].iloc[-1] / initial_capital - 1) * 100, 年化收益率: strategy.annual_return * 100, 最大回撤: strategy.max_drawdown * 100, 夏普比率: strategy.sharpe_ratio, 胜率: strategy.win_rate * 100, 交易次数: len(results[results[position] ! 0]), 平均持仓周期: results[hold_days].mean() if hold_days in results.columns else 0 } return results, performance # 执行回测 results, performance backtest_turtle_strategy(000001) if results is not None: print(海龟策略回测结果:) for key, value in performance.items(): print(f{key}: {value:.2f} if isinstance(value, float) else f{key}: {value}) # 可视化回测结果 import matplotlib.pyplot as plt fig, axes plt.subplots(2, 1, figsize(12, 10)) # 价格和交易信号 axes[0].plot(results.index, results[close], label收盘价, linewidth1) buy_signals results[results[signal] 1] sell_signals results[results[signal] -1] axes[0].scatter(buy_signals.index, buy_signals[close], colorred, marker^, s100, label买入信号) axes[0].scatter(sell_signals.index, sell_signals[close], colorgreen, markerv, s100, label卖出信号) axes[0].set_title(价格与交易信号) axes[0].legend() axes[0].grid(True) # 净值曲线 axes[1].plot(results.index, results[total_value], label投资组合净值, linewidth2) axes[1].axhline(y100000, colorgray, linestyle--, label初始资金) axes[1].set_title(净值曲线) axes[1].legend() axes[1].grid(True) plt.tight_layout() plt.savefig(turtle_strategy_backtest.png, dpi300, bbox_inchestight) plt.show()性能优化与最佳实践1. 数据缓存策略频繁调用API会降低程序性能合理的缓存策略可以显著提升效率import hashlib import pickle import os from datetime import datetime, timedelta class QStockCache: qstock数据缓存管理器 def __init__(self, cache_dir./qstock_cache, ttl_minutes30): self.cache_dir cache_dir self.ttl timedelta(minutesttl_minutes) os.makedirs(cache_dir, exist_okTrue) def _get_cache_key(self, func_name, *args, **kwargs): 生成缓存键 key_str f{func_name}_{args}_{kwargs} return hashlib.md5(key_str.encode()).hexdigest() def _get_cache_path(self, key): 获取缓存文件路径 return os.path.join(self.cache_dir, f{key}.pkl) def get_cached_data(self, func_name, *args, **kwargs): 获取缓存数据 cache_key self._get_cache_key(func_name, *args, **kwargs) cache_path self._get_cache_path(cache_key) if os.path.exists(cache_path): # 检查缓存是否过期 mtime datetime.fromtimestamp(os.path.getmtime(cache_path)) if datetime.now() - mtime self.ttl: with open(cache_path, rb) as f: return pickle.load(f) return None def set_cache_data(self, data, func_name, *args, **kwargs): 设置缓存数据 cache_key self._get_cache_key(func_name, *args, **kwargs) cache_path self._get_cache_path(cache_key) with open(cache_path, wb) as f: pickle.dump(data, f) def cached_call(self, func, *args, **kwargs): 带缓存的函数调用 func_name func.__name__ cached_data self.get_cached_data(func_name, *args, **kwargs) if cached_data is not None: return cached_data # 调用原函数 result func(*args, **kwargs) # 缓存结果 self.set_cache_data(result, func_name, *args, **kwargs) return result # 使用示例 cache_manager QStockCache() # 带缓存的函数调用 def get_cached_realtime_data(market沪深A, codeNone): return cache_manager.cached_call(qs.realtime_data, marketmarket, codecode) # 第一次调用会从API获取并缓存 data1 get_cached_realtime_data(market沪深A) print(f第一次获取数据数据量: {len(data1)}) # 第二次调用会使用缓存30分钟内 data2 get_cached_realtime_data(market沪深A) print(f第二次获取数据使用缓存数据量: {len(data2)})2. 批量数据处理优化import concurrent.futures from functools import partial def batch_get_stock_data(stock_list, func, max_workers10): 批量获取股票数据多线程优化 results {} # 创建线程池 with concurrent.futures.ThreadPoolExecutor(max_workersmax_workers) as executor: # 提交任务 future_to_stock { executor.submit(func, stock): stock for stock in stock_list } # 收集结果 for future in concurrent.futures.as_completed(future_to_stock): stock future_to_stock[future] try: result future.result(timeout10) results[stock] result except Exception as e: print(f获取{stock}数据失败: {e}) results[stock] None return results # 使用示例 stock_list [000001, 000002, 000858, 600519, 300750] # 批量获取实时数据 realtime_results batch_get_stock_data( stock_list, partial(qs.realtime_data, codeNone), # 注意这里需要调整参数 max_workers5 ) # 批量获取基本面数据 basic_results batch_get_stock_data( stock_list, qs.stock_basics, max_workers5 ) print(f成功获取{len([v for v in realtime_results.values() if v is not None])}只股票的实时数据) print(f成功获取{len([v for v in basic_results.values() if v is not None])}只股票的基本面数据)3. 错误处理与重试机制from retry import retry import time import logging # 配置日志 logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) class RobustQStock: 增强版的qstock客户端包含错误处理和重试机制 def __init__(self, max_retries3, retry_delay1): self.max_retries max_retries self.retry_delay retry_delay retry(tries3, delay1, backoff2) def safe_realtime_data(self, market沪深A, codeNone): 安全的实时数据获取 try: return qs.realtime_data(marketmarket, codecode) except Exception as e: logger.error(f获取实时数据失败: {e}) if code: logger.error(f股票代码: {code}) raise def get_data_with_fallback(self, stock_code, start_date, end_dateNone): 带降级策略的数据获取 strategies [ self._try_get_data_direct, # 直接获取 self._try_get_data_chunk, # 分块获取 self._try_get_data_alternative # 替代方案 ] for i, strategy in enumerate(strategies): try: logger.info(f尝试策略 {i1}: {strategy.__name__}) result strategy(stock_code, start_date, end_date) if result is not None and not result.empty: logger.info(f策略 {i1} 成功) return result except Exception as e: logger.warning(f策略 {i1} 失败: {e}) time.sleep(self.retry_delay) logger.error(所有策略都失败了) return None def _try_get_data_direct(self, stock_code, start_date, end_date): 直接获取数据 return qs.get_data(stock_code, startstart_date, endend_date) def _try_get_data_chunk(self, stock_code, start_date, end_date): 分块获取数据应对大数据量 # 如果数据量太大可以分时间段获取 if end_date: # 计算时间差 from datetime import datetime start_dt datetime.strptime(start_date, %Y%m%d) end_dt datetime.strptime(end_date, %Y%m%d) days_diff (end_dt - start_dt).days if days_diff 365: # 如果超过1年分块获取 chunks [] current_start start_dt while current_start end_dt: current_end min( current_start timedelta(days180), end_dt ) chunk qs.get_data( stock_code, startcurrent_start.strftime(%Y%m%d), endcurrent_end.strftime(%Y%m%d) ) chunks.append(chunk) current_start current_end timedelta(days1) if chunks: return pd.concat(chunks) return None def _try_get_data_alternative(self, stock_code, start_date, end_date): 替代方案使用其他数据源或简化版本 # 这里可以实现降级逻辑比如使用缓存数据或简化查询 logger.warning(f使用降级方案获取 {stock_code} 数据) # 返回简化版本或None return None # 使用示例 robust_client RobustQStock() try: # 安全获取数据 data robust_client.safe_realtime_data(market沪深A) print(f成功获取{len(data)}条数据) # 带降级策略的数据获取 hist_data robust_client.get_data_with_fallback(000001, 20230101, 20231231) if hist_data is not None: print(f获取到{len(hist_data)}条历史数据) except Exception as e: logger.error(f最终失败: {e})实战案例构建智能监控系统结合qstock的各项功能我们可以构建一个完整的智能监控系统import schedule import time from datetime import datetime import pandas as pd class IntelligentMonitor: 智能监控系统 def __init__(self, config): self.config config self.alert_history [] self.qs_cache QStockCache() self.robust_client RobustQStock() def monitor_market_overview(self): 监控市场概况 try: # 获取主要市场数据 markets [沪深A, 创业板, 科创板, 北A] overview {} for market in markets: data self.robust_client.safe_realtime_data(marketmarket) if not data.empty: overview[market] { total: len(data), up: (data[涨幅] 0).sum(), down: (data[涨幅] 0).sum(), avg_change: data[涨幅].mean(), turnover: data[换手率].mean() } # 分析市场情绪 total_up sum([v[up] for v in overview.values()]) total_down sum([v[down] for v in overview.values()]) market_sentiment 乐观 if total_up total_down * 1.5 else 谨慎 if total_up total_down else 悲观 return { timestamp: datetime.now(), overview: overview, sentiment: market_sentiment, total_up: total_up, total_down: total_down } except Exception as e: logger.error(f监控市场概况失败: {e}) return None def monitor_watchlist(self, watchlist): 监控自选股 alerts [] for stock in watchlist: try: # 获取实时数据 realtime self.robust_client.safe_realtime_data(codestock) if realtime.empty: continue current_price realtime[最新价].iloc[0] change_pct realtime[涨幅].iloc[0] turnover realtime[换手率].iloc[0] # 检查预警条件 if abs(change_pct) self.config[price_alert_threshold]: alerts.append({ type: 价格预警, stock: stock, price: current_price, change: change_pct, time: datetime.now() }) if turnover self.config[turnover_alert_threshold]: alerts.append({ type: 换手率预警, stock: stock, turnover: turnover, time: datetime.now() }) except Exception as e: logger.error(f监控股票{stock}失败: {e}) return alerts def analyze_sector_rotation(self): 分析板块轮动 try: # 获取行业板块数据 industry_data self.robust_client.safe_realtime_data(market行业板块) concept_data self.robust_client.safe_realtime_data(market概念板块) if industry_data.empty or concept_data.empty: return None # 找出强势板块 strong_industries industry_data.nlargest(5, 涨幅) strong_concepts concept_data.nlargest(5, 涨幅) # 分析资金流向 industry_money qs.ths_money(行业, n5) concept_money qs.ths_money(概念, n5) return { strong_industries: strong_industries[[名称, 涨幅]].to_dict(records), strong_concepts: strong_concepts[[名称, 涨幅]].to_dict(records), industry_money_flow: industry_money.head(10).to_dict(records) if not industry_money.empty else [], concept_money_flow: concept_money.head(10).to_dict(records) if not concept_money.empty else [] } except Exception as e: logger.error(f分析板块轮动失败: {e}) return None def generate_daily_report(self): 生成每日报告 report { date: datetime.now().strftime(%Y-%m-%d), market_overview: self.monitor_market_overview(), sector_analysis: self.analyze_sector_rotation(), alerts: [] } # 保存报告 report_file fdaily_report_{datetime.now().strftime(%Y%m%d)}.json with open(report_file, w, encodingutf-8) as f: import json json.dump(report, f, ensure_asciiFalse, indent2, defaultstr) return report_file # 配置监控系统 config { price_alert_threshold: 5, # 价格波动超过5%预警 turnover_alert_threshold: 10, # 换手率超过10%预警 watchlist: [中国平安, 贵州茅台, 宁德时代, 招商银行, 东方财富], monitor_interval_minutes: 5 } # 创建监控实例 monitor IntelligentMonitor(config) # 定时任务 def monitoring_job(): print(f\n[{datetime.now().strftime(%H:%M:%S)}] 执行监控任务) # 监控市场概况 market_status monitor.monitor_market_overview() if market_status: print(f市场情绪: {market_status[sentiment]}) print(f上涨股票: {market_status[total_up]}, 下跌股票: {market_status[total_down]}) # 监控自选股 alerts monitor.monitor_watchlist(config[watchlist]) if alerts: print(f发现{alerts}个预警) for alert in alerts: print(f预警: {alert[type]} - {alert[stock]}) # 分析板块轮动 sector_info monitor.analyze_sector_rotation() if sector_info: print(强势行业:, [ind[名称] for ind in sector_info[strong_industries]]) print(强势概念:, [concept[名称] for concept in sector_info[strong_concepts]]) # 设置定时任务示例 schedule.every(5).minutes.do(monitoring_job) print(智能监控系统已启动按CtrlC停止) try: while True: schedule.run_pending() time.sleep(1) except KeyboardInterrupt: print(\n监控系统已停止)总结与展望qstock作为一个开源的量化投研工具包其价值不仅在于提供了丰富的金融数据接口更在于它构建了一个完整的量化分析工作流。从数据获取、处理分析到可视化展示和策略回测qstock为个人开发者和量化研究员提供了一个高效、易用的解决方案。qstock的核心优势数据全面性覆盖A股、港股、美股、期货、债券、基金等多个市场提供行情、基本面、资金流、财务数据等多维度信息。接口简洁性统一的API设计几行代码即可完成复杂的数据获取任务大幅降低开发门槛。生态完整性集成了数据获取、可视化、选股、回测等多个模块形成完整的量化分析闭环。开源可扩展基于Python生态可以轻松与其他数据分析库如pandas、numpy、scikit-learn等集成构建更复杂的分析模型。最佳实践建议合理使用缓存对于实时性要求不高的数据使用缓存机制减少API调用频率。错误处理机制金融数据获取具有不稳定性完善的错误处理和重试机制是生产环境必备。数据质量验证对获取的数据进行基本的质量检查如空值处理、异常值检测等。性能优化对于批量数据处理使用多线程或异步IO提升效率。合规性考虑注意数据使用的合规性特别是商业用途时需遵守相关数据使用协议。随着金融科技的发展个人量化投研的需求日益增长。qstock这样的开源工具降低了量化分析的门槛让更多开发者能够参与到金融数据分析中来。无论是学术研究、投资决策还是策略开发qstock都提供了一个强大而灵活的基础平台。通过本文介绍的实战应用我们可以看到qstock不仅是一个数据获取工具更是一个完整的量化分析框架。合理利用其提供的各种功能结合自己的投资理念和策略逻辑可以构建出符合个人需求的智能投研系统。【免费下载链接】qstockqstock由“Python金融量化”公众号开发试图打造成个人量化投研分析包目前包括数据获取data、可视化(plot)、选股(stock)和量化回测策略backtest模块。 qstock将为用户提供简洁的数据接口和规整化后的金融市场数据。可视化模块为用户提供基于web的交互图形的简单接口 选股模块提供了同花顺的选股数据和自定义选股包括RPS、MM趋势、财务指标、资金流模型等 回测模块为大家提供向量化基于pandas和基于事件驱动的基本框架和模型。 关注“Python金融量化“微信公众号获取更多应用信息。项目地址: https://gitcode.com/gh_mirrors/qs/qstock创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考