高级IO
- 一.各种接口
- 二.工作模式
按照man手册的说法: 是为处理大批量句柄而作了改进的poll.
一.各种接口
快速认识接口:
events可以是以下几个宏的集合:
EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
EPOLLOUT : 表示对应的文件描述符可以写;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
EPOLLERR : 表示对应的文件描述符发生错误;
EPOLLHUP : 表示对应的文件描述符被挂断;
EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里
epoll_ctl:
它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型.
第一个参数是epoll_create()的返回值(epoll的句柄).
第二个参数表示动作,用三个宏来表示.:
第三个参数是需要监听的fd.
第四个参数是告诉内核需要监听什么事
第二个参数取值:
EPOLL_CTL_ADD :注册新的fd到epfd中;
EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
EPOLL_CTL_DEL :从epfd中删除一个fd;
epoll模型:
epoll模型是一个结构体,它被同一管理,然后接入到struct file里,所以epoll的返回值也是一个文件描述符。
创建epoll模型就是创建红黑树,就绪队列和底层回调函数。网卡接收数据,触发硬件中断,操作系统将数据拷贝到网卡驱动里,再经过回调函数(将数据交付给tcp的接收缓冲区)查找红黑树节点是否有关心该数据,如果有,就创建新节点插入到就绪队列里,用户直接就可以从就绪队列里拿数据了。
所以epollctl本质上就是在修改这棵红黑树。epoll_wait就是在关联就绪队列。通过弹出就绪队列的节点就有效避免了select和poll的遍历问题。
epoll优势
- 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
- 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)
- 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中,
- epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响.
- 没有数量限制: 文件描述符数目无上限
二.工作模式
epoll有2种工作方式-水平触发(LT)和边缘触发(ET)
epolI默认模式:LT模式。事件到来,但是上层不处理,高电平,一直有效
ET:数据或者连接,从无到有,从有到多,变化的时候,才会通知我们一次
LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完.因为它只有一次提示。
相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的.另一方面, ET 的代码复杂程度更高了
通知本质上就是添加就绪队列节点。如果是ET模式,就是只添加一次;如果是LT就是每次都添加。可以使用EPOLLET进行设置。
简单使用例子:
1.epoller封装
#pragma once
#include "nocopy.hpp"
#include "Log.hpp"
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>
class Epoller : public nocopy
{
static const int size = 128;
public:
Epoller()
{
_epfd = epoll_create(size);
if (_epfd == -1)
{
lg(Error, "epoll_create error: %s", strerror(errno));
}
else
{
lg(Info, "epoll_create success: %d", _epfd);
}
}
int EpollerWait(struct epoll_event revents[], int num)
{
int n = epoll_wait(_epfd, revents, num, /*_timeout 0*/ -1);
return n;
}
int EpllerUpdate(int oper, int sock, uint32_t event)
{
int n = 0;
if (oper == EPOLL_CTL_DEL)
{
n = epoll_ctl(_epfd, oper, sock, nullptr);
if (n != 0)
{
lg(Error, "epoll_ctl delete error!");
}
}
else
{
// EPOLL_CTL_MOD || EPOLL_CTL_ADD
struct epoll_event ev;
ev.events = event;
ev.data.fd = sock; // 目前,方便我们后期得知,是哪一个fd就绪了!
n = epoll_ctl(_epfd, oper, sock, &ev);
if (n != 0)
{
lg(Error, "epoll_ctl error!");
}
}
return n;
}
~Epoller()
{
if (_epfd >= 0)
close(_epfd);
}
private:
int _epfd;
int _timeout{3000};
};
2.epoller使用
#pragma once
#include <iostream>
#include <memory>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Epoller.hpp"
#include "Log.hpp"
#include "nocopy.hpp"
uint32_t EVENT_IN = (EPOLLIN);
uint32_t EVENT_OUT = (EPOLLOUT);
class EpollServer : public nocopy
{
static const int num = 64;
public:
EpollServer(uint16_t port)
: _port(port),
_listsocket_ptr(new Sock()),
_epoller_ptr(new Epoller())
{
}
void Init()
{
_listsocket_ptr->Socket();
_listsocket_ptr->Bind(_port);
_listsocket_ptr->Listen();
lg(Info, "create listen socket success: %d\n", _listsocket_ptr->Fd());
}
void Accepter()
{
// 获取了一个新连接
std::string clientip;
uint16_t clientport;
int sock = _listsocket_ptr->Accept(&clientip, &clientport);
if (sock > 0)
{
// 我们能直接读取吗?不能
_epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, sock, EVENT_IN);
lg(Info, "get a new link, client info@ %s:%d", clientip.c_str(), clientport);
}
}
// for test
void Recver(int fd)
{
// demo
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // bug?
if (n > 0)
{
buffer[n] = 0;
std::cout << "get a messge: " << buffer << std::endl;
// wrirte
std::string echo_str = "server echo $ ";
echo_str += buffer;
write(fd, echo_str.c_str(), echo_str.size());
}
else if (n == 0)
{
lg(Info, "client quit, me too, close fd is : %d", fd);
//细节3
_epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
else
{
lg(Warning, "recv error: fd is : %d", fd);
_epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
}
void Dispatcher(struct epoll_event revs[], int num)
{
for (int i = 0; i < num; i++)
{
uint32_t events = revs[i].events;
int fd = revs[i].data.fd;
if (events & EVENT_IN)
{
if (fd == _listsocket_ptr->Fd())
{
Accepter();
}
else
{
// 其他fd上面的普通读取事件就绪
Recver(fd);
}
}
else if (events & EVENT_OUT)
{
}
else
{
}
}
}
void Start()
{
// 将listensock添加到epoll中 -> listensock和他关心的事件,添加到内核epoll模型中rb_tree.
_epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, _listsocket_ptr->Fd(), EVENT_IN);
struct epoll_event revs[num];
for (;;)
{
int n = _epoller_ptr->EpollerWait(revs, num);
if (n > 0)
{
// 有事件就绪
lg(Debug, "event happened, fd is : %d", revs[0].data.fd);
Dispatcher(revs, n);
}
else if (n == 0)
{
lg(Info, "time out ...");
}
else
{
lg(Error, "epll wait error");
}
}
}
~EpollServer()
{
_listsocket_ptr->Close();
}
private:
std::shared_ptr<Sock> _listsocket_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
uint16_t _port;
};