关于【进程池阻塞 + 子进程未回收问题】
续接上文进程间通信二实现一个高可用的进程池-CSDN博客目录一、先看现象两个核心问题二、核心原因文件描述符泄漏管道读端没关干净1. 管道的核心规则回顾2. 后果管道写端永远关不完三、原理图解为什么子进程会继承父进程的文件描述符四、为什么子进程没被回收五、修复方案5.1 倒着关5.2 在子进程中关闭所有继承的管道写端这个关于进程池的demo 有一个致命的问题典型的「文件描述符泄漏导致管道读端未关闭 → 子进程阻塞 → 僵尸进程 / 无法回收」问题一、先看现象两个核心问题进程阻塞子进程卡在read上主进程也卡在waitpid上程序无法正常结束子进程未回收ps ajx | grep ProcessPool看到 6 个进程1 个父进程 5 个子进程说明子进程退出后变成了僵尸进程父进程没回收成功二、核心原因文件描述符泄漏管道读端没关干净1. 管道的核心规则回顾管道的read行为由所有写端是否关闭决定只要还有任意一个写端文件描述符没被关闭read就会一直阻塞不会返回 0EOF只有当所有写端都关闭时read才会返回 0子进程才会退出循环在ProcessPool::Start()中创建子进程和管道for (int i 0; i _process_num; i) { int pipefd[2] {0}; pipe(pipefd); pid_t subid fork(); if (subid 0) { close(pipefd[1]); // 子进程关写端 Work(pipefd[0]); // 子进程阻塞在 read(pipefd[0]) close(pipefd[0]); exit(0); } else { close(pipefd[0]); // 父进程关读端 _cm.Insert(pipefd[1], subid); // 父进程持有写端 } }问题出在子进程会继承父进程之前创建的所有管道文件描述符第 1 次循环父进程创建管道 1 → fork 子进程 1 → 子进程 1 继承管道 1 的读 / 写端第 2 次循环父进程创建管道 2 → fork 子进程 2 → 子进程 2继承管道 1 的写端 管道 2 的读 / 写端第 3 次循环父进程创建管道 3 → fork 子进程 3 → 子进程 3继承管道 1 的写端 管道 2 的写端 管道 3 的读 / 写端... 以此类推第 5 个子进程会继承前 4 个管道的所有写端2. 后果管道写端永远关不完当你调用Stop()关闭父进程持有的所有写端时父进程的写端都关了但子进程手里还握着之前管道的写端对管道 1 来说子进程 2~5 都持有它的写端 → 管道 1 的写端总数 0子进程 1 调用read(pipefd[0])时发现还有写端没关 →一直阻塞不会退出父进程调用waitpid时子进程还活着 →主进程也阻塞最终所有子进程都卡在read父进程卡在waitpid程序僵死三、原理图解为什么子进程会继承父进程的文件描述符四、为什么子进程没被回收因为子进程根本没退出子进程阻塞在read没有执行exit(0)父进程调用waitpid时子进程还在运行 →waitpid会一直阻塞只有当子进程真正退出后父进程才能回收它否则就会一直等下去如果子进程意外退出父进程没回收才会变成僵尸进程Z 状态这里是子进程活着父进程阻塞本质是死锁。五、修复方案5.1 倒着关5.2 在子进程中关闭所有继承的管道写端#ifndef __PROCESS_POOL_HPP__ #define __PROCESS_POOL_HPP__ #include iostream #include vector #include unistd.h #include cstdlib #include sys/wait.h #include Task.hpp // 先描述 class Channel { public: Channel(int fd, pid_t id) : _wfd(fd), _subid(id) { _name channel- std::to_string(_wfd) - std::to_string(_subid); } ~Channel() { } void Send(int code) { int n write(_wfd, code, sizeof(code)); (void)n; // n被定义如果未使用的时候会出现告警 } void Close() { close(_wfd); } void Wait() { pid_t rid waitpid(_subid, nullptr, 0); (void)rid; } int Fd() { return _wfd; } pid_t SubId() { return _subid; } std::string Name() { return _name; } private: int _wfd; pid_t _subid; // 想知道这个信道是给哪一个子进程的 std::string _name; }; // 再组织 class ChannelManager { public: ChannelManager() : _next(0) {} void Insert(int wfd, pid_t subid) { _channels.emplace_back(wfd, subid); // 不用再构建临时变量 // Channel c(wfd,subid); // _channels.push_back(std::move(c)); } Channel Select() { auto c _channels[_next]; _next; // 下一次访问的时候就去访问下一个了 _next % _channels.size(); return c; } void PrintfChannel() { for (auto channel : _channels) { std::cout channel.Name() std::endl; } } void CloseAll() { for(auto channel : _channels) { channel.Close(); } } void StopSubProcess() { for (auto channel : _channels) { channel.Close(); std::cout 关闭: channel.Name() std::endl; } } void WaitSubProcess() { for (auto channel : _channels) { channel.Wait(); std::cout 回收: channel.Name() std::endl; } } void CloseAndWait() { //解决方案2 for(auto channel : _channels) { channel.Close(); std::cout 关闭: channel.Name() std::endl; channel.Wait(); std::cout 回收: channel.Name() std::endl; } // 解决方案1倒着关 // for (int i _channels.size() - 1; i 0; i--) // { // _channels[i].Close(); // std::cout 关闭: _channels[i].Name() std::endl; // _channels[i].Wait(); // std::cout 回收: _channels[i].Name() std::endl; // } } ~ChannelManager() {} private: std::vectorChannel _channels; int _next; }; const int gdefaultnum 5; class ProcessPool { public: ProcessPool(int num) : _process_num(num) { _tm.Register(PrintLog); _tm.Register(Downlode); _tm.Register(Uplode); } void Work(int rfd) { while (true) { int code 0; ssize_t n read(rfd, code, sizeof(code)); if (n 0) { if (n ! sizeof(code)) { continue; } std::cout 子进程[ getpid() ]收到了一个任务码: code std::endl; _tm.Execute(code); } else if (n 0) { std::cout 子进程退出 std::endl; break; } else { std::cout 读取错误 std::endl; break; } // std::cout 我是子进程,我的rfd是: rfd std::endl; // sleep(5); } } bool Start() { for (int i 0; i _process_num; i) { // 1.创建管道 int pipefd[2] {0}; int n pipe(pipefd); if (n 0) return false; // 2.创建子进程 pid_t subid fork(); if (subid 0) return false; else if (subid 0) { // 子进程 //让子进程关闭自己继承下来的 他的哥哥进程的w端关闭就行了 //for(std::vectorChannel _channels) _channels.close(); _cm.CloseAll(); // 3.关闭不需要的文件描述符 close(pipefd[1]); Work(pipefd[0]); close(pipefd[0]); exit(0); } else { // 父进程 // 3.关闭不需要的文件描述符 close(pipefd[0]); // 写端: pipefd[1]; _cm.Insert(pipefd[1], subid); // wfd,subid, } } return true; } void Debug() { _cm.PrintfChannel(); } void Run() { // 1.选择了一个任务 int taskcode _tm.Code(); // 2.选择一个信道【子进程】负载均衡的选择一个子进程完成任务 auto c _cm.Select(); std::cout 选择了一个子进程 c.Name() std::endl; // 3.发送任务 c.Send(taskcode); std::cout 发送了一个任务码 c.Name() std::endl; } void Stop() { _cm.CloseAndWait(); // // 关闭父进程所有的wfd即可 // _cm.StopSubProcess(); // // 回收所有子进程 // _cm.WaitSubProcess(); } ~ProcessPool() {} private: ChannelManager _cm; int _process_num; // 创建对应的进程个数 TaskManager _tm; // }; #endif
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2460933.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!