承接上文 I/O多路转接之poll-CSDN博客
简介
epoll的相关系统调用
epoll底层原理
编写epoll的server
重新回归一下epoll原理,LT,ET
epoll改成ET工作模式 -- 初识(有bug)
epoll初识
按照man手册的说法: 是为处理大批量句柄而作了改进的poll.
它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)
它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法
epoll的相关系统调用
epoll 有3个相关的系统调用,这一点和之前的poll是完全不一样的,还有一点是epoll还是一样只负责等待
epoll_create
int epoll_create(int size);
创建一个epoll的句柄
自从linux2.6.8之后, size参数是被忽略的.
用完之后, 必须调用close()关闭
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数
它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型;
第一个参数是epoll_create()的返回值(epoll的句柄);
第二个参数表示动作,用三个宏来表示;
第三个参数是需要监听的fd;
第四个参数是告诉内核需要监听什么事;
第二个参数的取值:
EPOLL_CTL_ADD :注册新的fd到epfd中;
EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
EPOLL_CTL_DEL :从epfd中删除一个fd;
events可以是以下几个宏的集合
EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
EPOLLOUT : 表示对应的文件描述符可以写;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
EPOLLERR : 表示对应的文件描述符发生错误;
EPOLLHUP : 表示对应的文件描述符被挂断;
EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里
epoll_wait
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
收集在epoll监控的事件中已经发送的事件
参数events是分配好的epoll_event结构体数组;
epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存);
maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size;
参数timeout是超时时间 (毫秒, 0会立即返回, -1是永久阻塞);
如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败;
epoll底层原理
当某一进程调用epoll_create方法时, Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
1.每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件;
2.这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度);
3.而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法;
4.这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中;
5.在epoll中,对于每一个事件,都会建立一个epitem结构体;
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可.
如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户. 这个操作的时间复杂度是O(1)
总结一下, epoll的使用过程就是三部曲:
调用epoll_create创建一个epoll句柄;
调用epoll_ctl, 将要监控的文件描述符进行注册;
调用epoll_wait, 等待文件描述符就绪
编写epoll的server
epollServer.cc
#include "epollServer.hpp"
#include <memory>
using namespace std;
using namespace epoll_ns;
static void usage(std::string proc)
{
std::cerr << "Usage:\n\t" << proc << " port"
<< "\n\n";
}
// 一个简单的回调方法
std::string echo(const std::string &message)
{
return "I am epollserver, " + message;
}
// ./select_server port
int main(int argc, char *argv[])
{
if (argc != 2)
{
usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port = atoi(argv[1]);
unique_ptr<EpollServer> epollsvr(new EpollServer(echo, port));
epollsvr->initServer();
epollsvr->start();
return 0;
}
epollServer.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <sys/epoll.h>
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"
namespace epoll_ns
{
static const int defaultport = 8888; // 直接预设一个port
static const int size = 128; // 只要不小于0就行了,这个参数现在没有用
static const int defaultvalue = -1;
static const int defaultnum = 64;
using func_t = std::function<std::string(const std::string &)>; // 创建一个处理函数
class EpollServer
{
public:
EpollServer(func_t f, uint16_t port = defaultport,int num = defaultnum) : func_(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultvalue), _epfd(defaultvalue)
{
}
void initServer()
{
// 1.创建socket
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
// 2.创建epoll模型
_epfd = epoll_create(size); // 这里的参数没有用
if (_epfd < 0)
{
logMessage(FATAL, "epoll create error: %s", strerror(errno));
exit(EPOLL_CREATE_ERR);
}
// 3.添加listensock到_epfd中去
struct epoll_event ev; // 这是一个联合体,目前看成fd使用
ev.events = EPOLLIN; // 读事件
ev.data.fd = _listensock; // 当事件就绪,被重新捞取上来的时候,我们要知道是哪一个fd就绪了!
epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);
// 4.申请就绪事件的空间
_revs = new struct epoll_event[_num];
logMessage(NORMAL, "init server success");
}
// 处理就绪事件
void HandlerEvent(int readyNum)
{
logMessage(DEBUG, "HandlerEvent in");
for (int i = 0; i < readyNum; ++i)
{
uint32_t events = _revs[i].events; // 按照顺序拿取就绪事件,但是我们并不清楚这个事件是谁(分不清是读还是写等等)
// 这时候联合体作用就体现出来了,因为在初始化的时候我们就已经填进去了
int sock = _revs[i].data.fd; // 拿到了对应就绪的描述符
if (sock == _listensock && (events & EPOLLIN))
{
// _listensock读事件就绪, 获取新连接
std::string clientip;
uint16_t clientport;
int fd = Sock::Accept(sock, &clientip, &clientport);
if (fd < 0)
{
logMessage(WARNING, "accept error");
continue;
}
// 获取fd成功,可以直接读取吗?还是不能,因为无法保证是否有数据到来了,所以我们还得继续添加到epoll模型中去
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd;
epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);
}
else if (events & EPOLLIN)
{
// 普通的读事件就绪
// 注意这里的读取是有很大问题的
// 这样直接读取还是有问题,可以保证第一次读取有数据,但是无法保证读取完整
// 比如对方的报文差分多份就不可以读取完整了,为了简洁我们先这样读
// 打循环也无法保证
char buffer[1024];
int n = recv(sock, buffer, sizeof(buffer), 0);
if (n > 0)
{
buffer[n] = 0;
logMessage(DEBUG, "client# %s", buffer);
// TODO -- 这里的发送是有问题的,不能保证发送的条件是就绪的
std::string respose = func_(buffer);
send(sock, respose.c_str(), respose.size(), 0);
}
else if (n == 0)
{
// 这里表示对端把链接关了
// 关于这个套接字的所有都得清理掉
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
close(sock);
logMessage(NORMAL, "client quit");
}
else
{
// 出错误了
epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);
close(sock);
logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno));
}
}
else // 自己可以添加其他的条件,比如写事件之类的
{
}
}
logMessage(DEBUG, "HandlerEvent out");
}
void start()
{
int timeout = -1;
for (;;)
{
int n = epoll_wait(_epfd, _revs, _num, timeout); // 就绪队列中拿取
// n有几个就说明拿到了几个
switch (n)
{
case 0:
logMessage(NORMAL, "timeout...");
break;
case -1:
logMessage(WARNING, "epoll_wait failed, code: %d, errstring: %s", errno, strerror(errno));
break;
default:
logMessage(NORMAL, "have event ready");
HandlerEvent(n); // 需要知道有几个事件就绪了
break;
}
}
}
~EpollServer()
{
if (_listensock != defaultvalue)
close(_listensock);
if (_epfd != defaultvalue)
close(_epfd);
if (_revs)
delete[] _revs;
}
private:
uint16_t _port;
int _listensock;
int _epfd;
struct epoll_event *_revs; // 获取事件
int _num;
func_t func_;
};
}
err.hpp
#pragma once
#include <iostream>
enum
{
USAGE_ERR = 1,
SOCKET_ERR,
BIND_ERR,
LISTEN_ERR,
EPOLL_CREATE_ERR
};
log.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char * to_levelstr(int level)
{
switch(level)
{
case DEBUG : return "DEBUG";
case NORMAL: return "NORMAL";
case WARNING: return "WARNING";
case ERROR: return "ERROR";
case FATAL: return "FATAL";
default : return nullptr;
}
}
void logMessage(int level, const char *format, ...)
{
#define NUM 1024
char logprefix[NUM];
snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",
to_levelstr(level), (long int)time(nullptr), getpid());
char logcontent[NUM];
va_list arg;
va_start(arg, format);
vsnprintf(logcontent, sizeof(logcontent), format, arg);
std::cout << logprefix << logcontent << std::endl;
}
makefile
epoll_server: epollServer.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f epoll_server
sock.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"
class Sock
{
const static int backlog = 32;
public:
static int Socket()
{
// 1. 创建socket文件套接字对象
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
logMessage(FATAL, "create socket error");
exit(SOCKET_ERR);
}
logMessage(NORMAL, "create socket success: %d", sock);
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, &opt, sizeof(opt));
return sock;
}
static void Bind(int sock, int port)
{
// 2. bind绑定自己的网络信息
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
logMessage(FATAL, "bind socket error");
exit(BIND_ERR);
}
logMessage(NORMAL, "bind socket success");
}
static void Listen(int sock)
{
// 3. 设置socket 为监听状态
if (listen(sock, backlog) < 0) // 第二个参数backlog后面在填这个坑
{
logMessage(FATAL, "listen socket error");
exit(LISTEN_ERR);
}
logMessage(NORMAL, "listen socket success");
}
static int Accept(int listensock, std::string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = accept(listensock, (struct sockaddr *)&peer, &len);
if (sock < 0)
logMessage(ERROR, "accept error, next");
else
{
logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?
*clientip = inet_ntoa(peer.sin_addr);
*clientport = ntohs(peer.sin_port);
}
return sock;
}
};
至此一个简单的epoll服务器就完成了,代码中提及了种种问题,我们得到下一文章才能更好的解决了
重新回归一下epoll原理,LT,ET
因为select/poll/epoll的默认通知机制都是LT模式!
epoll改成ET工作模式
预告
如何正确的处理I0+协议定制+业务逻辑=Reactor
基于ET模式下的Reactor,处理所有的IO