目录
进程间通信(IPC)
含义
目的
分类
匿名管道
原理
创建过程
特性
四大情况
close问题
代码练习
简单通信
进程池
小知识
进程间通信(IPC)
含义
就是让不同的进程能看到同一份资源,实现数据交流。
目的
1.数据传输:一个进程需要将它的数据发送给另一个进程
2.资源共享:多个进程之间共享同样的资源
3.通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)
4.进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并且能够及时知道它的状态改变
分类
主要分三大类
1.管道:基于文件的通信方法,一般为单向通信的信道。
- 匿名管道
- 命名管道
2.System V IPC :操作系统中单独设计的通信模块(只能进行本地通信)
- System V 消息队列
- System V 共享内存
- System V 信号量
3.POSIX IPC:网络间进程通信
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
匿名管道
原理
fork子进程时,PCB要拷贝,关联的文件的struct file结构体也会拷贝,父子进程的struct file通过自己的文件读写位置区访问文件属性和内容;为了保证父子读写不会冲突(同一个文件读写位置只有一个),所以要么父进程读,子进程写,要么子进程读,父进程写,这里父子进程就可以看到同一份资源了,这就是管道的原理。
创建过程
父进程以读写两种方式打开同一个管道文件,然后父进程创建子进程,子进程继承父进程,最后父进程和子进程关闭自己不需要的读或写方式。
1.父进程为什么以读写方式打开:为了让子进程也继承读写。
2.为什么关闭读写端:管道是单向通信的。
3.为什么单向通信:就是这样设计的。
pipe :创建匿名管道的系统调用
1.pipifd是输出型参数,会将创建的读写方式对应的文件描述符放到这个数组里,[0]为读,[1]为写。
2.成功返回0,失败返回-1,同时
errno
变量会被设置为相应的错误码,以指示具体的错误原因。
创建之后我们可以得知,匿名管道不存在文件名和路径,就不存在向磁盘中刷新数据,是由操作系统进行开辟和管理的内核中的一块缓冲区,是一个内存级的基于文件系统的文件。
特性
1.用于有血缘关系这样的进程进行进程通信(IPC)
2.单向通信
3.管道的生命周期随进程
4.面向字节流
5.管道自带同步机制
四大情况
1.写端不关,写端不写 管道里没有数据,读端会被阻塞(造就同步机制)
2.读端不关,读端不读 写满了(64kb,65536字节)就不会再写了
3.读端不关,写端先关 读端返回值会为0,表示读到文件结尾
4.写端不关,读端先关 OS会自动杀掉写进程,通过发送13号信号
close问题
如果多个子进程继承同一父进程时,在第一个之后的子进程会将父进程与之前的子进程建立的写端(读端)的fd也继承,让其引用计数++,这样如果最后按顺序进行close(fd)+wait 的话,是会出问题的。
例:父进程写,子进程读,创建第一个子进程时,父进程使用文件描述符表中3/4作为读端和写端,第一个子进程继承到3/4,然后父进程关闭3,子进程关闭4;接下来创建第二个子进程,父进程pipe会以3/5作为读端/写端,这时子进程继承,4也会被继承,这样这个子进程也就即指向了之前的管道,又指向它的管道,创建之后的子进程同理。
这样越先创建的管道,指向它的就越多,引用计数就越大,当父进程退出不写后,指向管道的写端的引用计数不为0,子进程认为仍有写端可以进行写,不会关闭。
解决方法:1.倒着关闭
2.在子进程创建时关闭之前子进程的继承来的fd。
代码练习
简单通信
main.cpp
#include "together.hpp"
int main()
{
int pipefd[2] = {0};
int piperet = pipe(pipefd);
if(piperet<0)
{
ERR_EXIT("pipe");
}
pid_t pid = fork();
if(pid<0)
{
ERR_EXIT("fork");
}
else if(pid==0)
{
close(pipefd[1]);
char buff[1024] = {'\0'};
while(1)
{
ssize_t readret = read(pipefd[0],buff,sizeof(buff)-1);
if(readret<0)
{
ERR_EXIT("read");
}
else if(readret==0)
{
cout<<"写端下班,我也下班喽"<<endl;
break;
}
else
{
buff[readret] = '\0';
cout<<"父进程说:"<<buff<<endl;
}
}
exit(0);
}
close(pipefd[0]);
ssize_t writeret = write(pipefd[1],"i am father",11);
sleep(2);
return 0;
}
together.hpp
#pragma once
#include <iostream>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
using namespace std;
#define ERR_EXIT(s)\
do{\
perror(s);\
exit(EXIT_FAILURE);\
}while(0)
Makefile
ano_pipe:main.cpp
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f ano_pipe
进程池
main.cpp
#include "pro_pool.hpp"
#define NUM 3
int main()
{
srand(time(nullptr));
//创建进程池
ProPool pool;
//初始化进程池
pool.InitProPool(NUM,[](int fd){
while(true)
{
int code = 0;
ssize_t n = read(fd,&code,sizeof(code));
if(n==sizeof(code))
{
if(code>=0&&code<NUM)
{
_task._t[code]();
}
else
{
std::cout<<"任务码错误 "<<code<<std::endl;
}
}
else if(n==0)
{
std::cout<<"子进程"<<getpid()<<"退出"<<std::endl;
break;
}
else
{
std::cout<<"read error"<<std::endl;
break;
}
}
});
//唤醒进程池
pool.Awake_Pro();
//结束进程池
pool.WaitPro();
return 0;
}
Makefile
pro_pool:main.cpp
g++ -o $@ $^ -std=c++11
.PHOMY:
clean:
rm -rf pro_pool
pro_pool.hpp
#ifndef PRO_POOL_HPP
#define PRO_POOL_HPP
#include <iostream>
#include <string>
#include <vector>
#include <functional>
#include <cstdio>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <time.h>
#include "task.hpp"
using namespace std;
using func_t = std::function<void(int fd)>;
Task_ _task;
class Channel
{
int _wfd;
std::string _name;
pid_t _pid;
public:
Channel() {}
Channel(int wfd, const std::string &name, pid_t pid) : _wfd(wfd), _name(name), _pid(pid) {}
~Channel() {}
int FD() { return _wfd; }
std::string Name() { return _name; }
pid_t Pid() { return _pid; }
void Close() { close(_wfd); }
void Wait()
{
pid_t ret = waitpid(_pid, nullptr, 0);
(void)ret;
}
};
class ProPool
{
std::vector<Channel> _channels;
int _progressnum = 0;
public:
ProPool() {}
~ProPool() {}
void InitProPool(int num, func_t cb)
{
_progressnum = num;
for (int i = 0; i < num; ++i)
{
int fdnum[2];
int ret = pipe(fdnum);
if (ret < 0)
exit(2);
pid_t pid = fork();
if (pid < 0)
exit(1);
else if (pid == 0)
{
close(fdnum[1]);
// 子进程执行任务
cb(fdnum[0]);
exit(0);
}
close(fdnum[0]);
std::string task_name = "channel " + to_string(i);
_channels.emplace_back(fdnum[1], task_name, pid);
}
}
void Awake_Pro()
{
for (int index = 0;; index++)
{
index %= _channels.size();
int code = rand() % _task.size();
std::cout << "唤醒" << _channels[index].Name() << std::endl;
ssize_t ret = write(_channels[index].FD(), &code, sizeof(code));
sleep(1);
}
}
void WaitPro()
{
while (_progressnum--)
{
_channels[_progressnum].Close();
_channels[_progressnum].Wait();
}
}
};
#endif
task.hpp
#include <iostream>
#include <vector>
#include <functional>
using namespace std;
using task_f = std::function<void ()>;
void download()
{
std::cout<<"download ing..."<<std::endl;
}
void unload()
{
std::cout<<"unload ing..."<<std::endl;
}
void run_childtask()
{
std::cout<<"run_childtask ing..."<<std::endl;
}
class Task_
{
public:
std::vector<task_f> _t;
Task_()
{
_t.push_back(download);
_t.push_back(unload);
_t.push_back(run_childtask);
}
size_t size(){return _t.size();}
};
小知识
1.C++中, .cpp .cxx .cc 都可以作为源文件的后缀 .hpp是头源结合,不过在编译时会被编译器当做头文件处理,所以单独的.hpp文件是不能形成正确的.exe的。
2.头文件的本质是函数和变量的声明的文本集合