一、Motivation
通常,操作系统会为每个进程划分一个时间片的,在这个时间片内进程可以合法占有 cpu 进行一些计算任务。并当时间片结束后自动退回至就绪状态待命,等待下一次的调度
但是,有一种情况会使进程提前(时间片还未用完)进入等待状态,即是进程发生了阻塞(多半是因为 I/O 请求)。进程一旦发生了阻塞,它就要让出 cpu 给其他进程,这个让位的动作就是进程之间切换的操作,这种操作非常蠢(在开发者眼里是无用功),也很耗时。可以说是时间和 cpu 资源没用在正儿八经的计算任务上
select 和 epoll 的提出就是来解决这个愚蠢的问题,有一种设想:在分配给该进程时间片还未结束之前,如果进程的某个 socket 连接发生阻塞,先不急着逼该进程退位,而是通过某种手段去查询一下进程的其他 socket 连接是否有已就绪的。如果其他 socket 连接有活动可以处理,不如充分利用 cpu 先进行计算,在处理完成 OR 时间片到期后再让位也不迟。这样不就可以提高计算机资源的利用率了嘛
但是,在 Linux 老的版本中,有关事件触发的问题,一直是采用 select 轮询手段来解决的,所谓的轮询就是 cpu 不停地去查询任务队列是否有已经就绪的任务。这种方法在任务较少的情况下还能勉强应付,当任务数量增加至千级数量级之后,效率就会出现断崖式地降低。因为每次需要轮询上千个任务,自然非常耗时
为此,Linux 提出了新的解决方法 epoll,不再采用轮询的方法来感知新事件的发生,而是通过 epoll 结构体内部的红黑树来自动将等待的任务和就绪的任务分开,从而使 kernel 能够快速感知新事件的发生
再说直白一点,只要活儿足够多,epoll_wait 根本就不会让用户进程阻塞,用户进程会一直干活,直到属于该进程的时间片结束。这样就大大减少了进程切换次数,提高了效率
二、Solutions
S1 - epoll_create
创建一个 epoll 句柄,size 用来告诉 kernel 共能监听多少个事件,
int epoll_create(int size)
这个参数在现在的版本中没有意义,kernel 会根据实际情况自行决定的,意思就是说这个 size 只是我们规定的事件的大致数量,而不是能够处理的最大事件数
epoll 结构体中定义的等待队列 wq 存放阻塞在 epoll 对象上的用户进程,当软中断数据就绪时会前来寻找进程;epoll 对象用红黑树 rbr 来管理用户进程 accept 添加进来的所有 socket 连接,选用红黑树的原因是因为红黑树能够更好地支持海量连接的查找、插入和删除;就绪链表 rdllist 存放着一些已就绪的任务,这样一来,应用进程只需要查询 rdllist 就能判断是否有就绪任务可供处理,而不必去遍历整棵红黑树
S2 - epoll_ctl
该方法向 epoll 对象中添加、修改和删除特定的事件,返回 0 表示成功,-1 表示失败,
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event)
添加意味着对这件事感兴趣,应用进程想收来处理;删除则表示对这件事没了兴趣。其中,epfd 是 epoll 对象的 id,epoll_create()
的返回值;op 有三种操作类型,EPOLL_CTL_ADD、EPOLL_CTL_MOD 和 EPOLL_CTL_DEL;fd 是需要监听的文件描述符,通常是连接至服务端的 socket;最后一个参数 event 可以是以下几种宏的集合,
- EPOLLIN:文件描述符可读
- EPOLLOUT:文件描述符可写
- EPOLLPRI:文件描述符有紧急数据可读
- EPOLLERR:文件描述符发生错误
- EPOLLHUP:文件描述符被挂断
- EPOLLET:边缘触发(后面会讲到)
- EPOLLONESHOT:只监听一次,意味着触发来事件之后就被踢出 epoll 对象中了
它是一个传入的指针,这就要求我们需要在进入函数之前分配好空间并初始化,以便 epoll_create()
可以在方法内获取内容,但 epoll_create()
并不会替我们释放 events 空间
再进一步解释,当有新的 socket 连接加入 epoll 对象时,epoll 对象会创建一个 epitem 用来关联该 socket 连接,然后将 epitem 挂到红黑树 rbr 中。之后,会设置该 epitem 的回调函数(如果该连接有数据写入,请将其存入 epoll 对象的就绪链表 rdllist 中),以及其他的回调函数
在这我只列举了 “增” 的一个例子,其他关于 “删” 和 “改” 的操作,它们的本质是一样的,都是 socket 连接有什么动作就会去调用对应的回调函数。关于能够快速实现 “增删改查” 最主要的原因是因为选用了红黑树
S3 - epoll_wait
等待处于监听范围的事件发生,
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout)
epoll 对象会将已经发生的事件复制到数组 events 中,maxevents 是数组的长度;timeout 如果为 0,则意味着就绪链表 rdllist 若为空则立刻返回,不会等待;-1 表示阻塞,会一直陷入 epoll_wait 状态中
关于 ET 和 LT 模式,我想用简短的语言去描述,不要深究细节。ET(边缘触发)模式仅当状态发生变化时才会感知事件的发生,即使这个事件对应的缓冲区内还有未读取的数据;而 LT(水平触发)模式是只要有数据没处理就会一直通知下去
三、Result
我想透过一个简单的 demo 来介绍 epoll 的经典用法。说到用法,最常用的就是连接 socket,监听 socket 的动静并读/写数据进行处理,之后返回给 client 结果。我写了一个小写转大写的程式来说明 epoll 的用法,请看代码,
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <ctype.h>
#include <unistd.h>
#define EPOLL_MAXSIZE 16
#define SRV_PORT_ID 1980 /* 端口号 */
#define SOCKET_QUEUE_LEN 20
#define BUFSIZE 256
struct myepoll_data {
int fd;
char data[BUFSIZE];
};
int main()
{
int i,j;
int epfd, sockfd, nfds, clntfd;
struct sockaddr_in srvaddr, clntaddr;
struct epoll_event ev, evs[EPOLL_MAXSIZE];
socklen_t clntlen = sizeof(clntaddr);
char buf[BUFSIZE];
/* 创建epoll结构体(就绪链表、等待队列和红黑树) */
epfd = epoll_create(EPOLL_MAXSIZE);
if(epfd == -1) {
printf("epoll_create err\n");
goto over;
}
printf("epoll_create ok\n");
/* 创建socket结构体 */
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd == -1) {
printf("socket_create err\n");
goto over;
}
printf("socket_create ok\n");
/* 初始化socket绑定监听 */
bzero(&srvaddr, sizeof(srvaddr));
srvaddr.sin_family = AF_INET;
srvaddr.sin_port = htons(SRV_PORT_ID);
srvaddr.sin_addr.s_addr = htonl(INADDR_ANY);
if(bind(sockfd, (struct sockaddr*)&srvaddr, sizeof(struct sockaddr)) == -1) {
printf("socket_bind err\n");
goto over;
}
printf("socket_bind ok\n");
if(listen(sockfd, SOCKET_QUEUE_LEN) == -1) {
printf("socket_listen err\n");
goto over;
}
printf("socket_listen ok\n");
/* 向epoll结构体中注册socket,实现监听功能 */
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLET;
if(epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
printf("epoll_ctl_add err\n");
goto over;
}
printf("epoll_ctl_add ok\n");
/* 不停地处理外来事件 */
while(1) {
/* 阻塞地等待事件发生,其中0为没有就绪事件就立刻返回,-1为阻塞 */
nfds = epoll_wait(epfd, evs, EPOLL_MAXSIZE, -1);
/* 处理每个收上来的事件 */
for(i=0; i<nfds; i++) {
if(evs[i].data.fd == sockfd) { /* 有人敲sockfd的门了(收到新的连接)*/
clntfd = accept(sockfd, (struct sockaddr*)&clntaddr, &clntlen);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = clntfd;
if(epoll_ctl(epfd, EPOLL_CTL_ADD, clntfd, &ev) == -1)
printf("epoll_ctl_add %d err\n", clntfd);
else
printf("epoll_ctL_add %d clnt ok\n", clntfd);
} else if(evs[i].events & EPOLLIN) { /* 读取数据但先不处理 */
clntfd = evs[i].data.fd;
memset(buf, 0, BUFSIZE);
if(read(clntfd, buf, BUFSIZE) == 0) { /* 客户端关闭连接 */
if(epoll_ctl(epfd, EPOLL_CTL_DEL, clntfd, NULL) == -1) {
printf("epoll_ctl_del %d err\n", clntfd);
} else {
printf("epoll_ctl_del %d ok\n", clntfd);
close(clntfd);
}
continue;
}
/* 先接收client的请求 */
struct myepoll_data fddata;
fddata.fd = clntfd;
strcpy(fddata.data, buf);
ev.data.ptr = &fddata;
memset(buf, 0, BUFSIZE);
strcpy(buf, "i'm keep u's data, deal with it later, please check u can be written...\n");
send(clntfd, buf, strlen(buf), 0);
ev.events = EPOLLOUT | EPOLLET;
/* 下一次epoll时再处理client的请求 */
if(epoll_ctl(epfd, EPOLL_CTL_MOD, clntfd, &ev) == -1)
printf("epoll_ctl_mod clnt %d EPOLLIN -> EPOLLOUT err\n", clntfd);
else
printf("epoll_ctl_mod clnt %d EPOLLIN -> EPOLLOUT ok\n", clntfd);
} else if(evs[i].events & EPOLLOUT) { /* 对之前读取的数据予以处理并将处理结果返回给client */
struct myepoll_data* fddata = (struct myepoll_data*)evs[i].data.ptr;
clntfd = fddata->fd;
char* data = fddata->data;
memset(buf, 0, BUFSIZE);
strcpy(buf, "i'm processing u's data, please waiting...\n");
send(clntfd, buf, strlen(buf), 0);
/* 将小写转为大写的业务逻辑 */
for(j=0; j<strlen(data); j++)
data[j] = toupper(data[j]);
send(clntfd, data, strlen(data), 0);
ev.events = EPOLLIN | EPOLLET;
/* 准备接收client的下一次计算请求 */
if(epoll_ctl(epfd, EPOLL_CTL_MOD, clntfd, &ev) == -1)
printf("epoll_ctl_mod clnt %d EPOLLOUT -> EPOLLIN err\n", clntfd);
else
printf("epoll_ctl_mod clnt %d EPOLLOUT -> EPOLLIN ok\n", clntfd);
} else {
printf("unknown event\n");
}
}
}
over:
return 0;
}
整个流程,我认为较为清晰,首先创建 socket,然后将 socket 添加进 epoll 对象中,这就意味着让 epoll 对象监听 socket 的一举一动。如果有数据写入 socket 中,那么就读出来,等待下一轮再进行处理(为什么下一轮再进行处理?而不是接收了请求就处理,其中的道理我暂时还没有悟透,但有人告诉我,先接收后处理的手法是 epoll 的精髓,说实话我并不认同,因为我不能说服自己要相信这种脱裤子放屁的说法)
按照流程走下去,在下一轮中进行处理(小写转大写),然后将结果返回给 client。这就是 epoll demo。在另一个终端中透过 nc 命令尝试连接 server 进程,
nc 127.0.0.1 1980
作为 client,输入小写的字符串,server 就会返回大写的结果,