前言单机同步爬虫受限于串行请求逻辑面对海量目标 URL 采集场景时请求排队阻塞、CPU 与网络资源利用率低下单进程单日爬取量级难以突破万级多进程改造又存在任务重复分配、任务状态无记录、节点宕机丢失未完成任务等缺陷。依托 aiohttp 协程高并发特性搭配 Redis 消息队列构建分布式爬虫调度架构能够实现任务统一集中下发、多爬虫节点并行消费、失败任务回滚重入队列从架构层面拆分调度中心与采集工作节点突破单机性能上限。本文围绕 Redis 任务队列数据结构选型、生产者任务入库逻辑、aiohttp 异步消费者协程池管控、异常任务回写重试、多节点分布式防重复抓取等核心模块展开落地区分普通待爬队列、失败死信队列、去重缓存集合三类 Redis 存储单元附带完整可运行工程代码、架构参数选型对照表完成从任务生产、分发、消费、异常回收全链路工程化落地。本文所需依赖官方文档链接aiohttp异步 HTTP 请求核心库实现高并发页面采集redis-pyRedis 客户端实现任务队列读写与去重管控asyncioPython 内置协程调度管理异步任务池ujson高性能序列化库任务信息快速序列化存储一、分布式异步爬虫架构分层与存储设计1.1 三层架构分工明细整套架构划分为任务生产者层、Redis 中间存储层、异步消费者层三层解耦可独立部署扩容表格架构分层核心职责部署特性生产者层批量生成待爬 URL、组装任务附加参数、任务入队、初始化去重集合独立部署调度服务可定时从文件 / 数据库批量生成任务Redis 中间层存储待爬任务队列、失败死信任务、已爬取 URL 去重集合、爬虫运行配置参数独立 Redis 服务支持多节点爬虫共享数据源消费者层aiohttp 协程批量拉取任务、页面数据采集、数据入库、异常任务回写死信队列多台服务器横向新增节点即可提升整体爬虫吞吐1.2 Redis 多数据结构任务存储规划为实现任务生命周期全管控拆分三类存储容器规避任务重复入队、重复爬取List 结构spider:task:wait待爬任务主队列左侧入队、右侧阻塞弹出天然 FIFO 先进先出List 结构spider:task:dead死信失败队列超过重试阈值的异常任务存入等待人工二次下发Set 结构spider:url:done已完成 URL 去重集合存入采集成功的 URL入队前先校验集合杜绝重复任务生成。1.3 核心配置常量预定义python运行# Redis连接配置 REDIS_HOST 127.0.0.1 REDIS_PORT 6379 REDIS_DB 0 # 队列键名 QUEUE_WAIT spider:task:wait QUEUE_DEAD spider:task:dead SET_DONE spider:url:done # 爬虫配置 MAX_CONCURRENT 20 # 单实例最大协程并发数 MAX_RETRY 3 # 单任务最大重试次数二、环境依赖安装与基础客户端初始化bash运行pip install aiohttp redis ujson2.1 同步 Redis 生产者客户端、异步 Redis 消费者客户端区分生产者批量写入任务使用同步 redis 客户端消费者异步环境采用 aioredis 实现非阻塞队列读取避免同步 Redis 阻塞协程运行python运行import redis import ujson # 同步生产者Redis sync_redis redis.Redis(hostREDIS_HOST,portREDIS_PORT,dbREDIS_DB,decode_responsesTrue)三、任务生产者模块批量生成任务与防重复入队实现3.1 生产者底层原理生产者读取目标 URL 清单每条任务封装字典结构url、retry_count、extra_params入队前校验 URL 是否存在于已爬集合不存在则序列化后 LPUSH 推入待爬队列实现源头去重避免无效任务占用爬虫资源。3.2 生产者完整代码实现python运行class TaskProducer: staticmethod def generate_task(url_list:list,extra:dictNone): 批量生成任务并入队 extra_info extra if extra else {} insert_count 0 for url in url_list: # 去重校验已爬URL直接跳过 if sync_redis.sismember(SET_DONE,url): continue # 组装任务体初始化重试次数0 task_body { url:url, retry_count:0, extra:extra_info } # ujson序列化 task_str ujson.dumps(task_body) sync_redis.rpush(QUEUE_WAIT,task_str) insert_count 1 print(f成功入库任务数{insert_count}) # 生产者测试调用 if __name__ __main__: test_urls [ https://httpbin.org/get?page1, https://httpbin.org/get?page2, https://httpbin.org/get?page3 ] TaskProducer.generate_task(test_urls,extra{source:test_task,cate:demo})3.3 生产者优化细节选用 ujson 替换原生 json 序列化大批量任务生成时序列化速度提升 40% 以上使用 Set 天然去重特性相比数据库查询去重耗时大幅缩减。四、aiohttp 异步消费者协程池实现4.1 消费者核心运行逻辑单个协程从 Redis 阻塞式 RPOP 拉取任务无任务时阻塞等待减少无效轮询aiohttp 发起异步请求页面解析提取目标数据请求成功URL 写入已完成 Set 集合数据落地存储请求失败重试计数 1未达上限重新推入待爬队列超出上限存入死信队列。4.2 异步爬虫消费者完整代码python运行import asyncio import aiohttp import ujson import aioredis class AsyncTaskConsumer: def __init__(self): self.session None self.redis_conn None self.headers { User-Agent:Mozilla/5.0 Windows Chrome/120.0.0.0 Safari/537.36 } async def init_resource(self): 初始化异步会话与异步Redis连接 self.session aiohttp.ClientSession(connectoraiohttp.TCPConnector(limitMAX_CONCURRENT)) self.redis_conn await aioredis.from_url(fredis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB},decode_responsesTrue) async def close_resource(self): 关闭连接释放资源 await self.session.close() await self.redis_conn.close() async def crawl_single_task(self,task_info:dict): 单任务采集逻辑 url task_info[url] try: async with self.session.get(url,headersself.headers,timeoutaiohttp.ClientTimeout(total8)) as resp: content await resp.text() # 模拟数据入库生产环境替换mysql/mongo写入逻辑 print(f采集成功:{url},页面长度:{len(content)}) # 成功标记URL为已完成 await self.redis_conn.sadd(SET_DONE,url) return True except Exception as e: print(f采集失败{url},异常:{str(e)}) task_info[retry_count] 1 # 未超限重入待爬队列否则死信 if task_info[retry_count] MAX_RETRY: await self.redis_conn.rpush(QUEUE_WAIT,ujson.dumps(task_info)) else: await self.redis_conn.rpush(QUEUE_DEAD,ujson.dumps(task_info)) return False async def consumer_worker(self): 单个消费协程循环拉取任务 while True: # blpop阻塞弹出等待超时3秒 task_raw await self.redis_conn.blpop(QUEUE_WAIT,timeout3) if not task_raw: continue task_str task_raw[1] task_dict ujson.loads(task_str) await self.crawl_single_task(task_dict) async def run_consumer_pool(self,worker_numMAX_CONCURRENT): 启动协程消费池 await self.init_resource() task_list [asyncio.create_task(self.consumer_worker()) for _ in range(worker_num)] await asyncio.gather(*task_list) # 消费者启动入口 async def main(): consumer AsyncTaskConsumer() await consumer.run_consumer_pool() if __name__ __main__: asyncio.run(main())4.3 协程消费关键原理拆解blpop阻塞队列队列无任务时协程休眠不循环空跑占用 CPU是消息队列经典消费方案TCPConnector (limit) 管控并发限制单实例最大同时 TCP 连接避免瞬间海量请求触发目标站点风控失败分级处理动态变更任务重试字段实现阶梯式重试杜绝无效死循环重试。五、多节点分布式拓展与防重复消费方案5.1 多节点横向扩容规则新服务器仅需部署同一份消费者代码配置指向同一个远程 Redis 地址即可接入集群生产者与所有消费者共用一套任务队列新增节点自动分摊剩余任务无需修改生产者代码。5.2 分布式重复消费规避方案表格方案实现逻辑适用场景URL 前置去重生产者入队前 Set 校验 URL常规资讯、商品列表爬虫任务消费临时锁Redis Set 标记正在消费任务完成后删除风控严苛、耗时较长的详情页爬虫临时锁补充代码片段python运行# 消费开始上锁 lock_key flock:{task_dict[url]} if await self.redis_conn.sismember(spider:lock:doing,lock_key): continue await self.redis_conn.sadd(spider:lock:doing,lock_key) # 采集完成解锁 await self.redis_conn.srem(spider:lock:doing,lock_key)六、死信队列运维与任务二次补发工具6.1 死信任务补发函数定期人工核查死信队列异常 URL修复反爬、接口故障问题后批量重新下发至待爬队列python运行def resend_dead_task(): while True: task sync_redis.lpop(QUEUE_DEAD) if not task: break sync_redis.rpush(QUEUE_WAIT,task) print(死信任务重新下发完成)6.2 死信分类统计通过 redis 批量取出死信任务解析异常来源针对性优化 UA 池、代理配置从源头减少失败任务生成。七、架构性能调优参数对照表表格调优项参数配置建议优化效果单实例协程数15~30依据目标站点反爬强度调整反爬宽松站点上调风控站点下调并发请求超时时间5~10 秒网络差的代理爬虫放宽至 12 秒Redis 持久化RDBAOF 双持久开启服务器宕机任务不丢失任务批量生成单次入队 500~2000 条减少 Redis 频繁 IO 损耗八、常见架构故障与解决方案表格异常现象故障诱因优化方案多节点同一 URL 重复爬取无消费临时锁任务弹出后节点宕机任务丢失新增消费锁定机制任务完成再移除锁定Redis 队列数据暴涨消费者消费速度低于生产者生产速度新增多台消费节点、提升单实例并发数大量任务进入死信队列IP 封禁、UA 失效接入前文异地多区代理池动态切换代理发起请求九、总结aiohttpRedis 队列分布式异步爬虫架构依托生产消费分离设计从根源解决单机爬虫并发瓶颈依靠 Redis 实现任务统一调度、去重管控、失败任务回收三大核心能力。生产者专注任务生成、源头去重消费者依托协程实现高并发采集多服务器横向部署即可线性提升采集吞吐量。在实际项目落地中可搭配代理池、异常重试自愈机制、结构化数据入库模块形成一套可 7×24 小时稳定运行的工业化分布式爬虫体系适配海量站点全量数据批量采集场景。
Python 爬虫异步架构实战:基于 aiohttp+Redis 队列实现分布式任务分发与消费
前言单机同步爬虫受限于串行请求逻辑面对海量目标 URL 采集场景时请求排队阻塞、CPU 与网络资源利用率低下单进程单日爬取量级难以突破万级多进程改造又存在任务重复分配、任务状态无记录、节点宕机丢失未完成任务等缺陷。依托 aiohttp 协程高并发特性搭配 Redis 消息队列构建分布式爬虫调度架构能够实现任务统一集中下发、多爬虫节点并行消费、失败任务回滚重入队列从架构层面拆分调度中心与采集工作节点突破单机性能上限。本文围绕 Redis 任务队列数据结构选型、生产者任务入库逻辑、aiohttp 异步消费者协程池管控、异常任务回写重试、多节点分布式防重复抓取等核心模块展开落地区分普通待爬队列、失败死信队列、去重缓存集合三类 Redis 存储单元附带完整可运行工程代码、架构参数选型对照表完成从任务生产、分发、消费、异常回收全链路工程化落地。本文所需依赖官方文档链接aiohttp异步 HTTP 请求核心库实现高并发页面采集redis-pyRedis 客户端实现任务队列读写与去重管控asyncioPython 内置协程调度管理异步任务池ujson高性能序列化库任务信息快速序列化存储一、分布式异步爬虫架构分层与存储设计1.1 三层架构分工明细整套架构划分为任务生产者层、Redis 中间存储层、异步消费者层三层解耦可独立部署扩容表格架构分层核心职责部署特性生产者层批量生成待爬 URL、组装任务附加参数、任务入队、初始化去重集合独立部署调度服务可定时从文件 / 数据库批量生成任务Redis 中间层存储待爬任务队列、失败死信任务、已爬取 URL 去重集合、爬虫运行配置参数独立 Redis 服务支持多节点爬虫共享数据源消费者层aiohttp 协程批量拉取任务、页面数据采集、数据入库、异常任务回写死信队列多台服务器横向新增节点即可提升整体爬虫吞吐1.2 Redis 多数据结构任务存储规划为实现任务生命周期全管控拆分三类存储容器规避任务重复入队、重复爬取List 结构spider:task:wait待爬任务主队列左侧入队、右侧阻塞弹出天然 FIFO 先进先出List 结构spider:task:dead死信失败队列超过重试阈值的异常任务存入等待人工二次下发Set 结构spider:url:done已完成 URL 去重集合存入采集成功的 URL入队前先校验集合杜绝重复任务生成。1.3 核心配置常量预定义python运行# Redis连接配置 REDIS_HOST 127.0.0.1 REDIS_PORT 6379 REDIS_DB 0 # 队列键名 QUEUE_WAIT spider:task:wait QUEUE_DEAD spider:task:dead SET_DONE spider:url:done # 爬虫配置 MAX_CONCURRENT 20 # 单实例最大协程并发数 MAX_RETRY 3 # 单任务最大重试次数二、环境依赖安装与基础客户端初始化bash运行pip install aiohttp redis ujson2.1 同步 Redis 生产者客户端、异步 Redis 消费者客户端区分生产者批量写入任务使用同步 redis 客户端消费者异步环境采用 aioredis 实现非阻塞队列读取避免同步 Redis 阻塞协程运行python运行import redis import ujson # 同步生产者Redis sync_redis redis.Redis(hostREDIS_HOST,portREDIS_PORT,dbREDIS_DB,decode_responsesTrue)三、任务生产者模块批量生成任务与防重复入队实现3.1 生产者底层原理生产者读取目标 URL 清单每条任务封装字典结构url、retry_count、extra_params入队前校验 URL 是否存在于已爬集合不存在则序列化后 LPUSH 推入待爬队列实现源头去重避免无效任务占用爬虫资源。3.2 生产者完整代码实现python运行class TaskProducer: staticmethod def generate_task(url_list:list,extra:dictNone): 批量生成任务并入队 extra_info extra if extra else {} insert_count 0 for url in url_list: # 去重校验已爬URL直接跳过 if sync_redis.sismember(SET_DONE,url): continue # 组装任务体初始化重试次数0 task_body { url:url, retry_count:0, extra:extra_info } # ujson序列化 task_str ujson.dumps(task_body) sync_redis.rpush(QUEUE_WAIT,task_str) insert_count 1 print(f成功入库任务数{insert_count}) # 生产者测试调用 if __name__ __main__: test_urls [ https://httpbin.org/get?page1, https://httpbin.org/get?page2, https://httpbin.org/get?page3 ] TaskProducer.generate_task(test_urls,extra{source:test_task,cate:demo})3.3 生产者优化细节选用 ujson 替换原生 json 序列化大批量任务生成时序列化速度提升 40% 以上使用 Set 天然去重特性相比数据库查询去重耗时大幅缩减。四、aiohttp 异步消费者协程池实现4.1 消费者核心运行逻辑单个协程从 Redis 阻塞式 RPOP 拉取任务无任务时阻塞等待减少无效轮询aiohttp 发起异步请求页面解析提取目标数据请求成功URL 写入已完成 Set 集合数据落地存储请求失败重试计数 1未达上限重新推入待爬队列超出上限存入死信队列。4.2 异步爬虫消费者完整代码python运行import asyncio import aiohttp import ujson import aioredis class AsyncTaskConsumer: def __init__(self): self.session None self.redis_conn None self.headers { User-Agent:Mozilla/5.0 Windows Chrome/120.0.0.0 Safari/537.36 } async def init_resource(self): 初始化异步会话与异步Redis连接 self.session aiohttp.ClientSession(connectoraiohttp.TCPConnector(limitMAX_CONCURRENT)) self.redis_conn await aioredis.from_url(fredis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB},decode_responsesTrue) async def close_resource(self): 关闭连接释放资源 await self.session.close() await self.redis_conn.close() async def crawl_single_task(self,task_info:dict): 单任务采集逻辑 url task_info[url] try: async with self.session.get(url,headersself.headers,timeoutaiohttp.ClientTimeout(total8)) as resp: content await resp.text() # 模拟数据入库生产环境替换mysql/mongo写入逻辑 print(f采集成功:{url},页面长度:{len(content)}) # 成功标记URL为已完成 await self.redis_conn.sadd(SET_DONE,url) return True except Exception as e: print(f采集失败{url},异常:{str(e)}) task_info[retry_count] 1 # 未超限重入待爬队列否则死信 if task_info[retry_count] MAX_RETRY: await self.redis_conn.rpush(QUEUE_WAIT,ujson.dumps(task_info)) else: await self.redis_conn.rpush(QUEUE_DEAD,ujson.dumps(task_info)) return False async def consumer_worker(self): 单个消费协程循环拉取任务 while True: # blpop阻塞弹出等待超时3秒 task_raw await self.redis_conn.blpop(QUEUE_WAIT,timeout3) if not task_raw: continue task_str task_raw[1] task_dict ujson.loads(task_str) await self.crawl_single_task(task_dict) async def run_consumer_pool(self,worker_numMAX_CONCURRENT): 启动协程消费池 await self.init_resource() task_list [asyncio.create_task(self.consumer_worker()) for _ in range(worker_num)] await asyncio.gather(*task_list) # 消费者启动入口 async def main(): consumer AsyncTaskConsumer() await consumer.run_consumer_pool() if __name__ __main__: asyncio.run(main())4.3 协程消费关键原理拆解blpop阻塞队列队列无任务时协程休眠不循环空跑占用 CPU是消息队列经典消费方案TCPConnector (limit) 管控并发限制单实例最大同时 TCP 连接避免瞬间海量请求触发目标站点风控失败分级处理动态变更任务重试字段实现阶梯式重试杜绝无效死循环重试。五、多节点分布式拓展与防重复消费方案5.1 多节点横向扩容规则新服务器仅需部署同一份消费者代码配置指向同一个远程 Redis 地址即可接入集群生产者与所有消费者共用一套任务队列新增节点自动分摊剩余任务无需修改生产者代码。5.2 分布式重复消费规避方案表格方案实现逻辑适用场景URL 前置去重生产者入队前 Set 校验 URL常规资讯、商品列表爬虫任务消费临时锁Redis Set 标记正在消费任务完成后删除风控严苛、耗时较长的详情页爬虫临时锁补充代码片段python运行# 消费开始上锁 lock_key flock:{task_dict[url]} if await self.redis_conn.sismember(spider:lock:doing,lock_key): continue await self.redis_conn.sadd(spider:lock:doing,lock_key) # 采集完成解锁 await self.redis_conn.srem(spider:lock:doing,lock_key)六、死信队列运维与任务二次补发工具6.1 死信任务补发函数定期人工核查死信队列异常 URL修复反爬、接口故障问题后批量重新下发至待爬队列python运行def resend_dead_task(): while True: task sync_redis.lpop(QUEUE_DEAD) if not task: break sync_redis.rpush(QUEUE_WAIT,task) print(死信任务重新下发完成)6.2 死信分类统计通过 redis 批量取出死信任务解析异常来源针对性优化 UA 池、代理配置从源头减少失败任务生成。七、架构性能调优参数对照表表格调优项参数配置建议优化效果单实例协程数15~30依据目标站点反爬强度调整反爬宽松站点上调风控站点下调并发请求超时时间5~10 秒网络差的代理爬虫放宽至 12 秒Redis 持久化RDBAOF 双持久开启服务器宕机任务不丢失任务批量生成单次入队 500~2000 条减少 Redis 频繁 IO 损耗八、常见架构故障与解决方案表格异常现象故障诱因优化方案多节点同一 URL 重复爬取无消费临时锁任务弹出后节点宕机任务丢失新增消费锁定机制任务完成再移除锁定Redis 队列数据暴涨消费者消费速度低于生产者生产速度新增多台消费节点、提升单实例并发数大量任务进入死信队列IP 封禁、UA 失效接入前文异地多区代理池动态切换代理发起请求九、总结aiohttpRedis 队列分布式异步爬虫架构依托生产消费分离设计从根源解决单机爬虫并发瓶颈依靠 Redis 实现任务统一调度、去重管控、失败任务回收三大核心能力。生产者专注任务生成、源头去重消费者依托协程实现高并发采集多服务器横向部署即可线性提升采集吞吐量。在实际项目落地中可搭配代理池、异常重试自愈机制、结构化数据入库模块形成一套可 7×24 小时稳定运行的工业化分布式爬虫体系适配海量站点全量数据批量采集场景。