目录
IO为什么低效?
1.快速理解五种IO模式
2.五种IO模型
3.非阻塞IO
fcntl()
4.IO多路转接 select
select
fd_set类型
struct timeval*类型
5.Select的代码测试
5.1 问题一:一开始,我们只有一个listen套接字
5.2 问题二:建立连接成功了,可以accept()了,可以直接读/写吗?
5.2.1 对于更新位图结构的修改
5.2.2 对于处理事件逻辑的修改
5.3 完整代码
5.4 测试结果
6.select的优缺点
本文代码部分中使用的Sock.hpp 和 log.hpp在,本文把Sock.hpp的方法全部变成了静态成员函数
Linux相关博文中使用的头文件_cout头文件 linux_Gosolo!的博客-CSDN博客
之前提到过,我们所谓的IO接口,本质上就是拷贝函数,仅是将内核级缓冲区的数据拷贝过来,或者拷给内核级缓冲区。
IO为什么低效?
当我们在向屏幕输入数据时,如果我们不敲击键盘,那么进程在干嘛?阻塞。
当我们read/recv的时候,如果底层缓冲区没有数据,IO接口就会阻塞,也就是等待资源就绪。
当read/recv的时候,如果底层缓冲区中有数据,IO接口就会直接进行拷贝。
单位时间内,大部分时间IO类的接口其实都在等待。
所以IO操作,本质上就是等+拷贝。那么如何才叫高效的IO呢?
在单位的时间内,让等的比重变得很低,IO效率就会变高。
1.快速理解五种IO模式
以钓鱼为例。
张三:钓鱼时,眼睛一直死死的盯着鱼钩。鱼一咬钩,他就进行钓动作。
李四:钓鱼时,他会看一下鱼钩有没有晃动,如果没有,他就低头看会儿手机。每隔一段时间就会在看看鱼钩,这样往复。
王五:钓鱼时,他在鱼钩上绑了一个铃铛,然后就沉迷手机,等待铃铛响之后,他会进行钓动作。
赵六:钓鱼时,他一个人拿了一百竿鱼竿,同时观察这一百竿是否有鱼上钩,如果有,他就会对上钩的那个鱼竿进行钓动作。
田七:看到别人钓鱼,他也想吃鱼。于是让手底下的小王去钓鱼,给了他一杆鱼竿,诱饵等,又给了他一部电话。说“等调到鱼了你给我打电话”。于是田七离开了河边。
首先,回答一个问题。这五个人谁的效率最高?
赵六。在单位时间内,赵六的等待时间在概率上来说是最小的。所以高效。
然后我们把这五种钓鱼的方式套到IO模式上来
张三:阻塞式。
李四:非阻塞式。
王五:信号驱动。
赵六:多路复用。
田七和小王:异步IO
由于异步IO和同步IO有争议,我们这里以下方这个原则介绍。
IO=等+拷贝 所谓的参与,实际上要么是参与了等,要么是参与了拷贝,要么是两个都参与了
2.五种IO模型
阻塞式
数据没就位,阻塞挂起。操作系统觉察数据就绪时,会唤醒执行流。
非阻塞式
数据没就绪,执行流不断的询问。一旦数据就绪后,操作系统会把资源放进缓冲区,执行流会自己去询问。
信号驱动
内核将数据准备好时,使用SIGIO信号通知应用程序进行IO操作。
多路复用
进程受阻于select调用,等待可能多个套接字中的任何一个变为可读。等待过程是同时发生的,但是一旦有一个资源就绪,拷贝是原子性的,一段时间内只能执行一个。
异步IO
执行流通过一定的接口,将指定的缓冲区和要求交给操作系统,操作系统检测到资源就位,会自行完成拷贝。然后通过信号/其他方式通知执行流。
3.非阻塞IO
首先,之前recv等系统调用接口,本事就可以通过传入的参数进行非阻塞。但现在提供了一个通用的接口,只要需要非阻塞IO就可以通过下面这个接口进行非阻塞设置。
fcntl()
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd,int cmd,.../*arg*/);
传入的cmd不同,后面追加的参数也不同
cmd参数 | 作用 |
F_DUPFD | 复制一个现有的描述符 |
F_GETFD或F_SETFD | 获得/设置文件描述符标记 |
F_GETFL或F_SETFL | 获得/设置文件状态标记 |
F_GETOWN或F_SETOWN | 获得/设置异步IO所有权 |
F_GETLK,F_SETLK或F_SETLKW | 获得/设置记录锁 |
这里我们使用第三种功能。
一个简单的阻塞式IO
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
using std::cout;
using std::endl;
bool SetNonBlock(int fd)
{
int f1=fcntl(fd,F_GETFL);//在底层获取当前fd对应的文件读写标志位
if(f1<0) return false;
fcntl(fd,F_SETFL,f1|O_NONBLOCK);//设置非阻塞
return true;
}
int main()
{
//SetNonBlock(0);//只用设置一次 后续就都是非阻塞了
char buffer[1024];
while(true)
{
ssize_t s=read(0,buffer,sizeof(buffer)-1);
if(s>0)
{
buffer[s]=0;
cout<<"echo: "<<buffer<<endl;
}
else
{
cout<<"read fail "<<endl;
}
}
return 0;
}
将SetNonBlock放出来之后
4.IO多路转接 select
select
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
参数 | 解释 |
返回值 | 返回就绪的个数 |
nfds | 等待的所有文件中,最大的文件描述+1 |
readfds | 传进一个fd_set类型,表示fd_set里面设置 的文件描述符集,当读资源就绪时,select 可以捕获。 |
writefds | 当写资源就绪时,select可以捕获 |
exceptfds | select捕获文件描述集中所有的文件出现的异常。 |
timeout | select等待多个fd,等待策略可以选择, 阻塞式传入nullptr,非阻塞式传入(0,0) 自定义等待时间(5,0),时间到了立马返回。如果在此期间有 fd就绪,timeout会输出距离下一次timeout的时间 |
fd_set类型
fd_set和信号集中sigset_t类型类似,都是一个位图结构。表示文件描述符集。当我们想要select关心一些文件的写操作是否就绪,就需要先把这些文件的文件描述符设置在一个fd_set的类型里面,然后将这个类型,传给writedfs参数。
上述三个fd_set类型的参数:
a.输入时,用户告诉内核,你要帮我关心哪个sock里面的哪一种事件。
b.输出时,内核告诉用户,我所关心的sock里,哪些sock上的哪类事件就绪了。
fd_set类型位图的操作函数
void FD_CLR(int fd, fd_set *set); // 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set); // 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set); // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set); // 用来清除描述词组set的全部位
注意:用户和内核都会修改同一个位图结构,这个参数用一次之后,一定需要进行重新设定。
fd_set是一个固定大小的位图,直接决定了select可以同时关心的fd的个数是有限的。
在我测试的环境中,大小为128字节,可以表示128*8个比特位,即128*8个文件描述符。
struct timeval*类型
struct timeval
{
__time_t tv_sec; //秒
__susecond_t tv_usec;//微秒
};
5.Select的代码测试
注:一开始的代码有很多问题,会在文中一边分析原因一边解决。
5.1 问题一:一开始,我们只有一个listen套接字
而且他是用来获取新连接的一个参数。
如何看待listen套接字?
可以将它看做读事件,只有底层的资源就绪之后,才能进行获取。那如果一开始没有新连接呢?那不就阻塞了吗?所以不能上来就调用accept();而需要让select()帮我们等,所以也就需要在调用select()之前,创建fd_set类型,并进行初始化。
using namespace std;
class SelectServer
{
public:
SelectServer(const uint16_t &port=8080)
:_port(port)
{
_listensock=Sock::Socket();
Sock::Bind(_listensock,_port);//ip缺省值为 0.0.0.0
Sock::Listen(_listensock);
logMessage(DEBUG,"%s","create base socket success");
}
void Start()
{
fd_set rfds;
FD_ZERO(&rfds);
while(true)
{
//int sock=Sock::Accept();
//放在里面是 因为当timeout的时间等待够3秒时,
//由于其为输入输出型参数,下次循环中的timeout会变为{0,0}
struct timeval timeout={3,0};
FD_SET(_listensock,&rfds);
int n=select(_listensock+1,&rfds,nullptr,nullptr,&timeout);
switch(n)
{
case 0:
logMessage(DEBUG,"time out...");
break;
case -1:
logMessage(WARNING,"select errno: %d : %s",errno,strerror(errno));
break;
default:
//成功
//具体细节后面详谈
break;
}
}
}
~SelectServer()
{
if(_listensock>=0) close(_listensock);
}
private:
uint16_t _port;
int _listensock;
};
5.2 问题二:建立连接成功了,可以accept()了,可以直接读/写吗?
不能!因为建立连接成功了,我们不知道数据什么时候会发送过来。recv(),read()可能被阻塞,导致服务器不能处理接下来的连接请求!
那select()清楚资源有没有就绪,怎么把我accept()来的文件描述符交给select呢?
nfds表示最大的文件描述符,随着获取的sock越来越多,添加到select中的套接字也会更多,也就决定着nfds每次都可能会变化,我们需要对他进行动态计算!
rfds/writefds/exceptfds:都是输入输出型参数,输入输出不一定是一样的。
比如让select关心1-10号文件描述符,但是其中有3,4就绪,那么select输出的rfds就会设置为3,4,其余的文件描述符就会被丢弃掉。所以注定了我们每一次都要对rfds进行重新设置。timeout同理。
综上,我们必须自己把合法的文件描述符单独保存起来,用来支持1.更新最大nfds 2.更新位图结构
5.2.1 对于更新位图结构的修改
//新增宏
#define NUM sizeof(fd_set)*8
#define FD_DONE -1
//新增成员变量
private:
int _fd_array[NUM];
//------------分界线-------------------
class SelectServer
{
public:
SelectServer(const uint16_t &port=8080)
:_port(port)
{
_listensock=Sock::Socket();
Sock::Bind(_listensock,_port);//ip缺省值为 0.0.0.0
Sock::Listen(_listensock);
logMessage(DEBUG,"%s","create base socket success");
for(int i=0;i<NUM;i++)
{
_fd_array[i]=FD_NONE;
}
//做一个规定 _fd_array[0]=_listensock
_fd_array[0]=_listensock;
}
void Start()
{
while(true)
{
DebugPrint();
fd_set rfds;
FD_ZERO(&rfds);
int maxfd=_listensock;
for(int i=0;i<NUM;i++)
{
if(_fd_array[i]==FD_NONE) continue;
FD_SET(_fd_array[i],&rfds);
if(maxfd<_fd_array[i]) maxfd=_fd_array[i];
}
int n=select(maxfd+1,&rfds,nullptr,nullptr,nullptr);
switch(n)
{
case 0:
logMessage(DEBUG,"time out...");
break;
case -1:
logMessage(WARNING,"select errno: %d : %s",errno,strerror(errno));
break;
default:
//成功
logMessage(DEBUG, "get a new link event...");
HandlerEvent(rfds);
break;
}
}
~SelectServer()
{
if(_listensock>=0) close(_listensock);
}
private:
void HandlerEvent(const fd_set& rfds)
{
string clientip;
uint16_t clientport=0;
//FD_ISSET用于判断 _listensock在rfds中是否被设置 即是否就绪
if(FD_ISSET(_listensock,&rfds))
{
//_listensock上面的读事件就绪了 表示可以读取了 即获取连接
int sock=Sock::Accept(_listensock,&clientip,&clientport);
if(sock<0)
{
logMessage(WARNING,"accept error");
return;
}
logMessage(DEBUG,"get a new link success :[%s:%d] :%d",clientip.c_str(),clientport,sock);
//找一个位置添加 我刚刚得到的sock套接字 好让select帮我关心
int pos=1;
for(;pos<NUM;pos++)
{
if(_fd_array[pos]==FD_NONE) break;
}
if(pos==NUM)
{
//已经满了 不能再关心其他的了
logMessage(WARNING,"%s:%d","select server already full,close fd: %d",sock);
close(sock);
}
else
{
_fd_array[pos]=sock;
}
}
}
void DebugPrint()
{
cout<<"_fd_array[]: ";
for(int i=0;i<NUM;i++)
{
if(_fd_array[i]==FD_NONE) continue;
cout<<_fd_array[i]<<" ";
}
cout<<endl;
}
private:
uint16_t _port;
int _listensock;
int _fd_array[NUM];
};
当前逻辑只处理了一个套接字,即listensock,连接时候的处理动作。可随着套接字的不断增多,肯定也有读或者写的情况,所以HandlerEvent仍需要进行处理。
5.2.2 对于处理事件逻辑的修改
只需要将我们刚才写的那一大串处理逻辑封装为Accept()
现在我们完成输入事件的处理逻辑
注意
测试时,我们发送的资源都是字符串,方便测试。按道理来说这里也需要制定协议,以保证我们读到的是一个完整的报文。
void Recver(int pos)
{
// 读事件就绪:INPUT事件到来、recv,read
logMessage(DEBUG, "message in, get IO event: %d", _fd_array[pos]);
char buffer[1024];
int n = recv(_fd_array[pos], buffer, sizeof(buffer)-1, 0);
if(n > 0)
{
buffer[n] = 0;
logMessage(DEBUG, "client[%d]# %s", _fd_array[pos], buffer);
}
else if(n == 0)
{
logMessage(DEBUG, "client[%d] quit, me too...", _fd_array[pos]);
// 1. 我们也要关闭不需要的fd
close(_fd_array[pos]);
// 2. 不要让select帮我关心当前的fd了
_fd_array[pos] = FD_NONE;
}
else
{
logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));
// 1. 我们也要关闭不需要的fd
close(_fd_array[pos]);
// 2. 不要让select帮我关心当前的fd了
_fd_array[pos] = FD_NONE;
}
}
5.3 完整代码
#pragma once
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__
#include <iostream>
#include <string>
#include <vector>
#include <sys/select.h>
#include <sys/time.h>
#include "log.hpp"
#include "Sock.hpp"
#define NUM sizeof(fd_set)*8
#define FD_NONE -1
using namespace std;
class SelectServer
{
public:
SelectServer(const uint16_t &port=8080)
:_port(port)
{
_listensock=Sock::Socket();
Sock::Bind(_listensock,_port);//ip缺省值为 0.0.0.0
Sock::Listen(_listensock);
logMessage(DEBUG,"%s","create base socket success");
for(int i=0;i<NUM;i++)
{
_fd_array[i]=FD_NONE;
}
//做一个规定 _fd_array[0]=_listensock
_fd_array[0]=_listensock;
}
void Start()
{
while(true)
{
DebugPrint();
fd_set rfds;
FD_ZERO(&rfds);
int maxfd=_listensock;
for(int i=0;i<NUM;i++)
{
if(_fd_array[i]==FD_NONE) continue;
FD_SET(_fd_array[i],&rfds);
if(maxfd<_fd_array[i]) maxfd=_fd_array[i];
}
int n=select(maxfd+1,&rfds,nullptr,nullptr,nullptr);
switch(n)
{
case 0:
logMessage(DEBUG,"time out...");
break;
case -1:
logMessage(WARNING,"select errno: %d : %s",errno,strerror(errno));
break;
default:
//成功
logMessage(DEBUG, "get a new link event...");
HandlerEvent(rfds);
break;
}
}
}
~SelectServer()
{
if(_listensock>=0) close(_listensock);
}
private:
void Acceptr()
{
string clientip;
uint16_t clientport=0;
int sock=Sock::Accept(_listensock,&clientip,&clientport);
if(sock<0)
{
logMessage(WARNING,"accept error");
return;
}
logMessage(DEBUG,"get a new link success :[%s:%d] : %d",clientip.c_str(),clientport,sock);
//找一个位置添加 我刚刚得到的sock套接字 好让select帮我关心
int pos=1;
for(;pos<NUM;pos++)
{
if(_fd_array[pos]==FD_NONE) break;
}
if(pos==NUM)
{
logMessage(WARNING,"%s:%d","select server already full,close fd: %d",sock);
close(sock);
}
else
{
_fd_array[pos]=sock;
}
}
void Recver(int pos)
{
// 读事件就绪:INPUT事件到来、recv,read
logMessage(DEBUG, "message in, get IO event: %d", _fd_array[pos]);
// 暂时先不做封装, 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞
// 这样读取有bug吗?有的,你怎么保证以读到了一个完整包文呢?
char buffer[1024];
int n = recv(_fd_array[pos], buffer, sizeof(buffer)-1, 0);
if(n > 0)
{
buffer[n] = 0;
logMessage(DEBUG, "client[%d]# %s", _fd_array[pos], buffer);
}
else if(n == 0)
{
logMessage(DEBUG, "client[%d] quit, me too...", _fd_array[pos]);
// 1. 我们也要关闭不需要的fd
close(_fd_array[pos]);
// 2. 不要让select帮我关心当前的fd了
_fd_array[pos] = FD_NONE;
}
else
{
logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));
// 1. 我们也要关闭不需要的fd
close(_fd_array[pos]);
// 2. 不要让select帮我关心当前的fd了
_fd_array[pos] = FD_NONE;
}
}
void HandlerEvent(const fd_set& rfds)
{
for(int i=0;i<NUM;i++)
{
//没让select关心这个文件
if(_fd_array[i]==FD_NONE) continue;
//让关心了 但是需要知道他是否就绪
if(FD_ISSET(_fd_array[i],&rfds))
{
//就绪
//现在需要判断他是链接事件 accept
//还是输入事件 recv read
if(_fd_array[i]==_listensock)
{
Acceptr();
}
else
{
Recver(i);
}
}
}
}
void DebugPrint()
{
cout<<"_fd_array[]: ";
for(int i=0;i<NUM;i++)
{
if(_fd_array[i]==FD_NONE) continue;
cout<<_fd_array[i]<<" ";
}
cout<<endl;
}
private:
uint16_t _port;
int _listensock;
int _fd_array[NUM];
};
#endif
5.4 测试结果
6.select的优缺点
优点:
效率高。适用于有大量连接,但是只有少量是活跃的场景。
缺点:
为了维护第三方数组,充满多次遍历数组。
而且每次调用select都需要手动设置fd集合,从接口使用角度来说也不方便。
每次调用select,都需要把fd集合从用户态拷贝到内核态,开销比较大。
同时每次调用select都需要在内核遍历传递进来的所有fd,开销比较大。
能够同时管理的fd是有上限的。
编码比较复杂。