Pythonasync迭代器与生成器

Pythonasync迭代器与生成器 Python 异步迭代器与生成器异步数据流处理异步迭代器和异步生成器是 asyncio 生态中的重要组成部分。它们允许在数据生产或消费过程中进行异步等待非常适合处理流式数据、网络请求等场景。一、异步迭代器__aiter__ 和 __anext__import asyncioimport randomfrom typing import AsyncIteratorclass AsyncCounter:异步计数器迭代器。实现了 __aiter__ 和 __anext__ 协议。def __init__(self, start: int, end: int, delay: float 0.5):self.current startself.end endself.delay delaydef __aiter__(self):返回异步迭代器对象本身print(异步迭代器已创建)return selfasync def __anext__(self):返回下一个值。当没有更多值时抛出 StopAsyncIteration 异常。if self.current self.end:print(迭代结束)raise StopAsyncIteration# 模拟异步操作如网络请求、数据库查询await asyncio.sleep(self.delay)value self.currentself.current 1return valueasync def async_iterator_demo():使用异步迭代器的完整示例print( 异步迭代器演示 )async for number in AsyncCounter(1, 5, delay0.3):print(f当前数字: {number})# async for 等价于# 1. 调用 __aiter__() 获取迭代器# 2. 循环调用 await __anext__() 获取值# 3. 捕获 StopAsyncIteration 时退出二、异步生成器async def yield异步生成器使用 async def 定义内部使用 yield。它在每次 yield 之间可以执行 await 操作。async def async_range(start: int, end: int, delay: float 0.5):异步版本的 range() 生成器。每次迭代之间有延迟模拟异步数据生产。print(f异步生成器开始: [{start}, {end}))for i in range(start, end):# 在 yield 之前可以执行异步操作await asyncio.sleep(delay)print(f生成: {i})yield i # 产出值并暂停执行print(异步生成器结束)async def async_generator_demo():使用异步生成器的示例print( 异步生成器演示 )async for value in async_range(1, 4, delay0.2):print(f消费: {value})# 异步生成器底层原理# 1. 调用 async_range() 返回一个异步生成器对象# 2. async for 每次迭代调用 __anext__()# 3. 生成器执行到 yield 时暂停并返回值# 4. 下次迭代从 yield 处继续执行三、asynccontextmanager异步上下文管理器from contextlib import asynccontextmanagerasynccontextmanagerasync def async_file_simulation(filename: str):模拟异步文件操作的上下文管理器。asynccontextmanager 可以将异步生成器转换为异步上下文管理器支持 async with。print(f打开文件: {filename})# 模拟异步打开操作await asyncio.sleep(0.1)try:# yield 之前是 __aenter__ 部分yield {name: filename, content: 模拟文件内容}# yield 之后是 __aexit__ 部分正常退出时执行print(f正常关闭文件: {filename})except Exception as e:# 异常时的清理逻辑print(f异常关闭文件: {filename}, 错误: {e})raisefinally:# 无论如何都会执行的清理print(f最终清理: {filename})async def async_context_manager_demo():使用异步上下文管理器print( 异步上下文管理器演示 )async with async_file_simulation(data.txt) as f:print(f读取内容: {f[content]})# 离开 async with 块时自动执行清理# 也支持异常处理try:async with async_file_simulation(error.txt) as f:raise ValueError(模拟错误)except ValueError:print(异常已被捕获)四、异步迭代器 vs 异步生成器 异步迭代器 - 需要手动实现 __aiter__ 和 __anext__- 可以维护复杂的状态机- 适合需要自定义迭代逻辑的场景- 可以实现可重用的迭代器类 异步生成器 - 使用 async def yield 自动实现- 代码更简洁- 适合简单的数据流转换- 每次迭代只能使用一次不能重置 选择指南 - 简单的数据流变换: 异步生成器- 复杂的迭代状态管理: 异步迭代器- 需要多次遍历的数据: 异步迭代器支持重新创建- 流式数据转换map/filter异步生成器class AsyncMessageStream:异步迭代器模拟消息流def __init__(self, messages: list[str], delay: float 0.1):self.messages messagesself.delay delayself.index 0def __aiter__(self):self.index 0 # 重置状态支持多次遍历return selfasync def __anext__(self):if self.index len(self.messages):raise StopAsyncIterationawait asyncio.sleep(self.delay)msg self.messages[self.index]self.index 1return f消息: {msg}async def message_gen():异步生成器功能相同但代码更简洁messages [A, B, C]for msg in messages:await asyncio.sleep(0.1)yield f消息: {msg}五、async for 循环与异步列表推导式async def async_comprehension_demo():异步推导式的各种用法print( 异步推导式演示 )# 1. 异步列表推导式squares [x * x async for x in async_range(1, 5, 0.1)]print(f异步列表推导式: {squares})# 2. 带条件的异步推导式evens [x async for x in async_range(1, 6, 0.1) if x % 2 0]print(f偶数筛选: {evens})# 3. 异步集合推导式async_set {x async for x in async_range(1, 5, 0.1)}print(f异步集合推导式: {async_set})# 4. 异步字典推导式async_dict {fkey-{x}: x async for x in async_range(1, 4, 0.1)}print(f异步字典推导式: {async_dict})# 5. 异步生成器表达式async_gen (x * 2 async for x in async_range(1, 4, 0.1))async for value in async_gen:print(f生成器表达式: {value})# 注意Python 支持在列表/集合/字典推导式的最外层# 使用 async for但必须在异步函数内部六、异步 map / filter 模式class AsyncTransform:异步数据流转换工具staticmethodasync def amap(func, async_iter):异步 map对异步迭代器的每个元素应用函数。类似于内置的 map()但支持异步函数。async for item in async_iter:result await func(item)yield resultstaticmethodasync def afilter(predicate, async_iter):异步 filter过滤异步迭代器的元素。类似于内置的 filter()但支持异步谓词。async for item in async_iter:if await predicate(item):yield itemstaticmethodasync def areduce(func, async_iter, initialNone):异步 reduce累积异步迭代器的元素。类似于 functools.reduce()。accum initialasync for item in async_iter:if accum is None:accum itemelse:accum await func(accum, item)return accumasync def async_map_filter_demo():演示异步 map/filter/reduce 模式# 数据源异步生成器async def data_source():for i in range(1, 6):await asyncio.sleep(0.05)yield itransform AsyncTransform()# 异步 map每个元素乘以 2async def double(x):await asyncio.sleep(0.02)return x * 2print(异步 map 结果:)async for val in transform.amap(double, data_source()):print(f {val})# 异步 filter筛选偶数async def is_even(x):await asyncio.sleep(0.02)return x % 2 0print(异步 filter 结果偶数:)async for val in transform.afilter(is_even, data_source()):print(f {val})# 异步 reduce求和async def add(x, y):return x ytotal await transform.areduce(add, data_source(), initial0)print(f异步 reduce 求和: {total})七、实用示例异步流式 HTTP 客户端async def fetch_and_process(url: str, chunk_size: int 256):流式获取 HTTP 响应并逐块处理。使用异步生成器实现流式数据处理。try:reader, writer await asyncio.open_connection(url, 80)request (fGET / HTTP/1.1\r\nfHost: {url}\r\nfConnection: close\r\nf\r\n)writer.write(request.encode())await writer.drain()# 流式读取响应体while True:chunk await reader.read(chunk_size)if not chunk:breakyield chunk # 每次产生一块数据await asyncio.sleep(0) # 让出事件循环writer.close()await writer.wait_closed()except Exception as e:print(f获取 {url} 失败: {e})async def stream_processor():流式处理 HTTP 响应async for chunk in fetch_and_process(example.com):print(f收到数据块: {len(chunk)} 字节)八、总结# 1. __aiter__ / __anext__ 实现异步迭代器协议# 2. async def yield 定义异步生成器# 3. asynccontextmanager 结合生成器实现异步上下文管理# 4. 异步推导式列表/集合/字典简化异步数据转换# 5. 异步 map/filter/reduce 构建数据处理流水线# 6. 异步生成器天然适合流式数据场景# 7. StopAsyncIteration 标记迭代结束# 8. async for 是消费异步可迭代对象的标准方式if __name__ __main__:asyncio.run(async_comprehension_demo())