文章目录
- 1.IO
- 1.1基本介绍
- 1.2基础io的低效性
- 1.3如何提高IO效率
- 1.4五种IO模型
- 1.5非阻塞模式的设置
- 2.IO多路转接之Select
- 2.1函数的基本了解
- 2.2fd_set理解
- 2.3完整例子代码(会在代码中进行讲解)
- 2.4优缺点
- 3.多路转接之poll
- 3.1poll函数的介绍
- 3.2poll服务器
- 3.3优缺点
- 4.epoll
- 4.1初始epoll
- 4.2epoll相关的系统调用
- 4.3epoll的工作原理
- 4.4epoll的工作方式
- 4.5LT模式重要代码
- 4.6ET模式重要代码
1.IO
1.1基本介绍
IO实际上就是 input && output
在冯诺依曼体系中就是与外设交互的意思,而我们的网络通信本质上也是一种IO。
1.2基础io的低效性
为什么基础io会低效呢?我们以读取为例来介绍。
当我们底层调用read函数的时候,如果缓冲区没有数据 ,我们就会将pcb放入等待队列,进行阻塞。
当我们底层调用read函数的时候,如果缓冲区有数据,我们就会读取。
这么来看我们io的本质其实就是数据拷贝+等待。
实际上回顾我们之前的文章,不光是在网络中在本地主机进行IO的时候也是进行这两个阶段。
当我们的程序需要读取磁盘中的内容时,磁盘需要先将内容加载到内存里面。
而在加载还未完成时我们的程序在做什么呢? 阻塞 或者说 等。
这也就是为什么我们使用scanf
和cin
等输入函数的时候 命令行会阻塞住。
在进行IO的时间里大部分时间都在等待。
1.3如何提高IO效率
顺着我们上面说的实际就是降低这个等待时间的比重。
1.4五种IO模型
经历了这么长时间的发展之后,计算机的前辈们已经总结出来了五种IO模型,让我们一起来学习一下吧。
如果光将一些概念大家应该很难理解,这里我们借用钓鱼的例子来为大家大致分析一下(把钓起鱼的一瞬间抽象成拷贝,等鱼儿上钩的时间想象成阻塞时间)。
例子一:张三去钓鱼的时候不喜欢被打扰,甩钩之后就一直盯着鱼漂,等什么时候余漂有反应了就立刻拉钩。
例子二:李四去钓鱼的时候专心不了,甩钩之后就喜欢刷刷手机,每刷一会儿手机就看一眼鱼漂,如果有反应了就拉钩,如果没反应就继续刷手机。
例子三:王五去钓鱼的时候喜欢在鱼漂上挂个铃铛 ,之后就去刷手机玩了。 如果铃铛响了,那么王五就去拉钩;如果没响,就一直玩手。
例子四:赵六去钓鱼的时候喜欢多备几根鱼竿,所有鱼竿下水之后赵六就在旁边巡视,哪一根鱼竿的鱼漂动了就去拉哪根鱼竿。
例子五:田七去钓鱼的时候带着一个小跟班,每次只需要布置任务让小跟班钓多少鱼就好,自己处理自己的事情去了。
上面的五个例子分别代表了五个IO模型分别是:
-
故事一: 阻塞
-
故事二: 非阻塞轮询
-
故事三: 信号驱动
-
故事四: 多路复用多路转接
-
故事五: 异步IO
理论上来讲,这些例子当中例子四就是我们的多路复用多路转接最为高效,当然这里提到异步IO我们就多提一嘴,其实有关异步IO和同步IO的概念一直都有争论,有兴趣的同学可以去了解一下,这里就不做过多讲解。
1.5非阻塞模式的设置
如果想让IO进行非阻塞的话 打开文件的时候就可以进行非阻塞设置,比如说 open
socket
。
但是如果我们使用每个函数的时候都记住它们的非阻塞标志未免也有点太麻烦了,
所以说我们这里使用 fcntl
函数来统一设置:
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */);
fd是要进行操作的文件描述符
cmd是控制命令
arg是与命令相关的参数
如果设置失败会返回-1 并且错误码会被设置 成功返回大于等于0
参数二的不同功能:
复制一个现有的描述符(cmd=F_DUPFD).
获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN)
获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)
使用案例:
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
int main() {
// 获取当前标准输出的状态标志
int flags = fcntl(STDOUT_FILENO, F_GETFL);
if (flags == -1) {
perror("fcntl");
return 1;
}
// 设置标准输出为非阻塞模式
if (fcntl(STDOUT_FILENO, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl");
return 1;
}
// 尝试从标准输出中读取数据,但不会阻塞
char buffer[1024];
ssize_t bytesRead = read(STDOUT_FILENO, buffer, sizeof(buffer));
if (bytesRead == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("No data available in non-blocking mode.\n");
} else {
perror("read");
}
} else {
// 读取到数据
buffer[bytesRead] = '\0';
printf("Read %zd bytes: %s", bytesRead, buffer);
}
return 0;
}
我们首先使用fcntl函数获取标准输出的当前状态标志,然后使用fcntl再次设置标准输出为非阻塞模式(通过将O_NONBLOCK标志添加到原来的标志中)。接下来,我们尝试从标准输出中读取数据,但由于标准输出已设置为非阻塞模式,如果没有数据可用,read将立即返回,并且errno会被设置为EAGAIN或EWOULDBLOCK,表示没有数据可读。
当我们使用了非阻塞IO的时候,每次读取如果遇到了 EWOULDBLOCK或EAGAIN我们就可以让我们的进程去做一会儿其他事情。
2.IO多路转接之Select
接下来我们会带大家了解select这个函数,其实质上就是在io等这一步上做了如下操作:
- 帮用户一次等待多个sock
- 如果有sock就绪了 select就要通知用户 这些sock就绪了 让用户调用read/recv函数来进行读取
2.1函数的基本了解
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
1.nfds:需要等待的文件当中文件描述符最大的+1
因为文件描述符是递归增长的,所以给定一个最大值就能确定范围。
这样子的话我们岂不是要等待从 0 ~ nfds-1所有文件描述符了?关于这个问题看了下面的参数介绍就能明白了.
2.readfds 是一个指向读取文件描述符集合的指针。
writefds 是一个指向写文件描述符集合的指针。
exceptfds 是一个指向异常文件描述符集合的指针。
这几个其实都是输出输出型参数
我们要传入的是一个fd_set类型的指针
这些参数在输入的时候分别表示:
我们是否关心读就绪
我们是否关心写就绪
我们是否关心有异常
在输出的时候分别表示:
哪些读就绪了
哪些写就绪了
出现哪些异常了
3.timeval *timeout
timeval实际上是一个结构体 在Linux系统中 它的定义如下:
struct timeval {
time_t tv_sec; // 当前时间的秒
suseconds_t tv_usec; // 当前时间的微秒
};
我们让select进行等待的时候 有三种模式可以供我们选择:
阻塞式
非阻塞式
阻塞一段时间 之后返回
对于这个参数来说:
如果我输入nullptr 那么它就是阻塞式的
如果我们输入结构体 {0 , 0} 那么它就是非阻塞式的
如果我们输入结构体{5, 0}那么它就会等待五秒钟之后返回,但是如果说五秒内有文件描述符就绪了的话,这个参数就会显示出输出性。比如说我们要求等五秒,而实际上2秒就有文件描述符就绪了,那么它就会返回{3 , 0}。
返回值类型:int
表示的是就绪的文件描述符的个数
只要让我们等待的文件描述符中 有一个就绪了 它就会返回
2.2fd_set理解
fd_set 叫做文件描述符集,它本质上是一个位图 。
系统提供了四个函数来让我们进行文件描述符集操作它们的作用如下:
- 清除某个文件描述符
- 判断某个文件描述符是否被设置
- 设置文件描述符
- 清空文件描述符
我们举个具体的使用场景:
fd_set *readfds
当它作为一个输入参数时
- 它是用户通知内核的一种方式
- 在比特位中 比特位的下标表示文件描述符
- 比特位下标对应的内容是否为1表示我对于该文件的读是否关心
- 比如 0101 就是我对于2号和0号文件描述符的读关心
当它作为一个输出参数时
- 它是内核通知用户的一种方式
- 在比特位中 比特位的下标表示文件描述符
- 比特位下标对应的内容是否为1表示该文件描述符的读是否就绪
- 比如说 0100 就是用户让系统关心的0号和2号文件描述符中 2号文件描述符就绪了
至于fd_set *writefds fd_set *exceptfds
通知的内容分别变成了 :是否关心写,是否关心异常
2.3完整例子代码(会在代码中进行讲解)
main.cc
#include "selectServer.hpp"
#include <memory>
int main()
{
// 1. fd_set是一个固定大小位图,直接决定了select能同时关心的fd的个数是有上限的!
// std::cout << sizeof(fd_set) * 8 << std::endl;
std::unique_ptr<SelectServer> svr(new SelectServer());
svr->Start();
return 0;
}
Log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志是有日志级别的
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
#define LOGFILE "./selectServer.log"
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void logMessage(int level, const char *format, ...)
{
// va_list ap;
// va_start(ap, format);
// while()
// int x = va_arg(ap, int);
// va_end(ap); //ap=nullptr
char stdBuffer[1024]; //标准部分
time_t timestamp = time(nullptr);
// struct tm *localtime = localtime(×tamp);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld] ", gLevelMap[level], timestamp);
char logBuffer[1024]; //自定义部分
va_list args;
va_start(args, format);
// vprintf(format, args);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
// FILE *fp = fopen(LOGFILE, "a");
printf("%s%s\n", stdBuffer, logBuffer);
// fprintf(fp, "%s%s\n", stdBuffer, logBuffer);
// fclose(fp);
}
Sock.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <ctype.h>
class Sock
{
private:
// listen的第二个参数,意义:底层全连接队列的长度 = listen的第二个参数+1
const static int gbacklog = 10;
public:
Sock() {}
static int Socket()
{
int listensock = socket(AF_INET, SOCK_STREAM, 0);
if (listensock < 0)
{
exit(2);
}
int opt = 1;
setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
return listensock;
}
static void Bind(int sock, uint16_t port, std::string ip = "0.0.0.0")
{
struct sockaddr_in local;
memset(&local, 0, sizeof local);
local.sin_family = AF_INET;
local.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &local.sin_addr);
if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
exit(3);
}
}
static void Listen(int sock)
{
if (listen(sock, gbacklog) < 0)
{
exit(4);
}
}
// 一般经验
// const std::string &: 输入型参数
// std::string *: 输出型参数
// std::string &: 输入输出型参数
static int Accept(int listensock, std::string *ip, uint16_t *port)
{
struct sockaddr_in src;
socklen_t len = sizeof(src);
int servicesock = accept(listensock, (struct sockaddr *)&src, &len);
if (servicesock < 0)
{
return -1;
}
if(port) *port = ntohs(src.sin_port);
if(ip) *ip = inet_ntoa(src.sin_addr);
return servicesock;
}
static bool Connect(int sock, const std::string &server_ip, const uint16_t &server_port)
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(server_port);
server.sin_addr.s_addr = inet_addr(server_ip.c_str());
if(connect(sock, (struct sockaddr*)&server, sizeof(server)) == 0) return true;
else return false;
}
~Sock() {}
};
selectServer.hpp
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__
#include <iostream>
#include <string>
#include <vector>
#include <sys/select.h>
#include <sys/time.h>
#include "Log.hpp"
#include "Sock.hpp"
#define BITS 8
#define NUM (sizeof(fd_set)*BITS)
#define FD_NONE -1
using namespace std;
// select 我们只完成读取,写入和异常不做处理 -- epoll(写完整)
class SelectServer
{
public:
SelectServer(const uint16_t &port = 8080) : _port(port)
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
logMessage(DEBUG,"%s","create base socket success");
for(int i = 0; i < NUM; i++) _fd_array[i] = FD_NONE;
// 规定 : _fd_array[0] = _listensock;
_fd_array[0] = _listensock;
}
void Start()
{
while (true)
{
// struct timeval timeout = {0, 0};
// 如何看待listensock? 获取新连接,我们把它依旧看做成为IO,input事件,如果没有连接到来呢?阻塞
// int sock = Sock::Accept(listensock, ...); //不能直接调用accept了
// 将listensock添加到读文件描述符集中
// FD_SET(_listensock, &rfds);
// int n = select(_listensock + 1, &rfds, nullptr, nullptr, &timeout);
// 1. nfds: 随着我们获取的sock越来越多,随着我们添加到select的sock越来越多,注定了nfds每一次都可能要变化,我们需要对它动态计算
// 2. rfds/writefds/exceptfds:都是输入输出型参数,输入输出不一定以一样的,所以注定了我们每一次都要对rfds进行重新添加
// 3. timeout: 都是输入输出型参数,每一次都要进行重置,前提是你要的话
// 1,2 => 注定了我们必须自己将合法的文件描述符需要单独全部保存起来 用来支持:1. 更新最大fd 2.更新位图结构
DebugPrint();
fd_set rfds;
FD_ZERO(&rfds);
int maxfd = _listensock;
for(int i = 0; i < NUM; i++)
{
if(_fd_array[i] == FD_NONE) continue;
FD_SET(_fd_array[i], &rfds);
if(maxfd < _fd_array[i]) maxfd = _fd_array[i];
}
// rfds未来,一定会有两类sock,listensock,普通sock
// 我们select中,就绪的fd会越来越多!
int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
// printf("hello select ...\n");
logMessage(DEBUG, "%s", "time out...");
break;
case -1:
logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));
break;
default:
// 成功的
logMessage(DEBUG, "get a new link event..."); // 为什么会一直打印连接到来呢?连接已经建立完成,就绪了,但是你没有取走,select要一直通知你!
HandlerEvent(rfds);
break;
}
}
}
~SelectServer()
{
if (_listensock >= 0)
close(_listensock);
}
private:
void HandlerEvent(const fd_set &rfds) // fd_set 是一个集合,里面可能会存在多个sock
{
for(int i = 0; i < NUM; i++)
{
// 1. 去掉不合法的fd
if(_fd_array[i] == FD_NONE) continue;
// 2. 合法的就一定就绪了?不一定
if(FD_ISSET(_fd_array[i], &rfds))
{
//指定的fd,读事件就绪
// 读事件就绪:连接时间到来,accept
if(_fd_array[i] == _listensock) Accepter();
else Recver(i);
}
}
}
void Accepter()
{
string clientip;
uint16_t clientport = 0;
// listensock上面的读事件就绪了,表示可以读取了
// 获取新连接了
int sock = Sock::Accept(_listensock, &clientip, &clientport); // 这里在进行accept会不会阻塞?不会!
if(sock < 0)
{
logMessage(WARNING, "accept error");
return;
}
logMessage(DEBUG, "get a new line success : [%s:%d] : %d", clientip.c_str(), clientport, sock);
// read / recv? 不能!为什么不能?我们不清楚该sock上面数据什么时候到来, recv、read就有可能先被阻塞,IO = 等+数据拷贝
// 谁可能最清楚呢?select!
// 得到新连接的时候,此时我们应该考虑的是,将新的sock托管给select,让select帮我们进行检测sock上是否有新的数据
// 有了数据select,读事件就绪,select就会通知我,我们在进行读取,此时我们就不会被阻塞了
// 要将sock添加 给 select, 其实我们只要将fd放入到数组中即可!
int pos = 1;
for(; pos < NUM; pos++){
if(_fd_array[pos] == FD_NONE) break;
}
if(pos == NUM){
logMessage(WARNING, "%s:%d", "select server already full,close: %d", sock);
close(sock);
}else{
_fd_array[pos] = sock;
}
}
void Recver(int pos)
{
// 读事件就绪:INPUT事件到来、recv,read
logMessage(DEBUG, "message in, get IO event: %d", _fd_array[pos]);
// 暂时先不做封装, 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞
// 这样读取有bug吗?有的,你怎么保证以读到了一个完整包文呢?
char buffer[1024];
int n = recv(_fd_array[pos], buffer, sizeof(buffer)-1, 0);
if(n > 0){
buffer[n] = 0;
logMessage(DEBUG, "client[%d]# %s", _fd_array[pos], buffer);
}
else if(n == 0){
logMessage(DEBUG, "client[%d] quit, me too...", _fd_array[pos]);
// 1. 我们也要关闭不需要的fd
close(_fd_array[pos]);
// 2. 不要让select帮我关心当前的fd了
_fd_array[pos] = FD_NONE;
}
else{
logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));
// 1. 我们也要关闭不需要的fd
close(_fd_array[pos]);
// 2. 不要让select帮我关心当前的fd了
_fd_array[pos] = FD_NONE;
}
}
void DebugPrint()
{
cout << "_fd_array[]: ";
for(int i = 0; i < NUM; i++)
{
if(_fd_array[i] == FD_NONE) continue;
cout << _fd_array[i] << " ";
}
cout << endl;
}
private:
uint16_t _port;
int _listensock;
int _fd_array[NUM];
// int _fd_write[NUM];
// std::vector<int> arr;
};
#endif
2.4优缺点
优点:
- 效率高 IO等的时间少 尤其是在有大量连接 并且只有少量活跃的情况下
- 单进程 占用资源少
缺点:
- 为了维护第三方数组 select服务器充满大量的遍历操作
- 每一次都要对select参数进行重新设定
- 能够同时管理的fd的个数是有上限的
- 由于参数是输入输出的 所以避免不了大量用户和内核之间的拷贝
- 编码比较复杂
3.多路转接之poll
poll是系统提供的一个多路转接接口,它的作用和select函数基本一致。
3.1poll函数的介绍
原形:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd *fds
里面包含着文件描述符表,我们需要监视的文件描述符合集和就绪的文件描述符合集
- fd 特定的文件描述符值
- events 用户告诉内核 哪些事件需要关心
- revents 内核告诉用户 哪些事件就绪了
(也是用了位图结构来存储数据)
一个文件描述符实际上就是对应一个struct pollfd
,所以说理论上只要有多少个数组我们的poll就能检测多少的文件描述符。
以下是events和revents的取值:
我们需要特别注意的有三个分别是:
- POLLIN 可读
- POLLOUT 可写
- POLLERR 错误
nfds_t nfds
fds数组的长度
timeout
超时时间
- 单位是毫秒 比如说我们设置为1000 就是等待1秒
- 如果设置为0 就表示非阻塞模式
- 如果设置为-1 就表示阻塞模式
3.2poll服务器
我们将上面写的select的服务器修改一下:
私有成员变化如下:
private:
int _port;
int _listensock;
struct pollfd *_rfds;
func_t _func;
对比于我们select的第三方数组来说,我们这里多了一个数组指针和数组大小.
在初始化的时候 我们首先new出一个 struct pollfd
数组出来 ,并且遍历初始化一下.
_rfds[i].fd = defaultfd;
_rfds[i].events = 0;
_rfds[i].revents = 0;
对于数据如何判断就绪,我们可以使用按位与来判断:
_rfds[i].revents & POLLIN
具体代码如下:
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "sock.hpp"
namespace select_ns
{
static const int defaultport = 8081;
static const int fdnum = sizeof(fd_set) * 8;
static const int defaultfd = -1;
using func_t = std::function<std::string (const std::string&)>;
class SelectServer
{
public:
SelectServer(func_t f, int port = defaultport) : func(f), _port(port), _listensock(-1), fdarray(nullptr)
{
}
void initServer()
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
fdarray = new int[fdnum];
for (int i = 0; i < fdnum; i++)
fdarray[i] = defaultfd;
fdarray[0] = _listensock; // 不变了
}
void Print()
{
std::cout << "fd list: ";
for (int i = 0; i < fdnum; i++)
{
if (fdarray[i] != defaultfd)
std::cout << fdarray[i] << " ";
}
std::cout << std::endl;
}
void Accepter(int listensock)
{
logMessage(DEBUG, "Accepter in");
// 走到这里,accept 函数,会不会阻塞???1 0
// select 告诉我, listensock读事件就绪了
std::string clientip;
uint16_t clientport = 0;
int sock = Sock::Accept(listensock, &clientip, &clientport); // accept = 等 + 获取
if (sock < 0)
return;
logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);
// sock我们能直接recv/read 吗?不能,整个代码,只有select有资格检测事件是否就绪
// 将新的sock 托管给select!
// 将新的sock托管给select的本质,其实就是将sock,添加到fdarray数组中即可!
int i = 0;
for (; i < fdnum; i++)
{
if (fdarray[i] != defaultfd)
continue;
else
break;
}
if (i == fdnum)
{
logMessage(WARNING, "server if full, please wait");
close(sock);
}
else
{
fdarray[i] = sock;
}
Print();
logMessage(DEBUG, "Accepter out");
}
void Recver(int sock, int pos)
{
logMessage(DEBUG, "in Recver");
// 1. 读取request
// 这样读取是有问题的!
char buffer[1024];
ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0); // 这里在进行读取的时候,会不会被阻塞?1, 0
if (s > 0)
{
buffer[s] = 0;
logMessage(NORMAL, "client# %s", buffer);
}
else if (s == 0)
{
close(sock);
fdarray[pos] = defaultfd;
logMessage(NORMAL, "client quit");
return;
}
else
{
close(sock);
fdarray[pos] = defaultfd;
logMessage(ERROR, "client quit: %s", strerror(errno));
return;
}
// 2. 处理request
std::string response = func(buffer);
// 3. 返回response
// write bug
write(sock, response.c_str(), response.size());
logMessage(DEBUG, "out Recver");
}
// 1. handler event rfds 中,不仅仅是有一个fd是就绪的,可能存在多个
// 2. 我们的select目前只处理了read事件
void HandlerReadEvent(fd_set &rfds)
{
for (int i = 0; i < fdnum; i++)
{
// 过滤掉非法的fd
if (fdarray[i] == defaultfd)
continue;
// 正常的fd
// 正常的fd不一定就绪了
// 目前一定是listensock,只有这一个
if (FD_ISSET(fdarray[i], &rfds) && fdarray[i] == _listensock)
Accepter(_listensock);
else if(FD_ISSET(fdarray[i], &rfds))
Recver(fdarray[i], i);
else{}
}
}
void start()
{
for (;;)
{
fd_set rfds;
// fd_set wfds;
FD_ZERO(&rfds);
int maxfd = fdarray[0];
for (int i = 0; i < fdnum; i++)
{
if (fdarray[i] == defaultfd)
continue;
FD_SET(fdarray[i], &rfds); // 合法 fd 全部添加到读文件描述符集中
if (maxfd < fdarray[i])
maxfd = fdarray[i]; // 更新所有fd中最大的fd
}
logMessage(NORMAL, "max fd is: %d", maxfd);
// struct timeval timeout = {1, 0};
// int n = select(_listensock + 1, &rfds, nullptr, nullptr, &timeout); // ??
// 一般而言,要是用select,需要程序员自己维护一个保存所有合法fd的数组!
int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr); // ??
switch (n)
{
case 0:
logMessage(NORMAL, "timeout...");
break;
case -1:
logMessage(WARNING, "select error, code: %d, err string: %s", errno, strerror(errno));
break;
default:
// 说明有事件就绪了,目前只有一个监听事件就绪了
logMessage(NORMAL, "have event ready!");
HandlerReadEvent(rfds);
// HandlerWriteEvent(wfds);
break;
}
// std::string clientip;
// uint16_t clientport = 0;
// int sock = Sock::Accept(_listensock, &clientip, &clientport); // accept = 等 + 获取
// if(sock<0) continue;
// // 开始进行服务器的处理逻辑
}
}
~SelectServer()
{
if (_listensock < 0)
close(_listensock);
if (fdarray)
delete[] fdarray;
}
private:
int _port;
int _listensock;
int *fdarray;
func_t func;
};
}
3.3优缺点
优点:
- 效率高
- 适合有大量连接 少量活跃
- 输入输出分离,接口使用方便
- poll参数级别 没有可管理的fd上限
缺点:
- poll依旧需要不少的遍历
- poll需要内核到用户的拷贝
- poll的代码虽然比select容易 但是也很复杂
4.epoll
4.1初始epoll
epoll是为了处理大量句柄而做出改进的poll(句柄可以是一个整数、指针、引用或其他数据结构,它用于唯一标识和访问特定资源或对象。)
它在2.5.44内核中被引入到Linux
也是目前来说最常用的一种多路转接IO方式
4.2epoll相关的系统调用
- epoll_create
- epoll_ctl
- epoll_wait
int epoll_create(int size); //创建一个epoll模型
参数说明:
目前来说 epoll_create的参数是被废弃的 我们设置为256或者512就行 这样设计的原因是为了向前兼容
返回值说明:
返回一个epoll模型 (实际上就是一个文件描述符)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);//对创建出来的epoll模型进行操控
参数说明:
1.int epfd 标识一个我们的IO模型
2.int op (operator) 表示我们想要做出什么样的操作
3.int fd 表示我们需要添加的文件描述符
4.epoll_event *event 表示我们需要关心哪些事件
返回值说明:
函数成功调用返回0 失败返回-1 同时错误码将被设置
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
//监视我们关心的关键描述符
参数说明:
1.epfd:是 epoll 实例的文件描述符,它代表你要监听的一组文件描述符的集合。
2.events:是一个指向 struct epoll_event 数组的指针,用于存储就绪事件的信息。
3.maxevents:指定 events 数组的最大大小,即最多可以存储多少就绪事件的信息。
4.timeout:指定等待事件的超时时间,单位为毫秒。传递负值会使 epoll_wait 成为阻塞调用,直到有事件发生,传递零会使它成为非阻塞调用,立即返回,传递正值会在指定时间内等待事件。
返回值:epoll_wait 返回就绪事件的数量,如果发生错误,则返回 -1。如果超时时间到期而没有事件发生,它将返回 0。
poll_wait 返回的事件信息存储在 events 数组中。每个 struct epoll_event 结构包含以下信息:
events:一个位掩码,指示事件类型,如可读、可写、错误等。
data:一个联合,可以存储用户定义的数据,通常是文件描述符或其他标识符。
4.3epoll的工作原理
我们之前的学习的多路转接函数 无论是select还有poll 它们都需要我们做下面的操作:
- 让我们维护一个第三方的数组
- 都需要遍历整个数组
- 都需要经历用户到内核 内核到用户的事件通知
而我们的epoll则不同。
当然在我们讲解epoll的具体工作原理时我们需要先了解一些前置知识:
操作系统是如何知道硬件里面有数据了呢?
下图很好的解释了这一问题:
正式讲解 |
---|
当我们创建一个epoll模型之后操作系统底层会帮助我们维护一颗红黑树
红黑树的节点里面维护着很多元素 其中最重要的是两个:
- 文件描述符
- 事件
这颗红黑树解决了用户通知内核的问题。
用户通知内核自己要关心哪些文件描述符的哪些事件之后,操作系统就会生成一个节点然后插入到这颗红黑树当中
而这颗红黑树就是对应我们select和poll当中的数组。(现在由操作系统维护了)
当内核通知用户的则是通过了消息队列通知:
在内核维护的红黑树旁边有一个消息队列(也交就绪队列),每当有fd的事件就绪的时候就会在该队列上添加一个元素(也是由操作系统维护)。
操作系统在调用驱动的时候构建就绪队列节点:
在生成红黑树节点的时候,在驱动中,每个节点都会生成一个自己的回调函数。
于是在经历了硬件中断到读取数据的过程后,操作系统会调用驱动中的回调函数来获取该节点的数据 ,并且根据这些数据(fd和events)构建就绪节点,最后将构建好的节点插入到队列中。
知道了这些后,不妨再来看看我们上面提到的函数:
- epoll_create : 创建epoll模型 包括红黑树 就绪队列 回调函数等(这个描述符所对应的文件里面有指针可以找到红黑树和就绪队列)
- epoll_ctl : 对于红黑树的节点进行注册
- epoll_wait : 获取就绪队列中的内容
4.4epoll的工作方式
epoll有2种工作方式-水平触发(LT)和边缘触发(ET)
你正在吃鸡, 眼看进入了决赛圈, 你妈饭做好了, 喊你吃饭的时候有两种方式:
- 如果你妈喊你一次, 你没动, 那么你妈会继续喊你第二次, 第三次…(亲妈, 水平触发)
- 如果你妈喊你一次, 你没动, 你妈就不管你了(后妈, 边缘触发)
举个例子来帮大家彻底了解这两种模式:
-
我们已经把一个tcp socket添加到epoll描述符
-
这个时候socket的另一端被写入了2KB的数据
-
调用epoll_wait,并且它会返回. 说明它已经准备好读取操作
-
然后调用read, 只读取了1KB的数据
-
继续调用epoll_wait…
水平触发Level Triggere 工作模式:
epoll默认状态下就是LT工作模式.
当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.
如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait
仍然会立刻返回并通知socket读事件就绪.
直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
支持阻塞读写和非阻塞读写
边缘触发Edge Triggered工作模式:
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式.
当epoll检测到socket上事件就绪时, 必须立刻处理.
如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,
epoll_wait 不会再返回了.
也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会.
ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). Nginx默认采用ET模式使用epoll.
只支持非阻塞的读写
select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET.
4.5LT模式重要代码
tcp_epoll_server.hpp
#pragma once
#include <vector>
#include <functional>
#include <sys/epoll.h>
#include "tcp_socket.hpp"
typedef std::function<void(const std::string&, std::string* resp)> Handler;
class Epoll {
public:
Epoll() {
epoll_fd_ = epoll_create(10);
}
~Epoll() {
close(epoll_fd_);
}
bool Add(const TcpSocket& sock) const {
int fd = sock.GetFd();
printf("[Epoll Add] fd = %d\n", fd);
epoll_event ev;
ev.data.fd = fd;
ev.events = EPOLLIN;
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
if (ret < 0) {
perror("epoll_ctl ADD");
return false;
}
return true;
}
bool Del(const TcpSocket& sock) const {
int fd = sock.GetFd();
printf("[Epoll Del] fd = %d\n", fd);
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
if (ret < 0) {
perror("epoll_ctl DEL");
return false;
}
return true;
}
bool Wait(std::vector<TcpSocket>* output) const {
output->clear();
epoll_event events[1000];
int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
if (nfds < 0) {
perror("epoll_wait");
return false;
}
// [注意!] 此处必须是循环到 nfds, 不能多循环
for (int i = 0; i < nfds; ++i) {
TcpSocket sock(events[i].data.fd);
output->push_back(sock);
}
return true;
}
private:
int epoll_fd_;
};
class TcpEpollServer {
public:
TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) {
}
bool Start(Handler handler) {
// 1. 创建 socket
TcpSocket listen_sock;
CHECK_RET(listen_sock.Socket());
// 2. 绑定
CHECK_RET(listen_sock.Bind(ip_, port_));
// 3. 监听
CHECK_RET(listen_sock.Listen(5));
// 4. 创建 Epoll 对象, 并将 listen_sock 加入进去
Epoll epoll;
epoll.Add(listen_sock);
// 5. 进入事件循环
for (;;) {
// 6. 进行 epoll_wait
std::vector<TcpSocket> output;
if (!epoll.Wait(&output)) {
continue;
}
// 7. 根据就绪的文件描述符的种类决定如何处理
for (size_t i = 0; i < output.size(); ++i) {
if (output[i].GetFd() == listen_sock.GetFd()) {
// 如果是 listen_sock, 就调用 accept
TcpSocket new_sock;
listen_sock.Accept(&new_sock);
epoll.Add(new_sock);
}
else {
// 如果是 new_sock, 就进行一次读写
std::string req, resp;
bool ret = output[i].Recv(&req);
if (!ret) {
// [注意!!] 需要把不用的 socket 关闭
// 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了
epoll.Del(output[i]);
output[i].Close();
continue;
}
handler(req, &resp);
output[i].Send(resp);
} // end for
} // end for (;;)
}
return true;
}
private:
std::string ip_;
uint16_t port_;
};
4.6ET模式重要代码
基于 LT 版本稍加修改即可
- 修改 tcp_socket.hpp, 新增非阻塞读和非阻塞写接口
- 对于 accept 返回的 new_sock 加上 EPOLLET 这样的选项
注意: 此代码暂时未考虑 listen_sock ET 的情况. 如果将 listen_sock 设为 ET, 则需要非阻塞轮询的方式 accept. 否则会导致同一时刻大量的客户端同时连接的时候, 只能 accept 一次的问题.
tcp_socket.hpp
// 以下代码添加在 TcpSocket 类中
// 非阻塞 IO 接口
bool SetNoBlock() {
int fl = fcntl(fd_, F_GETFL);
if (fl < 0) {
perror("fcntl F_GETFL");
return false;
}
int ret = fcntl(fd_, F_SETFL, fl | O_NONBLOCK);
if (ret < 0) {
perror("fcntl F_SETFL");
return false;
}
return true;
}
bool RecvNoBlock(std::string* buf) const {
// 对于非阻塞 IO 读数据, 如果 TCP 接受缓冲区为空, 就会返回错误
// 错误码为 EAGAIN 或者 EWOULDBLOCK, 这种情况也是意料之中, 需要重试
// 如果当前读到的数据长度小于尝试读的缓冲区的长度, 就退出循环
// 这种写法其实不算特别严谨(没有考虑粘包问题)
buf->clear();
char tmp[1024 * 10] = { 0 };
for (;;) {
ssize_t read_size = recv(fd_, tmp, sizeof(tmp) - 1, 0);
if (read_size < 0) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
continue;
}
perror("recv");
比特就业课
tcp_epoll_server.hpp
return false;
}
if (read_size == 0) {
// 对端关闭, 返回 false
return false;
}
tmp[read_size] = '\0';
*buf += tmp;
if (read_size < (ssize_t)sizeof(tmp) - 1) {
break;
}
}
return true;
}
bool SendNoBlock(const std::string& buf) const {
// 对于非阻塞 IO 的写入, 如果 TCP 的发送缓冲区已经满了, 就会出现出错的情况
// 此时的错误号是 EAGAIN 或者 EWOULDBLOCK. 这种情况下不应放弃治疗
// 而要进行重试
ssize_t cur_pos = 0; // 记录当前写到的位置
ssize_t left_size = buf.size();
for (;;) {
ssize_t write_size = send(fd_, buf.data() + cur_pos, left_size, 0);
if (write_size < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 重试写入
continue;
}
return false;
}
cur_pos += write_size;
left_size -= write_size;
// 这个条件说明写完需要的数据了
if (left_size <= 0) {
break;
}
}
return true;
}
tcp_epoll_server.hpp
#pragma once
#include <vector>
#include <functional>
#include <sys/epoll.h>
#include "tcp_socket.hpp"
typedef std::function<void(const std::string&, std::string* resp)> Handler;
class Epoll {
public:
Epoll() {
epoll_fd_ = epoll_create(10);
}
~Epoll() {
close(epoll_fd_);
}
bool Add(const TcpSocket& sock, bool epoll_et = false) const {
int fd = sock.GetFd();
printf("[Epoll Add] fd = %d\n", fd);
epoll_event ev;
ev.data.fd = fd;
if (epoll_et) {
ev.events = EPOLLIN | EPOLLET;
}
else {
ev.events = EPOLLIN;
}
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
if (ret < 0) {
perror("epoll_ctl ADD");
return false;
}
return true;
}
bool Del(const TcpSocket& sock) const {
int fd = sock.GetFd();
printf("[Epoll Del] fd = %d\n", fd);
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
if (ret < 0) {
perror("epoll_ctl DEL");
return false;
}
return true;
}
bool Wait(std::vector<TcpSocket>* output) const {
output->clear();
epoll_event events[1000];
int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
if (nfds < 0) {
perror("epoll_wait");
return false;
}
// [注意!] 此处必须是循环到 nfds, 不能多循环
for (int i = 0; i < nfds; ++i) {
TcpSocket sock(events[i].data.fd);
output->push_back(sock);
}
return true;
}
private:
int epoll_fd_;
};
class TcpEpollServer {
public:
TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip), port_(port) {
}
bool Start(Handler handler) {
// 1. 创建 socket
TcpSocket listen_sock;
CHECK_RET(listen_sock.Socket());
// 2. 绑定
CHECK_RET(listen_sock.Bind(ip_, port_));
// 3. 监听
CHECK_RET(listen_sock.Listen(5));
// 4. 创建 Epoll 对象, 并将 listen_sock 加入进去
Epoll epoll;
epoll.Add(listen_sock);
// 5. 进入事件循环
for (;;) {
// 6. 进行 epoll_wait
std::vector<TcpSocket> output;
if (!epoll.Wait(&output)) {
continue;
}
// 7. 根据就绪的文件描述符的种类决定如何处理
for (size_t i = 0; i < output.size(); ++i) {
if (output[i].GetFd() == listen_sock.GetFd()) {
// 如果是 listen_sock, 就调用 accept
TcpSocket new_sock;
listen_sock.Accept(&new_sock);
epoll.Add(new_sock, true);
}
else {
// 如果是 new_sock, 就进行一次读写
std::string req, resp;
bool ret = output[i].RecvNoBlock(&req);
if (!ret) {
// [注意!!] 需要把不用的 socket 关闭
// 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了
epoll.Del(output[i]);
output[i].Close();
continue;
}
handler(req, &resp);
output[i].SendNoBlock(resp);
printf("[client %d] req: %s, resp: %s\n", output[i].GetFd(),
req.c_str(), resp.c_str());
} // end for
} // end for (;;)
}
return true;
}
private:
std::string ip_;
uint16_t port_;
};
onst std::string& ip, uint16_t port) : ip_(ip), port_(port) {
}
bool Start(Handler handler) {
// 1. 创建 socket
TcpSocket listen_sock;
CHECK_RET(listen_sock.Socket());
// 2. 绑定
CHECK_RET(listen_sock.Bind(ip_, port_));
// 3. 监听
CHECK_RET(listen_sock.Listen(5));
// 4. 创建 Epoll 对象, 并将 listen_sock 加入进去
Epoll epoll;
epoll.Add(listen_sock);
// 5. 进入事件循环
for (;😉 {
// 6. 进行 epoll_wait
std::vector output;
if (!epoll.Wait(&output)) {
continue;
}
// 7. 根据就绪的文件描述符的种类决定如何处理
for (size_t i = 0; i < output.size(); ++i) {
if (output[i].GetFd() == listen_sock.GetFd()) {
// 如果是 listen_sock, 就调用 accept
TcpSocket new_sock;
listen_sock.Accept(&new_sock);
epoll.Add(new_sock, true);
}
else {
// 如果是 new_sock, 就进行一次读写
std::string req, resp;
bool ret = output[i].RecvNoBlock(&req);
if (!ret) {
// [注意!!] 需要把不用的 socket 关闭
// 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了
epoll.Del(output[i]);
output[i].Close();
continue;
}
handler(req, &resp);
output[i].SendNoBlock(resp);
printf(“[client %d] req: %s, resp: %s\n”, output[i].GetFd(),
req.c_str(), resp.c_str());
} // end for
} // end for (;😉
}
return true;
}
private:
std::string ip_;
uint16_t port_;
};