文章目录
- 使用管道实现一个简易版本的进程池
- 流程图
- 代码
- makefile
- Task.hpp
- ProcessPool.cc
- 程序流程:
使用管道实现一个简易版本的进程池
流程图

代码
makefile
ProcessPool:ProcessPool.cc
g++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:
rm -f ProcessPool
Task.hpp
#pragma once
#include <iostream>
#include <vector>
typedef void (*task_t)(); //定义了一个函数指针类型task_t,它指向返回类型为void且不接受任何参数的函数。
void task1()
{
std::cout << "lol 刷新日志" << std::endl;
}
void task2()
{
std::cout << "lol 更新野区,刷新出来野怪" << std::endl;
}
void task3()
{
std::cout << "lol 检测软件是否更新,如果需要,就提示用户" << std::endl;
}
void task4()
{
std::cout << "lol 用户释放技能,更新用的血量和蓝量" << std::endl;
}
void LoadTask(std::vector<task_t> *tasks) // 该函数接受一个指向std::vector<task_t>的指针,并将其作为参数
{
tasks->push_back(task1); //将task1函数的地址添加到向量中。
tasks->push_back(task2);
tasks->push_back(task3);
tasks->push_back(task4);
}
ProcessPool.cc
#include "Task.hpp" // 包含任务相关的头文件
#include <string>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/wait.h>
const int processnum = 10; // 设定进程池大小为10
std::vector<task_t> tasks; // 存储任务的向量
// 定义channel类,用于管理进程间通信
class channel
{
public:
channel(int cmdfd, int slaverid, const std::string &processname)
:_cmdfd(cmdfd), _slaverid(slaverid), _processname(processname)
{}
public:
int _cmdfd; // 用于向子进程发送命令的文件描述符
pid_t _slaverid; // 子进程ID
std::string _processname; // 子进程名称,用于日志显示
};
// 子进程执行的函数
void slaver()
{
while(true)
{
int cmdcode = 0;
// 从标准输入(被重定向到管道)读取命令
int n = read(0, &cmdcode, sizeof(int));
if(n == sizeof(int))
{
std::cout <<"slaver say@ get a command: "<< getpid() << " : cmdcode: " << cmdcode << std::endl;
// 执行对应的任务
if(cmdcode >= 0 && cmdcode < tasks.size()) tasks[cmdcode]();
}
if(n == 0) break; // 管道关闭时退出
}
}
// 初始化进程池
void InitProcessPool(std::vector<channel> *channels)
{
std::vector<int> oldfds; // 存储历史文件描述符
for(int i = 0; i < processnum; i++)
{
int pipefd[2];
int n = pipe(pipefd); // 创建管道
assert(!n);
(void)n;
pid_t id = fork(); // 创建子进程
if(id == 0) // 子进程
{
// 关闭历史文件描述符
std::cout << "child: " << getpid() << " close history fd: ";
for(auto fd : oldfds) {
std::cout << fd << " "; // 打印当前文件描述符的值,用于显示子进程正在关闭哪些文件描述符。
close(fd); // 关闭文件描述符
}
std::cout << "\n";
close(pipefd[1]); // 关闭写端
dup2(pipefd[0], 0); // 将管道读端重定向到标准输入
close(pipefd[0]); //关闭读端
slaver(); // 执行子进程任务
std::cout << "process : " << getpid() << " quit" << std::endl;
exit(0);
}
// 父进程
close(pipefd[0]); // 关闭读端
// 创建新的channel并添加到channels中
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
oldfds.push_back(pipefd[1]);
sleep(1);
}
}
// 打印调试信息
void Debug(const std::vector<channel> &channels)
{
for(const auto &c :channels)
{
std::cout << c._cmdfd << " " << c._slaverid << " " << c._processname << std::endl;
}
}
// 显示菜单
void Menu()
{
std::cout << "################################################" << std::endl;
std::cout << "# 1. 刷新日志 2. 刷新出来野怪 #" << std::endl;
std::cout << "# 3. 检测软件是否更新 4. 更新用的血量和蓝量 #" << std::endl;
std::cout << "# 0. 退出 #" << std::endl;
std::cout << "#################################################" << std::endl;
}
// 控制子进程执行任务
void ctrlSlaver(const std::vector<channel> &channels)
{
int which = 0;
while(true)
{
int select = 0;
Menu();
std::cout << "Please Enter@ ";
std::cin >> select;
if(select <= 0 || select >= 5) break;
int cmdcode = select - 1;
// 轮询方式分配任务给子进程
std::cout << "father say: " << " cmdcode: " <<
cmdcode << " already sendto " << channels[which]._slaverid << " process name: "
<< channels[which]._processname << std::endl;
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
which++;
which %= channels.size();
}
}
// 清理进程池
void QuitProcess(const std::vector<channel> &channels)
{
for(const auto &c : channels){
close(c._cmdfd); // 关闭所有管道
waitpid(c._slaverid, nullptr, 0); // 等待所有子进程结束
}
}
int main()
{
LoadTask(&tasks); // 加载任务列表
srand(time(nullptr)^getpid()^1023); // 初始化随机数种子
std::vector<channel> channels; //
InitProcessPool(&channels); // 初始化进程池
ctrlSlaver(channels); // 控制子进程执行任务
QuitProcess(channels); // 清理进程池
return 0;
}
程序流程:
1.main函数首先调用LoadTask(&tasks),将task1到task4四个任务的函数地址存入全局tasks向量。
2.srand(time(nullptr)^getpid()^1023); 初始化随机数种子
3.std::vector<channel> channels;,这行代码的作用是定义一个名为 channels 的向量(std::vector),用于存储 channel 类型的对象。它的主要作用是管理多个 channel 对象,每个 channel 对象代表一个子进程的通信通道。
-
每个
channel对象包含以下信息:-
_cmdfd:用于向子进程发送命令的文件描述符(管道写端)。 -
_slaverid:子进程的进程ID(PID)。 -
_processname:子进程的名称,用于日志和调试。
-
-
channels向量存储了所有子进程的通信信息,父进程可以通过它管理所有子进程。
4.InitProcessPool(&channels); ,初始化进程池
// 初始化进程池
void InitProcessPool(std::vector<channel> *channels)
{
std::vector<int> oldfds; // 存储历史文件描述符
for(int i = 0; i < processnum; i++)
{
int pipefd[2];
int n = pipe(pipefd); // 创建管道
assert(!n);
(void)n;
pid_t id = fork(); // 创建子进程
if(id == 0) // 子进程
{
// 关闭历史文件描述符
std::cout << "child: " << getpid() << " close history fd: ";
for(auto fd : oldfds) {
std::cout << fd << " ";
close(fd);
}
std::cout << "\n";
close(pipefd[1]); // 关闭写端
dup2(pipefd[0], 0); // 将管道读端重定向到标准输入
close(pipefd[0]); //关闭读端
slaver(); // 执行子进程任务
std::cout << "process : " << getpid() << " quit" << std::endl;
exit(0);
}
// 父进程
close(pipefd[0]); // 关闭读端
// 创建新的channel并添加到channels中
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
oldfds.push_back(pipefd[1]);
sleep(1);
}
}
5.std::vector<int> oldfds; 的作用是存储父进程中已经创建的管道的写端文件描述符(pipefd[1])。它的主要目的是在创建新的子进程时,确保子进程能够关闭不需要的文件描述符,避免资源泄露和潜在的问题。
为什么需要
oldfds?
文件描述符的继承:
当父进程通过
fork()创建子进程时,子进程会继承父进程的所有打开的文件描述符。如果父进程创建了多个管道(每个子进程对应一个管道),那么每个子进程都会继承所有管道的文件描述符,即使这些管道是用于其他子进程的。
资源泄露问题:
如果子进程不关闭不需要的文件描述符,这些文件描述符会一直保持打开状态,导致资源泄露。
例如,假设父进程创建了 10 个子进程,每个子进程都会继承 10 个管道的文件描述符,但实际上每个子进程只需要一个管道的读端文件描述符。
避免干扰:
- 如果子进程不关闭不需要的文件描述符,可能会导致意外的行为。例如,某个子进程可能会错误地读取其他子进程的管道数据。
6.for(int i = 0; i < processnum; i++),循环 processnum=10 次,每次创建一个子进程和一个管道。
7.int pipefd[2];
pipefd 是一个长度为 2 的整型数组,用于存储管道的两个文件描述符:
pipefd[0]:管道的 读端文件描述符,用于从管道中读取数据。pipefd[1]:管道的 写端文件描述符,用于向管道中写入数据。
8.int n = pipe(pipefd);
调用 pipe 系统函数来创建一个管道,并将结果存储在变量 n 中。
1.
pipe系统函数的作用
pipe是一个系统调用,用于创建一个管道。管道的本质是一个内核缓冲区,用于在两个进程之间传递数据。管道有两个端点:
- 读端:用于从管道中读取数据。
- 写端:用于向管道中写入数据。
pipe函数的原型如下:int pipe(int pipefd[2]);
2. 参数
pipefd[2]
pipefd是一个长度为 2 的整型数组,用于存储管道的两个文件描述符:
pipefd[0]:管道的 读端文件描述符,用于从管道中读取数据。pipefd[1]:管道的 写端文件描述符,用于向管道中写入数据。
3. 返回值
n
- 如果
pipe调用成功,返回0。- 如果
pipe调用失败,返回-1,并设置errno表示错误原因。
4. 代码解析
int n = pipe(pipefd);
pipe(pipefd):调用pipe函数创建管道。n:存储pipe函数的返回值,用于检查管道是否创建成功。
9.assert(!n);,(void)n;
assert(!n):确保管道创建成功。如果pipe调用失败,程序会终止。(void)n:忽略未使用的变量警告。
10.pid_t id = fork(); ,创建子进程
if(id == 0) // 子进程
{
// 关闭历史文件描述符
std::cout << "child: " << getpid() << " close history fd: ";
for(auto fd : oldfds) {
std::cout << fd << " ";
close(fd);
}
std::cout << "\n";
close(pipefd[1]); // 关闭写端
dup2(pipefd[0], 0); // 将管道读端重定向到标准输入
close(pipefd[0]); //关闭读端
slaver(); // 执行子进程任务
std::cout << "process : " << getpid() << " quit" << std::endl;
exit(0);
}
11.在子进程中,id 为 0。
12.std::cout << "child: " << getpid() << " close history fd: ";
打印当前子进程的PID,用于区分不同子进程
" close history fd: ",说明接下来要关闭的文件描述符
for(auto fd : oldfds) {
std::cout << fd << " ";// 打印当前文件描述符的值,用于显示子进程正在关闭哪些文件描述符。
close(fd);// 关闭文件描述符
}
在子进程中遍历 oldfds 向量,关闭所有不需要的文件描述符。
具体来说,它的目的是确保子进程只保留与自己相关的文件描述符,关闭其他无关的文件描述符,从而避免资源泄露和潜在的问题。
close(pipefd[1]); // 子进程关闭写端,因为子进程只需要读取命令
dup2(pipefd[0], 0); // 将父进程管道读端重定向到标准输入
close(pipefd[0]); //关闭父进程读端
slaver(); // 执行子进程任务
dup2函数将管道的读端(pipefd[0])复制到标准输入(0)
这意味着之后从标准输入读取的数据实际上是从管道读取的
后续代码中可以直接使用read(0,…)来读取父进程发送的数据
数据流向:
父进程 ---> 写端(pipefd[1]) ---> 管道 ---> 读端(重定向到标准输入) ---> 子进程
子进程:
- 关闭写端(pipefd[1])
- 将读端重定向到标准输入
- 关闭原读端(因为已重定向)
15.进入子进程函数
// 子进程执行的函数
void slaver()
{
while(true)
{
int cmdcode = 0;
// 从标准输入(被重定向到管道)读取命令
int n = read(0, &cmdcode, sizeof(int));
if(n == sizeof(int))
{
std::cout <<"slaver say@ get a command: "<< getpid() << " : cmdcode: " << cmdcode << std::endl;
// 执行对应的任务
if(cmdcode >= 0 && cmdcode < tasks.size()) tasks[cmdcode]();
}
if(n == 0) break; // 管道关闭时退出
}
}
while(true),无限循环,持续监听命令
int cmdcode = 0;
int n = read(0, &cmdcode, sizeof(int));
read(0, …):从标准输入读取数据,因为前面做了重定向,实际是从管道读取
&cmdcode:存储读取数据的地址
sizeof(int):读取int大小的数据
n:返回实际读取的字节数
if(n == sizeof(int)) { // 成功读取到完整的命令
// 打印调试信息
std::cout <<"slaver say@ get a command: "<< getpid() << " : cmdcode: " << cmdcode << std::endl;
// 执行对应任务
if(cmdcode >= 0 && cmdcode < tasks.size())
tasks[cmdcode](); // 调用任务函数
}
if(cmdcode >= 0 && cmdcode < tasks.size()),确保cmdcode非负,确保cmdcode小于任务数组大小,防止数组越界访问
tasks[cmdcode]();,tasks[cmdcode]获取对应的函数指针,()操作符调用该函数。
// 假设cmdcode = 0
tasks[0](); // 调用task1(),输出"lol 刷新日志"
// 假设cmdcode = 1
tasks[1](); // 调用task2(),输出"lol 更新野区,刷新出来野怪"
// 假设cmdcode = 2
tasks[2](); // 调用task3(),输出"lol 检测软件是否更新"
// 假设cmdcode = 3
tasks[3](); // 调用task4(),输出"lol 更新用户血量和蓝量"
if(n == 0) break; ,管道关闭时退出
16.slaver()结束,返回刚刚的
std::cout << "process : " << getpid() << " quit" << std::endl; //打印退出信息,getpid帮助我们确认哪个进程正在退出
exit(0); // 立即终止当前进程
17.然后执行InitProcessPool()函数的剩下来部分
// 父进程
close(pipefd[0]); // 关闭读端
// 创建新的channel并添加到channels中
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
oldfds.push_back(pipefd[1]);
sleep(1);
close(pipefd[0]);,父进程只需写入命令,不需要读。及时关闭不需要的文件描述符
std::string name = "process-" + std::to_string(i);,为每个子进程创建唯一名称。
std::to_string(i) : 将数字i转为字符串,“+” : 字符串拼接运算符。
效果如:process-0, process-1, process-2…
channels->push_back(channel(pipefd[1], id, name));,push_back在容器末尾添加新元素。创建临时 channel 对象并添加到 vector
channel是一个结构体,存储子进程信息:
void InitProcessPool(std::vector<channel> *channels)
struct channel {
int fd; // 管道写端
pid_t pid; // 子进程ID
std::string name; // 进程名称
channel(int _fd, pid_t _pid, const std::string& _name)
: fd(_fd), pid(_pid), name(_name)
{}
};
oldfds.push_back(pipefd[1]);,添加管道写端的文件描述符。
保存文件描述符的用途:
- 用于后续关闭文件描述符
- 防止文件描述符泄漏
- 进程间通信的管理
- 资源清理
sleep(1);,休眠1s。
18.进入main函数,执行ctrlSlaver(channels);
// 控制子进程执行任务
void ctrlSlaver(const std::vector<channel> &channels)
{
int which = 0;
while(true)
{
int select = 0;
Menu();
std::cout << "Please Enter@ ";
std::cin >> select;
if(select <= 0 || select >= 5) break;
int cmdcode = select - 1;
// 轮询方式分配任务给子进程
std::cout << "father say: " << " cmdcode: " <<
cmdcode << " already sendto " << channels[which]._slaverid << " process name: "
<< channels[which]._processname << std::endl;
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
which++;
which %= channels.size();
}
}
轮询机制
int which = 0; // 轮询索引
which++;
which %= channels.size(); // 循环轮询
实现了循环分配任务给不同子进程
如果有3个进程,which的值会是 0,1,2,0,1,2…
任务选择
while(true) {
int select = 0;
Menu(); // 显示菜单
std::cout << "Please Enter@ ";
std::cin >> select; // 获取用户输入
if(select <= 0 || select >= 5) break; // 退出条件
int cmdcode = select - 1; // 将用户输入的选项编号转换为程序内部使用的命令代码。
}
发送任务示例
// 显示任务分配信息
std::cout << "father say: " << " cmdcode: " << cmdcode
<< " already sendto " << channels[which]._slaverid
<< " process name: " << channels[which]._processname << std::endl;
// 向子进程发送命令
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
cmdcode要执行的命令编号(0代表hello,1代表calc等)
_slaverid: 子进程的PID(进程ID)
_processname: 子进程的名称
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
channels[which]._cmdfd:管道的写端文件描述符
&cmdcode:命令代码的地址
sizeof(cmdcode):发送的字节数(int类型通常是4字节)
19.返回主函数,执行QuitProcess(channels);,清理进程池。
void QuitProcess(const std::vector<channel> &channels)
{
// 遍历所有channel对象
for(const auto &c : channels){
// 1. 关闭管道
close(c._cmdfd); // 关闭管道写端
// 2. 等待子进程结束
waitpid(c._slaverid, nullptr, 0); // 阻塞等待直到子进程结束
}
}
20.return 0;














![[STM32 标准库]EXTI应用场景 功能框图 寄存器](https://i-blog.csdnimg.cn/direct/2c653cd57c974ef5830ed5bcba0ef243.png#pic_center)



