一、epoll模型介绍
epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,用于监视一个或多个文件描述符,以查看它们是否可以进行读取、写入或异常处理。它能够显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。
二、epoll函数参数介绍
epoll有如下函数:
- epoll_create函数:用于生成一个epoll专用的文件描述符,其中的参数是指定生成描述符的最大范围。在某些内核版本中,该参数用于初始化哈希表的大小。
- epoll_ctl函数:用于控制某个文件描述符上的事件,可以注册事件、修改事件、删除事件。
- 参数:epfd(由epoll_create生成的epoll专用的文件描述符);op(要进行的操作,例如注册事件,可能的取值有EPOLL_CTL_ADD(注册)、EPOLL_CTL_MOD(修改)、EPOLL_CTL_DEL(删除));fd(关联的文件描述符);event(指向epoll_event的指针)。
- epoll_wait函数:用于轮询I/O事件的发生。
- 参数:epfd(由epoll_create生成的epoll专用的文件描述符);events(指向epoll_event的指针);maxevents(最多可以返回的事件数量);timeout(等待的时间,如果设置为-1则无限等待)。
另外,epoll除了提供select/poll那种IO事件的水平触发外,还提供了边缘触发,这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
三、epoll模型底层和运行过程
epoll模型在内核中的运行主要涉及以下步骤:
- 创建epoll模型:通过调用epoll_create函数创建一个文件描述符,这个文件描述符对应一个红黑树和一个就绪队列。红黑树用于存放需要关注的文件描述符及其对应的事件,而就绪队列则用于存放已经就绪的文件描述符及其事件。
- 添加文件描述符及监听事件:通过epoll_ctl函数向红黑树中添加文件描述符和监听事件。这些信息包括文件描述符、需要监听的事件以及二叉树的其他必要内容。此外,还需要建立回调策略,以便在有数据到达时通知内核处理。
- 查找并传递就绪的文件描述符:当某个文件描述符的事件就绪时,内核会将其加入到就绪队列中。用户态的程序可以通过调用epoll_wait函数来等待这些就绪的文件描述符。epoll_wait函数会观察就绪链表,并将就绪的文件描述符返回给用户态程序。这些文件描述符是通过内存映射的方式传递的,减少了不必要的拷贝操作。
- 重复监听的处理:当需要重复监听某个文件描述符时,只需再次调用epoll_ctl函数添加该文件描述符和监听事件。由于红黑树和就绪队列已经存在,所以无需重新构建数据结构,直接沿用即可。
总之,epoll模型在内核中的运行涉及到创建模型、添加文件描述符及监听事件、查找并传递就绪的文件描述符以及重复监听的处理等步骤。通过这些步骤,epoll模型能够高效地处理大量的文件描述符和事件,提供更好的系统性能和响应能力。
四、两种通信机制
ET和LT是两种不同的通信机制,它们在通信过程中有不同的特点和行为。
ET(Event Trigger)是一种事件触发的通信机制。在这种机制中,发送方在发送数据时不需要等待接收方的回应,一旦发送完成,发送方就会继续执行后续的操作。当接收方收到数据后,会触发相应的事件进行处理。这种机制的特点是发送方不会被阻塞,可以同时处理多个数据发送和执行其他任务。因此,ET通信机制具有高效性和并发性。这种机制要求程序员尽快将数据取走!
LT(Level Trigger)是一种电平触发的通信机制。在这种机制中,发送方会持续发送数据,直到接收方处理完之前的数据或者发送方主动停止发送。在这个过程中,发送方会被阻塞,直到接收方处理完之前的数据。LT通信机制的特点是简单易用,但是在处理大量数据时可能会导致发送方被长时间阻塞,影响程序的效率和响应能力。
在ET模式下,我如何直到我读的底层数据读完了?
答:循环读取,直到底层拒绝了我的读取请求!也就是没有数据了,这样的话就要求我们必须使用非阻塞fd来进行IO,这样在我们最后一次读不到数据的时候就可以不阻塞等待!
五、代码实现epoll服务端模式
#pragma once
#include <iostream>
#include "log.hpp"
#include "sock.hpp"
#include <functional>
#include <sys/epoll.h>
#define NEW_NUM 1024
static const uint16_t defaultport = 8080;
static const int size = 10;
static const int defaultvalue = 666;
namespace epoll_ns
{
using func_t = std::function<std::string(const std::string &)>;
class epollServer
{
public:
epollServer(func_t cb, const uint16_t port = defaultport) : _port(port), _listensock(defaultvalue), _revs(nullptr), _f(cb)
{
}
void Accepter()
{
logMessage(DEBUG, "Accepter in");
std::string clientip;
uint16_t clientport;
int fd = sock::Accept(_listensock, &clientip, &clientport);
if (fd < 0)
{
logMessage(WARNING, "accept error");
return;
}
struct epoll_event ev;
ev.events = EPOLLIN ;
ev.data.fd = fd;
epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
logMessage(DEBUG, "Accepter out");
}
void Recver(int fd)
{
logMessage(DEBUG, "Recver in");
char buffer[NEW_NUM];
while (true)
{
//循环读取这样保证数据全部读取走
int n = recv(fd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n-1] = 0;
logMessage(DEBUG, "client# %s", buffer);
string request = buffer;
string resp = _f(request) + '\n'; //伪处理回调机制
write(fd, resp.c_str(), resp.size());
}
else if (n == 0)
{
// 一定要记住关闭!
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
logMessage(NORMAL, "client quit!");
return;
}
else
{
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
close(fd);
logMessage(WARNING, "recv error!");
return;
}
}
logMessage(DEBUG, "Recver out");
}
void handler(int readyNum)
{
logMessage(DEBUG, "handler events in");
for (int i = 0; i < readyNum; i++)
{
uint32_t events = _revs[i].events;
int sock = _revs[i].data.fd;
if (sock == _listensock && (events & EPOLLIN))
{
// accept就绪!
Accepter();
}
else if (events & EPOLLIN)
{
Recver(sock);
}
}
logMessage(DEBUG, "handler events out");
}
void init()
{
// 1.创建->绑定->监听 套接字
_listensock = sock::GetSocket(_port);
sock::Bind(_listensock, _port);
sock::Listen(_listensock);
// 2.创建epoll模型 通知机制-> 1.LT(level trigglered) 水平触发 一直通知 2.ET(edge trigglered) 边缘触发 通知一次直到有新的数据到来
_epfd = epoll_create(size);
if (_epfd < 0)
{
logMessage(FALTAL, "epoll create error: %s", strerror(errno));
exit(3);
}
// 3.添加listen套接字到epoll中
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = _listensock; // 作用:当事件就绪时,被重新获取时候,我没要知道是哪个fd就绪了!
epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);
// 4.申请就绪空间
_revs = new struct epoll_event[size];
}
void start()
{
while (true)
{
int timeout = 5000;
int n = epoll_wait(_epfd, _revs, size, timeout); //等待五秒就通知一次
switch (n)
{
case 0: //没有任务就绪
logMessage(NORMAL, "time out....");
break;
case -1: //出现异常
logMessage(WARNING, "epoll_wait error code:%d, error string : %s", errno, strerror(errno));
break;
default: //有任务就绪
logMessage(NORMAL, "events get ready!");
handler(n);
break;
}
}
}
~epollServer()
{
if (_listensock != defaultvalue)
close(_listensock);
if (_epfd != defaultvalue)
close(_epfd);
if (_revs)
delete[] _revs;
}
private:
uint16_t _port; //设置的端口号
int _listensock; //监听套接字
int _epfd; //epoll模型文件描述符
struct epoll_event *_revs; //关心的事件
func_t _f; //回调处理函数
};
}