C++ 实现进程池:主从架构、管道通信与任务调度

C++ 实现进程池:主从架构、管道通信与任务调度 主/从进程池架构一、原理1 个主进程Master 预先创建 N 个子进程Worker主进程只管分配任务子进程只管处理任务通过管道通信实现高并发、稳定、不重复创建销毁进程。二、三大核心角色1. Master主进程唯一不处理业务只做 4 件事创建进程池fork 一堆子进程接收任务分发任务发给空闲子进程管理子进程监控、重启、回收2. Worker子进程 → 进程池预先创建好 N 个一直活着不退出阻塞等待任务收到任务 → 处理 → 继续等待3. 管道通信通道每个子进程一条管道Master 写 → 发任务Worker 读 → 收任务阻塞等待天然实现同步控制三、代码#includecstdlib#includectime#includeiostream#includestring#includeunistd.h#includevector#includefunctional#includesys/wait.h#includesys/types.h#definePOLL_SIZE4// ***********************************任务列表*********************************************voidSyncDisk(){std::cout同步磁盘...std::endl;}voidDownloadFile(){std::cout下载文件...std::endl;}voidPrintMessage(){std::cout打印消息...std::endl;}voidUpdateDatabase(){std::cout更新数据库...std::endl;}typedefvoid(*task_t)();// 定义一个函数指针类型指向任务函数task_t tasks[]{SyncDisk,DownloadFile,PrintMessage,UpdateDatabase};// 任务列表// ***********************************进程池实现*******************************************enum{OK0,PIPE_ERROR,FORK_ERROR};voidDotask(intfd){// 任务的入口while(1){inttask_code0;ssize_t nread(fd,task_code,sizeof(task_code));// 约定好读取4字节的任务码if(n-1){std::cerrread errorstd::endl;break;}elseif(n0){std::cout没有任务了...std::endl;break;// 管道被关闭退出循环}else{if(task_code0||task_codesizeof(tasks)/sizeof(task_t)){std::cerrInvalid task code: task_codestd::endl;continue;// 跳过无效的任务码}// 根据任务码执行对应的任务函数tasks[task_code]();}}}// typedef std::functionvoid (int) cb_t;usingcb_tstd::functionvoid(int);classProcessPool{private:// 定义一个内部Channel类保存管道写端和子进程pidclassChannel{public:Channel(intwfd,pid_t sub_pid):_wfd(wfd),_sub_pid(sub_pid){_sub_namesub_channel_std::to_string(sub_pid);}~Channel(){}voidWrite(inttask_index){ssize_t nwrite(_wfd,task_index,sizeof(task_index));(void)n;// 忽略写入结果}voidPrintInfo(){printf(Channel Info - Sub PID: %d, Sub Name: %s\n,_sub_pid,_sub_name.c_str());}std::stringGetSubName()const{return_sub_name;}voidClosePipe(){std::cout关闭管道wfd: _wfdstd::endl;close(_wfd);}voidWait(){waitpid(_sub_pid,nullptr,0);std::cout回收子进程: PID: _sub_pidstd::endl;}private:int_wfd;// 写管道文件描述符pid_t _sub_pid;// 子进程pidstd::string _sub_name;// 子进程名字};public:ProcessPool(){srand((unsignedint)time(nullptr)^(unsignedint)getpid());// 设置随机数种子}~ProcessPool(){}voidInit(cb_t cb){CreatProcessChannel(cb);}voidDebug(){for(autoch:channels){ch.PrintInfo();}}voidRun(){intcnt6;while(cnt--){std::cout---------------------------------------------std::endl;// 1. 选择一个任务intitaskSelectTask();std::cout选择任务 index: itaskstd::endl;// 2. 选择一个channel(管道子进程)本质是选择一个下标intindexSelectChannel();std::cout选择管道 index: indexstd::endl;// 3. 发送任务给指定的channel(管道子进程)SendTaskToChannel(itask,index);std::cout发送任务 index itask 到管道 index indexstd::endl;}}voidQuit(){// 想要1 : 1地回收就要改变CreateProcessChannel的实现// 就要在子进程中关闭历史遗留的管道写端// 从而实现1 : 1地回收资源。for(autoch:channels){ch.ClosePipe();// 关闭写端ch.Wait();}// 逆向回收资源// int end channels.size() - 1;// for (int i end; i 0; --i) {// channels[i].ClosePipe(); // 关闭写端// channels[i].Wait(); // 等待子进程退出// }// 1:1回收演示// for (auto ch : channels) {// ch.ClosePipe(); // 关闭写端// ch.Wait();// }// // 1. 关闭所有管道写端通知子进程退出// for (auto ch : channels) {// ch.ClosePipe(); // 关闭写端// }// // 2. 等待所有子进程退出// for (auto ch : channels) {// ch.Wait();// }}private:intSelectChannel(){// 这里简单的轮询选择一个channelstaticintindex0;intselectedindex%channels.size();index;returnselected;}intSelectTask(){// 轮询选择一个任务intitaskrand()%(sizeof(tasks)/sizeof(task_t));returnitask;}voidSendTaskToChannel(intitask,intindex){if(itask0||itasksizeof(tasks)/sizeof(task_t)){std::cerrInvalid task index: itaskstd::endl;return;}if(index0||indexchannels.size()){std::cerrInvalid channel index: indexstd::endl;return;}// 将任务索引写入管道通知子进程执行对应的任务channels[index].Write(itask);}voidCreatProcessChannel(cb_t cb){for(inti0;iPOLL_SIZE;i){intpipefd[2]{0};intretpipe(pipefd);if(ret-1){std::cerrpipe errorstd::endl;exit(PIPE_ERROR);}pid_t pidfork();if(pid-1){std::cerrfork errorstd::endl;exit(FORK_ERROR);}elseif(pid0){// 子进程if(!channels.empty()){for(autoch:channels){ch.ClosePipe();// 关闭历史遗留的管道写端}}close(pipefd[1]);// 关闭写端cb(pipefd[0]);exit(OK);}else{// 父进程close(pipefd[0]);// 关闭读端// 创建一个channel对象保存管道写端和子进程pidchannels.emplace_back(pipefd[1],pid);// Channel ch(pipefd[1], pid);// channels.emplace_back(ch); // 将channel对象添加到容器中std::cout创建了一个管道PID: pid到管道容器中了std::endl;sleep(1);}}}private:std::vectorChannelchannels;// 要有未来组织所有channel的容器};intmain(){// 1. 初始化一个进程池对象ProcessPool pool;pool.Init(Dotask);// 2. 运行进程池pool.Run();// 3. 结束进程池释放资源pool.Quit();return0;}Linux 进程池ProcessPool源码解析一、项目目标实现一个简单的进程池父进程负责调度任务子进程负责执行任务父子进程通过匿名管道通信父进程发送任务码子进程根据任务码执行对应任务二、整体架构Parent(ProcessPool) │ ┌────────────────────┼────────────────────┐ │ │ │ │ │ │ pipe0 pipe1 pipe2 │ │ │ ▼ ▼ ▼ Child0 Child1 Child2 │ │ │ Dotask() Dotask() Dotask() │ │ │ └────────────执行任务────────────┘父进程ProcessPool负责创建子进程创建管道选择任务分发任务回收子进程三、任务系统任务定义void SyncDisk(); void DownloadFile(); void PrintMessage(); void UpdateDatabase();任务表typedef void (*task_t)(); task_t tasks[] { SyncDisk, DownloadFile, PrintMessage, UpdateDatabase };本质任务码 → 函数地址对应关系任务码任务0SyncDisk1DownloadFile2PrintMessage3UpdateDatabase四、ProcessPool类设计class ProcessPool { private: vectorChannel channels; };保存所有管道 子进程五、Channel设计Channel是什么一个Channel对应一个管道 一个子进程类结构class Channel { private: int _wfd; pid_t _sub_pid; string _sub_name; };成员说明_wfdint _wfd;父进程保存管道写端用于发送任务。_sub_pidpid_t _sub_pid;保存子进程PID用于waitpid()回收资源。_sub_namesub_channel_xxx用于调试。六、进程创建流程CreateProcessChannel()核心代码pipe(pipefd); fork();创建过程第一次循环Parent │ └── Child0生成pipe0第二次循环Parent ├── Child0 └── Child1生成pipe1第三次循环Parent ├── Child0 ├── Child1 └── Child2生成pipe2最终Parent │ ├── Child0 ├── Child1 ├── Child2 └── Child3七、为什么关闭历史遗留写端问题fork会继承所有打开文件描述符。例如pipe0创建Child0后Child0 拥有 pipe0[0] pipe0[1]关闭close(pipe0[1]);剩pipe0[0]继续fork Child1Child1会继承pipe0[1] pipe1[1]此时Child1 持有 pipe0 写端如果父进程关闭pipe0 写端Child0仍然读不到EOF。因为还有进程持有写端解决方案for(auto ch : channels) { ch.ClosePipe(); }关闭历史遗留写端。最终关系pipe0 → Child0 pipe1 → Child1 pipe2 → Child2 pipe3 → Child3形成1 Pipe ↓ 1 Child八、任务执行流程Dotask()void Dotask(int fd) { while(true) { read(fd,task_code,sizeof(task_code)); } }收到任务码例如2表示PrintMessage();执行tasks[2]();执行流程父进程 │ write(2) │ ▼ pipe │ ▼ 子进程 │ read() │ ▼ tasks[2]() │ ▼ PrintMessage()九、任务调度策略SelectTask()rand() % 4随机任务0~3SelectChannel()static int index 0; selected index % channels.size();执行顺序0 1 2 3 0 1 2 3 ...这种策略叫Round Robin 轮询调度十、运行流程图┌───────────┐ │ Run() │ └─────┬─────┘ │ ▼ 选择任务 │ ▼ 选择Channel │ ▼ Write(任务码) │ ▼ 管道传输 │ ▼ 子进程read() │ ▼ 执行任务 │ ▼ 继续等待任务十一、退出流程父进程ClosePipe();关闭所有写端。子进程read(...)返回0表示EOF执行break;退出循环。父进程回收waitpid(pid,nullptr,0);流程Parent │ close(wfd) │ ▼ Child read()0 │ ▼ 退出 │ ▼ waitpid() │ ▼ 回收完成十二、源码中的亮点回调思想using cb_t std::functionvoid(int);初始化pool.Init(Dotask);创建子进程后cb(pipefd[0]);调用Dotask(pipefd[0]);实现进程池框架 业务逻辑解耦。