Linux进程间通信(管道)

news2025/7/20 21:33:10

进程间通信的目的

数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

进程间通信的必要性本质以及技术背景

进程间通信的必要性,单进程的,那么也就无法使用并发能力,更加无法实现多进程协同。传输数据,同步执行流,消息通知等,所以进程通信不是目的,而是手段。

进程间通信的技术背景

1.进程是具有独立性的。虚拟地址空间+页表保证进程运行的独立性(进程内核数据结构+进程的代码和数据)。

2.通信成本会比较高。

进程间通信的本质理解

1.进程间通信的前提,首先需要让不同的进程看到同一块"内存"(特定的结构组织的)

2.所以你所谓的进程看到同一块"内存",属于哪一个进程呢?不能隶属于任何一个进程,而应该更强调共享。 

管道 

管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
管道又分为两种,匿名管道和命名管道。

管道的原理:

这个就叫做管道,分别以读写的方式打开一个文件,fork()创建子进程,双方关闭自己不需要的文件描述符,管道就是文件,两个进程通过文件的方式进行通信。

匿名管道 

#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码


其实从上面这张都就可以看出管道是怎们样的,接下来就用代码来实现一下。 

#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

using namespace std;

// 为什么不定义全局buffer来进行通信呢?? 因为有写时拷贝的存在,无法更改通信!

int main()
{
    // 1. 创建管道
    int pipefd[2] = {0};
    int n = pipe(pipefd);
    assert(n == 0);
    (void)n;
#ifdef DEBUG
    cout << "pipefd[0]: " << pipefd[0] << endl; // 3
    cout << "pipefd[1]: " << pipefd[1] << endl; // 4
#endif
    // 2. 创建子进程
    pid_t id = fork();
    assert(id != -1);
    if (id == 0)
    {
        // 子进程 - 读
        // 3. 构建单向通信的信道,父进程写入,子进程读取
        // 3.1 关闭子进程不需要的fd
        close(pipefd[1]);
        char buffer[1024 * 8];
        while (true)
        {
            // sleep(20);
            // 写入的一方,fd没有关闭,如果有数据,就读,没有数据就等
            // 写入的一方,fd关闭, 读取的一方,read会返回0,表示读到了文件的结尾!
            ssize_t s = read(pipefd[0], buffer, sizeof(buffer) - 1);
            if (s > 0)
            {
                buffer[s] = '\0';
                cout << "child get a message[" << getpid() << "] Father# " << buffer << endl;
            }
            else if (s == 0)
            {
                cout << "writer quit(father), me quit!!!" << endl;
                break;
            }
        }
        exit(1);
    }
    // 父进程 - 写
    //  3. 构建单向通信的信道
    //  3.1 关闭父进程不需要的fd
    close(pipefd[0]);
    string message = "我是父进程,我正在给你发消息";
    char buffer[1024 * 8];
    int count = 0;
    char send_buffer[1024 * 8];
    while (true)
    {
        // 3.2 构建一个变化的字符串
        snprintf(send_buffer, sizeof(send_buffer), "%s[%d] : %d",
                 message.c_str(), getpid(), count++);
        // 3.3 写入
        write(pipefd[1], send_buffer, strlen(send_buffer));
        // 3.4 故意sleep
        sleep(1);
        cout << count << endl;
        if (count == 5)
        {
            cout << "writer quit(father)" << endl;
            break;
        }
    }
    close(pipefd[1]);
    pid_t ret = waitpid(id, nullptr, 0);
    cout << "id : " << id << " ret: " << ret << endl;
    assert(ret > 0);
    (void)ret;

    return 0;
}

来讲一下这个代码的大致方向,第一部肯定是创建管道,然后就是创建子进程,这里我们是让父进程写子进程读,所以关闭父子进程不需要的fd,写入的一方,fd没有关闭,如果有数据,就读,没有数据就等,写入的一方,fd关闭, 读取的一方,read会返回0,表示读到了文件的结尾,最后关闭父子进程fd,回收子进程。

其从上面的代码我们就可以总结一下管道的特点。总结管道的特点,理解以前的管道 | ,管道是一个文件–读取–具有访问控制,显示器也是一个文件,父子同时往显示器写入的时候,有没有说一个会等另一个的情况呢,缺乏访问控制。

1.管道是用来进行具有血缘关系的进程进性进程间通信-- 常用于父子通信 

2.管道具有通过让进程间协同,提供了访问控制

3.管道提供的是面向流式的通信服务--面向字节流--_协议 

4.管道是基于文件的,文件的生命周期是随进程的,管道的生命周期是随进程的

5.管道是单向通信的,就是半双工通信的一种特殊情况 

6.写快,读慢,写满不能在写了

7.写慢,读快,管道没有数据的时候,读必须等待

8.写关,读0,标识读到了文件结尾

9.读关,写继续写,oS终止写进程

扩展 

分发多个任务的管理器

 代码:

//.hpp
//#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <unistd.h>
#include <functional>

typedef std::function<void()> func;

std::vector<func> callbacks;
std::unordered_map<int, std::string> desc;

void readMySQL()
{
    std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}

void execuleUrl()
{
    std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
}

void cal()
{
    std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
}

void save()
{
    std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
}

void load()
{
    desc.insert({callbacks.size(), "readMySQL: 读取数据库"});
    callbacks.push_back(readMySQL);

    desc.insert({callbacks.size(), "execuleUrl: 进行url解析"});
    callbacks.push_back(execuleUrl);

    desc.insert({callbacks.size(), "cal: 进行加密计算"});
    callbacks.push_back(cal);

    desc.insert({callbacks.size(), "save: 进行数据的文件保存"});
    callbacks.push_back(save);
}

void showHandler()
{
    for(const auto &iter : desc )
    {
        std::cout << iter.first << "\t" << iter.second << std::endl;
    }
}

int handlerSize()
{
    return callbacks.size();
}
//.cc
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"

#define PROCESS_NUM 5

using namespace std;

int waitCommand(int waitFd, bool &quit) // 如果对方不发,我们就阻塞
{
    uint32_t command = 0;
    ssize_t s = read(waitFd, &command, sizeof(command));
    if (s == 0)
    {
        quit = true;
        return -1;
    }
    assert(s == sizeof(uint32_t));
    return command;
}

void sendAndWakeup(pid_t who, int fd, uint32_t command)
{
    write(fd, &command, sizeof(command));
    cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}

int main()
{
    // 代码中关于fd的处理,有一个小问题,不影响我们使用,但是你能找到吗??
    load();
    // pid: pipefd
    vector<pair<pid_t, int>> slots;
    // 先创建多个进程
    for (int i = 0; i < PROCESS_NUM; i++)
    {
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        assert(n == 0);
        (void)n;
        pid_t id = fork();
        assert(id != -1);
        if (id == 0)
        {
            close(pipefd[1]);
            while (true)
            {
                // pipefd[0]
                // 等命令
                bool quit = false;
                int command = waitCommand(pipefd[0], quit); // 如果对方不发,我们就阻塞
                if (quit)
                    break;
                // 执行对应的命令
                if (command >= 0 && command < handlerSize())
                {
                    callbacks[command]();
                }
                else
                {
                    cout << "非法command: " << command << endl;
                }
            }
            exit(0);
        }
        // father,进行写入,关闭读端
        close(pipefd[0]); // pipefd[1]
        slots.push_back(pair<pid_t, int>(id, pipefd[1]));
    }
    // 父进程派发任务
    srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机
    while (true)
    {
        // 选择一个任务, 如果任务是从网络里面来的?
        int command = rand() % handlerSize();
        // 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡
        int choice = rand() % slots.size();
        // 把任务给指定的进程
        sendAndWakeup(slots[choice].first, slots[choice].second, command);
        sleep(1);
        // int select;
        // int command;
        // cout << "############################################" << endl;
        // cout << "#   1. show funcitons      2.send command  #" << endl;
        // cout << "############################################" << endl;
        // cout << "Please Select> ";
        // cin >> select;
        // if (select == 1)
        //     showHandler();
        // else if (select == 2)
        // {
        //     cout << "Enter Your Command> ";
        //     // 选择任务
        //     cin >> command;
        //     // 选择进程
        //     int choice = rand() % slots.size();
        //     // 把任务给指定的进程
        //     sendAndWakeup(slots[choice].first, slots[choice].second, command);
        // }
        // else
        // {
        // }
    }
    // 关闭fd, 所有的子进程都会退出
    for (const auto &slot : slots)
    {
        close(slot.second);
    }

    // 回收所有的子进程信息
    for (const auto &slot : slots)
    {
        waitpid(slot.first, nullptr, 0);
    }
}

命名管道

 命名管道可以从命令行上创建,命令行方法是使用下面这个命令 mkfifo filename                         删除管道文件:unlink filename
命名管道也可以从程序里创建,相关函数是:

匿名管道与命名管道的区别
匿名管道由pipe函数创建并打开。
命名管道由mkfifo函数创建,打开用open
FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义

接下来同样用一段代码来演示一下:

//Makefile
.PHONY:all
all:client mutiServer

client:client.cxx
	g++ -o $@ $^ -std=c++11
mutiServer:server.cxx
	g++ -o $@ $^ -std=c++11

.PHONY:clean
clean:
	rm -f client mutiServer
//client.cxx
#include "comm.hpp"

int main()
{
    // 1. 获取管道文件
    int fd = open(ipcPath.c_str(), O_WRONLY);
    if(fd < 0)
    {
        perror("open");
        exit(1);
    }

    // 2. ipc过程
    string buffer;
    while(true)
    {
        cout << "Please Enter Message Line :> ";
        std::getline(std::cin, buffer);
        write(fd, buffer.c_str(), buffer.size());
    }

    // 3. 关闭
    close(fd);
    return 0;
}
//log.hpp
#ifndef _LOG_H_
#define _LOG_H_

#include <iostream>
#include <ctime>

#define Debug   0
#define Notice  1
#define Warning 2
#define Error   3


const std::string msg[] = {
    "Debug",
    "Notice",
    "Warning",
    "Error"
};

std::ostream &Log(std::string message, int level)
{
    std::cout << " | " << (unsigned)time(nullptr) << " | " << msg[level] << " | " << message;
    return std::cout;
}


#endif
//comm.hpp
#ifndef _COMM_H_
#define _COMM_H_

#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "Log.hpp"

using namespace std;

#define MODE 0666
#define SIZE 128

string ipcPath = "./fifo.ipc";


#endif
//server.cxx
#include "comm.hpp"
#include <sys/wait.h>

static void getMessage(int fd)
{
    char buffer[SIZE];
    while (true)
    {
        memset(buffer, '\0', sizeof(buffer));
        ssize_t s = read(fd, buffer, sizeof(buffer) - 1);
        if (s > 0)
        {
            cout <<"["  << getpid() << "] "<< "client say> " << buffer << endl;
        }
        else if (s == 0)
        {
            // end of file
            cerr <<"["  << getpid() << "] " << "read end of file, clien quit, server quit too!" << endl;
            break;
        }
        else
        {
            // read error
            perror("read");
            break;
        }
    }
}

int main()
{
    // 1. 创建管道文件
    if (mkfifo(ipcPath.c_str(), MODE) < 0)
    {
        perror("mkfifo");
        exit(1);
    }

    Log("创建管道文件成功", Debug) << " step 1" << endl;

    // 2. 正常的文件操作
    int fd = open(ipcPath.c_str(), O_RDONLY);
    if (fd < 0)
    {
        perror("open");
        exit(2);
    }
    Log("打开管道文件成功", Debug) << " step 2" << endl;

    int nums = 3;
    for (int i = 0; i < nums; i++)
    {
        pid_t id = fork();
        if (id == 0)
        {
            // 3. 编写正常的通信代码了
            getMessage(fd);
            exit(1);
        }
    }
    for(int i = 0; i < nums; i++)
    {
        waitpid(-1, nullptr, 0);
    }
    // 4. 关闭文件
    close(fd);
    Log("关闭管道文件成功", Debug) << " step 3" << endl;

    unlink(ipcPath.c_str()); // 通信完毕,就删除文件
    Log("删除管道文件成功", Debug) << " step 4" << endl;

    return 0;
}

上面的代码其实就是实现server进程和client进程间的通信,其实从这些代码也可以看出来,匿名管道适用于父子进程间的通信,命名管道用于两个毫不相关的进程间通信。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/347045.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu20.04开启VNC远程服务配置教程

对于使用过PVE的大佬来说&#xff0c;在自己电脑安装虚拟机打开的画面惨不忍睹&#xff0c;其实它只是用错了地方。 今天给大家介绍一款控制工具&#xff0c;它叫VNC&#xff0c;是用来进行远程连接的非常好用的工具可以很完美的适配自己电脑的屏幕。显示效果如下&#xff1a;…

【Spark分布式内存计算框架——Spark SQL】3. SparkSQL 概述(下)、DataFrame

2.2 官方定义 SparkSQL模块官方定义&#xff1a;针对结构化数据处理Spark Module模块。 主要包含三层含义&#xff1a; 第一、针对结构化数据处理&#xff0c;属于Spark框架一个部分 结构化数据&#xff1a;一般指数据有固定的 Schema(约束)&#xff0c;例如在用户表中&…

MapReduce paper(2004)-阅读笔记

文章目录前言摘要&#xff08;Abstract)一、引言( Introduction)二、编程模型&#xff08;Programming Model&#xff09;三、实现&#xff08;Implementation&#xff09;3.1、执行概述&#xff08;Execution Overview&#xff09;3.2、主节点数据结构&#xff08;Master Data…

im 应用多点登录和消息漫游架构

本文主要节选和总结自沈剑大佬的 微信多点登录与QQ消息漫游架构随想和文章的评论&#xff0c;略有删改 1、多点登录和消息漫游架构 多点登录指的是同个账号可以在多类终端(安卓、pc)同时登录&#xff0c;但是同类终端只能在唯一的一台设备上登录。 消息漫游指的是服务端保存聊天…

超详细讲解线性表和顺序表!!

超详细讲解线性表和顺序表&#xff01;&#xff01;线性表顺序表顺序表的概念及结构静态顺序表动态顺序表顺序表接口实现1、创建2、初始化3、扩容4、尾插5、打印6、销毁7、尾删8、头插9、头删10、插入任意位置11、删除任意位置12、查找13、修改线性表 线性表&#xff08;linea…

分析C语言位域的访问开销

背景 C语言的位域用于描述结构体的指定字段占多少bit&#xff0c;使得多个字段可以存到一个字节里&#xff0c;也可以让一个字段占多个字节。它能减小结构体的内存占用&#xff0c;同时还能精确限定结构体字段的取值范围。 问题的提出 项目上有个硬件模块&#xff0c;它输入…

Linux系统下基于Docker安装Yapi,并且迁移Yapi数据

本文主要讲四个部分&#xff1a; 1.什么是Yapi 2.Centos7 下基于docker安装Yapi 3.Yapi数据迁移 4.利用Nginx反向代理 什么是Yapi YApi 是高效、易用、功能强大的 api 管理平台&#xff0c;旨在为开发、产品、测试人员提供更优雅的接口管理服务。可以帮助开发者轻松创建、…

网络安全-内网DNS劫持-ettercap

网络安全-内网DNS劫持-ettercap 前言 一&#xff0c;我也是初学者记录的笔记 二&#xff0c;可能有错误的地方&#xff0c;请谨慎 三&#xff0c;欢迎各路大神指教 四&#xff0c;任何文章仅作为学习使用 五&#xff0c;学习网络安全知识请勿适用于违法行为 学习网络安全知识请…

【C++、数据结构】哈希 — 闭散列与哈希桶的模拟实现

文章目录&#x1f4d6; 前言1. STL中哈希表的两个应用⚡1.1 &#x1f31f;unordered_set1.2 &#x1f31f;unordered_map2. 常见查找的性能对比&#x1f4a5;3. 哈希表模拟实现&#x1f3c1;3.1 哈希的概念&#xff1a;3.2 哈希函数&#xff1a;3.3 哈希冲突&#xff1a;3.4 闭…

RPC概念与理解(一)

目录 1.写在前面 2. 电商系统的演变 2.1 单一应用框架 2.2 垂直应用框架 2.3 分布式应用架构(RPC) 2.4 流动计算架构(SOA) 2.5 架构演变详解 3.远程调用方式 RPC Http 总结&#xff1a;对比RPC和http的区别 4.什么是PRC 5.RPC是什么 6.RPC剖析 6.1.RPC的工作原理…

Java并发编程解析之基于JDK源码解析Java领域中ReentrantLock锁的设计思想与实现原理

一、开头 在并发编程领域&#xff0c;有两大核心问题&#xff1a;一个是互斥&#xff0c;即同一时刻只允许一个线程访问共享资源&#xff1b;另一个是同步&#xff0c;即线程之间如何通信、协作。 主要原因是&#xff0c;对于多线程实现实现并发&#xff0c;一直以来&#xff…

【蓝桥杯集训3】二分专题(3 / 5)

目录 二分模板 1460. 我在哪&#xff1f; - 二分答案 哈希表 1221. 四平方和 - 哈希表 / 二分 1、哈希表 2、二分 自定义排序 1227. 分巧克力 - 113. 特殊排序 - 二分模板 l r >> 1 —— 先 r mid 后 l mid1 —— 寻找左边界 —— 找大于某个数的最小值lr…

SRV6跨域优势

背景 运营商网络作为一张覆盖全国的网络&#xff0c;其主体分为骨干&#xff0c;省干&#xff0c;城域网层级&#xff0c;主流的管理模式为分层级管理。随着运营商网络服务的终端规模不断增长&#xff0c;不同地理位置之间网络连接的需求变得非常的普遍&#xff0c;但不同网络…

SpringMVC使用JSTL

简介 JSTL 标签是一个开放源代码的 JSP 标签库&#xff0c;是由 apache 的 jakarta 小组来维护的&#xff1b;JSTL 只能运行在支持 JSP1.2 和Servlet2.3 规范的容器上&#xff0c;在 jsp 页面中经常用到&#xff0c;能帮助我们实现一些特殊的功能&#xff0c;例如&#xff1a;…

windows-Mysql的主从数据库同步设置

复制原有的mysql修改my.ini配置文件 修改端口号修改从数据的地址和从数据库的数据存放地址安装从数据库进入从数据库的bin目录&#xff0c;打开命令窗口输入命令&#xff1a;mysqld.exe install mysql-back --defaults-file "C:\ProgramData\MySQL\MySQL Server 5.7-back\…

1、创建第一个Android项目

1.1、创建Android工程项目&#xff1a;双击打开Android Studio。在菜单栏File中new-->new project3、在界面中选择Empty Activity&#xff0c;然后选择next4、在下面界面中修改工程名称&#xff0c;工程保存路径选择java语言&#xff0c;然后点击finishAndroid studio自动为…

实现了统一消息中心的微服务基础框架 JVS,快点赞收藏

一、开源项目简介基于JVS&#xff08;基于spring cloud封装&#xff09;的基础开源框架&#xff0c;实现了基于多对多租户能力的管理系统。二、基础框架实现功能支持数据管理支持分布式定时任务支持分布式日志采集支持系统监控支持动态配置中心支持模板消息支持链路跟踪支持邮件…

jenkins部署指定任意版本和配置详细教程 jenkins 2.361版本示例

Jenkins构建CI/CD什么是CI/CD&#xff1a;持续集成/持续发布---开发(git) -->git主库-->jenkins(gitjdktomcatmaven打包测试&#xff09;-->发布到tomcat服务器。持续集成(Continuous Integration, CI): 代码合并&#xff0c;构建&#xff0c;部署&#xff0c;测试都在…

【Airplay_BCT】Bonjour 和本地链接、域名和 DNS

Bonjour 零配置网络架构支持在局域网或广域网上发布和发现基于 TCP/IP 的服务。本文档概括介绍了 Bonjour 架构&#xff0c;并简要介绍了可用的 Bonjour API。 Bonjour 是 Apple 对一套零配置网络协议的实现。 Bonjour 旨在让用户更轻松地进行网络配置。 例如&#xff0c;Bon…

ChatGPT一路狂飙,NVMe SSD能否应对性能挑战?

近日&#xff0c;ChatGPT持续火爆&#xff0c;用户在短短两个月内迅速破亿&#xff0c;大量用户涌入导致ChatGPT访问和数据规模指数级增加。与数月前发布的版本相比&#xff0c;新的ChatGPT“智能”了许多&#xff0c;不仅可以像人类一样聊天交流&#xff0c;甚至能够完成一定程…