文章目录
- 基于reactor模式的tcp服务器
- 什么是reactor模式?
- 实现步骤
- 修改recv_cb逻辑变成web服务器
- web服务器性能测试(wrk工具的使用)
基于reactor模式的tcp服务器
- 本文基于上篇的简易tcp通信服务器基础 上进行封装,写出使用epoll的io模型和reactor模式的服务器代码。
什么是reactor模式?
- reactor模式是非堵塞、同步事件触发的处理模式,将监听客户端连接和业务处理分离,当有读写事件发生后,就执行对应事件回调函数,或者放入任务队列里,交给工作进程或者线程池去执行业务逻辑。
实现步骤
-
修改状态机判断,将判断IO模式改为判断事件模式,也就是在
epoll_wait
函数返回后的for循环判断里的状态机判断改为只判断是读事件和写事件,因为sockfd也是读事件直接归到读事件里即可,然后执行对应事件的回调函数即可。- 代码如下:
//这里connlist是个全局变量,为了存储每个fd的数据的一个结构体数组,结构如下: //typedef int (*RCALLBACK)(int fd); /*struct conn_item { int fd; char r_buffer[BUFFERSIZE]; char w_buffer[BUFFERSIZE]; int r_idx; int w_idx; union {//使用联合的方式区分是普通fd还是scokfd,对应不同的回调函数。 CALL_BACK recv_callback; CALL_BACK accept_callback; } recv_t; CALL_BACK send_callback; };*/ int ret_code = epoll_wait(epoll_fd, epoll_events, 1024, -1); for (int i = 0; i < ret_code; ++i) { int connt_fd = epoll_events[i].data.fd; if (epoll_events[i].events & EPOLLIN) { int count = connlist[connt_fd].recv_t.recv_callback(connt_fd); printf("recv count: %d <-- buffer: %s\n", count, connlist[connt_fd].r_buffer); } else if (epoll_events[i].events & EPOLLOUT){ int count = connlist[connt_fd].send_callback(connt_fd); printf("send count: %d <-- buffer: %s\n", count, connlist[connt_fd].w_buffer); } }
- 代码如下:
-
将
epoll_ctl
封装抽离,每次需要更改或者添加fd状态时直接调用接口即可。- 代码如下:
int sent_event(int fd, int event, int flag) { struct epoll_event ev; ev.events = event; ev.data.fd = fd; if (flag) { epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev); } else { epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &ev); } }
- 代码如下:
-
分别将io处理逻辑封装到函数里,参数只需要提供fd即可,需要写三个函数
recv_cb、accept_cb、send_cb
,分别对应读数据、接收新的连接、写数据。- recv_cb:
-
函数功能就是使用recv接口里的读数据逻辑封装,但是现在是将数据读到全局的conn_list里的r_buffer,然后将r_buffer的数据拷贝到w_buffer里,方便写事件拿到数据,所以在读完数据后还需要执行sentevent去注册写事件。
-
代码如下:
//receive char* buffer = connlist[fd].r_buffer; int idx = connlist[fd].r_idx; int recv_len = recv(fd, buffer + idx, BUFFERSIZE - idx, 0); if (recv_len == -1){ perror("recv error"); close(fd); } else if (recv_len == 0) { //printf("disconnect\n"); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL); close(fd); return -1; } connlist[fd].r_idx += recv_len; #if 1 memcpy(connlist[fd].w_buffer, connlist[fd].r_buffer, connlist[fd].r_idx); //memset(connlist[fd].r_buffer, 0, BUFFERSIZE); connlist[fd].w_idx = connlist[fd].r_idx; //connlist[fd].r_idx -= connlist[fd].r_idx; #else http_response(&connlist[fd]); #endif //注册写事件 sent_event(fd, EPOLLOUT, 0); return recv_len; }
-
- accept_cb:
- 就是将之前状态机里的accept新连接封装到这里,并且添加到全局list里,代码如下:
int accept_cb(int fd) { struct sockaddr_in client_addr; int len = sizeof(struct sockaddr); int client_fd = accept(fd, (struct sockaddr*)&client_addr, &len); sent_event(client_fd, EPOLLIN, 1);//为新的fd注册epoll读事件 // 向全局connlist添加新的连接 connlist[client_fd].fd = client_fd; memset(connlist[client_fd].r_buffer, 0, sizeof(BUFFERSIZE)); connlist[client_fd].r_idx = 0; memset(connlist[client_fd].w_buffer, 0, sizeof(BUFFERSIZE)); connlist[client_fd].w_idx = 0; connlist[client_fd].recv_t.recv_callback = recv_cb;//注册回调函数 connlist[client_fd].send_callback = send_cb; return client_fd; }
- 就是将之前状态机里的accept新连接封装到这里,并且添加到全局list里,代码如下:
- send_cb:
-
send的逻辑比较简单,直接从全局conn_list里的w_buffer里拿到数据调用send函数写数据即可,注意写完数据后还需要注册读事件。
-
代码如下:
int send_cb(int fd) { //send char* buffer = connlist[fd].w_buffer; int idx = connlist[fd].w_idx; int count = send(fd, buffer, idx, 0); sent_event(fd, EPOLLIN, 0); return count; }
-
- recv_cb:
-
除此之外,还可以把服务初始化操作封装:
-
代码如下:
int init_server (unsigned short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(struct sockaddr_in)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr*)&server_addr, sizeof(struct sockaddr))) { perror("bind error"); return -1; } if (-1 == listen(sockfd, 8)) { perror("listen error"); return -1; } return sockfd; }
-
- 至此,基于reactor和epoll的简易tcp服务器就大功告成。
修改recv_cb逻辑变成web服务器
-
修改recv_cb的逻辑,往w_buffer里写数据的时候,直接写一个HTTP请求格式的字符串,或者直接读取一个html文件写到w_buffer里。
-
这里函数参数不再是fd,而是一个connection_t,其实还是上面的那个结构体,只是为了实现一连接一响应的模式。
-
代码如下:
//在recv_cb里调用该函数即可,之前是执行memcpy拷贝 int http_response(connection_t* conn) { #if 0 //返回字符串 conn->w_idx = sprintf(conn->w_buffer, "HTTP/1.1 200 OK\r\n" "Accept-Ranges: bytes\r\n" "Content-Length: 78\r\n" "Content-Type: text/html\r\n" "Date: Sat, 06 Aug 2023 13:16:46 GMT\r\n\r\n" "<html><head><title>aha->sgt</title></head><body><h1>sgt</h1></body></html>\r\n\r\n"); #else //返回一个html文件,这里直接用里nginx的index.html int fd = open("/home/sgt/project/learn/daily_learn/learn/index.html", O_RDONLY); struct stat fstat_buf; fstat(fd, &fstat_buf);//获取文件属性 //todo使用sendfile(); conn->w_idx = sprintf(conn->w_buffer, "HTTP/1.1 200 OK\r\n" "Accept-Ranges: bytes\r\n" "Content-Length: %ld\r\n" "Content-Type: text/html\r\n" "Date: Sat, 06 Aug 2023 13:16:46 GMT\r\n\r\n", fstat_buf.st_size); conn->w_idx += read(fd, conn->w_buffer + conn->w_idx, BUFFERSIZE - conn->w_idx); #endif return conn->w_idx; }
-
效果如下:
web服务器性能测试(wrk工具的使用)
这里测试服务器的性能使用开源的服务器性能测试工具wrk,github地址: https://github.com/wg/wrk.git
- clone下来后,进入wrk目录,需要有zip压缩工具,没有下载一个:
sudo apt install zip
,然后执行make等待编译完成。
- 测试方法:
- 执行命令:
./wrk -c 100 -d10s -t 50 +ip:port
//参数可以根据自己需求去修改-c, --connections Connections to keep open 连接数
-d, --duration Duration of test 间隔时间
-t, --threads Number of threads to use 线程数 - 这里可以优化一些,就是把代码里的输出注释掉,性能会更好。
- 执行结果:
- 优化前:
- 优化后: - 这- 这里对参数做下解释:
- 执行命令:
Running 10s test @ http://www.baidu.com
#这里使用50个线程,100个连接
50 threads and 100 connections
各项分别为平均值,标准差,最大值以及标准差占比,一般我们主要关注平均值和最大值. 标准差如果太大说明样本本身离散程度比较高. 有可能系统性能波动很大。
Thread Stats Avg Stdev Max +/- Stdev
# 延迟:
Latency 243.13ms 74.40ms 999.31ms 72.63%
# 每秒请求数
Req/Sec 8.70 3.72 20.00 76.50%
4096 requests in 10.08s, 41.03MB read #10s请求4096次,产生流量41.03M
Requests/sec: 406.28 #qps:每秒的连接数
Transfer/sec: 4.07MB #每秒的流量