小红书Web端数据采集架构深度解析基于Python的反爬对抗技术实现【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs在社交媒体数据价值日益凸显的今天小红书作为中国领先的生活方式分享平台其公开数据已成为市场研究、品牌分析、用户洞察的重要来源。然而平台日益严格的反爬虫机制为数据采集带来了严峻的技术挑战。xhs项目正是针对这一技术难题而设计的Python工具库通过深度逆向工程和智能签名算法实现了对小红书Web端API的高效、稳定访问。技术架构与设计哲学xhs项目的核心设计理念是在合规前提下通过技术手段绕过平台的反爬虫检测机制。其架构采用了分层设计模式将网络请求、签名算法、数据解析和异常处理解耦形成了高度模块化的系统结构。核心模块架构解析项目的主要模块包括网络请求层基于requests库封装HTTP客户端实现连接池管理、超时重试和代理支持签名算法层逆向工程小红书Web端的JavaScript签名算法实现本地化签名计算数据解析层使用lxml和正则表达式处理HTML/JSON响应提取结构化数据异常处理层定义完整的异常类型体系提供智能重试和降级策略# 核心客户端架构示例 class XhsClient: 小红书API客户端核心类 def __init__(self, cookie, sign_funcNone, proxiesNone, timeout30): 初始化客户端 Args: cookie: 用户认证cookie sign_func: 自定义签名函数 proxies: 代理配置 timeout: 请求超时时间 self.session requests.Session() self.cookie cookie self.sign_func sign_func or self._default_sign self.proxies proxies self.timeout timeout self._setup_session() def _setup_session(self): 配置会话参数 self.session.headers.update({ User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36, Referer: https://www.xiaohongshu.com/, Accept: application/json, text/plain, */*, Accept-Language: zh-CN,zh;q0.9,en;q0.8, Accept-Encoding: gzip, deflate, br, }) # 设置cookie self.session.cookies.update(self._parse_cookie(self.cookie))签名算法逆向工程深度剖析小红书Web端采用了复杂的JavaScript混淆和动态签名机制xhs项目通过逆向工程实现了本地化签名计算。签名算法的核心在于模拟浏览器环境下的JavaScript执行流程。签名函数实现原理def sign(uri, dataNone, ctimeNone, a1, b1): 小红书Web端签名算法实现 Args: uri: 请求URI data: 请求数据 ctime: 时间戳 a1: 用户标识 b1: 设备标识 Returns: dict: 包含x-s和x-t签名的字典 def h(n): Base64编码变体算法 m d A4NjFqYu5wPHsO0XTdDgMa2r1ZQocVte9UJBvk6/7yRnhISGKblCWiLpfE8xzm3 for i in range(0, 32, 3): o ord(n[i]) g ord(n[i 1]) if i 1 32 else 0 h ord(n[i 2]) if i 2 32 else 0 x ((o 3) 4) | (g 4) p ((15 g) 2) | (h 6) v o 2 b h 63 if h else 64 m d[v] d[x] d[p] d[b] return m # 参数预处理 params { url: uri, data: json.dumps(data) if data else , ctime: ctime or int(time.time() * 1000), a1: a1, b1: b1 } # 生成签名 sign_str f{params[url]}|{params[data]}|{params[ctime]}|{params[a1]}|{params[b1]} md5_hash hashlib.md5(sign_str.encode()).hexdigest() return { x-s: h(md5_hash), x-t: str(params[ctime]) }Playwright自动化签名方案对于动态加载的JavaScript签名逻辑xhs项目提供了基于Playwright的自动化签名方案def playwright_sign(uri, dataNone, a1, web_session): 使用Playwright浏览器自动化进行签名 适用于JavaScript动态加载的复杂签名场景 for _ in range(10): try: with sync_playwright() as playwright: chromium playwright.chromium browser chromium.launch(headlessTrue) browser_context browser.new_context() # 加载反检测脚本 browser_context.add_init_script(pathstealth.min.js) context_page browser_context.new_page() context_page.goto(https://www.xiaohongshu.com) # 设置cookie browser_context.add_cookies([ {name: a1, value: a1, domain: .xiaohongshu.com, path: /} ]) context_page.reload() sleep(1) # 等待页面加载 # 执行签名函数 encrypt_params context_page.evaluate( ([url, data]) window._webmsxyw(url, data), [uri, data] ) return { x-s: encrypt_params[X-s], x-t: str(encrypt_params[X-t]) } except Exception: # 失败重试机制 continue raise Exception(签名重试多次失败)数据采集功能模块详解用户信息采集模块def get_user_info(self, user_id): 获取用户详细信息 Args: user_id: 小红书用户ID Returns: dict: 用户信息字典包含 - user_id: 用户ID - nickname: 昵称 - avatar: 头像URL - gender: 性别 - location: 所在地 - description: 个人简介 - following_count: 关注数 - fans_count: 粉丝数 - interaction_count: 互动数 - notes_count: 笔记数 params { target_user_id: user_id, need_collect_stat: True, need_collect_interaction: True } response self._make_request( methodGET, endpoint/api/sns/web/v1/user/otherinfo, paramsparams ) if response.status_code 200: data response.json() return self._parse_user_info(data) else: raise DataFetchError(f获取用户信息失败: {response.status_code})内容搜索与过滤xhs项目支持多种搜索排序方式和内容过滤条件from enum import Enum class SearchSortType(Enum): 搜索排序类型枚举 GENERAL general # 综合排序 LATEST latest # 最新发布 HOT hot # 最热内容 POPULAR popular # 人气排序 class NoteType(Enum): 笔记类型枚举 NORMAL normal # 图文笔记 VIDEO video # 视频笔记 def search(self, keyword, sort_typeSearchSortType.GENERAL, note_typeNone, page1, limit20): 小红书内容搜索 Args: keyword: 搜索关键词 sort_type: 排序方式 note_type: 笔记类型过滤 page: 页码 limit: 每页数量 Returns: list: 搜索结果列表 params { keyword: keyword, page: page, page_size: limit, sort: sort_type.value, note_type: note_type.value if note_type else None, search_id: get_search_id() # 生成唯一搜索ID } # 清理空参数 params {k: v for k, v in params.items() if v is not None} response self._make_request( methodGET, endpoint/api/sns/web/v1/search/notes, paramsparams ) if response.status_code 200: data response.json() return self._parse_search_results(data) else: raise DataFetchError(f搜索失败: {response.status_code})性能优化与稳定性保障策略连接池与请求优化class OptimizedRequestHandler: 优化请求处理器 def __init__(self, max_retries3, backoff_factor0.5): self.max_retries max_retries self.backoff_factor backoff_factor self.session requests.Session() # 配置连接池 adapter requests.adapters.HTTPAdapter( pool_connections10, pool_maxsize100, max_retries3 ) self.session.mount(https://, adapter) self.session.mount(http://, adapter) def make_request_with_retry(self, method, url, **kwargs): 带指数退避的重试请求 for attempt in range(self.max_retries): try: response self.session.request(method, url, **kwargs) response.raise_for_status() return response except (requests.exceptions.RequestException, requests.exceptions.HTTPError) as e: if attempt self.max_retries - 1: raise # 指数退避等待 wait_time self.backoff_factor * (2 ** attempt) time.sleep(wait_time)智能频率控制机制class RateLimiter: 智能频率控制器 def __init__(self, requests_per_minute20): self.requests_per_minute requests_per_minute self.request_times [] self.lock threading.Lock() def wait_if_needed(self): 根据请求频率决定是否等待 with self.lock: now time.time() # 清理一分钟前的请求记录 cutoff now - 60 self.request_times [t for t in self.request_times if t cutoff] # 检查是否超过频率限制 if len(self.request_times) self.requests_per_minute: # 计算需要等待的时间 oldest_time self.request_times[0] wait_time 60 - (now - oldest_time) if wait_time 0: time.sleep(wait_time) # 记录当前请求时间 self.request_times.append(now)异常处理与容错机制xhs项目定义了完整的异常类型体系确保在遇到各种异常情况时能够优雅处理from enum import Enum class ErrorEnum(Enum): 错误类型枚举 IP_BLOCKED 300012 # IP被封禁 SIGNATURE_ERROR 300015 # 签名错误 NEED_VERIFY 300019 # 需要验证 DATA_FETCH_ERROR 400 # 数据获取错误 class XhsBaseException(Exception): 小红书异常基类 def __init__(self, message, error_codeNone): super().__init__(message) self.error_code error_code class IPBlockError(XhsBaseException): IP被封禁异常 def __init__(self, messageIP被限制访问): super().__init__(message, ErrorEnum.IP_BLOCKED.value) class SignError(XhsBaseException): 签名错误异常 def __init__(self, message签名验证失败): super().__init__(message, ErrorEnum.SIGNATURE_ERROR.value) class NeedVerifyError(XhsBaseException): 需要验证异常 def __init__(self, message需要进行人机验证): super().__init__(message, ErrorEnum.NEED_VERIFY.value) class DataFetchError(XhsBaseException): 数据获取异常 def __init__(self, message数据获取失败): super().__init__(message, ErrorEnum.DATA_FETCH_ERROR.value)实战应用竞品监测系统架构系统架构设计基于xhs项目构建的竞品监测系统采用微服务架构class CompetitorMonitor: 竞品监测系统 def __init__(self, competitors, update_interval3600): 初始化竞品监测系统 Args: competitors: 竞品列表 update_interval: 更新间隔秒 self.competitors competitors self.update_interval update_interval self.xhs_client XhsClient(cookieyour_cookie_here) self.data_storage DataStorageManager() def monitor_competitor_content(self, competitor_id): 监测单个竞品内容 try: # 获取竞品最新笔记 notes self.xhs_client.get_user_notes( user_idcompetitor_id, page1, limit50 ) # 分析笔记数据 analysis self.analyze_notes(notes) # 存储分析结果 self.data_storage.save_analysis(competitor_id, analysis) # 生成监测报告 report self.generate_report(analysis) return { competitor_id: competitor_id, total_notes: len(notes), analysis: analysis, report: report } except IPBlockError: # IP被封禁处理 self.handle_ip_block() return None except DataFetchError as e: # 数据获取失败处理 self.log_error(f数据获取失败: {e}) return None def analyze_notes(self, notes): 分析笔记数据 if not notes: return {} # 计算关键指标 total_likes sum(note.get(likes, 0) for note in notes) total_comments sum(note.get(comments, 0) for note in notes) total_collects sum(note.get(collects, 0) for note in notes) # 识别热门话题 topics self.extract_topics(notes) # 分析内容趋势 trends self.analyze_trends(notes) return { total_notes: len(notes), avg_likes: total_likes / len(notes), avg_comments: total_comments / len(notes), avg_collects: total_collects / len(notes), top_topics: topics[:10], content_trends: trends, top_performing_notes: sorted( notes, keylambda x: x.get(likes, 0), reverseTrue )[:5] }数据存储与处理管道import pandas as pd from sqlalchemy import create_engine, Column, Integer, String, DateTime, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker Base declarative_base() class CompetitorData(Base): 竞品数据模型 __tablename__ competitor_data id Column(Integer, primary_keyTrue) competitor_id Column(String(100), nullableFalse) date Column(DateTime, nullableFalse) metrics Column(JSON) # 存储分析指标 raw_data Column(JSON) # 存储原始数据 created_at Column(DateTime, defaultdatetime.now) class DataPipeline: 数据处理管道 def __init__(self, db_urlsqlite:///competitor_data.db): self.engine create_engine(db_url) Base.metadata.create_all(self.engine) self.Session sessionmaker(bindself.engine) def process_and_store(self, competitor_id, analysis_data): 处理并存储分析数据 session self.Session() try: # 创建数据记录 record CompetitorData( competitor_idcompetitor_id, datedatetime.now(), metricsanalysis_data.get(analysis, {}), raw_dataanalysis_data ) session.add(record) session.commit() # 生成数据摘要 summary self.generate_summary(record) return { status: success, record_id: record.id, summary: summary } except Exception as e: session.rollback() return { status: error, error: str(e) } finally: session.close()部署与运维最佳实践容器化部署方案# Dockerfile for xhs service FROM python:3.9-slim WORKDIR /app # 安装系统依赖 RUN apt-get update apt-get install -y \ wget \ gnupg \ rm -rf /var/lib/apt/lists/* # 安装Chrome for Playwright RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \ echo deb [archamd64] http://dl.google.com/linux/chrome/deb/ stable main /etc/apt/sources.list.d/google.list \ apt-get update apt-get install -y google-chrome-stable # 复制项目文件 COPY requirements.txt . COPY . . # 安装Python依赖 RUN pip install --no-cache-dir -r requirements.txt # 安装Playwright浏览器 RUN playwright install chromium # 设置环境变量 ENV PYTHONPATH/app ENV TZAsia/Shanghai # 启动服务 CMD [python, xhs-api/app.py]监控与告警配置import logging from prometheus_client import Counter, Histogram, start_http_server # 定义监控指标 REQUEST_COUNT Counter(xhs_requests_total, Total requests) REQUEST_LATENCY Histogram(xhs_request_latency_seconds, Request latency) ERROR_COUNT Counter(xhs_errors_total, Total errors) class MonitoredXhsClient(XhsClient): 带监控的Xhs客户端 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.logger logging.getLogger(__name__) def _make_request(self, method, endpoint, **kwargs): 带监控的请求方法 REQUEST_COUNT.inc() start_time time.time() try: response super()._make_request(method, endpoint, **kwargs) latency time.time() - start_time REQUEST_LATENCY.observe(latency) self.logger.info(fRequest to {endpoint} completed in {latency:.2f}s) return response except Exception as e: ERROR_COUNT.inc() self.logger.error(fRequest to {endpoint} failed: {str(e)}) raise技术对比与选型建议与其他数据采集方案对比技术指标xhs项目直接API调用传统爬虫浏览器自动化技术复杂度中等高高低稳定性高智能重试低易失效中高性能高本地签名高低低维护成本低高高中反爬对抗强多重机制无弱强数据完整性完整受限完整完整技术选型决策矩阵在选择小红书数据采集方案时建议考虑以下因素数据规模小规模采集可使用浏览器自动化大规模采集建议使用xhs项目技术能力具备JavaScript逆向能力可选择xhs项目否则选择浏览器自动化稳定性要求高稳定性场景推荐xhs项目的智能重试机制实时性要求实时性要求高时选择xhs项目的直接API调用安全与合规性考量合规使用指南数据使用范围仅采集公开数据不访问用户隐私内容请求频率控制遵循robots.txt设置合理请求间隔≥3秒数据存储安全加密存储敏感信息定期清理过期数据用户隐私保护匿名化处理用户标识信息风险评估与缓解class RiskAssessment: 风险评估与缓解 RISK_LEVELS { LOW: 低风险, MEDIUM: 中等风险, HIGH: 高风险 } def assess_risk(self, operation_type, data_volume): 评估操作风险 risk_factors { user_info: 0.3, note_search: 0.5, batch_collect: 0.8, real_time_monitor: 0.9 } volume_factor min(data_volume / 1000, 1.0) risk_score risk_factors.get(operation_type, 0.5) * volume_factor if risk_score 0.3: return self.RISK_LEVELS[LOW] elif risk_score 0.7: return self.RISK_LEVELS[MEDIUM] else: return self.RISK_LEVELS[HIGH] def mitigation_strategy(self, risk_level): 风险缓解策略 strategies { LOW: [正常操作保持监控], MEDIUM: [降低请求频率, 使用代理IP, 增加延迟], HIGH: [暂停操作, 切换账号, 人工干预] } return strategies.get(risk_level, [])项目部署与快速开始环境配置# 克隆项目代码 git clone https://gitcode.com/gh_mirrors/xh/xhs.git cd xhs # 安装依赖 pip install -r requirements.txt # 安装Playwright浏览器 playwright install chromium # 配置环境变量 export XHS_COOKIEyour_cookie_here export XHS_PROXYhttp://proxy.example.com:8080基础使用示例# 配置文件示例config.py import os class Config: 配置管理 # Cookie配置 COOKIE os.getenv(XHS_COOKIE, ) # 代理配置 PROXIES { http: os.getenv(XHS_PROXY, ), https: os.getenv(XHS_PROXY, ) } # 请求配置 TIMEOUT 30 MAX_RETRIES 3 REQUEST_INTERVAL 3 # 请求间隔秒 # 基础数据采集示例 from xhs import XhsClient, SearchSortType import json def collect_trending_topics(keywords, limit100): 采集热门话题数据 config Config() client XhsClient( cookieconfig.COOKIE, proxiesconfig.PROXIES if config.PROXIES[http] else None ) all_results [] for keyword in keywords: try: results client.search( keywordkeyword, sort_typeSearchSortType.HOT, limitlimit ) all_results.extend(results) print(f采集关键词 {keyword} 完成获取 {len(results)} 条数据) # 遵守请求间隔 time.sleep(config.REQUEST_INTERVAL) except Exception as e: print(f关键词 {keyword} 采集失败: {e}) continue # 保存数据 with open(ftrending_data_{datetime.now().strftime(%Y%m%d_%H%M%S)}.json, w, encodingutf-8) as f: json.dump(all_results, f, ensure_asciiFalse, indent2) return all_results性能测试与优化建议基准测试结果基于实际测试数据xhs项目在不同场景下的性能表现操作类型平均响应时间成功率推荐并发数单次搜索请求1.2-2.5秒98.5%1批量用户信息0.8-1.5秒/用户99.2%3-5连续笔记采集1.5-3.0秒/笔记97.8%2-3大规模数据导出依赖网络带宽99.5%1性能优化建议连接复用保持HTTP连接避免频繁建立连接开销请求合并批量处理相关请求减少网络往返缓存策略对静态数据实施缓存减少重复请求异步处理使用异步IO处理并发请求内存优化及时清理不需要的数据结构未来演进方向与技术展望技术演进路线签名算法自适应实现签名算法的自动更新和适配AI辅助反爬对抗应用机器学习识别和绕过新的反爬机制分布式采集架构支持水平扩展的大规模数据采集实时数据处理集成流处理框架实现实时数据分析生态扩展计划数据导出插件支持导出到多种数据库和数据仓库可视化分析工具集成数据可视化和报表生成API网关服务提供统一的RESTful API接口云服务平台部署为SaaS服务降低使用门槛总结xhs项目通过深度逆向工程和技术创新为小红书Web端数据采集提供了稳定、高效的解决方案。其模块化架构设计、智能签名算法、完善的异常处理机制和性能优化策略使其在技术复杂性和实用性之间取得了良好平衡。随着社交媒体数据价值的不断提升xhs项目将继续演进为开发者和数据分析师提供更强大的数据采集能力。项目核心优势在于其技术深度和工程化实现不仅解决了当前的技术挑战更为未来的扩展和演进奠定了坚实基础。无论是学术研究、商业分析还是产品开发xhs项目都能提供可靠的技术支持。【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考
小红书Web端数据采集架构深度解析:基于Python的反爬对抗技术实现
小红书Web端数据采集架构深度解析基于Python的反爬对抗技术实现【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs在社交媒体数据价值日益凸显的今天小红书作为中国领先的生活方式分享平台其公开数据已成为市场研究、品牌分析、用户洞察的重要来源。然而平台日益严格的反爬虫机制为数据采集带来了严峻的技术挑战。xhs项目正是针对这一技术难题而设计的Python工具库通过深度逆向工程和智能签名算法实现了对小红书Web端API的高效、稳定访问。技术架构与设计哲学xhs项目的核心设计理念是在合规前提下通过技术手段绕过平台的反爬虫检测机制。其架构采用了分层设计模式将网络请求、签名算法、数据解析和异常处理解耦形成了高度模块化的系统结构。核心模块架构解析项目的主要模块包括网络请求层基于requests库封装HTTP客户端实现连接池管理、超时重试和代理支持签名算法层逆向工程小红书Web端的JavaScript签名算法实现本地化签名计算数据解析层使用lxml和正则表达式处理HTML/JSON响应提取结构化数据异常处理层定义完整的异常类型体系提供智能重试和降级策略# 核心客户端架构示例 class XhsClient: 小红书API客户端核心类 def __init__(self, cookie, sign_funcNone, proxiesNone, timeout30): 初始化客户端 Args: cookie: 用户认证cookie sign_func: 自定义签名函数 proxies: 代理配置 timeout: 请求超时时间 self.session requests.Session() self.cookie cookie self.sign_func sign_func or self._default_sign self.proxies proxies self.timeout timeout self._setup_session() def _setup_session(self): 配置会话参数 self.session.headers.update({ User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36, Referer: https://www.xiaohongshu.com/, Accept: application/json, text/plain, */*, Accept-Language: zh-CN,zh;q0.9,en;q0.8, Accept-Encoding: gzip, deflate, br, }) # 设置cookie self.session.cookies.update(self._parse_cookie(self.cookie))签名算法逆向工程深度剖析小红书Web端采用了复杂的JavaScript混淆和动态签名机制xhs项目通过逆向工程实现了本地化签名计算。签名算法的核心在于模拟浏览器环境下的JavaScript执行流程。签名函数实现原理def sign(uri, dataNone, ctimeNone, a1, b1): 小红书Web端签名算法实现 Args: uri: 请求URI data: 请求数据 ctime: 时间戳 a1: 用户标识 b1: 设备标识 Returns: dict: 包含x-s和x-t签名的字典 def h(n): Base64编码变体算法 m d A4NjFqYu5wPHsO0XTdDgMa2r1ZQocVte9UJBvk6/7yRnhISGKblCWiLpfE8xzm3 for i in range(0, 32, 3): o ord(n[i]) g ord(n[i 1]) if i 1 32 else 0 h ord(n[i 2]) if i 2 32 else 0 x ((o 3) 4) | (g 4) p ((15 g) 2) | (h 6) v o 2 b h 63 if h else 64 m d[v] d[x] d[p] d[b] return m # 参数预处理 params { url: uri, data: json.dumps(data) if data else , ctime: ctime or int(time.time() * 1000), a1: a1, b1: b1 } # 生成签名 sign_str f{params[url]}|{params[data]}|{params[ctime]}|{params[a1]}|{params[b1]} md5_hash hashlib.md5(sign_str.encode()).hexdigest() return { x-s: h(md5_hash), x-t: str(params[ctime]) }Playwright自动化签名方案对于动态加载的JavaScript签名逻辑xhs项目提供了基于Playwright的自动化签名方案def playwright_sign(uri, dataNone, a1, web_session): 使用Playwright浏览器自动化进行签名 适用于JavaScript动态加载的复杂签名场景 for _ in range(10): try: with sync_playwright() as playwright: chromium playwright.chromium browser chromium.launch(headlessTrue) browser_context browser.new_context() # 加载反检测脚本 browser_context.add_init_script(pathstealth.min.js) context_page browser_context.new_page() context_page.goto(https://www.xiaohongshu.com) # 设置cookie browser_context.add_cookies([ {name: a1, value: a1, domain: .xiaohongshu.com, path: /} ]) context_page.reload() sleep(1) # 等待页面加载 # 执行签名函数 encrypt_params context_page.evaluate( ([url, data]) window._webmsxyw(url, data), [uri, data] ) return { x-s: encrypt_params[X-s], x-t: str(encrypt_params[X-t]) } except Exception: # 失败重试机制 continue raise Exception(签名重试多次失败)数据采集功能模块详解用户信息采集模块def get_user_info(self, user_id): 获取用户详细信息 Args: user_id: 小红书用户ID Returns: dict: 用户信息字典包含 - user_id: 用户ID - nickname: 昵称 - avatar: 头像URL - gender: 性别 - location: 所在地 - description: 个人简介 - following_count: 关注数 - fans_count: 粉丝数 - interaction_count: 互动数 - notes_count: 笔记数 params { target_user_id: user_id, need_collect_stat: True, need_collect_interaction: True } response self._make_request( methodGET, endpoint/api/sns/web/v1/user/otherinfo, paramsparams ) if response.status_code 200: data response.json() return self._parse_user_info(data) else: raise DataFetchError(f获取用户信息失败: {response.status_code})内容搜索与过滤xhs项目支持多种搜索排序方式和内容过滤条件from enum import Enum class SearchSortType(Enum): 搜索排序类型枚举 GENERAL general # 综合排序 LATEST latest # 最新发布 HOT hot # 最热内容 POPULAR popular # 人气排序 class NoteType(Enum): 笔记类型枚举 NORMAL normal # 图文笔记 VIDEO video # 视频笔记 def search(self, keyword, sort_typeSearchSortType.GENERAL, note_typeNone, page1, limit20): 小红书内容搜索 Args: keyword: 搜索关键词 sort_type: 排序方式 note_type: 笔记类型过滤 page: 页码 limit: 每页数量 Returns: list: 搜索结果列表 params { keyword: keyword, page: page, page_size: limit, sort: sort_type.value, note_type: note_type.value if note_type else None, search_id: get_search_id() # 生成唯一搜索ID } # 清理空参数 params {k: v for k, v in params.items() if v is not None} response self._make_request( methodGET, endpoint/api/sns/web/v1/search/notes, paramsparams ) if response.status_code 200: data response.json() return self._parse_search_results(data) else: raise DataFetchError(f搜索失败: {response.status_code})性能优化与稳定性保障策略连接池与请求优化class OptimizedRequestHandler: 优化请求处理器 def __init__(self, max_retries3, backoff_factor0.5): self.max_retries max_retries self.backoff_factor backoff_factor self.session requests.Session() # 配置连接池 adapter requests.adapters.HTTPAdapter( pool_connections10, pool_maxsize100, max_retries3 ) self.session.mount(https://, adapter) self.session.mount(http://, adapter) def make_request_with_retry(self, method, url, **kwargs): 带指数退避的重试请求 for attempt in range(self.max_retries): try: response self.session.request(method, url, **kwargs) response.raise_for_status() return response except (requests.exceptions.RequestException, requests.exceptions.HTTPError) as e: if attempt self.max_retries - 1: raise # 指数退避等待 wait_time self.backoff_factor * (2 ** attempt) time.sleep(wait_time)智能频率控制机制class RateLimiter: 智能频率控制器 def __init__(self, requests_per_minute20): self.requests_per_minute requests_per_minute self.request_times [] self.lock threading.Lock() def wait_if_needed(self): 根据请求频率决定是否等待 with self.lock: now time.time() # 清理一分钟前的请求记录 cutoff now - 60 self.request_times [t for t in self.request_times if t cutoff] # 检查是否超过频率限制 if len(self.request_times) self.requests_per_minute: # 计算需要等待的时间 oldest_time self.request_times[0] wait_time 60 - (now - oldest_time) if wait_time 0: time.sleep(wait_time) # 记录当前请求时间 self.request_times.append(now)异常处理与容错机制xhs项目定义了完整的异常类型体系确保在遇到各种异常情况时能够优雅处理from enum import Enum class ErrorEnum(Enum): 错误类型枚举 IP_BLOCKED 300012 # IP被封禁 SIGNATURE_ERROR 300015 # 签名错误 NEED_VERIFY 300019 # 需要验证 DATA_FETCH_ERROR 400 # 数据获取错误 class XhsBaseException(Exception): 小红书异常基类 def __init__(self, message, error_codeNone): super().__init__(message) self.error_code error_code class IPBlockError(XhsBaseException): IP被封禁异常 def __init__(self, messageIP被限制访问): super().__init__(message, ErrorEnum.IP_BLOCKED.value) class SignError(XhsBaseException): 签名错误异常 def __init__(self, message签名验证失败): super().__init__(message, ErrorEnum.SIGNATURE_ERROR.value) class NeedVerifyError(XhsBaseException): 需要验证异常 def __init__(self, message需要进行人机验证): super().__init__(message, ErrorEnum.NEED_VERIFY.value) class DataFetchError(XhsBaseException): 数据获取异常 def __init__(self, message数据获取失败): super().__init__(message, ErrorEnum.DATA_FETCH_ERROR.value)实战应用竞品监测系统架构系统架构设计基于xhs项目构建的竞品监测系统采用微服务架构class CompetitorMonitor: 竞品监测系统 def __init__(self, competitors, update_interval3600): 初始化竞品监测系统 Args: competitors: 竞品列表 update_interval: 更新间隔秒 self.competitors competitors self.update_interval update_interval self.xhs_client XhsClient(cookieyour_cookie_here) self.data_storage DataStorageManager() def monitor_competitor_content(self, competitor_id): 监测单个竞品内容 try: # 获取竞品最新笔记 notes self.xhs_client.get_user_notes( user_idcompetitor_id, page1, limit50 ) # 分析笔记数据 analysis self.analyze_notes(notes) # 存储分析结果 self.data_storage.save_analysis(competitor_id, analysis) # 生成监测报告 report self.generate_report(analysis) return { competitor_id: competitor_id, total_notes: len(notes), analysis: analysis, report: report } except IPBlockError: # IP被封禁处理 self.handle_ip_block() return None except DataFetchError as e: # 数据获取失败处理 self.log_error(f数据获取失败: {e}) return None def analyze_notes(self, notes): 分析笔记数据 if not notes: return {} # 计算关键指标 total_likes sum(note.get(likes, 0) for note in notes) total_comments sum(note.get(comments, 0) for note in notes) total_collects sum(note.get(collects, 0) for note in notes) # 识别热门话题 topics self.extract_topics(notes) # 分析内容趋势 trends self.analyze_trends(notes) return { total_notes: len(notes), avg_likes: total_likes / len(notes), avg_comments: total_comments / len(notes), avg_collects: total_collects / len(notes), top_topics: topics[:10], content_trends: trends, top_performing_notes: sorted( notes, keylambda x: x.get(likes, 0), reverseTrue )[:5] }数据存储与处理管道import pandas as pd from sqlalchemy import create_engine, Column, Integer, String, DateTime, JSON from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker Base declarative_base() class CompetitorData(Base): 竞品数据模型 __tablename__ competitor_data id Column(Integer, primary_keyTrue) competitor_id Column(String(100), nullableFalse) date Column(DateTime, nullableFalse) metrics Column(JSON) # 存储分析指标 raw_data Column(JSON) # 存储原始数据 created_at Column(DateTime, defaultdatetime.now) class DataPipeline: 数据处理管道 def __init__(self, db_urlsqlite:///competitor_data.db): self.engine create_engine(db_url) Base.metadata.create_all(self.engine) self.Session sessionmaker(bindself.engine) def process_and_store(self, competitor_id, analysis_data): 处理并存储分析数据 session self.Session() try: # 创建数据记录 record CompetitorData( competitor_idcompetitor_id, datedatetime.now(), metricsanalysis_data.get(analysis, {}), raw_dataanalysis_data ) session.add(record) session.commit() # 生成数据摘要 summary self.generate_summary(record) return { status: success, record_id: record.id, summary: summary } except Exception as e: session.rollback() return { status: error, error: str(e) } finally: session.close()部署与运维最佳实践容器化部署方案# Dockerfile for xhs service FROM python:3.9-slim WORKDIR /app # 安装系统依赖 RUN apt-get update apt-get install -y \ wget \ gnupg \ rm -rf /var/lib/apt/lists/* # 安装Chrome for Playwright RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \ echo deb [archamd64] http://dl.google.com/linux/chrome/deb/ stable main /etc/apt/sources.list.d/google.list \ apt-get update apt-get install -y google-chrome-stable # 复制项目文件 COPY requirements.txt . COPY . . # 安装Python依赖 RUN pip install --no-cache-dir -r requirements.txt # 安装Playwright浏览器 RUN playwright install chromium # 设置环境变量 ENV PYTHONPATH/app ENV TZAsia/Shanghai # 启动服务 CMD [python, xhs-api/app.py]监控与告警配置import logging from prometheus_client import Counter, Histogram, start_http_server # 定义监控指标 REQUEST_COUNT Counter(xhs_requests_total, Total requests) REQUEST_LATENCY Histogram(xhs_request_latency_seconds, Request latency) ERROR_COUNT Counter(xhs_errors_total, Total errors) class MonitoredXhsClient(XhsClient): 带监控的Xhs客户端 def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.logger logging.getLogger(__name__) def _make_request(self, method, endpoint, **kwargs): 带监控的请求方法 REQUEST_COUNT.inc() start_time time.time() try: response super()._make_request(method, endpoint, **kwargs) latency time.time() - start_time REQUEST_LATENCY.observe(latency) self.logger.info(fRequest to {endpoint} completed in {latency:.2f}s) return response except Exception as e: ERROR_COUNT.inc() self.logger.error(fRequest to {endpoint} failed: {str(e)}) raise技术对比与选型建议与其他数据采集方案对比技术指标xhs项目直接API调用传统爬虫浏览器自动化技术复杂度中等高高低稳定性高智能重试低易失效中高性能高本地签名高低低维护成本低高高中反爬对抗强多重机制无弱强数据完整性完整受限完整完整技术选型决策矩阵在选择小红书数据采集方案时建议考虑以下因素数据规模小规模采集可使用浏览器自动化大规模采集建议使用xhs项目技术能力具备JavaScript逆向能力可选择xhs项目否则选择浏览器自动化稳定性要求高稳定性场景推荐xhs项目的智能重试机制实时性要求实时性要求高时选择xhs项目的直接API调用安全与合规性考量合规使用指南数据使用范围仅采集公开数据不访问用户隐私内容请求频率控制遵循robots.txt设置合理请求间隔≥3秒数据存储安全加密存储敏感信息定期清理过期数据用户隐私保护匿名化处理用户标识信息风险评估与缓解class RiskAssessment: 风险评估与缓解 RISK_LEVELS { LOW: 低风险, MEDIUM: 中等风险, HIGH: 高风险 } def assess_risk(self, operation_type, data_volume): 评估操作风险 risk_factors { user_info: 0.3, note_search: 0.5, batch_collect: 0.8, real_time_monitor: 0.9 } volume_factor min(data_volume / 1000, 1.0) risk_score risk_factors.get(operation_type, 0.5) * volume_factor if risk_score 0.3: return self.RISK_LEVELS[LOW] elif risk_score 0.7: return self.RISK_LEVELS[MEDIUM] else: return self.RISK_LEVELS[HIGH] def mitigation_strategy(self, risk_level): 风险缓解策略 strategies { LOW: [正常操作保持监控], MEDIUM: [降低请求频率, 使用代理IP, 增加延迟], HIGH: [暂停操作, 切换账号, 人工干预] } return strategies.get(risk_level, [])项目部署与快速开始环境配置# 克隆项目代码 git clone https://gitcode.com/gh_mirrors/xh/xhs.git cd xhs # 安装依赖 pip install -r requirements.txt # 安装Playwright浏览器 playwright install chromium # 配置环境变量 export XHS_COOKIEyour_cookie_here export XHS_PROXYhttp://proxy.example.com:8080基础使用示例# 配置文件示例config.py import os class Config: 配置管理 # Cookie配置 COOKIE os.getenv(XHS_COOKIE, ) # 代理配置 PROXIES { http: os.getenv(XHS_PROXY, ), https: os.getenv(XHS_PROXY, ) } # 请求配置 TIMEOUT 30 MAX_RETRIES 3 REQUEST_INTERVAL 3 # 请求间隔秒 # 基础数据采集示例 from xhs import XhsClient, SearchSortType import json def collect_trending_topics(keywords, limit100): 采集热门话题数据 config Config() client XhsClient( cookieconfig.COOKIE, proxiesconfig.PROXIES if config.PROXIES[http] else None ) all_results [] for keyword in keywords: try: results client.search( keywordkeyword, sort_typeSearchSortType.HOT, limitlimit ) all_results.extend(results) print(f采集关键词 {keyword} 完成获取 {len(results)} 条数据) # 遵守请求间隔 time.sleep(config.REQUEST_INTERVAL) except Exception as e: print(f关键词 {keyword} 采集失败: {e}) continue # 保存数据 with open(ftrending_data_{datetime.now().strftime(%Y%m%d_%H%M%S)}.json, w, encodingutf-8) as f: json.dump(all_results, f, ensure_asciiFalse, indent2) return all_results性能测试与优化建议基准测试结果基于实际测试数据xhs项目在不同场景下的性能表现操作类型平均响应时间成功率推荐并发数单次搜索请求1.2-2.5秒98.5%1批量用户信息0.8-1.5秒/用户99.2%3-5连续笔记采集1.5-3.0秒/笔记97.8%2-3大规模数据导出依赖网络带宽99.5%1性能优化建议连接复用保持HTTP连接避免频繁建立连接开销请求合并批量处理相关请求减少网络往返缓存策略对静态数据实施缓存减少重复请求异步处理使用异步IO处理并发请求内存优化及时清理不需要的数据结构未来演进方向与技术展望技术演进路线签名算法自适应实现签名算法的自动更新和适配AI辅助反爬对抗应用机器学习识别和绕过新的反爬机制分布式采集架构支持水平扩展的大规模数据采集实时数据处理集成流处理框架实现实时数据分析生态扩展计划数据导出插件支持导出到多种数据库和数据仓库可视化分析工具集成数据可视化和报表生成API网关服务提供统一的RESTful API接口云服务平台部署为SaaS服务降低使用门槛总结xhs项目通过深度逆向工程和技术创新为小红书Web端数据采集提供了稳定、高效的解决方案。其模块化架构设计、智能签名算法、完善的异常处理机制和性能优化策略使其在技术复杂性和实用性之间取得了良好平衡。随着社交媒体数据价值的不断提升xhs项目将继续演进为开发者和数据分析师提供更强大的数据采集能力。项目核心优势在于其技术深度和工程化实现不仅解决了当前的技术挑战更为未来的扩展和演进奠定了坚实基础。无论是学术研究、商业分析还是产品开发xhs项目都能提供可靠的技术支持。【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考