文章目录
- 前言
- 一、用例
- 二、基本数据结构介绍
- 三、源码分析
- event_base_new
- evsignal_new
- event_add
- event_base_dispatch
- 总结
前言
libevent中对三类事件进行了封装,io事件、信号事件、定时器事件,libevent源码分析系列文章会分别分析这三类事件,本文分析信号事件。libevent框架将linux中的信号事件转换成io事件进行处理,从而将信号事件和io事件进行了有机的统一。
本文通过简单的例子展现libevent中信号事件的使用,然后通过源码分析libevent中的信号事件实现原理。
一、用例
#include <event.h>
void signal_cb(evutil_socket_t s,short w, void *arg)
{
printf("====signal_cb======\n");
exit(0);
}
int main()
{
event_base *base = event_base_new(); //初始化reactor对象,epoll_create
event* signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base); //初始化一个信号事件,默认EV_SIGNAL|EV_PERSIST,信号能够被反复触发
event_add(signal_event,0); //将事件注册到reactor中,epoll_ctl
event_base_dispatch(base); //事件主循环,epoll_wait
}
二、基本数据结构介绍
struct event {
TAILQ_ENTRY(event) ev_active_next; //激活事件队列的节点
TAILQ_ENTRY(event) ev_next; //注册事件队列的节点
...
union {
/* used for io events */
struct {
TAILQ_ENTRY(event) ev_io_next; //io事件节点
struct timeval ev_timeout;
} ev_io;
/* used by signal events */
struct {
TAILQ_ENTRY(event) ev_signal_next; //信号事件节点
short ev_ncalls; //对应信号执行的次数
/* Allows deletes in callback */
short *ev_pncalls;
} ev_signal;
} _ev;
...
void (*ev_callback)(evutil_socket_t, short, void *arg); //注册回调函数
void *ev_arg;
};
struct event_base {
const struct eventop *evsel; //多路复用io封装的方法
const struct eventop *evsigsel; //信号事件的方法
struct evsig_info sig; //存放信号处理的信息
/** Number of total events added to this event_base */
int event_count; //所有事件的个数
/** Number of total events active in this event_base */
int event_count_active; //激活事件的个数
...
struct event_list *activequeues; //激活事件队列
int nactivequeues; //激活事件个数
...
/** Mapping from file descriptors to enabled (added) events */
struct event_io_map io; //存放io事件的hash表
/** Mapping from signal numbers to enabled (added) events. */
struct event_signal_map sigmap; //存放信号事件的hash表
/** All events that have been enabled (added) in this event_base */
struct event_list eventqueue; //存放所有事件的链表
...
};
信号事件队列
信号事件队列:分配空间为32的数组存储所有信号事件,index为注册对应的信号值,相同的信号值用链表串联,当某个信号发生后就能将该信号对应的所有事件添加到激活事件队列中。
struct event_signal_map {
void **entries; //指向数组的指针
int nentries; //数组的个数
};
struct evmap_signal {
struct event_list events; //数组中指针所指向evmap_signal结构体,结构体中为event事件链表
};
io事件队列
io事件队列:分配空间为最大fd,index为对应fd的值,相同的fd事件用链表进行串联,当fd有对应的事件发生时,就遍历fd对应的链表,将对应的事件加入激活事件队列中。
struct evmap_io {
struct event_list events; //数组中的指针指向evmap_io结构体
ev_uint16_t nread; //记录链表中io事件有几个读事件,如果读写事件都有则新加入io事件就不用调用epoll_ctl
ev_uint16_t nwrite; //记录链表中io事件有几个写事件
};
激活事件队列
激活事件队列:分配空间为设置event事件的最大优先级值,index为对应的优先级,index越小越先执行,监听信号事件的fd设置成0。每次有激活事件就按照优先级加入到激活事件队列中,执行对应回调函数时就从小到大遍历index,同时遍历对应的链表。
struct event_list *activequeues;
注册事件队列
注册时间队列:为双向链表将所有的事件串联起来,用于判断是否还有未激活的事件。
libevent使用的双向链表结构详细情况如下文:
TAILQ链表队列详解
三、源码分析
event_base_new
struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{
int i;
struct event _base *base;
...
TAILQ_INIT(&base->eventqueue);
base->sig.ev_signal_pair[0] = -1;
base->sig.ev_signal_pair[1] = -1;
...
evmap_signal_initmap(&base->sigmap); //初始化信号事件队列
...
base->evbase = NULL;
for (i = 0; eventops[i] && !base->evbase; i++) {
if (cfg != NULL) {
/* determine if this backend should be avoided */
if (event_config_is_avoided_method(cfg,
eventops[i]->name)) //选择使用的网络模型,epoll/poll/iocp默认epoll
continue;
if ((eventops[i]->features & cfg->require_features)
!= cfg->require_features) //选择使用水平触发还是边缘触发,默认边缘触发
continue;
}
...
base->evsel = eventops[i]; //找到对应的多路复用方法
base->evbase = base->evsel->init(base); //调用方法中的初始化
}
...
/* allocate a single active event queue */
if (event_base_priority_init(base, 1) < 0) { //初始化激活事件队列,激活事件队列数组大小为1
event_base_free(base);
return NULL;
}
...
return (base);
}
base->evbase = base->evsel->init(base);
初始化信号事件
static const struct eventop epollops_changelist = {
"epoll (with changelist)",
epoll_init,
event_changelist_add,
event_changelist_del,
epoll_dispatch,
epoll_dealloc,
1, /* need reinit */
EV_FEATURE_ET|EV_FEATURE_O1,
EVENT_CHANGELIST_FDINFO_SIZE
};
static void *
epoll_init(struct event_base *base)
{
int epfd;
struct epollop *epollop;
/* Initialize the kernel queue. (The size field is ignored since
* 2.6.8.) */
if ((epfd = epoll_create(32000)) == -1) {
if (errno != ENOSYS)
event_warn("epoll_create");
return (NULL);
}
evutil_make_socket_closeonexec(epfd);
if (!(epollop = mm_calloc(1, sizeof(struct epollop)))) {
close(epfd);
return (NULL);
}
epollop->epfd = epfd;
/* Initialize fields */
epollop->events = mm_calloc(INITIAL_NEVENT, sizeof(struct epoll_event));
if (epollop->events == NULL) {
mm_free(epollop);
close(epfd);
return (NULL);
}
epollop->nevents = INITIAL_NEVENT;
...
evsig_init(base);//初始化信号事件
return (epollop);
}
evsig_init
将信号事件转换成io事件
int
evsig_init(struct event_base *base)
{
if (evutil_socketpair( //初始化管道用于信号通知
AF_UNIX, SOCK_STREAM, 0, base->sig.ev_signal_pair) == -1) {
...
return -1;
}
evutil_make_socket_closeonexec(base->sig.ev_signal_pair[0]);
evutil_make_socket_closeonexec(base->sig.ev_signal_pair[1]);
base->sig.sh_old = NULL;
base->sig.sh_old_max = 0;
evutil_make_socket_nonblocking(base->sig.ev_signal_pair[0]); //管道设置非阻塞
evutil_make_socket_nonblocking(base->sig.ev_signal_pair[1]);
event_assign(&base->sig.ev_signal, base, base->sig.ev_signal_pair[1],
EV_READ | EV_PERSIST, evsig_cb, base); //初始化管道的读事件,设置persist保证能够持续对信号进行处理
base->sig.ev_signal.ev_flags |= EVLIST_INTERNAL;
event_priority_set(&base->sig.ev_signal, 0); //该管道事件的优先级设置成0,加入激活事件队列index=0
base->evsigsel = &evsigops; //注册信号事件的方法
return 0;
}
evsig_init—>evsig_cb
有信号来时会调用evsig_cb回调函数
static void
evsig_cb(evutil_socket_t fd, short what, void *arg)
{
static char signals[1024];
ev_ssize_t n;
int i;
int ncaught[NSIG];
struct event_base *base;
base = arg;
memset(&ncaught, 0, sizeof(ncaught));
while (1) {
n = recv(fd, signals, sizeof(signals), 0); //管道的另一端发送的为激活的信号值,这里读取所有激活的信 号值到数组中
if (n == -1) {
int err = evutil_socket_geterror(fd);
if (! EVUTIL_ERR_RW_RETRIABLE(err))
event_sock_err(1, fd, "%s: recv", __func__);
break;
} else if (n == 0) {
/* XXX warn? */
break;
}
for (i = 0; i < n; ++i) { //将信号发生的次数记录ncaught数组中,index为信号值,内容为次数
ev_uint8_t sig = signals[i];
if (sig < NSIG)
ncaught[sig]++; //方便处理几个相同信号的情况,几个相同信号就调用几次callback
}
}
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
for (i = 0; i < NSIG; ++i) {
if (ncaught[i])
evmap_signal_active(base, i, ncaught[i]); //将激活的信号加入到激活事件队列中
}
EVBASE_RELEASE_LOCK(base, th_base_lock);
}
evmap_signal_active
加入激活事件队列中
void
evmap_signal_active(struct event_base *base, evutil_socket_t sig, int ncalls)
{
struct event_signal_map *map = &base->sigmap;
struct evmap_signal *ctx;
struct event *ev;
EVUTIL_ASSERT(sig < map->nentries);
GET_SIGNAL_SLOT(ctx, map, sig, evmap_signal);//通过信号值在信号事件队列中找到对应的链表
TAILQ_FOREACH(ev, &ctx->events, ev_signal_next)//遍历链表中的所有事件
event_active_nolock(ev, EV_SIGNAL, ncalls);//将事件加入激活队列当中
}
event_active_nolock
加入激活队列中
void
event_active_nolock(struct event *ev, int res, short ncalls)
{
struct event_base *base;
...
if (ev->ev_events & EV_SIGNAL) {
ev->ev_ncalls = ncalls;//设置该事件的回调函数会触发几次
ev->ev_pncalls = NULL;
}
event_queue_insert(base, ev, EVLIST_ACTIVE);
}
static void
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
...
ev->ev_flags |= queue;
switch (queue) {
case EVLIST_INSERTED:
TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
break;
case EVLIST_ACTIVE:
base->event_count_active++;
TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
ev,ev_active_next); //信号事件加入到激活事件队列中,这里的优先级和管道的io优先级是不同的
break;
...
default:
event_errx(1, "%s: unknown queue %x", __func__, queue);
}
}
evsignal_new
初始化信号事件
#define evsignal_new(b, x, cb, arg) \
event_new((b), (x), EV_SIGNAL|EV_PERSIST, (cb), (arg)) //EV_PERSIST表示信号能够被反复触发
struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
{
struct event *ev;
ev = mm_malloc(sizeof(struct event));
if (ev == NULL)
return (NULL);
if (event_assign(ev, base, fd, events, cb, arg) < 0) { //初始化一个信号事件
mm_free(ev);
return (NULL);
}
return (ev);
}
int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
if (!base)
base = current_base;
_event_debug_assert_not_added(ev);
ev->ev_base = base;
ev->ev_callback = callback;
ev->ev_arg = arg;
ev->ev_fd = fd;
ev->ev_events = events;
ev->ev_res = 0;
ev->ev_flags = EVLIST_INIT;
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;
if (events & EV_SIGNAL) { //设置的信号事件不能和read/write并列
if ((events & (EV_READ|EV_WRITE)) != 0) {
event_warnx("%s: EV_SIGNAL is not compatible with "
"EV_READ or EV_WRITE", __func__);
return -1;
}
ev->ev_closure = EV_CLOSURE_SIGNAL; //设置信号事件的结束标志
} else {
...
}
if (base != NULL) {
ev->ev_pri = base->nactivequeues / 2; //默认将信号事件放在激活事件队列的中间索引位置
}
return 0;
}
event_add
static inline int
event_add_internal(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;
...
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
if (ev->ev_events & (EV_READ|EV_WRITE))
res = evmap_io_add(base, ev->ev_fd, ev); //时间加入io事件队列
else if (ev->ev_events & EV_SIGNAL)
res = evmap_signal_add(base, (int)ev->ev_fd, ev); //加入信号事件队列
if (res != -1)
event_queue_insert(base, ev, EVLIST_INSERTED); //加入注册事件队列
}
...
return (res);
}
evmap_signal_add
将信号事件加入信号事件队列中
int
evmap_signal_add(struct event_base *base, int sig, struct event *ev)
{
const struct eventop *evsel = base->evsigsel;//获得信号事件的处理方法
struct event_signal_map *map = &base->sigmap;
struct evmap_signal *ctx = NULL;
if (sig >= map->nentries) {
if (evmap_make_space( //存放信号事件队列的数组容量不够分配内存
map, sig, sizeof(struct evmap_signal *)) == -1)
return (-1);
}
GET_SIGNAL_SLOT_AND_CTOR(ctx, map, sig, evmap_signal, evmap_signal_init,
base->evsigsel->fdinfo_len);//在信号事件队列中根据信号值找到对应的链表ctx
if (TAILQ_EMPTY(&ctx->events)) {
if (evsel->add(base, ev->ev_fd, 0, EV_SIGNAL, NULL) //调用信号事件方法evsig_add
== -1)
return (-1);
}
TAILQ_INSERT_TAIL(&ctx->events, ev, ev_signal_next);//信号事件插入对应的信号事件队列
return (1);
}
evsig_add
监听管道读事件,设置信号捕捉函数
static int
evsig_add(struct event_base *base, evutil_socket_t evsignal, short old, short events, void *p)
{
struct evsig_info *sig = &base->sig;
...
evsig_base = base;
evsig_base_n_signals_added = ++sig->ev_n_signals_added; //添加信号事件的个数
evsig_base_fd = base->sig.ev_signal_pair[0]; //管道写端保存成全局的静态变量
if (_evsig_set_handler(base, (int)evsignal, evsig_handler) == -1) {//设置该信号的捕捉函数
goto err;
}
if (!sig->ev_signal_added) { //用于信号的管道读端没有监听
if (event_add(&sig->ev_signal, NULL)) //将管道的读端事件加入io事件队列中并epoll监听
goto err;
sig->ev_signal_added = 1;
}
return (0);
err:
EVSIGBASE_LOCK();
--evsig_base_n_signals_added;
--sig->ev_n_signals_added;
EVSIGBASE_UNLOCK();
return (-1);
}
_evsig_set_handler
设置对应信号的捕捉函数
int
_evsig_set_handler(struct event_base *base,
int evsignal, void (__cdecl *handler)(int))
{
#ifdef _EVENT_HAVE_SIGACTION
struct sigaction sa;
#else
ev_sighandler_t sh;
#endif
struct evsig_info *sig = &base->sig;
void *p;
if (evsignal >= sig->sh_old_max) { //信号值大于用于保存旧信号值存放的数组,则分配空间
int new_max = evsignal + 1;
event_debug(("%s: evsignal (%d) >= sh_old_max (%d), resizing",
__func__, evsignal, sig->sh_old_max));
p = mm_realloc(sig->sh_old, new_max * sizeof(*sig->sh_old));
if (p == NULL) {
event_warn("realloc");
return (-1);
}
memset((char *)p + sig->sh_old_max * sizeof(*sig->sh_old),
0, (new_max - sig->sh_old_max) * sizeof(*sig->sh_old));
sig->sh_old_max = new_max;
sig->sh_old = p;
}
/* allocate space for previous handler out of dynamic array */
sig->sh_old[evsignal] = mm_malloc(sizeof *sig->sh_old[evsignal]);//初始化数组的二级指针
if (sig->sh_old[evsignal] == NULL) {//
event_warn("malloc");
return (-1);
}
/* save previous handler and setup new handler */
#ifdef _EVENT_HAVE_SIGACTION
memset(&sa, 0, sizeof(sa));
sa.sa_handler = handler;
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
if (sigaction(evsignal, &sa, sig->sh_old[evsignal]) == -1) {//注册信号捕捉函数,并保存原有的信号处理动作到数组中。
event_warn("sigaction");
mm_free(sig->sh_old[evsignal]);
sig->sh_old[evsignal] = NULL;
return (-1);
}
#else
if ((sh = signal(evsignal, handler)) == SIG_ERR) {//作用同上
event_warn("signal");
mm_free(sig->sh_old[evsignal]);
sig->sh_old[evsignal] = NULL;
return (-1);
}
*sig->sh_old[evsignal] = sh;
#endif
return (0);
}
evsig_handler
将信号值通过管道的一端进行发送
static void __cdecl
evsig_handler(int sig)
{
...
#ifndef _EVENT_HAVE_SIGACTION
signal(sig, evsig_handler);
#endif
msg = sig;
send(evsig_base_fd, (char*)&msg, 1, 0);//将信号写入管道的写端
errno = save_errno;
...
}
event_base_dispatch
事件主循环
int
event_base_dispatch(struct event_base *event_base)
{
return (event_base_loop(event_base, 0));
}
int
event_base_loop(struct event_base *base, int flags)
{
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;
...
if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
evsig_set_base(base);
done = 0;
base->event_gotterm = base->event_break = 0;
while (!done) {
base->event_continue = 0;
...
res = evsel->dispatch(base, tv_p);//有信号触发读管道的信号事件,并将对应的信号事件加入到激活事件队列中
if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}
...
if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base);//处理激活事件队列
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
event_debug(("%s: asked to terminate loop.", __func__));
done:
clear_time_cache(base);
base->running_loop = 0;
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (retval);
}
event_process_active
处理激活队列中的信号事件
static int
event_process_active(struct event_base *base)
{
struct event_list *activeq = NULL;
int i, c = 0;
for (i = 0; i < base->nactivequeues; ++i) {//遍历激活事件队列中的数组获得链表
if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
base->event_running_priority = i;
activeq = &base->activequeues[i];
c = event_process_active_single_queue(base, activeq);//处理某一信号的对应的所有信号事件
...
}
base->event_running_priority = -1;
return c;
}
static int
event_process_active_single_queue(struct event_base *base,
struct event_list *activeq)
{
struct event *ev;
int count = 0;
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
if (ev->ev_events & EV_PERSIST) //信号事件默认有EV_PERSIST
event_queue_remove(base, ev, EVLIST_ACTIVE);//信号事件从激活事件队列中删除,该信号能够反复触发
else
event_del_internal(ev); //信号事件从注册事件队列、激活事件队列、信号事件队列中删除该信号事件,该信号只触发一次
switch (ev->ev_closure) {
case EV_CLOSURE_SIGNAL:
event_signal_closure(base, ev); //调用信号事件的回调函数
break;
case EV_CLOSURE_PERSIST:
event_persist_closure(base, ev);
break;
default:
case EV_CLOSURE_NONE:
EVBASE_RELEASE_LOCK(base, th_base_lock);
(*ev->ev_callback)(
ev->ev_fd, ev->ev_res, ev->ev_arg);
break;
}
...
return count;
}
static void
event_queue_remove(struct event_base *base, struct event *ev, int queue)
{
ev->ev_flags &= ~queue;
switch (queue) {
case EVLIST_INSERTED:
...
break;
case EVLIST_ACTIVE:
base->event_count_active--;
TAILQ_REMOVE(&base->activequeues[ev->ev_pri],
ev, ev_active_next);//从激活事件队列中删除信号事件
break;
case EVLIST_TIMEOUT:
...
break;
default:
event_errx(1, "%s: unknown queue %x", __func__, queue);
}
}
static inline void
event_signal_closure(struct event_base *base, struct event *ev)
{
short ncalls;
ncalls = ev->ev_ncalls;
if (ncalls != 0)
ev->ev_pncalls = &ncalls;
while (ncalls) { //对应的信号事件触发n次
ncalls--;
ev->ev_ncalls = ncalls;
if (ncalls == 0)
ev->ev_pncalls = NULL;
(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg); //调用信号事件的回调函数
...
}
}
总结
本文对信号事件的使用和源码进行了分析,但是使用中还有一些注意事项需要后面分析。