概述
libevent
的event
,event_callback
,event_base
除了可以用来支持套接字的自动和手动分发,也可用来支持定时机制,信号处理.这里,我们补充对定时机制,信号处理的分析.
libevent
中的通信对象集成了对流量控制的支持,我们这里补充对通信对象流量控制的分析.
libevent中的定时机制
1.概述
前面我们分析event
,event_callback
,event_base
时,立足于服务于套接字的event
来展开分析的.
libevent
中的event
既可以用于支持对套接字的自动分法,手动分发.也可用于支持定时机制.还可用于支持信号处理.
这一部分立足于支持定时机制,来分析libevent
中event,event_callback,event_base
.
2. event
对于定时支持
struct event {
struct event_callback ev_evcallback;
union {
TAILQ_ENTRY(event) ev_next_with_common_timeout;
size_t min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;
short ev_events;
short ev_res;
struct event_base *ev_base;
union {
struct {
LIST_ENTRY (event) ev_io_next;
struct timeval ev_timeout;// 当服务于持续性超时event时.这里存储超时间隔.
} ev_io;
struct {
LIST_ENTRY (event) ev_signal_next;
short ev_ncalls;
short *ev_pncalls;
} ev_signal;
} ev_;
struct timeval ev_timeout;// 服务于超时event时,这里存储下个超时时间点.
};
我们之前已经从服务于套接字事件分发的角度介绍过event,event_base
及其各个字段含义.这里我们立足与服务于定时机制介绍相关字段含义.
(1). ev_evcallback
服务于定时机制时,同样借助ev_evcallback
指定事件被分发时的处理信息.
(2). ev_timeout_pos
,event
服务于定时机制时,需加入到event_base
相应结构中.
event_base
为服务于定时的event
准备了两类可选结构:通用结构,堆.
被放置于堆结构时,这里存储在堆内索引.
被放置于通用结构时,由于位于同一通用结构内各个event
基于链表连接.这里用于构建链接信息.
(3). ev_fd
event_base
对指定event
支持其服务于三种类型:信号处理,套接字事件分发,定时机制.
其中,套接字事件分发与定时机制可同时存在于一个event
身上.
某个event
同时服务于套接字事件分发,定时机制时,ev_fd
用于存储套接字描述符.
某个event
独立服务于定时机制时,ev_fd
应设置为-1
.
(4). ev_events
某个event
同时服务于套接字事件分发,定时机制时,ev_events
用于存储感兴趣的套接字事件类型.
某个event
独立服务于定时机制时,ev_events
应设置为0
.
(5). ev_res
若一个服务于定时机制的event
,因为超时而被分发,ev_res
用于存储分发原因.超时对应EV_TIMEOUT
.
(6). ev_base
存储关联到的event_base
对象指针.
(7). ev_
当服务于定时机制下,且event
属于持久性event
.此时ev_io.ev_timeout
用来存储超时间隔.
如何理解持久性event
?
a. 持久性event
:
a.1. 初次添加到event_base
后,从添加时间点或指定时间点开始,直到达到超时间隔期间该event
未被分发时,会被自动分发一次.分发原因将是超时.
a.2. 每次event
被分发即将执行其回调处理前,依据当前时间点结合超时间隔计算下一次超时时间点并再次注册到event_base
.
b. 非持久性event
:
b.1. 初次添加到event_base
后,从添加时间点或指定时间点开始,直到达到超时间隔期间该event
未被分发时,会被自动分发一次.分发原因将是超时.若在此期间因为其他原因得到分发,分发处理前,此event
会自动从event_base
移除.这样,后续不会再被event_base
分发了.
实现上,服务于超时的event
因为超时被分发时,会自动从event_base
移除.在服务于超时的event
即将被回调处理前,判断若是持久性的,会重新计算下个超时时间点并再次添加到event_base
.
作为对比,服务于套接字的event
因为套接字事件被分发时,不会从event_base
移除.但在event
即将被回调处理前,判断若是非持久性的,会将此event
从event_base
移除.
(8). ev_timeout
服务于定时机制时,用于存储此event
将发生超时的超时时间点信息.
3.event_callback
对定时机制的支持
我们从支持定时机制角度分析其各个字段.
struct event_callback
{
TAILQ_ENTRY(event_callback) evcb_active_next;
short evcb_flags;
ev_uint8_t evcb_pri; /* smaller numbers are higher priority */
ev_uint8_t evcb_closure;
/* allows us to adopt for different types of events */
union
{
void (*evcb_callback)(evutil_socket_t, short, void *);
void (*evcb_selfcb)(struct event_callback *, void *);
void (*evcb_evfinalize)(struct event *, void *);
void (*evcb_cbfinalize)(struct event_callback *, void *);
} evcb_cb_union;
void *evcb_arg;
};
(1). evcb_active_next
用于将分发的event_callback
组织在链表结构.
(2). evcb_flags
当此event_callback
依附的event
被插入服务于定时的堆结构或通用结构后,将包含EVLIST_TIMEOUT
.
(3). evcb_pri
特权级
(4). evcb_closure
当此event_callback
依附的event
服务于定时机制时,依据是否是持久的event
.
对持久的event
,evcb_closure
为EV_CLOSURE_EVENT
,相应的evcb_cb_union
为evcb_callback
.
对非持久的event
,evcb_closure
为EV_CLOSURE_EVENT_PERSIST
,相应的evcb_cb_union
为evcb_callback
.
(5). evcb_cb_union
参考上述.
(6). evcb_arg
自定义回调参数.
3.event_base
对定时支持
(1). 提供通用结构,堆结构来容纳服务于定时的event
.
放置在通用结构的一组event
,依赖一个内部的放置在堆结构的event
实现分发处理.
(2). 对放置在通用结构的event
.
其ev_timeout
结构中tv_usec
结构符合如下结构:
高4
比特位为tag
,固定为0x5
.
中间8
比特位用作索引.
低20
位为实际数值.
对持久的超时event
,其用于时间间隔的ev_.ev_io.ev_timeout
也是如此结构.
(3). event_base
的事件循环会在每次io
复用器分发后,依据当前时间点对超时的各个event
执行分发处理.每次事件循环进入io
阻塞等待前,也会保证最大等待时间不会超过当前时间点距离最近一个会超时event
超时时间点的间隔.
4.io
复用器对定时支持
io
复用器在支持timerfd
的情况下,允许利用timefd
而非epoll_wait
的超时参数的方式实现阻塞等待的超时唤醒.仅仅较新的linux
内核支持此特性.
libevent中的信号机制
1.概述
前面我们分析event
,event_callback
,event_base
时,立足于服务于套接字的event
来展开分析的.
libevent
中的event
既可以用于支持对套接字的自动分法,手动分发.也可用于支持定时机制.还可用于支持信号处理.
这一部分立足于支持信号机制,来分析libevent
中event,event_callback,event_base
.
2.服务于信号处理的event
struct event {
struct event_callback ev_evcallback;
/* for managing timeouts */
union {
TAILQ_ENTRY(event) ev_next_with_common_timeout;
size_t min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;
short ev_events;
short ev_res; /* result passed to event callback */
struct event_base *ev_base;
union {
/* used for io events */
struct {
LIST_ENTRY (event) ev_io_next;
struct timeval ev_timeout;
} ev_io;
/* used by signal events */
struct {
LIST_ENTRY (event) ev_signal_next;
short ev_ncalls;
/* Allows deletes in callback */
short *ev_pncalls;
} ev_signal;
} ev_;
struct timeval ev_timeout;
};
我们从信号处理视角分析各个字段.
(1). ev_evcallback
包含回调处理信息.
(2). ev_timeout_pos
服务于定时分发时,存储所在结构内位置信息.
(3). ev_fd
服务于信号处理时,这里存储要处理的信号编号.
(4). ev_events
服务于信号处理时,必然包含EV_SIGNAL
.类似套接字event
,信号处理event
也支持持久,非持久概念.对持久的event
必然包含EV_PERSIST
.
对非持久的服务于信号处理的event
,因为信号产生被分发后,即从event_base
移除.
对持久的服务于信号处理的event
,信号产生被分发后,不会从event_base
移除.除非显式请求.
(5). ev_res
服务于信号处理的event
,因为信号产生而被分发时,这里包含EV_SIGNAL
.
(6). ev_base
指向关联到的event_base
.
(7). ev_
ev_signal
中各个字段含义如下:
a. ev_signal_next
类似套接字event
,服务于同一个信号编号的多个event
,通过链式结构关联在一起.ev_signal_next
用于构成链式结构.
b. ev_ncalls
信号event
被分发时,允许设置一次分发时回调处理执行多次.这里记录回调处理要执行的次数.
c. ev_pncalls
这里安排这个指针的意义在于.
在信号event
的回调处理进行中,让ev_pncalls
指向ev_ncalls
.如果再次期间,另一个用户将此event
从event_base
移除.
移除时会通过ev_pncalls
,设置其指向变量为0
,从而使得当前信号处理回调可以尽快结束.
4.event_base
事件循环对信号处理的支持
struct event_base {
const struct eventop *evsigsel;
struct evsig_info sig;
struct event_signal_map sigmap;
};
我们立足于服务于信号处理的角度,截取event_base
中相关字段并分析其含义.
(1). evsigsel
套接字event
的自动分发依赖io
复用器.信号event
的分发也有自己的复用器来提供支持.
(2). sig
struct evsig_info {
struct event ev_signal;
evutil_socket_t ev_signal_pair[2];
int ev_signal_added;
int ev_n_signals_added;
#ifdef EVENT__HAVE_SIGACTION
struct sigaction **sh_old;
#else
ev_sighandler_t **sh_old;
#endif
int sh_old_max;
};
这个结构来为event_base
实现信号处理提供支持.其各个字段含义如下:
a. ev_signal
,ev_signal_pair
我们需要一个独立的event
来帮助实现信号event
的分发.
在event_base
的初始化阶段,会创建一个pipe
,得到两个套接字.并执行如下初始化:
event_assign(&base->sig.ev_signal, base, base->sig.ev_signal_pair[0], EV_READ | EV_PERSIST, evsig_cb, base);
static void evsig_cb(evutil_socket_t fd, short what, void *arg)
{
static char signals[1024];
ev_ssize_t n;
int i;
int ncaught[NSIG];
struct event_base *base;
base = arg;
memset(&ncaught, 0, sizeof(ncaught));
while (1) {
#ifdef _WIN32
n = recv(fd, signals, sizeof(signals), 0);
#else
n = read(fd, signals, sizeof(signals));// 读取pipe套接字数据
#endif
if (n == -1) {
int err = evutil_socket_geterror(fd);
if (! EVUTIL_ERR_RW_RETRIABLE(err))
event_sock_err(1, fd, "%s: recv", __func__);// 异常
break;// 表示已经读取干净
} else if (n == 0) {
break;// 表示pipe套接字被关闭了
}
// 对读取内容逐个字节分析
for (i = 0; i < n; ++i) {
ev_uint8_t sig = signals[i];
if (sig < NSIG)
ncaught[sig]++;// 每个字节代表一个信号编号
}
}
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
for (i = 0; i < NSIG; ++i) {
// 允许特定信号的event被分发时,分发中执行多次回调处理
if (ncaught[i])
evmap_signal_active_(base, i, ncaught[i]);// 实现信号event分发.
}
EVBASE_RELEASE_LOCK(base, th_base_lock);
}
这样得到的效果时,若我们向使得某个信号编号下的各个event
得到分发处理,我们只需向base->sig.ev_signal_pair[0]
套接字写入信号编号即可.
b. ev_signal_added
非0
时表示依附的event_base
上至少注册了一个服务于信号处理的event
.
c. ev_n_signals_added
记录依附的event_base
上注册的服务于信号处理的event
的数量.
d. sh_old,sh_old_max
当我们向event_base
注册服务于某个信号编号的event
时,会通过服务于信号处理的复用器执行信号处理函数安装.安装时,我们通过sh_old
来记录此信号编号安装前的信号处理信息.这样,当此信号编号的最后一个event
从event_base
移除时,我们需要借助sh_old
中的信号处理信息来恢复.
5.服务于信号处理的复用器
// 专门服务于信号分发的复用器
static const struct eventop evsigops = {
"signal",
NULL,
evsig_add,
evsig_del,
NULL,
NULL,
0, 0, 0
};
当我们最初向event_base
添加某个信号编号的event
时,将执行一次evsig_add
.当我们将某个信号编号的最后一个event
从event_base
移除时,将执行一次evsig_del
.
a. evsig_add
// 代表开始监控某个信号
static int evsig_add(struct event_base *base, evutil_socket_t evsignal, short old, short events, void *p)
{
struct evsig_info *sig = &base->sig;
(void)p;
EVUTIL_ASSERT(evsignal >= 0 && evsignal < NSIG);
EVSIGBASE_LOCK();
// 这意味着至少两个event_base各自在监控信号event
if (evsig_base != base && evsig_base_n_signals_added) {
event_warnx("Added a signal to event base %p with signals "
"already added to event_base %p. Only one can have "
"signals at a time with the %s backend. The base with "
"the most recently added signal or the most recent "
"event_base_loop() call gets preference; do "
"not rely on this behavior in future Libevent versions.",
base, evsig_base, base->evsel->name);
}
// 通过静态结构记录最近负责信号event监控的event_base.
// 及其上监控的信号数目.
evsig_base = base;
evsig_base_n_signals_added = ++sig->ev_n_signals_added;
// 通过pipe的套接字实现分发通知
evsig_base_fd = base->sig.ev_signal_pair[1];
EVSIGBASE_UNLOCK();
event_debug(("%s: %d: changing signal handler", __func__, (int)evsignal));
// 信号处理设置
if (evsig_set_handler_(base, (int)evsignal, evsig_handler) == -1) {
goto err;
}
if (!sig->ev_signal_added) {
if (event_add_nolock_(&sig->ev_signal, NULL, 0))
goto err;
sig->ev_signal_added = 1;
}
return (0);
err:
EVSIGBASE_LOCK();
--evsig_base_n_signals_added;
--sig->ev_n_signals_added;
EVSIGBASE_UNLOCK();
return (-1);
}
上述过程主要为:
为信号编号安装信号处理.evsig_set_handler_
处理中会安装信号处理,同时将此信号编号之前的信息处理保存起来.
这样,后续此信号产生时,将执行一次evsig_handler
.
evsig_handler
中则通过向evsig_base_fd
写入此信号编号内容.由于sig->ev_signal
的作用.这将引发event_base
后续对此信号相关的所有event
执行一次分发.
b. evsig_del
static int evsig_del(struct event_base *base, evutil_socket_t evsignal, short old, short events, void *p)
{
EVUTIL_ASSERT(evsignal >= 0 && evsignal < NSIG);
event_debug(("%s: "EV_SOCK_FMT": restoring signal handler", __func__, EV_SOCK_ARG(evsignal)));
EVSIGBASE_LOCK();
--evsig_base_n_signals_added;
--base->sig.ev_n_signals_added;
EVSIGBASE_UNLOCK();
return (evsig_restore_handler_(base, (int)evsignal));
}
当某个信号编号的最后一个event
从event_base
移除时,会执行一次evsig_del
.
evsig_del
会更新变量,并借助evsig_restore_handler_
恢复此信号编号之前的信号处理.
6.最佳实践
因为linux
多线程应用中,发给应用的信号只会传递给一个线程.激发一次信号处理.
所以相应的借助event_base
的event
实现信号处理时,我们应该:
(1). 将服务于信号处理的event
集中注册到一个event_base
.
libevent
中相应的设置了以下局部静态变量来存储此event_base
及关联信息.
#ifndef EVENT__DISABLE_THREAD_SUPPORT
static void *evsig_base_lock = NULL;
#endif
static struct event_base *evsig_base = NULL;
static int evsig_base_n_signals_added = 0;
static evutil_socket_t evsig_base_fd = -1;
libevent中通信对象的流量控制
1.流量控制初始化
流控初始化时会设置好时间点,最大单次允许量,速率,间隔单位.
2.读,写操作前更新并获取允许流量
以读为例,每次读时,依据上次余量,距离上次读取间隔,速率,最大单次允许量可以计算出本次操作允许量.从而控制本次操作时,允许读取的量,进而达到流控效果.
3.读,写操作后再次更新允许流量
以读为例,每次读后,依据读取量,上次余量重新计算余量.在余量降低到0
时候,会通过移除可读事件来临时禁止后续读取操作.并同时启动一个定时event
,在定时回调中重新计算余量.在余量大于0
时,重新恢复可读事件,并取消定时event
.