Linux进程间通信----简易进程池实现

news2025/6/6 15:13:26

进程池的模拟实现

1.进程池的原理:

是什么

进程池是一种多进程编程模式,核心思想是先创建好一定数量的子进程用作当作资源,这些进程可以帮助完成任务并且重复利用,避免频繁的进程的创建和销毁的开销。

下面我们举例子来帮助理解:

完成任务需要工作人员,在主任务执行的途中源源不断的有新的支线任务来临。比如校园社团展览大会的场地布置是主任务,但是不同的场地要有不同的布置任务布置结构,在整理真个场地的途中你需要帮手去帮你解决小任务,社团A来发任务,你现场去找一个人让他去做A任务,社团B来发放任务,你又得去找一个人去完成B任务,效率极低。这时候你提前找好了10个帮手,让他们待命,来了任务直接将帮手派发出去,帮手完成任务后回到等待任务队列,整体效率就提高了很多并且免去了频繁找人的过程。

上面的故事里,帮手就是组成进程池的子进程,你作为父进程需要组织并管理这些子进程,让他们去帮你完成任务。

为什么

为什么要有进程池?操作系统在创建进程时要给进程分配内存等资源,在任务短频率高的情况下消耗极高。同时子进程可以反复利用,可以将这些子进程的创建成本分摊到多次任务中。

怎么做

首先创建进程池就需要预设好一定数量的进程,第一步就是创建一定数量的子进程

第二步,为了能让子进程帮我们完成任务,我们需要和子进程通信,利用上次讲到的管道

第三步,任务的发布和负载均衡,发布任务的时候尽量让每个进程都被平均的使用到,发挥进程池的优势。

实现类似下图的结构:

代码实现:

第一步,完成任务一:所需数量的创建管道和子进程

#include <iostream>
#include <vector>
#include <unistd.h>

using namespace std;
#define PIPEERRO 1

int num = 5; // 全局变量,表示进程池内的进程数量

int main()
{
    for (int i = 0; i < num; i++)
    {
        // 1.创建管道和子进程
        int pipefd[2];
        int n = pipe(pipefd);
        if (n < 0)
        {
            return PIPEERRO;
        }

        //2.创建子进程
        pid_t id=fork();
        if(id==0)
        {
            //子进程代码
            //子进程负责读取任务 ,关闭写端
            close(pipefd[1]);

            //读取任务

            //执行任务

            //退出
            exit(0);
        }
        //父进程代码
        //父进程负责写任务,关闭读端
        close(pipefd[0]);

    }

    return 0;
}

 框架搭建完毕后,我们需要对创建出来的管道和子进程进行管理,管理的6字真言:先描述再组织

为了管理好我们创建的这么多管道和子进程我们创建一个管道类Channel去描述管道和子进程的关系,然后用vector去组织他们。

创建的所有管道都被放入channels vector数组里管理起来,以后我们想对单个子进程发送任务就只需要发给对应的数组成员就行了,操作十分方便。

现在我们完成了第一步,写一个假的work函数看看能不能工作吧:

void work()
{
    cout<<"I am child process: "<<getpid()<<endl;
} 

int main()
{
    vector<Channel> channels;
    for (int i = 0; i < num; i++)
    {
        // 1.创建管道和子进程
        int pipefd[2];
        int n = pipe(pipefd);
        if (n < 0)
        {
            return PIPEERRO;
        }

        //2.创建子进程
        pid_t id=fork();
        if(id==0)
        {
            //子进程代码
            //子进程负责读取任务 ,关闭写端
            close(pipefd[1]);

            //读取任务

            //执行任务
            work();
            //退出
            exit(0);
        }
        //父进程代码
        //父进程负责写任务,关闭读端
        close(pipefd[0]);
        channels.emplace_back(id,pipefd[1]);
    }

    return 0;
}

执行结果:

十分成功,接下来我们去实现第二步:任务的发放

我们首先实现几种不同的任务函数,利用回调函数的方式让我们的子进程随机执行任务,为了完成这一步我们利用数组将task(任务)函数用数组管理起来,形成任务清单,任务的发放方式也改为向管道内发送task函数数组的下标,即发放task清单序号,子进程根据从管道内收到的序号去执行清单上对应task。

为了统一接口,我们定义一个进程池类统一接口去管理操作函数

class ProcessPool
{
public:
    ProcessPool(int num) : sub_proc_num(num)
    {
    }
    int CreateProcessPool()
    {
        //vector<Channel> channels;  类成员变量无需再次定义
        for (int i = 0; i < num; i++)
        {
            // 1.创建管道和子进程
            int pipefd[2];
            int n = pipe(pipefd);
            if (n < 0)
            {
                return PIPEERRO;
            }

            // 2.创建子进程
            pid_t id = fork();
            if (id == 0)
            {
                // 子进程代码
                // 子进程负责读取任务 ,关闭写端
                close(pipefd[1]);

                // 读取任务

                // 执行任务

                // 退出
                exit(0);
            }
            // 父进程代码
            // 父进程负责写任务,关闭读端
            close(pipefd[0]);
            channels.emplace_back(id, pipefd[1]);
        }
    }

    ~ProcessPool()
    {
    }

private:
    vector<Channel> channels;
    int sub_proc_num;
};

int main()
{
    ProcessPool * pp=new ProcessPool(num);//num 个进程的进程池
    pp->CreateProcessPool();

    return 0;
}

接下来:创建一个头文件专门放置任务函数

#pragma once

#include<iostream>
#include <unistd.h>
using namespace std;
typedef void(*work_t)();//执行任务总接口函数指针
typedef void(*task_t)();//任务函数指针,用于管理任务函数

//task函数实现
void Singing()
{
    cout<<"I am singing.... lalala~"<<endl;
}

void Dancing()
{
    cout<<"I am dancing.... siusiusiu~"<<endl;
}

void Playing()
{
    cout<<"I am playing piano.... DoReMi~"<<endl;
}

uint32_t NextTask()
{
    return rand()%3;//task数组下标
}

task_t task[3]{Singing,Dancing,Playing};//数组管理task函数

void worker()//总接口函数
{
    while(true)
    {
        uint32_t command_code;//任务码
        ssize_t byte_read = read(0,&command_code,sizeof(command_code));//读取管道内容(任务码)
        if(byte_read == sizeof(command_code))
        {
            if(command_code>3)continue;
            task[command_code]();//任务码用作下标
        }
        
    }
}

 实现任务清单,同时给出work总接口,兼具读取任务码和执行动作。

说明:

为什么死循环?死循环的目的是让子进程反复接受任务反复执行任务,这样能最大化利用进程池特性。

为什么command_code>3就continue?为了让子进程收到非法任务码时也能继续执行不影响后续任务执行。

主程序代码只需修改几个地方:

pp->CreateProcessPool(worker);//传总接口函数进去
int CreateProcessPool(work_t worker)
    {
        // vector<Channel> channels;  类成员变量无需再次定义
        for (int i = 0; i < num; i++)
        {
            // 1.创建管道和子进程
            int pipefd[2];
            int n = pipe(pipefd);
            if (n < 0)
            {
                return PIPEERRO;
            }

            // 2.创建子进程
            pid_t id = fork();
            if (id == 0)
            {
                // 子进程代码
                // 子进程负责读取任务 ,关闭写端
                close(pipefd[1]);

                // 读取任务
                dup2(pipefd[0], 0); // 重定向,将从stdin读取重定向到从管道读端读取
                // 执行任务
                worker(); // 总接口内有真正的读取管道操作
                // 退出
                exit(0);
            }
            // 父进程代码
            // 父进程负责写任务,关闭读端
            close(pipefd[0]);
            channels.emplace_back(id, pipefd[1]);
        }
        return 0;
    }

 说明:

因为在create函数里面进行了重定向,所有worker接口里的read是从0也就是stdin里读取

准备好了任务码的读取,接下来我们来完成任务码的发放:

注意两点:

        1.任务码随机发方

        2.子进程的轮询调度(为了让所有进程分摊任务)

// 任务码发放
    while (true)
    {
        // 选择任务码
        uint32_t code = NextTask();

        // 选择管道
        int index = pp->Select_Channel();
        // 发送任务码
        pp->SendCode(index,code);
        sleep(1);
    }
    int Select_Channel() // 轮询选择管道
    {
        static int next = 0; // static变量只定义一次
        int c = next;
        next++;
        next %= channels.size(); // next只能在channels的下标中循环
        return c;
    }
    void SendCode(int index, int code) // 向管道写入任务码
    {
        cout << "sending code : " << code << " to " << channels[index].getid()
             << " " << channels[index].getname() << endl;
        write(channels[index].getfd(), &code, sizeof(code)); // 向管道写入任务码
    }
uint32_t NextTask()
{
    return rand()%3;//task数组下标
}

 接下来看执行结果:

非常完美,任务自动不停的发送给每个管道内,由不同的子进程执行。

完整代码:

processpool.cc:

#pragma once

#include<iostream>
#include <unistd.h>
using namespace std;
typedef void(*work_t)();//执行任务总接口函数指针
typedef void(*task_t)();//任务函数指针,用于管理任务函数

//task函数实现
void Singing()
{
    cout<<"I am singing.... lalala~"<<endl;
}

void Dancing()
{
    cout<<"I am dancing.... siusiusiu~"<<endl;
}

void Playing()
{
    cout<<"I am playing piano.... DoReMi~"<<endl;
}

uint32_t NextTask()
{
    return rand()%3;//task数组下标
}

task_t task[3]{Singing,Dancing,Playing};//数组管理task函数

void worker()//总接口函数
{
    while(true)
    {
        uint32_t command_code;//任务码
        ssize_t byte_read = read(0,&command_code,sizeof(command_code));//读取管道内容(任务码)
        if(byte_read == sizeof(command_code))
        {
            if(command_code>3)continue;
            task[command_code]();//任务码用作下标
        }
        
    }
}

Task.hpp:

#pragma once

#include <iostream>
#include <unistd.h>

using namespace std;

typedef void (*work_t)(int);
typedef void (*task_t)();

void Singing()
{
    cout << "I am singing.... lalala" << endl;
}

void Dancing()
{
    cout << "I am dancing.... hahaha" << endl;
}

void Playing()
{
    cout << "I am playing piano.... doremi~" << endl;
}

task_t task[3] = {Singing, Dancing, Playing};

uint32_t Select_Task()
{
    return rand()%3;
}

void worker(int task_num)
{
    
    while (task_num)
    {
        uint32_t commandcode;
        ssize_t byte_read = read(0, &commandcode, sizeof(commandcode));
        if (byte_read == sizeof(commandcode))
        {
            if (commandcode > 3)
                continue;
            task[commandcode]();
            task_num--;
        }
        
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2398941.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

解锁Java多级缓存:性能飞升的秘密武器

一、引言 文末有彩蛋 在当今高并发、低延迟的应用场景中&#xff0c;传统的单级缓存策略往往难以满足性能需求。随着系统规模扩大&#xff0c;数据访问的瓶颈逐渐显现&#xff0c;如何高效管理缓存成为开发者面临的重大挑战。多级缓存架构应运而生&#xff0c;通过分层缓存设…

(纳芯微)NCA9548- DTSXR 具有复位功能的八通道 I²C 开关、所有I/O端子均可承受5.5V输入电压

深圳市润泽芯电子有限公司 推荐NOVOSENSE(纳芯微)品牌 NCA9548- DTSXR TSSOP-24封装 NCA9548- DTSXR 具有复位功能的八通道 IC 开关、所有I/O端子均可承受5.5V输入电压 产品描述 NCA9548是通过I2C总线控制的八路双向转换开关。 SCL / SDA上行数据分散到八对下行数据或通道。…

013旅游网站设计技术详解:打造一站式旅游服务平台

旅游网站设计技术详解&#xff1a;打造一站式旅游服务平台 在互联网与旅游业深度融合的时代&#xff0c;旅游网站成为人们规划行程、预订服务的重要工具。一个功能完备的旅游网站&#xff0c;通过用户管理、订单管理等核心模块&#xff0c;实现用户与管理员的高效交互。本文将…

2024 CKA模拟系统制作 | Step-By-Step | 12、题目搭建-创建多容器Pod

目录 免费获取题库配套 CKA_v1.31_模拟系统 一、题目 二、考点分析 1. 多容器 Pod 的理解 2. YAML 配置规范 3. 镜像版本控制 三、考点详细讲解 1. 多容器 Pod 的工作原理 2. 容器端口冲突处理 3. 资源隔离机制 四、实验环境搭建步骤 总结 免费获取题库配套 CKA_v…

优化 Spring Boot API 性能:利用 GZIP 压缩处理大型有效载荷

引言 在构建需要处理和传输大量数据的API服务时&#xff0c;响应时间是一个关键的性能指标。一个常见的场景是&#xff0c;即使后端逻辑和数据库查询已得到充分优化&#xff0c;当API端点返回大型数据集&#xff08;例如&#xff0c;数千条记录的列表&#xff09;时&#xff0…

【C盘瘦身】给DevEco Studio中HarmonyOSEmulator(鸿蒙模拟器)换个地方,一键移动给C盘瘦身

文章目录 一、HarmonyOSEmulator的安装路径二、修改路径 一、HarmonyOSEmulator的安装路径 之前安装了华为的DevEco Studio&#xff0c;当时没注意&#xff0c;后来C盘告急&#xff0c;想着估计是鸿蒙的模拟器占用空间比较大&#xff0c;一检查还真是躺在C盘。路径如下&#x…

ORACLE 缺失 OracleDBConsoleorcl服务导致https://xxx:port/em 不能访问

这个原因是&#xff0c;操作过一下 ORCL的服务配置变更导致的。 再PATH中添加个环境变量&#xff0c;路径如下 管理员权限运行cmd 等待创建完成 大概3分钟 查看服务 点击第一个访问&#xff0c;下图登录后的截图

VScode自动添加指定内容

在 VS Code 中&#xff0c;可以通过配置 用户代码片段&#xff08;User Snippets&#xff09; 或使用 文件模板扩展 来实现新建指定文件类型时自动添加指定内容。以下是具体方法&#xff1a; 方法 1&#xff1a;使用 VS Code 内置的「用户代码片段」 适用场景&#xff1a;适用…

Ubuntu 22.04 安装 Nacos 记录

Ubuntu 22.04 安装 Nacos 记录 本文记录了在 Ubuntu 22.04 系统上安装 Nacos 的完整过程&#xff0c;适用于本地测试或生产部署的基础搭建。 一、官方资源 官网下载地址&#xff1a;https://nacos.io/download/nacos-server/官网文档&#xff1a;https://nacos.io/docs/lates…

相机--RGBD相机

教程 分类原理和标定 原理 视频总结 双目相机和RGBD相机原理 作用 RGBD相机RGB相机深度&#xff1b; RGB-D相机同时获取两种核心数据&#xff1a;RGB彩色图像和深度图像&#xff08;Depth Image&#xff09;。 1. RGB彩色图像 数据格式&#xff1a; 标准三通道矩阵&#…

记一次idea中lombok无法使用的解决方案

在注解处理器下&#xff0c;一般 Default 为“启用注解处理”和“从项目类路径获取处理器”&#xff0c;但是我的项目中的为选择“处理器路径”&#xff0c;导致了无法识别lombok&#xff0c;因此&#xff0c;需要改为使用“从项目类路径获取处理器”这个选项。如下图所示&…

贪心算法应用:硬币找零问题详解

贪心算法与硬币找零问题详解 贪心算法&#xff08;Greedy Algorithm&#xff09;在解决优化问题时表现出简洁高效的特点&#xff0c;尤其适用于特定结构的组合优化问题。本文将用2万字篇幅&#xff0c;深入探讨贪心算法在硬币找零问题中的应用&#xff0c;覆盖算法原理、正确性…

深入理解 x86 汇编中的重复前缀:REP、REPZ/REPE、REPNZ/REPNE(进阶详解版)

一、重复前缀&#xff1a;串操作的 “循环加速器” 如果你写过汇编代码&#xff0c;一定遇到过需要重复处理大量数据的场景&#xff1a; 复制 1000 字节的内存块比较两个长达 200 字符的字符串在缓冲区中搜索特定的特征值 手动用loop指令编写循环&#xff1f;代码冗长不说&a…

Docker 在 AI 开发中的实践:GPU 支持与深度学习环境的容器化

人工智能(AI)和机器学习(ML),特别是深度学习,正以前所未有的速度发展。然而,AI 模型的开发和部署并非易事。开发者常常面临复杂的依赖管理(如 Python 版本、TensorFlow/PyTorch 版本、CUDA、cuDNN)、异构硬件(CPU 和 GPU)支持以及环境复现困难等痛点。这些挑战严重阻…

学习NuxtLink标签

我第一次接触这个标签&#xff0c;我都不知道是干嘛的&#xff0c;哈哈哈哈&#xff0c;就是他长得有点像routerLink&#xff0c;所以我就去查了一下&#xff01;哎&#xff01;&#xff01;&#xff01;真是一样的&#xff0c;哈哈哈哈&#xff0c;至少做的事情是一样的&#…

基于PostGIS的GeoTools执行原生SQL查询制图实践-以贵州省行政区划及地级市驻地为例

目录 前言 一、空间相关表简介 1、地市行政区划表 2、地市驻地信息表 3、空间查询检索 二、GeoTools制图实现 1、数据类型绑定 2、WKT转Geometry 3、原生SQL转SimpleFeatureCollection 4、集成调用 5、成果预览 三、总结 前言 在当今这个信息爆炸的时代&#xff0c…

NLP实战(5):基于LSTM的电影评论情感分析模型研究

目录 摘要 1. 引言 2. 相关工作 3. 方法 3.1 数据预处理 3.2 模型架构 3.3 训练策略 3.4 交叉验证 4. 实验与结果 4.1 数据集 4.2 实验结果 4.3训练日志 4.4 示例预测 5. 讨论 6. 结论 附录代码 展示和免费下载 摘要 本文提出了一种基于双向LSTM的深度学习模…

c++面向对象第4天---拷贝构造函数与深复制

含有对象成员的构造函数深复制与浅复制拷贝&#xff08;复制&#xff09;构造函数 第一部分&#xff1a;含有对象成员的构造函数 以下是一个学生 类包含日期成员出生日期的代码 #include<iostream> using namespace std; class Date { public:Date(int year,int month…

Windows版PostgreSQL 安装 vector 扩展

问题 spring-ai在集成PGVector向量存储的时候会报错如下&#xff0c;那么就需要安装pgsql的vector扩展。 SQL [CREATE EXTENSION IF NOT EXISTS vector]; 错误: 无法打开扩展控制文件 "C:/Program Files/PostgreSQL/9.6/share/extension/vector.control": No such …

KINGCMS被入侵

现象会强制跳转到 一个异常网站,请掉截图代码. 代码中包含经过混淆处理的JavaScript&#xff0c;它使用了一种技术来隐藏其真实功能。代码中使用了eval函数来执行动态生成的代码&#xff0c;这是一种常见的技术&#xff0c;恶意脚本经常使用它来隐藏其真实目的。 这段脚本会检…