15.进程间通信(一)

news2025/7/20 7:11:27

一、进程间通信介绍

进程间通信目的:
数据传输:一个进程需要将它的数据发送给另⼀个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
怎么通信?
进程间通信本质:先让不同的进程,看到同一份资源(内存)(然后才有通信的条件)
不能由任何一个进程提供,进程间数据隔离->OS提供系统调用->设计统一的通信接口。
什么是通信?

二、具体通信方式

1)基于文件的,管道通信

2)System V 本机通信

1.背景

基于已有的技术,直接进行通信。

2.原理

单独设计了一个内存级的文件,管道(复用了文件管理的代码)。

独特的系统调用:

/* On all other architectures */

int pipe(int pipefd[2]);  //数组第一个参数是读的fd,第二个参数的写的fd。

返回值:成功返回0,失败返回-1,错误码被设置。

管道:通过创建子进程,子进程拷贝一份和父进程一样的文件描述符表,指向同一个“文件”(管道),关闭相应的读写端,使得单向通信。

3.demo代码,测试接口。

测试代码如下:

#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

void ChildWrite(int wfd)
{
    std::cout << "子进程wfd:" << wfd << std::endl;
    char buff[1024];
    int cnt = 0;
    while (true)
    {
        sleep(1);
        snprintf(buff, sizeof(buff), "第%d次:子进程写入\n", cnt++);
        ssize_t ret = write(wfd, buff, strlen(buff));
        (void)ret;
    }
}

void FatherRead(int rfd)
{
    std::cout << "父进程rfd:" << rfd << std::endl;
    char buff[1024];
    int cnt = 1;
    while (cnt--)
    {
        sleep(5);
        ssize_t ret = read(rfd, buff, sizeof(buff) - 1);
        buff[ret] = 0;
        std::cout << buff << std::endl;
        if (ret == 0)
        {
            std::cout << "写关闭了,读关闭" << std::endl;
            break;
        }
    }
}

int main()
{
    // 1.创建管道
    int pipefd[2] = {0};
    int ret = pipe(pipefd);
    if (ret == -1)
        exit(1);

    // 2.创建子进程,子进程写,父进程读
    pid_t pid = fork();
    if (pid < 0)
        exit(1);
    else if (pid == 0)
    {
        // 子进程
        close(pipefd[0]);
        ChildWrite(pipefd[1]);
        close(pipefd[1]);
        exit(0);
    }
    else
    {
        // 父进程
        close(pipefd[1]);
        FatherRead(pipefd[0]);
        close(pipefd[0]);
        int status;
        waitpid(pid, &status, 0);
        std::cout << "子进程 exit code:" << ((status >> 8) & 0xFF) << " exit signal:" << (status & 0x7F) << std::endl; 
    }

    return 0;
}

5种特性(重点):

1)匿名管道,只能用来进行具有血缘关系的进程进行进程间通信(常用于父子)

2)管道文件,自带同步机制

3)管道是面向字节流的。(怎么读和怎么写没有必然关系)

4)管道是单向通信的。

属于半双工的一种特殊情况。

半双工:任何时刻,一个发,一个收。

全双工:任何时刻,可以同时收发(吵架)。

5)(管道)文件的生命周期是随进程的(引用计数)。

4中通信情况:

1)写慢,读快 ------ 读端就要阻塞(进程)

2)写快,读慢 ------ 满了的时候,写端就要阻塞等待

3)写关,读继续 ------ read读到返回值为0,表示文件结尾。

4)写继续,读关 ------ 写端写入无意义,OS不会做无意义的事情->OS会杀掉写端进程->发送异常信号 13 SIGPIPE 

在小于pipe_buf时,管道的写入被要求是原子性的(一次要写全写完)。

测试管道容量:一次写一个字节,写入测试信息。

4.基于匿名管道 --- 进程池 

原理:

父进程通过向指定的管道写入的方式来向子进程发送对应的任务。

父进程的wfd关闭,子进程会读到0,可以退出。

总体结构:

        要对匿名管道进行管理,先描述在组织,要有对应的类Channel,记录父进程的wfd和子进程的pid,在有对应管理多个类Channel的数据结构vector,对于匿名管道的管理就转换成了对于vector的增删查改。对于任务也做管理,与匿名管道管理类似。

逻辑:

        父进程创建若干个子进程,让子进程阻塞在read中,并死循环执行,子进程一直阻塞,等待父进程发送对应的任务码,根据任务码执行相应的任务。父进程采用轮询的方式,确保能够负载均衡,选择子进程发送随机的任务码,即向对应的匿名管道内写入任务码。

易错点:

回收时需注意,每次创建子进程时,子进程会继承上一次父进程的wfd,导致第1个管道有n个引用,第2个管道有n-1个引用 ... 第n个管道有一个引用。

解决方法1:倒着关闭

解决方法2:随便关闭。创建子进程后,子进程把之前从父进程继承下来的wfd全部关闭。

完整代码如下:

Processpool.hpp

#pragma once

#include <vector>
#include <cstdlib>
#include <cstdio>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"

class Channel
{
public:
    Channel(int wfd, int pid)
        : _wfd(wfd), _pid(pid)
    {
    }
    ~Channel()
    {
    }
    int Wfd() const { return _wfd; }
    int Pid() const { return _pid; }

    void Close() const { close(_wfd); }
    void Wait() const { waitpid(_pid, nullptr, 0); }

private:
    int _wfd; // 控制子进程
    int _pid; // 拿到子进程pid,方便回收
};

class ChannelManager
{
public:
    ChannelManager() : _next(0)
    {
    }

    // 为了保证负载均衡,采用轮询的方式
    const Channel& SelectChannel()
    {
        const Channel& c = _channels[_next];
        // 选出下标
        ++_next;
        _next %= _channels.size();
        return c;
    }

    void InsertChannel(int wfd, int pid)
    {
        _channels.emplace_back(wfd, pid);
    }

    void CloseFd() const
    {
        for(const auto &e : _channels)
        {
            e.Close();
        }
    }

    void CloseChannels() const
    {
        // 2.正着关,创建子进程时就关闭从父进程继承下来的wfd
        for (size_t i = 0; i < _channels.size(); i++)
        {
            // 关闭父进程的写fd,让子进程读到0个字节退出。
            _channels[i].Close();
            _channels[i].Wait();
            printf("等待成功,回收了子进程:%d\n", _channels[i].Pid());
        }

        // // 1.倒着关闭
        // for (int i = _channels.size() - 1; i >= 0; i--)
        // {
        //     // 关闭父进程的写fd,让子进程读到0个字节退出。
        //     _channels[i].Close();
        //     _channels[i].Wait();
        //     printf("等待成功,回收了子进程:%d\n", _channels[i].Pid());
        // }
    }

    ~ChannelManager()
    {
    }

private:
    std::vector<Channel> _channels;
    int _next;
};

class ProcessPool
{
public:
    ProcessPool()
    {
    }
    ~ProcessPool()
    {
    }

    void ChildRead(int rfd) const
    {
        int taskcode = 0;
        while (true)
        {
            ssize_t ret = read(rfd, &taskcode, sizeof(taskcode));
            // 父进程写端关闭了,子进程要结束
            if (ret == 0)
            {
                std::cout << "父进程写端关闭,子进程:" << getpid() << "退出" << std::endl;
                break;
            }
            // 读到的不是4字节,丢弃,重新读
            if (ret != sizeof(taskcode))
            {
                printf("丢弃\n");
                return;
            }
            // 执行相应任务
            printf("进程:%d ExcuteTask开始,ret:%d,taskcode:%d\n", getpid(), (int)ret, taskcode);
            _tm.ExecuteTask(taskcode);
        }
    }

    void Create(int num)
    {
        for (int i = 0; i < num; i++)
        {
            // 1.创建管道
            int pipefd[2] = {0};
            int ret = pipe(pipefd);
            if (ret != 0)
                exit(1);
            // 2.创建子进程
            pid_t pid = fork();
            // 3.关闭父读,子写
            if (pid < 0)
                exit(1);
            else if (pid == 0)
            {
                // 关掉从父进程继承下来的wfd
                _cm.CloseFd();
                // 子进程关闭写
                close(pipefd[1]);
                // 子进程工作
                printf("ChildRead开始,进程为%d\n", getpid());
                ChildRead(pipefd[0]);
                close(pipefd[0]);
                // 子进程完成工作,退出
                exit(0);
            }
            // 父进程关闭读
            close(pipefd[0]);
            // emplace_back直接构造,插入到_channels
            _cm.InsertChannel(pipefd[1], pid);
            // 循环num次
        }
    }

    // 选择一个子进程,随机发送任务
    void Run()
    {
        // 1.选择一个子进程
        const Channel &c = _cm.SelectChannel();
        printf("挑选的子进程为:%d\n", c.Pid());
        // 2.获取任务码
        int taskcode = _tm.TaskCode();
        // 3.发送任务码给子进程,子进程执行(写给子进程)
        printf("父进程:%d 写入taskcode:%d\n", getpid(), taskcode);
        ssize_t ret = write(c.Wfd(), &taskcode, sizeof(taskcode));
    }

    void Close() const
    {
        _cm.CloseChannels();
    }

private:
    TaskManager _tm;
    ChannelManager _cm;
};

task.hpp

#pragma once

#include <iostream>
#include <cstdio>
#include <vector>
#include <cstdlib>
#include <ctime>

void Open()
{
    std::cout << "这是一个打开的任务" << std::endl;
}
void Download()
{
    std::cout << "这是一个下载的任务" << std::endl;
}
void Upload()
{
    std::cout << "这是一个上传的任务" << std::endl;
}

typedef void (*task_t)();
class Task
{
public:
    Task(task_t task) : _task(task)
    {
    }
    ~Task()
    {
    }
    void operator()() const
    {
        _task();
    }
    task_t getTask() const {return _task;}
private:
    task_t _task;
};

class TaskManager
{
public:
    TaskManager()
    {
        // 设置种子数,采用随机任务形式
        srand((unsigned)time(nullptr));
        Register(Open);
        Register(Download);
        Register(Upload);
    }
    // 注册任务
    void Register(task_t t)
    {
        _tasks.emplace_back(t);
    }
    // 返回任务码
    int TaskCode() const
    {
        int ret = rand() % _tasks.size();
        return ret;
    }
    void ExecuteTask(int taskcode) const
    {
        if (taskcode < 0 || taskcode >= _tasks.size())
        {
            std::cout << "读取的任务码无效" << std::endl;
            return;
        }
        printf("进程:%d执行任务,taskcode为:%d\n",getpid(),taskcode);
        _tasks[taskcode]();
        printf("任务:%d 执行完毕\n",taskcode);
    }
    ~TaskManager()
    {
    }

private:
    std::vector<Task> _tasks;
};

main.cc

#include <iostream>
#include <unistd.h>
#include <sys/wait.h>
#include "ProcessPool.hpp"

int main()
{
    pid_t pid = fork();
    if (pid == 0)
    {
        ProcessPool pool;
        pool.Create(5);
        int cnt = 5;
        while (cnt--)
        {
            sleep(1);
            pool.Run();
        }
        sleep(10);
        pool.Close();
        exit(0);
    }
    int status;
    waitpid(pid, &status, 0);
    std::cout << "main exit code:" 
    << ((status >> 8) & 0xFF) << 
    " signal code:" << 
    (status & 0x7F) << std::endl;
    
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2387372.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI 数据采集实战指南:基于 Bright Data 快速获取招标讯息

AI 数据采集实战指南&#xff1a;基于Bright Data快速获取招标讯息 在招标行业中&#xff0c;快速、准确地获取招标公告、项目详情、投标截止日期和其他关键招标信息&#xff0c;是投标企业提高竞标成功率的核心竞争力。然而&#xff0c;招标信息往往分散在不同的平台和网页&a…

cursor使用mcp

问题说明 mcp就相当于给AI安装了工具包&#xff0c;它可以调用获取接口文档&#xff0c;网页&#xff0c;数据库等&#xff0c;基本上所有的mcp都是node程序&#xff0c;少数需要python环境 使用说明 使用mcp-mysql举例&#xff0c;下面是配置json "mysql": {&qu…

小白成长之路-计算机网络(四)

文章目录 前言一、网络连接查看1.netstat2.ss3.bond绑定3.1准备好这三个文件3.2添加bond配置文件3.3关闭网络图形化服务3.4重启 4.Linux下的抓包工具Wireshark 5、web压力测试工具6、路由追踪命令 二、[练习题](https://blog.csdn.net/m0_70730767/article/details/148262716?…

【Agent】MLGym: A New Framework and Benchmark for Advancing AI Research Agents

arxiv: https://arxiv.org/pdf/2502.14499 简介 Meta 推出的 MLGym 框架及配套基准 MLGym-Bench&#xff0c;为评估和开发LLM Agent在 AI 研究任务中的表现提供了全新工具。作为首个基于 Gym 的机器学习任务环境&#xff0c;MLGym 支持强化学习等算法对代理的训练&#xff0c…

Web安全测试-文件上传绕过-DVWA

Web安全测试-文件上传绕过-DVWA 很多网站都有上传资源(图片或者文件)的功能&#xff0c;资源上传后一般会存储在服务器的一个文件夹里面&#xff0c;如果攻击者绕过了上传时候的文件类型验证&#xff0c;传了木马或者其他可执行的代码上去&#xff0c;那服务器就危险了。 我用…

现代 CSS 高阶技巧:实现平滑内凹圆角的工程化实践

通过 数学计算 CSS mask 复合遮罩 实现的真正几何内凹效果&#xff1a; 背景是一张图片&#xff0c;用来证明中间的凹陷是透明的。 完整代码&#xff1a; app.js import FormPage from "./pages/formPage"; import "./App.css"; const App () > {re…

【运维自动化-标准运维】如何实现在不同步骤间传递参数

当流程有多个步骤时&#xff0c;经常需要把前面某个个步骤处理的结果传递给下一个或后面的步骤使用&#xff08;输出作为输入&#xff09;&#xff0c;这就是跨步骤传参的场景&#xff0c;标准运维通过特有的标记符号"<SOPS_VAR>key:value</SOPS_VAR> "来…

[AI]主流大模型、ChatGPTDeepseek、国内免费大模型API服务推荐(支持LangChain.js集成)

主流大模型特色对比表 模型核心优势适用场景局限性DeepSeek- 数学/代码能力卓越&#xff08;GSM8K准确率82.3%&#xff09;1- 开源生态完善&#xff08;支持医疗/金融领域&#xff09;7- 成本极低&#xff08;API价格仅为ChatGPT的2%-3%&#xff09;5科研辅助、代码开发、数据…

手机IP地址更换的影响与操作指南

在移动互联网时代&#xff0c;IP地址如同手机的“网络身份证”&#xff0c;其变更可能对上网体验、隐私安全及服务访问产生连锁反应。无论是为了绕过地域限制、保护隐私&#xff0c;还是解决网络冲突&#xff0c;了解IP更换的影响与正确操作方法都至关重要。本文将系统分析影响…

MongoDB 安全机制详解:全方位保障数据安全

在当今数据驱动的时代&#xff0c;数据库安全至关重要。MongoDB 作为一款流行的 NoSQL 数据库&#xff0c;广泛应用于 Web 应用、大数据分析和物联网等领域。然而&#xff0c;随着 MongoDB 的普及&#xff0c;其安全性也面临诸多挑战&#xff0c;如未授权访问、数据泄露和注入攻…

嵌入式学习之系统编程(六)线程

目录 一、线程 &#xff08;一&#xff09;线程概念 &#xff08;二&#xff09;特征 &#xff08;三&#xff09;优缺点 二、线程与进程的区别&#xff08;面问&#xff09; 三、多线程程序设计步骤 四、线程的创建&#xff08;相关函数&#xff09; 1、pthread_create…

大语言模型 提示词的少样本案例的 演示选择与排序新突破

提示词中 演示示例的选择与排序 这篇论文《Rapid Selection and Ordering of In-Context Demonstrations via Prompt Embedding Clustering》聚焦于提升大语言模型(LLMs)在自适应上下文学习(ICL)场景中演示示例的选择与排序效率 一、论文要解决的问题 在上下文学习(ICL)…

【算法篇】二分查找算法:基础篇

题目链接&#xff1a; 34.在排序数组中查找元素的第一个和最后一个位置 题目描述&#xff1a; 给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标值 target&#xff0c;返…

重磅发布 | 复旦533页《大规模语言模型:从理论到实践(第2版)》(免费下载)

在人工智能浪潮席卷全球的今天&#xff0c;大语言模型正以前所未有的速度推动着科技进步和产业变革。从 ChatGPT 到各类行业应用&#xff0c;LLM 不仅重塑了人机交互的方式&#xff0c;更成为推动学术研究与产业创新的关键技术。 面对这一飞速演进的技术体系&#xff0c;如何系…

智能体赋能效率,企业知识库沉淀价值:UMI企业智脑的双轮驱动!

智能体企业知识库&#xff1a;UMI企业智脑的核心功能与价值 在人工智能技术飞速发展的今天&#xff0c;企业智能化转型已经成为不可逆转的趋势。作为企业级AI智能体开发平台的佼佼者&#xff0c;优秘智能推出的UMI企业智脑&#xff0c;以其强大的智能体开发能力和全面的企业知…

vue项目 build时@vue-office/docx报错

我在打包vue项目时&#xff0c; 开始用的npm run build和cnpm run build&#xff0c;总是提示 vue-office/docx 错误&#xff0c;尝试过用cnpm重新安装node_modules几次都没用。类似下面的提示一直有。 Error: [commonjs--resolver] Failed to resolve entry for package "…

#RabbitMQ# 消息队列入门

目录 一 MQ技术选型 1 运行rabbitmq 2 基本介绍 3 快速入门 1 交换机负责路由消息给队列 2 数据隔离 二 Java客户端 1 快速入门 2 WorkQueue 3 FanOut交换机 4 Direct交换机 5 Topic交换机 *6 声明队列交换机 1 在配置类当中声明 2 使用注解的方式指定 7 消息转…

EPT(Efficient Prompt Tuning)方法,旨在解决提示调优(Prompt Tuning)中效率与准确性平衡和跨任务一致性的问题

EPT(Efficient Prompt Tuning)方法,旨在解决提示调优(Prompt Tuning)中效率与准确性平衡和跨任务一致性的问题 一、核心原理:分解提示与多空间投影 1. 提示分解:用低秩矩阵压缩长提示 传统问题: 长提示(如100个token)精度高但训练慢,短提示(如20个token)速度快但…

云原生安全核心:云安全责任共担模型(Shared Responsibility Model)详解

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 1. 基础概念 什么是云安全责任共担模型&#xff1f; 云安全责任共担模型&#xff08;Shared Responsibility Model, SRM&#xff09;是云服务提供商&…

go并发与锁之sync.Mutex入门

sync.Mutex 原理&#xff1a;一个共享的变量&#xff0c;哪个线程握到了&#xff0c;哪个线程可以执行代码 功能&#xff1a;一个性能不错的悲观锁&#xff0c;使用方式和Java的ReentrantLock很像&#xff0c;就是手动Lock&#xff0c;手动UnLock。 使用例子&#xff1a; v…