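# Python asyncio事件循环的底层实现

asyncio的事件循环依赖操作系统提供的I/O多路复用机制：在Linux上使用epoll，在macOS/BSD上使用kqueue。这些机制（以及通用的select/poll）由标准库的`selectors`模块统一封装。Windows上的默认事件循环`ProactorEventLoop`则直接使用IOCP，并不经过`selectors`模块（见后文）。

## Selector的核心接口

```python
import selectors

sel = selectors.DefaultSelector()
# 在Linux上等价于 sel = selectors.EpollSelector()
```

注册文件描述符的事件监听：

```python
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 8888))
server.listen(100)
server.setblocking(False)
sel.register(server, selectors.EVENT_READ)

def read_handler(conn):
    data = conn.recv(4096)
    if data:
        conn.send(data)  # 回显；示例中不处理部分写入
    else:  # 对端关闭了连接
        sel.unregister(conn)
        conn.close()

def accept_connection(sock):
    conn, addr = sock.accept()
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read_handler)

while True:
    events = sel.select(timeout=1)  # 阻塞直到有事件发生或超时
    for key, mask in events:
        if key.fileobj is server:
            accept_connection(server)
        else:
            handler = key.data
            handler(key.fileobj)
```

`sel.select()`返回就绪事件列表，每一项是`(SelectorKey, 事件掩码)`二元组。在epoll实现中，它调用`epoll_wait`系统调用挂起当前线程，直到至少一个事件就绪或超时。

## epoll的事件循环

Python通过`select.epoll`对象暴露epoll。`os`模块中并没有`epoll_create`、`epoll_ctl`、`epoll_wait`这样的函数。

```python
import select

ep = select.epoll()                            # 对应 epoll_create1
ep.register(server.fileno(), select.EPOLLIN)   # 对应 epoll_ctl(EPOLL_CTL_ADD)

connections = {}  # fd -> socket；必须保留引用，否则socket对象被回收时会关闭fd
MAX_EVENTS = 64

while True:
    events = ep.poll(timeout=-1, maxevents=MAX_EVENTS)  # 对应 epoll_wait
    for fd, event_mask in events:
        if fd == server.fileno():
            conn, addr = server.accept()
            conn.setblocking(False)
            connections[conn.fileno()] = conn
            ep.register(conn.fileno(), select.EPOLLIN)
        else:
            conn = connections[fd]
            data = conn.recv(4096)
            if data:
                conn.send(data)
            else:
                ep.unregister(fd)                      # 对应 epoll_ctl(EPOLL_CTL_DEL)
                del connections[fd]
                conn.close()
```

`ep.poll()`的超时以秒为单位（可以是浮点数）：
- `-1`或`None`表示无限阻塞，直到有事件；
- `0`表示立即返回（非阻塞轮询）；
- 正数表示最多等待多少秒。

底层的`epoll_wait`系统调用以毫秒为单位，`select.epoll`会自动换算。

## 事件循环的核心数据结构

asyncio在此基础上把select/poll/epoll/kqueue封装成事件循环。核心数据结构有两个队列，再加上负责I/O等待的selector：
- 就绪队列：存放待执行的回调；
- 定时器堆：按到期时间排序。

下面是一个简化模型。其中`Handle`、`TimerHandle`、`_get_context`、`_selector`来自asyncio内部，此处省略；`TimerHandle`按`when`定义了比较运算，因此可以放入堆中。

```python
import heapq
import time
from collections import deque

class MockEventLoop:
    def __init__(self):
        self._ready = deque()   # 就绪队列
        self._scheduled = []    # 定时器堆（按when排序）
        self._stopping = False
        self._debug = False

    def call_soon(self, callback, *args, context=None):
        h = Handle(callback, args, context or self._get_context())
        self._ready.append(h)
        return h

    def call_later(self, delay, callback, *args):
        when = time.monotonic() + delay
        timer = TimerHandle(when, callback, args, self._get_context())
        heapq.heappush(self._scheduled, timer)
        return timer

    def _run_once(self):
        # 1. 计算select的超时：
        #    有就绪回调（或正在停止）时不阻塞；
        #    否则等到最近的定时器到期；
        #    两者都没有时无限阻塞（None），而不是用0空转
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            timeout = max(0, self._scheduled[0].when - time.monotonic())
        else:
            timeout = None

        # 2. 等待I/O事件，把就绪的I/O回调放入就绪队列
        events = self._selector.select(timeout)
        for key, mask in events:
            callback = key.data
            self._ready.append(callback)

        # 3. 将到期的定时器移动到就绪队列
        now = time.monotonic()
        while self._scheduled and self._scheduled[0].when <= now:
            self._ready.append(heapq.heappop(self._scheduled))

        # 4. 只执行本轮开始时已就绪的回调；回调中新加入的回调留到下一轮
        ntodo = len(self._ready)
        for _ in range(ntodo):
            handle = self._ready.popleft()
            handle._run()

    def run_forever(self):
        while not self._stopping:
            self._run_once()
        self._stopping = False  # 复位标志，使循环之后可以再次运行
```

每轮迭代的执行顺序与CPython中`BaseEventLoop._run_once`一致：
1. 计算超时并等待I/O事件；
2. 处理I/O事件；
3. 把到期的定时器移入就绪队列；
4. 执行就绪回调。

`_ready`是一个`deque`，回调从右端追加、从左端取出，所以同一轮中的回调按先进先出的顺序执行。

## asyncio的事件循环在多线程中的行为

```python
import asyncio
import threading

loop = asyncio.new_event_loop()

def run_loop():
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = threading.Thread(target=run_loop, daemon=True)
t.start()

# 从其他线程提交任务
async def async_task():
    return 42

future = asyncio.run_coroutine_threadsafe(async_task(), loop)
result = future.result(timeout=5)
print(result)
```

`run_coroutine_threadsafe`借助`call_soon_threadsafe`把任务提交到事件循环线程，并返回一个`concurrent.futures.Future`，调用方线程可以阻塞等待它。它不能返回`loop.create_future()`创建的`asyncio.Future`：后者不是线程安全的，其`result()`也不接受`timeout`参数。简化后的实现：

```python
import concurrent.futures

def run_coroutine_threadsafe(coro, loop):
    if not asyncio.iscoroutine(coro):
        raise TypeError("A coroutine object is required")
    future = concurrent.futures.Future()

    def callback():
        # 在事件循环线程中执行
        try:
            task = asyncio.ensure_future(coro, loop=loop)
            # CPython内部用asyncio.futures._chain_future
            # 把task的结果/异常/取消状态同步到future
            _chain_future(task, future)
        except BaseException as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future

def call_soon_threadsafe(self, callback, *args, context=None):
    self._check_closed()
    handle = self._call_soon(callback, args, context)
    self._write_to_self()  # 唤醒可能阻塞在select()中的事件循环线程
    return handle
```

`call_soon_threadsafe`通过self-pipe技巧（self-pipe trick）唤醒事件循环：
1. 事件循环创建时建立一对相互连接的socket（`socket.socketpair()`），并把读端`_ssock`注册到selector上监听可读事件；
2. 其他线程向写端`_csock`写入一个字节，`_ssock`随即变为可读；
3. 阻塞在`select()`中的事件循环线程被唤醒，并在下一轮执行新加入的回调。

asyncio（SelectorEventLoop）中对应的内部实现：

```python
def _write_to_self(self):
    csock = self._csock
    if csock is None:
        return
    try:
        csock.send(b'\0')
    except OSError:
        # 例如缓冲区已满：说明已有未读的唤醒字节，循环本来就会被唤醒
        # （debug模式下CPython会记录一条日志）
        pass

def _read_from_self(self):
    # 作为_ssock的读回调，把积累的唤醒字节全部读空
    while True:
        try:
            data = self._ssock.recv(4096)
            if not data:
                break
        except InterruptedError:
            continue
        except BlockingIOError:
            break
```

## 关闭事件循环的机制

```python
loop.stop()   # 设置_stopping标志，当前这一轮_run_once执行完后run_forever返回
loop.close()  # 彻底关闭：
# 1. 若循环仍在运行则抛出RuntimeError
# 2. 清空就绪队列和定时器堆（不会取消尚未完成的任务）
# 3. 关闭默认executor的线程池（shutdown(wait=False)，不等待线程结束）
# 4. 关闭self-pipe和selector
```

`loop.close()`本身不负责以下三步，`asyncio.run()`会在调用`close()`之前依次执行它们：
- 取消剩余任务；
- 关闭异步生成器（`loop.shutdown_asyncgens()`）；
- 等待默认executor结束（`loop.shutdown_default_executor()`）。

`loop.close()`（SelectorEventLoop）的简化实现：

```python
def close(self):
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self.is_closed():
        return
    self._close_self_pipe()  # 注销并关闭socketpair的两端（不会再重建）
    # 以下对应BaseEventLoop.close()
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    executor = self._default_executor
    if executor is not None:
        self._default_executor = None
        executor.shutdown(wait=False)
    if self._selector is not None:
        self._selector.close()
        self._selector = None
```

已经close的事件循环再调用`run_forever()`会抛出`RuntimeError('Event loop is closed')`。

## 列出所有未完成的任务

`asyncio.all_tasks()`返回当前事件循环中所有未完成的任务：

```python
import asyncio

async def slow_task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    task1 = asyncio.create_task(slow_task("A", 5))
    task2 = asyncio.create_task(slow_task("B", 3))
    pending = asyncio.all_tasks()
    print(f"待处理的任务: {len(pending)}")  # 输出3：包括运行main()本身的任务
    for t in pending:
        print(f" - {t.get_name()}: {t.get_coro()}")
    result = await asyncio.gather(task1, task2, return_exceptions=True)
    print(result)

asyncio.run(main())
```

## Task.__step

每当协程通过`await`交出控制（底层是一次`yield`）后，事件循环都会再次调用Task的`__step`方法来推进协程的执行。简化后的逻辑：

```python
class Task(Future):
    def __step(self, exc=None):
        coro = self._coro
        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            self.set_result(exc.value)      # 协程正常结束
        except CancelledError:
            super().cancel()                # 协程被取消
        except Exception as exc:
            self.set_exception(exc)
        else:
            if getattr(result, "_asyncio_future_blocking", False):
                # 协程在等待一个尚未完成的Future（await fut）
                if result is self:
                    # Task不能等待自身：把错误抛回协程
                    self._loop.call_soon(
                        self.__step,
                        RuntimeError(f"Task cannot await on itself: {self!r}"))
                else:
                    result._asyncio_future_blocking = False
                    result.add_done_callback(self.__wakeup)
                    self._fut_waiter = result
            elif result is None:
                # 裸yield（例如asyncio.sleep(0)）：让出一轮后立即重新调度
                self._loop.call_soon(self.__step)
            else:
                self._loop.call_soon(
                    self.__step,
                    RuntimeError(f"Task got bad yield: {result!r}"))
```

关键逻辑分三种情况：
- 协程yield出一个Future（即`await`了一个尚未完成的Future）：Task在该Future上注册`__wakeup`回调，Future完成后再继续执行；
- 裸`yield`（值为`None`）：用`call_soon`在下一轮立即重新调度；
- yield出其他任何值（包括Task自身）：以`RuntimeError`的形式抛回协程。

## 获取事件循环的正确方式

`asyncio.get_event_loop()`的行为在近几个版本中逐步收紧，需要注意三点：
- 在没有运行中的事件循环时调用它会发出`DeprecationWarning`，3.10–3.12中具体触发条件有所变化；
- 从Python 3.14起，没有当前事件循环时它会直接抛出`RuntimeError`；
- `DeprecationWarning`只是警告而不是异常，不能用`try/except`捕获来做回退。

Python 3.12也并没有移除任何事件循环实现。推荐写法：

```python
import asyncio

# 在协程或回调内部：获取当前正在运行的事件循环
async def main():
    loop = asyncio.get_running_loop()

# 在同步代码中：由asyncio.run()创建并管理事件循环
asyncio.run(main())

# 确实需要手动管理时：显式创建并设置
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
```

`asyncio.new_event_loop()`在Unix上创建`SelectorEventLoop`实例，在Windows上（Python 3.8起）创建`ProactorEventLoop`实例。在协程内部，`asyncio.get_running_loop()`是获取当前正在运行的事件循环的正确方式。

## Windows的ProactorEventLoop

Windows的`ProactorEventLoop`使用IOCP（I/O完成端口）而不是select。IOCP基于"操作完成通知"而非"就绪通知"，是Windows上高效的异步I/O机制；Windows上的`SelectorEventLoop`也不支持子进程等功能。按平台手动选择：

```python
import asyncio
import sys

if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
else:
    loop = asyncio.SelectorEventLoop()
asyncio.set_event_loop(loop)
```

从Python 3.8起，Windows上的默认事件循环就是`ProactorEventLoop`，`asyncio.run()`和`asyncio.new_event_loop()`会自动选择正确的实现，不需要手动判断平台。

## uvloop

uvloop是基于libuv实现的事件循环，可以直接替换asyncio默认的事件循环。按uvloop项目公布的基准测试，它比默认事件循环快约2–4倍，具体收益取决于工作负载。

```python
import asyncio
import uvloop

async def main():
    await asyncio.sleep(1)
    return "done"

# 用uvloop替代默认事件循环
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
result = asyncio.run(main())
```

uvloop用Cython实现了asyncio的事件循环接口：I/O多路复用、定时器等由libuv处理，libuv的C回调再被转换为asyncio的回调（Handle）调度执行，而不是直接映射为协程。libuv也是Node.js所使用的事件循环库。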
Python asyncio事件循环的底层实现
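# Python asyncio事件循环的底层实现

asyncio的事件循环依赖操作系统提供的I/O多路复用机制：在Linux上使用epoll，在macOS/BSD上使用kqueue。这些机制（以及通用的select/poll）由标准库的`selectors`模块统一封装。Windows上的默认事件循环`ProactorEventLoop`则直接使用IOCP，并不经过`selectors`模块（见后文）。

## Selector的核心接口

```python
import selectors

sel = selectors.DefaultSelector()
# 在Linux上等价于 sel = selectors.EpollSelector()
```

注册文件描述符的事件监听：

```python
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 8888))
server.listen(100)
server.setblocking(False)
sel.register(server, selectors.EVENT_READ)

def read_handler(conn):
    data = conn.recv(4096)
    if data:
        conn.send(data)  # 回显；示例中不处理部分写入
    else:  # 对端关闭了连接
        sel.unregister(conn)
        conn.close()

def accept_connection(sock):
    conn, addr = sock.accept()
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read_handler)

while True:
    events = sel.select(timeout=1)  # 阻塞直到有事件发生或超时
    for key, mask in events:
        if key.fileobj is server:
            accept_connection(server)
        else:
            handler = key.data
            handler(key.fileobj)
```

`sel.select()`返回就绪事件列表，每一项是`(SelectorKey, 事件掩码)`二元组。在epoll实现中，它调用`epoll_wait`系统调用挂起当前线程，直到至少一个事件就绪或超时。

## epoll的事件循环

Python通过`select.epoll`对象暴露epoll。`os`模块中并没有`epoll_create`、`epoll_ctl`、`epoll_wait`这样的函数。

```python
import select

ep = select.epoll()                            # 对应 epoll_create1
ep.register(server.fileno(), select.EPOLLIN)   # 对应 epoll_ctl(EPOLL_CTL_ADD)

connections = {}  # fd -> socket；必须保留引用，否则socket对象被回收时会关闭fd
MAX_EVENTS = 64

while True:
    events = ep.poll(timeout=-1, maxevents=MAX_EVENTS)  # 对应 epoll_wait
    for fd, event_mask in events:
        if fd == server.fileno():
            conn, addr = server.accept()
            conn.setblocking(False)
            connections[conn.fileno()] = conn
            ep.register(conn.fileno(), select.EPOLLIN)
        else:
            conn = connections[fd]
            data = conn.recv(4096)
            if data:
                conn.send(data)
            else:
                ep.unregister(fd)                      # 对应 epoll_ctl(EPOLL_CTL_DEL)
                del connections[fd]
                conn.close()
```

`ep.poll()`的超时以秒为单位（可以是浮点数）：
- `-1`或`None`表示无限阻塞，直到有事件；
- `0`表示立即返回（非阻塞轮询）；
- 正数表示最多等待多少秒。

底层的`epoll_wait`系统调用以毫秒为单位，`select.epoll`会自动换算。

## 事件循环的核心数据结构

asyncio在此基础上把select/poll/epoll/kqueue封装成事件循环。核心数据结构有两个队列，再加上负责I/O等待的selector：
- 就绪队列：存放待执行的回调；
- 定时器堆：按到期时间排序。

下面是一个简化模型。其中`Handle`、`TimerHandle`、`_get_context`、`_selector`来自asyncio内部，此处省略；`TimerHandle`按`when`定义了比较运算，因此可以放入堆中。

```python
import heapq
import time
from collections import deque

class MockEventLoop:
    def __init__(self):
        self._ready = deque()   # 就绪队列
        self._scheduled = []    # 定时器堆（按when排序）
        self._stopping = False
        self._debug = False

    def call_soon(self, callback, *args, context=None):
        h = Handle(callback, args, context or self._get_context())
        self._ready.append(h)
        return h

    def call_later(self, delay, callback, *args):
        when = time.monotonic() + delay
        timer = TimerHandle(when, callback, args, self._get_context())
        heapq.heappush(self._scheduled, timer)
        return timer

    def _run_once(self):
        # 1. 计算select的超时：
        #    有就绪回调（或正在停止）时不阻塞；
        #    否则等到最近的定时器到期；
        #    两者都没有时无限阻塞（None），而不是用0空转
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            timeout = max(0, self._scheduled[0].when - time.monotonic())
        else:
            timeout = None

        # 2. 等待I/O事件，把就绪的I/O回调放入就绪队列
        events = self._selector.select(timeout)
        for key, mask in events:
            callback = key.data
            self._ready.append(callback)

        # 3. 将到期的定时器移动到就绪队列
        now = time.monotonic()
        while self._scheduled and self._scheduled[0].when <= now:
            self._ready.append(heapq.heappop(self._scheduled))

        # 4. 只执行本轮开始时已就绪的回调；回调中新加入的回调留到下一轮
        ntodo = len(self._ready)
        for _ in range(ntodo):
            handle = self._ready.popleft()
            handle._run()

    def run_forever(self):
        while not self._stopping:
            self._run_once()
        self._stopping = False  # 复位标志，使循环之后可以再次运行
```

每轮迭代的执行顺序与CPython中`BaseEventLoop._run_once`一致：
1. 计算超时并等待I/O事件；
2. 处理I/O事件；
3. 把到期的定时器移入就绪队列；
4. 执行就绪回调。

`_ready`是一个`deque`，回调从右端追加、从左端取出，所以同一轮中的回调按先进先出的顺序执行。

## asyncio的事件循环在多线程中的行为

```python
import asyncio
import threading

loop = asyncio.new_event_loop()

def run_loop():
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = threading.Thread(target=run_loop, daemon=True)
t.start()

# 从其他线程提交任务
async def async_task():
    return 42

future = asyncio.run_coroutine_threadsafe(async_task(), loop)
result = future.result(timeout=5)
print(result)
```

`run_coroutine_threadsafe`借助`call_soon_threadsafe`把任务提交到事件循环线程，并返回一个`concurrent.futures.Future`，调用方线程可以阻塞等待它。它不能返回`loop.create_future()`创建的`asyncio.Future`：后者不是线程安全的，其`result()`也不接受`timeout`参数。简化后的实现：

```python
import concurrent.futures

def run_coroutine_threadsafe(coro, loop):
    if not asyncio.iscoroutine(coro):
        raise TypeError("A coroutine object is required")
    future = concurrent.futures.Future()

    def callback():
        # 在事件循环线程中执行
        try:
            task = asyncio.ensure_future(coro, loop=loop)
            # CPython内部用asyncio.futures._chain_future
            # 把task的结果/异常/取消状态同步到future
            _chain_future(task, future)
        except BaseException as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future

def call_soon_threadsafe(self, callback, *args, context=None):
    self._check_closed()
    handle = self._call_soon(callback, args, context)
    self._write_to_self()  # 唤醒可能阻塞在select()中的事件循环线程
    return handle
```

`call_soon_threadsafe`通过self-pipe技巧（self-pipe trick）唤醒事件循环：
1. 事件循环创建时建立一对相互连接的socket（`socket.socketpair()`），并把读端`_ssock`注册到selector上监听可读事件；
2. 其他线程向写端`_csock`写入一个字节，`_ssock`随即变为可读；
3. 阻塞在`select()`中的事件循环线程被唤醒，并在下一轮执行新加入的回调。

asyncio（SelectorEventLoop）中对应的内部实现：

```python
def _write_to_self(self):
    csock = self._csock
    if csock is None:
        return
    try:
        csock.send(b'\0')
    except OSError:
        # 例如缓冲区已满：说明已有未读的唤醒字节，循环本来就会被唤醒
        # （debug模式下CPython会记录一条日志）
        pass

def _read_from_self(self):
    # 作为_ssock的读回调，把积累的唤醒字节全部读空
    while True:
        try:
            data = self._ssock.recv(4096)
            if not data:
                break
        except InterruptedError:
            continue
        except BlockingIOError:
            break
```

## 关闭事件循环的机制

```python
loop.stop()   # 设置_stopping标志，当前这一轮_run_once执行完后run_forever返回
loop.close()  # 彻底关闭：
# 1. 若循环仍在运行则抛出RuntimeError
# 2. 清空就绪队列和定时器堆（不会取消尚未完成的任务）
# 3. 关闭默认executor的线程池（shutdown(wait=False)，不等待线程结束）
# 4. 关闭self-pipe和selector
```

`loop.close()`本身不负责以下三步，`asyncio.run()`会在调用`close()`之前依次执行它们：
- 取消剩余任务；
- 关闭异步生成器（`loop.shutdown_asyncgens()`）；
- 等待默认executor结束（`loop.shutdown_default_executor()`）。

`loop.close()`（SelectorEventLoop）的简化实现：

```python
def close(self):
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self.is_closed():
        return
    self._close_self_pipe()  # 注销并关闭socketpair的两端（不会再重建）
    # 以下对应BaseEventLoop.close()
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    executor = self._default_executor
    if executor is not None:
        self._default_executor = None
        executor.shutdown(wait=False)
    if self._selector is not None:
        self._selector.close()
        self._selector = None
```

已经close的事件循环再调用`run_forever()`会抛出`RuntimeError('Event loop is closed')`。

## 列出所有未完成的任务

`asyncio.all_tasks()`返回当前事件循环中所有未完成的任务：

```python
import asyncio

async def slow_task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    task1 = asyncio.create_task(slow_task("A", 5))
    task2 = asyncio.create_task(slow_task("B", 3))
    pending = asyncio.all_tasks()
    print(f"待处理的任务: {len(pending)}")  # 输出3：包括运行main()本身的任务
    for t in pending:
        print(f" - {t.get_name()}: {t.get_coro()}")
    result = await asyncio.gather(task1, task2, return_exceptions=True)
    print(result)

asyncio.run(main())
```

## Task.__step

每当协程通过`await`交出控制（底层是一次`yield`）后，事件循环都会再次调用Task的`__step`方法来推进协程的执行。简化后的逻辑：

```python
class Task(Future):
    def __step(self, exc=None):
        coro = self._coro
        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            self.set_result(exc.value)      # 协程正常结束
        except CancelledError:
            super().cancel()                # 协程被取消
        except Exception as exc:
            self.set_exception(exc)
        else:
            if getattr(result, "_asyncio_future_blocking", False):
                # 协程在等待一个尚未完成的Future（await fut）
                if result is self:
                    # Task不能等待自身：把错误抛回协程
                    self._loop.call_soon(
                        self.__step,
                        RuntimeError(f"Task cannot await on itself: {self!r}"))
                else:
                    result._asyncio_future_blocking = False
                    result.add_done_callback(self.__wakeup)
                    self._fut_waiter = result
            elif result is None:
                # 裸yield（例如asyncio.sleep(0)）：让出一轮后立即重新调度
                self._loop.call_soon(self.__step)
            else:
                self._loop.call_soon(
                    self.__step,
                    RuntimeError(f"Task got bad yield: {result!r}"))
```

关键逻辑分三种情况：
- 协程yield出一个Future（即`await`了一个尚未完成的Future）：Task在该Future上注册`__wakeup`回调，Future完成后再继续执行；
- 裸`yield`（值为`None`）：用`call_soon`在下一轮立即重新调度；
- yield出其他任何值（包括Task自身）：以`RuntimeError`的形式抛回协程。

## 获取事件循环的正确方式

`asyncio.get_event_loop()`的行为在近几个版本中逐步收紧，需要注意三点：
- 在没有运行中的事件循环时调用它会发出`DeprecationWarning`，3.10–3.12中具体触发条件有所变化；
- 从Python 3.14起，没有当前事件循环时它会直接抛出`RuntimeError`；
- `DeprecationWarning`只是警告而不是异常，不能用`try/except`捕获来做回退。

Python 3.12也并没有移除任何事件循环实现。推荐写法：

```python
import asyncio

# 在协程或回调内部：获取当前正在运行的事件循环
async def main():
    loop = asyncio.get_running_loop()

# 在同步代码中：由asyncio.run()创建并管理事件循环
asyncio.run(main())

# 确实需要手动管理时：显式创建并设置
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
```

`asyncio.new_event_loop()`在Unix上创建`SelectorEventLoop`实例，在Windows上（Python 3.8起）创建`ProactorEventLoop`实例。在协程内部，`asyncio.get_running_loop()`是获取当前正在运行的事件循环的正确方式。

## Windows的ProactorEventLoop

Windows的`ProactorEventLoop`使用IOCP（I/O完成端口）而不是select。IOCP基于"操作完成通知"而非"就绪通知"，是Windows上高效的异步I/O机制；Windows上的`SelectorEventLoop`也不支持子进程等功能。按平台手动选择：

```python
import asyncio
import sys

if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
else:
    loop = asyncio.SelectorEventLoop()
asyncio.set_event_loop(loop)
```

从Python 3.8起，Windows上的默认事件循环就是`ProactorEventLoop`，`asyncio.run()`和`asyncio.new_event_loop()`会自动选择正确的实现，不需要手动判断平台。

## uvloop

uvloop是基于libuv实现的事件循环，可以直接替换asyncio默认的事件循环。按uvloop项目公布的基准测试，它比默认事件循环快约2–4倍，具体收益取决于工作负载。

```python
import asyncio
import uvloop

async def main():
    await asyncio.sleep(1)
    return "done"

# 用uvloop替代默认事件循环
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
result = asyncio.run(main())
```

uvloop用Cython实现了asyncio的事件循环接口：I/O多路复用、定时器等由libuv处理，libuv的C回调再被转换为asyncio的回调（Handle）调度执行，而不是直接映射为协程。libuv也是Node.js所使用的事件循环库。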