在之前的文章中,我们学习了IO复用模型之select原理及例子,但是select
有监听描述符个数的限制,而且select
的效率并不高,所以这篇文章就来学习效率更高的poll
和Linux特有的epoll
方法。
文章目录
- 1 select/poll/epoll对比
- 2 poll
- 2.1 poll函数
- 2.2 poll实战:实现多个套接字监听
- 2.2.1 客户端
- 2.2.2 服务端
- 2.2.3 实验结果
- 2.2.4 完整代码
- 3 epoll
- 3.1 相关函数
- 3.2 epoll实战:实现多个套接字监听
- 3.2.1 客户端
- 3.2.2 服务端
- 3.2.3 实验结果
- 3.3.4 完整代码
1 select/poll/epoll对比
这三者都用于I/O多路复用来监视多个文件描述符。epoll
的目的是取代较旧的POSIX中的select
和poll
系统调用。
复杂性与可扩展性
select
或poll
的时间复杂度为O(n),每次调用内核都需要遍历整个文件描述符epoll
的时间复杂度为O(1),它使用红黑树来跟踪当前被监视的所有文件描述符。epoll
在文件描述符很多的情况下表现良好,且具有良好的可扩展性
可用性与可移植性
-
select
和poll
在任何Unix系统上都可用 -
epoll
是Linux特有的(在2.5.44版本之后可用) -
poll
是POSIX标准接口,因此在需要代码可移植时可以使用
poll和select
给定一组文件描述符,它们告诉你哪些文件描述符有可读/可写的数据。select
和poll
从根本上基本使用相同的代码。poll
对文件描述符返回一组可能的结果,如POLLRDNORM
、POLLRDBAND
、POLLIN
、POLLHUP
、POLLERR
,而select
只告诉你有输入/输出/错误。
如果你有一个稀疏的文件描述符集(如设备长时间运行,在文件描述符回收和创建的过程中,可能一个描述符为1,一个描述符为1000),poll
可以比select
执行得更好。poll
使用pollfd
参数指定要监视的文件描述符;select
使用位集并需要遍历整个范围。
select
函数在某些系统上有文件描述符数量的限制,通常由文件描述符集的大小限制,例如 FD_SETSIZE
。这个宏定义了文件描述符集的最大大小,通常是1024。而poll
使用一个动态分配的数组来存储文件描述符集,因此理论上没有硬性的文件描述符数量限制。但在实际使用中,系统可能对单个进程所能打开的文件描述符总数有一定的限制,这是由操作系统的配置和资源限制决定的(可使用ulimit -n
查看)。
2 poll
2.1 poll函数
poll
允许程序监视多个文件描述符以确定是否可以进行I/O操作。
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds
:指向pollfd
结构体数组的指针,每个结构体表示一个要监视的文件描述符以及感兴趣的事件nfds
:数组中结构体的数量。timeout
:超时时间,单位是毫秒。传递负值表示无限超时,传递0表示立即返回。
pollfd
结构体:
struct pollfd {
int fd; // 文件描述符
short events; // 要监视的事件
short revents; // 实际发生的事件
};
其中events
/revents
的取值可以为如下几个选项:
POLLIN
:有数据可读。对于套接字来说,表示连接被对端关闭。POLLPRI
:有紧急数据可读。对于套接字来说,表示有带外数据。POLLOUT
:对端可写。POLLRDHUP
:对端挂起(连接关闭或半关闭)。POLLERR
:有错误发生。POLLHUP
:挂起事件。对于套接字来说,表示连接被挂起。POLLNVAL
:无效的请求,文件描述符未打开。
例如,如果你想监视可读和错误事件,可以将events
设置为 POLLIN | POLLERR
。
注意
- 如果
revents
中包含POLLNVAL
,说明文件描述符无效或未打开,此时poll
结果可能不可靠 revents
中可能同时包含多个标志,需要使用位运算和上述常量进行判断POLLRDHUP
和POLLHUP
标志在不同系统上可能有不同的行为,具体情况可以查看文档或相关头文件定义
2.2 poll实战:实现多个套接字监听
和之前select
一样,这里就来实现一个服务端和客户端的模型,从代码中来深入理解poll
函数的使用。
2.2.1 客户端
客户端需要能够监听标准输入stdin
的消息,然后转发个服务端;还需要监听服务端的套接字,以接收服务端发来的消息。代码如下:
struct pollfd fds[2];
char buffer[1024];
fds[0].fd = 0; // stdin
fds[0].events = POLLIN;
fds[1].fd = sock;
fds[1].events = POLLIN;
while(1)
{
int ret = poll(fds, 2, -1);
if (ret > 0)
{
if (fds[0].revents & POLLIN)
{
fgets(buffer, sizeof(buffer), stdin);
send(sock, buffer, strlen(buffer), 0);
}
if (fds[1].revents & POLLIN)
{
int valread = read(sock, buffer, sizeof(buffer));
if (valread > 0)
{
buffer[valread] = '\0';
printf("Server says: %s", buffer);
}
else
{
// Server disconnected
printf("Server disconnected\n");
break;
}
}
}
}
这里声明了一个 pollfd
结构体变量fds
,监听stdin
和服务端的套接字。poll
第三个超时参数为-1,表示无限等待。在poll
返回之后,我们只需要判断对应fds
中revents
对应的事件有没有置位就行了。
2.2.2 服务端
服务端则是一边要accept
新的客户端连接请求,一边接收来自客户端的消息并回显回去。代码如下:
int client_sockets[MAX_CLIENTS] = {0};
struct pollfd fds[MAX_CLIENTS + 1]; // +1 for the listening socket
// Initialize the pollfd structure for the listening socket
fds[0].fd = server_fd;
fds[0].events = POLLIN;
while (1)
{
activity = poll(fds, max_clients + 1, -1);
if ((activity < 0) && (errno != EINTR))
{
perror("Poll error");
exit(EXIT_FAILURE);
}
// Check for incoming connections on the listening socket
if (fds[0].revents & POLLIN)
{
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0)
{
perror("Accept failed");
exit(EXIT_FAILURE);
}
printf("New connection, socket fd is %d, ip is: %s, port is: %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));
// Add the new socket to the array of client sockets
for (i = 1; i < max_clients + 1; i++)
{
if (client_sockets[i] == 0)
{
client_sockets[i] = new_socket;
fds[i].fd = new_socket;
fds[i].events = POLLIN;
printf("Added new client to the list of sockets at index %d\n", i);
break;
}
}
}
// Check for data from clients
for (i = 1; i < max_clients + 1; i++)
{
sd = client_sockets[i];
if (fds[i].revents & POLLIN)
{
if ((valread = read(sd, buffer, 1024)) == 0)
{
close(sd);
client_sockets[i] = 0;
printf("Client at index %d disconnected\n", i);
}
else
{
buffer[valread] = '\0';
printf("Client at index %d says: %s\n", i, buffer);
}
}
}
}
和select
一样,这里可以判断一下poll
的返回值,小于0表示系统异常,但是如果errno
为EINTR
则表示进程被信号中断,继续下一次poll
即可。
2.2.3 实验结果
首先打开服务端,再打开客户端,连接上后,客户端向服务端发送nohao
,然后按Ctrl+C
退出客户端,如下图所示:
2.2.4 完整代码
客户端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <poll.h>
#define PORT 8080
int main() {
int sock = 0;
struct sockaddr_in serv_addr;
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("Socket creation error");
exit(EXIT_FAILURE);
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
// Convert IPv4 and IPv6 addresses from text to binary form
if (inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {
perror("Invalid address/ Address not supported");
exit(EXIT_FAILURE);
}
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
perror("Connection Failed");
exit(EXIT_FAILURE);
}
struct pollfd fds[2];
char buffer[1024];
fds[0].fd = 0; // stdin
fds[0].events = POLLIN;
fds[1].fd = sock;
fds[1].events = POLLIN;
while (1) {
int ret = poll(fds, 2, -1);
if (ret > 0) {
if (fds[0].revents & POLLIN) {
fgets(buffer, sizeof(buffer), stdin);
send(sock, buffer, strlen(buffer), 0);
}
if (fds[1].revents & POLLIN) {
int valread = read(sock, buffer, sizeof(buffer));
if (valread > 0) {
buffer[valread] = '\0';
printf("Server says: %s", buffer);
} else {
// Server disconnected
printf("Server disconnected\n");
break;
}
}
}
}
close(sock);
return 0;
}
服务端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <poll.h>
#include <errno.h>
#define PORT 8080
#define MAX_CLIENTS 10
int main() {
int server_fd, new_socket, max_clients = MAX_CLIENTS;
int activity, i, valread;
int sd, max_sd;
struct sockaddr_in address;
int addrlen = sizeof(address);
char buffer[1024];
// Create a socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("Socket creation failed");
exit(EXIT_FAILURE);
}
// Set up the server address struct
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// Bind the socket to the address
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("Bind failed");
exit(EXIT_FAILURE);
}
// Listen for incoming connections
if (listen(server_fd, 3) < 0) {
perror("Listen failed");
exit(EXIT_FAILURE);
}
int client_sockets[MAX_CLIENTS] = {0};
struct pollfd fds[MAX_CLIENTS + 1]; // +1 for the listening socket
// Initialize the pollfd structure for the listening socket
fds[0].fd = server_fd;
fds[0].events = POLLIN;
printf("Waiting for connections...\n");
while (1) {
// Use poll to wait for events
activity = poll(fds, max_clients + 1, -1);
if ((activity < 0) && (errno != EINTR)) {
perror("Poll error");
exit(EXIT_FAILURE);
}
// Check for incoming connections on the listening socket
if (fds[0].revents & POLLIN) {
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {
perror("Accept failed");
exit(EXIT_FAILURE);
}
printf("New connection, socket fd is %d, ip is: %s, port is: %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));
// Add the new socket to the array of client sockets
for (i = 1; i < max_clients + 1; i++) {
if (client_sockets[i] == 0) {
client_sockets[i] = new_socket;
fds[i].fd = new_socket;
fds[i].events = POLLIN;
printf("Added new client to the list of sockets at index %d\n", i);
break;
}
}
}
// Check for data from clients
for (i = 1; i < max_clients + 1; i++) {
sd = client_sockets[i];
if (fds[i].revents & (POLLIN | POLLHUP | POLLERR)) {
if ((valread = read(sd, buffer, 1024)) == 0) {
// Client disconnected
close(sd);
client_sockets[i] = 0;
printf("Client at index %d disconnected\n", i);
} else {
// Process client message (in this example, just print it)
buffer[valread] = '\0';
printf("Client at index %d says: %s\n", i, buffer);
}
}
}
}
return 0;
}
3 epoll
epoll
比select
和poll
更为灵活和高效,特别是在大量连接上的场景。
3.1 相关函数
来看一下与epoll
相关的函数原型:
1、epoll_create和epoll_create1:创建epoll
实例
int epoll_create(int size);
int epoll_create1(int flags);
epoll_create
:创建一个epoll
实例。size
参数在大多数情况下会被忽略,可以设置为大于0的任何值。epoll_create1
:与epoll_create
类似,但它支持flag
设置为EPOLL_CLOEXEC
,表示在调用exec
进程时,epoll
实例的文件描述符将会被关闭,以防止它在新程序中继续存在。这可以增强程序的安全性和可预测性。- 如果
flags
为 0,那么EPOLL_CLOEXEC
标志将不会被设置。
- 如果
2、epoll_ctl:在epoll
实例中注册、修改或删除文件描述符的监听事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epfd
:是一个由epoll_create
或epoll_create1
返回的epoll
实例的文件描述符。op
:是一个操作符,指定对epoll
实例的操作类型。可以取以下值:EPOLL_CTL_ADD
:添加一个新的文件描述符到epoll
实例中进行监听。EPOLL_CTL_MOD
:修改一个已经在epoll
实例中的文件描述符的监听事件。EPOLL_CTL_DEL
:从epoll
实例中删除一个文件描述符。
fd
:是要进行操作的文件描述符,即要添加、修改或删除的文件描述符。event
:是一个指向struct epoll_event
结构的指针,用于指定要监听的事件类型以及关联的数据。
3、epoll_wait:等待事件发生。返回发生的事件的数量,并将事件信息填充到提供的数组中。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epfd
:是一个由epoll_create
或epoll_create1
返回的epoll
实例的文件描述符。events
:是一个指向struct epoll_event
结构的数组,用于存储发生事件的文件描述符和相关信息。maxevents
:events
数组的大小,即最多能存储多少个事件。timeout
:是等待的超时时间,以毫秒为单位。传递负值表示epoll_wait
将一直阻塞,直到有事件发生。传递0表示立即返回,不管是否有事件发生。
其中 **struct epoll_event
**结构体定义如下:
struct epoll_event {
__uint32_t events; // 要监视的事件
epoll_data_t data; // 用户数据
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
events
字段:表示要监视的事件,可以是EPOLLIN
(可读)、EPOLLOUT
(可写)等。具体的事件常量可以查看<sys/epoll.h>
头文件。data
字段:用于保存用户数据。可以是文件描述符(fd
)、指针(ptr
)等,取决于epoll_data
的类型。
3.2 epoll实战:实现多个套接字监听
这里用epoll
来实现一个服务端和客户端的模型,通过代码来理解epoll
的使用方法。
3.2.1 客户端
1、创建epoll实例
int epoll_fd = epoll_create1(0);
2、添加待监听的文件描述符
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = STDIN_FILENO;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &event);
event.events = EPOLLIN;
event.data.fd = sock;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &event);
这里epoll_event
的data
(用户数据)就保存文件描述符,用于后面判断是哪里来的消息。同时这里两个epoll_ctl
的最后一个参数用了同一个变量event
的地址传入,这是因为传入后函数内部会对数据进行拷贝。
3、等待和处理事件
struct epoll_event events[MAX_EVENTS];
while (1)
{
int event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < event_count; i++)
{
if (events[i].data.fd == STDIN_FILENO)
{
fgets(buffer, sizeof(buffer), stdin);
send(sock, buffer, strlen(buffer), 0);
}
else if (events[i].data.fd == sock)
{
ssize_t bytes_received = recv(sock, buffer, sizeof(buffer), 0);
if (bytes_received <= 0)
{
printf("Server disconnected\n");
close(sock);
exit(EXIT_SUCCESS);
}
else
{
buffer[bytes_received] = '\0';
printf("Server says: %s\n", buffer);
}
}
}
}
epoll_wait
会返回事件的个数,并将结果保存在events
中,我们只需要遍历它即可。
3.2.2 服务端
1、创建epoll实例和添加文件描述符
epoll_fd = epoll_create1(0);
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = server_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event);
2、等待和处理事件
while (1)
{
event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < event_count; i++)
{
if (events[i].data.fd == server_fd)
{
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, &addrlen)) < 0)
{
perror("Accept failed");
exit(EXIT_FAILURE);
}
event.events = EPOLLIN;
event.data.fd = new_socket;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) == -1)
{
perror("Failed to add new client socket to epoll");
exit(EXIT_FAILURE);
}
printf("New connection, socket fd is %d, ip is: %s, port is: %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));
}
else
{
int client_socket = events[i].data.fd;
ssize_t bytes_received = recv(client_socket, buffer, sizeof(buffer), 0);
if (bytes_received <= 0)
{
printf("Client at socket fd %d disconnected\n", client_socket);
close(client_socket);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_socket, NULL);
}
else
{
buffer[bytes_received] = '\0';
printf("Client at socket fd %d says: %s\n", client_socket, buffer);
}
}
}
}
3.2.3 实验结果
首先打开服务端,再打开客户端,连接上后,客户端向服务端发送123
,然后按Ctrl+C
退出客户端,如下图所示:
3.3.4 完整代码
客户端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#define PORT 8080
#define MAX_EVENTS 10
int main() {
int sock = 0;
struct sockaddr_in serv_addr;
char buffer[1024];
// Create a socket
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("Socket creation error");
exit(EXIT_FAILURE);
}
// Set up the server address struct
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
// Convert IPv4 and IPv6 addresses from text to binary form
if (inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {
perror("Invalid address/ Address not supported");
exit(EXIT_FAILURE);
}
// Connect to the server
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
perror("Connection Failed");
exit(EXIT_FAILURE);
}
// Create epoll instance
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("Failed to create epoll instance");
exit(EXIT_FAILURE);
}
// Add stdin and socket to epoll
struct epoll_event events[MAX_EVENTS];
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = STDIN_FILENO;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &event) == -1) {
perror("Failed to add stdin to epoll");
exit(EXIT_FAILURE);
}
event.events = EPOLLIN;
event.data.fd = sock;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &event) == -1) {
perror("Failed to add socket to epoll");
exit(EXIT_FAILURE);
}
while (1) {
int event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < event_count; i++) {
if (events[i].data.fd == STDIN_FILENO) {
// Data from stdin
fgets(buffer, sizeof(buffer), stdin);
send(sock, buffer, strlen(buffer), 0);
} else if (events[i].data.fd == sock) {
// Data from server
ssize_t bytes_received = recv(sock, buffer, sizeof(buffer), 0);
if (bytes_received <= 0) {
// Server disconnected
printf("Server disconnected\n");
close(sock);
exit(EXIT_SUCCESS);
} else {
// Process server message (in this example, just print it)
buffer[bytes_received] = '\0';
printf("Server says: %s\n", buffer);
}
}
}
}
close(sock);
return 0;
}
服务端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#define PORT 8080
#define MAX_EVENTS 10
int main() {
int server_fd, new_socket;
int epoll_fd, event_count;
struct sockaddr_in address;
socklen_t addrlen = sizeof(address);
char buffer[1024];
// Create a socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("Socket creation failed");
exit(EXIT_FAILURE);
}
// Set up the server address struct
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
// Bind the socket to the address
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("Bind failed");
exit(EXIT_FAILURE);
}
// Listen for incoming connections
if (listen(server_fd, 3) < 0) {
perror("Listen failed");
exit(EXIT_FAILURE);
}
// Create epoll instance
epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("Failed to create epoll instance");
exit(EXIT_FAILURE);
}
// Add the server socket to epoll
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
perror("Failed to add server socket to epoll");
exit(EXIT_FAILURE);
}
struct epoll_event events[MAX_EVENTS];
printf("Waiting for connections...\n");
while (1) {
event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < event_count; i++) {
if (events[i].data.fd == server_fd) {
// New client connection
if ((new_socket = accept(server_fd, (struct sockaddr *)&address, &addrlen)) < 0) {
perror("Accept failed");
exit(EXIT_FAILURE);
}
// Add new client socket to epoll
event.events = EPOLLIN;
event.data.fd = new_socket;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) == -1) {
perror("Failed to add new client socket to epoll");
exit(EXIT_FAILURE);
}
printf("New connection, socket fd is %d, ip is: %s, port is: %d\n", new_socket, inet_ntoa(address.sin_addr), ntohs(address.sin_port));
} else {
// Data from client
int client_socket = events[i].data.fd;
ssize_t bytes_received = recv(client_socket, buffer, sizeof(buffer), 0);
if (bytes_received <= 0) {
// Client disconnected
printf("Client at socket fd %d disconnected\n", client_socket);
close(client_socket);
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_socket, NULL);
} else {
// Process client message (in this example, just print it)
buffer[bytes_received] = '\0';
printf("Client at socket fd %d says: %s\n", client_socket, buffer);
}
}
}
}
close(server_fd);
return 0;
}