定时器
- 引言
- 定时器的基本逻辑
- 定时器
- 信号事件
引言
传统的TCP socket模型是基于套接字(文件描述符)来传递消息的,但是文件描述符资是有限的,如果大量的连接占用了大量的文件描述符,那么新来的请求可能就无法申请到文件描述符,另一方面,如果连接数过多,也会导致服务器的负载过大.
如果我们想开发一个高性能的服务器,就需要为每一个连接创建一个定时器用来检测这个连接是不是一个活跃的连接,如果一个连接长时间的没有传送消息,我们就把它定义为非活跃,然后释放文件描述符资源.
定时器的基本逻辑
1 .当新的连接到来的时候(服务端调用accept函数获取已连接的socket时),创建定时器,定时器将连接资源指针,该连接资源所对应的超时时间,以及释放连接的回调函数指针封装在一起,并把定时器添加到一个双向链表中,方便后续对所有的定时器统一管理
2 .使用epoll统一事件源,用epoll来检测连接事件,读写事件,信号事件,异常事件等
3 .当epoll到信号事件时,将超时标记设置为true, 超时标记一旦为true后,对定时器容器(存放定时器的双向链表)遍历,将当前的时间与每一个定时器的超时时间对比,如果当前时间 > 定时器的超时时间,那么说明该连接已经是非活跃的连接了,所以调用函数直接对该连接进行释放,并将该连接对应的定时器移出链表. 在遍历完成之后,再将超时标记重置为false.
4 .如果epoll检测到异常事件,直接使用函数将该连接释放,并将定时器移除链表.
5 .如果epoll检测的是读/写事件,在一边处理读写事件的同时,更改读/写事件所对应的连接所对应的定时器的超时时间,并且在定时器容器中,将该定时器向后移动.
定时器
连接资源类
struct client_data
{
//客户端socket地址
sockaddr_in address;
//socket文件描述符
int sockfd;
//定时器
util_timer* timer;
};
连接资源中封装有定时器的指针,目的是当监测到异常事件或者读写事件时,直接可以通过该事件所在的文件描述符找到对应的定时器,然后对定时器进行相应的操作.
定时器类
class util_timer
{
public: util_timer() : prev( NULL ), next( NULL ){}
public:
//超时时间
time_t expire;
//回调函数
void (*cb_func)( client_data* );
//连接资源
client_data* user_data;
//前向定时器
util_timer* prev;
//后继定时器
util_timer* next;
};
释放连接的函数
void cb_func(client_data *user_data)
{
//删除非活动连接在socket上的注册事件
epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
assert(user_data);
//关闭文件描述符
close(user_data->sockfd);
//减少连接数
http_conn::m_user_count--;
}
释放一个连接分三步:
- 从epoll上删掉
- close函数关闭文件描述符
- 计数器 - 1
定时器容器类
//定时器容器类
class sort_timer_lst
{
public:
sort_timer_lst() : head( NULL ), tail( NULL ) {}
//常规销毁链表
~sort_timer_lst()
{
util_timer* tmp = head;
while( tmp )
{
head = tmp->next;
delete tmp;
tmp = head;
}
}
//添加定时器,内部调用私有成员add_timer
void add_timer( util_timer* timer )
{
if( !timer )
{
return;
}
if( !head )
{
head = tail = timer;
return;
}
//如果新的定时器超时时间小于当前头部结点
//直接将当前定时器结点作为头部结点
if( timer->expire < head->expire )
{
timer->next = head;
head->prev = timer;
head = timer;
return;
}
//否则调用私有成员,调整内部结点
add_timer( timer, head );
}
//调整定时器,任务发生变化时,调整定时器在链表中的位置
void adjust_timer( util_timer* timer )
{
if( !timer )
{
return;
}
util_timer* tmp = timer->next;
//被调整的定时器在链表尾部
//定时器超时值仍然小于下一个定时器超时值,不调整
if( !tmp || ( timer->expire < tmp->expire ) )
{
return;
}
//被调整定时器是链表头结点,将定时器取出,重新插入
if( timer == head )
{
head = head->next;
head->prev = NULL;
timer->next = NULL;
add_timer( timer, head );
}
//被调整定时器在内部,将定时器取出,重新插入
else
{
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
add_timer( timer, timer->next );
}
}
//删除定时器
void del_timer( util_timer* timer )
{
if( !timer )
{
return;
}
//链表中只有一个定时器,需要删除该定时器
if( ( timer == head ) && ( timer == tail ) )
{
delete timer;
head = NULL;
tail = NULL;
return;
}
//被删除的定时器为头结点
if( timer == head )
{
head = head->next;
head->prev = NULL;
delete timer;
return;
}
//被删除的定时器为尾结点
if( timer == tail )
{
tail = tail->prev;
tail->next = NULL;
delete timer;
return;
}
//被删除的定时器在链表内部,常规链表结点删除
timer->prev->next = timer->next;
timer->next->prev = timer->prev;
delete timer;
}
private:
//私有成员,被公有成员add_timer和adjust_time调用
//主要用于调整链表内部结点
void add_timer( util_timer* timer, util_timer* lst_head )
{
util_timer* prev = lst_head;
util_timer* tmp = prev->next;
//遍历当前结点之后的链表,按照超时时间找到目标定时器对应的位置,常规双向链表插入操作
while( tmp )
{
if( timer->expire < tmp->expire )
{
prev->next = timer;
timer->next = tmp;
tmp->prev = timer;
timer->prev = prev;
break;
}
prev = tmp;
tmp = tmp->next;
}
//遍历完发现,目标定时器需要放到尾结点处
if( !tmp )
{
prev->next = timer;
timer->prev = prev;
timer->next = NULL;
tail = timer;
}
}
private:
//头尾结点
util_timer* head;
util_timer* tail;
};
定时器容器类主要有以下几个基本功能:
- 添加一个定时器元素
- 删除一个定时器元素
- 当定时器的超时时间增加时,改变其在容器中的位置
- 销毁整个定时器
定时器容器类会按照超时时间升序的方式容纳定时器,目的就是方便统一管理所有的定时器,当触发信号事件时,超时标记设置为true,遍历整个定时器容器
核心代码:
1//定时处理任务,重新定时以不断触发SIGALRM信号
2void timer_handler()
3{
4 timer_lst.tick();
5 alarm(TIMESLOT);
6}
7
8//创建定时器容器链表
9static sort_timer_lst timer_lst;
10
11//创建连接资源数组
12client_data *users_timer = new client_data[MAX_FD];
13
14//超时默认为False
15bool timeout = false;
16
17//alarm定时触发SIGALRM信号
18alarm(TIMESLOT);
19
20while (!stop_server)
21{
22 int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
23 if (number < 0 && errno != EINTR)
24 {
25 break;
26 }
27
28 for (int i = 0; i < number; i++)
29 {
30 int sockfd = events[i].data.fd;
31
32 //处理新到的客户连接
33 if (sockfd == listenfd)
34 {
35 //初始化客户端连接地址
36 struct sockaddr_in client_address;
37 socklen_t client_addrlength = sizeof(client_address);
38
39 //该连接分配的文件描述符
40 int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
41
42 //初始化该连接对应的连接资源
43 users_timer[connfd].address = client_address;
44 users_timer[connfd].sockfd = connfd;
45
46 //创建定时器临时变量
47 util_timer *timer = new util_timer;
48 //设置定时器对应的连接资源
49 timer->user_data = &users_timer[connfd];
50 //设置回调函数
51 timer->cb_func = cb_func;
52
53 time_t cur = time(NULL);
54 //设置绝对超时时间
55 timer->expire = cur + 3 * TIMESLOT;
56 //创建该连接对应的定时器,初始化为前述临时变量
57 users_timer[connfd].timer = timer;
58 //将该定时器添加到链表中
59 timer_lst.add_timer(timer);
60 }
61 //处理异常事件
62 else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))
63 {
64 //服务器端关闭连接,移除对应的定时器
65 cb_func(&users_timer[sockfd]);
66
67 util_timer *timer = users_timer[sockfd].timer;
68 if (timer)
69 {
70 timer_lst.del_timer(timer);
71 }
72 }
73
74 //处理定时器信号
75 else if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN))
76 {
77 //接收到SIGALRM信号,timeout设置为True
78 }
79
80 //处理客户连接上接收到的数据
81 else if (events[i].events & EPOLLIN)
82 {
83 //创建定时器临时变量,将该连接对应的定时器取出来
84 util_timer *timer = users_timer[sockfd].timer;
85 if (users[sockfd].read_once())
86 {
87 //若监测到读事件,将该事件放入请求队列
88 pool->append(users + sockfd);
89
90 //若有数据传输,则将定时器往后延迟3个单位
91 //对其在链表上的位置进行调整
92 if (timer)
93 {
94 time_t cur = time(NULL);
95 timer->expire = cur + 3 * TIMESLOT;
96 timer_lst.adjust_timer(timer);
97 }
98 }
99 else
100 {
101 //服务器端关闭连接,移除对应的定时器
102 cb_func(&users_timer[sockfd]);
103 if (timer)
104 {
105 timer_lst.del_timer(timer);
106 }
107 }
108 }
109 else if (events[i].events & EPOLLOUT)
110 {
111 util_timer *timer = users_timer[sockfd].timer;
112 if (users[sockfd].write())
113 {
114 //若有数据传输,则将定时器往后延迟3个单位
115 //并对新的定时器在链表上的位置进行调整
116 if (timer)
117 {
118 time_t cur = time(NULL);
119 timer->expire = cur + 3 * TIMESLOT;
120 timer_lst.adjust_timer(timer);
121 }
122 }
123 else
124 {
125 //服务器端关闭连接,移除对应的定时器
126 cb_func(&users_timer[sockfd]);
127 if (timer)
128 {
129 timer_lst.del_timer(timer);
130 }
131 }
132 }
133 }
134 //处理定时器为非必须事件,收到信号并不是立马处理
135 //完成读写事件后,再进行处理
136 if (timeout)
137 {
138 timer_handler();
139 timeout = false;
140 }
141}
这段代码将 超时标记timeout是怎么被设计为true, 又是怎么周期性触发的 一部分暂时省略了,后续讲解补充了这部分
信号事件
前面说到,当触发信号事件之后,超时标记设置为true,然后对整个定时器容器进行遍历,然后进行相应的操作.
如何设置信号事件?并且让它能够周期性的触发?
先介绍几个函数:
#include <unistd.h>;
unsigned int alarm(unsigned int seconds);
alarm函数: alarm系统调用中的函数,它相当于一个信号闹钟,调用alarm函数后,在seconds 秒后会触发一次 alarm信号 .
当触发信号之后,内核会代替进程先接受这个信号,将信号放入内核的信号队列中,然后给通知进程一个【中断】,进程就会从用户态陷入到内核态进行信号检测,当检测到信号之后,进程会由内核态返回到用户态执行【信号处理程序】,当信号处理程序执行完毕之后,会再次返回到内核态检测信号重复上述过程直到没有信号了,然后会返回到用户态并且恢复内核栈的内容和指令寄存器的位置,让进程回复到中断之前的位置继续执行.
图片来自:两猿社
也就是说,当触发信号之后,进程会转而进入信号处理程序之中,那么在C语言中,有一个函数可以修改信号的信号处理程序
#include <signal.h>
int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);
signum表示操作的信号。
act表示对信号设置新的处理方式。
oldact表示信号原来的处理方式。
返回值,0 表示成功,-1 表示有错误发生。、
struct sigaction {
void (*sa_handler)(int);
void (*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void (*sa_restorer)(void);
}
sa_handler是一个函数指针,指向信号处理函数
sa_sigaction同样是信号处理函数,有三个参数,可以获得关于信号更详细的信息
sa_mask用来指定在信号处理函数执行期间需要被屏蔽的信号
sa_flags用于指定信号处理的行为
SA_RESTART,使被信号打断的系统调用自动重新发起
SA_NOCLDSTOP,使父进程在它的子进程暂停或继续运行时不会收到 SIGCHLD 信号
SA_NOCLDWAIT,使父进程在它的子进程退出时不会收到 SIGCHLD 信号,这时子进程如果退出也不会成为僵尸进程
SA_NODEFER,使对信号的屏蔽无效,即在信号处理函数执行期间仍能发出这个信号
SA_RESETHAND,信号处理之后重新设置为默认的处理方式
SA_SIGINFO,使用 sa_sigaction 成员而不是 sa_handler 作为信号处理函数
sa_restorer一般不使用
我们可以通过sigation函数改变信号触发时的信号处理程序,当信号触发时转而去执行我们自定义的信号处理函数,然后通过在信号处理函数中操作…,让信号的触发变成信号事件,让其可以被epoll检测到从而统一事件源.
我们可以提前定义一个socket管道
然后在信号处理函数中,通过往管道的写端写入信号值,并把管道的读端注册到epoll上,那么当信号触发之后,epoll就可以检测到管道的读端有信号值传递过来,如果信号值是终止信号,停止所有操作,服务器关闭,如果是超时信号,那么就把超市标记设置为true
信号处理函数
//信号处理函数
void sig_handler(int sig)
{
//为保证函数的可重入性,保留原来的errno
//可重入性表示中断后再次进入该函数,环境变量与之前相同,不会丢失数据
int save_errno = errno;
int msg = sig;
//将信号值从管道写端写入,传输字符类型,而非整型
send(pipefd[1], (char *)&msg, 1, 0);
//将原来的errno赋值为当前的errno
errno = save_errno;
}
调用sigation函数的信号设置函数
void addsig(int sig, void(handler)(int), bool restart = true)
{
//创建sigaction结构体变量
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
//信号处理函数中仅仅发送信号值,不做对应逻辑处理
sa.sa_handler = sig_handler;
if (restart)
sa.sa_flags |= SA_RESTART;
//将所有信号添加到信号集中
sigfillset(&sa.sa_mask);
//执行sigaction函数
assert(sigaction(sig, &sa, NULL) != -1);
}
核心代码:
//创建管道套接字
2ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
3assert(ret != -1);
4
5
6setnonblocking(pipefd[1]);
7
8//设置管道读端为ET非阻塞
9addfd(epollfd, pipefd[0], false);
10
11//传递给主循环的信号值,这里只关注SIGALRM和SIGTERM
12addsig(SIGALRM, sig_handler, false);
13addsig(SIGTERM, sig_handler, false);
14
15//循环条件
16bool stop_server = false;
17
18//超时标志
19bool timeout = false;
20
21//每隔TIMESLOT时间触发SIGALRM信号
22alarm(TIMESLOT);
23
24while (!stop_server)
25{
26 //监测发生事件的文件描述符
27 int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
28 if (number < 0 && errno != EINTR)
29 {
30 break;
31 }
32
33 //轮询文件描述符
34 for (int i = 0; i < number; i++)
35 {
36 int sockfd = events[i].data.fd;
37
38 //管道读端对应文件描述符发生读事件
39 if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN))
40 {
41 int sig;
42 char signals[1024];
43
44 //从管道读端读出信号值,成功返回字节数,失败返回-1
45 //正常情况下,这里的ret返回值总是1,只有14和15两个ASCII码对应的字符
46 ret = recv(pipefd[0], signals, sizeof(signals), 0);
47 if (ret == -1)
48 {
49 // handle the error
50 continue;
51 }
52 else if (ret == 0)
53 {
54 continue;
55 }
56 else
57 {
58 //处理信号值对应的逻辑
59 for (int i = 0; i < ret; ++i)
60 {
61 //这里面明明是字符
62 switch (signals[i])
63 {
64 //这里是整型
65 case SIGALRM:
66 {
67 timeout = true;
68 break;
69 }
70 case SIGTERM:
71 {
72 stop_server = true;
73 }
74 }
75 }
76 }
77 }
78 }
79}