三、epoll系统调用
epoll
是Linux特有的I/O复用函数。它在实现和使用上与select
、poll
有很大差异。
- epoll使用一组函数来完成任务,而不是单个函数
- epoll把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集
- epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表
3.1、内核事件表
3.1.1、创建内核事件表
#include <sys/epoll.h>
int epoll_create(int size)
size
参数现在并不起作用,只是给内核一个提示,告诉它事件表需要多大。该函数返回的文件描述符将用作其他所有epoll系统调用的第一个参数,以指定要访问的内核事件表。
3.1.2、操作内核事件表
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event)
-
fd
参数是要操作的文件描述符; -
op
参数则指定操作类型;- EPOLL_CTL_ADD,往事件表中注册fd上的事件
- EPOLL_CTL_MOD,修改fd上的注册事件
- EPOLL_CTL_DEL,删除fd上的注册事件
-
event
参数指定事件,结构如下:-
struct epoll_event { __uint32_t events; /*epoll事件*/ epoll_data_t data; /*用户数据*/ };
-
events
描述事件类型,epoll支持的事件类型和poll基本相同。表示epoll事件类型的宏是在poll对应的宏前加上“E”。epoll有两个额外的事件类型——EPOLLET
和EPOLLONESHOT
。 -
data
成员用于存储用户数据,结构如下: -
typedef union epoll_data_t { void* ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t;
-
epoll_data_t
是一个联合体,其4个成员中使用最多的是fd
。 -
fd
指定事件所从属的目标文件描述符 -
ptr
可用来指定与fd相关的用户数据 -
但由于epoll_data_t是一个联合体,我们不能同时使用其ptr成员和fd成员,因此,如果要将文件描述符和用户数据关联起来,只能使用其他手段,比如放弃使用fd,而在ptr指向的用户数据中包含fd。
-
-
-
epoll_ctl
成功时返回0,失败则返回-1并设置errno。
3.2、epoll_wait函数
epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描述符上的事件:
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
-
maxevents
指定最大监听事件个数 -
timeout
指定超时时间 -
成功时返回就绪的文件描述符的个数,失败时返回-1并设置errno。
epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd参数指定)中复制到它的第二个参数events指向的数组中。这个数组只用于输出epoll_wait检测到的就绪事件,而不像select和poll的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件。这就极大地提高了应用程序索引就绪文件描述符的效率。
poll和epoll的区别
/*如何索引poll返回的就绪文件描述符*/
int ret = poll(fds, MAX_EVENT_NUMBER, -1);
/*必须遍历所有已注册文件描述符并找到其中的就绪者(当然,可以利用ret来稍做优化)*/
for(int i = 0; i < MAX_EVENT_NUMBER; ++i) {
if(fds[i].revents&POLLIN) {/*判断第i个文件描述符是否就绪*/
int sockfd=fds[i].fd;
/*处理sockfd*/
}
}
/*如何索引epoll返回的就绪文件描述符*/
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER,-1);
/*仅遍历就绪的ret个文件描述符*/
for(int i=0; i < ret; i++) {
int sockfd = events[i].data.fd;
/*sockfd肯定就绪,直接处理*/
}
3.3、LT和ET模式
epoll对文件描述符的操作有两种模式:LT(Level Trigger,电平触发)模式和ET(Edge Trigger,边沿触发)模式。LT模式是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。ET模式是epoll的高效工作模式。
采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。
采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。ET模式降低了同一个epoll事件被重复触发的次数,因此效率变高。
下面的代码解释了LT模式和ET模式的区别
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <stdbool.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024
/*将文件描述符设置成非阻塞的*/
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
/*将文件描述符fd上的EPOLLIN注册到epollfd指示的epoll内核事件表中,参数enable_et指定是否对fd启用ET模式*/
void addfd(int epollfd, int fd, bool enable_et) {
struct epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
if (enable_et) {
event.events |= EPOLLET;
}
// 向事件表中注册事件
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
// 将文件描述符fd设置为非阻塞
setnonblocking(fd);
}
/*LT模式的工作流程*/
void lt(struct epoll_event *events, int number, int epollfd, int listenfd) {
// 缓存
char buf[BUFFER_SIZE];
// 遍历事件发生的描述符
for (int i = 0; i<number; i++) {
// 获取事件发生的文件描述符
int sockfd = events[i].data.fd;
// 如果是监听socket发生了事件,那么连接这个tcp,并将其加入到事件表中
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address,&client_addrlength);
// 使用LT模式
addfd(epollfd, connfd, false);
}
// 如果是有数据输入,即socket可读
else if (events[i].events & EPOLLIN) {
printf("event trigger once\n");
memset(buf, '\0', BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if (ret <= 0) {
close(sockfd);
continue;
}
printf("get%d bytes of content:%s\n", ret, buf);
}
// 如果是其他情况
else {
printf("something else happened\n");
}
}
}
/*ET模式的工作流程*/
void et(struct epoll_event *events, int number, int epollfd, int listenfd) {
// 准备缓存
char buf[BUFFER_SIZE];
// 遍历发生事件的文件描述符
for (int i = 0; i<number; i++){
int sockfd = events[i].data.fd;
// 如果是监听socket发生事件,那么连接这个tcp
if (sockfd == listenfd){
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*) &client_address, &client_addrlength );
// 使用ET模式
addfd(epollfd, connfd, true);
}
// 如果是有数据输入,即socket可读。请注意,这段代码不会被重复触发,所以必须一次性读取出所有的数据
else if (events[i].events & EPOLLIN){
printf("event trigger once\n");
while (1){
memset(buf, '\0', BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
if (ret<0){
// 对于非阻塞IO,下面的条件成立表示数据已经全部读取完毕。此后,epoll就能再次触发sockfd上的EPOLLIN事件,以驱动下一次读操作
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)){
printf("read later\n");
break;
}
close(sockfd);
break;
}
else if (ret == 0){
close(sockfd);
}
else {
printf("get%d bytes of content:%s\n", ret, buf);
}
}
}
else {
printf("something else happened\n");
}
}
}
int main(int argc, char *argv[]) {
if (argc <= 2) {
printf("usage:%s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip,&address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
struct epoll_event events[MAX_EVENT_NUMBER];
// 内核事件表的大小为5
int epollfd = epoll_create(5);
assert(epollfd != -1);
// listenfd 也加入内核事件表的监听中
addfd(epollfd, listenfd, true);
while (1) {
// 函数成功时返回就绪文件描述符的个数,events数组中存储了就绪的事件,timeout指定为-1,永远阻塞在这里等待事件发生
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
// epoll失败。
if (ret<0) {
printf("epoll failure\n");
break;
}
lt(events, ret, epollfd, listenfd); /*使用LT模式*/
// et(events,ret,epollfd,listenfd); /*使用ET模式*/
}
close(listenfd);
return 0;
}
3.4、EPOLLONESHOT事件
即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程(或进程,下同)在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们期望的。我们期望的是一个socket连接在任一时刻都只被一个线程处理。这一点可以使用epoll的EPOLLONESHOT事件实现。
对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket。
下面的代码使用了EPOLLONESHOT事件
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <stdbool.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_EVENT_NUMBER 1024
#define MAX_CONNECTIONS 10
struct fds {
int epollfd;
int sockfd;
};
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
/* 将fd上的EPOLLIN和EPOLLET事件注册到epollfd指示的epoll内核事件表中,参数oneshot指定是否注册fd上的EPOLLONESHOT事件*/
void addfd(int epollfd, int fd, bool oneshot) {
struct epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
if (oneshot) {
event.events |= EPOLLONESHOT;
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
/* 重置fd上的事件。这样操作之后,尽管fd上的EPOLLONESHOT事件被注册,但是操作系统仍然会触发fd上的EPOLLIN事件,且只触发一次*/
void reset_oneshot(int epollfd, int fd)
{
struct epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}
/*工作线程*/
void *worker(void *arg) {
int sockfd = ((struct fds *)arg)->sockfd;
int epollfd = ((struct fds *)arg)->epollfd;
printf("start new thread to receive data on fd: %d\n", sockfd);
char buf[BUFFER_SIZE];
memset(buf, 0, BUFFER_SIZE);
// 循环读取sockfd上的数据,直到遇到EAGAIN错误
while (1) {
int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
// 对方关闭了tcp连接,那么我们也关闭连接。
if (ret == 0) {
close(sockfd);
printf("foreiner closed the connection\n");
break;
}
// 读取数据失败,可能是资源暂不可用,信号中断,缓冲地址无效等情况
else if (ret < 0) {
// 如果资源暂时不可用,也可以说无数据可读,那么重置 socket 上的事件
// 让其他线程也有机会处理这个socket上的数据
if (errno == EAGAIN) {
reset_oneshot(epollfd, sockfd);
printf("read later\n");
break;
}
}
else {
printf("get content:%s\n", buf);
/****************************
* 处理数据
****************************/
sleep(1);
}
}
printf("end thread receiving data on fd:%d\n", sockfd);
}
int main(int argc, char *argv[]) {
int listenfd;
// 创建 socket 文件描述符
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
// 绑定到指定端口
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_port = htons(PORT);
address.sin_addr.s_addr = inet_addr("192.168.189.129");
int addrlen = sizeof(address);
if (bind(listenfd, (struct sockaddr *) &address, addrlen) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 开始监听
if (listen(listenfd, MAX_CONNECTIONS) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}
// 创建内核事件表
struct epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
// 注意,监听socket listenfd上是不能注册EPOLLONESHOT事件的,否则应用程序只能处理一个客户连接!
// 因为后续的客户连接请求将不再触发listenfd上的EPOLLIN事件
addfd(epollfd, listenfd, false);
// 等待事件发生
while (1) {
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
// epoll失败
if (ret < 0) {
printf("epoll failure\n");
break;
}
// 处理发生的事件
for (int i = 0; i < ret; i++) {
// 获得发生事件的文件描述符
int sockfd = events[i].data.fd;
// 如果是监听描述符发生了事件,即有新的tcp请求,那么建立这个TCP连接,并注册为ET、EPOLLONESHOT事件
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
addfd(epollfd, connfd, true);
}
// 如果是一个TCP连接有数据输入,那么新建立一个线程,处理这个TCP连接
else if (events[i].events & EPOLLIN) {
pthread_t thread;
struct fds fds_for_new_worker;
fds_for_new_worker.epollfd = epollfd;
fds_for_new_worker.sockfd = sockfd;
pthread_create(&thread, NULL, worker, (void *)&fds_for_new_worker);
}
// 如果是其他事件
else {
printf("something else happened\n");
}
}
}
close(listenfd);
return 0;
}
直接看worker工作线程,如果工作线程处理完某个socket上的一次请求之后,又接收到该socket上新的客户请求,则该线程将继续为这个socket服务。因为这个socket注册了EPOLLONESHOT事件,其他线程无法接触到这个socket。如果当前工作线程处理完后,这个socket并没有关闭,也没有新的数据,那么当前线程放弃这个socket,等待新的事件将他触发。
四、三个IO复用函数的区别
select
,poll
,epoll
都可以同时监听多个文件描述符。它们将等待由timeout参数指定的超时时间,直到一个或者多个文件描述符上有事件发生时返回,返回值是就绪的文件描述符的数量。返回0表示没有事件发生。
我们从事件集、最大支持文件描述符数、工作模式和具体实现等四个方面进一步比较它们的异同,以明确在实际应用中应该选择使用哪个。
4.1、最大监听数量
poll
和epoll_wait分别用nfds和maxevents参数指定最多监听多少个文件描述符和事件。这两个数值都能达到系统允许打开的最大文件描述符数目,即65 535(cat/proc/sys/fs/file-max)。而select允许监听的最大文件描述符数量通常有限制。虽然用户可以修改这个限制,但这可能导致不可预期的后果。
4.2、工作模式
select
和poll
都只能工作在相对低效的LT模式,而epoll
则可以工作在ET高效模式。并且epoll
还支持EPOLLONESHOT事件。该事件能进一步减少可读、可写和异常等事件被触发的次数。
4.3、原理
select
和poll
采用的都是轮询的方式,即每次调用都要扫描整个注册文件描述符集合,并将其中就绪的文件描述符返回给用户程序,因此它们检测就绪事件的算法的时间复杂度是O(n)。epoll
则采用的是回调的方式。内核检测到就绪的文件描述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插入内核就绪事件队列。内核最后在适当的时机将该就绪事件队列中的内容拷贝到用户空间。算法复杂度为O(1)。
但是,当活动连接比较多的时候,epoll_wait的效率未必比select和poll高,因为此时回调函数被触发得过于频繁。所以epoll_wait适用于连接数量多,但活动连接较少的情况。