Python 中的并发 —— 进程间通信

Python 中的并发 —— 进程间通信 摘要本文介绍了Python中实现进程间通信(IPC)的多种机制1. 队列(Queue) - 提供线程和进程安全的FIFO通信方式2. 管道(Pipe) - 通过双向连接对象实现进程间数据传输3. 管理器(Manager) - 控制共享对象服务进程支持命名空间(Namespace)实现多属性共享4. Ctypes - 使用共享内存中的Array和Value对象5. 通信顺序进程(CSP) - 通过PyCSP库实现基于消息传递的并发模型。每种机制都配有简明示例代码展示其基本使用方法。这些IPC技术为开发并行应用程序提供了灵活的数据交换方案。目录Python 中的并发 —— 进程间通信通信方式各类通信机制队列队列使用示例输出结果管道管道使用示例输出结果管理器管理器使用示例输出结果管理器中的命名空间概念命名空间使用示例输出结果Ctypes 数组与值Ctypes 数组使用示例输出结果通信顺序进程CSPPython 库PyCSP安装 PyCSP并行运行进程示例输出结果Python 中的并发 —— 进程间通信进程间通信指的是进程之间的数据交换在开发并行应用程序时进程间的数据交换是必不可少的。下图展示了多个子进程间实现同步的各类通信机制通信方式队列Queues管道Pipes管理器ManagerC 类型Ctypes各类通信机制本节将介绍多种进程间通信机制具体说明如下队列队列可用于多进程程序中multiprocessing模块的Queue类与Queue.Queue类功能相似因此可使用相同的应用程序编程接口API。multiprocessing.Queue为进程间通信提供了一种线程和进程安全的先进先出FIFO机制。队列使用示例以下是取自 Python 多进程官方文档的简单示例用于理解multiprocessing模块中Queue类的使用原理main.pyfrom multiprocessing import Process, Queue import random def f(q): q.put([42, None, hello]) def main(): q Queue() p Process(targetf, args(q,)) p.start() print(q.get()) if __name__ __main__: main()输出结果运行代码并验证输出plaintext[42, None, hello]管道管道是一种数据结构用于多进程程序中的进程间通信。Pipe()函数会返回一对由管道连接的连接对象该管道默认是双向的。其工作机制如下返回一对连接对象分别代表管道的两端每个对象都拥有send()发送和recv()接收两个方法用于实现进程间的通信。管道使用示例以下是取自 Python 多进程官方文档的简单示例用于理解multiprocessing模块中Pipe()函数的使用原理main.pyfrom multiprocessing import Process, Pipe def f(conn): conn.send([42, None, hello]) conn.close() if __name__ __main__: parent_conn, child_conn Pipe() p Process(targetf, args(child_conn,)) p.start() print(parent_conn.recv()) p.join()输出结果运行代码并验证输出plaintext[42, None, hello]管理器Manager是multiprocessing模块中的一个类为所有使用者之间协调共享信息提供了一种方式。管理器对象会控制一个服务进程该服务进程负责管理共享对象并允许其他进程对这些对象进行操作。换句话说管理器提供了一种创建可在不同进程间共享数据的方式。管理器对象具有以下不同特性核心特性是控制管理共享对象的服务进程另一重要特性是当任意进程修改共享对象时会对所有共享对象进行同步更新。管理器使用示例以下示例通过管理器对象在服务进程中创建一个列表记录然后向该列表中添加一条新记录import multiprocessing def print_records(records): for record in records: print(名称{0}\n分数{1}\n.format(record[0], record[1])) def insert_record(record, records): records.append(record) print(新增一条记录\n) if __name__ __main__: with multiprocessing.Manager() as manager: records manager.list([(计算机, 1), (历史, 5), (语文, 9)]) new_record (英语, 3) p1 multiprocessing.Process(targetinsert_record, args(new_record, records)) p2 multiprocessing.Process(targetprint_records, args(records,)) p1.start() p2.start() p1.join() p2.join()输出结果运行代码并验证输出plaintext新增一条记录 名称计算机 分数1 名称历史 分数5 名称语文 分数9 名称英语 分数3管理器中的命名空间概念Manager类内置了命名空间Namespaces的概念这是一种能在多个进程间共享多个属性的便捷方法。命名空间没有可调用的公共方法但拥有可写的属性。命名空间使用示例以下 Python 脚本示例可帮助我们利用命名空间在主进程和子进程间共享数据main.pyimport multiprocessing def Mng_NaSp(using_ns): using_ns.x 5 using_ns.y * 10 if __name__ __main__: manager multiprocessing.Manager() using_ns manager.Namespace() using_ns.x 1 using_ns.y 1 print(修改前, using_ns) p multiprocessing.Process(targetMng_NaSp, args(using_ns,)) p.start() p.join() print(修改后, using_ns)输出结果运行代码并验证输出plaintext修改前 Namespace(x1, y1) 修改后 Namespace(x6, y10)Ctypes 数组与值multiprocessing模块提供了Array数组和Value值对象用于在共享内存映射中存储数据。Array是从共享内存中分配的 C 类型数组Value是从共享内存中分配的 C 类型对象。使用前需先从multiprocessing中导入Process、Value和Array。Ctypes 数组使用示例以下 Python 脚本是取自 Python 官方文档的示例用于演示如何利用 Ctypes 数组和值实现进程间的数据共享main.pyimport array from multiprocessing import Process def f(n, a): n 3.1415927 for i in range(len(a)): a[i] -a[i] if __name__ __main__: num 0.0 arr array.array(i, range(10)) p Process(targetf, args(num, arr)) p.start() p.join() print(num) print(arr[:])输出结果运行代码并验证输出plaintext3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]通信顺序进程CSP通信顺序进程可用于描述带有并发模型的系统与其他系统之间的交互它是一种通过消息传递编写并发程序的框架因此能有效描述并发行为。Python 库PyCSP为实现通信顺序进程中的核心原语Python 提供了 PyCSP 库。该库的实现代码简洁、可读性强易于理解。以下是 PyCSP 的基本进程网络结构通道 2进程 1 进程 2通道 1在上述 PyCSP 进程网络中包含进程 1 和进程 2 两个进程这两个进程通过通道 1 和通道 2 两条通道传递消息实现通信。安装 PyCSP可通过以下命令安装 Python 的 PyCSP 库plaintextpip install PyCSP并行运行进程示例以下 Python 脚本是一个简单的示例通过 PyCSP 库实现两个进程的并行运行main.pyfrom pycsp.parallel import * import time process def P1(): time.sleep(1) print(P1 进程退出) process def P2(): time.sleep(1) print(P2 进程退出) def main(): Parallel(P1(), P2()) print(程序终止) if __name__ __main__: main()在上述脚本中定义了 P1 和 P2 两个函数并通过process装饰器将其转换为进程。输出结果运行代码并验证输出plaintextP2 进程退出 P1 进程退出 程序终止