一、rector网络模型
对高并发编程,网络连接上的消息处理,可以分为两个阶段:等待消息准备好、消息处理。当使用默认的阻塞套接字时,往往是把这两个阶段合而为一,这样操作套接字的代码所在的线程就得睡眠来等待消息准备好,这导致了高并发下线程会频繁的睡眠、唤醒,从而影响了 CPU 的使用效率。
高并发编程方法当然就是把两个阶段分开处理。即,等待消息准备好的代码段,与处理消息的代码段是分离的。当然,这也要求套接字必须是非阻塞的,否则,处理消息的代码段很容易导致条件不满足时,所在线程又进入了睡眠等待阶段。那么问题来了,等待消息准备好这个阶段怎么实现?它毕竟还是等待,这意味着线程还是要睡眠的!解决办法就是,线程主动查询,或者让 1 个线程为所有连接而等待!这就是 IO 多路复用了。多路复用就是处理等待消息准备好这件事的,但它可以同时处理多个连接!它也可能“等待”,所以它也会导致线程睡眠,然而这不要紧,因为它一对多、它可以监控所有连接。这样,当我们的线程被唤醒执行时,就一定是有一些连接准备好被我们的代码执行了。
作为一个高性能服务器程序通常需要考虑处理三类事件: I/O 事件,定时事件及信号。两种高效的事件处理模型:Reactor 和 Proactor。
首先来回想一下普通函数调用的机制:程序调用某函数,函数执行,程序等待,函数将结果和控制权返回给程序,程序继续处理。Reactor 释义“反应堆”,是一种事件驱动机制。和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的时间发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函数”
Reactor 模式是处理并发 I/O 比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的 I/O 事件注册到一个中心 I/O 多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。
Reactor 模型有三个重要的组件:
- 多路复用器:由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。
- 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中。
- 事件处理器:负责处理特定事件的处理函数
具体流程如下:
- 注册读就绪事件和相应的事件处理器;
- 事件分离器等待事件;
- 事件到来,激活分离器,分离器调用事件对应的处理器;
- 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权
Reactor 模式是编写高性能网络服务器的必备技术之一,它具有如下的优点:
- 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的;
- 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销; 可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
- 可复用性,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性;Reactor 模型开发效率上比起直接使用 IO 复用要高,它通常是单线程的,设计目标是希望单线程使用一颗 CPU 的全部资源,但也有附带优点,即每个事件处理中很多时候可以不考虑共享资源的互斥访问。可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力,当程序需要使用多核资源时,Reactor 模型就会悲剧, 为什么呢?
如果程序业务很简单,例如只是简单的访问一些提供了并发访问的服务,就可以直接开启多个反应堆,每个反应堆对应一颗 CPU 核心,这些反应堆上跑的请求互不相关,这是完全可以利用多核的。例如 Nginx 这样的 http 静态服务器。
二、reactor为什么搭配非阻塞IO
(1)多线程环境下时,是可以将listenfd放到多个线程的多个epoll中去处理的,那如果有一个连接接入的时候,会给全部线程发送一个信号,也就是所有的线程的epoll都可能返回(惊群),但是该事件只会在一个线程中被处理,其他线程再去accept的时候,连接已经被其他线程处理过,也就不存在了,如果socket没有被设置成nonblocking,此accept将阻塞当前线程
(2)当为边缘触发模式下(一次只会触发一次读事件),读事件触发时,read需要循环把读缓存读空,不然会到下次数据到来时才会触发读事件。比如有500字节数据,单次read 100字节,需要循环5次全部读完,那第6次再去读的时候就会阻塞住当前线程
(3)当reactor使用的是select实现时,select存在一个bug:当某个socket接收缓冲区有新数据到达,然后select报告这个socket描述符可读,但随后,协议栈检查到这个新分节检验和错误时,会丢弃这个分节,这时再调用read则无数据可读,如果socket没有被设置成nonblocking,此read将阻塞当前线程
额外补充一点:在select中关闭socket时需要注意,关闭socket需要及时更新监测socket集合。当一个socket被关闭之后,我们需要将其对应的文件描述符从select的监测集合中删除。否则,会导致select函数在下一次调用时伪唤醒,也就是说,调用后立即返回,并且一直返回“有事件发生”,而实际上这个已关闭的文件描述符并没有事件可处理。
三、代码实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/time.h>
#include <sys/sendfile.h>
#define BUFFER_LENGTH 1024
#define MAX_EPOLL_EVENTS 1024
#define RESOURCE_LENGTH 1024
#define SERVER_PORT 8888
#define PORT_COUNT 100
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#if 0
struct epoll_event {
__uint32_t events; // epoll 监控的事件类型
epoll_data_t data; // 用户数据
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
typedef int NCALLBACK(int ,int, void*);
1、在epoll监听到可读可写事件后,回调recv_cb、send_cd时的参数传递过程如下:
nty_event_add: struct epoll_event.data.ptr = struct ntyevent.arg
nty_event_set: struct ntyevent.arg = struct ntyreactor
2、所以在epoll_wait返回时,能通过events[i].data.ptr获取到struct ntyreactor,然后传入recv_cb、send_cd回调函数进行处理
#endif
//socketfd子项
struct ntyevent
{
int fd;
int events; //EPOLLOUT、EPOLLIN事件
void *arg; //指向reactor
int (*callback)(int fd, int events, void *arg);
int added; //是否添加过epoll
char rbuffer[BUFFER_LENGTH];
char wbuffer[BUFFER_LENGTH];
int rlength;
int wlength;
// int method;
// char resource[RESOURCE_LENGTH];
};
//事件块,每个块中存放 ITEM_LENGTH 个 sock_item
struct eventblock
{
struct eventblock *next;//指向下一个 eventblock
struct ntyevent *events;//数组(由于fd是顺序增加的,可直接将fd与数组的下标一一对应)
};
//epoll的fd,与eventblock事件块管理
struct ntyreactor
{
int epfd;
int blkcnt;
struct eventblock *evblks;
};
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg);
int nty_event_add(int epfd, int events, struct ntyevent *ev);
int nty_event_del(int epfd, struct ntyevent *ev);
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd);
int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);
int accept_cb(int fd, int events, void *arg);
struct timeval tv_begin; //listen时赋初值,用于计算连接所需时长
//recv时的回调函数
int recv_cb(int fd, int events, void *arg)
{
struct ntyreactor *reactor = (struct ntyreactor*)arg;
struct ntyevent *ev = ntyreactor_idx(reactor, fd);
if (ev == NULL)
return -1;
//接收数据
int len = recv(fd, ev->rbuffer, BUFFER_LENGTH, 0);
nty_event_del(reactor->epfd, ev); //先将fd从epoll中删除!!!
if (len > 0)
{
ev->rlength = len;
ev->rbuffer[len] = '\0';
printf("recv [%d]:%s\n", fd, ev->rbuffer);
//组织应答包
ev->wlength = sprintf(ev->wbuffer, "%s", "this is a server!");
//将fd添加进epoll中,设置为EPOLLOUT监听,发送数据
nty_event_set(ev, fd, send_cb, reactor);
nty_event_add(reactor->epfd, EPOLLOUT, ev);
}
else if (len == 0) //客户端主动断开连接
{
printf("recv_cb --> disconnect\n");
nty_event_del(reactor->epfd, ev); //将fd从epoll中删除
close(ev->fd);
}
else
{
if (errno == EAGAIN && errno == EWOULDBLOCK) {
} else if (errno == ECONNRESET){
nty_event_del(reactor->epfd, ev);
close(ev->fd);
}
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
return len;
}
//send时的回调函数
int send_cb(int fd, int events, void *arg)
{
struct ntyreactor *reactor = (struct ntyreactor*)arg;
struct ntyevent *ev = ntyreactor_idx(reactor, fd);
if (ev == NULL)
return -1;
//没有数据要发生
if(ev->wlength == 0)
{
//将fd添加进epoll中,设置为EPOLLIN监听
nty_event_del(reactor->epfd, ev);
nty_event_set(ev, fd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, ev);
return 0;
}
int len = send(fd, ev->wbuffer, ev->wlength, 0);
if (len > 0)
{
//将fd添加进epoll中,设置为EPOLLIN监听
nty_event_del(reactor->epfd, ev);
nty_event_set(ev, fd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, ev);
}
else
{
nty_event_del(reactor->epfd, ev);
close(ev->fd);
printf("send[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
return len;
}
//新client连接时的accept处理回调函数
int accept_cb(int fd, int events, void *arg)
{
static int curfds = 0;
struct ntyreactor *reactor = (struct ntyreactor*)arg;
if (reactor == NULL)
return -1;
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int clientfd;
//accept连接新的client
if ((clientfd = accept(fd, (struct sockaddr*)&client_addr, &len)) == -1)
{
if (errno != EAGAIN && errno != EINTR) {
}
printf("accept: %s\n", strerror(errno));
return -1;
}
//将clientfd设置成非阻塞
int flag = 0;
if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {
printf("%s: fcntl nonblocking failed, %d\n", __func__, MAX_EPOLL_EVENTS);
return -1;
}
//根据sockfd从reactor中 找到对应的ntyevent
struct ntyevent *event = ntyreactor_idx(reactor, clientfd);
if (event == NULL)
return -1;
//将clientfd对应的ntyevent添加到epoll中
nty_event_set(event, clientfd, recv_cb, reactor);
nty_event_add(reactor->epfd, EPOLLIN, event);
//计算连接所花的时间
if (curfds++ % 1000 == 999)
{
struct timeval tv_cur;
memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
gettimeofday(&tv_begin, NULL);
int time_used = TIME_SUB_MS(tv_begin, tv_cur);
printf("connections: %d, sockfd:%d, time_used:%d\n", curfds, clientfd, time_used);
}
printf("new connect [%s:%d], pos[%d]\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), clientfd);
return 0;
}
//设置ntyevent,对各参数赋值
//arg:指向reactor
void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg)
{
ev->fd = fd;
ev->callback = callback;
ev->events = 0;
ev->arg = arg;
//ev->last_active = time(NULL);
return ;
}
//将fd添加进epoll中
int nty_event_add(int epfd, int events, struct ntyevent *ev)
{
struct epoll_event ep_ev = {0, {0}};
ev->events = events;
ep_ev.data.ptr = ev; //指向fd对应的ntyevent
ep_ev.events = events;
int op;
if (ev->added == 1) {
op = EPOLL_CTL_MOD;
} else {
op = EPOLL_CTL_ADD;
ev->added = 1; //fd第一次添加完后,之后都是 EPOLL_CTL_MOD 操作
}
if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {
printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
return -1;
}
return 0;
}
//将fd从epoll中删除
int nty_event_del(int epfd, struct ntyevent *ev)
{
struct epoll_event ep_ev = {0, {0}};
//如果没有添加过,直接返回
if (ev->added != 1) {
return -1;
}
ep_ev.data.ptr = ev; //指向fd对应的ntyevent
ev->added = 0;
epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);
return 0;
}
//添加新块,扩容
int ntyreactor_alloc(struct ntyreactor *reactor)
{
if (reactor == NULL) return -1;
if (reactor->evblks == NULL) return -1;
struct eventblock *blk = reactor->evblks;
//定位到最后一个next为NULL的节点
while (blk->next != NULL) {
blk = blk->next;
}
//申请事件块中的数组空间
struct ntyevent* evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
if (evs == NULL) {
printf("ntyreactor_alloc ntyevent failed\n");
return -2;
}
memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
//申请事件块节点空间
struct eventblock *block = malloc(sizeof(struct eventblock));
if (block == NULL) {
printf("ntyreactor_alloc eventblock failed\n");
return -3;
}
//事件块赋值
block->events = evs;
block->next = NULL;
//添加到reactor
blk->next = block;
reactor->blkcnt ++;
return 0;
}
//根据sockfd从reactor中 找到对应的ntyevent
struct ntyevent *ntyreactor_idx(struct ntyreactor *reactor, int sockfd)
{
int i = 0;
if (reactor == NULL) return NULL;
if (reactor->evblks == NULL) return NULL;
//计算sockfd对应块的索引(从0开始),如果超过了现有管理的块数后,会添加新的块来扩容
//sockfd 0-1023在1块、sockfd 1024-2047在2块、...
int blkidx = sockfd / MAX_EPOLL_EVENTS;
while (blkidx >= reactor->blkcnt) {
ntyreactor_alloc(reactor);
}
//定位到sockfd所在的块
struct eventblock *blk = reactor->evblks;
while (i++ != blkidx && blk != NULL) {
blk = blk->next;
}
//返回ntyevent,sockfd % ITEM_LENGTH 就是 sockfd在数组中的索引
return &blk->events[sockfd % MAX_EPOLL_EVENTS];
}
//初始化reactor:创建epoll、创建1个事件管理块
int ntyreactor_init(struct ntyreactor *reactor)
{
if (reactor == NULL) return -1;
memset(reactor, 0, sizeof(struct ntyreactor));
//1、创建epoll
reactor->epfd = epoll_create(1);
if (reactor->epfd <= 0) {
printf("create epfd in %s err %s\n", __func__, strerror(errno));
return -2;
}
//2、创建1个事件管理块
struct ntyevent* evs = (struct ntyevent*)malloc((MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
if (evs == NULL) {
printf("create epfd in %s err %s\n", __func__, strerror(errno));
close(reactor->epfd);
return -3;
}
memset(evs, 0, (MAX_EPOLL_EVENTS) * sizeof(struct ntyevent));
struct eventblock *block = malloc(sizeof(struct eventblock));
if (block == NULL) {
free(evs);
close(reactor->epfd);
return -3;
}
block->events = evs;
block->next = NULL;
reactor->evblks = block;
reactor->blkcnt = 1;
return 0;
}
//销毁rector
int ntyreactor_destory(struct ntyreactor *reactor)
{
close(reactor->epfd);
struct eventblock *blk = reactor->evblks;
struct eventblock *blk_next;
while (blk != NULL) {
blk_next = blk->next;
free(blk->events);
free(blk);
blk = blk_next;
}
return 0;
}
//往reactor中添加listen事件
int ntyreactor_addlistener(struct ntyreactor *reactor, int sockfd, NCALLBACK *acceptor)
{
if (reactor == NULL) return -1;
if (reactor->evblks == NULL) return -1;
struct ntyevent *event = ntyreactor_idx(reactor, sockfd);
if (event == NULL) return -1;
nty_event_set(event, sockfd, acceptor, reactor);
nty_event_add(reactor->epfd, EPOLLIN, event);
return 0;
}
//监听epoll
int ntyreactor_run(struct ntyreactor *reactor)
{
if (reactor == NULL) return -1;
if (reactor->epfd < 0) return -1;
if (reactor->evblks == NULL) return -1;
struct epoll_event events[MAX_EPOLL_EVENTS+1];
int checkpos = 0, i;
while (1)
{
int nready = epoll_wait(reactor->epfd, events, MAX_EPOLL_EVENTS, 1000);
if (nready < 0) {
printf("epoll_wait error, exit\n");
continue;
}
for (i = 0;i < nready;i ++)
{
//data.ptr指向fd对应的ntyevent
struct ntyevent *ev = (struct ntyevent*)events[i].data.ptr;
//ntyevent->arg指向reactor
if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
ev->callback(ev->fd, events[i].events, ev->arg);
}
if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
ev->callback(ev->fd, events[i].events, ev->arg);
}
}
}
}
//初始化server,完成 socket--->bind--->listen
//ip:INADDR_ANY
//port:自定义
int init_sock(short port)
{
int fd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(fd, F_SETFL, O_NONBLOCK);
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(port);
bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (listen(fd, 20) < 0) {
printf("listen failed : %s\n", strerror(errno));
return -1;
}
printf("listen server port : %d\n", port);
gettimeofday(&tv_begin, NULL);
return fd;
}
int main(int argc, char *argv[])
{
int i = 0;
int sockfds[PORT_COUNT] = {0};
struct ntyreactor *reactor = NULL;
//创建并初始化reactor
reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
ntyreactor_init(reactor);
//初始化server,开启监听
unsigned short port = SERVER_PORT;
if (argc == 2) {
port = atoi(argv[1]);
}
for (i = 0;i < PORT_COUNT;i ++) {
sockfds[i] = init_sock(port+i);
ntyreactor_addlistener(reactor, sockfds[i], accept_cb);
}
//监听io事件
ntyreactor_run(reactor);
//销毁rector
ntyreactor_destory(reactor);
for (i = 0;i < PORT_COUNT;i ++) {
close(sockfds[i]);
}
free(reactor);
return 0;
}