在 Linux Socket 服务器短编程时,为了处理大量客户的连接请求,需要使用非阻塞I/O和复用,select,poll 和 epoll 是 Linux API 提供的I/O复用方式。
\ | select | poll | epoll |
---|---|---|---|
操作方式 | 遍历 | 遍历 | 回调 |
底层实现 | 数组 | 链表 | 哈希表 |
IO效率 | 每次调用都进行线性遍历,时间复杂度为O(n) | 每次调用都进行线性遍历,时间复杂度为O(n) | 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪 fd 放到 rdllist 里面。时间复杂度O(1) |
最大连接数 | 1024(x86)或 2048(x64) | 无上限 | 无上限 |
fd拷贝 | 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态 | 每次调用 poll,都需要把 fd 集合从用户态拷贝到内核态 | 调用 epoll_ctl 时拷贝进内核并保存,之后每次 epoll_wait 不拷贝 |
select
在一段指定的时间内,监听用户感兴趣的文件描述符上可读、可写和异常等事件。
select 实现多路复用的方式是,将已连接的 Socket 都放到一个文件描述符集合,然后调用 select 函数将文件描述符集合拷贝到内核里,让内核来检查是否有网络事件产生,检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写, 接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理。
所以,对于 select 这种方式,需要进行 2 次「遍历」文件描述符集合,一次是在内核态里,一个次是在用户态里 ,而且还会发生 2 次「拷贝」文件描述符集合,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中。
select 使用固定长度的 BitsMap,表示文件描述符集合,而且所支持的文件描述符的个数是有限制的,在 Linux 系统中,由内核中的 FD_SETSIZE 限制, 默认最大值为 1024,只能监听 0~1023 的文件描述符。
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。
参数:
nfds
: 最大的文件描述符加1。
readfds
: 用于检查可读的。
writefds
:用于检查可写性。
exceptfds
:用于检查异常的数据。
timeout
:一个指向 timeval
结构的指针,用于决定 select
等待 I/O
的最长时间。如果为空将一直等待。
timeval
结构的定义:
struct timeval{
long tv_sec; // seconds
long tv_usec; // microseconds
}
返回值: >0
是已就绪的文件句柄的总数, =0
超时,<0
表示出错,错误: errno
void FD_CLR(int fd, fd_set *set); // 把文件描述符集合里fd清0
int FD_ISSET(int fd, fd_set *set); // 测试文件描述符集合里fd是否置1
void FD_SET(int fd, fd_set *set); // 把文件描述符集合里fd位置1
void FD_ZERO(fd_set *set); //把文件描述符集合里所有位清0
传统 select/pol l的另一个致命弱点就是当你拥有一个很大的 socket 集合,由于网络得延时,使得任一时间只有部分的 socket 是"活跃" 的,而 select/poll 每次调用都会线性扫描全部的集合,导致效率呈现线性下降。
但是 epoll 不存在这个问题,它只会对"活跃"的 socket 进行操作—这是因为在内核实现中 epoll 是根据每个 fd 上面的 callback 函数实现的。于是,只有"活跃"的 socket 才会主动去调用 callback函数,其他idle 状态的 socket 则不会,在这点上,epoll实现了一个 "伪"AIO,因为这时候推动力在os内核。
示例
select_server.c
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>
int main()
{
int server_sockfd, client_sockfd;
int server_len, client_len;
struct sockaddr_in server_address;
struct sockaddr_in client_address;
int result;
fd_set readfds, testfds;
server_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立服务器端socket
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = htonl(INADDR_ANY);
server_address.sin_port = htons(9000);
server_len = sizeof(server_address);
bind(server_sockfd, (struct sockaddr*)&server_address, server_len);
listen(server_sockfd, 5); //监听队列最多容纳5个
FD_ZERO(&readfds);
FD_SET(server_sockfd, &readfds);//将服务器端socket加入到集合中
while (1)
{
char ch;
int fd;
int nread;
testfds = readfds;//将需要监视的描述符集copy到select查询队列中,select会对其修改,所以一定要分开使用变量
printf("server waiting\n");
/*无限期阻塞,并测试文件描述符变动 */
result = select(FD_SETSIZE, &testfds, (fd_set*)0, (fd_set*)0, (struct timeval*)0); //FD_SETSIZE:系统默认的最大文件描述符
if (result < 1)
{
perror("server5");
exit(1);
}
/*扫描所有的文件描述符*/
for (fd = 0; fd < FD_SETSIZE; fd++)
{
/*找到相关文件描述符*/
if (FD_ISSET(fd, &testfds))
{
/*判断是否为服务器套接字,是则表示为客户请求连接。*/
if (fd == server_sockfd)
{
client_len = sizeof(client_address);
client_sockfd = accept(server_sockfd,
(struct sockaddr*)&client_address, &client_len);
FD_SET(client_sockfd, &readfds);//将客户端socket加入到集合中
printf("adding client on fd %d\n", client_sockfd);
}
/*客户端socket中有数据请求时*/
else
{
ioctl(fd, FIONREAD, &nread);//取得数据量交给nread
/*客户数据请求完毕,关闭套接字,从集合中清除相应描述符 */
if (nread == 0)
{
close(fd);
FD_CLR(fd, &readfds); //去掉关闭的fd
printf("removing client on fd %d\n", fd);
}
/*处理客户数据请求*/
else
{
read(fd, &ch, 1);
sleep(5);
printf("serving client on fd %d\n", fd);
ch++;
write(fd, &ch, 1);
}
}
}
}
}
return 0;
}
select_client.c
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/time.h>
int main()
{
int client_sockfd;
int len;
struct sockaddr_in address;//服务器端网络地址结构体
int result;
char ch = 'A';
client_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立客户端socket
address.sin_family = AF_INET;
address.sin_addr.s_addr = inet_addr("127.0.0.1");
address.sin_port = htons(9000);
len = sizeof(address);
result = connect(client_sockfd, (struct sockaddr*)&address, len);
if (result == -1)
{
perror("oops: client2");
exit(1);
}
//第一次读写
write(client_sockfd, &ch, 1);
read(client_sockfd, &ch, 1);
printf("the first time: char from server = %c\n", ch);
sleep(5);
//第二次读写
write(client_sockfd, &ch, 1);
read(client_sockfd, &ch, 1);
printf("the second time: char from server = %c\n", ch);
close(client_sockfd);
return 0;
}
poll
poll 不再用 BitsMap 来存储所关注的文件描述符,取而代之用动态数组,以链表形式来组织,突破了 select 的文件描述符个数限制,当然还会受到系统文件描述符限制。
但是 poll 和 select 并没有太大的本质区别,都是使用 「线性结构」 存储进程关注的 Socket 集合,因此都需要遍历文件描述符集合来找到可读或可写的 Socket,时间复杂度为 O(n),而且也需要在用户态与内核态之间拷贝文件描述符集合,这种方式随着并发数上来,性能的损耗会呈指数级增长。
poll 和select 区别: select 有文件句柄上线设置,值为 FD_SETSIZE,而poll 理论上没有限制!
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
输入参数:
fds
:可以传递多个结构体,也就是说可以监测多个驱动设备所产生的事件,只要有一个产生了请求事件,就能立即返回。
struct pollfd {
int fd; /*文件描述符 open打开的那个*/
short events; /*请求的事件类型,监视驱动文件的事件掩码*/ POLLIN | POLLOUT
short revents; /*驱动文件实际返回的事件*/
}
nfds
:监测驱动文件的个数。
timeout
:超时时间,单位是ms。
事件类型 events
可以为下列值:
POLLIN
有数据可读
POLLRDNORM
有普通数据可读,等效与POLLIN
POLLPRI
有紧迫数据可读
POLLOUT
写数据不会导致阻塞
POLLER
指定的文件描述符发生错误
POLLHUP
指定的文件描述符挂起事件
POLLNVAL
无效的请求,打不开指定的文件描述符
返回值:
有事件发生 返回revents
域不为0
的文件描述符个数
超时:return 0
失败:return -1
错误:errno
示例
poll_server.c
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>
#include <poll.h>
#define MAX_FD 8192
struct pollfd fds[MAX_FD];
int cur_max_fd = 0;
int main()
{
int server_sockfd, client_sockfd;
int server_len, client_len;
struct sockaddr_in server_address;
struct sockaddr_in client_address;
int result;
//fd_set readfds, testfds;
server_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立服务器端socket
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = htonl(INADDR_ANY);
server_address.sin_port = htons(9000);
server_len = sizeof(server_address);
bind(server_sockfd, (struct sockaddr*)&server_address, server_len);
listen(server_sockfd, 5); //监听队列最多容纳5个
//FD_ZERO(&readfds);
//FD_SET(server_sockfd, &readfds);//将服务器端socket加入到集合中
fds[server_sockfd].fd = server_sockfd;
fds[server_sockfd].events = POLLIN;
fds[server_sockfd].revents = 0;
if(cur_max_fd <= server_sockfd)
{
cur_max_fd = server_sockfd + 1;
}
while (1)
{
char ch;
int i, fd;
int nread;
//testfds = readfds;//将需要监视的描述符集copy到select查询队列中,select会对其修改,所以一定要分开使用变量
printf("server waiting\n");
/*无限期阻塞,并测试文件描述符变动 */
result = poll(fds, cur_max_fd, 1000);
//result = select(FD_SETSIZE, &testfds, (fd_set*)0, (fd_set*)0, (struct timeval*)0); //FD_SETSIZE:系统默认的最大文件描述符
if (result < 0)
{
perror("server5");
exit(1);
}
/*扫描所有的文件描述符*/
for (i = 0; i < cur_max_fd; i++)
{
/*找到相关文件描述符*/
if (fds[i].revents)
{
fd = fds[i].fd;
/*判断是否为服务器套接字,是则表示为客户请求连接。*/
if (fd == server_sockfd)
{
client_len = sizeof(client_address);
client_sockfd = accept(server_sockfd,
(struct sockaddr*)&client_address, &client_len);
fds[client_sockfd].fd = client_sockfd;//将客户端socket加入到集合中
fds[client_sockfd].events = POLLIN;
fds[client_sockfd].revents = 0;
if(cur_max_fd <= client_sockfd)
{
cur_max_fd = client_sockfd + 1;
}
printf("adding client on fd %d\n", client_sockfd);
//fds[server_sockfd].events = POLLIN;
}
/*客户端socket中有数据请求时*/
else
{
//ioctl(fd, FIONREAD, &nread);//取得数据量交给nread
nread = read(fd, &ch, 1);
/*客户数据请求完毕,关闭套接字,从集合中清除相应描述符 */
if (nread == 0)
{
close(fd);
memset(&fds[i], 0, sizeof(struct pollfd)); //去掉关闭的fd
printf("removing client on fd %d\n", fd);
}
/*处理客户数据请求*/
else
{
//read(fds[fd].fd, &ch, 1);
sleep(5);
printf("serving client on fd %d, read: %c\n", fd, ch);
ch++;
write(fd, &ch, 1);
//fds[fd].events = POLLIN;
}
}
}
}
}
return 0;
}
epoll
epoll 可以理解为 event poll (基于事件的轮询)
使用场合:
- 当客户处理多个描述符时(一般是交互式输入和网络套接口),必须使用I/O复用。
- 当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。
- 如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。
- 如果一个服务器既要处理TCP,又要处理UDP,一般要使用I/O复用。
- 如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。
Reactor 设计模式
reactor 是一种事件驱动的反应堆模式,高效的事件处理模型
reactor 反应堆:事件来了,执行,事件类型可能不尽相同,所以我们需要提前注册好不同的事件处理函数;事件到来就由 epoll_wait
获取同时到来的多个事件,并且根据数据的不同类型将事件分发给事件处理机制 (事件处理器), 也就是我们提前注册的那些接口函数。
reactor 模型的设计思想和思维方式: 它需要的是事件驱动,相应的事件发生,我们需要根据事件自动的调用相应的函数,所以我们需要提前注册好处理函数的接口到 reactor 中, 函数是由 reactor 去调用的,而不是再主函数中直接进行调用的,所以需要使用回调函数。
Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O;
中心思想是将所有要处理的 I/O 事件注册到一个 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上; 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。
流程
1.注册事件和对应的事件处理器;
2.多路复用器等待事件到来;
3.事件到来,激发事件分发器分发事件到对应的处理器;
4.事件处理器处理事件,然后注册新的事件, (比如处理读事件,处理完成之后需要将其设置为写事件再注册,因为读取之后我们需要针对业务需求进行数据处理,之后将其send 回去响应客户端结果,所以自然需要改成写事件,也就需要重新注册)
多路复用器 :由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。
事件分发器 :将多路复用器中返回的就绪事件分到对应的处理函数中,分发给事件处理器。
事件处理器 :处理对应的IO事件。
Reactor优点:
1)响应快,不必为单个同步事件所阻塞,虽然 Reactor 本身依然是同步的;
2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
3)可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
4)可复用性,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性;
I/O 多路复用
1.创建 EPOLL 句柄
int epoll_create(int size);
2.向 EPOLL 对象中添加、修改或者删除感兴趣的事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
op 取值:
EPOLL_CTL_ADD
添加新的事件到epoll中
EPOLL_CTL_MOD
修改EPOLL中的事件
EPOLL_CTL_DEL
删除epoll中的事件
events 取值:
EPOLLIN
表示有数据可以读出(接受连接、关闭连接)
EPOLLOUT
表示连接可以写入数据发送(向服务器发起连接,连接成功事件)
EPOLLERR
表示对应的连接发生错误
EPOLLHUP
表示对应的连接被挂起
3.收集在epoll监控的事件中已经发生的事件
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
参数:
epfd
: epoll 的描述符。
events
:分配好的 epoll_event 结构体数组,epoll 将会把发生的事件复制到 events 数组中(events 不可以是空指针,内核只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)。
struct epoll_event{
__uint32_t events;
epoll_data_t data;
}
typedef union epoll_data{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t
maxevents
: 本次可以返回的最大事件数目,通常 maxevents
参数与预分配的 events
数组的大小是相等的。
timeout
: 表示在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout
为0
,立刻返回,不会等待。-1
表示无限期阻塞。
epoll 支持两种事件触发模式:边缘触发(edge-triggered,ET)和水平触发(level-triggered,LT)。
边缘触发:当被监控的文件描述符上有可读写事件发生时,epoll_wait()
会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll_wait(
)时,它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会通知你!!!
这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符!!!
设置方式: stat->_ev.events = EPOLLIN | EPOLLET
水平触发:当被监控的文件描述符上有可读写事件发生时,epoll_wait()
会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll_wait()
时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会一直通知你!!!
如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率。
两者的区别:水平触发的意思是只要满足事件的条件,比如内核中有数据需要读,就一直不断地把这个事件传递给用户;而边缘触发的意思是只有第一次满足条件的时候才触发,之后就不会再传递同样的事件了。
epoll为什么高效?
1、红黑树提高 epoll 事件增删查改效率;
2、回调通知机制;
当epoll监听套接字有数据读或者写时,会通过注册到socket的回调函数通知epoll,epoll检测到事件后,将事件存储在就绪队列(rdllist)。
就绪队列
epoll_wait 返回成功后,会将所有就绪事件存储在事件数组,用户不需要进行无效的轮询,从而提高了效率。
示例1:使用 epoll 实现简单的 web 服务器
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>
// int fd;
typedef struct _ConnectStat ConnectStat;
typedef void(*response_handler) (ConnectStat * stat);
struct _ConnectStat {
int fd;
char name[64];
char age[64];
struct epoll_event _ev;
int status;//0 -未登录 1 - 已登陆
response_handler handler;//不同页面的处理函数
};
//http协议相关代码
ConnectStat * stat_init(int fd);
void connect_handle(int new_fd);
void do_http_respone(ConnectStat * stat);
void do_http_request(ConnectStat * stat);
void welcome_response_handler(ConnectStat * stat);
void commit_respone_handler(ConnectStat * stat);
const char *main_header = "HTTP/1.0 200 OK\r\nServer: Martin Server\r\nContent-Type: text/html\r\nConnection: Close\r\n";
static int epfd = 0;
void usage(const char* argv)
{
printf("%s:[ip][port]\n", argv);
}
void set_nonblock(int fd)
{
int fl = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
int startup(char* _ip, int _port) //创建一个套接字,绑定,检测服务器
{
//sock
//1.创建套接字
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
perror("sock");
exit(2);
}
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
//2.填充本地 sockaddr_in 结构体(设置本地的IP地址和端口)
struct sockaddr_in local;
local.sin_port = htons(_port);
local.sin_family = AF_INET;
local.sin_addr.s_addr = inet_addr(_ip);
//3.bind()绑定
if (bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0)
{
perror("bind");
exit(3);
}
//4.listen()监听 检测服务器
if (listen(sock, 5) < 0)
{
perror("listen");
exit(4);
}
//sleep(1000);
return sock; //这样的套接字返回
}
int main(int argc, char *argv[])
{
if (argc != 3) //检测参数个数是否正确
{
usage(argv[0]);
exit(1);
}
int listen_sock = startup(argv[1], atoi(argv[2])); //创建一个绑定了本地 ip 和端口号的套接字描述符
//1.创建epoll
epfd = epoll_create(256); //可处理的最大句柄数256个
if (epfd < 0)
{
perror("epoll_create");
exit(5);
}
struct epoll_event _ev; //epoll结构填充
ConnectStat * stat = stat_init(listen_sock);
_ev.events = EPOLLIN; //初始关心事件为读
_ev.data.ptr = stat;
//_ev.data.fd = listen_sock; //
//2.托管
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &_ev); //将listen sock添加到epfd中,关心读事件
struct epoll_event revs[64];
int timeout = -1;
int num = 0;
int done = 0;
while (!done)
{
//epoll_wait()相当于在检测事件
switch ((num = epoll_wait(epfd, revs, 64, timeout))) //返回需要处理的事件数目 64表示 事件有多大
{
case 0: //返回0 ,表示监听超时
printf("timeout\n");
break;
case -1: //出错
perror("epoll_wait");
break;
default: //大于零 即就是返回了需要处理事件的数目
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int i;
for (i = 0; i < num; i++)
{
ConnectStat * stat = (ConnectStat *)revs[i].data.ptr;
int rsock = stat->fd; //准确获取哪个事件的描述符
if (rsock == listen_sock && (revs[i].events) && EPOLLIN) //如果是初始的 就接受,建立链接
{
int new_fd = accept(listen_sock, (struct sockaddr*)&peer, &len);
if (new_fd > 0)
{
printf("get a new client:%s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
//sleep(1000);
connect_handle(new_fd);
}
}
else // 接下来对num - 1 个事件处理
{
if (revs[i].events & EPOLLIN)
{
do_http_request((ConnectStat *)revs[i].data.ptr);
}
else if (revs[i].events & EPOLLOUT)
{
do_http_respone((ConnectStat *)revs[i].data.ptr);
}
else
{
}
}
}
}
break;
}//end switch
}//end while
return 0;
}
ConnectStat * stat_init(int fd) {
ConnectStat * temp = NULL;
temp = (ConnectStat *)malloc(sizeof(ConnectStat));
if (!temp) {
fprintf(stderr, "malloc failed. reason: %m\n");
return NULL;
}
memset(temp, '\0', sizeof(ConnectStat));
temp->fd = fd;
temp->status = 0;
//temp->handler = welcome_response_handler;
}
//初始化连接,然后等待浏览器发送请求
void connect_handle(int new_fd) {
ConnectStat *stat = stat_init(new_fd);
set_nonblock(new_fd);
stat->_ev.events = EPOLLIN;
stat->_ev.data.ptr = stat;
epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &stat->_ev); //二次托管
}
void do_http_respone(ConnectStat * stat) {
stat->handler(stat);
}
void do_http_request(ConnectStat * stat) {
//读取和解析http 请求
char buf[4096];
char * pos = NULL;
//while header \r\n\r\ndata
ssize_t _s = read(stat->fd, buf, sizeof(buf) - 1);
if (_s > 0)
{
buf[_s] = '\0';
printf("receive from client:%s\n", buf);
pos = buf;
//Demo 仅仅演示效果,不做详细的协议解析
if (!strncasecmp(pos, "GET", 3)) {
stat->handler = welcome_response_handler;
}
else if (!strncasecmp(pos, "Post", 4)) {
//获取 uri
printf("---Post----\n");
pos += strlen("Post");
while (*pos == ' ' || *pos == '/') ++pos;
if (!strncasecmp(pos, "commit", 6)) {//获取名字和年龄
int len = 0;
printf("post commit --------\n");
pos = strstr(buf, "\r\n\r\n");
char *end = NULL;
if (end = strstr(pos, "name=")) {
pos = end + strlen("name=");
end = pos;
while (('a' <= *end && *end <= 'z') || ('A' <= *end && *end <= 'Z') || ('0' <= *end && *end <= '9')) end++;
len = end - pos;
if (len > 0) {
memcpy(stat->name, pos, end - pos);
stat->name[len] = '\0';
}
}
if (end = strstr(pos, "age=")) {
pos = end + strlen("age=");
end = pos;
while ('0' <= *end && *end <= '9') end++;
len = end - pos;
if (len > 0) {
memcpy(stat->age, pos, end - pos);
stat->age[len] = '\0';
}
}
stat->handler = commit_respone_handler;
}
else {
stat->handler = welcome_response_handler;
}
}
else {
stat->handler = welcome_response_handler;
}
//生成处理结果 html ,write
stat->_ev.events = EPOLLOUT;
//stat->_ev.data.ptr = stat;
epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev); //二次托管
}
else if (_s == 0) //client:close
{
printf("client: %d close\n", stat->fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, stat->fd, NULL);
close(stat->fd);
free(stat);
}
else
{
perror("read");
}
}
void welcome_response_handler(ConnectStat * stat) {
const char * welcome_content = "\
<html lang=\"zh-CN\">\n\
<head>\n\
<meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
<title>This is a test</title>\n\
</head>\n\
<body>\n\
<div align=center height=\"500px\" >\n\
<br/><br/><br/>\n\
<h2>大家好,欢迎来到奇牛学院VIP 课!</h2><br/><br/>\n\
<form action=\"commit\" method=\"post\">\n\
尊姓大名: <input type=\"text\" name=\"name\" />\n\
<br/>芳龄几何: <input type=\"password\" name=\"age\" />\n\
<br/><br/><br/><input type=\"submit\" value=\"提交\" />\n\
<input type=\"reset\" value=\"重置\" />\n\
</form>\n\
</div>\n\
</body>\n\
</html>";
char sendbuffer[4096];
char content_len[64];
strcpy(sendbuffer, main_header);
snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", (int)strlen(welcome_content));
strcat(sendbuffer, content_len);
strcat(sendbuffer, welcome_content);
printf("send reply to client \n%s", sendbuffer);
write(stat->fd, sendbuffer, strlen(sendbuffer));
stat->_ev.events = EPOLLIN;
//stat->_ev.data.ptr = stat;
epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);
}
void commit_respone_handler(ConnectStat * stat) {
const char * commit_content = "\
<html lang=\"zh-CN\">\n\
<head>\n\
<meta content=\"text/html; charset=utf-8\" http-equiv=\"Content-Type\">\n\
<title>This is a test</title>\n\
</head>\n\
<body>\n\
<div align=center height=\"500px\" >\n\
<br/><br/><br/>\n\
<h2>欢迎学霸同学 %s ,你的芳龄是 %s!</h2><br/><br/>\n\
</div>\n\
</body>\n\
</html>\n";
char sendbuffer[4096];
char content[4096];
char content_len[64];
int len = 0;
len = snprintf(content, 4096, commit_content, stat->name, stat->age);
strcpy(sendbuffer, main_header);
snprintf(content_len, 64, "Content-Length: %d\r\n\r\n", len);
strcat(sendbuffer, content_len);
strcat(sendbuffer, content);
printf("send reply to client \n%s", sendbuffer);
write(stat->fd, sendbuffer, strlen(sendbuffer));
stat->_ev.events = EPOLLIN;
//stat->_ev.data.ptr = stat;
epoll_ctl(epfd, EPOLL_CTL_MOD, stat->fd, &stat->_ev);
}
示例2:使用 epoll 封装的库的使用
globals.h
#ifndef GLOBALS_H
#define GLOBALS_H
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <errno.h>
#include <fcntl.h>
#include <assert.h>
#define FD_DESC_SZ 64
#define COMM_OK (0)
#define COMM_ERROR (-1)
#define COMM_NOMESSAGE (-3)
#define COMM_TIMEOUT (-4)
#define COMM_SHUTDOWN (-5)
#define COMM_INPROGRESS (-6)
#define COMM_ERR_CONNECT (-7)
#define COMM_ERR_DNS (-8)
#define COMM_ERR_CLOSING (-9)
//调试相关
#define DEBUG_LEVEL 0
#define DEBUG_ONLY 8
#define debug(m, n) if( m >= DEBUG_LEVEL && n <= DEBUG_ONLY ) printf
#define safe_free(x) if (x) { free(x); x = NULL; }
typedef void PF(int, void *);
struct _fde {
unsigned int type;
__u_short local_port;
__u_short remote_port;
struct in_addr local_addr;
char ipaddr[16]; /* dotted decimal address of peer */
PF *read_handler;
void *read_data;
PF *write_handler;
void *write_data;
PF *timeout_handler;
time_t timeout;
void *timeout_data;
};
typedef struct _fde fde;
extern fde *fd_table;
extern int Biggest_FD;
/*系统时间相关,设置成全局变量,供所有模块使用*/
extern struct timeval current_time;
extern double current_dtime;
extern time_t sys_curtime;
/* epoll 相关接口实现 */
extern void do_epoll_init(int max_fd);
extern void do_epoll_shutdown();
extern void epollSetEvents(int fd, int need_read, int need_write);
extern int do_epoll_select(int msec);
/*框架外围接口*/
void comm_init(int max_fd);
extern int comm_select(int msec);
extern inline void comm_call_handlers(int fd, int read_event, int write_event);
void commUpdateReadHandler(int fd, PF * handler, void *data);
void commUpdateWriteHandler(int fd, PF * handler, void *data);
extern const char *xstrerror(void);
int ignoreErrno(int ierrno);
#endif /* GLOBALS_H */
comm_epoll.c
#include "globals.h"
#include <sys/epoll.h>
#define MAX_EVENTS 256 /* 一次处理的最大的事件 */
/* epoll structs */
static int kdpfd;
static struct epoll_event events[MAX_EVENTS];
static int epoll_fds = 0;
static unsigned *epoll_state; /* 保存每个epoll 的事件状态 */
static const char *
epolltype_atoi(int x)
{
switch (x) {
case EPOLL_CTL_ADD:
return "EPOLL_CTL_ADD";
case EPOLL_CTL_DEL:
return "EPOLL_CTL_DEL";
case EPOLL_CTL_MOD:
return "EPOLL_CTL_MOD";
default:
return "UNKNOWN_EPOLLCTL_OP";
}
}
void do_epoll_init(int max_fd)
{
kdpfd = epoll_create(max_fd);
if (kdpfd < 0)
fprintf(stderr,"do_epoll_init: epoll_create(): %s\n", xstrerror());
//fd_open(kdpfd, FD_UNKNOWN, "epoll ctl");
//commSetCloseOnExec(kdpfd);
epoll_state = calloc(max_fd, sizeof(*epoll_state));
}
void do_epoll_shutdown()
{
close(kdpfd);
kdpfd = -1;
safe_free(epoll_state);
}
void epollSetEvents(int fd, int need_read, int need_write)
{
int epoll_ctl_type = 0;
struct epoll_event ev;
assert(fd >= 0);
debug(5, 8) ("commSetEvents(fd=%d)\n", fd);
memset(&ev, 0, sizeof(ev));
ev.events = 0;
ev.data.fd = fd;
if (need_read)
ev.events |= EPOLLIN;
if (need_write)
ev.events |= EPOLLOUT;
if (ev.events)
ev.events |= EPOLLHUP | EPOLLERR;
if (ev.events != epoll_state[fd]) {
/* If the struct is already in epoll MOD or DEL, else ADD */
if (!ev.events) {
epoll_ctl_type = EPOLL_CTL_DEL;
} else if (epoll_state[fd]) {
epoll_ctl_type = EPOLL_CTL_MOD;
} else {
epoll_ctl_type = EPOLL_CTL_ADD;
}
/* Update the state */
epoll_state[fd] = ev.events;
if (epoll_ctl(kdpfd, epoll_ctl_type, fd, &ev) < 0) {
debug(5, 1) ("commSetEvents: epoll_ctl(%s): failed on fd=%d: %s\n",
epolltype_atoi(epoll_ctl_type), fd, xstrerror());
}
switch (epoll_ctl_type) {
case EPOLL_CTL_ADD:
epoll_fds++;
break;
case EPOLL_CTL_DEL:
epoll_fds--;
break;
default:
break;
}
}
}
int do_epoll_select(int msec)
{
int i;
int num;
int fd;
struct epoll_event *cevents;
/*if (epoll_fds == 0) {
assert(shutting_down);
return COMM_SHUTDOWN;
}
statCounter.syscalls.polls++;
*/
num = epoll_wait(kdpfd, events, MAX_EVENTS, msec);
if (num < 0) {
getCurrentTime();
if (ignoreErrno(errno))
return COMM_OK;
debug(5, 1) ("comm_select: epoll failure: %s\n", xstrerror());
return COMM_ERROR;
}
//statHistCount(&statCounter.select_fds_hist, num);
if (num == 0)
return COMM_TIMEOUT;
for (i = 0, cevents = events; i < num; i++, cevents++) {
fd = cevents->data.fd;
comm_call_handlers(fd, cevents->events & ~EPOLLOUT, cevents->events & ~EPOLLIN);
}
return COMM_OK;
}
comm.c
#include "globals.h"
double current_dtime;
time_t sys_curtime;
struct timeval current_time;
int Biggest_FD = 1024; /*默认的最大文件描述符数量 1024*/
static int MAX_POLL_TIME = 1000; /* see also comm_quick_poll_required() */
fde *fd_table = NULL;
time_t getCurrentTime(void)
{
gettimeofday(¤t_time, NULL);
current_dtime = (double) current_time.tv_sec +
(double) current_time.tv_usec / 1000000.0;
return sys_curtime = current_time.tv_sec;
}
void
comm_close(int fd)
{
assert(fd>0);
fde *F = &fd_table[fd];
if(F) memset((void *)F,'\0',sizeof(fde));
epollSetEvents(fd, 0, 0);
close(fd);
}
void
comm_init(int max_fd)
{
if(max_fd > 0 ) Biggest_FD = max_fd;
fd_table = calloc(Biggest_FD, sizeof(fde));
do_epoll_init(Biggest_FD);
}
void
comm_select_shutdown(void)
{
do_epoll_shutdown();
if(fd_table) free(fd_table);
}
//static int comm_select_handled;
inline void
comm_call_handlers(int fd, int read_event, int write_event)
{
fde *F = &fd_table[fd];
debug(5, 8) ("comm_call_handlers(): got fd=%d read_event=%x write_event=%x F->read_handler=%p F->write_handler=%p\n"
,fd, read_event, write_event, F->read_handler, F->write_handler);
if (F->read_handler && read_event) {
PF *hdl = F->read_handler;
void *hdl_data = F->read_data;
/* If the descriptor is meant to be deferred, don't handle */
debug(5, 8) ("comm_call_handlers(): Calling read handler on fd=%d\n", fd);
//commUpdateReadHandler(fd, NULL, NULL);
hdl(fd, hdl_data);
}
if (F->write_handler && write_event) {
PF *hdl = F->write_handler;
void *hdl_data = F->write_data;
//commUpdateWriteHandler(fd, NULL, NULL);
hdl(fd, hdl_data);
}
}
int
commSetTimeout(int fd, int timeout, PF * handler, void *data)
{
fde *F;
debug(5, 3) ("commSetTimeout: FD %d timeout %d\n", fd, timeout);
assert(fd >= 0);
assert(fd < Biggest_FD);
F = &fd_table[fd];
if (timeout < 0) {
F->timeout_handler = NULL;
F->timeout_data = NULL;
return F->timeout = 0;
}
assert(handler || F->timeout_handler);
if (handler || data) {
F->timeout_handler = handler;
F->timeout_data = data;
}
return F->timeout = sys_curtime + (time_t) timeout;
}
void
commUpdateReadHandler(int fd, PF * handler, void *data)
{
fd_table[fd].read_handler = handler;
fd_table[fd].read_data = data;
epollSetEvents(fd,1,0);
}
void
commUpdateWriteHandler(int fd, PF * handler, void *data)
{
fd_table[fd].write_handler = handler;
fd_table[fd].write_data = data;
epollSetEvents(fd,0,1);
}
static void
checkTimeouts(void)
{
int fd;
fde *F = NULL;
PF *callback;
for (fd = 0; fd <= Biggest_FD; fd++) {
F = &fd_table[fd];
/*if (!F->flags.open)
continue;
*/
if (F->timeout == 0)
continue;
if (F->timeout > sys_curtime)
continue;
debug(5, 5) ("checkTimeouts: FD %d Expired\n", fd);
if (F->timeout_handler) {
debug(5, 5) ("checkTimeouts: FD %d: Call timeout handler\n", fd);
callback = F->timeout_handler;
F->timeout_handler = NULL;
callback(fd, F->timeout_data);
} else {
debug(5, 5) ("checkTimeouts: FD %d: Forcing comm_close()\n", fd);
comm_close(fd);
}
}
}
int
comm_select(int msec)
{
static double last_timeout = 0.0;
int rc;
double start = current_dtime;
debug(5, 3) ("comm_select: timeout %d\n", msec);
if (msec > MAX_POLL_TIME)
msec = MAX_POLL_TIME;
//statCounter.select_loops++;
/* Check timeouts once per second */
if (last_timeout + 0.999 < current_dtime) {
last_timeout = current_dtime;
checkTimeouts();
} else {
int max_timeout = (last_timeout + 1.0 - current_dtime) * 1000;
if (max_timeout < msec)
msec = max_timeout;
}
//comm_select_handled = 0;
rc = do_epoll_select(msec);
getCurrentTime();
//statCounter.select_time += (current_dtime - start);
if (rc == COMM_TIMEOUT)
debug(5, 8) ("comm_select: time out\n");
return rc;
}
const char *
xstrerror(void)
{
static char xstrerror_buf[BUFSIZ];
const char *errmsg;
errmsg = strerror(errno);
if (!errmsg || !*errmsg)
errmsg = "Unknown error";
snprintf(xstrerror_buf, BUFSIZ, "(%d) %s", errno, errmsg);
return xstrerror_buf;
}
int
ignoreErrno(int ierrno)
{
switch (ierrno) {
case EINPROGRESS:
case EWOULDBLOCK:
#if EAGAIN != EWOULDBLOCK
case EAGAIN:
#endif
case EALREADY:
case EINTR:
#ifdef ERESTART
case ERESTART:
#endif
return 1;
default:
return 0;
}
/* NOTREACHED */
}
main.c
#include "globals.h"
typedef struct _ConnectStat ConnectStat;
#define BUFLEN 1024
struct _ConnectStat {
int fd;
char send_buf[BUFLEN];
PF *handler;//不同页面的处理函数
};
//echo 服务实现相关代码
ConnectStat * stat_init(int fd);
void accept_connection(int fd, void *data);
void do_echo_handler(int fd, void *data);
void do_echo_response(int fd,void *data);
void do_echo_timeout(int fd, void *data);
void usage(const char* argv)
{
printf("%s:[ip][port]\n", argv);
}
void set_nonblock(int fd)
{
int fl = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
int startup(char* _ip, int _port) //创建一个套接字,绑定,检测服务器
{
//sock
//1.创建套接字
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
perror("sock");
exit(2);
}
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
//2.填充本地 sockaddr_in 结构体(设置本地的IP地址和端口)
struct sockaddr_in local;
local.sin_port = htons(_port);
local.sin_family = AF_INET;
local.sin_addr.s_addr = inet_addr(_ip);
//3.bind()绑定
if (bind(sock, (struct sockaddr*)&local, sizeof(local)) < 0)
{
perror("bind");
exit(3);
}
//4.listen()监听 检测服务器
if (listen(sock, 5) < 0)
{
perror("listen");
exit(4);
}
return sock; //这样的套接字返回
}
ConnectStat * stat_init(int fd) {
ConnectStat * temp = NULL;
temp = (ConnectStat *)malloc(sizeof(ConnectStat));
if (!temp) {
fprintf(stderr, "malloc failed. reason: %m\n");
return NULL;
}
memset(temp, '\0', sizeof(ConnectStat));
temp->fd = fd;
//temp->status = 0;
}
void do_welcome_handler(int fd, void *data) {
const char * WELCOME= "Welcome.\n";
int wlen = strlen(WELCOME);
int n ;
ConnectStat * stat = (ConnectStat *)(data);
if( (n = write(fd, "Welcome.\n",wlen)) != wlen ){
if(n<=0){
fprintf(stderr, "write failed[len:%d], reason: %s\n",n,strerror(errno));
}else fprintf(stderr, "send %d bytes only ,need to send %d bytes.\n",n,wlen);
}else {
commUpdateReadHandler(fd, do_echo_handler,(void *)stat);
commSetTimeout(fd, 10, do_echo_timeout, (void *)stat);
}
}
void do_echo_handler(int fd, void *data) {
ConnectStat * stat = (ConnectStat *)(data);
char * p = NULL;
assert(stat!=NULL);
p = stat->send_buf;
*p++ = '-';
*p++ = '>';
ssize_t _s = read(fd, p, BUFLEN-(p-stat->send_buf)-1); //2字节"->" +字符结束符.
if (_s > 0)
{
*(p+_s) = '\0';
printf("receive from client: %s\n", p);
//_s--;
//while( _s>=0 && ( stat->send_buf[_s]=='\r' || stat->send_buf[_s]=='\n' ) ) stat->send_buf[_s]='\0';
if(!strncasecmp(p, "quit", 4)){//退出.
comm_close(fd);
free(stat);
return ;
}
//write(fd,
commUpdateWriteHandler(fd, do_echo_response, (void *)stat);
commSetTimeout(fd, 10, do_echo_timeout, (void *)stat);
}else if (_s == 0) //client:close
{
fprintf(stderr,"Remote connection[fd: %d] has been closed\n", fd);
comm_close(fd);
free(stat);
}
else //err occurred.
{
fprintf(stderr,"read faield[fd: %d], reason:%s [%d]\n",fd , strerror(errno), _s);
}
}
void do_echo_response(int fd, void *data) {
ConnectStat * stat = (ConnectStat *)(data);
int len = strlen(stat->send_buf);
int _s = write(fd, stat->send_buf, len);
if(_s>0){
commSetTimeout(fd, 10, do_echo_timeout, (void *)stat);
commUpdateReadHandler(fd, do_echo_handler, (void *)stat);
}else if(_s==0){
fprintf(stderr,"Remote connection[fd: %d] has been closed\n", fd);
comm_close(fd);
free(stat);
}else {
fprintf(stderr,"read faield[fd: %d], reason:%s [%d]\n",fd ,_s ,strerror(errno));
}
}
//read()
//注册写事件
//写事件就绪
//write()
void accept_connection(int fd, void *data){
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
ConnectStat * stat = (ConnectStat *)data;
int new_fd = accept(fd, (struct sockaddr*)&peer, &len);
if (new_fd > 0)
{
ConnectStat *stat = stat_init(new_fd);
set_nonblock(new_fd);
printf("new client: %s:%d\n", inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
commUpdateWriteHandler(new_fd, do_welcome_handler, (void *)stat);
commSetTimeout(new_fd, 30,do_echo_timeout, (void *)stat);
}
}
void do_echo_timeout(int fd, void *data){
fprintf(stdout,"---------timeout[fd:%d]----------\n",fd);
comm_close(fd);
free(data);
}
int main(int argc,char **argv){
if (argc != 3) //检测参数个数是否正确
{
usage(argv[0]);
exit(1);
}
int listen_sock = startup(argv[1], atoi(argv[2])); //创建一个绑定了本地 ip 和端口号的套接字描述符
//初始化异步事件处理框架epoll
comm_init(102400);
ConnectStat * stat = stat_init(listen_sock);
commUpdateReadHandler(listen_sock,accept_connection,(void *)stat);
do{
//不断循环处理事件
comm_select(1000);
}while(1==1);
comm_select_shutdown();
}
参考:
Select、Poll、Epoll详解
小林coding-I/O 多路复用
epoll高度封装reactor,几乎所有可见服务器的底层框架