进程池流程
- 父进程流程:
- 父进程创建N个子进程,让每个子进程挂起,等待进行文件传输
- 父进程创建监听套接字,等待客户端新连接到来
- 父进程创建epoll实例,监听响应的文件描述符
- 父进程接收客户端的连接请求,accept之后得到peerfd,之后传递给一个空闲子进程(进程间传递文件描述符)
- 如果管道可读,表示子进程已执行完任务,将子进程标记为空闲状态
- 子进程流程:
- 子进程recvFd一直阻塞在管道上,如果管道中有数据到来,子进程从recvmsg中返回,可以给客户端传输文件
- 传输文件结束后,关闭连接,通知父进程
- 等待下一次任务
搭建进程池框架
//子进程信息
typedef struct{
pid_t pid;
int fd;
short busy;
}process_date
父进程需要管理每一个子进程,根据父子进程各自流程,搭建进程池框架
父进程流程
#include"processpool.h"
#include<func.h>
int main(int argc, char argv[]){
//IP,port,pnum
ARGC_CHEAK(argc, 4);
int pnum = atoi(argv[3]);
//0.构建子进程的结构体信息
pProcess_data_t pProcessData = calloc(pnum, sizeof(Process_data));
//1.父进程创建N个子进程
makeChild(pProcessData, pnum);
//2.创建监听套接字,开启服务器监听操作
//创建TCP监听套接字
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
printf("listenfd : %d",listenfd);
int on;
int ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
if (ret < 0)
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
//绑定网络地址
struct sockaddr_in saddr;
//初始化
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(atoi(argv[2]));
//指定IP地址
saddr.sin_addr.s_addr = inet_addr(argv[1]);
ret = bind(listenfd, (const struct sockaddr*)&saddr, sizeof(saddr));
if (ret < 0)
{
perror("bind");
close(listenfd);
exit(EXIT_FAILURE);
}
//进行监听
ret = listen(listenfd, pnum);
if (ret < 0)
{
perror("listen");
close(listenfd);
exit(EXIT_FAILURE);
}
printf("server start listening");
//3.创建epoll实例
int epfd = epoll_create1(0);
if (epfd < 0)
{
perror("epoll creat");
exit(EXIT_FAILURE);
}
//4.对文件描述符进行监听
//4.1对listenfd进行监听
struct epoll_event evt;
memset(&evt, 0, sizeof(evt));
evt.data.fd = listenfd;
evt.events = EPOLLIN;
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &evt);
if (ret < 0)
{
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
for (int i = 0; i < pnum; i++)//对管道进行监听
{
struct epoll_event evt;
memset(&evt, 0, sizeof(evt));
evt.data.fd = pProcessData[i].fd;
evt.events = EPOLLIN;
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, pProcessData[i].fd, &evt);
if (ret < 0)
{
perror("epoll_ctl : listen process");
exit(EXIT_FAILURE);
}
}
struct epoll_event* pEvtArr = (struct epoll_event*)calloc(pnum + 1, sizeof(struct epoll_event));
while (1)
{
int nready = epoll_wait(epfd, &evt, pnum + 1, -1);
if (nready < 0)
{
perror("epollwait");
exit(EXIT_FAILURE);
}
for (size_t i = 0; i < nready; i++)
{
int fd = pEvtArr[i].data.fd;
if (fd == listenfd)//新连接交给子进程处理
{
int peerfd = accept(listenfd, NULL, NULL);
if (peerfd < 0)
{
perror("accept");
exit(EXIT_FAILURE);
}
printf("parent peerfd: %d\n");
//查找子进程
for (size_t j = 0; j < pnum; j++)
{
if (pProcessData[j].busy == 0)
{
sendFd(pProcessData[i].fd, peerfd);
pProcessData[j].busy = 1;
break;
}
}
//父进程必须关闭套接字,否则无法正常关闭peerfd
close(peerfd);
}
else{
//子进程完成任务,通过管道通知父进程
int howmany = 0;
int ret = read(fd, &howmany, sizeof(int));
if(ret < 0){
perror("read");
}
pProcessData[i].busy = 0;
printf("child %d is not busy.\n", pProcessData[i].pid );
}
}
}
return;
}
子进程流程
int childHandleTask(int pipefd){
printf("child %d\n", getpid());
while (1)
{
//子进程一直等待任务到来
//当没有任务到的时候,就阻塞在recvFd上
int childFd = -1;
printf("child recvFd ...\n");
recvFd(pipefd, &childFd);
printf("child %d recv %d.\n", getpid(), childFd);
//进行文件的发送
char buf[64] = "hello,client";
send(childFd, buf, strlen(buf), 0);
//文件发送完成关闭连接
close(childFd);
//写管道通知父进程
int one = 1;
write(pipefd, &one, sizeof(int));
}
}
int makeChild(pProcess_data_t pdata, int pnum){
for (size_t i = 0; i < pnum; i++)
{
int fds[2];
socketpair(AF_LOCAL, SOCK_STREAM, 0, fds);
pid_t pid = fork();
if (pid == 0)
{
//子进程
close(fds[0]);
//执行子进程任务
childHandleTask(fds[1]);
exit(EXIT_SUCCESS);
}
//父进程
close(fds[1]);
pdata[i].pid = pid;
pdata[i].fd = fds[0];
pdata[i].busy = 0;
}
return 0;
}