IO复用
- 概述
- IO模型
- 阻塞式IO
- 非阻塞式IO
- IO复用
- select、poll、epoll异同
- 信号驱动式IO
- 异步IO
- select函数
- select示例代码
- poll函数
- poll示例代码
- epoll函数
- 创建 epoll_create
- 注册、修改、删除 epoll_ctl
- 轮询 I/O 事件的发生 epoll_wait
- epoll示例代码
- 基于TCP和epoll在线多人聊天室服务器例子
概述
什么是IO复用呢?I/O复用(I/O multiplexing),指的是通过一个支持同时感知多个描述符的函数系统调用,阻塞在这个系统调用上,等待某一个或者几个描述符准备就绪,就返回可读条件。
IO多路复用解决了什么问题呢?当多个客户端与服务器通信时,若服务器阻塞在其中一个客户的read(sockfd1,…),当另一个客户数据到达sockfd2时,服务器无法及时处理,此时需要用到IO多路复用。即同时监听n个客户,当其中有一个发来消息时就从select的阻塞中返回,然后调用read读取收到消息的sockfd,然后又循环回select阻塞。这样就解决了阻塞在一个消息而无法处理其它的。即用来解决对多个I/O监听时,一个I/O阻塞影响其他I/O的问题。
这时候大家可能会想到线程和进程也能做到这样的效果啊(基于tcp多线程在线聊天室例子)?但是,CPU切换进程和线程的成本是很高的,这也正是IO复用的优点,可以做到用更少的资源完成更多的事。
IO模型
阻塞式IO
这种类型的IO会阻塞等待到有输入。这类IO大部分时间处于睡眠态、阻塞态、挂起态。优点是不占用 CPU 宝贵的时间片,但是同一时刻只能处理一个操作、效率比较低。
如何理解呢?就像你在家里睡觉,等着你的外卖送到,送到了给你打电话你才起床去拿外卖。如果一次性到了很多外卖,你也只能一个电话一个电话去接听,然后挨着拿。
应用场景多为多进程、多线程的TCP 并发服务器、客户端。
非阻塞式IO
该种类型的IO需要程序轮询检测输入,如果有输入就调用IO进行读取数据。优点是提高了程序的执行效率,但是需要占用更多的 CPU 和系统资源,使得 CPU 负荷增高。
我们还举外卖的例子。这种情况就像你非常饿,一直顶着配送地图,一分钟刷新一次地图,看看到了没,到了你就立即下楼去拿,拿完了继续刷新地图看看别的还有多久到。
IO复用
这类IO采用阻塞式IO+组长机制。多路IO共用一个同步阻塞接口,任意IO可操作都可激活IO操作。它能同时等待多个文件描述符,而这些文件描述符其中的任意一个进入读就绪状态,函数就可以返回。
IO复用包含了select、poll和epoll。其中select相当于你家楼下放外卖的桌子,你得自己挨个外卖看一遍,直达找到你的外卖。而epoll像外卖柜,你只要看一下手机就知道你的外卖在哪个柜子里面。
select、poll、epoll异同
相同之处:
1 . 都可以用于监听多个文件描述符的读写事件。
2 . 都是阻塞式的,即当没有任何事件发生时,它们都会一直阻塞等待。
3 . 都可以通过设置超时时间来控制阻塞等待的时间。
不同之处:
1 . select和poll使用轮询的方式来检查所有的文件描述符,而epoll使用回调的方式,只有在有事件发生时才会调用回调函数。
2 . select和poll的文件描述符集合是通过参数传递给函数的,而epoll使用epoll_ctl函数来注册和删除文件描述符,通过epoll_wait函数来等待事件。
3 . select和poll对于大量的文件描述符来说,性能会下降,因为每次都需要遍历整个文件描述符集合,而epoll使用红黑树来存储文件描述符,效率更高。
4 . select和poll支持的文件描述符数量有一定的限制,而epoll没有明确的限制。
总的来说,epoll相对于select和poll来说,在处理大量的并发连接时性能更好,并且使用更方便。
信号驱动式IO
注册一个IO信号事件,在数据可操作时通过SIGIO信号通知线程,这应该算是一种异步机制.
异步IO
应用进程通知内核开始一个异步I/O操作,并让内核在整个操作(包含将数据从内核复制到应该进程的缓冲区)完成后通知应用进程。例如POSIX 的&IO _系列函数。
select函数
头文件:
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
函数原型:int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
参数介绍:
nfds:集合中所有文件描述符的范围,即所有文件描述符的最大值+1。
readfds:监听的读事件文件描述符集合。
writefds:监听的写事件文件描述符集合。
exceptfds:意外文件描述符集合。
timeout:
返回值:返回值是一个整数,表示有多少个文件描述符已经就绪。如果返回0,表示在指定的超时时间内没有任何文件描述符就绪;如果返回-1,表示发生错误。
//FD_CLR(inr fd,fd_set* set);用来清除描述词组 set 中相关 fd 的位
//FD_ISSET(int fd,fd_set *set);用来测试描述词组 set 中相关 fd 的位是否为真(遍历检测函数)
//FD_SET(int fd,fd_set*set);用来设置描述词组 set 中相关 fd 的位
//FD_ZERO(fd_set *set);用来清除描述词组 set 的全部位(在初始化时用到以免里面有垃圾值)
fd_set readfds;
FD_ZERO(&readfds);//清空
FD_SET(socketfd,&readfds); //用来设置描述词组 set 中相关 socketfd-fd 的位
fd_set changeReadfds = readfds;
int count = select(nfds,&useChangeReadfds, NULL,NULL,0);
select示例代码
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/select.h>
typedef struct sockaddr SA;
typedef struct sockaddr_in SIN;
#define MAXBACKLOG 100
int Socket(int domain,int type,int protocol);
int Bind(int sockfd,struct sockaddr * my_addr,int addrlen);
int Listen(int s,int backlog);
int Accept(int s,struct sockaddr * addr,int * addrlen);
void * clientRecvDataFunction(void * arg);
//运行格式 ./app 192.168.5.166 8888
int main(int argc,char *argv[])
{
int opt =1;
//建立监听套接字
int socketfd = Socket(AF_INET,SOCK_STREAM,0);
//需要进行重用地址及其端口号
setsockopt(socketfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
//绑定信息编写服务器信息
SIN serverinfo;
serverinfo.sin_family =AF_INET; //协议IPV4
serverinfo.sin_port =htons(atoi(argv[2])); //网络字节序(大端字节序)与主机字节序(小端字节序)
serverinfo.sin_addr.s_addr= inet_addr(argv[1]);
int addrlen = sizeof(SIN);
Bind(socketfd,(SA*)&serverinfo,addrlen);
//监听
Listen(socketfd,MAXBACKLOG);
//select函数参数填写
int nfds = socketfd+1;
fd_set readfds; //栈区定义 先清空 再写入位
FD_ZERO(&readfds); //清空
FD_SET(socketfd,&readfds); //用来设置描述词组set中相关socketfd-fd的位
//读写
while(1)
{
fd_set useChangeReadfds = readfds;
int count = select(nfds,&useChangeReadfds, NULL,NULL,0);
if(count >0)
{
//文件描述符socketfd调用accept
if(FD_ISSET(socketfd,&useChangeReadfds))
{
SIN clientinfo;
int clientaddrlen =sizeof(SA);
int newfd = Accept(socketfd,(SA*)&clientinfo,&clientaddrlen);
printf("客户端地址:%s 端口号:%d\n",inet_ntoa(clientinfo.sin_addr),ntohs(clientinfo.sin_port));
//将新的文件描述符添加至readfds
FD_SET(newfd,&readfds);
//调整检测范围
newfd >= nfds?(nfds++):(nfds =nfds);
}
//文件描述符非socketfd调用read
else
{
for(int startFd = socketfd+1; startFd < nfds;startFd++)
{
if(FD_ISSET(startFd,&useChangeReadfds))
{
//读取客户端发送来的数据
char readbuff[512]={0};
int len = read(startFd,readbuff,sizeof(readbuff));
if(len > 0)
{
printf("%d:%s\n",startFd,readbuff);
}
else if(len == 0)
{
printf("%d:客户端退出\n",startFd);
FD_CLR(startFd,&readfds);
//存在弊端nfds不好调整
close(startFd);
}
else if(len < 0)
{
printf("%d:客户端异常退出\n",startFd);
FD_CLR(startFd,&readfds);
//存在弊端nfds不好调整
close(startFd);
}
}
}
}
}
}
//关闭
close(socketfd);
return 0;
}
int Socket(int domain,int type,int protocol)
{
int socketFd = socket(domain,type,protocol);
if(socketFd ==-1)
{
perror("socket");
exit(1);
}
return socketFd;
}
int Bind(int sockfd,struct sockaddr * my_addr,int addrlen)
{
int val = bind(sockfd,my_addr,addrlen);
if(val)
{
perror("bind");
exit(1);
}
return 0;
}
int Listen(int s,int backlog)
{
int val = listen(s,backlog);
if(val == -1)
{
perror("listen");
exit(1);
}
return val;
}
int Accept(int s,struct sockaddr * addr,int * addrlen)
{
int NEWfd= accept(s,addr,addrlen);
if(NEWfd == -1)
{
perror("listen");
exit(1);
}
return NEWfd;
}
poll函数
头文件:
#include <poll.h>
函数原型:int poll(struct pollfd fd[], nfds_t nfds, int timeout);
参数介绍:
fd[]:文件描述符结构体。
nfds:指定结构体数组元素个数。
timeout:
返回值:成功时 poll() 返回结构体中 revents 域不为 0 的文件描述符个数,如果在超时前没有任何事件发生,poll()返回 0。
//struct pollfd结构体内容
struct pollfd
{
int fd; //文件描述符 所要检测的文件描述符
short events; //请求的事件 检测文件描述符的事件
short revents; //返回的事件(内核给的反馈)
};
struct pollfd fds[1025];
//清空结构体数组
//清空结构体数组
for(int i = 0;i < sizeof(fds)/sizeof(struct pollfd);i++)
{
fds[i].fd = -1;
}
//初始化填入 socketfd;
fds[0].fd = socketfd; //文件描述符
fds[0].events = POLLIN; //读事件
//nfds 用来指定第一个参数数组元素个数;
int nfds = 1;
int count = poll(fds,nfds, -1);
poll示例代码
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <time.h>
#include <poll.h>
typedef struct sockaddr SA;
typedef struct sockaddr_in SIN;
#define MAXBACKLOG 100
int Socket(int domain,int type,int protocol);
int Bind(int sockfd,struct sockaddr * my_addr,int addrlen);
int Listen(int s,int backlog);
int Accept(int s,struct sockaddr * addr,int * addrlen);
void * clientRecvDataFunction(void * arg);
//./app 192.168.5.166 8888
int main(int argc,char *argv[])
{
int opt =1;
//建立监听套接字
int socketfd = Socket(AF_INET,SOCK_STREAM,0);
//需要进行重用地址及其端口号
setsockopt(socketfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
//绑定信息编写服务器信息
SIN serverinfo;
serverinfo.sin_family =AF_INET; //协议IPV4
serverinfo.sin_port =htons(atoi(argv[2]));
serverinfo.sin_addr.s_addr= inet_addr(argv[1]);
int addrlen = sizeof(SIN);
Bind(socketfd,(SA*)&serverinfo,addrlen);
//监听
Listen(socketfd,MAXBACKLOG);
//pollt函数参数填写
struct pollfd fds[1025];
//清空结构体数组
for(int i = 0;i < sizeof(fds)/sizeof(struct pollfd);i++)
{
fds[i].fd = -1;
}
//初始化填入socketfd;
fds[0].fd = socketfd; //文件描述符
fds[0].events = POLLIN; //读事件
//nfds用来指定第一个参数数组元素个数;
int nfds = 1;
//读写
while(1)
{
int count = poll(fds,nfds, -1);
if(count > 0)
{
//检查fds[0]的fd是否发生动作------>accept
if(fds[0].revents == POLLIN)
{
SIN clientinfo;
int clientaddrlen =sizeof(SA);
int newfd = Accept(socketfd,(SA*)&clientinfo,&clientaddrlen);
printf("客户端地址:%s 端口号:%d\n",inet_ntoa(clientinfo.sin_addr),ntohs(clientinfo.sin_port));
//需要将newfd放至fds中
for(int i = 0; i < sizeof(fds)/sizeof(struct pollfd);i++)
{
if(fds[i].fd == -1)
{
fds[i].fd = newfd; //文件描述符
fds[i].events = POLLIN; //读事件
i >= nfds?(nfds++):(nfds =nfds); //nfds与下标有联系
break;
}
}
}
else
{
//检查fds[>0]的是否发送动作-------->read
for(int startfd = 1;startfd<nfds;startfd++)
{
if(fds[startfd].revents == POLLIN)
{
char readbuff[512]={0};
int len = read(fds[startfd].fd,readbuff,sizeof(readbuff));
if(len > 0)
{
printf("%d:%s\n",fds[startfd].fd,readbuff);
}
else if(len == 0)
{
printf("%d:客户端退出\n",fds[startfd].fd);
//归位处理
close(fds[startfd].fd);
fds[startfd].fd=-1;
}
else if(len < 0)
{
printf("%d:客户端异常退出\n",fds[startfd].fd);
//归位处理
close(fds[startfd].fd);
fds[startfd].fd=-1;
}
}
}
}
}
}
//关闭
close(socketfd);
return 0;
}
//下面的函数与select相同
epoll函数
创建 epoll_create
头文件:
#include <sys/epoll.h>
函数原型:int epoll_create(int size)
参数介绍:
size:标识这个监听的数目最大有多大。
返回值: 成功时,这些系统调用将返回非负文件描述符。如果出错,则返回-1,并且将errno设置为指示错误。
int epollfd = epoll_create(1024);
注册、修改、删除 epoll_ctl
头文件:
#include <sys/epoll.h>
函数原型:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数介绍:
epfd: epoll 专用的文件描述符。
op:要进行的操作,EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除。
fd:关联的文件描述符。
event:指向 epoll_event 的指针。
返回值:成功返回1,失败返回0.
//结构体 epoll_event 被用于注册所感兴趣的事件和回传所发生待处理的事件,定义如下:
typedef union epoll_data_t { //联合体
void *ptr;
int fd; //比较常用
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;//保存触发事件的某个文件描述符相关的数据
struct epoll_event {
__uint32_t events; /* epoll event */
epoll_data_t data; /* User data variable */
};
/*其中 events 表示感兴趣的事件和被触发的事件,可能的取值为:
EPOLLIN:表示对应的文件描述符可以读;
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数可读;
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: ET 的 epoll 工作模式;*/
struct epoll_event event;
event.events = EPOLLIN; //事件成员
event.data.fd = socketfd; //数据
epoll_ctl(epollfd,EPOLL_CTL_ADD,socketfd, &event);
轮询 I/O 事件的发生 epoll_wait
头文件:
#include <sys/epoll.h>
函数原型:int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);
参数介绍:
epfd:epoll 专用的文件描述符。
events:回传处理事件的数组。
maxevents:每次能处理的事件数。
timeout:等待 I/O 事件发生的超时值。
返回值:返回发生的事件数,失败返回-1。
int count = epoll_wait(epollfd,events,10,-1);
epoll示例代码
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <time.h>
#include <sys/epoll.h>
typedef struct sockaddr SA;
typedef struct sockaddr_in SIN;
#define MAXBACKLOG 100
int Socket(int domain,int type,int protocol);
int Bind(int sockfd,struct sockaddr * my_addr,int addrlen);
int Listen(int s,int backlog);
int Accept(int s,struct sockaddr * addr,int * addrlen);
void * clientRecvDataFunction(void * arg);
//./app 192.168.5.166 8888
int main(int argc,char *argv[])
{
int opt =1;
//建立监听套接字
int socketfd = Socket(AF_INET,SOCK_STREAM,0);
//需要进行重用地址及其端口号
setsockopt(socketfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
//绑定信息编写服务器信息
SIN serverinfo;
serverinfo.sin_family =AF_INET; //协议IPV4
serverinfo.sin_port =htons(atoi(argv[2]));
serverinfo.sin_addr.s_addr= inet_addr(argv[1]);
int addrlen = sizeof(SIN);
Bind(socketfd,(SA*)&serverinfo,addrlen);
//监听
Listen(socketfd,MAXBACKLOG);
//epollt创建根节点
int epollfd = epoll_create(1024);
//添加socketfd文件描述符至内核 红黑树
struct epoll_event event;
event.events = EPOLLIN; //事件成员
event.data.fd = socketfd; //数据
epoll_ctl(epollfd,EPOLL_CTL_ADD,socketfd, &event);
//读写
while(1)
{
struct epoll_event events[10];
int count = epoll_wait(epollfd,events,10,-1);
if(count > 0)
{
for(int i = 0; i< count;i++)
{
if(events[i].events == EPOLLIN)
{
if(events[i].data.fd==socketfd)
{
//等待连接
SIN clientinfo;
struct epoll_event event;
int clientaddrlen =sizeof(SA);
int newfd = Accept(socketfd,(SA*)&clientinfo,&clientaddrlen);
printf("客户端地址:%s 端口号:%d\n",inet_ntoa(clientinfo.sin_addr),ntohs(clientinfo.sin_port));
//需要将newfd放至红黑树中
event.events = EPOLLIN; //事件成员
event.data.fd = newfd; //数据
epoll_ctl(epollfd,EPOLL_CTL_ADD,newfd, &event);
}
else
{
//read
char readbuff[512]={0};
int len = read(events[i].data.fd,readbuff,sizeof(readbuff));
if(len > 0)
{
printf("%d:%s\n",events[i].data.fd,readbuff);
}
else if(len == 0)
{
printf("%d:客户端退出\n",events[i].data.fd);
epoll_ctl(epollfd,EPOLL_CTL_DEL,events[i].data.fd,NULL);
close(events[i].data.fd);
}
else if(len < 0)
{
printf("%d:客户端异常退出\n",events[i].data.fd);
epoll_ctl(epollfd,EPOLL_CTL_DEL,events[i].data.fd,NULL);
close(events[i].data.fd);
}
}
}
}
}
}
//关闭
close(socketfd);
return 0;
}
//下面的函数与select相同
基于TCP和epoll在线多人聊天室服务器例子
点我查看