Epoll - Reactor 设计模式
以餐厅大点餐为例

Reactor优点

Epoll - IO多路复用
1.创建EPOLL 句柄
相关函数
epoll_create
#include <sys/epoll.h>
int epoll_create(int size); 
作用:
创建一个 epoll 实例
参数:
size 参数用于指定 epoll 实例中管理的文件描述符数量,不过该参数在现代 Linux 系统中已经被忽略,可以设置为任意值(除了 0)。
返回值:
如果创建成功,该文件描述符将是一个非负整数(用于后续的epoll操作);如果创建失败,该函数将返回 -1,并设置全局变量 errno 以指示错误原因。
2.向EPOLL对象中添加、修改或者删除感兴趣的事件
相关函数
epoll_ctl
#inclue<sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);    
参数
epfd是epoll_create产生的epoll句柄(epoll_create的返回值)
fd表示操作的文件描述符
op取值:EPOLL_CTL_ADD 添加新的事件到epoll中
EPOLL_CTL_MOD 修改EPOLL中的事件
EPOLL_CTL_DEL 删除epoll中的事件
epoll_event结构体定义如下:
struct epoll_event{
	__uint32_t  events;
	epoll_data_t data;
}
typedef union epoll_data{//表示与事件相关的信息
	void *ptr;
	int fd;
	uint32_t u32;
	uint64_t u64;
}epoll_data_t
 
events取值:
EPOLLIN 表示有数据可以读出(接受连接、关闭连接)
EPOLLOUT 表示连接可以写入数据发送(向服务器发起连接,连接成功事件) EPOLLERR 表示对应的连接发生错误
EPOLLHUP 表示对应的连接被挂起
3.收集在epoll监控的事件中已经发生的事件
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);   
epfd: epoll的描述符。
events:则是分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中(events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)。
maxevents: 本次可以返回的最大事件数目,通常 maxevents参数与预分配的events数组的大小是相等的。
timeout: 表示在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout为0,立刻返回,不会等待。-1表示无限期阻塞
返回值
返回0表示监听超时
返回-1表示出错
大于0表示返回了需要处理的事件数
代码示例
用epoll实现了一个粗糙的http服务器
epoll_server.c
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>
//    int fd;
typedef struct _ConnectStat  ConnectStat;
typedef void(*response_handler) (ConnectStat * stat);
struct _ConnectStat {
	int fd;
	char name[64];
	char  age[64];
	struct epoll_event _ev;
	int  status;//0 -未登录   1 - 已登陆
	response_handler handler;//不同页面的处理函数
};
//http协议相关代码
ConnectStat * stat_init(int fd);
void connect_handle(int new_fd);
void do_http_respone(ConnectStat * stat);
void do_http_request(ConnectStat * stat);
void welcome_response_handler(ConnectStat * stat);
void commit_respone_handler(ConnectStat * stat);
const char *main_header = "HTTP/1.0 200 OK\r\nServer: Martin Server\r\nContent-Type: text/html\r\nConnection: Close\r\n";
static int epfd = 0;
void usage(const char* argv)
{
	printf("%s:[ip][port]\n", argv);
}
void set_nonblock(int fd)
{
	int fl = fcntl(fd, F_GETFL);
	fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
int startup(char* _ip, int _port)  //创建一个套接字,绑定,检测服务器
{
	//sock
	//1.创建套接字
	int sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock < 0)
	{
		perror("sock");
		exit(2);
	}
	int opt = 1;
	setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
	//2.填充本地 sockaddr_in 结构体(设置本地的IP地址和端口)
	struct sockaddr_in local;
	local.sin_port = htons(_port);
	local.sin_family = AF_INET;
	local.sin_addr.s_addr = inet_addr(_ip);
	//3.bind()绑定
	if (bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0)
	{
		perror("bind");
		exit(3);
	}
	//4.listen()监听 检测服务器
	if (listen(sock, 5) < 0)
	{
		perror("listen");
		exit(4);
	}
	//sleep(1000);
	return sock;    //这样的套接字返回
}
int main(int argc, char *argv[])
{
	if (argc != 3)     //检测参数个数是否正确
	{
		usage(argv[0]);
		exit(1);
	}
	int listen_sock = startup(argv[1], atoi(argv[2]));      //创建一个绑定了本地 ip 和端口号的套接字描述符
	//1.创建epoll    
	epfd = epoll_create(256);    //可处理的最大句柄数256个
	if (epfd < 0)
	{
		perror("epoll_create");
		exit(5);
	}
	struct epoll_event _ev;       //epoll结构填充 
	ConnectStat * stat = stat_init(listen_sock);
	_ev.events = EPOLLIN;         //初始关心事件为读
	_ev.data.ptr = stat;
	//_ev.data.fd = listen_sock;    //  
	//2.托管
	epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &_ev);  //将listen sock添加到epfd中,关心读事件
	struct epoll_event revs[64];
	int timeout = -1;
	int num = 0;
	int done = 0;
	while (!done)
	{
		//epoll_wait()相当于在检测事件
		switch ((num = epoll_wait(epfd, revs, 64, timeout)))  //返回需要处理的事件数目  64表示 事件有多大
		{
		case 0:                  //返回0 ,表示监听超时
			printf("timeout\n");
			break;
		case -1:                 //出错
			perror("epoll_wait");
			break;
		default:                 //大于零 即就是返回了需要处理事件的数目
		{
			struct sockaddr_in peer;
			socklen_t len = sizeof(peer);
			int i;
			for (i = 0; i < num; i++)
			{
				ConnectStat * stat = (ConnectStat *)revs[i].data.ptr;
				int rsock = stat->fd; //准确获取哪个事件的描述符
				if (rsock == listen_sock && (revs[i].events) && EPOLLIN) //如果是初始的 就接受,建立链接
				{
					int new_fd = accept(listen_sock, (struct sockaddr*)&peer, &len);
					if (new_fd > 0)
					{
						printf("get a new client:%s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
						//sleep(1000);
						connect_handle(new_fd);
					}
				}
				else // 接下来对num - 1 个事件处理
				{
					if (revs[i].events & EPOLLIN)
					{
						do_http_request((ConnectStat *)revs[i].data.ptr);
					}
					else if (revs[i].events & EPOLLOUT)
					{
						do_http_respone((ConnectStat *)revs[i].data.ptr);
					}
					else
					{
					}
				}
			}
		}
		break;
		}//end switch
	}//end while
	return 0;
}
ConnectStat * stat_init(int fd) {
	ConnectStat * temp = NULL;
	temp = (ConnectStat *)malloc(sizeof(ConnectStat));
	if (!temp) {
		fprintf(stderr, "malloc failed. reason: %m\n");
		return NULL;
	}
	memset(temp, '\0', sizeof(ConnectStat));
	temp->fd = fd;
	temp->status = 0;
	//temp->handler = welcome_response_handler;
}
//初始化连接,然后等待浏览器发送请求
void connect_handle(int new_fd) {
	ConnectStat *stat = stat_init(new_fd);
	set_nonblock(new_fd);
	stat->_ev.events = EPOLLIN;
	stat->_ev.data.ptr = stat;
	epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &stat->_ev);    //二次托管
}
void do_http_respone(ConnectStat * stat) {
	stat->handler(stat);
}
void do_http_request(ConnectStat * stat) {
	//读取和解析http 请求
	char buf[4096];
	char * pos = NULL;
	//while  header \r\n\r\ndata
	ssize_t _s = read(stat->fd, buf, sizeof(buf) - 1);
	if (_s > 0)
	{
		buf[_s] = '\0';
		printf("receive from client:%s\n", buf);
		pos = buf;
		//Demo 仅仅演示效果,不做详细的协议解析
		if (!strncasecmp(pos, "GET", 3)) {
			stat->handler = welcome_response_handler;
		}
		else if (!strncasecmp(pos, "Post", 4)) {
			//获取 uri
			printf("---Post----\n");
			pos += strlen("Post");
			while (*pos == ' ' || *pos == '/') ++pos;
			if (!strncasecmp(pos, "commit", 6)) {//获取名字和年龄
				int len = 0;
				printf("post commit --------\n");
				pos = strstr(buf, "\r\n\r\n");
				char *end = NULL;
				if (end = strstr(pos, "name=")) {
					pos = end + strlen("name=");
					end = pos;
					while (('a' <= *end && *end <= 'z') || ('A' <= *end && *end <= 'Z') || ('0' <= *end && *end <= '9'))	end++;
					len = end - pos;
					if (len > 0) {
						memcpy(stat->name, pos, end - pos);
						stat->name[len] = '\0';
					}
				}
				if (end = strstr(pos, "age=")) {
					pos = end + strlen("age=");
					end = pos;
					while ('0' <= *end && *end <= '9')	end++;
					len = end - pos;
					if (len > 0) {
						memcpy(stat->age, pos, end - pos);
						stat->age[len] = '\0';
					}
				}
				stat->handler = commit_respone_handler;
			}
			else {
				stat->handler = welcome_response_handler;
			}
		}
		else {
			stat->handler = welcome_response_handler;
		}
		//生成处理结果 html ,write
		stat->_ev.events = EPOLLOUT;
		//stat->_ev.data.ptr = stat;
		epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);    //二次托管
	}
	else if (_s == 0)  //client:close
	{
		printf("client: %d close\n", stat->fd);
		epoll_ctl(epfd, EPOLL_CTL_DEL, stat->fd, NULL);
		close(stat->fd);
		free(stat);
	}
	else
	{
		perror("read");
	}
}
void welcome_response_handler(ConnectStat * stat) {
	const char * welcome_content = "\
<html lang=\"zh-CN\">\n\
<head>\n\
<meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
<title>This is a test</title>\n\
</head>\n\
<body>\n\
<div align=center height=\"500px\" >\n\
<br/><br/><br/>\n\
<h2>大家好,欢迎来到奇牛学院VIP 课!</h2><br/><br/>\n\
<form action=\"commit\" method=\"post\">\n\
尊姓大名: <input type=\"text\" name=\"name\" />\n\
<br/>芳龄几何: <input type=\"password\" name=\"age\" />\n\
<br/><br/><br/><input type=\"submit\" value=\"提交\" />\n\
<input type=\"reset\" value=\"重置\" />\n\
</form>\n\
</div>\n\
</body>\n\
</html>";
	char sendbuffer[4096];
	char content_len[64];
	strcpy(sendbuffer, main_header);
	snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", (int)strlen(welcome_content));
	strcat(sendbuffer, content_len);
	strcat(sendbuffer, welcome_content);
	printf("send reply to client \n%s", sendbuffer);
	write(stat->fd, sendbuffer, strlen(sendbuffer));
	stat->_ev.events = EPOLLIN;
	//stat->_ev.data.ptr = stat;
	epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);
}
void commit_respone_handler(ConnectStat * stat) {
	const char * commit_content = "\
<html lang=\"zh-CN\">\n\
<head>\n\
<meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
<title>This is a test</title>\n\
</head>\n\
<body>\n\
<div align=center height=\"500px\" >\n\
<br/><br/><br/>\n\
<h2>欢迎学霸同学 %s  ,你的芳龄是 %s!</h2><br/><br/>\n\
</div>\n\
</body>\n\
</html>\n";
	char sendbuffer[4096];
	char content[4096];
	char content_len[64];
	int len = 0;
	len = snprintf(content, 4096, commit_content, stat->name, stat->age);
	strcpy(sendbuffer, main_header);
	snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", len);
	strcat(sendbuffer, content_len);
	strcat(sendbuffer, content);
	printf("send reply to client \n%s", sendbuffer);
	write(stat->fd, sendbuffer, strlen(sendbuffer));
	stat->_ev.events = EPOLLIN;
	//stat->_ev.data.ptr = stat;
	epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);
} 
注意:
1.由accept函数产生的listen_sock只有一个,可以把它看作是一个信箱;epoll_wait函数监听的文件描述符只有两种可能:
a.监听客户端连接发起的listen_sock(唯一)
b.与客户端建立连接的文件描述符(每个客户端独对应一个)
for (i = 0; i < num; i++)
{
	ConnectStat* stat = (ConnectStat*)revs[i].data.ptr;//获取函数参数
	int rsock = stat->fd; //准确获取哪个事件的描述符
	//listen_sock只能有一个(代表信箱)
	if (rsock == listen_sock && (revs[i].events) && EPOLLIN) //如果是初始的 就接受,建立链接
	{
		int new_fd = accept(listen_sock, (struct sockaddr*)&peer, &len);
		if (new_fd > 0)
		{
			printf("get a new client:%s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
			//sleep(1000);
			connect_handle(new_fd);
		}
	}
	else // 接下来对num - 1 个事件处理
	{
		if (revs[i].events & EPOLLIN)
		{
			do_http_request((ConnectStat*)revs[i].data.ptr);
		}
		else if (revs[i].events & EPOLLOUT)
		{
			do_http_respone((ConnectStat*)revs[i].data.ptr);
		}
		else
		{
		}
	}
} 
上面这段代码遍历就绪事件的数组revs[],判断事件对应的文件描述符是否是listen_sock:
a.若是listen_sock,则表示有客户端要建立连接,则调用accept函数接收连接,并调用connect_handle函数添加新的事件。
void connect_handle(int new_fd) {
	ConnectStat* stat = stat_init(new_fd);
	set_nonblock(new_fd);
	stat->_ev.events = EPOLLIN;
	stat->_ev.data.ptr = stat;
	epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &stat->_ev);    //二次托管
} 
b.若不是listen_sock,则是已经建立连接的事件,则调用request和response函数,接收客户端传来的数据或者对客户端进行回应
if (revs[i].events & EPOLLIN)
{
				do_http_request((ConnectStat*)revs[i].data.ptr);
}
else if (revs[i].events & EPOLLOUT)
{
				do_http_respone((ConnectStat*)revs[i].data.ptr);
}
else
{
} 
request表示从客户端读取数据并处理,response表示对客户端进行回应。
do_http_request函数如下:
void do_http_request(ConnectStat* stat) {
	//读取和解析http 请求
	char buf[4096];
	char* pos = NULL;
	//while  header \r\n\r\ndata
	ssize_t _s = read(stat->fd, buf, sizeof(buf) - 1);
	if (_s > 0)
	{
		buf[_s] = '\0';
		printf("receive from client:%s\n", buf);
		pos = buf;
		//Demo 仅仅演示效果,不做详细的协议解析
		if (!strncasecmp(pos, "GET", 3)) {
			stat->handler = welcome_response_handler;
		}
		else if (!strncasecmp(pos, "Post", 4)) {
			//获取 uri
			printf("---Post----\n");
			pos += strlen("Post");
			while (*pos == ' ' || *pos == '/') ++pos;
			if (!strncasecmp(pos, "commit", 6)) {//获取名字和年龄
				int len = 0;
				printf("post commit --------\n");
				pos = strstr(buf, "\r\n\r\n");
				char* end = NULL;
				if (end = strstr(pos, "name=")) {
					pos = end + strlen("name=");
					end = pos;
					while (('a' <= *end && *end <= 'z') || ('A' <= *end && *end <= 'Z') || ('0' <= *end && *end <= '9'))	end++;
					len = end - pos;
					if (len > 0) {
						memcpy(stat->name, pos, end - pos);
						stat->name[len] = '\0';
					}
				}
				if (end = strstr(pos, "age=")) {
					pos = end + strlen("age=");
					end = pos;
					while ('0' <= *end && *end <= '9')	end++;
					len = end - pos;
					if (len > 0) {
						memcpy(stat->age, pos, end - pos);
						stat->age[len] = '\0';
					}
				}
				stat->handler = commit_respone_handler;
			}
			else {
				stat->handler = welcome_response_handler;
			}
		}
		else {
			stat->handler = welcome_response_handler;
		}
		//生成处理结果 html ,write
		stat->_ev.events = EPOLLOUT;
		//stat->_ev.data.ptr = stat;
		epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);    //二次托管
	}
	else if (_s == 0)  //client:close
	{
		printf("client: %d close\n", stat->fd);
		epoll_ctl(epfd, EPOLL_CTL_DEL, stat->fd, NULL);
		close(stat->fd);
		free(stat);
	}
	else
	{
		perror("read");
	}
}
 
调用read从客户端读取数据并分析:
1.读取长度若为0,表示客户端已经关闭,删除对应的事件并关闭描述符
2.读取长度不为0,根据客户端发送的不同请求(GET/POST)设置事件对应的执行函数,并将事件改成EPOLLOUT表示向客户端输出数据。
do_http_response函数代码如下:
void do_http_respone(ConnectStat* stat) {
	stat->handler(stat);
} 
很简单,执行事件数据函数(该函数由do_http_request在分析客户端发来的请求时设置)。
水平触发和边缘触发
Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率!!!
设置方式: 默认即水平触发
Edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait()时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符!!! 设置方式: stat->_ev.events = EPOLLIN | EPOLLET
关键问题
如何解决事件与 连接socket句柄挂钩,快速完成检索?
如何突破 系统默认状态最多允许 1024 个连接限制?
cmd输入
ulimit -a 
差距查看open files

表示进程可打开的文件句柄数最大值
使用
ulimit -n 100000 
进行修改
epoll 监听的事件没有超时处理机制,如何处理?
参考epoll框架
高并发epoll的封装
源代码在Github仓库中:

本来想传的,弄了半天一直传不上给我整笑了😓
代码剖析
global.h
struct _fde {
    unsigned int type;//类型
    u_short local_port;//本地端口
    u_short remote_port;//远程端口
    struct in_addr local_addr;//本地地址
 
    char ipaddr[16];		/* dotted decimal address of peer */
   
   
    PF *read_handler;//读处理函数指针
    void *read_data;//读的数据
    PF *write_handler;//写处理的函数指针
    void *write_data;//写的数据
    PF *timeout_handler;//超时处理的...
    time_t timeout;//超时阈值
    void *timeout_data;
}; 
 
定义了和文件描述符相关的信息
extern fde *fd_table; 
fd_table数组用来保存每一个文件句柄的信息.
eg:fd_table[1] 表示fd=1对应的文件句柄的信息
/*系统时间相关,设置成全局变量,供所有模块使用*/
extern struct timeval current_time;
extern double current_dtime;
extern time_t sys_curtime; 
定义了一些时间变量,用于超时处理.
/* epoll 相关接口实现 */
extern void do_epoll_init(int max_fd);
extern void do_epoll_shutdown();
extern void epollSetEvents(int fd, int need_read, int need_write);
extern int do_epoll_select(int msec); 
定义了epoll相关的一些接口.
/*框架外围接口*/
void comm_init(int max_fd);
extern int comm_select(int msec);
extern inline void comm_call_handlers(int fd, int read_event, int write_event);
void  commUpdateReadHandler(int fd, PF * handler, void *data);
void commUpdateWriteHandler(int fd, PF * handler, void *data); 
定义了框架的外围接口
com_epoll.c
定义了一些全局变量:
/* epoll structs */
static int kdpfd;
static struct epoll_event events[MAX_EVENTS];//传入epoll_wait做参数
static int epoll_fds = 0;//目前在监听的文件句柄总数
static unsigned *epoll_state;	/* 保存每个epoll 的事件状态 */ 
为什么这里要设置epoll_state数组?

我们可以调用epoll_ctl函数来添加、修改、删除事件,但是对于具体的事件监听状态是难以获知的。
我们需要设置一个数组来获取每一个文件句柄对应的事件状态,以便进行修改(setEpollEvnet函数)
static const char *
epolltype_atoi(int x)//把epolltpye类型转为字符串类型
{
    switch (x) {
    case EPOLL_CTL_ADD:
	return "EPOLL_CTL_ADD";
    case EPOLL_CTL_DEL:
	return "EPOLL_CTL_DEL";
    case EPOLL_CTL_MOD:
	return "EPOLL_CTL_MOD";
    default:
	return "UNKNOWN_EPOLLCTL_OP";
    }
} 
将epoll_wait的相关命令转变为对应的字符形式
void do_epoll_init(int max_fd)
{
    
    kdpfd = epoll_create(max_fd);
    if (kdpfd < 0)
	  fprintf(stderr,"do_epoll_init: epoll_create(): %s\n", xstrerror());
    //fd_open(kdpfd, FD_UNKNOWN, "epoll ctl");
    //commSetCloseOnExec(kdpfd);
    epoll_state = calloc(max_fd, sizeof(*epoll_state));//状态数组,保存每一个event的状态
    //epoll_state[fd] 访问fd对应事件的状态
} 
对于epoll_create进行分装,传入最大文件描述符 ,
并初始化数组epoll_state用来存放每一个事件的状态:

void do_epoll_shutdown()
{
    
    close(kdpfd);
    kdpfd = -1;
    safe_free(epoll_state);
} 
关闭epoll句柄,并释放事件状态数组所占内存。
void epollSetEvents(int fd, int need_read, int need_write)
{
    int epoll_ctl_type = 0;
    struct epoll_event ev;
    assert(fd >= 0);
    debug(5, 8) ("commSetEvents(fd=%d)\n", fd);
	memset(&ev, 0, sizeof(ev));
    
    ev.events = 0;
    ev.data.fd = fd;
    if (need_read)
	ev.events |= EPOLLIN;
    if (need_write)
	ev.events |= EPOLLOUT;
    if (ev.events)//EPOLLHUP、EPOLLERR为必设状态
	ev.events |= EPOLLHUP | EPOLLERR;
    //自动判断epoll_ctl的op类型
    if (ev.events != epoll_state[fd]) {
	/* If the struct is already in epoll MOD or DEL, else ADD */
	if (!ev.events) {
	    epoll_ctl_type = EPOLL_CTL_DEL;
	} else if (epoll_state[fd]) {
	    epoll_ctl_type = EPOLL_CTL_MOD;
	} else {
	    epoll_ctl_type = EPOLL_CTL_ADD;
	}
	//更新数组
	epoll_state[fd] = ev.events;
	if (epoll_ctl(kdpfd, epoll_ctl_type, fd, &ev) < 0) {
	    debug(5, 1) ("commSetEvents: epoll_ctl(%s): failed on fd=%d: %s\n",
		epolltype_atoi(epoll_ctl_type), fd, xstrerror());
	}
	switch (epoll_ctl_type) {
	case EPOLL_CTL_ADD:
	    epoll_fds++;
	    break;
	case EPOLL_CTL_DEL:
	    epoll_fds--;
	    break;
	default:
	    break;
	}
    }
} 
实现对于epoll_ctl函数的封装
由传入的need_read、need_write参数决定事件是要读还是写,并且无论是读还是写,
无论是读还是写都设置EPOLLHUP和EPOLLERR
- EPOLLHUP:表示该文件描述符的连接被挂起,通常是指连接断开或者对方关闭连接。
 - EPOLLERR:表示该文件描述符发生错误,例如连接出现错误、连接被重置等。
 
数组epoll_state中存储了在调用epollSetEvents之前,fd对应的事件状态,这里通过比较事件状态的新值(存储在新创建的ev中)和旧值(存储在event_state数组中)来决定是新增、修改或删除事件状态:
 //自动判断epoll_ctl的op类型
 if (ev.events != epoll_state[fd]) {
	/* If the struct is already in epoll MOD or DEL, else ADD */
	if (!ev.events) {//新事件状态为0,则要进行删除
  epoll_ctl_type = EPOLL_CTL_DEL;
	} else if (epoll_state[fd]) {//新、旧事件状态不为0,则要进行修改
  epoll_ctl_type = EPOLL_CTL_MOD;
	} else {//旧事件状态为0,则进行添加
  epoll_ctl_type = EPOLL_CTL_ADD;
	} 
并且要更新epoll_state数组实现同步:
epoll_state[fd] = ev.events; 
最后调用epoll_ctl函数,并更新一些全局变量:
if (epoll_ctl(kdpfd, epoll_ctl_type, fd, &ev) < 0) {
    debug(5, 1) ("commSetEvents: epoll_ctl(%s): failed on fd=%d: %s\n",
	epolltype_atoi(epoll_ctl_type), fd, xstrerror());
}
switch (epoll_ctl_type) {
case EPOLL_CTL_ADD:
    epoll_fds++;
    break;
case EPOLL_CTL_DEL:
    epoll_fds--;
    break;
default:
    break;
} 
 
int do_epoll_select(int msec)
{
    int i;
    int num;
    int fd;
    struct epoll_event *cevents;
    /*if (epoll_fds == 0) {
	assert(shutting_down);
	return COMM_SHUTDOWN;
    }
    statCounter.syscalls.polls++;
    */
    num = epoll_wait(kdpfd, events, MAX_EVENTS, msec);
    if (num < 0) {
	getCurrentTime();
	if (ignoreErrno(errno))//可以忽略的错误
	    return COMM_OK;
	debug(5, 1) ("comm_select: epoll failure: %s\n", xstrerror());
	return COMM_ERROR;
    }
    //statHistCount(&statCounter.select_fds_hist, num);
    if (num == 0)
	return COMM_TIMEOUT;
    //num表示事件就绪的句柄数目
    for (i = 0, cevents = events; i < num; i++, cevents++) {
		fd = cevents->data.fd;
		comm_call_handlers(fd, cevents->events & ~EPOLLOUT, cevents->events & ~EPOLLIN);//是否有读事件?是否有写事件?
    }
    return COMM_OK;
}
 
对epoll_wait函数进行封装:
a.epoll_wait返回值<0表示出错,判断是否是可忽略的错误,若是可忽略的错误则返回COMM_OK,否则返回COMM_ERROR
b.返回值=0表示超时,返回COMM_TIMEOUT
c.返回值num>0表示有事件可以处理,可处理的事件会放在events数组中0~num-1的位置,遍历数组,执行相应的事件处理函数
comm_call_handlers函数如下:
inline void
comm_call_handlers(int fd, int read_event, int write_event)
{
    fde *F = &fd_table[fd];
    
    debug(5, 8) ("comm_call_handlers(): got fd=%d read_event=%x write_event=%x F->read_handler=%p F->write_handler=%p\n"
	,fd, read_event, write_event, F->read_handler, F->write_handler);
    if (F->read_handler && read_event) {
	    PF *hdl = F->read_handler;
	    void *hdl_data = F->read_data;
	    /* If the descriptor is meant to be deferred, don't handle */
		debug(5, 8) ("comm_call_handlers(): Calling read handler on fd=%d\n", fd);
		//commUpdateReadHandler(fd, NULL, NULL);
		hdl(fd, hdl_data);
    }
	
    if (F->write_handler && write_event) {
	
	    PF *hdl = F->write_handler;
	    void *hdl_data = F->write_data;
	
	    //commUpdateWriteHandler(fd, NULL, NULL);
	    hdl(fd, hdl_data);
    }
}
 
为fd对应的事件执行相应的读处理/写处理函数
common.c
time_t getCurrentTime(void)//获取时间戳,用来做超时处理
{
    gettimeofday(¤t_time, NULL);
    current_dtime = (double) current_time.tv_sec +
	(double) current_time.tv_usec / 1000000.0;
    return sys_curtime = current_time.tv_sec;
}
 
获取当前时间戳,并以秒为单位返回(用来做超时处理);同时还将时间戳以双精度浮点数的形式存储在current'_dtime中
current_time.tv_usec/1000000.0表示将微秒转换为秒
int
commSetTimeout(int fd, int timeout, PF * handler, void *data)//设置超时处理函数
{
    fde *F;
    debug(5, 3) ("commSetTimeout: FD %d timeout %d\n", fd, timeout);
    assert(fd >= 0);
    assert(fd < Biggest_FD);
    F = &fd_table[fd];
	
    if (timeout < 0) {//表示不执行超时处理
	F->timeout_handler = NULL;
	F->timeout_data = NULL;
	return F->timeout = 0;
    }
    assert(handler || F->timeout_handler);
    if (handler || data) {
	F->timeout_handler = handler;
	F->timeout_data = data;
    }
    return F->timeout = sys_curtime + (time_t) timeout;
} 
设置超时处理函数:
timeout的单位是秒,timeout<0表示不进行超时处理
超时的时间设置为当前的时间+timeout(当时间达到了F->timeout就执行超时处理函数)
int
comm_select(int msec)
{
    static double last_timeout = 0.0;
    int rc;
    double start = current_dtime;
    debug(5, 3) ("comm_select: timeout %d\n", msec);
    if (msec > MAX_POLL_TIME)
	msec = MAX_POLL_TIME;
    //statCounter.select_loops++;
    /* Check timeouts once per second */
    if (last_timeout + 0.999 < current_dtime) {
	last_timeout = current_dtime;
	checkTimeouts();//checkTimeouts一秒钟调用一次
    } else {
	int max_timeout = (last_timeout + 1.0 - current_dtime) * 1000;
	if (max_timeout < msec)
	    msec = max_timeout;
    }
    //comm_select_handled = 0;
    rc = do_epoll_select(msec);
    getCurrentTime();
    //statCounter.select_time += (current_dtime - start);
    if (rc == COMM_TIMEOUT)
	debug(5, 8) ("comm_select: time out\n");
    return rc;
}
 
执行一个事件选择操作,控制超时时间,实时更新时间戳,并执行相应的超时检查和处理。
重点是下面这一部分:
/* Check timeouts once per second */
if (last_timeout + 0.999 < current_dtime) {
last_timeout = current_dtime;
checkTimeouts();//checkTimeouts一秒钟调用一次
} else {
int max_timeout = (last_timeout + 1.0 - current_dtime) * 1000;
if (max_timeout < msec)
    msec = max_timeout;
}
//comm_select_handled = 0;
rc = do_epoll_select(msec);
getCurrentTime(); 
这一部分保证了checkTimeouts函数(处理超时事件)每秒执行一次
static void
checkTimeouts(void)//处理超时事件
{
    int fd;
    fde *F = NULL;
    PF *callback;
    for (fd = 0; fd <= Biggest_FD; fd++) {
	F = &fd_table[fd];
	/*if (!F->flags.open)
	    continue;
	*/
	
	if (F->timeout == 0)
	    continue;
	if (F->timeout > sys_curtime)
	    continue;
	debug(5, 5) ("checkTimeouts: FD %d Expired\n", fd);
	
	if (F->timeout_handler) {
	    debug(5, 5) ("checkTimeouts: FD %d: Call timeout handler\n", fd);
	    callback = F->timeout_handler;
	    F->timeout_handler = NULL;
	    callback(fd, F->timeout_data);
	} else {
	    debug(5, 5) ("checkTimeouts: FD %d: Forcing comm_close()\n", fd);
	    comm_close(fd);
	}
    }
}
 
如果有事件超时,则执行处理函数
void
commUpdateReadHandler(int fd, PF * handler, void *data)
{
    fd_table[fd].read_handler = handler;
    fd_table[fd].read_data = data;
    
    epollSetEvents(fd,1,0); //设置读事件
}
void
commUpdateWriteHandler(int fd, PF * handler, void *data)
{
    fd_table[fd].write_handler = handler;
    fd_table[fd].write_data = data;
	
    epollSetEvents(fd,0,1); 
} 
主要是对事件的处理函数进行注册;
fd_table[fd].read_handler = handler;//指定事件对应的读处理函数
 fd_table[fd].read_data = data;//指定事件对应的读处理函数的参数
epollSetEvents(fd,1,0); //设置读事件
LIBEVENT框架——解决了C10K问题
C10K 问题:并发能力突破不了1万连接
libevent是一个轻量级的开源的高性能的事件触发的网络库,适用于windows、linux、bsd等多种平台,内部使用select、epoll、kqueue等系统调用管理事件机制。
它被众多的开源项目使用,例如大名鼎鼎的memcached等。
特点:
事件驱动,高性能;
轻量级,专注于网络(相对于ACE);
开放源码,代码相当精炼、易读;
跨平台,支持Windows、Linux、BSD和Mac OS;
支持多种I/O多路复用技术(epoll、poll、dev/poll、select和kqueue等),在不同的操作系统下,做了多路复用模型的抽象,可以选择使用不同的模型,通过事件函数提供服务;
支持I/O,定时器和信号等事件;
采用Reactor模式
libevent是一个典型的reactor模式的实现。
普通的函数调用机制:程序调用某个函数,函数执行,程序等待,函数将结果返回给调用程序(如果含有函数返回值的话),也就是顺序执行的。
Reactor模式的基本流程:应用程序需要提供相应的接口并且注册到reactor反应器上,如果相应的事件发生的话,那么reactor将自动调用相应的注册的接口函数(类似于回调函数)通知你,所以libevent是事件触发的网络库。
libevent的功能
Libevent提供了事件通知,io缓存事件,定时器,超时,异步解析dns,事件驱动的http server以及一个rpc框架。
事件通知:当文件描述符可读可写时将执行回调函数。
IO缓存:缓存事件提供了输入输出缓存,能自动的读入和写入,用户不必直接操作io。
定时器:libevent提供了定时器的机制,能够在一定的时间间隔之后调用回调函数。
信号:触发信号,执行回调。
异步的dns解析:libevent提供了异步解析dns服务器的dns解析函数集。
事件驱动的http服务器:libevent提供了一个简单的,可集成到应用程序中的HTTP服务器。
RPC客户端服务器框架:libevent为创建RPC服务器和客户端创建了一个RPC框架,能自动的封装和解封数据结构。















![[PM]产品运营](https://img-blog.csdnimg.cn/img_convert/26a271d8cd6be477e599b603fac09760.png)



