一. 核心设计思路本进程池实现的核心逻辑父进程创建指定数量的子进程通过匿名管道与每个子进程建立单向通信父写子读父进程采用轮询策略将任务分发给不同子进程实现简单的负载均衡子进程循环读取管道中的任务码执行对应任务父进程通过关闭管道写端通知子进程退出并回收所有子进程资源。二. 代码模块拆解2.1 任务定义与随机任务生成这部分是测试用的任务层定义了进程池要执行的具体任务以及随机生成任务的工具函数。#include iostream #include string #include vector #include memory #include functional #include ctime #include cstdlib #include unistd.h #include sys/wait.h #define __MAIN__ /////////////////////////////任务测试代码/////////////////////////////////////// // 定义任务类型无参数无返回值的函数对象 using task_t std::functionvoid(); // 具体任务1打印日志带进程ID标识 void PrintLog() { std::cout 我是一个打印日志的任务, pid getpid() std::endl; } // 具体任务2模拟下载 void DownLoad() { std::cout 我是一个下载任务, pid getpid() std::endl; } // 具体任务3模拟访问MySQL void ReadMysql() { std::cout 我是一个访问数据库的任务, pid getpid() std::endl; } // 具体任务4模拟访问Redis void WriteRedies() { std::cout 我是一个访问redies的任务, pid getpid() std::endl; } // 全局任务列表存储所有可执行的任务 std::vectortask_t gtasks; // 加载所有任务到全局列表 void LoadTask() { gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies); } // 随机生成50个任务码输出型参数out存储结果 // 作用模拟业务中随机产生的任务请求 // *: 输出型参数 // const : 输入型参数 // : 输入输出型 void RandomTask(std::vectorint* out) { for(int i 0; i 50; i) { // 随机选择任务0~3 int code rand() % gtasks.size(); usleep(23223); // 模拟任务产生的时间间隔 out-push_back(code); } } // 任务码枚举增强可读性 #define LOG_TASK 0 #define DOWNLOAD_TASK 1 #define MYSQL_TASK 2 #define REDIES_TASK 3 // 任务码转字符串方便日志打印 std::string TaskToString(int code) { switch(code) { case LOG_TASK: return PrintLog; case DOWNLOAD_TASK: return DownLoad; case MYSQL_TASK: return ReadMysql; case REDIES_TASK: return WriteRedies; default: return Unknown; } }2.2 子进程任务处理逻辑Work函数是子进程的核心执行逻辑负责从管道读取任务码并执行对应任务/////////////////////////进程池核心代码//////////////////////// // 子进程工作函数循环读取管道中的任务码并执行 // rfd管道读端文件描述符 void Work(int rfd) { while(true) { int code 0; // 从管道读取任务码阻塞读 ssize_t n read(rfd, code, sizeof(code)); // 读取成功且长度正确执行任务 if(n 0 n sizeof(int)) { if(code 0 code gtasks.size()) { gtasks[code](); // 执行对应任务 } } // 读取到0表示管道写端关闭父进程通知退出 else if(n 0) { break; // 子进程退出循环 } // 读取错误直接退出 else{ break; } } }2.2 子进程任务处理逻辑Work函数是子进程的核心执行逻辑负责从管道读取任务码并执行对应任务/////////////////////////进程池核心代码//////////////////////// // 子进程工作函数循环读取管道中的任务码并执行 // rfd管道读端文件描述符 void Work(int rfd) { while(true) { int code 0; // 从管道读取任务码阻塞读 ssize_t n read(rfd, code, sizeof(code)); // 读取成功且长度正确执行任务 if(n 0 n sizeof(int)) { if(code 0 code gtasks.size()) { gtasks[code](); // 执行对应任务 } } // 读取到0表示管道写端关闭父进程通知退出 else if(n 0) { break; // 子进程退出循环 } // 读取错误直接退出 else{ break; } } }2.3 通道Channel类封装父子进程通信Channel类封装了 “管道写端 子进程 ID” 的关联关系简化父进程对单个子进程的管理发任务、关管道、回收进程// 通道类管理单个子进程的通信管道和进程ID class Channel { public: // 构造函数初始化管道写端、子进程ID生成通道名称 Channel(int wfd, pid_t who): _wfd(wfd), _sub_process_id(who) { _name Channel- std::to_string(_sub_process_id) - std::to_string(_wfd); } int Fd() { return _wfd; } // 获取管道写端 pid_t SubId() { return _sub_process_id; } // 获取子进程ID std::string Name() { return _name; } // 获取通道名称调试用 // 关闭管道写端 void Close() { if(_wfd 0) close(_wfd); } // 等待子进程退出回收资源 void Wait() { pid_t rid waitpid(_sub_process_id, nullptr, 0); (void)rid; // 屏蔽未使用变量警告 } // 向子进程发送任务码写管道 void SendTask(int taskcode) { ssize_t n write(_wfd, taskcode, sizeof(taskcode)); (void)n; // 屏蔽未使用变量警告实际场景应检查写操作是否成功 } ~Channel() {} private: int _wfd; // 管道写端文件描述符 pid_t _sub_process_id; // 对应子进程ID std::string _name; // 通道名称调试用 };2.4 进程池ProcesspPool类核心管理逻辑ProcesspPool类是进程池的核心负责创建子进程、管理通道、分发任务、停止进程池class ProcesspPool { private: // 轮询选择下一个子进程负载均衡策略 int Next() { int choice _next_choice; _next_choice; _next_choice % _channels.size(); // 取模实现循环 return choice; } public: // 构造函数初始化进程池大小、轮询索引 ProcesspPool(int number): _number(number), _next_choice(0) { std::cout number: _number std::endl; } // 启动进程池父进程执行创建指定数量的子进程和管道 void Start() { for(int i 0; i _number; i) { // 1. 创建匿名管道 int pipefd[2]; int n pipe(pipefd); if(n 0) { perror(pipe); exit(2); } // 2. 创建子进程 pid_t id fork(); if(id 0) { perror(fork); exit(3); } else if(id 0) // 子进程逻辑 { // 这里后面还有些变化为了解决下面那个version2 close(pipefd[1]); // 子进程关闭写端只读 Work(pipefd[0]); // 执行工作函数 close(pipefd[0]); // 任务完成后关闭读端 exit(0); // 子进程退出 } else // 父进程逻辑 { close(pipefd[0]); // 父进程关闭读端只写 // 创建通道对象并加入管理列表 _channels.emplace_back(pipefd[1], id); } } } // 推送任务选择子进程并发送任务码 void PushTask(int taskcode) { // 轮询选择一个子进程 int who Next(); _channels[who].SendTask(taskcode); // 打印任务分发日志调试用 std::cout 发送任务: TaskToString(taskcode) [ taskcode ] 给: _channels[who].Name() std::endl; } // 停止进程池关闭所有管道回收子进程 void Stop() { // version1 -- 可以成功 // 1. 批量关闭所有管道写端通知子进程退出 for(auto channel: _channels) { channel.Close(); std::cout channel.Name() close success! std::endl; } sleep(3); // 等待子进程处理完最后任务并退出 // 2. 批量回收子进程资源 for(auto channel: _channels) { channel.Wait(); std::cout channel.Name() wait success! std::endl; } // // version2 -- 不能成功原因关闭管道后立即wait子进程可能还未处理完读操作导致阻塞 // for(auto channel: _channels) // { // channel.Close(); // channel.Wait(); // std::cout channel.Name() close and wait success! std::endl; // } // // version3 -- 可以成功逆序关闭回收避免资源竞争 // int end _channels.size() - 1; // while(end 0) // { // _channels[end].Close(); // _channels[end].Wait(); // std::cout channel.Name() close and wait success! std::endl; // end--; // } } // 调试打印输出所有通道信息 void DebugPrint() { std::cout ------------------------------------ std::endl; for(auto channel : _channels) { std::cout channel.Fd() std::endl; std::cout channel.SubId() std::endl; std::cout channel.Name() std::endl; } std::cout ------------------------------------ std::endl; } ~ProcesspPool() {} private: std::vectorChannel _channels; // 管理所有子进程的通道 int _number; // 进程池大小子进程数量 int _next_choice; // 轮询索引下一个要分发任务的子进程 };2.5 主函数进程池使用示例主函数是进程池的入口负责初始化、启动、分发任务、停止进程池#ifdef __MAIN__ // 用法提示函数 static void Usage(const std::string proc) { std::cout Usage:\n\t proc proceess_number std::endl; } // 程序入口./process_pool 55为子进程数量 int main(int argc, char* argv[]) { // 检查命令行参数 if(argc ! 2) { Usage(argv[0]); exit(1); } int number std::stoi(argv[1]); // 0. 初始化加载任务、随机生成50个任务码 srand(time(nullptr) ^ getpid()); // 设置随机数种子结合时间进程ID LoadTask(); std::vectorint task_codes; RandomTask(task_codes); // 1. 创建进程池对象智能指针自动管理内存 std::unique_ptrProcesspPool pp std::make_uniqueProcesspPool(number); // 2. 启动进程池创建子进程和管道 pp-Start(); sleep(2); // 等待所有子进程初始化完成 // 3. 分发所有随机任务 for(auto task : task_codes) { pp-PushTask(task); usleep(500000); // 模拟任务分发间隔500ms } // // 注释部分交互式输入任务码调试用 // while(true) // { // int code 0; // std::cout Please Enter Your Task# ; // std::cin code; // if(code 0 || code gtasks.size()) // { // std::cout 任务码错误, 请重新输入 std::endl; // continue; // } // pp-PushTask(code); // } // 4. 停止进程池回收资源 pp-Stop(); return 0; } #endif三. 关键知识点解析3.1 管道通信原理匿名管道pipe()创建的文件描述符对pipefd[0]读、pipefd[1]写是单向的父子进程继承管道文件描述符通过关闭不需要的端实现 “父写子读”当写端关闭后读端read()会返回 0子进程通过这个信号判断退出。3.2 轮询负载均衡Next()函数通过递增取模的方式循环选择子进程确保任务均匀分发给所有子进程避免单个子进程过载。3.3 进程回收的坑Stop()函数中 version2 失败的原因父进程关闭管道后立即waitpid()子进程可能还在阻塞读管道此时父进程waitpid()会阻塞而子进程读取到管道关闭后退出但若所有子进程都处于这种状态会导致死锁。version1 先批量关闭所有管道等待 3 秒让子进程全部退出后再回收避免了这个问题。总结版本2失败的根本原因是父进程在等待一个子进程时其他子进程的写端并未关闭(因为都是继承了父进程关闭了一个但是后面的子进程关闭一个进行读还是不可避免的继承了上次之前的)导致它们无法退出从而形成串行阻塞。版本1通过先关闭所有写端让子进程并发退出避免了这一风险。因此在实际开发中应当采用版本1或类似策略来确保进程池能够优雅地停止。我们可以怎么样去修改使version2变成可行的方案四. 完整代码展示#include iostream #include string #include vector #include memory #include functional #include ctime #include cstdlib #include unistd.h #include sys/wait.h #define __MAIN__ /////////////////////////////任务测试代码/////////////////////////////////////// using task_t std::functionvoid(); void PrintLog() { std::cout 我是一个打印日志的任务, pid getpid() std::endl; } void DownLoad() { std::cout 我是一个下载任务, pid getpid() std::endl; } void ReadMysql() { std::cout 我是一个访问数据库的任务, pid getpid() std::endl; } void WriteRedies() { std::cout 我是一个访问redies的任务, pid getpid() std::endl; } std::vectortask_t gtasks; void LoadTask() { gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies); } // *: 输出型参数 // const : 输入型参数 // : 输入输出型 void RandomTask(std::vectorint* out) { for(int i 0; i 50; i) { int code rand() % gtasks.size(); usleep(23223); out-push_back(code); } } #define LOG_TASK 0 #define DOWNLOAD_TASK 1 #define MYSQL_TASK 2 #define REDIES_TASK 3 std::string TaskToString(int code) { switch(code) { case LOG_TASK: return PrintLog; case DOWNLOAD_TASK: return DownLoad; case MYSQL_TASK: return ReadMysql; case REDIES_TASK: return WriteRedies; default: return Unknown; } } /////////////////////////进程池代码//////////////////////// void Work(int rfd) { while(true) { int code 0; ssize_t n read(rfd, code, sizeof(code)); if(n 0 n sizeof(int)) { if(code 0 code gtasks.size()) { gtasks[code](); } } else if(n 0) { break; // 子进程只要读到返回值为0, 表明父进程让我退出 } else{ break; } } } class Channel { public: Channel(int wfd, pid_t who): _wfd(wfd), _sub_process_id(who) { _name Channel- std::to_string(_sub_process_id) - std::to_string(_wfd); } int Fd() { return _wfd; } pid_t SubId() { return _sub_process_id; } std::string Name() { return _name; } void Close() { if(_wfd 0) close(_wfd); } void Wait() { pid_t rid waitpid(_sub_process_id, nullptr, 0); (void)rid; } void SendTask(int taskcode) { ssize_t n write(_wfd, taskcode, sizeof(taskcode)); (void)n; } ~Channel() { } private: int _wfd; pid_t _sub_process_id; std::string _name; }; class ProcesspPool { private: int Next() { int choice _next_choice; _next_choice; _next_choice % _channels.size(); return choice; } public: ProcesspPool(int number): _number(number), _next_choice(0) { std::cout number: _number std::endl; } // 父进程 void Start() { for(int i 0; i _number; i) { // 1. 创建管道 int pipefd[2]; int n pipe(pipefd); if(n 0) { perror(pipe); exit(2); } // 2. 创建子进程 pid_t id fork(); if(id 0) { perror(fork); exit(3); } else if(id 0) // 子进程 { // 关闭父进程历史的wfd! for(auto channel : _channels) channel.Close(); close(pipefd[1]); Work(pipefd[0]); close(pipefd[0]); exit(0); } else // 父进程 { close(pipefd[0]); // _channels c(pipefd[1], fd); // _channels.push_back(c); _channels.emplace_back(pipefd[1], id); // 内部会直接构造 } } } // 1. 什么任务? 任务码决定 // 2. 任务给谁? 属于进程池内部操作,负载均衡(我这里是用的轮询的机制) void PushTask(int taskcode) { // 选择一个子进程 int who Next(); _channels[who].SendTask(taskcode); std::cout 发送任务: TaskToString(taskcode) [ taskcode ] 给: _channels[who].Name() std::endl; } // 有版本存在一些问题, 后续会说为什么 void Stop() { // version1 -- 可以成功 // 1. 关闭wfd for(auto channel: _channels) { channel.Close(); std::cout channel.Name() close success! std::endl; } sleep(3); // 2. 回收子进程 for(auto channel: _channels) { channel.Wait(); std::cout channel.Name() wait success! std::endl; } // // version2 -- 不能成功??? // for(auto channel: _channels) // { // channel.Close(); // channel.Wait(); // std::cout channel.Name() close and wait success! std::endl; // } // version3 -- 可以成功 // int end _channels.size() - 1; // while(end 0) // { // _channels[end].Close(); // _channels[end].Wait(); // std::cout channel.Name() close and wait success! std::endl; // end--; // } } void DebugPrint() { std::cout ------------------------------------ std::endl; for(auto channel : _channels) { std::cout channel.Fd() std::endl; std::cout channel.SubId() std::endl; std::cout channel.Name() std::endl; } std::cout ------------------------------------ std::endl; } ~ProcesspPool() {} private: std::vectorChannel _channels; int _number; int _next_choice; }; // 父进程 #ifdef __MAIN__ static void Usage(const std::string proc) { std::cout Usage:\n\t proc proceess_number std::endl; } // ./process_pool 5 int main(int argc, char* argv[]) { if(argc ! 2) { Usage(argv[0]); exit(1); } int number std::stoi(argv[1]); // 0. 加载任务并随机生成任务 srand(time(nullptr) ^ getpid()); LoadTask(); std::vectorint task_codes; RandomTask(task_codes); // 1. 创建进程池对象 std::unique_ptrProcesspPool pp std::make_uniqueProcesspPool(number); // 2. 启动进程池 pp-Start(); sleep(2); for(auto task : task_codes) { pp-PushTask(task); usleep(500000); } // // 自己输入发送任务 // while(true) // { // int code 0; // std::cout Please Enter Your Task# ; // std::cin code; // if(code 0 || code gtasks.size()) // { // std::cout 任务码错误, 请重新输入 std::endl; // continue; // } // pp-PushTask(code); // } pp-Stop(); return 0; } #endif五. 编译与运行附 Makefileprocess_pool:process_pool.cc g -o $ $^ -stdc14 .PHONY:clean clean: rm -f process_pool编译直接 make运行./process_pool 55 为子进程数量可自定义输出可以看到任务被轮询分发给不同子进程每个任务打印对应的进程 ID最后进程池正常停止并回收资源。六. 扩展与优化方向错误处理当前代码未处理write()/read()的错误返回值实际场景应增加重试、日志记录动态扩容支持运行时增加 / 减少子进程数量更优的负载均衡基于子进程当前任务数、CPU 使用率等动态分发任务队列父进程增加任务队列避免任务分发过快导致管道阻塞信号处理增加SIGCHLD信号处理异步回收子进程避免僵尸进程。
手搓简易 Linux 进程池:从 0 到 1 实现基于管道的任务分发系统
一. 核心设计思路本进程池实现的核心逻辑父进程创建指定数量的子进程通过匿名管道与每个子进程建立单向通信父写子读父进程采用轮询策略将任务分发给不同子进程实现简单的负载均衡子进程循环读取管道中的任务码执行对应任务父进程通过关闭管道写端通知子进程退出并回收所有子进程资源。二. 代码模块拆解2.1 任务定义与随机任务生成这部分是测试用的任务层定义了进程池要执行的具体任务以及随机生成任务的工具函数。#include iostream #include string #include vector #include memory #include functional #include ctime #include cstdlib #include unistd.h #include sys/wait.h #define __MAIN__ /////////////////////////////任务测试代码/////////////////////////////////////// // 定义任务类型无参数无返回值的函数对象 using task_t std::functionvoid(); // 具体任务1打印日志带进程ID标识 void PrintLog() { std::cout 我是一个打印日志的任务, pid getpid() std::endl; } // 具体任务2模拟下载 void DownLoad() { std::cout 我是一个下载任务, pid getpid() std::endl; } // 具体任务3模拟访问MySQL void ReadMysql() { std::cout 我是一个访问数据库的任务, pid getpid() std::endl; } // 具体任务4模拟访问Redis void WriteRedies() { std::cout 我是一个访问redies的任务, pid getpid() std::endl; } // 全局任务列表存储所有可执行的任务 std::vectortask_t gtasks; // 加载所有任务到全局列表 void LoadTask() { gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies); } // 随机生成50个任务码输出型参数out存储结果 // 作用模拟业务中随机产生的任务请求 // *: 输出型参数 // const : 输入型参数 // : 输入输出型 void RandomTask(std::vectorint* out) { for(int i 0; i 50; i) { // 随机选择任务0~3 int code rand() % gtasks.size(); usleep(23223); // 模拟任务产生的时间间隔 out-push_back(code); } } // 任务码枚举增强可读性 #define LOG_TASK 0 #define DOWNLOAD_TASK 1 #define MYSQL_TASK 2 #define REDIES_TASK 3 // 任务码转字符串方便日志打印 std::string TaskToString(int code) { switch(code) { case LOG_TASK: return PrintLog; case DOWNLOAD_TASK: return DownLoad; case MYSQL_TASK: return ReadMysql; case REDIES_TASK: return WriteRedies; default: return Unknown; } }2.2 子进程任务处理逻辑Work函数是子进程的核心执行逻辑负责从管道读取任务码并执行对应任务/////////////////////////进程池核心代码//////////////////////// // 子进程工作函数循环读取管道中的任务码并执行 // rfd管道读端文件描述符 void Work(int rfd) { while(true) { int code 0; // 从管道读取任务码阻塞读 ssize_t n read(rfd, code, sizeof(code)); // 读取成功且长度正确执行任务 if(n 0 n sizeof(int)) { if(code 0 code gtasks.size()) { gtasks[code](); // 执行对应任务 } } // 读取到0表示管道写端关闭父进程通知退出 else if(n 0) { break; // 子进程退出循环 } // 读取错误直接退出 else{ break; } } }2.2 子进程任务处理逻辑Work函数是子进程的核心执行逻辑负责从管道读取任务码并执行对应任务/////////////////////////进程池核心代码//////////////////////// // 子进程工作函数循环读取管道中的任务码并执行 // rfd管道读端文件描述符 void Work(int rfd) { while(true) { int code 0; // 从管道读取任务码阻塞读 ssize_t n read(rfd, code, sizeof(code)); // 读取成功且长度正确执行任务 if(n 0 n sizeof(int)) { if(code 0 code gtasks.size()) { gtasks[code](); // 执行对应任务 } } // 读取到0表示管道写端关闭父进程通知退出 else if(n 0) { break; // 子进程退出循环 } // 读取错误直接退出 else{ break; } } }2.3 通道Channel类封装父子进程通信Channel类封装了 “管道写端 子进程 ID” 的关联关系简化父进程对单个子进程的管理发任务、关管道、回收进程// 通道类管理单个子进程的通信管道和进程ID class Channel { public: // 构造函数初始化管道写端、子进程ID生成通道名称 Channel(int wfd, pid_t who): _wfd(wfd), _sub_process_id(who) { _name Channel- std::to_string(_sub_process_id) - std::to_string(_wfd); } int Fd() { return _wfd; } // 获取管道写端 pid_t SubId() { return _sub_process_id; } // 获取子进程ID std::string Name() { return _name; } // 获取通道名称调试用 // 关闭管道写端 void Close() { if(_wfd 0) close(_wfd); } // 等待子进程退出回收资源 void Wait() { pid_t rid waitpid(_sub_process_id, nullptr, 0); (void)rid; // 屏蔽未使用变量警告 } // 向子进程发送任务码写管道 void SendTask(int taskcode) { ssize_t n write(_wfd, taskcode, sizeof(taskcode)); (void)n; // 屏蔽未使用变量警告实际场景应检查写操作是否成功 } ~Channel() {} private: int _wfd; // 管道写端文件描述符 pid_t _sub_process_id; // 对应子进程ID std::string _name; // 通道名称调试用 };2.4 进程池ProcesspPool类核心管理逻辑ProcesspPool类是进程池的核心负责创建子进程、管理通道、分发任务、停止进程池class ProcesspPool { private: // 轮询选择下一个子进程负载均衡策略 int Next() { int choice _next_choice; _next_choice; _next_choice % _channels.size(); // 取模实现循环 return choice; } public: // 构造函数初始化进程池大小、轮询索引 ProcesspPool(int number): _number(number), _next_choice(0) { std::cout number: _number std::endl; } // 启动进程池父进程执行创建指定数量的子进程和管道 void Start() { for(int i 0; i _number; i) { // 1. 创建匿名管道 int pipefd[2]; int n pipe(pipefd); if(n 0) { perror(pipe); exit(2); } // 2. 创建子进程 pid_t id fork(); if(id 0) { perror(fork); exit(3); } else if(id 0) // 子进程逻辑 { // 这里后面还有些变化为了解决下面那个version2 close(pipefd[1]); // 子进程关闭写端只读 Work(pipefd[0]); // 执行工作函数 close(pipefd[0]); // 任务完成后关闭读端 exit(0); // 子进程退出 } else // 父进程逻辑 { close(pipefd[0]); // 父进程关闭读端只写 // 创建通道对象并加入管理列表 _channels.emplace_back(pipefd[1], id); } } } // 推送任务选择子进程并发送任务码 void PushTask(int taskcode) { // 轮询选择一个子进程 int who Next(); _channels[who].SendTask(taskcode); // 打印任务分发日志调试用 std::cout 发送任务: TaskToString(taskcode) [ taskcode ] 给: _channels[who].Name() std::endl; } // 停止进程池关闭所有管道回收子进程 void Stop() { // version1 -- 可以成功 // 1. 批量关闭所有管道写端通知子进程退出 for(auto channel: _channels) { channel.Close(); std::cout channel.Name() close success! std::endl; } sleep(3); // 等待子进程处理完最后任务并退出 // 2. 批量回收子进程资源 for(auto channel: _channels) { channel.Wait(); std::cout channel.Name() wait success! std::endl; } // // version2 -- 不能成功原因关闭管道后立即wait子进程可能还未处理完读操作导致阻塞 // for(auto channel: _channels) // { // channel.Close(); // channel.Wait(); // std::cout channel.Name() close and wait success! std::endl; // } // // version3 -- 可以成功逆序关闭回收避免资源竞争 // int end _channels.size() - 1; // while(end 0) // { // _channels[end].Close(); // _channels[end].Wait(); // std::cout channel.Name() close and wait success! std::endl; // end--; // } } // 调试打印输出所有通道信息 void DebugPrint() { std::cout ------------------------------------ std::endl; for(auto channel : _channels) { std::cout channel.Fd() std::endl; std::cout channel.SubId() std::endl; std::cout channel.Name() std::endl; } std::cout ------------------------------------ std::endl; } ~ProcesspPool() {} private: std::vectorChannel _channels; // 管理所有子进程的通道 int _number; // 进程池大小子进程数量 int _next_choice; // 轮询索引下一个要分发任务的子进程 };2.5 主函数进程池使用示例主函数是进程池的入口负责初始化、启动、分发任务、停止进程池#ifdef __MAIN__ // 用法提示函数 static void Usage(const std::string proc) { std::cout Usage:\n\t proc proceess_number std::endl; } // 程序入口./process_pool 55为子进程数量 int main(int argc, char* argv[]) { // 检查命令行参数 if(argc ! 2) { Usage(argv[0]); exit(1); } int number std::stoi(argv[1]); // 0. 初始化加载任务、随机生成50个任务码 srand(time(nullptr) ^ getpid()); // 设置随机数种子结合时间进程ID LoadTask(); std::vectorint task_codes; RandomTask(task_codes); // 1. 创建进程池对象智能指针自动管理内存 std::unique_ptrProcesspPool pp std::make_uniqueProcesspPool(number); // 2. 启动进程池创建子进程和管道 pp-Start(); sleep(2); // 等待所有子进程初始化完成 // 3. 分发所有随机任务 for(auto task : task_codes) { pp-PushTask(task); usleep(500000); // 模拟任务分发间隔500ms } // // 注释部分交互式输入任务码调试用 // while(true) // { // int code 0; // std::cout Please Enter Your Task# ; // std::cin code; // if(code 0 || code gtasks.size()) // { // std::cout 任务码错误, 请重新输入 std::endl; // continue; // } // pp-PushTask(code); // } // 4. 停止进程池回收资源 pp-Stop(); return 0; } #endif三. 关键知识点解析3.1 管道通信原理匿名管道pipe()创建的文件描述符对pipefd[0]读、pipefd[1]写是单向的父子进程继承管道文件描述符通过关闭不需要的端实现 “父写子读”当写端关闭后读端read()会返回 0子进程通过这个信号判断退出。3.2 轮询负载均衡Next()函数通过递增取模的方式循环选择子进程确保任务均匀分发给所有子进程避免单个子进程过载。3.3 进程回收的坑Stop()函数中 version2 失败的原因父进程关闭管道后立即waitpid()子进程可能还在阻塞读管道此时父进程waitpid()会阻塞而子进程读取到管道关闭后退出但若所有子进程都处于这种状态会导致死锁。version1 先批量关闭所有管道等待 3 秒让子进程全部退出后再回收避免了这个问题。总结版本2失败的根本原因是父进程在等待一个子进程时其他子进程的写端并未关闭(因为都是继承了父进程关闭了一个但是后面的子进程关闭一个进行读还是不可避免的继承了上次之前的)导致它们无法退出从而形成串行阻塞。版本1通过先关闭所有写端让子进程并发退出避免了这一风险。因此在实际开发中应当采用版本1或类似策略来确保进程池能够优雅地停止。我们可以怎么样去修改使version2变成可行的方案四. 完整代码展示#include iostream #include string #include vector #include memory #include functional #include ctime #include cstdlib #include unistd.h #include sys/wait.h #define __MAIN__ /////////////////////////////任务测试代码/////////////////////////////////////// using task_t std::functionvoid(); void PrintLog() { std::cout 我是一个打印日志的任务, pid getpid() std::endl; } void DownLoad() { std::cout 我是一个下载任务, pid getpid() std::endl; } void ReadMysql() { std::cout 我是一个访问数据库的任务, pid getpid() std::endl; } void WriteRedies() { std::cout 我是一个访问redies的任务, pid getpid() std::endl; } std::vectortask_t gtasks; void LoadTask() { gtasks.push_back(PrintLog); gtasks.push_back(DownLoad); gtasks.push_back(ReadMysql); gtasks.push_back(WriteRedies); } // *: 输出型参数 // const : 输入型参数 // : 输入输出型 void RandomTask(std::vectorint* out) { for(int i 0; i 50; i) { int code rand() % gtasks.size(); usleep(23223); out-push_back(code); } } #define LOG_TASK 0 #define DOWNLOAD_TASK 1 #define MYSQL_TASK 2 #define REDIES_TASK 3 std::string TaskToString(int code) { switch(code) { case LOG_TASK: return PrintLog; case DOWNLOAD_TASK: return DownLoad; case MYSQL_TASK: return ReadMysql; case REDIES_TASK: return WriteRedies; default: return Unknown; } } /////////////////////////进程池代码//////////////////////// void Work(int rfd) { while(true) { int code 0; ssize_t n read(rfd, code, sizeof(code)); if(n 0 n sizeof(int)) { if(code 0 code gtasks.size()) { gtasks[code](); } } else if(n 0) { break; // 子进程只要读到返回值为0, 表明父进程让我退出 } else{ break; } } } class Channel { public: Channel(int wfd, pid_t who): _wfd(wfd), _sub_process_id(who) { _name Channel- std::to_string(_sub_process_id) - std::to_string(_wfd); } int Fd() { return _wfd; } pid_t SubId() { return _sub_process_id; } std::string Name() { return _name; } void Close() { if(_wfd 0) close(_wfd); } void Wait() { pid_t rid waitpid(_sub_process_id, nullptr, 0); (void)rid; } void SendTask(int taskcode) { ssize_t n write(_wfd, taskcode, sizeof(taskcode)); (void)n; } ~Channel() { } private: int _wfd; pid_t _sub_process_id; std::string _name; }; class ProcesspPool { private: int Next() { int choice _next_choice; _next_choice; _next_choice % _channels.size(); return choice; } public: ProcesspPool(int number): _number(number), _next_choice(0) { std::cout number: _number std::endl; } // 父进程 void Start() { for(int i 0; i _number; i) { // 1. 创建管道 int pipefd[2]; int n pipe(pipefd); if(n 0) { perror(pipe); exit(2); } // 2. 创建子进程 pid_t id fork(); if(id 0) { perror(fork); exit(3); } else if(id 0) // 子进程 { // 关闭父进程历史的wfd! for(auto channel : _channels) channel.Close(); close(pipefd[1]); Work(pipefd[0]); close(pipefd[0]); exit(0); } else // 父进程 { close(pipefd[0]); // _channels c(pipefd[1], fd); // _channels.push_back(c); _channels.emplace_back(pipefd[1], id); // 内部会直接构造 } } } // 1. 什么任务? 任务码决定 // 2. 任务给谁? 属于进程池内部操作,负载均衡(我这里是用的轮询的机制) void PushTask(int taskcode) { // 选择一个子进程 int who Next(); _channels[who].SendTask(taskcode); std::cout 发送任务: TaskToString(taskcode) [ taskcode ] 给: _channels[who].Name() std::endl; } // 有版本存在一些问题, 后续会说为什么 void Stop() { // version1 -- 可以成功 // 1. 关闭wfd for(auto channel: _channels) { channel.Close(); std::cout channel.Name() close success! std::endl; } sleep(3); // 2. 回收子进程 for(auto channel: _channels) { channel.Wait(); std::cout channel.Name() wait success! std::endl; } // // version2 -- 不能成功??? // for(auto channel: _channels) // { // channel.Close(); // channel.Wait(); // std::cout channel.Name() close and wait success! std::endl; // } // version3 -- 可以成功 // int end _channels.size() - 1; // while(end 0) // { // _channels[end].Close(); // _channels[end].Wait(); // std::cout channel.Name() close and wait success! std::endl; // end--; // } } void DebugPrint() { std::cout ------------------------------------ std::endl; for(auto channel : _channels) { std::cout channel.Fd() std::endl; std::cout channel.SubId() std::endl; std::cout channel.Name() std::endl; } std::cout ------------------------------------ std::endl; } ~ProcesspPool() {} private: std::vectorChannel _channels; int _number; int _next_choice; }; // 父进程 #ifdef __MAIN__ static void Usage(const std::string proc) { std::cout Usage:\n\t proc proceess_number std::endl; } // ./process_pool 5 int main(int argc, char* argv[]) { if(argc ! 2) { Usage(argv[0]); exit(1); } int number std::stoi(argv[1]); // 0. 加载任务并随机生成任务 srand(time(nullptr) ^ getpid()); LoadTask(); std::vectorint task_codes; RandomTask(task_codes); // 1. 创建进程池对象 std::unique_ptrProcesspPool pp std::make_uniqueProcesspPool(number); // 2. 启动进程池 pp-Start(); sleep(2); for(auto task : task_codes) { pp-PushTask(task); usleep(500000); } // // 自己输入发送任务 // while(true) // { // int code 0; // std::cout Please Enter Your Task# ; // std::cin code; // if(code 0 || code gtasks.size()) // { // std::cout 任务码错误, 请重新输入 std::endl; // continue; // } // pp-PushTask(code); // } pp-Stop(); return 0; } #endif五. 编译与运行附 Makefileprocess_pool:process_pool.cc g -o $ $^ -stdc14 .PHONY:clean clean: rm -f process_pool编译直接 make运行./process_pool 55 为子进程数量可自定义输出可以看到任务被轮询分发给不同子进程每个任务打印对应的进程 ID最后进程池正常停止并回收资源。六. 扩展与优化方向错误处理当前代码未处理write()/read()的错误返回值实际场景应增加重试、日志记录动态扩容支持运行时增加 / 减少子进程数量更优的负载均衡基于子进程当前任务数、CPU 使用率等动态分发任务队列父进程增加任务队列避免任务分发过快导致管道阻塞信号处理增加SIGCHLD信号处理异步回收子进程避免僵尸进程。