Python concurrent.futures:并发编程实战指南

## 引言

在后端开发中，并发编程是提高系统性能的关键技术。Python 的 `concurrent.futures` 模块提供了简洁的高级 API，让开发者能够轻松实现多线程和多进程并发。作为一名从 Python 转向 Rust 的后端开发者，我在实践中总结了 `concurrent.futures` 的最佳实践。本文将深入探讨这个强大的并发工具，帮助你编写高效的并发代码。

## 一、concurrent.futures 基础

### 1.1 模块概述

`concurrent.futures` 模块提供了两个主要的执行器：

- `ThreadPoolExecutor`：线程池执行器
- `ProcessPoolExecutor`：进程池执行器

### 1.2 基本使用模式

```python
from concurrent.futures import ThreadPoolExecutor


def task(n):
    return n * n


with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(task, 5)
    result = future.result()
    print(result)
```

### 1.3 核心组件

| 组件 | 说明 |
| --- | --- |
| `Executor` | 抽象执行器基类 |
| `ThreadPoolExecutor` | 线程池执行器 |
| `ProcessPoolExecutor` | 进程池执行器 |
| `Future` | 表示异步计算的结果 |

## 二、ThreadPoolExecutor 详解

### 2.1 创建线程池

```python
from concurrent.futures import ThreadPoolExecutor

# 创建线程池（手动创建的执行器用完后需调用 executor.shutdown()，见 7.3）
executor = ThreadPoolExecutor(
    max_workers=4,
    thread_name_prefix="worker-",
)

# 使用上下文管理器（退出 with 块时自动关闭）
with ThreadPoolExecutor(max_workers=4) as executor:
    # 提交任务
    pass
```

### 2.2 提交任务

```python
from concurrent.futures import ThreadPoolExecutor


def download_file(url):
    # 模拟下载
    import time
    time.sleep(1)
    return f"Downloaded: {url}"


with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交单个任务
    future = executor.submit(download_file, "http://example.com/file1.txt")
    # 获取结果（阻塞，最多等待 5 秒）
    result = future.result(timeout=5)
    print(result)
```

### 2.3 批量提交任务

```python
urls = [
    "http://example.com/file1.txt",
    "http://example.com/file2.txt",
    "http://example.com/file3.txt",
]

with ThreadPoolExecutor(max_workers=3) as executor:
    # 批量提交
    futures = [executor.submit(download_file, url) for url in urls]
    # 获取所有结果
    for future in futures:
        print(future.result())
```

## 三、ProcessPoolExecutor 详解

### 3.1 创建进程池

```python
from concurrent.futures import ProcessPoolExecutor


def compute_heavy(n):
    # CPU 密集型任务
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        future = executor.submit(compute_heavy, 1_000_000)
        print(future.result())
```

注意：使用 `ProcessPoolExecutor` 时，创建进程池的代码必须放在 `if __name__ == "__main__":` 保护之下。在以 spawn 方式启动子进程的平台上（Windows，以及 Python 3.8 起的 macOS），子进程会重新导入主模块，缺少保护会导致错误。此外，提交给进程池的函数必须定义在模块顶层，其参数和返回值也必须可被 pickle。

### 3.2 线程池 vs 进程池

| 特性 | ThreadPoolExecutor | ProcessPoolExecutor |
| --- | --- | --- |
| GIL 限制 | 受 GIL 限制 | 不受 GIL 限制 |
| 适用场景 | IO 密集型 | CPU 密集型 |
| 启动开销 | 低 | 高 |
| 内存开销 | 低 | 高 |
| 数据共享 | 容易（共享同一进程内存） | 困难（参数与返回值需可 pickle） |

### 3.3 选择建议

```python
# IO 密集型任务 → ThreadPoolExecutor
# CPU 密集型任务 → ProcessPoolExecutor
# 混合任务 → 结合使用

def process_task(data):
    # IO 操作（fetch_from_api 为示意函数）
    raw_data = fetch_from_api(data)
    # CPU 操作（compute 为示意函数）
    result = compute(raw_data)
    return result
```

## 四、Future 对象详解

### 4.1 Future 状态

```python
from concurrent.futures import Future

future = Future()

# 检查状态
print(future.done())       # False
print(future.running())    # False
print(future.cancelled())  # False

# 设置结果
future.set_result(42)
print(future.done())       # True
print(future.result())     # 42
```

这里直接实例化 `Future` 仅用于演示。按照官方文档，`Future` 实例应由 `Executor.submit()` 创建，除测试外不应直接构造。

### 4.2 添加回调

```python
def callback(future):
    print(f"Task completed: {future.result()}")


with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)
    future.add_done_callback(callback)
```

如果添加回调时 future 已经完成，回调会被立即调用。

### 4.3 超时处理

```python
import concurrent.futures

try:
    result = future.result(timeout=2)
except concurrent.futures.TimeoutError:
    print("Task timed out")
```

注意：超时只是停止等待，并不会取消任务，任务本身仍会在后台继续运行。

## 五、高级用法

### 5.1 as_completed

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(task_id):
    import time
    time.sleep(task_id)
    return f"Task {task_id} completed"


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(1, 4)]
    # 按完成顺序获取结果
    for future in as_completed(futures):
        print(future.result())
```

### 5.2 map 函数

```python
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(task, [1, 2, 3, 4, 5])
    # 按输入顺序返回结果
    for result in results:
        print(result)
```

### 5.3 wait 函数

```python
from concurrent.futures import wait, FIRST_COMPLETED

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(1, 4)]
    # 等待第一个完成
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    print(f"Completed: {len(done)}")
    print(f"Not completed: {len(not_done)}")
```

注意：`wait` 返回后，离开 `with` 块时仍会等待剩余任务全部完成。

## 六、实战案例

### 6.1 并行下载文件

```python
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed


def download_file(url, save_path):
    # 设置超时，避免单个请求无限期挂起
    response = requests.get(url, timeout=10)
    # 遇到 4xx/5xx 时抛出异常，而不是把错误页面写入文件
    response.raise_for_status()
    with open(save_path, "wb") as f:
        f.write(response.content)
    return save_path


# 需确保 images/ 目录已存在
urls = [
    ("https://example.com/image1.jpg", "images/image1.jpg"),
    ("https://example.com/image2.jpg", "images/image2.jpg"),
    ("https://example.com/image3.jpg", "images/image3.jpg"),
]

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(download_file, url, path) for url, path in urls]
    for future in as_completed(futures):
        print(f"Downloaded: {future.result()}")
```

### 6.2 并行数据库查询

```python
import psycopg2
from concurrent.futures import ThreadPoolExecutor


def query_user(user_id):
    conn = psycopg2.connect("dbname=example user=postgres")
    try:
        with conn.cursor() as cursor:
            # 使用参数化查询，避免 SQL 注入
            cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
            return cursor.fetchone()
    finally:
        # 即使查询抛出异常也要关闭连接
        conn.close()


user_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(query_user, user_ids)
    for user_id, user in zip(user_ids, results):
        print(f"User {user_id}: {user}")
```

每个任务都新建一个连接，开销较大。生产环境中可以改用连接池（例如 `psycopg2.pool.ThreadedConnectionPool`）。

### 6.3 混合 IO 和 CPU 任务

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def fetch_data(url):
    import requests
    return requests.get(url, timeout=10).json()


def process_data(data):
    # CPU 密集型处理
    return sum(item["value"] for item in data)


def pipeline(url):
    # 单个 URL 的串行版本，仅作对照；下面的两阶段写法没有使用它
    data = fetch_data(url)
    return process_data(data)


urls = ["https://api.example.com/data1", "https://api.example.com/data2"]

if __name__ == "__main__":
    # IO 阶段使用线程池
    with ThreadPoolExecutor(max_workers=4) as io_executor:
        futures = [io_executor.submit(fetch_data, url) for url in urls]
        raw_data = [f.result() for f in futures]

    # CPU 阶段使用进程池
    with ProcessPoolExecutor(max_workers=4) as cpu_executor:
        results = list(cpu_executor.map(process_data, raw_data))

    print(results)
```

## 七、最佳实践

### 7.1 合理设置 worker 数量

```python
import os

# CPU 密集型任务
cpu_workers = os.cpu_count() or 4

# IO 密集型任务
io_workers = min(32, (os.cpu_count() or 4) * 5)
```

如果不确定，也可以直接省略 `max_workers`，使用默认值。自 Python 3.8 起，`ThreadPoolExecutor` 的默认值为 `min(32, os.cpu_count() + 4)`，`ProcessPoolExecutor` 的默认值为机器的 CPU 核心数。

### 7.2 避免共享状态

```python
# 不好的做法：共享可变状态
counter = 0


def increment():
    global counter
    counter += 1  # "读-改-写"不是原子操作，多线程下可能丢失更新


# 好的做法：使用线程安全的数据结构或锁
from threading import Lock


class ThreadSafeCounter:
    def __init__(self):
        self._count = 0
        self._lock = Lock()

    def increment(self):
        with self._lock:
            self._count += 1
```

注意：锁只对线程池有效。进程池中的各个进程拥有独立的内存，在子进程中修改全局变量不会反映到主进程，应通过任务的返回值汇总结果。

### 7.3 优雅关闭

```python
executor = ThreadPoolExecutor(max_workers=4)
try:
    # 提交任务
    futures = [executor.submit(task, i) for i in range(10)]
    # 获取结果
    for future in futures:
        print(future.result())
finally:
    # 关闭执行器
    executor.shutdown(wait=True)
```

这与使用 `with` 语句等价。在 Python 3.9+ 中，还可以传入 `cancel_futures=True`，取消尚未开始执行的任务。

## 八、性能对比

### 8.1 同步 vs 并发

```python
import time
from concurrent.futures import ThreadPoolExecutor

# download_file 为 2.2 节中的单参数版本


def sync_download(urls):
    for url in urls:
        download_file(url)


def concurrent_download(urls):
    with ThreadPoolExecutor(max_workers=5) as executor:
        # 用 list() 消费结果，任务中的异常才会在这里抛出，而不是被静默丢弃
        list(executor.map(download_file, urls))


# 性能对比
urls = ["https://example.com/file{}.txt".format(i) for i in range(10)]

start = time.perf_counter()
sync_download(urls)
print(f"Sync time: {time.perf_counter() - start:.2f}s")

start = time.perf_counter()
concurrent_download(urls)
print(f"Concurrent time: {time.perf_counter() - start:.2f}s")
```

以 2.2 节的模拟下载（每次 `sleep` 1 秒）为例，同步版本约需 10 秒，5 个线程的版本约需 2 秒。

### 8.2 线程池 vs 进程池

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def cpu_intensive(n):
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    # 线程池：受 GIL 限制
    with ThreadPoolExecutor(max_workers=4) as executor:
        start = time.perf_counter()
        # map() 提交任务后立即返回，必须用 list() 等待全部结果，计时才有意义
        list(executor.map(cpu_intensive, [10_000_000] * 4))
        print(f"ThreadPool time: {time.perf_counter() - start:.2f}s")

    # 进程池：不受 GIL 限制
    with ProcessPoolExecutor(max_workers=4) as executor:
        start = time.perf_counter()
        list(executor.map(cpu_intensive, [10_000_000] * 4))
        print(f"ProcessPool time: {time.perf_counter() - start:.2f}s")
```

## 总结

`concurrent.futures` 是 Python 并发编程的利器。通过本文的学习，你应该掌握了以下核心要点：

- **ThreadPoolExecutor**：适用于 IO 密集型任务
- **ProcessPoolExecutor**：适用于 CPU 密集型任务
- **Future 对象**：管理异步计算结果
- **高级 API**：`as_completed`、`map`、`wait`
- **实战案例**：并行下载、数据库查询、混合任务
- **最佳实践**：合理设置 worker 数量、避免共享状态
- **性能对比**：同步 vs 并发、线程池 vs 进程池

作为从 Python 转向 Rust 的后端开发者，我认为理解并发编程模式对于构建高性能系统至关重要。虽然 Rust 的并发模型更加安全，但 Python 的 `concurrent.futures` 提供了快速实现并发的便捷方式。