Table of Contents
- Preface
- 1. Usage examples
- A simple server
- Configuration options
- 2. Basic data structures
- 3. Source code analysis
- event_base_new
- event_new
- event_add
- event_base_dispatch
- 4. Event flags in libevent and epoll
- Event flags in epoll
- Event flags in libevent
- Mapping between libevent and epoll event flags
- Summary
Preface
libevent wraps three kinds of events: io events, signal events, and timer events. This libevent source-analysis series covers each of the three in turn; this article analyzes io events.
The article first shows a simple example of using io events in libevent, and then walks through the source code to explain how io events are implemented, focusing on how libevent wraps epoll.
1. Usage examples
A simple server
#include <iostream>
#include <event.h>
#include <thread>
#include <errno.h>
#include <string.h>
#include <unistd.h>      // sleep()
#include <sys/socket.h>  // socket(), bind(), listen(), accept()
#include <netinet/in.h>  // sockaddr_in
using namespace std;
#define SPORT 8000
void read_cb(evutil_socket_t s,short w, void *arg)
{
printf("====read_cb======\n");
}
void write_cb(evutil_socket_t s,short w, void *arg)
{
sleep(5);
printf("====write_cb======\n");
}
void ev_cb(evutil_socket_t s,short w, void *arg)
{
if(w & EV_READ)
{
printf("====read======\n");
}
else if(w & EV_WRITE)
{
printf("====write======\n");
}
else if(w & EV_TIMEOUT)
{
printf("====timeout======\n");
}
}
void listen_cb(evutil_socket_t s,short w, void *arg) //called when a new connection arrives on the listening fd
{
cout<<"listen_cb"<<endl;
sockaddr_in sin;
socklen_t size = sizeof(sin);
evutil_socket_t cfd = accept(s,(sockaddr*)&sin,&size);
evutil_make_socket_nonblocking(cfd);
char ip[16] = {0};
evutil_inet_ntop(AF_INET,&sin.sin_addr,ip,sizeof(ip)-1);
event_base *base = (event_base *)arg;
event *rev = event_new(base,cfd,EV_READ|EV_PERSIST,read_cb,NULL);//read event on the connection fd, level-triggered by default; level vs. edge triggering only matters together with EV_PERSIST
event *wev = event_new(base,cfd,EV_WRITE,write_cb,NULL);
event *ev = event_new(base,cfd,EV_WRITE|EV_READ|EV_TIMEOUT|EV_PERSIST,ev_cb,NULL);
event_add(rev,NULL);
event_add(wev,NULL);
event_add(ev,NULL);
}
int main(int argc,char *argv[])
{
event_base *base = event_base_new();
//create the listening socket
evutil_socket_t sock = socket(AF_INET,SOCK_STREAM,0);
if(sock<=0)
{
cerr<<"socket error:"<<strerror(errno)<<endl;
return -1;
}
//set non-blocking mode and enable address reuse
evutil_make_socket_nonblocking(sock);
evutil_make_listen_socket_reuseable(sock);
//bind the address and port
sockaddr_in sin;
memset(&sin,0,sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);
int re = ::bind(sock,(sockaddr*)&sin,sizeof(sin));
if(re != 0)
{
cerr<<"bind error:"<<strerror(errno)<<endl;
return -1;
}
//start listening
listen(sock,10);
//register a persistent read event on the listening fd
event *ev = event_new(base,sock,EV_READ|EV_PERSIST,listen_cb,base);
event_add(ev,0);
//enter the main event loop
event_base_dispatch(base);
evutil_closesocket(sock);
event_base_free(base);
return 0;
}
Configuration options
void listen_cb(evconnlistener *ev, evutil_socket_t s, struct sockaddr * addr, int socklen, void *arg)
{
cout << "lesson_cb" << endl;
}
int main()
{
//create a config context
event_config *conf = event_config_new();
//print the supported backends (select, poll, epoll, ...)
const char **methods = event_get_supported_methods();
cout << "supported_methods:" << endl;
for (int i = 0; methods[i] != NULL; i++) //the array of backend names is NULL-terminated
{
cout << methods[i] << endl;
}
//require backend features; the backend is chosen according to these features
//once EV_FEATURE_FDS is required the other features cannot be satisfied; EV_FEATURE_FDS has no effect on Windows
//event_config_require_features(conf, EV_FEATURE_ET);
event_config_require_features(conf,EV_FEATURE_FDS); //epoll does not support this feature
//exclude a backend explicitly
event_config_avoid_method(conf, "epoll");//rule out epoll
//event_config_avoid_method(conf, "poll");
//disable locking inside libevent; with one base per thread this saves the locking overhead
event_config_set_flag(conf,EVENT_BASE_FLAG_NOLOCK);
//create the libevent context with this configuration
event_base *base = event_base_new_with_config(conf);
event_config_free(conf);//the config can be freed once the base has been created
if (base)
{
//get the backend currently in use
cout << "current method is " << event_base_get_method(base) << endl;
//check which features are actually in effect
int f = event_base_get_features(base);
if (f&EV_FEATURE_ET) //none of these features are supported on Windows; on Linux, arbitrary file descriptors (EV_FEATURE_FDS) are not supported by epoll
//if EV_FEATURE_FDS is required on Linux, event_base_new_with_config fails
cout << "EV_FEATURE_ET events are supported." << endl;
else
cout << "EV_FEATURE_ET events are not supported." << endl;
if (f&EV_FEATURE_O1)
cout << "EV_FEATURE_O1 events are supported." << endl;
else
cout << "EV_FEATURE_O1 events are not supported." << endl;
if (f&EV_FEATURE_FDS)
cout << "EV_FEATURE_FDS events are supported." << endl;
else
cout << "EV_FEATURE_FDS events are not supported." << endl;
cout << "event_base_new_with_config success!" << endl;
sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);
evconnlistener * ev = evconnlistener_new_bind(base, listen_cb, base, 10,
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
(sockaddr*)&sin, sizeof(sin));
event_base_dispatch(base);
evconnlistener_free(ev);
event_base_free(base);
}
return 0;
}
enum event_method_feature {
EV_FEATURE_ET = 0x01, //require edge-triggered notification
EV_FEATURE_O1 = 0x02, //require O(1) event add/delete; epoll satisfies the first two
EV_FEATURE_FDS = 0x04 //require support for arbitrary file descriptors, not just sockets; only select/poll satisfy this
};
enum event_base_config_flag {
EVENT_BASE_FLAG_NOLOCK = 0x01, //no locking; safe with one base per thread
EVENT_BASE_FLAG_IGNORE_ENV = 0x02, //ignore environment variables when detecting usable backends
EVENT_BASE_FLAG_STARTUP_IOCP = 0x04, //used for IOCP on Windows
EVENT_BASE_FLAG_NO_CACHE_TIME = 0x08, //do not cache the time; fetch it with a system call on every timer check, which costs more CPU
EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST = 0x10 //use the changelist with the epoll backend
};
2. Basic data structures
struct event {
TAILQ_ENTRY(event) ev_active_next; //node in the active event queue
TAILQ_ENTRY(event) ev_next; //node in the registered event queue
...
union {
/* used for io events */
struct {
TAILQ_ENTRY(event) ev_io_next; //node in the per-fd io event list
struct timeval ev_timeout;
} ev_io;
...
} _ev;
...
void (*ev_callback)(evutil_socket_t, short, void *arg); //the registered callback
void *ev_arg;
};
struct event_base {
const struct eventop *evsel; //the multiplexing backend (wrapped io methods)
/** Number of total events added to this event_base */
int event_count; //total number of events
/** Number of total events active in this event_base */
int event_count_active; //number of active events
...
struct event_list *activequeues; //active event queues
int nactivequeues; //number of active event queues (priority levels)
...
/** Mapping from file descriptors to enabled (added) events */
struct event_io_map io; //hash table / array holding the io events
/** All events that have been enabled (added) in this event_base */
struct event_list eventqueue; //linked list holding all registered events
...
};
The io event map
The io event map: its storage is sized by the largest fd, and the index is the fd value itself. Events registered on the same fd are chained together in a linked list. When activity is reported on an fd, libevent walks that fd's list and moves the matching events into the active event queues.
struct evmap_io {
struct event_list events; //the pointers in the map point to evmap_io structures; this list chains the events registered on one fd
ev_uint16_t nread; //number of read events in the list; if read and write interests are both already registered, adding another io event needs no epoll_ctl call
ev_uint16_t nwrite; //number of write events in the list
};
The active event queues
The active event queues: the array is sized by the highest priority assigned to any event, and the index is the priority; the smaller the index, the earlier that queue is processed (the internal event that monitors the signal fd is given priority 0). Whenever an event becomes active it is appended to the queue for its priority; when callbacks are run, libevent walks the indices from small to large and processes each queue's list in turn.
struct event_list *activequeues;
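To show how the priority index is used in practice, here is a minimal sketch built on the public libevent API (the callbacks and the use of fd 0 are illustrative): two priority levels are created, and events in queue 0 are always dispatched before events in queue 1.
#include <event.h>
void hi_cb(evutil_socket_t fd, short w, void *arg) { /* runs first when both events are active */ }
void lo_cb(evutil_socket_t fd, short w, void *arg) { /* runs only after all priority-0 events */ }
int main()
{
    event_base *base = event_base_new();
    event_base_priority_init(base, 2);            // nactivequeues = 2 (priorities 0 and 1)
    event *hi = event_new(base, 0, EV_READ | EV_PERSIST, hi_cb, NULL);
    event *lo = event_new(base, 0, EV_READ | EV_PERSIST, lo_cb, NULL);
    event_priority_set(hi, 0);                    // index 0: processed first
    event_priority_set(lo, 1);                    // index 1: processed after queue 0 is empty
    event_add(hi, NULL);
    event_add(lo, NULL);
    event_base_dispatch(base);
    return 0;
}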
The registered event queue
The registered event queue: a doubly linked list that chains all registered events together. It is used to tell whether any events are still pending; once this queue is empty, the event loop can exit.
struct event_list eventqueue;
The doubly linked list structure used by libevent (TAILQ) is described in detail in a separate article:
TAILQ linked-list queues explained
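For a quick refresher before the source walkthrough, the toy program below (not libevent source) uses the same TAILQ macros from <sys/queue.h>: each element embeds a TAILQ_ENTRY node, and elements belonging to the same fd are chained on one list, which mirrors how events on one fd are chained in the io event map.
#include <sys/queue.h>
#include <stdio.h>
struct myevent {
    int fd;
    TAILQ_ENTRY(myevent) next;       /* node embedded in the element, like ev_io_next */
};
TAILQ_HEAD(evlist, myevent);         /* list head type, like struct event_list */
int main()
{
    struct evlist head;
    TAILQ_INIT(&head);
    struct myevent a, b;
    a.fd = b.fd = 7;                 /* two "events" registered on the same fd */
    TAILQ_INSERT_TAIL(&head, &a, next);
    TAILQ_INSERT_TAIL(&head, &b, next);
    struct myevent *ev;
    TAILQ_FOREACH(ev, &head, next)   /* walk the list, like activating the events on an fd */
        printf("event on fd %d\n", ev->fd);
    TAILQ_REMOVE(&head, &a, next);   /* remove one event, like evmap_io_del */
    return 0;
}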
3. Source code analysis
event_base_new
event_base_new is a thin wrapper around event_base_new_with_config, which chooses the multiplexing backend and sets up epoll.
struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{
int i;
struct event_base *base;
int should_check_environment;
if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
event_warn("%s: calloc", __func__);
return NULL;
}
detect_monotonic();
gettime(base, &base->event_tv);
min_heap_ctor(&base->timeheap);
TAILQ_INIT(&base->eventqueue);
...
evmap_io_initmap(&base->io);
evmap_signal_initmap(&base->sigmap);
event_changelist_init(&base->changelist);
base->evbase = NULL;
should_check_environment =
!(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));
for (i = 0; eventops[i] && !base->evbase; i++) {
if (cfg != NULL) {
/* determine if this backend should be avoided */
if (event_config_is_avoided_method(cfg,
eventops[i]->name)) //backends named via event_config_avoid_method are skipped (epoll/poll/iocp, epoll preferred by default)
continue;
if ((eventops[i]->features & cfg->require_features)
!= cfg->require_features) //the required features also restrict which multiplexing backend can be chosen
continue;
}
/* also obey the environment variables */
if (should_check_environment &&
event_is_method_disabled(eventops[i]->name)) //environment variables can also disable a backend
continue;
base->evsel = eventops[i]; //the chosen multiplexing backend
base->evbase = base->evsel->init(base); //call the backend's init function
}
...
if (event_base_priority_init(base, 1) < 0) {
event_base_free(base);
return NULL;
}
...
return (base);
}
base->evsel->init → epoll_init
static void *
epoll_init(struct event_base *base)
{
int epfd;
struct epollop *epollop;
/* Initialize the kernel queue. (The size field is ignored since
* 2.6.8.) */
if ((epfd = epoll_create(32000)) == -1) {
if (errno != ENOSYS)
event_warn("epoll_create");
return (NULL);
}
evutil_make_socket_closeonexec(epfd);
if (!(epollop = mm_calloc(1, sizeof(struct epollop)))) {
close(epfd);
return (NULL);
}
epollop->epfd = epfd;
/* Initialize fields */
epollop->events = mm_calloc(INITIAL_NEVENT, sizeof(struct epoll_event));
if (epollop->events == NULL) {
mm_free(epollop);
close(epfd);
return (NULL);
}
epollop->nevents = INITIAL_NEVENT;
if ((base->flags & EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST) != 0 ||
((base->flags & EVENT_BASE_FLAG_IGNORE_ENV) == 0 &&
evutil_getenv("EVENT_EPOLL_USE_CHANGELIST") != NULL))
base->evsel = &epollops_changelist;
evsig_init(base);
return (epollop);
}
event_new
Initializes an event for a given fd; event_new allocates the event and calls event_assign internally.
int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
if (!base)
base = current_base;
_event_debug_assert_not_added(ev);
ev->ev_base = base;
ev->ev_callback = callback;
ev->ev_arg = arg;
ev->ev_fd = fd;
ev->ev_events = events;
ev->ev_res = 0;
ev->ev_flags = EVLIST_INIT;
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;
if (events & EV_SIGNAL) {
...
} else {
if (events & EV_PERSIST) {
evutil_timerclear(&ev->ev_io_timeout);
ev->ev_closure = EV_CLOSURE_PERSIST;
} else {
ev->ev_closure = EV_CLOSURE_NONE;
}
}
min_heap_elem_init(ev);
if (base != NULL) {
/* by default, we put new events into the middle priority */
ev->ev_pri = base->nactivequeues / 2;
}
return 0;
}
event_add
event_add (via event_add_internal) inserts an io event into the io event map and the registered event queue.
static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;
...
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
if (ev->ev_events & (EV_READ|EV_WRITE))
res = evmap_io_add(base, ev->ev_fd, ev); //add the io event to the io event map
else if (ev->ev_events & EV_SIGNAL)
res = evmap_signal_add(base, (int)ev->ev_fd, ev);
if (res != -1)
event_queue_insert(base, ev, EVLIST_INSERTED); //insert into the registered event queue
if (res == 1) {
/* evmap says we need to notify the main thread. */
notify = 1;
res = 0;
}
}
...
return (res);
}
int
evmap_io_add(struct event_base *base, evutil_socket_t fd, struct event *ev)
{
const struct eventop *evsel = base->evsel;
struct event_io_map *io = &base->io;
struct evmap_io *ctx = NULL;
int nread, nwrite, retval = 0;
short res = 0, old = 0;
struct event *old_ev;
EVUTIL_ASSERT(fd == ev->ev_fd);
if (fd < 0)
return 0;
#ifndef EVMAP_USE_HT
if (fd >= io->nentries) {
if (evmap_make_space(io, fd, sizeof(struct evmap_io *)) == -1)
return (-1);
}
#endif//look up the list for this fd in the io event map; create an empty one if it does not exist yet
GET_IO_SLOT_AND_CTOR(ctx, io, fd, evmap_io, evmap_io_init,
evsel->fdinfo_len);
nread = ctx->nread;
nwrite = ctx->nwrite;//current number of read and write events in the list for this fd
if (nread)
old |= EV_READ;//the fd's list already has a read event
if (nwrite)
old |= EV_WRITE; //the fd's list already has a write event
if (ev->ev_events & EV_READ) {
if (++nread == 1) //one more read event
res |= EV_READ; //first read event added on this fd, so a read interest must be registered with the backend
}
if (ev->ev_events & EV_WRITE) {
if (++nwrite == 1)
res |= EV_WRITE;
}
if (EVUTIL_UNLIKELY(nread > 0xffff || nwrite > 0xffff)) { //too many read or write events on this fd
event_warnx("Too many events reading or writing on fd %d",
(int)fd);
return -1;
}
...
if (res) { //this fd gains a read or write interest it did not have before
void *extra = ((char*)ctx) + sizeof(struct evmap_io);
/* XXX(niels): we cannot mix edge-triggered and
* level-triggered, we should probably assert on
* this. */
if (evsel->add(base, ev->ev_fd, //register the interest with epoll; level-triggered by default
old, (ev->ev_events & EV_ET) | res, extra) == -1)
return (-1);
retval = 1;
}
ctx->nread = (ev_uint16_t) nread;
ctx->nwrite = (ev_uint16_t) nwrite;
TAILQ_INSERT_TAIL(&ctx->events, ev, ev_io_next);//append the event to the fd's io event list
return (retval);
}
evsel->add → epoll_nochangelist_add
static int
epoll_nochangelist_add(struct event_base *base, evutil_socket_t fd,
short old, short events, void *p)
{
struct event_change ch;
ch.fd = fd;
ch.old_events = old; //interests already registered for this fd
ch.read_change = ch.write_change = 0;
if (events & EV_WRITE) //a write interest is being added
ch.write_change = EV_CHANGE_ADD |
(events & EV_ET); //mark the write change as an addition
if (events & EV_READ) //a read interest is being added
ch.read_change = EV_CHANGE_ADD |
(events & EV_ET); //mark the read change as an addition
return epoll_apply_one_change(base, base->evbase, &ch);
}
static int
epoll_apply_one_change(struct event_base *base,
struct epollop *epollop,
const struct event_change *ch)
{
struct epoll_event epev;
int op, events = 0;
if (1) {
if ((ch->read_change & EV_CHANGE_ADD) ||
(ch->write_change & EV_CHANGE_ADD)) { //a read or write interest is being added
/* If we are adding anything at all, we'll want to do
* either an ADD or a MOD. */
events = 0;
op = EPOLL_CTL_ADD; //default operation: epoll_ctl ADD
if (ch->read_change & EV_CHANGE_ADD) {
events |= EPOLLIN; //add the read interest
} else if (ch->read_change & EV_CHANGE_DEL) {
;
} else if (ch->old_events & EV_READ) {
events |= EPOLLIN;
}
if (ch->write_change & EV_CHANGE_ADD) {
events |= EPOLLOUT; //add the write interest
} else if (ch->write_change & EV_CHANGE_DEL) {
;
} else if (ch->old_events & EV_WRITE) {
events |= EPOLLOUT;
}
if ((ch->read_change|ch->write_change) & EV_ET)
events |= EPOLLET; //add the edge-triggered flag
if (ch->old_events) {
op = EPOLL_CTL_MOD; //the fd already has registered interests, so use epoll_ctl MOD instead
}
} else if ((ch->read_change & EV_CHANGE_DEL) ||
(ch->write_change & EV_CHANGE_DEL)) { //an interest is being removed
/* If we're deleting anything, we'll want to do a MOD
* or a DEL. */
op = EPOLL_CTL_DEL;
if (ch->read_change & EV_CHANGE_DEL) {
if (ch->write_change & EV_CHANGE_DEL) {
events = EPOLLIN|EPOLLOUT; //remove both read and write interests: epoll_ctl DEL
} else if (ch->old_events & EV_WRITE) { //the fd still has a write interest
events = EPOLLOUT; //drop the read interest by re-registering only EPOLLOUT: epoll_ctl MOD
op = EPOLL_CTL_MOD;
} else {
events = EPOLLIN; //no other interest on this fd, remove the read interest: epoll_ctl DEL EPOLLIN
}
} else if (ch->write_change & EV_CHANGE_DEL) {
if (ch->old_events & EV_READ) {
events = EPOLLIN; //the fd still has a read interest, drop the write interest: epoll_ctl MOD EPOLLIN
op = EPOLL_CTL_MOD;
} else {
events = EPOLLOUT;//no read interest on this fd, remove the write interest: epoll_ctl DEL
}
}
}
if (!events)
return 0;
memset(&epev, 0, sizeof(epev));
epev.data.fd = ch->fd;
epev.events = events; //apply the computed interest set to epoll
if (epoll_ctl(epollop->epfd, op, ch->fd, &epev) == -1) {
if (op == EPOLL_CTL_MOD && errno == ENOENT) {
/* If a MOD operation fails with ENOENT, the
* fd was probably closed and re-opened. We
* should retry the operation as an ADD.
*/ //if EPOLL_CTL_MOD failed, the fd may have been closed and reopened; retry once with EPOLL_CTL_ADD
if (epoll_ctl(epollop->epfd, EPOLL_CTL_ADD, ch->fd, &epev) == -1) {
event_warn("Epoll MOD(%d) on %d retried as ADD; that failed too",
(int)epev.events, ch->fd);
return -1;
} else {
}
} else if (op == EPOLL_CTL_ADD && errno == EEXIST) {
/* If an ADD operation fails with EEXIST,
* either the operation was redundant (as with a
* precautionary add), or we ran into a fun
* kernel bug where using dup*() to duplicate the
* same file into the same fd gives you the same epitem
* rather than a fresh one. For the second case,
* we must retry with MOD. */
//the ADD failed; retry once with EPOLL_CTL_MOD
if (epoll_ctl(epollop->epfd, EPOLL_CTL_MOD, ch->fd, &epev) == -1) {
event_warn("Epoll ADD(%d) on %d retried as MOD; that failed too",
(int)epev.events, ch->fd);
return -1;
} else {
event_debug(("Epoll ADD(%d) on %d retried as MOD; succeeded.",
(int)epev.events,
ch->fd));
}
} else if (op == EPOLL_CTL_DEL &&
(errno == ENOENT || errno == EBADF ||
errno == EPERM)) {
} else {
}
} else {
}
return 0;
}
event_base_dispatch
event_base_dispatch(base) is equivalent to event_base_loop(base, 0), so the loop below is where dispatching actually happens.
int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;
...
while (!done) {
base->event_continue = 0;
...
timeout_correct(base, &tv);
tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p); //with only io events tv_p stays NULL and epoll_wait blocks
} else {
evutil_timerclear(&tv); //active events are still pending, so epoll_wait returns immediately
}
/* If we have no events, we just exit */
if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) { //nothing left in the registered event queue, so exit the loop
event_debug(("%s: no events registered.", __func__));
retval = 1;
goto done;
}
...
res = evsel->dispatch(base, tv_p); //without timer events this blocks until io activity occurs; triggered io events are moved into the active event queues
if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}
...
if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base); //process the io events in the active event queues
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
done:
...
return (retval);
}
evsel->dispatch → epoll_dispatch
static int
epoll_dispatch(struct event_base *base, struct timeval *tv)
{
struct epollop *epollop = base->evbase;
struct epoll_event *events = epollop->events;
int i, res;
long timeout = -1;
...
epoll_apply_changes(base);
event_changelist_remove_all(&base->changelist, base);
EVBASE_RELEASE_LOCK(base, th_base_lock);
res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); //with no timer events timeout == -1 and epoll_wait blocks indefinitely
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
if (res == -1) {
if (errno != EINTR) {
event_warn("epoll_wait");
return (-1);
}
return (0);
}
for (i = 0; i < res; i++) {
int what = events[i].events;
short ev = 0;
//translate the low-level epoll event flags into libevent event flags
if (what & (EPOLLHUP|EPOLLERR)) { //the fd was hung up or an error occurred
ev = EV_READ | EV_WRITE; //report it as both readable and writable
} else {
if (what & EPOLLIN)
ev |= EV_READ;
if (what & EPOLLOUT)
ev |= EV_WRITE;
}
if (!ev)
continue;
evmap_io_active(base, events[i].data.fd, ev | EV_ET); //move the triggered events from the io event map into the active event queues
}
if (res == epollop->nevents && epollop->nevents < MAX_NEVENT) { //the buffer for epoll_wait results was filled, so double its size
/* We used all of the event space this time. We should
be ready for more events next time. */
int new_nevents = epollop->nevents * 2;
struct epoll_event *new_events;
new_events = mm_realloc(epollop->events,
new_nevents * sizeof(struct epoll_event));
if (new_events) {
epollop->events = new_events;
epollop->nevents = new_nevents;
}
}
return (0);
}
static int
event_process_active(struct event_base *base)
{
/* Caller must hold th_base_lock */
struct event_list *activeq = NULL;
int i, c = 0;
for (i = 0; i < base->nactivequeues; ++i) { //walk the active event queues in priority order
if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
base->event_running_priority = i;
activeq = &base->activequeues[i];
c = event_process_active_single_queue(base, activeq); //process the events in this active queue one by one
if (c < 0) {
base->event_running_priority = -1;
return -1;
} else if (c > 0)
break;
}
}
return c;
}
static int
event_process_active_single_queue(struct event_base *base,
struct event_list *activeq)
{
struct event *ev;
int count = 0;
EVUTIL_ASSERT(activeq != NULL);
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
if (ev->ev_events & EV_PERSIST)
event_queue_remove(base, ev, EVLIST_ACTIVE); //persistent event: only remove it from the active event queue
else
event_del_internal(ev); //one-shot event: remove it from the io event map, the active queue and the registered event queue
if (!(ev->ev_flags & EVLIST_INTERNAL))
++count;
...
switch (ev->ev_closure) {
case EV_CLOSURE_SIGNAL:
event_signal_closure(base, ev);
break;
case EV_CLOSURE_PERSIST:
event_persist_closure(base, ev); //events with EV_PERSIST keep firing
break;
default:
case EV_CLOSURE_NONE: //events without EV_PERSIST fire only once
EVBASE_RELEASE_LOCK(base, th_base_lock);
(*ev->ev_callback)(
ev->ev_fd, ev->ev_res, ev->ev_arg);
break;
}
...
return count;
}
//without EV_PERSIST, the event fires only once and is then deleted:
static inline int
event_del_internal(struct event *ev)
{
struct event_base *base;
int res = 0, notify = 0;
base = ev->ev_base;
...
if (ev->ev_flags & EVLIST_ACTIVE)
event_queue_remove(base, ev, EVLIST_ACTIVE); //remove from the active event queue
if (ev->ev_flags & EVLIST_INSERTED) {
event_queue_remove(base, ev, EVLIST_INSERTED); //remove from the registered event queue
if (ev->ev_events & (EV_READ|EV_WRITE))
res = evmap_io_del(base, ev->ev_fd, ev); //remove from the io event map
else
res = evmap_signal_del(base, (int)ev->ev_fd, ev);
if (res == 1) {
/* evmap says we need to notify the main thread. */
notify = 1;
res = 0;
}
}
...
return (res);
}
int
evmap_io_del(struct event_base *base, evutil_socket_t fd, struct event *ev)
{
const struct eventop *evsel = base->evsel;
struct event_io_map *io = &base->io;
struct evmap_io *ctx;
int nread, nwrite, retval = 0;
short res = 0, old = 0;
if (fd < 0)
return 0;
...
GET_IO_SLOT(ctx, io, fd, evmap_io);//look up the fd's event list in the io event map
nread = ctx->nread; //number of read events currently in that list
nwrite = ctx->nwrite;
if (nread)
old |= EV_READ; //a read interest was registered before
if (nwrite)
old |= EV_WRITE; //a write interest was registered before
if (ev->ev_events & EV_READ) { //the event being deleted is a read event
if (--nread == 0) //no read events remain on this fd after removing it
res |= EV_READ; //so the read interest must be removed from epoll
EVUTIL_ASSERT(nread >= 0);
}
if (ev->ev_events & EV_WRITE) {
if (--nwrite == 0)
res |= EV_WRITE;
EVUTIL_ASSERT(nwrite >= 0);
}
if (res) { //some interest has to be removed from epoll
void *extra = ((char*)ctx) + sizeof(struct evmap_io);
if (evsel->del(base, ev->ev_fd, old, res, extra) == -1) //unregister the read or write interest from epoll
return (-1);
retval = 1;
}
ctx->nread = nread;
ctx->nwrite = nwrite;
TAILQ_REMOVE(&ctx->events, ev, ev_io_next); //remove the event from the fd's io event list
return (retval);
}
4. Event flags in libevent and epoll
Event flags in epoll
EPOLLIN: the file descriptor is readable.
EPOLLOUT: the file descriptor is writable.
EPOLLPRI: urgent (out-of-band) data is readable.
EPOLLERR: an error condition occurred on the file descriptor.
EPOLLHUP: the file descriptor was hung up or the connection was broken.
EPOLLET: a mode flag; setting it makes epoll work in edge-triggered (ET) mode instead of the default level-triggered (LT) mode.
EPOLLONESHOT: the event is reported only once.
EPOLLRDHUP: added in newer kernels; it is reported when the peer calls close or shutdown(SHUT_WR). Without this flag the only way to detect a closed peer is a read returning 0, so all pending data must be read until the FIN is reached; with it, the peer's close is reported immediately.
The difference between EPOLLHUP and EPOLLRDHUP:
Typical situations in which EPOLLHUP fires:
1. An RST segment is received from the peer, which triggers EPOLLIN + EPOLLRDHUP + EPOLLHUP + EPOLLERR.
2. A socket on which no event can ever occur is added to the epoll instance.
3. The local side calls shutdown(SHUT_RDWR): this closes both the read and the write side of the connection but does not release the file descriptor or its other resources. The socket is now "deaf and mute" and of no further use, so it counts as "hung up". When the socket is closed with close(), the kernel automatically removes the descriptor from epoll and releases its resources, so after that no events are reported at all.
When EPOLLRDHUP fires:
The peer calls close or shutdown(SHUT_WR), which triggers EPOLLIN + EPOLLRDHUP.
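To make the EPOLLRDHUP behaviour concrete, here is a minimal raw-epoll sketch, independent of libevent; the function names are illustrative, and epfd/cfd are assumed to be an existing epoll instance and a connected non-blocking socket.
#include <sys/epoll.h>
#include <stdio.h>
/* cfd: a connected, non-blocking TCP socket; epfd: an existing epoll instance */
void watch_peer_close(int epfd, int cfd)
{
    struct epoll_event ev = {0};
    ev.events = EPOLLIN | EPOLLRDHUP;      /* readable data OR peer close/shutdown(SHUT_WR) */
    ev.data.fd = cfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
}
/* evs/n: the array filled in and the count returned by epoll_wait */
void handle_events(struct epoll_event *evs, int n)
{
    for (int i = 0; i < n; i++) {
        if (evs[i].events & EPOLLRDHUP)    /* peer closed its write side: we can stop reading early */
            printf("fd %d: peer closed the connection\n", evs[i].data.fd);
        else if (evs[i].events & EPOLLIN)  /* ordinary readable data */
            printf("fd %d: data available\n", evs[i].data.fd);
    }
}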
Event flags in libevent
#define EV_TIMEOUT 0x01
Marks a timer event.
#define evtimer_new(b, cb, arg) event_new((b), -1, 0, (cb), (arg))
Whether an event behaves as a timer is determined by whether a timeout is passed to event_add, not by the EV_TIMEOUT flag.
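A minimal sketch of that point, using the public API (the 2-second interval and the callback are illustrative): the event is created with no flags at all and becomes a timer only because event_add receives a timeout.
#include <event.h>
#include <stdio.h>
void timer_cb(evutil_socket_t fd, short what, void *arg)
{
    /* 'what' contains EV_TIMEOUT here even though we never passed that flag */
    printf("timer fired, what=0x%x\n", (unsigned)what);
}
int main()
{
    event_base *base = event_base_new();
    /* evtimer_new == event_new(base, -1, 0, cb, arg): fd = -1, flags = 0 */
    event *tev = evtimer_new(base, timer_cb, NULL);
    struct timeval tv = {2, 0};        /* the timeout passed to event_add is what makes it a timer */
    event_add(tev, &tv);
    event_base_dispatch(base);
    event_free(tev);
    event_base_free(base);
    return 0;
}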
#define EV_READ 0x02
Read event, equivalent to EPOLLIN in epoll.
#define EV_WRITE 0x04
Write event, equivalent to EPOLLOUT in epoll.
#define EV_SIGNAL 0x08
Marks the event as a signal event; epoll itself only handles io events. EV_SIGNAL cannot be combined with EV_READ, EV_WRITE or EV_ET.
#define EV_PERSIST 0x10
//the event can fire repeatedly; usable with io, timer and signal events
#define evsignal_new(b, x, cb, arg)
event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg)) //by default a signal event fires repeatedly
#define evtimer_new(b, cb, arg) event_new((b), -1, 0, (cb), (arg)) //by default a timer event fires only once
Difference from epoll: in epoll an fd keeps reporting io events unless EPOLLONESHOT is set, and with EPOLLONESHOT it is reported only once; in libevent an event fires only once unless EV_PERSIST is set, and with EV_PERSIST it keeps firing.
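A small illustration of that difference, reusing base, cfd and read_cb from the server example in section 1 (so this is a fragment, not a complete program):
// once_ev fires a single time: libevent removes it from epoll and from its queues
// after the callback returns (the EV_CLOSURE_NONE path shown above).
event *once_ev = event_new(base, cfd, EV_READ, read_cb, NULL);
event_add(once_ev, NULL);
// persist_ev keeps firing every time cfd becomes readable, like plain epoll
// without EPOLLONESHOT, until event_del(persist_ev) is called.
event *persist_ev = event_new(base, cfd, EV_READ | EV_PERSIST, read_cb, NULL);
event_add(persist_ev, NULL);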
#define EV_ET 0x20
//edge-triggered flag; like epoll, the default is level-triggered
#define EV_CLOSED 0x80
Added in libevent 2.1; indicates that the peer closed the connection.
Mapping between libevent and epoll event flags
libevent 2.0
//translate the low-level epoll event flags into libevent event flags
if (what & (EPOLLHUP|EPOLLERR)) { //the fd was hung up or an error occurred
ev = EV_READ | EV_WRITE; //report it as both readable and writable
} else {
if (what & EPOLLIN)
ev |= EV_READ;
if (what & EPOLLOUT)
ev |= EV_WRITE;
}
libevent 2.1 adds the EV_CLOSED flag:
if (what & EPOLLERR) { //an error occurred on the fd
ev = EV_READ | EV_WRITE;
} else if ((what & EPOLLHUP) && !(what & EPOLLRDHUP)) { //the fd was hung up, excluding the peer-closed case
ev = EV_READ | EV_WRITE;
} else {
if (what & EPOLLIN) //readable
ev |= EV_READ;
if (what & EPOLLOUT) //writable
ev |= EV_WRITE;
if (what & EPOLLRDHUP) //the peer closed the connection
ev |= EV_CLOSED;
}
Summary
This concludes the source analysis of io events. Working with io events generally involves three aspects: establishing the io event, handling it, and closing it. The libevent framework mainly takes care of establishing io events; the user still has to register callbacks to handle the io and to close it, which will be discussed in later articles.