文章目录
一、项目简介
二、项目整体认识
2、1 HTTP服务器
2、2 Reactor模型
三、预备知识
3、1 C++11 中的 bind
3、2 简单的秒级定时任务实现
3、3 正则库的简单使用
3、4 通用类型any类型的实现
四、服务器功能模块划分与实现
4、1 Buffer模块
4、2 Socket模块
4、3 Channel模块
4、4 Poller模块
4、5 Eventloop模块
4、5、1 时间轮思想
4、5、2 TimerWheel定时器模块整合
4、5、3 Channel 与 EventLoop整合
4、5、3 时间轮与EventLoop整合
4、6 Connection模块
4、7 Acceptor模块
4、8 LoopThread模块
4、9 LoopThreadPool模块
4、10 TcpServer模块
4、11 测试代码
五、HTTP协议支持实现
5、1 Util模块
5、2 HttpRequest模块
5、3 HttpResponse模块
5、4 HttpContext模块
5、5 HttpServer模块
六、对服务器进行测试
6、1 长连接测试
6、2 不完整报文请求
6、3 业务处理超时测试
6、4 一次发送多条数据测试
6、5 大文件传输测试
6、6 性能测试
🙋♂️ 作者:@Ggggggtm 🙋♂️
👀 专栏:实战项目👀
💥 标题: 仿muduo库实现one thread one loop式并发服务器 💥
❣️ 寄语:与其忙着诉苦,不如低头赶路,奋路前行,终将遇到一番好风景 ❣️
一、项目简介
该项目目标是实现一个高并发的服务器。但并不是自己完全实现一个,而是仿照现有成熟的技术进行模拟实现。
一些必备知识:线程、网络套接字编程、多路转接技术(epoll),另外还有一些小的知识,在本篇文章中会提前讲解。
本项目主要分为多个模块来进行讲解,实际上就是一个个小的组件。通过这些组件,我们可以很快的搭建起来一个高并发式的服务器。
二、项目整体认识
2、1 HTTP服务器
该项目组件内提供的不同应用层协议支持,由于应用层协议有很多,我们就在项目中提供较为常见的HTTP协议组件支持。
HTTP(Hyper Text Transfer Protocol),超⽂本传输协议是应⽤层协议,是⼀种简单的请求-响应协议(客户端根据自己的需要向服务器发送请求,服务器针对请求提供服务,完毕后通信结束)。
但是需要注意的是HTTP协议是⼀个运行在TCP协议之上的应用层协议,这⼀点本质上是告诉我们,HTTP服务器其实就是个TCP服务器,只不过在应用层基于HTTP协议格式进行数据的组织和解析来明确客⼾端的请求并完成业务处理。因此实现HTTP服务器简单理解,只需要以下几步即可:
- 搭建⼀个TCP服务器,接收客户端请求。
- 以HTTP协议格式进行解析请求数据,明确客户端目的。
- 明确客户端请求目的后提供对应服务。
- 将服务结果⼀HTTP协议格式进行组织,发送给客户端实现⼀个HTTP服务器很简单,但是实现⼀个高性能的服务器并不简单,这个单元中将讲解基于Reactor模式的高性能服务器实现。
2、2 Reactor模型
Reactor模式,是指通过⼀个或多个输入同时传递给服务器进行请求处理时的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫Dispatcher模式。简单理解就是使用 I/O 多路复用 统⼀监听事件(Reactor 模式就是基于IO多路复用构建起来的),收到事件后分发给处理进程或线程,是编写高性能网络服务器的必备技术之⼀。
网络模型演化过程中,将建立连接、IO等待/读写以及事件转发等操作分阶段处理,然后可以对不同阶段采用相应的优化策略来提高性能;也正是如此,Reactor 模型在不同阶段都有相关的优化策略,常见的有以下三种方式呈现:
- 单Reactor单线程模型:单I/O多路复用+业务处理;
- 单Reactor多线程模型:单I/O多路复用+线程池;
- 多Reactor多线程模型:多I/O多路复用+线程池。
下面我们来具体分析一下其优缺点。
单Reactor单线程:在单个线程中进行事件监控并处理。具体步骤如下:
- 通过IO多路复用模型进行客户端请求监控。
- 触发事件后,进行事件处理。
- 如果是新建连接请求,则获取新建连接,并添加至多路复用模型进行事件监控。如果是数据通信请求,则进行对应数据处理(接收数据,处理数据,发送响应)。
- 优点:所有操作均在同⼀线程中完成,思想流程较为简单,不涉及进程/线程间通信及资源争抢问题。
- 缺点:无法有效利用CPU多核资源,很容易达到性能瓶颈。
- 适用场景:适用于客户端数量较少,且处理速度较为快速的场景。(处理较慢或活跃连接较多,会导致串行处理的情况下,后处理的连接长时间无法得到响应)
单Reactor多线程:一个Reactor进行时间监控,由多个线程(线程池)来处理就绪事件。
- Reactor线程通过I/O多路复用模型进行客户端请求监控;
- 触发事件后,进行事件处理
- 如果是新建连接请求,则获取新建连接,并添加至多路复用模型进行事件监控。
- 如果是数据通信请求,则接收数据后分发给Worker线程池进行业务处理。
- 工作线程处理完毕后,将响应交给Reactor线程进行数据响应。
其优缺点如下:
- 优点:充分利用CPU多核资源
- 缺点:多线程间的数据共享访问控制较为复杂,单个Reactor 承担所有事件的监听和响应,在单线程中运行,高并发场景下容易成为性能瓶颈。
多Reactor多线程:多I/O多路复用进行时间监控,同时使用线程池来对就绪时间进行处理。
- 在主Reactor中处理新连接请求事件,有新连接到来则分发到子Reactor中监控
- 在子Reactor中进行客户端通信监控,有事件触发,则接收数据分发给Worker线程池
- Worker线程池分配独立的线程进行具体的业务处理
- 工作线程处理完毕后,将响应交给子Reactor线程进行数据响应。
优点:充分利用CPU多核资源,主从Reactor各司其职。但是大家也要理解:执行流并不是越多越好,因为执行流多了,反而会增加cpu切换调度的成本。
目标定位-One Thread One Loop主从Reactor模型高并发服务器。
咱们要实现的是主从Reactor模型服务器,也就是主Reactor线程仅仅监控监听描述符,获取新建连接,保证获取新连接的高效性,提高服务器的并发性能。主Reactor获取到新连接后分发给子Reactor进行通信事件监控。而子Reactor线程监控各自的描述符的读写事件进行数据读写以及业务处理。
One Thread One Loop的思想就是把所有的操作都放到⼀个线程中进行,⼀个线程对应⼀个事件处理的循环。
当前实现中,因为并不确定组件使用者的使用意向,因此并不提供业务层工作线程池的实现,只实现主从Reactor,而Worker工作线程池,可由组件库的使用者的需要自行决定是否使用和实现。
对比上个模型,One Thread One Loop主从Reactor模型高并发服务器结构图如下:
三、预备知识
3、1 C++11 中的 bind
bind也是一种函数包装器,也叫做适配器。它可以接受一个可调用对象,以及函数的各项参数,然后返回⼀个新的函数对象,但是这个函数对象的参数已经被绑定为设置的参数。运⾏的时候相当于总是调用传入固定参数的原函数。
调用bind的一般形式为:auto newCallable = bind(callable, arg_list);
解释说明:
- callable:需要包装的可调用对象。
- newCallable:生成的新的可调用对象。
- arg_list:逗号分隔的参数列表,对应给定的callable的参数。当调用newCallable时,newCallable会调用callable,并传给它arg_list中的参数。
arg_list中的参数可能包含形如_n的名字,其中n是一个整数,这些参数是“占位符”,表示newCallable的参数,它们占据了传递给newCallable的参数的“位置”。数值n表示生成的可调用对象中参数的位置,比如_1为newCallable的第一个参数,_2为第二个参数,以此类推。
此外,除了用auto接收包装后的可调用对象,也可以用function类型指明返回值和形参类型后接收包装后的可调用对象。当然,arg_list中的参数也可以绑定固定的值。下面我们来结合几个例子理解一下。
绑定固定值如下:
int Plus(int a, int b) { return a + b; } int main() { //绑定固定参数 function<int()> func = bind(Plus, 10, 10); cout << func() << endl; return 0; }
在上述代码中,func()相当于调用了Plus(10,10)。因为我们绑定了固定的两个参数值,所以直接调用即可。接下来我们再看一下使用展位符进行绑定。代码如下:
int Plus(int a, int b) { return a + b; } int main() { //绑定固定参数 function<int(int)> func = bind(Plus, placeholders::_1, 10); cout << func(2) << endl; //12 return 0; }
这里的 placeholders::_1 就是一个占位符,相当于func中传入的第一个参数。
上述的场景并不使用,一般情况我们会在对类内的成员函数进行绑定,因为在类外调用类内成员函数时,由于类内的成员函数第一个参数是都是this指针,所以很不方便调用,于是我们可以绑定一个this指针,或者匿名对象都是可以的,这样就可以正常的进行调用了。结合如下例子理解一下:
class Sub { public: int sub(int a, int b) { return a - b; } }; int main() { //绑定固定参数 function<int(int, int)> func = bind(&Sub::sub, Sub(), placeholders::_1, placeholders::_2); cout << func(1, 2) << endl; //-1 return 0; }
还有一种场景,bind函数有个好处就是,这种任务池在设计的时候,不⽤考虑都有哪些任务处理方式了,处理函数该如何设计,有多少个什么样的参数,这些都不用考虑了,降低了代码之间的耦合度。代码如下:
#include <iostream> #include <string> #include <vector> #include <functional> void print(const std::string &str) { std::cout << str << std::endl; } int main() { using Functor = std::function<void()>; std::vector<Functor> task_pool; task_pool.push_back(std::bind(print, "你好")); task_pool.push_back(std::bind(print, "我是")); task_pool.push_back(std::bind(print, "Ggggggtm")); for (auto &functor : task_pool) { functor(); } return 0; }
在上述代码中,print函数就是我们要执行的任务,当然还可以有其他的函数。如果没有bind,那么处理各种不同参数的函数是很麻烦的,而这里我们只需要一个bind函数,可以将他们同意看成无参的函数。
3、2 简单的秒级定时任务实现
在当前的⾼并发服务器中,我们不得不考虑⼀个问题,那就是连接的超时关闭问题。我们需要避免⼀个连接⻓时间不通信,但是也不关闭,空耗资源的情况。这时候我们就需要⼀个定时任务,定时的将超时过期的连接进⾏释放。Linux中给我们提供了定时器,代码如下:#include <sys/timerfd.h> int timerfd_create(int clockid, int flags); //clockid : CLOCK_REALTIME - 系统实时时间,如果修改了系统时间就会出问题; CLOCK_MONOTONIC - 从开机到现在的时间是⼀种相对时间; flags : 0 - 默认阻塞属性 int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old); //fd : timerfd_create返回的⽂件描述符 // flags : 0 - // 相对时间, 1 - 绝对时间;默认设置为0即可.new: ⽤于设置定时器的新超时时间 old: ⽤于接 收原来的超时时间 struct timespec { time_t tv_sec; /* Seconds */ long tv_nsec; /* Nanoseconds */ }; struct itimerspec { struct timespec it_interval; /* 第⼀次之后的超时间隔时间 */ struct timespec it_value; /* 第⼀次超时时间 */ }; // 定时器会在每次超时时,⾃动给fd中写⼊8字节的数据,表⽰在上⼀次读取数据到当前读取数据期间超 // 时了多少次。
下面我们来结合一个实际的例子来看一下。具体如下:
#include <iostream> #include <stdio.h> #include <errno.h> #include <sys/timerfd.h> #include <unistd.h> int main() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); if(timerfd < 0) { perror("timerfd_create error"); exit(2); } struct itimerspec itm; itm.it_value.tv_sec = 3; itm.it_value.tv_nsec = 0; itm.it_interval.tv_sec = 3; itm.it_interval.tv_nsec = 0; timerfd_settime(timerfd, 0, &itm, nullptr); while(true) { uint64_t tmp; int n = read(timerfd, &tmp, sizeof tmp); if(n < 0) { perror("read error"); exit(3); } std::cout << "超时了,距离上一次超时: " << tmp << " 次数" << std::endl; } return 0; }
其实上述代码我们就设置了一个每3秒钟的定时器,也就是每个3秒钟,都会出发一次,相当于每个3秒钟像文件中写入一次数据。运行结果如下图:
注意,后面我们会根据定时器实现一个时间轮来完成对超时任务的释放销毁。这里你可能还不理解超时任务的释放销毁,或许会详细讲解到。
3、3 正则库的简单使用
正则表达式(regular expression)描述了一种字符串匹配的模式(pattern),可以用来检查一个串是否含有某种子串、将匹配的子串替换或者从某个串中取出符合某个条件的子串等。
正则表达式的使用,可以使得HTTP请求的解析更加简单(这里指的时程序员的工作变得的简单,这并不代表处理效率会变高,实际上效率上是低于直接的字符串处理的),使我们实现的HTTP组件库使用起来更加灵活。
本篇文章就不再过多对正则表达式的详细使用进行详解,但是代码中会有注释,不懂的小伙伴可以取搜索相关文章进行学习。实例代码如下:
#include <regex> void req_line() { std::cout << "------------------first line start-----------------\n"; // std::string str = "GET /bitejiuyeke HTTP/1.1\r\n"; // std::string str = "GET /bitejiuyeke HTTP/1.1\n"; std::string str = "GET /bitejiuyeke?a=b&c=d HTTP/1.1\r\n"; // 匹配规则 std::regex re("(GET|HEAD|POST|PUT|DELETE) (([^?]+)(?:\\?(.*?))?) (HTTP/1\\.[01])(?:\r\n |\n)"); std::smatch matches; std::regex_match(str, matches, re); /*正则匹配获取完毕之后matches中的存储情况*/ /* matches[0] 整体⾸⾏ GET /bitejiuyeke?a=b&c=d HTTP/1.1 matches[1] 请求⽅法 GET matches[2] 整体URL /bitejiuyeke?a=b&c=d matches[3] ?之前 /bitejiuyeke matches[4] 查询字符串 a=b&c=d matches[5] 协议版本 HTTP/1.1 */ int i = 0; for (const auto &it : matches) { std::cout << i++ << ": "; std::cout << it << std::endl; } if (matches[4].length() > 0) { std::cout << "have param!\n"; } else { std::cout << "have not param!\n"; } std::cout << "------------------first line start-----------------\n"; return; } void method_match(const std::string str) { std::cout << "------------------method start-----------------\n"; std::regex re("(GET|HEAD|POST|PUT|DELETE) .*"); /* () 表⽰捕捉符合括号内格式的数据 * GET|HEAD|POST... |表⽰或,也就是匹配这⼏个字符串中的任意⼀个 * .* 中.表⽰匹配除换⾏外的任意单字符, *表⽰匹配前边的字符任意次; 合起来在这⾥就是 表⽰空格后匹配任意字符 * 最终合并起来表⽰匹配以GET或者POST或者PUT...⼏个字符串开始,然后后边有个空格的字 符串, 并在匹配成功后捕捉前边的请求⽅法字符串 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------method over------------------\n"; } void path_match(const std::string str) { // std::regex re("(([^?]+)(?:\\?(.*?))?)"); std::cout << "------------------path start------------------\n"; std::regex re("([^?]+).*"); /* * 最外层的() 表⽰捕捉提取括号内指定格式的内容 * ([^?]+) [^xyz] 负值匹配集合,指匹配⾮^之后的字符, ⽐如[^abc] 则plain就匹配到 plin字符 * +匹配前⾯的⼦表达式⼀次或多次 * 合并合并起来就是匹配⾮?字符⼀次或多次 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------path over------------------\n"; } void query_match(const std::string str) { std::cout << "------------------query start------------------\n"; std::regex re("(?:\\?(.*?))? .*"); /* * (?:\\?(.*?))? 最后的?表⽰匹配前边的表达式0次或1次,因为有的请求可能没有查询 字符串 * (?:\\?(.*?)) (?:pattern)表⽰匹配pattern但是不获取匹配结果 * \\?(.*?) \\?表⽰原始的?字符,这⾥表⽰以?字符作为起始 * .表⽰\n之外任意单字符, *表⽰匹配前边的字符0次或多次, ?跟在*或+之后表⽰懒惰模式, 也就是说以?开始的字符串就只匹配这⼀次就⾏, 后边还有以?开始的同格式字符串也不不会匹配 () 表⽰捕捉获取符合内部格式的数据 * 合并起来表⽰的就是,匹配以?开始的字符串,但是字符串整体不要, * 只捕捉获取?之后的字符串,且只匹配⼀次,就算后边还有以?开始的同格式字符串也不不会匹 配 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------query over------------------\n"; } void version_mathch(const std::string str) { std::cout << "------------------version start------------------\n"; std::regex re("(HTTP/1\\.[01])(?:\r\n|\n)"); /* * (HTTP/1\\.[01]) 外层的括号表⽰捕捉字符串 * HTTP/1 表⽰以HTTP/1开始的字符串 * \\. 表⽰匹配 . 原始字符 * [01] 表⽰匹配0字符或者1字符 * (?:\r\n|\n) 表⽰匹配⼀个\r\n或者\n字符,但是并不捕捉这个内容 * 合并起来就是匹配以HTTP/1.开始,后边跟了⼀个0或1的字符,且最终以\n或者\r\n作为结 尾的字符串 */ std::smatch matches; std::regex_match(str, matches, re); std::cout << matches[0] << std::endl; std::cout << matches[1] << std::endl; std::cout << "------------------version over------------------\n"; }
3、4 通用类型any类型的实现
所谓通用类型,就是可以存储任意类型。我们第一时间可能想到通过模板来实现,代码如下:
template<class T> class Any { T _any; };
但上述并不是我们想要的。但是我们在定义Any对象时,必须指定参数。使用模板并不是我们想要的,我们想要的是如下:
/* template<class T> class Any { T _any; }; Any<int> a; a = 10; */ // 我们实际上想要的 Any a; a = 10; a = "Ggggggtm";
所以使用模板是肯定不行的。那我们就想到了类内再嵌套一个类,这样行不行呢?
class Any { private: template<class T> class placeholder { T _val; }; placeholder<T> _content; };
这样好像也不太行,因为我们在实例化Any类内中的placeholder对象时,也必须指定类型。那么有没有什么很好的办法,在实例化Any类中的成员变量对象时,不用指定其类型还能很好的存储任意类型呢?这里就可以使用多态的方法。思路是:
- 利用多态的特点,父类对象指向子类对象,也可以安全的访问子类对象中的成员;
- 子类使用模板,来存储任意类型;
- Any类中存储父类对象指针,来调用子类成员。
- 当我们存储任意类型时,new一个子类对象来保存数据, 然后用子类对象来初始化Any类中的所保存的父类(holder)对象指针即可。
大体的思路代码如下:
class Any { private: class holder { //...... }; template<class T> class placeholder : public holder { //..... T _val; }; holder* _content; };
整体的思路有了,下面我们直接给出实现代码,其中详细细节就不再过多解释。代码如下:
class Any { public: Any() :_content(nullptr) {} template<class T> Any(const T& val) :_content(new placeholder<T>(val)) {} Any(const Any& other) :_content(other._content ? other._content->clone() : nullptr) {} ~Any() { delete _content; } template<class T> T* get() { assert(typeid(T) == _content->type()); return &(((placeholder<T>*)_content)->_val); } Any& Swap(Any& other) { std::swap(_content, other._content); return *this; } template<class T> Any& operator=(const T& val) { Any(val).Swap(*this); return *this; } Any& operator=(const Any& other) { Any(other).Swap(*this); return *this; } private: class holder { public: virtual ~holder() {} virtual const std::type_info& type() = 0; virtual holder* clone() = 0; }; template<class T> class placeholder : public holder { public: placeholder(const T& val) :_val(val) {} virtual const std::type_info& type() { return typeid(T); } virtual holder* clone() { return new placeholder(_val); } public: T _val; }; holder *_content; };
四、服务器功能模块划分与实现
实现一个Reactor模式的服务器,首先肯定需要进行网络套接字编程。Reactor模式就是基于多路转接技术继续进行实现的,那么我们肯定需要对IO事件进行监控,然后对就绪的IO事件进行处理。怎么判断接收到的数据是否是一份完整的数据呢?所以在这里我们还要进行协议定制。当然,我们用的就是HTTP协议模式进行传输数据。那么不够一份完整的报文时,我们需要将接收到的数据暂时保存起来,那么肯定还需要定义一个接受和发送缓冲区。同时我们这个所实现的服务器当中,还添加了对不活跃链接的销毁,在后面我们也会详细讲到。
4、1 Buffer模块
Buffer模块是一个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能。Buffer模块主要就是用于当我们接收到一个不完整的报文时,需要将该报文暂时保存起来。同时,我们在对于客户端响应的数据,应该是在套接字可写的情况下进行发送,所以需要把数据放到暂时放到Buffer 的发送缓冲区当中。
对于缓冲区,我们只需要一段线性的空间来保存即可。那就可以直接用vector即可。我们实现的功能大概如下:
写入位置:
- 当前写入位置指向哪里,从哪里开始写入
- 如果后续剩余空间不够了!
- 考虑整体缓冲区空闲空间是否足够!(因为读位置也会向后偏移,前后有可能有空闲空间)
- 缓冲区空闲空间足够:将数据移动到起始位置
- 缓冲区空闲空间不够:扩容,从当前写位置开始扩容足够大小!
- 数据一旦写入成功,当前写位置,向后偏移!
读取数据:
- 当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读
- 可读数据大小:当前写入位置,减去当前读取位置!
整体实现相对来说较为简单,这里我们就直接给出代码,就不再做过多解释。
#include <ctime> #include <cstring> #include <iostream> #include <vector> #include <cassert> #include <string> using namespace std; #define BUFFER_SIZE 1024 class Buffer { private: std::vector<char> _buffer; // 使用vector进行内存空间管理 uint64_t _read_idx; // 读偏移 uint64_t _write_idx; // 写偏移 public: Buffer():_read_idx(0),_write_idx(0),_buffer(BUFFER_SIZE) {} char* begin() {return &*_buffer.begin();} // 获取当前写入起始地址 char *writePosition() { return begin() + _write_idx;} // 获取当前读取起始地址 char *readPosition() { return begin() + _read_idx; } // 获取缓冲区末尾空间大小 —— 写偏移之后的空闲空间,总体大小减去写偏移 uint64_t tailIdleSize() {return _buffer.size() - _write_idx; } // 获取缓冲区起始空间大小 —— 读偏移之前的空闲空间 uint64_t handIdleSize() {return _read_idx ;} // 获取可读空间大小 = 写偏移 - 读偏移 uint64_t readAbleSize() {return _write_idx - _read_idx ;} // 将读偏移向后移动 void moveReadOffset(uint64_t len) { // 向后移动大小必须小于可读数据大小 assert(len <= readAbleSize()); _read_idx += len; } // 将写偏移向后移动 void moveWriteOffset(uint64_t len) { assert(len <= tailIdleSize()); _write_idx += len; } void ensureWriteSpace(uint64_t len) { // 确保可写空间足够 (整体空间够了就移动数据,否则就扩容!) if (tailIdleSize() >= len) return; // 不够的话 ,判断加上起始位置够不够,够了将数据移动到起始位置 if (len <= tailIdleSize() + handIdleSize()) { uint64_t rsz = readAbleSize(); //帮当前数据大小先保存起来 std::copy(readPosition(),readPosition() + rsz,begin()); // 把可读数据拷贝到起始位置 _read_idx = 0; // 读归为0 _write_idx = rsz; // 可读数据大小是写的偏移量! } else { // 总体空间不够!需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可! _buffer.resize(_write_idx + len); } } // 写入数据 void Write(const void *data,uint64_t len) { ensureWriteSpace(len); const char *d = (const char*) data; std::copy(d,d + len,writePosition()); } void WriteAndPush(void* data,uint64_t len) { Write(data,len); moveWriteOffset(len); } void WriteStringAndPush(const std::string &data) { writeString(data); moveWriteOffset(data.size()); } void writeString(const std::string &data) { return Write(data.c_str(),data.size()); } void writeBuffer(Buffer &data) { return Write(data.readPosition(),data.readAbleSize()); } void writeBufferAndPush(Buffer &data) { writeBuffer(data); moveWriteOffset(data.readAbleSize()); } std::string readAsString (uint64_t len) { assert(len <= readAbleSize()); std::string str; str.resize(len); Read(&str[0],len); return str; } void Read(void *buf,uint64_t len) { // 读取数据 1. 保证足够的空间 2.拷贝数据进去 // 要求获取的大小必须小于可读数据大小! assert(len <= readAbleSize()); std::copy(readPosition(),readPosition() + len,(char*)buf); } void readAndPop(void *buf,uint64_t len) { Read(buf,len); moveReadOffset(len); } // 逐步调试!!!!! std::string ReadAsStringAndPop(uint64_t len) { assert(len <= readAbleSize()); std::string str = readAsString(len); moveReadOffset(len); return str; } char* FindCRLF() { char *res = (char*)memchr(readPosition(),'\n',readAbleSize()); return res; } // 通常获取一行数据,这种情况针对是: std::string getLine() { char* pos = FindCRLF(); if (pos == NULL) { return ""; } // +1 为了把换行数据取出来! return readAsString(pos - readPosition() + 1); } std::string getLineAndPop() { std::string str = getLine(); moveReadOffset(str.size()); return str; } void Clear() { // 清空缓冲区!clear // 只需要将偏移量归0即可! _read_idx = 0; _write_idx = 0; } };
4、2 Socket模块
我们在编写服务器时,少不了的肯定是需要Socket套接字编程的。Socket模块就是对网络套接字编程进行一个封装,方便我们后面直接进行相关操作。主要功能如下:
- 创建套接字(socket);
- 绑定地址信息(bind);
- 开始监听(listen);
- 向服务器发起连接(connect);
- 获取新连接(accept);
- 接受数据(recv);
- 发送数据(send);
- 关闭套接字(close);
- 创建一个监听链接;
- 创建一个客户端连接;
- 开启地址和端口重用;
- 设置套接字为非阻塞。
这里对简单的一些网络套接字接口就不再过多解释,对上述功能的后四点进行简单解释。我们先来看一下该模块的代码实现:
#define MAX_LISTEN 1024 class Socket { public: Socket() : _sockfd(-1) {} Socket(int fd) : _sockfd(fd) {} ~Socket() { Close(); } int Fd() { return _sockfd; } bool Create() { _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (_sockfd < 0) { ERR_LOG("create socket failed!"); return false; } return true; } bool Bind(const std::string ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); int ret = bind(_sockfd, (struct sockaddr *)&addr, len); if (ret < 0) { ERR_LOG("bind sockfd failed!"); return false; } return true; } bool Listen(int backlog = MAX_LISTEN) { int ret = listen(_sockfd, backlog); if (ret < 0) { ERR_LOG("listen sockfd failed!"); return false; } return true; } // 向服务器发起连接 bool Connect(const std::string ip, uint16_t port) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(struct sockaddr_in); int ret = connect(_sockfd, (struct sockaddr *)&addr, len); if (ret < 0) { ERR_LOG("connect server failed!"); return false; } return true; } int Accept() { int newfd = accept(_sockfd, nullptr, nullptr); if (newfd < 0) { ERR_LOG("accept socker failed"); return -1; } return newfd; } ssize_t Recv(void *buf, size_t len, int flag = 0) { ssize_t ret = recv(_sockfd, buf, len, flag); if (ret <= 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return 0; ERR_LOG("recv msg failed!"); return -1; } return ret; } ssize_t NonBlockRecv(void *buf, size_t len) { return Recv(buf, len, MSG_DONTWAIT); } ssize_t Send(const void *buf, size_t len, int flag = 0) { ssize_t ret = send(_sockfd, buf, len, flag); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return 0; ERR_LOG("send msg failed!"); return -1; } return ret; } ssize_t NonBlockSend(const void *buf, size_t len) { return Send(buf, len, MSG_DONTWAIT); } void Close() { if (_sockfd) { close(_sockfd); _sockfd = -1; } } // 创建一个服务器端连接 bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) { if (Create() == false) return false; if (block_flag) NonBlock(); if (Bind(ip, port) == false) return false; if (Listen() == false) return false; ReuseAddress(); return true; } // 创建一个客户端连接 bool CreateClient(uint16_t port, const std::string &ip) { if (Create() == false) return false; if (Connect(ip, port) == false) return false; return true; } // 设置套接字选项 —— 开启地址端口重用 void ReuseAddress() { int val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(val)); val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(val)); } // 设置套接字为非阻塞 void NonBlock() { int flag = fcntl(_sockfd, F_GETFL, 0); fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK); } private: int _sockfd; };
我们知道在Tcp通信当中,建立连接会有三次握手,断开连接会有四次挥手。而主动断开链接的一方在进行第四次挥手的时候会变成TIME_WAIT状态,也就四需要等上一段时间该链接才算断开释放。这也就意味着主动断开连接的一方并不能很快的重新建立连接。为了解决这种情况,可以通过setsockopt函数进行设置套接字选项,开启地址和端口重用。具体封装后的代码如下:
void ReuseAddress() { int val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(val)); val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(val)); }
我们知道,调用read读取数据的时候,如果底层数据不就绪,默认情况下是阻塞的。在我们实现的服务器时,并不像让其阻塞。如果在读取数据时阻塞了,就会导致其他的任务得不到很好的执行。所以我们还需要一个对套接字设置非阻塞的功能。封装后的代码如下:
void NonBlock() { int flag = fcntl(_sockfd, F_GETFL, 0); fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK); }
需要注意的是,当read函数以非阻塞方式读取数据时,如果底层数据不就绪,那么read函数就会立即返回,但当底层数据不就绪时,read函数是以出错的形式返回的,此时的错误码会被设置为
EAGAIN
或EWOULDBLOCK
。因此在以非阻塞方式读取数据时,如果调用read函数时得到的返回值是-1,此时还需要通过错误码进一步进行判断,如果错误码的值是EAGAIN或EWOULDBLOCK,说明本次调用read函数出错是因为底层数据还没有就绪,因此后续还应该继续调用read函数进行轮询检测数据是否就绪,当数据继续时再进行数据的读取。
此外,调用read函数在读取到数据之前可能会被其他信号中断,此时read函数也会以出错的形式返回,此时的错误码会被设置为EINTR,此时应该重新执行read函数进行数据的读取。
因此在以非阻塞的方式读取数据时,如果调用read函数读取到的返回值为-1,此时并不应该直接认为read函数在底层读取数据时出错了,而应该继续判断错误码,如果错误码的值为
EAGAIN
、EWOULDBLOCK
或EINTR
则应该继续调用read函数再次进行读取或者说明底层没有数据。
创建一个监听连接是什么意思呢?当我们服务端创建套接字、绑定ip和端口后,需要将该套接字设置为监听状态,以上过程就是在创建一个监听的连接,也就是创建一个服务端连接。我们对上述的过程进行了封装,具体封装后的代码如下:‘
// 创建一个服务器端连接 bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) { if (Create() == false) return false; if (block_flag) NonBlock(); if (Bind(ip, port) == false) return false; if (Listen() == false) return false; ReuseAddress(); return true; }
创建客户端连接是什么意思?无非就是创建一个套接字,然后向服务器发起请求连接。这也是客户端所需要做的。我们对其进行简单封装后代码如下:
// 创建一个客户端连接 bool CreateClient(uint16_t port, const std::string &ip) { if (Create() == false) return false; if (Connect(ip, port) == false) return false; return true; }
4、3 Channel模块
高性能服务器必备的就是多路转接技术。当然,我们的项目也不例外。我们需要利用多路转接技术来帮我们进行等待(监控)事件就绪。且当有事件就绪时,会有一个Handler函数根据所触发的就绪事件统一帮我们处理就绪后的操作。
每个通信套接字都会有许多不同的事件,例如:读事件、写事件等等。为了方便我们后续对描述符(套接字)的监控事件在用户态更容易维护,以及触发事件后的操作流程更加的清晰,我们在这里对描述符(套接字)监控的事件和管理进行封装。那么Channel模块的主要功能就很清晰了。
1.对监控事件的管理:
- 判断描述符是否可读;
- 判断描述符是否可写;
- 对描述符监控添加可读;
- 对描述符监控添加可写;
- 解除可读事件监控;
- 解除可写事件监控;
- 解除所有事件监控。
2.对监控事件触发后的处理:
- 设置对于不同事件的回调处理函数;
- 明确触发了某个事件该如何处理。
我们先看一下Channel模块的代码:
class Channel { private: int _fd; uint32_t events; // 当前需要监控的事件 uint32_t revents; // 当前连接触发的事件 using eventCallback = std::function < void() > ; eventCallback _read_callback; // 可读事件被触发的回调函数 eventCallback _error_callback; // 可写事件被触发的回调函数 eventCallback _close_callback; // 连接关闭事件被触发的回调函数 eventCallback _event_callback; // 任意事件被触发的回调函数 eventCallback _write_callback; // 可写事件被触发的回调函数 public: Channel(int fd) : fd(_fd) {} int Fd() { return _fd; } void SetRevents(uint32_t events) { _revents = events; } void setReadCallback(const eventCallback &cb) { _read_callback = cb; } void setWriteCallback(const eventCallback &cb) { _write_callback = cb; } void setErrorCallback(const eventCallback &cb) { _error_callback = cb; } void setCloseCallback(const eventCallback &cb) { _close_callback = cb; } void setEventCallback(const eventCallback &cb) { _event_callback = cb; } bool readAble() { // 当前是否可读 return (_events & EPOLLIN); } bool writeAble() { // 当前是否可写 return (_events & EPOLLOUT); } void enableRead() { // 启动读事件监控 _events |= EPOLLIN; // 后面会添加到EventLoop的事件监控! } void enableWrite() { // 启动写事件监控 _events |= EPOLLOUT; // 后面会添加到EventLoop的事件监控! } void disableRead() { // 关闭读事件监控 _events &= ~EPOLLIN; // 后面会修改到EventLoop的事件监控! } void disableWrite() { // 关闭写事件监控 _events &= ~EPOLLOUT; } void disableAll() { // 关闭所有事件监控 _events = 0; } void Remove(); // 后面会调用EventLoop接口移除监控 void HandleEvent() { if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if (_read_callback) _read_callback(); } /*有可能会释放连接的操作事件,一次只处理一个*/ if (_revents & EPOLLOUT) { if (_write_callback) _write_callback(); } else if (_revents & EPOLLERR) { if (_error_callback) _error_callback(); // 一旦出错,就会释放连接,因此要放到前边调用任意回调 } else if (_revents & EPOLLHUP) { if (_close_callback) _close_callback(); } /*不管任何事件,都调用的回调函数*/ if (_event_callback) _event_callback(); } };
注意,我们在这里使用多路转接技术时,采用的时epoll。因为epoll的编码简单,且效率最高。所以在私有成员变量时,我们给出了监控事件和就绪事件。
同时,我们这里使用了通用的函数封装器function。原因就是我们并不知道所触发事件的回调函数所需要的参数。当在设置回调函数时,使用函数包装器bind进行绑定参数即可。
在Handler也就是上述的HandlerEvent函数中,我们对所触发的事件需要回调进行了分类。读事件触发后,并不会直接释放连接(后续会讲解原因)。其他事件触发后,都有可能导致连接被释放,所以一次处理一个事件,以防连接被释放的情况下再去处理事件就会导致陈鼓型崩溃。
4、4 Poller模块
上述的Channel模块是对描述符的监控事件进行管理的封装。现在我们还需要对描述符进行IO事件监控啊!说明这两个模块是密切关联的。
上述我们也提到了,所用的多路转接模型是epoll。那么该模块就是对epoll的操作进行封装的。封装思想:
- 必须拥有一个epoll的操作句柄;
- 拥有一个struct epoll_event 结构数组,监控保存所有的活跃事件;
- 使用一个哈希表管理描述符与描述符对应的事件管理Channnel对象。
整体逻辑流程:
- 对描述符进行监控,通过Channnel才能知道描述符监控什么事件(注意,我们在使用epoll对事件监控前,一定是在Channel模块中对所需要监控的事件events进行了设置,然后再使用epoll进行监控);
- 当描述符就绪了,通过描述符在哈希表中找到对应的Channel(当然,我们都会添加Channel到哈希表种的。得到了Channel才知道什么事件如何处理)当描述符就绪了,返回就绪描述符对应的Channel。
通过对上述的了解,我们就已经知道该模块所需要实现的功能了。具体如下:
- 添加事件监控 (channel模块);
- 修改事件监控;
- 移除事件监控;
- 开始事件监控。
具体该模块实现代码如下:
#define MAX_EPOLLEVENTS 1024 // Poller模块是对epoll进⾏封装的⼀个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。 class Poller { private: int _epfd; struct epoll_event _evs[MAX_EPOLLEVENTS]; std::unordered_map<int, Channel *> _channels; private: // 对epoll直接操作 void Update(Channel *channel, int op) { int fd = channel->Fd(); struct epoll_event ev; ev.data.fd = fd; ev.events = channel->Events(); int ret = epoll_ctl(_epfd, op, fd, &ev); if (ret < 0) { ERR_LOG("EPOLLCTL FAILED!!!"); abort(); // 推出程序!! } } // 判断一个Channel是否已经添加到了事件监控 bool hashChannel(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it == _channels.end()) { return false; } return true; } public: Poller() { _epfd = epoll_create(MAX_EPOLLEVENTS); if (_epfd < 0) { ERR_LOG("EPOLL CREATE FAILED!!"); abort(); // 退出程序 } } // 添加或者修改监控事件 void UpdateEvent(Channel *channel) { // 有描述符也有事件 bool ret = hashChannel(channel); if (ret == false) { _channels.insert(std::make_pair(channel->Fd(), channel)); return Update(channel, EPOLL_CTL_ADD); // 不存在添加 } return Update(channel, EPOLL_CTL_MOD); // 存在了更新 } // 移除监控事件 void removeEvent(Channel *channel) { auto it = _channels.find(channel->Fd()); if (it != _channels.end()) { _channels.erase(it); } Update(channel, EPOLL_CTL_DEL); } // 开始监控,返回活跃链接! void Poll(std::vector<Channel *> *active) { // int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout) int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); if (nfds < 0) { if (errno == EINTR) { return; } ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno)); abort(); // 退出程序 } for (int i = 0; i < nfds; i++) { auto it = _channels.find(_evs[i].data.fd); assert(it != _channels.end()); it->second->setRevents(_evs[i].events); // 设置实际就绪的事件 active->push_back(it->second); } return; } };
这个模块主要都是封装的对epoll的操作。其中需要注意的是,我们在对事件开始监控时,需要将不同描述符就绪的事件进行返回,以便我们后续进行操作。所以这里就传入了一个指针,作为输出型参数。
4、5 Eventloop模块
这个模块其实就是我们所说的 one thread one loop中的loop,也是我们所说的reactor。这个模块必然是一个模块对应一个线程。这个模块是干什么的呢?其实就是进行事件监控管理和事件处理的模块。你也可以理解为对Channel模块和Poller模块的整合。接下来我们详细解释一下该模块的思路讲解。
EventLoop模块是进行时间监控,以及事件处理的模块。同时这个模块还是与线程一一对应的。监控了一个链接,而这个连接一旦就绪,就要进行事件处理。假如一个线程正在执行就绪事件,那么该连接再有其他事件就绪呢?会不会就被分配到其他线程了呢?但是如果这个描述符在多个线程中都出发了事件进行处理,就会存在线程安全的问题。因此我们需要将一个连接的事件监控,以及连接事件的处理,以及其他操作都放在同一个线程当中进行。
但是问题又来了:如何保证一个连接所有的操作都在eventloop对应的线程中执行呢?我们可以在eventloop模块中添加一个任务队列,对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当作任务添加到任务队列当中去。
总结eventloop处理流程:
- 在线程中对描述符进行事件监控;
- 有描述符就绪,则对描述符进行事件处理(必须保证处理回调函数中的操作都在线程当中);
- 所有的就绪事件处理完了,这时候再去将任务队列中的任务一一执行。
事件监控就交给Poller模块来处理,有事件就绪了则进行处理事件。但是有一个需要注意的点:因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行,因此得有一个事件通知的东西,能够唤醒事件监控的阻塞。
我们再来看一下eventfd函数。如下图:
eventfd:一种事件通知机制,该函数就是创建一个描述符用于实现事件通知,eventfd本质在内核里边管理的就是一个计数器。创建eventfd就会在内核中创建一个计数器(结构),每当向evenfd中写入一个数值--用于表示事件通知次数,可以使用read进行数据的读取,读取到的数据就是通知的次数。假设每次给eventfd中写入一个1,就表示通知了一次,连续写了三次之后,再去read读取出来的数字就是3,读取之后计数清0。用处:在EventLoop模块中实现线程间的事件通知功能。eventfd也是通过read/write/close进行操作的。
接下来我们看一下该模块的代码实现:
class EventLoop { private: using Functor = std::function<void()>; std::thread::id _thread_id; // 线程ID int _event_fd; // eventfd 唤醒IO事件监控有可能的阻塞!!! std::unique_ptr<Channel> _event_channel; Poller _poller; // 进行所有描述符的事件监控 std::vector<Functor> _tasks; // 任务池 std::mutex _mutex; // 实现任务池操作的线程安全!!! public: // 执行任务池中的所有任务!! void runAllTask() { std::vector<Functor> functor; { std::unique_lock<std::mutex> _lock(_mutex); // 出了作用域,锁就会被解开!! _tasks.swap(functor); } for (auto &f : functor) { f(); } return; } static int createEventFd() { int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd < 0) { ERR_LOG("CREATE ENVENTED FAILED !!!"); abort(); } return efd; } void readEventfd() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof(res)); if (ret < 0) { if (errno == EINTR || errno == EAGAIN) { return; } ERR_LOG("READ EVENTFD FAILED!"); abort(); } return; } void weakEventFd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof(val)); if (ret < 0) { if (errno == EINTR) { return; } ERR_LOG("READ EVENTFD FAILED!"); abort(); } return; } public: EventLoop() : _thread_id(std::this_thread::get_id()), _event_fd(createEventFd()), _event_channel(new Channel(this, _event_fd)), { // 给eventfd添加可读事件回调函数,读取eventfd事件通知次数 _event_channel->setReadCallback(std::bind(&EventLoop::readEventfd, this)); // 启动eventfd的读事件监控 _event_channel->enableRead(); } void runInLoop(const Functor &cb) { // 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。 if (isInLoop()) { return cb(); } return QueueInLoop(cb); } void queueInLoop(const Functor &cb) { // 将操作压入任务池! std::unique_lock<std::mutex> _lock(_mutex); // 唤醒有可能因为没有事件就绪,而导致的epoll阻塞; // 其实就是给eventfd写入一个数据,eventfd就会触发可读事件 _tasks.push_back(cb); weakEventFd(); } bool isInLoop() { // 永远判断当前线程是否是EventLoop所对应的线程 return (_thread_id == std::this_thread::get_id()); } void updateEvent(Channel *channel) { // 添加/修改描述符的事件监控 return _poller.UpdateEvent(channel); } void removeEvent(Channel *channel) { // 移除描述符的监控 return _poller.removeEvent(channel); } void Start() { // 任务监控完毕进行处理任务! // 三步走:事件监控-》就绪事件处理-》执行任务 std::vector<Channel *> actives; _poller.Poll(&actives); for (auto &channel : actives) { channel->handleEvent(); } runAllTask(); } };
4、5、1 时间轮思想
现在我们想设置一个超时连接释放的功能。所谓超市连接释放,其实就是一个在我们设置的一段时间内,如果该连接没有任何IO事件就绪,我们就认为他是一个不活跃连接,直接释放即可!假设我们只使用定时器,存在一个很大的问题,每次超时都要将所有的连接遍历一遍(因为每个连接的超时间可能是不同的),如果有上万个连接,效率无疑是较为低下的。这时候大家就会想到,我们可以针对所有的连接,根据每个连接最近一次通信的系统时间建立一个小根堆,这样只需要每次针对堆顶部分的连接逐个释放,直到没有超时的连接为止,这样也可以大大提高处理的效率。
上述方法可以实现定时任务,但是这里给大家介绍另一种方案:时间轮。时间轮的思想来源于钟表,如果我们定了一个3点钟的闹铃,则当时针走到3的时候,就代表时间到了。同样的道理,如果我们定义了一个数组,并且有一个指针,指向数组起始位置,这个指针每秒钟向后走动一步,走到哪里,则代表哪里的任务该被执行了,那么如果我们想要定一个3s后的任务,则只需要将任务添加到tick+3位置,则每秒中走一步,三秒钟后tick走到对应位置,这时候执行对应位置的任务即可。但是,同一时间可能会有大批量的定时任务,因此我们可以给数组对应位置下拉一个数组,这样就可以在同一个时刻上添加多个定时任务了。
当然,上述操作也有一些缺陷,比如我们如果要定义一个60s后的任务,则需要将数组的元素个数设置为60才可以,如果设置一小时后的定时任务,则需要定义3600个元素的数组,这样无疑是比较麻烦的。
因此,可以采用多层级的时间轮,有秒针轮,分针轮,时针轮,60<time<3600则time/60就是分针轮对应存储的位置,当tick/3600等于对应位置的时候,将其位置的任务向分针,秒针轮进行移动。
因为当前我们的应用中,倒是不用设计的这么麻烦,因为我们的定时任务通常设置的30s以内,所以简单的单层时间轮就够用了。
但是,我们也得考虑一个问题,当前的设计是时间到了,则主动去执行定时任务,释放连接,那能不能在时间到了后,自动执行定时任务呢,这时候我们就想到一个操作-类的析构函数。
一个类的析构函数,在对象被释放时会自动被执行,那么我们如果将一个定时任务作为一个类的析构函数内的操作,则这个定时任务在对象被释放的时候就会执行。
但是仅仅为了这个目的,而设计一个额外的任务类,好像有些不划算,但是这里我们又要考虑另一个问题,那就是假如有一个连接建立成功了,我们给这个连接设置了一个30s后的定时销毁任务,但是在第10s的时候,这个连接进行了一次通信,那么我们应该时在第30s的时候关闭,还是第40s的时候关闭呢?无疑应该是第40s的时候。也就是说,这时候,我们需要让这个第30s的任务失效,但是我们该如何实现这个操作呢?
这里,我们就用到了智能指针shared_ptr,shared_ptr有个计数器,当计数为0的时候,才会真正释放一个对象,那么如果连接在第10s进行了一次通信,则我们继续向定时任务中,添加一个30s后(也就是第40s)的任务类对象的shared_ptr,则这时候两个任务shared_ptr计数为2,则第30s的定时任务被释放的时候,计数-1,变为1,并不为0,则并不会执行实际的析构函数,那么就相当于这个第30s的任务失效了,只有在第40s的时候,这个任务才会被真正释放。下面我们来看一下其具体的实现如下:
using TaskFunc = std::function<void()>; using ReleaseFunc = std::function<void()>; class TimeTask { public: TimeTask(uint64_t id, uint32_t timeout, const TaskFunc& cb) :_id(id) ,_timeout(timeout) ,_task_cb(cb) ,_canceled(false) {} ~TimeTask() { if(_canceled == false) _task_cb(); _release(); } void Cancel() { _canceled = true; } void SetRelease(const ReleaseFunc& cb) { _release = cb; } uint32_t Delaytime() { return _timeout; } private: uint64_t _id; uint32_t _timeout; bool _canceled; TaskFunc _task_cb; ReleaseFunc _release; }; class TimeWheel { public: TimeWheel() :_tick(0) ,_capacity(60) ,_wheel(_capacity) {} void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb) { PtrTask pt(new TimeTask(id, delay, cb)); pt->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id)); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); _timers[id] = WeakTask(pt); // std::cout <<"添加任务成功, 任务id:" << id << std::endl; } void TimeRefresh(uint64_t id) { auto it = _timers.find(id); if(it == _timers.end()) return; PtrTask pt = it->second.lock(); int delay = pt->Delaytime(); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); // std::cout <<"刷新定时任务成功, 任务id:" << id << std::endl; } void TimeCancel(uint64_t id) { auto it = _timers.find(id); if(it == _timers.end()) return; PtrTask pt = it->second.lock(); if(pt) pt->Cancel(); } void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear(); } private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if(it == _timers.end()) return; _timers.erase(id); } private: using WeakTask = std::weak_ptr<TimeTask>; using PtrTask = std::shared_ptr<TimeTask>; int _tick; int _capacity; std::vector<std::vector<PtrTask>> _wheel; std::unordered_map<uint64_t, WeakTask> _timers; };
下面是测试代码,大家可自行测试理解:
#include <iostream> #include "timewheel.hpp" class Test { public: Test() { std::cout << "构造" << std::endl; } ~Test() { std::cout << "析构" << std::endl; } }; void DelTest(Test *t) { delete t; } int main() { TimeWheel tw; Test* t = new Test(); tw.TimerAdd(888, 3, std::bind(DelTest, t)); for(int i = 0; i < 5; i++) { sleep(1); tw.TimeRefresh(888); tw.RunTimerTask(); std::cout << "Test 定时任务被重新定时执行" << std::endl; } while(true) { sleep(1); std::cout <<"tick 移动了一部" << std::endl; tw.RunTimerTask(); } return 0; }
4、5、2 TimerWheel定时器模块整合
现在我们想设置一个超时连接释放的功能,就需要借助我们上述的定时功能了。首先我们需要将一个连接任务保存起来,然后我们采用时间轮的思想,就是每一秒向后走一个位置,如果该位置有任务,那么说明该位置的任务超时了,需要释放。因为定时器任务需要被监控起来,每当超过我们所定时的事件,就会自动往fd中写入一个数据,所以我们可以通过EventLoop将其进行监控管理。
当我们有一个连接创建后,就为该连接添加一个秒级别的定时任务。同时这只一个一秒触发一次的定时器。当我们添加一个定时任务后,同时为该定时任务创建一个定时器,把该定时器的timerfd添加可读监控,每当触发可读事件就绪时,我们设置了回调,就会调用回调函数去读取的超时次数,然后秒针再向后走对应步数即可。
class TimerTask { private: uint64_t _id; // 定时器任务对象ID uint32_t _timeout; // 定时任务的超时时间 bool _canceled; // false-表示没有被取消, true-表示被取消 TaskFunc _task_cb; // 定时器对象要执行的定时任务 ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息 public: TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) : _id(id) , _timeout(delay) , _task_cb(cb) , _canceled(false) {} ~TimerTask() { if (_canceled == false) _task_cb(); _release(); } void Cancel() { _canceled = true; } void SetRelease(const ReleaseFunc &cb) { _release = cb; } uint32_t DelayTime() { return _timeout; } }; class TimerWheel { private: using WeakTask = std::weak_ptr<TimerTask>; using PtrTask = std::shared_ptr<TimerTask>; int _tick; // 当前的秒针,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务 int _capacity; // 表盘最大数量---其实就是最大延迟时间 std::vector<std::vector<PtrTask>> _wheel; std::unordered_map<uint64_t, WeakTask> _timers; EventLoop *_loop; int _timerfd; // 定时器描述符--可读事件回调就是读取计数器,执行定时任务 std::unique_ptr<Channel> _timer_channel; private: void RemoveTimer(uint64_t id) { auto it = _timers.find(id); if (it != _timers.end()) { _timers.erase(it); } } static int CreateTimerfd() { int timerfd = timerfd_create(CLOCK_MONOTONIC, 0); if (timerfd < 0) { ERR_LOG("TIMERFD CREATE FAILED!"); abort(); } // int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old); struct itimerspec itime; itime.it_value.tv_sec = 1; itime.it_value.tv_nsec = 0; // 第一次超时时间为1s后 itime.it_interval.tv_sec = 1; itime.it_interval.tv_nsec = 0; // 第一次超时后,每次超时的间隔时 timerfd_settime(timerfd, 0, &itime, NULL); return timerfd; } int ReadTimefd() { uint64_t times; // 有可能因为其他描述符的事件处理花费事件比较长,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次 // read读取到的数据times就是从上一次read之后超时的次数 int ret = read(_timerfd, ×, 8); if (ret < 0) { ERR_LOG("READ TIMEFD FAILED!"); abort(); } return times; } // 这个函数应该每秒钟被执行一次,相当于秒针向后走了一步 void RunTimerTask() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear(); // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉 } void OnTime() { // 根据实际超时的次数,执行对应的超时任务 int times = ReadTimefd(); for (int i = 0; i < times; i++) { RunTimerTask(); } } void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) { PtrTask pt(new TimerTask(id, delay, cb)); pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); _timers[id] = WeakTask(pt); } void TimerRefreshInLoop(uint64_t id) { // 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中 auto it = _timers.find(id); if (it == _timers.end()) { return; // 没找着定时任务,没法刷新,没法延迟 } PtrTask pt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptr int delay = pt->DelayTime(); int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); } void TimerCancelInLoop(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return; // 没找着定时任务,没法刷新,没法延迟 } PtrTask pt = it->second.lock(); if (pt) pt->Cancel(); } public: TimerWheel(EventLoop *loop) : _capacity(60) , _tick(0) , _wheel(_capacity) , _loop(loop) , _timerfd(CreateTimerfd()) , _timer_channel(new Channel(_loop, _timerfd)) { _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this)); _timer_channel->EnableRead(); // 启动读事件监控 } /*定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题*/ /*如果不想加锁,那就把对定期的所有操作,都放到一个线程中进行*/ void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb); // 刷新/延迟定时任务 void TimerRefresh(uint64_t id); void TimerCancel(uint64_t id); /*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,在对应的EventLoop线程内执行*/ bool HasTimer(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return false; } return true; } }; void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb)); } // 刷新/延迟定时任务 void TimerWheel::TimerRefresh(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id)); } void TimerWheel::TimerCancel(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id)); }
4、5、3 Channel 与 EventLoop整合
注意,当我们对一个文件描述符设置可读事件监控或者可写事件监控时,不仅仅是要设置Channel对应的events中,因为此时epoll底层实际上并没有进行监控,我们还要设置到epoll模型当中去!EventLoop中封装了Poller,所以在这里我们直接包含EventLoop的一个指针即可。
class Poller; class EventLoop; class Channel { private: int _fd; EventLoop *_loop; uint32_t _events; // 当前需要监控的事件 uint32_t _revents; // 当前连接触发的事件 using EventCallback = std::function<void()>; EventCallback _read_callback; // 可读事件被触发的回调函数 EventCallback _write_callback; // 可写事件被触发的回调函数 EventCallback _error_callback; // 错误事件被触发的回调函数 EventCallback _close_callback; // 连接断开事件被触发的回调函数 EventCallback _event_callback; // 任意事件被触发的回调函数 public: Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) { } int Fd() { return _fd; } uint32_t Events() { return _events; } // 获取想要监控的事件 void SetRevents(uint32_t events) { _revents = events; } // 设置实际就绪的事件 void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; } void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; } void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; } void SetEventCallback(const EventCallback &cb) { _event_callback = cb; } // 当前是否监控了可读 bool ReadAble() { return (_events & EPOLLIN); } // 当前是否监控了可写 bool WriteAble() { return (_events & EPOLLOUT); } // 启动读事件监控 void EnableRead() { _events |= EPOLLIN; Update(); } // 启动写事件监控 void EnableWrite() { _events |= EPOLLOUT; Update(); } // 关闭读事件监控 void DisableRead() { _events &= ~EPOLLIN; Update(); } // 关闭写事件监控 void DisableWrite() { _events &= ~EPOLLOUT; Update(); } // 关闭所有事件监控 void DisableAll() { _events = 0; Update(); } // 移除监控 void Remove(); void Update(); // 事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定 void HandleEvent() { if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if (_event_callback) _event_callback(); /*不管任何事件,都调用的回调函数*/ if (_read_callback) _read_callback(); } /*有可能会释放连接的操作事件,一次只处理一个*/ if (_revents & EPOLLOUT) { if (_event_callback) _event_callback(); if (_write_callback) _write_callback(); } else if (_revents & EPOLLERR) { if (_error_callback) _error_callback(); // 一旦出错,就会释放连接,因此要放到前边调用任意回调 } else if (_revents & EPOLLHUP) { if (_close_callback) _close_callback(); } // if (_event_callback) _event_callback(); } }; void Channel::Update() { _loop->UpdateEvent(this); } void Channel::Remove() { _loop->RemoveEvent(this); }
4、5、3 时间轮与EventLoop整合
EventLoop模块可以理解就是我们上边所说的Reactor模块,它是对Poller模块,TimeWheel与定时器模块,Socket模块的一个整体封装,进行所有描述符的事件监控。EventLoop模块为了保证整个服务器的线程安全问题,因此要求使用者对于Connection的所有操作一定要在其对应的EventLoop线程内完成,不能在其他线程中进行(比如组作使用者使用Connection发送数据,以及关闭连接这种操作)。EventLoop模块保证自己内部所监控的所有描述符,都要是活跃连接,非活跃连接就要及时释放避免资源浪费。综上我们整合到所有的EventLoop如下:
class EventLoop { using Functor = std::function<void()>; public: void RunAllTask() { std::vector<Functor> functor; { std::unique_lock<std::mutex> _lock(_mutex); _tasks.swap(functor); } for (auto &f : functor) { f(); } return; } static int CreatEventFd() { int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd < 0) { ERR_LOG("create eventfd failed"); abort(); } return efd; } void ReadEventfd() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof(res)); if (ret < 0) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { return; } ERR_LOG("read eventfd failed"); abort(); } return; } void WeakUpEventFd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof(val)); if (ret < 0) { if (errno == EINTR) { return; } ERR_LOG("write eventfd failed!"); abort(); } return; } public: EventLoop() : _thread_id(std::this_thread::get_id()) , _event_fd(CreatEventFd()) , _event_channel(new Channel(this, _event_fd)) ,_timer_wheel(this) { _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this)); _event_channel->EnableRead(); } void Start() { while (1) { std::vector<Channel *> actives; _poller.Poll(&actives); for (auto &channel : actives) { channel->HandleEvent(); } RunAllTask(); } } bool IsInLoop() { return (_thread_id == std::this_thread::get_id()); } void AssertInLoop() { assert(_thread_id == std::this_thread::get_id()); } void RunInLoop(const Functor &cb) { if (IsInLoop()) { return cb(); } return QueueInLoop(cb); } void QueueInLoop(const Functor &cb) { { std::unique_lock<std::mutex> _lock(_mutex); _tasks.push_back(cb); } WeakUpEventFd(); } void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); } void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); } void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); } void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); } void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); } bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); } private: std::thread::id _thread_id; // 线程id int _event_fd; std::unique_ptr<Channel> _event_channel; Poller _poller; std::vector<Functor> _tasks; std::mutex _mutex; TimerWheel _timer_wheel; };
4、6 Connection模块
Connection模块,一个连接有任何的事件怎么处理都是有这个模块来进行处理的,因为组件的设计也不知道使用者要如何处理事件,因此只能是提供一些事件回到函数由使用者设置。
Connection模块是对Buffer模块,Socket模块,Channel模块的一个整体封装,实现了对一个通信套接字的整体的管理,每一个进行数据通信的套接字(也就是accept获取到的新连接)都会使用Connection进行管理。
- Connection模块内部包含有四个由组件使用者传入的回调函数:连接建立完成回调,事件回调,新数据回调,关闭回调。
- Connection模块内部包含有两个组件使用者提供的接口:数据发送接口,连接关闭接口;
- Connection模块内部包含有两个用户态缓冲区:用户态接收缓冲区,用户态发送缓冲区;
- Connection模块内部包含有⼀个Socket对象:完成描述符面向系统的IO操作;
- Connection模块内部包含有⼀个Channel对象:完成描述符IO事件就绪的处理。
Connection模块大致具体处理流程如下:
- 实现向Channel提供可读,可写,错误等不同事件的IO事件回调函数,然后将Channel和对应的描述符添加到Poller事件监控中。
- 当描述符在Poller模块中就绪了I0可读事件,则调用描述符对应Channel中保存的读事件处理函数,进行数据读取,将socket接收缓冲区全部读取到Connection管理的用户态接收缓冲区中。然后调用由组件使用者传入的新数据到来回调函数进行处理。
- 组件使用者进行数据的业务处理完毕后,通过Connection向使用者提供的数据发送接口,将数据写入Connection的发送缓冲区中。
- 启动描述符在Poll模块中的IO写事件监控,就绪后。调用Channel中保存的写事件处理函数,将发送缓冲区中的数据通过Socket进行面向系统的实际数据发送。
综上我们再来设计Connection模块的功能就很简单了。具体如下:
- 套接字的管理,能够进行套接字的操作!
- 连接事件的管理,可读,可写,错误,挂断,任意!
- 缓冲区管理:把socket读取的数据放进缓冲区,要有输入缓冲区和输出缓冲区管理!
- 协议上下文的管理,记录请求数据的处理过程!
- 启动或者取消非活跃连接超时销毁功能!
- 回调函数的管理:因为连接收到数据之后该如何处理,需要由用户决定,必须要有业务处理函数!一个连接建立成功后,应该如何处理,由用户决定,因此必须有连接建立成功的回调函数!一个连接关闭前,该如何处理,有用户决定,因此必须有关闭连接回调函数!任何事件的产生,有没有某些处理,由用户决定,因此必须任意事件的回调函数!
其实Connection模块都是当服务器接收到新连接后,为新连接创建的。当来一个新连接时,我们为其创建一个Connection,其中就包括了channel对新连接设置就绪事件触发后的各种回调处理函数,还有buffer维护的接受和发送缓冲区。当设置完回调后,我们就可以把新连接设置成监控读事件状态。如果客服端发送信息了,我们服务器就会把信息读到接收缓冲区当中。然后再调用所设置用户的回调处理函数(业务处理)。最后再将数据写入发送缓冲区进行发送。通过上面我们可以看到,Connection模块就是对一个连接的所有操作进行了封装管理。
但是还有一个特殊场景:对连接进行操作的时候,对于连接以及被释放,导致内存访问错误,最终程序崩溃!
解决方案:使用智能指针share_ptr 对Connect 对象进行管理,这样可以保证任意一个地方对Connect对象进行操作的时候,保存了一分share_ptr,因此就算其他地方进行了释放,也只是对share_ptr的计数器-1,而不会导致Connection的实际释放!该模块具体实现如下:
class Connection; typedef enum { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING} ConnStatu; using PtrConnection = std::shared_ptr<Connection>; class Connection : public std::enable_shared_from_this<Connection> { using ConnectedCallback = std::function<void(const PtrConnection &)>; using MessageCallback = std::function<void(const PtrConnection &, Buffer *)>; using ClosedCallback = std::function<void(const PtrConnection &)>; using AnyEventCallback = std::function<void(const PtrConnection &)>; private: void HandleRead() { char buf[65536]; ssize_t ret = _socket.NonBlockRecv(buf, 65535); if(ret < 0) { // 出错了并不会直接关闭连接,而是需要先处理一下缓冲区中的数据 return ShutdownInLoop(); } _in_buffer.WriteAndPush(buf, ret); if(_in_buffer.ReadAbleSize() > 0) { return _message_callback(shared_from_this(), &_in_buffer); } } void HandleWrite() { ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPostion(), _out_buffer.ReadAbleSize()); if(ret < 0) { if(_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); } // 发送完数据后,都指针向后偏移 _out_buffer.MoveReadOffset(ret); if(_out_buffer.ReadAbleSize() == 0) { _channel.DisableWrite(); if(_statu == DISCONNECTING) { return Release(); } } return; } void HandleClose() { if(_in_buffer.ReadAbleSize() > 0) { _message_callback(shared_from_this(), &_in_buffer); } return Release(); } void HandleError() { return HandleClose(); } void HandleEvent() { if(_enable_inactive_release == true) { _loop->TimerRefresh(_conn_id); } if(_event_callback) { _event_callback(shared_from_this()); } } // 连接获取之后,所处的状态下要进行各种设置(启动都监控,调用回调函数) void EstablishedInLoop() { assert(_statu == CONNECTING); _statu = CONNECTED; _channel.EnableRead(); if(_connected_callback) { _connected_callback(shared_from_this()); } } // 真正释放接口 void ReleaseInLoop() { // 1.修改状态 _statu = DISCONNECTED; // 2.移除事件监控 _channel.Remove(); // 3.关闭描述符 _socket.Close(); // 4.如果当前定时器队列中还有定时销毁任务,则取消任务 if(_loop->HasTimer(_conn_id)) { CancelInactiveReleaseInLoop(); } // 5.调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数 if(_closed_callback) { _closed_callback(shared_from_this()); } // 6.移除服务器内部管理的连接信息 if(_server_closed_callback) { _server_closed_callback(shared_from_this()); } } // 这个接口并不是实际发送数据的,而是把数据放到发送缓冲区,启动可写事件监控 void SendInLoop(Buffer& buf) { if(_statu == DISCONNECTED) { return; } _out_buffer.WriteBufferAndPush(buf); if(_channel.WriteAble() == false) { _channel.EnableWrite(); } } //这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送 void ShutdownInLoop() { _statu = DISCONNECTING; if(_in_buffer.ReadAbleSize() > 0) { if(_message_callback) _message_callback(shared_from_this(), &_in_buffer); } //要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭 if(_out_buffer.ReadAbleSize() > 0) { if(_channel.WriteAble() == false) _channel.EnableWrite(); } if(_out_buffer.ReadAbleSize() == 0) { Release(); } } //启动非活跃连接超时释放规则 void EnableInactiveReleaseInLoop(int sec) { //1. 将判断标志 _enable_inactive_release 置为true _enable_inactive_release = true; //2. 如果当前定时销毁任务已经存在,那就刷新延迟一下即可 if(_loop->HasTimer(_conn_id)) { return _loop->TimerRefresh(_conn_id); } //3. 如果不存在定时销毁任务,则新增 _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this)); } void CancelInactiveReleaseInLoop() { _enable_inactive_release = false; if(_loop->HasTimer(_conn_id)) { _loop->TimerCancel(_conn_id); } } void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _context = context; _connected_callback = conn; _message_callback = msg; _closed_callback = closed; _event_callback = event; } public: Connection(EventLoop *loop, uint64_t conn_id, int sockfd) : _conn_id(conn_id) , _sockfd(sockfd) , _enable_inactive_release(false) , _loop(loop) , _statu(CONNECTING) , _socket(_sockfd) , _channel(loop, _sockfd) { _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this)); _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this)); _channel.SetReadCallback(std::bind(&Connection::HandleRead, this)); _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this)); _channel.SetErrorCallback(std::bind(&Connection::HandleError, this)); } ~Connection() { DBG_LOG("release connction:%p", this); } int Fd() { return _sockfd; } int Id() { return _conn_id; } bool Connected() { return _statu == CONNECTED; } // 设置上下文 -- 建立连接完成后进行调用 void SetContext(const Any &context) { _context = context; } Any* GetContext() { return &_context; } void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; } void SetSrvClosedCallback(const ClosedCallback &cb) { _server_closed_callback = cb; } // 连接建立完成后,进行channel回调设置,启动都监控,调用_connected_callback void Established() { _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this)); } //发送数据,将数据放到发送缓冲区,启动写事件监控 void Send(const char* data, size_t len) { //外界传入的data,可能是个临时的空间,我们现在只是把发送操作压入了任务池,有可能并没有被立即执行 //因此有可能执行的时候,data指向的空间有可能已经被释放了。 Buffer buf; buf.WriteAndPush(data, len); _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf))); } // 提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理 void Shutdown() { _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this)); } void Release() { _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this)); } // 启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务 void EnableInactiveRelease(int sec) { _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec)); } // 取消非活跃销毁 void CancelInactiveRelease() { _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this)); } // 切换协议---重置上下文以及阶段性回调处理函数 -- 而是这个接口必须在EventLoop线程中立即执行 // 防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了。 void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) { _loop->AssertInLoop(); _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event)); } private: uint64_t _conn_id; // 连接的唯一ID,便于连接的管理和查找 // uint64_t _timer_id; //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器ID int _sockfd; // 连接关联的文件描述符 bool _enable_inactive_release; // 连接是否启动非活跃销毁的判断标志,默认为false EventLoop *_loop; // 连接所关联的一个EventLoop ConnStatu _statu; // 连接状态 Socket _socket; // 套接字操作管理 Channel _channel; // 连接的事件管理 Buffer _in_buffer; // 输入缓冲区---存放从socket中读取到的数据 Buffer _out_buffer; // 输出缓冲区---存放要发送给对端的数据 Any _context; // 请求的接收处理上下文 /*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*/ /*换句话说,这几个回调都是组件使用者使用的*/ ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback; /*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*/ /*就应该从管理的地方移除掉自己的信息*/ ClosedCallback _server_closed_callback; };
4、7 Acceptor模块
上述的Connection模块是对通信连接的所有操作管理,Acceptor模块就是对连接套接字管理。Acceptor模块是对Socket模块,Channel模块的一个整体封装,实现了对一个监听套接字的整体的管理。
- Acceptor模块内部包含有一个Socket对象:实现监听套接字的操作;
- Acceptor模块内部包含有一个Channel对象:实现监听套接字IO事件就绪的处理具体处理流程如下:
- 实现向Channel提供可读事件的IO事件处理回调函数,函数的功能其实也就是获取新连接;
- 为新连接构建一个Connection对象出来。
当获取了一个新建连接的描述符后,需要为这个通信连接,封装一个connection对象,设置不同回调。注意:因为Acceptor模块本身并不知道一个链接产生了某个事件该如何处理,因此获取一个通信连接后,Connection的封装,以及事件回调的设置都应该由服务器模块来进行!
具体实现代码如下:
class Acceptor { using AcceptCallback = std::function<void(int)>; private: void HandleRead() { int newfd = _socket.Accept(); if(newfd < 0) return ; if(_accept_callback) _accept_callback(newfd); } int CreateServer(int port) { bool ret = _socket.CreateServer(port); assert(ret); return _socket.Fd(); } public: Acceptor(EventLoop* loop, int port) : _socket(CreateServer(port)) , _loop(loop) , _channel(_loop, _socket.Fd()) { /*不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动*/ /*否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏*/ _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this)); } void SetAcceptCallback(const AcceptCallback &cb) { _accept_callback = cb; } void Listen() { _channel.EnableRead(); } private: Socket _socket; // 创建监听套接字 EventLoop *_loop; // 对监听套接字进行事件监控 Channel _channel; // 对监听套接字进行事件管理 AcceptCallback _accept_callback; // 对新连接进行管理 };
4、8 LoopThread模块
上述我们讲到EventLoop模块是与线程一一对应的,但是怎么保证一个线程和一个EvenLoop一一对应起来呢?我们该模块就是将线程与EventLoop结合起来。EventLoop模块实例化的对象,在构造的时候就会初始化_thread_id,而后边当运行一个操作的时候判断当前是否运行在eventLoop模块对应的线程中,就是将线程ID与EventLoop模块中的thread_id进行一个比较,相同就表示在同一个线程,不同就表示当前运行线程并不是EventLoop线程。
具体就是EventLoop模块在实例化对象的时候,必须在线程内部。因为EventLoop实例化对象时会设置自己的thread_id,如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置存在问题:在构造EventLoop对象,到设置新的thread_id期间将是不可控的。因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象。
该模块总结下来就是将eventloop模块和线程整合起来,对外提供的功能:
- 创建线程;
- 在线程中实例化 eventloop 对象;
- 可以向外部返回实例化的eventloop。
下面我们看一下该模块的实现代码:
class LoopThread { private: void ThreadEntry() { EventLoop loop; { std::unique_lock<std::mutex> lock(_mutex); _loop = &loop; _cond.notify_all(); } loop.Start(); } public: LoopThread() : _loop(nullptr) , _thread(std::thread(&LoopThread::ThreadEntry, this)) {} EventLoop *GetLoop() { EventLoop *loop = nullptr; { std::unique_lock<std::mutex> lock(_mutex); _cond.wait(lock, [&](){ return _loop != nullptr; }); loop = _loop; } return _loop; } private: std::mutex _mutex; std::condition_variable _cond; EventLoop* _loop; std::thread _thread; };
4、9 LoopThreadPool模块
LoopThreadPool模块就是对所有的LoopThread进行管理及分配。其功能:
- 线程数量可配置(0个或多个)。注意事项:在服务器中,主从Reactor模型是主线程只负责新连接获取,从属线程负责新连接的事件监控及处理。因此当前的线程池,有可能从属线程会数量为0,也就是实现单Reactor服务器,一个线程及负责获取连接,也负责连接的处理。
- 对所有的线程进行管理,其实就是管理0个或多个LoopThread对象。
- 提供线程分配的功能。当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理。假设有0个从属线程,则直接分配给主线程的EventLoop,进行处理。假设有多个从属线程,则采用RR轮转思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)。
下面我们直接看代码一起理解一下。
class LoopThreadPool { public: LoopThreadPool(EventLoop *baseloop) : _thread_count(0) , _next_idx(0) , _baseloop(baseloop) {} void SetThreadCount(int count) { _thread_count = count; } void Create() { if(_thread_count > 0) { _threads.resize(_thread_count); _loops.resize(_thread_count); for(int i = 0; i < _thread_count; i++) { _threads[i] = new LoopThread(); _loops[i] = _threads[i]->GetLoop(); } } return ; } EventLoop *NextLoop() { if(_thread_count == 0) return _baseloop; _next_idx = (_next_idx + 1) % _thread_count; return _loops[_next_idx]; } private: int _thread_count; int _next_idx; EventLoop *_baseloop; std::vector<LoopThread*> _threads; std::vector<EventLoop*> _loops; };
4、10 TcpServer模块
上述我们就已经实现了高并发服务器的所有模块。这个模块就是将上述的所有模块进行了整合,通过 Tcpserver 模块实例化的对象,可以非常简单的完成一个服务器的搭建。
Tcpserver 模块主要管理的对象:
- Acceptor对象,创建一个监听套接字!
- EventLoop 对象,baseloop对象,实现对监听套接字的事件监控!
- std::vector conns,实现对新建连接的管理!
- LoopThreadPool 对象,创建loop线程池,对新建连接进行事件监控和处理!
该模块搭建服务器的主要流程:
- 在TcpServer中实例一个Acceptor对象,以及一个EventLoop 对象(baseloop);
- 将Acceptor 挂在baseloop 进行事件监控;
- 一旦Acceptor 对象就绪了可读事件,则执行时间回调函数获取新建连接;
- 对新连接,创造一个 Connection 进行管理;
- 对新连接对应的 Connection 设置功能回调 (连接完成回调,消息回调,关闭回调,任意事件监控!);
- 启动Connection 的非活跃链接的超时销毁功能
- 将新连接对应的Connection 挂到 LoopThreadPool 中的丛书线程对应的Eventloop 中进行事件监控!
- 一旦Connection对应的链接就绪了可读事件,则这个时候执行读事件回调函数,读取数据,读取完毕后调用TcpServer设置的消息回调!
那我们在实现的时候就可以主要实现以下功能:
- 设置从属线程池数量;
- 启动服务器;
- 设置各种回调函数。(连接建立完成,消息,关闭,任意) 用户设置给TcpServer TcpServer设置获取的新连接;
- 是否启动非活跃连接超时销毁功能;
- 添加任务。
我们看如下实现代码:
class TcpServer { using ConnectedCallback = std::function<void(const PtrConnection&)>; using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>; using ClosedCallback = std::function<void(const PtrConnection&)>; using AnyEventCallback = std::function<void(const PtrConnection&)>; using Functor = std::function<void()>; private: void NewConnection(int fd) { _next_id++; PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd)); conn->SetMessageCallback(_message_callback); conn->SetClosedCallback(_closed_callback); conn->SetConnectedCallback(_connected_callback); conn->SetAnyEventCallback(_event_callback); conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1)); if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout); // 启动非活跃超时销毁 conn->Established(); // 就绪初始化 _conns.insert(std::make_pair(_next_id, conn)); } void RemoveConnectionInLoop(const PtrConnection &conn) { int id = conn->Id(); auto it = _conns.find(id); if (it != _conns.end()) { _conns.erase(it); } } // 从管理Connection的_conns中移除连接信息 void RemoveConnection(const PtrConnection &conn) { _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn)); } void RunAfterInLoop(const Functor &task, int delay) { _next_id++; _baseloop.TimerAdd(_next_id, delay, task); } public: TcpServer(int port) : _port(port) , _next_id(0) , _enable_inactive_release(false) , _acceptor(&_baseloop, port) , _pool(&_baseloop) { _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1)); _acceptor.Listen(); } void SetThreadCount(int count) { return _pool.SetThreadCount(count); } void SetConnectedCallback(const ConnectedCallback &cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback &cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback &cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback &cb) { _event_callback = cb; } void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; } // 添加一个定时任务 void RunAfter(const Functor& task, int delay) { _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay)); } void Start() { _pool.Create(); _baseloop.Start(); } private: uint64_t _next_id; int _port; int _timeout; bool _enable_inactive_release; EventLoop _baseloop; Acceptor _acceptor; LoopThreadPool _pool; std::unordered_map<uint64_t, PtrConnection> _conns; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback; };
4、11 测试代码
有了TcpSerevr模块,我们就可以很好的搭建出一个服务器了。我们只需要设置服务器触发IO事件后的回调即可!具体测试服务器代码如下:
#include "../Server.hpp" class EchoServer { private: TcpServer _server; private: void OnConnected(const PtrConnection &conn) { DBG_LOG("NEW CONNECTION:%p", conn.get()); } void OnClosed(const PtrConnection &conn) { DBG_LOG("CLOSE CONNECTION:%p", conn.get()); } void OnMessage(const PtrConnection &conn, Buffer *buf) { conn->Send(buf->ReadPostion(), buf->ReadAbleSize()); buf->MoveReadOffset(buf->ReadAbleSize()); conn->Shutdown(); } public: EchoServer(int port):_server(port) { _server.SetThreadCount(2); _server.EnableInactiveRelease(10); _server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1)); _server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } void Start() { _server.Start(); } };
五、HTTP协议支持实现
HTTP协议模块用于对高并发服务器模块进行协议支持,基于提供的协议支持能够更方便的完成指定协议服务器的搭建。而HTTP协议支持模块的实现,可以细分为下述几个小节的模块。
5、1 Util模块
这个模块是一个工具模块,主要提供HTTP协议模块所用到的一些工具函数,比如url编解码,文件读写....等。其主要提供的功能如下:
- 读取文件内容;
- 向文件写入内容;
- URL编码;
- URL解码;
- 通过HTTP状态码获取描述信息;
- 通过文件后缀名获取mime;
- 判断一个文件是不是目录;
- 判断一个文件是否是一个普通文件;
- HTTP资源路径有效性判断;
该模块其中的实现,可以说是对零碎的功能进行了整合。具体实现代码如下:
std::unordered_map<int, std::string> _statu_msg = { {100, "Continue"}, {101, "Switching Protocol"}, {102, "Processing"}, {103, "Early Hints"}, {200, "OK"}, {201, "Created"}, {202, "Accepted"}, {203, "Non-Authoritative Information"}, {204, "No Content"}, {205, "Reset Content"}, {206, "Partial Content"}, {207, "Multi-Status"}, {208, "Already Reported"}, {226, "IM Used"}, {300, "Multiple Choice"}, {301, "Moved Permanently"}, {302, "Found"}, {303, "See Other"}, {304, "Not Modified"}, {305, "Use Proxy"}, {306, "unused"}, {307, "Temporary Redirect"}, {308, "Permanent Redirect"}, {400, "Bad Request"}, {401, "Unauthorized"}, {402, "Payment Required"}, {403, "Forbidden"}, {404, "Not Found"}, {405, "Method Not Allowed"}, {406, "Not Acceptable"}, {407, "Proxy Authentication Required"}, {408, "Request Timeout"}, {409, "Conflict"}, {410, "Gone"}, {411, "Length Required"}, {412, "Precondition Failed"}, {413, "Payload Too Large"}, {414, "URI Too Long"}, {415, "Unsupported Media Type"}, {416, "Range Not Satisfiable"}, {417, "Expectation Failed"}, {418, "I'm a teapot"}, {421, "Misdirected Request"}, {422, "Unprocessable Entity"}, {423, "Locked"}, {424, "Failed Dependency"}, {425, "Too Early"}, {426, "Upgrade Required"}, {428, "Precondition Required"}, {429, "Too Many Requests"}, {431, "Request Header Fields Too Large"}, {451, "Unavailable For Legal Reasons"}, {501, "Not Implemented"}, {502, "Bad Gateway"}, {503, "Service Unavailable"}, {504, "Gateway Timeout"}, {505, "HTTP Version Not Supported"}, {506, "Variant Also Negotiates"}, {507, "Insufficient Storage"}, {508, "Loop Detected"}, {510, "Not Extended"}, {511, "Network Authentication Required"} }; std::unordered_map<std::string, std::string> _mime_msg = { {".aac", "audio/aac"}, {".abw", "application/x-abiword"}, {".arc", "application/x-freearc"}, {".avi", "video/x-msvideo"}, {".azw", "application/vnd.amazon.ebook"}, {".bin", "application/octet-stream"}, {".bmp", "image/bmp"}, {".bz", "application/x-bzip"}, {".bz2", "application/x-bzip2"}, {".csh", "application/x-csh"}, {".css", "text/css"}, {".csv", "text/csv"}, {".doc", "application/msword"}, {".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"}, {".eot", "application/vnd.ms-fontobject"}, {".epub", "application/epub+zip"}, {".gif", "image/gif"}, {".htm", "text/html"}, {".html", "text/html"}, {".ico", "image/vnd.microsoft.icon"}, {".ics", "text/calendar"}, {".jar", "application/java-archive"}, {".jpeg", "image/jpeg"}, {".jpg", "image/jpeg"}, {".js", "text/javascript"}, {".json", "application/json"}, {".jsonld", "application/ld+json"}, {".mid", "audio/midi"}, {".midi", "audio/x-midi"}, {".mjs", "text/javascript"}, {".mp3", "audio/mpeg"}, {".mpeg", "video/mpeg"}, {".mpkg", "application/vnd.apple.installer+xml"}, {".odp", "application/vnd.oasis.opendocument.presentation"}, {".ods", "application/vnd.oasis.opendocument.spreadsheet"}, {".odt", "application/vnd.oasis.opendocument.text"}, {".oga", "audio/ogg"}, {".ogv", "video/ogg"}, {".ogx", "application/ogg"}, {".otf", "font/otf"}, {".png", "image/png"}, {".pdf", "application/pdf"}, {".ppt", "application/vnd.ms-powerpoint"}, {".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"}, {".rar", "application/x-rar-compressed"}, {".rtf", "application/rtf"}, {".sh", "application/x-sh"}, {".svg", "image/svg+xml"}, {".swf", "application/x-shockwave-flash"}, {".tar", "application/x-tar"}, {".tif", "image/tiff"}, {".tiff", "image/tiff"}, {".ttf", "font/ttf"}, {".txt", "text/plain"}, {".vsd", "application/vnd.visio"}, {".wav", "audio/wav"}, {".weba", "audio/webm"}, {".webm", "video/webm"}, {".webp", "image/webp"}, {".woff", "font/woff"}, {".woff2", "font/woff2"}, {".xhtml", "application/xhtml+xml"}, {".xls", "application/vnd.ms-excel"}, {".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"}, {".xml", "application/xml"}, {".xul", "application/vnd.mozilla.xul+xml"}, {".zip", "application/zip"}, {".3gp", "video/3gpp"}, {".3g2", "video/3gpp2"}, {".7z", "application/x-7z-compressed"} }; class Util { public: static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string>* arry) { size_t offset = 0; while(offset < src.size()) { size_t pos = src.find(sep, offset); if(pos == std::string::npos) { if(pos == src.size()) break; arry->push_back(src.substr(offset)); return arry->size(); } if(pos == offset) { offset = offset + sep.size(); continue; } arry->push_back(src.substr(offset, pos - offset)); offset = pos + sep.size(); } return arry->size(); } static bool ReadFile(const std::string &filename, std::string *buf) { std::ifstream ifs(filename.c_str(), std::ios::binary); if(ifs.is_open() == false) { printf("open %s file failed", filename.c_str()); return false; } size_t fsize = 0; ifs.seekg(0, ifs.end); fsize = ifs.tellg(); ifs.seekg(0, ifs.beg); buf->resize(fsize); ifs.read(&(*buf)[0], fsize); if(ifs.good() == false) { printf("read %s file failed", filename.c_str()); ifs.close(); return false; } ifs.close(); return true; } static bool WriteFile(const std::string &filename, const std::string &buf) { std::ofstream ofs(filename, std::ios::binary | std::ios::trunc); if(ofs.is_open() == false) { printf("open %s file failed", filename.c_str()); return false; } ofs.write(buf.c_str(), buf.size()); if(ofs.good() == false) { printf("write %s file failed", filename.c_str()); ofs.close(); return false; } ofs.close(); return true; } // URL编码,避免URL中资源路径与查询字符串中的特殊字符与HTTP请求中特殊字符产生歧义 // 编码格式:将特殊字符的ascii值,转换为两个16进制字符,前缀% C++ -> C%2B%2B // 不编码的特殊字符: RFC3986文档规定 . - _ ~ 字母,数字属于绝对不编码字符 // RFC3986文档规定,编码格式 %HH // W3C标准中规定,查询字符串中的空格,需要编码为+, 解码则是+转空格 static std::string UrlEncode(const std::string url, bool convert_space_to_plus) { std::string res; for (auto &c : url) { if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c)) { res += c; continue; } if (c == ' ' && convert_space_to_plus == true) { res += '+'; continue; } // 剩下的字符都是需要编码成为 %HH 格式 char tmp[4] = {0}; // snprintf 与 printf比较类似,都是格式化字符串,只不过一个是打印,一个是放到一块空间中 snprintf(tmp, sizeof tmp, "%%%02X", c); res += tmp; } return res; } static char HEXTOI(char c) { if (c >= '0' && c <= '9') { return c - '0'; } else if (c >= 'a' && c <= 'z') { return c - 'a' + 10; } else if (c >= 'A' && c <= 'Z') { return c - 'A' + 10; } return -1; } static std::string UrlDecode(const std::string url, bool convert_plus_to_space) { // 遇到了%,则将紧随其后的2个字符,转换为数字,第一个数字左移4位,然后加上第二个数字 + -> 2b %2b->2 << 4 + 11 std::string res; for (int i = 0; i < url.size(); i++) { if (url[i] == '+' && convert_plus_to_space == true) { res += ' '; continue; } if (url[i] == '%' && (i + 2) < url.size()) { char v1 = HEXTOI(url[i + 1]); char v2 = HEXTOI(url[i + 2]); char v = v1 * 16 + v2; res += v; i += 2; continue; } res += url[i]; } return res; } // 响应状态码的描述信息获取 static std::string StatuDesc(int statu) { auto it = _statu_msg.find(statu); if (it != _statu_msg.end()) { return it->second; } return "Unknow"; } // 根据文件后缀名获取文件mime static std::string ExtMime(const std::string &filename) { // a.b.txt 先获取文件扩展名 size_t pos = filename.find_last_of('.'); if (pos == std::string::npos) { return "application/octet-stream"; } // 根据扩展名,获取mime std::string ext = filename.substr(pos); auto it = _mime_msg.find(ext); if (it == _mime_msg.end()) { return "application/octet-stream"; } return it->second; } static bool IsDirectory(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if(ret < 0) { return false; } return S_ISDIR(st.st_mode); } static bool IsRegular(const std::string &filename) { struct stat st; int ret = stat(filename.c_str(), &st); if(ret < 0) { return false; } return S_ISREG(st.st_mode); } // http请求的资源路径有效性判断 // /index.html --- 前边的/叫做相对根目录 映射的是某个服务器上的子目录 // 想表达的意思就是,客户端只能请求相对根目录中的资源,其他地方的资源都不予理会 // /../login, 这个路径中的..会让路径的查找跑到相对根目录之外,这是不合理的,不安全的 static bool ValidPath(const std::string &path) { // 思想:按照/进行路径分割,根据有多少子目录,计算目录深度,有多少层,深度不能小于0 std::vector<std::string> subdir; Split(path, "/", &subdir); int level = 0; for (auto &dir : subdir) { if (dir == "..") { level--; // 任意一层走出相对根目录,就认为有问题 if (level < 0) return false; continue; } level++; } return true; } };
注意,这里的状态码和对应的文件名后缀名获取mime都是固定的一一对应的。我们只需要用一个hash表将他们存储起来,然后又来状态码或者文件后缀名去对应的表中查找即可。
对URL的编码和解码都是有固定的,编码格式:
- URL编码,避免URL中资源路径与查询字符串中的特殊字符与HTTP请求中特殊字符产生歧义
- 编码格式:将特殊字符的ascii值,转换为两个16进制字符,前缀% C++ -> C%2B%2B
- 不编码的特殊字符: RFC3986文档规定 . - _ ~ 字母,数字属于绝对不编码字符;
- RFC3986文档规定,编码格式 %HH;
- W3C标准中规定,查询字符串中的空格,需要编码为+, 解码则是+转空格;
在解码的时候遇到了%,则将紧随其后的2个字符,转换为数字,第一个数字左移4位,然后加上第二个数字。例如:%2b->2 << 4 + 11。
为什么还要判断Http请求资源有效性呢?例如:/index.html ,前边的 / 叫做相对根目录,映射的是某个服务器上的子目录。客户端只能请求相对根目录中的资源,其他地方的资源都不予理会。例如这种情况: /../login, 这个路径中的..会让路径的查找跑到相对根目录之外,这是不合理的,不安全的。
5、2 HttpRequest模块
这个模块是HTTP请求数据模块,用于保存HTTP请求数据被解析后的各项请求元素信息。HTTP的请求格式我们就不再说明,不懂的同学可以去搜索一下。该模块就是用来接收到一个数据,按照HTTP请求格式进行解析,得到各个关键要素放到Request中,让HTTP请求的分析更加简单。我们直接看代码:
class HttpRequest { public: std::string _method; // 请求方法 std::string _path; // 资源路径 std::string _version; // 协议版本 std::string _body; // 请求正文 std::smatch _matches; // 资源路径的正则提取数据 std::unordered_map<std::string, std::string> _headers; // 头部字段 std::unordered_map<std::string, std::string> _params; // 查询字符串 public: HttpRequest() : _version("HTTP/1.1") {} void ReSet() { _method.clear(); _path.clear(); _version = "HTTP/1.1"; _body.clear(); std::smatch match; _matches.swap(match); _headers.clear(); _params.clear(); } // 插入头部字段 void SetHeader(const std::string &key, const std::string &val) { _headers.insert(std::make_pair(key, val)); } // 判断是否存在指定头部字段 bool HasHeader(const std::string &key) const { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } // 获取指定头部字段的值 std::string GetHeader(const std::string &key) const { auto it = _headers.find(key); if (it == _headers.end()) { return ""; } return it->second; } // 插入查询字符串 void SetParam(const std::string &key, const std::string &val) { _params.insert(std::make_pair(key, val)); } // 判断是否有某个指定的查询字符串 bool HasParam(const std::string &key) const { auto it = _params.find(key); if (it == _params.end()) { return false; } return true; } // 获取指定的查询字符串 std::string GetParam(const std::string &key) const { auto it = _params.find(key); if (it == _params.end()) { return ""; } return it->second; } // 获取正文长度 size_t ContentLength() const { // Content-Length: 1234\r\n bool ret = HasHeader("Content-Length"); if (ret == false) { return 0; } std::string clen = GetHeader("Content-Length"); return std::stol(clen); } // 判断是否是短链接 bool Close() const { // 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接 if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") { return false; } return true; } };
5、3 HttpResponse模块
当我们对Http请求进行处理后,还要对客户端进行响应。该模块就是让使用者向Response中填充响应要素,完毕后将其组织成HTTP响应格式的数据,发给客户端。Http的相应格式就不再过多解释,我们直接看实现代码:
class HttpResponse { public: int _statu; bool _redirect_flag; std::string _body; std::string _redirect_url; std::unordered_map<std::string, std::string> _headers; public: HttpResponse() : _redirect_flag(false) , _statu(200) {} HttpResponse(int statu) : _redirect_flag(false) , _statu(statu) {} void ReSet() { _statu = 200; _redirect_flag = false; _body.clear(); _redirect_url.clear(); _headers.clear(); } // 插入头部字段 void SetHeader(const std::string &key, const std::string &val) { _headers.insert(std::make_pair(key, val)); } // 判断是否存在指定头部字段 bool HasHeader(const std::string &key) { auto it = _headers.find(key); if (it == _headers.end()) { return false; } return true; } // 获取指定头部字段的值 std::string GetHeader(const std::string &key) { auto it = _headers.find(key); if (it == _headers.end()) { return ""; } return it->second; } void SetContent(const std::string &body, const std::string &type = "text/html") { _body = body; SetHeader("Content-Type", type); } void SetRedirect(const std::string &url, int statu = 302) { _statu = statu; _redirect_flag = true; _redirect_url = url; } // 判断是否是短链接 bool Close() { // 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接 if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") { return false; } return true; } };
5、4 HttpContext模块
这个模块是一个HTTP请求接收的上下文模块,主要是为了防止在一次接收的数据中,不是一个完整的HTTP请求,则解析过程并未完成,无法进行完整的请求处理,需要在下次接收到新数据后继续根据上下文进行解析,最终得到一个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要一个上下文来进行控制接收和处理节奏。
我们还对处于接收还是响应状态进行了不同的设置。
接收状态:
- 当前处理接受并且处理请求行的阶段——接受请求行;
- 表示接收头部的接收还没处理完毕——接受请求头部;
- 表示正文还没有接受完毕——接受正文;
- 这是一个可以对数据请求处理的阶段——接受数据处理完毕;
- 接受处理请求出错。
响应状态:
- 在请求的接受并且处理的过程中,有可能会出现各种不同的问题,解析出错,访问的资源不对,没有权限等等。而这些错误的响应状态码都是不一样的。
- 当处理完毕,状态就变成了已经接受并且处理请求信息。
实现起来只要跟着我们的状态变化的思路一步一步实现即可。具体实现接口如下:
- 接受请求行;
- 解析请求行;
- 接收头部;
- 解析头部;
- 接受正文;
- 返回解析完成的请求信息。
我们来看具体实现代码:
#define MAX_LINE 8192 class HttpContext { private: int _resp_statu; // 响应状态码 HttpRecvStatu _recv_statu; // 当前接收及解析的阶段状态 HttpRequest _request; // 已经解析得到的请求信息 private: bool ParseHttpLine(const std::string &line) { std::smatch matches; std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase); bool ret = std::regex_match(line, matches, e); if (ret == false) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // BAD REQUEST return false; } // 0 : GET /bitejiuyeke/login?user=xiaoming&pass=123123 HTTP/1.1 // 1 : GET // 2 : /bitejiuyeke/login // 3 : user=xiaoming&pass=123123 // 4 : HTTP/1.1 // 请求方法的获取 _request._method = matches[1]; std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper); // 资源路径的获取,需要进行URL解码操作,但是不需要+转空格 _request._path = Util::UrlDecode(matches[2], false); // 协议版本的获取 _request._version = matches[4]; // 查询字符串的获取与处理 std::vector<std::string> query_string_arry; std::string query_string = matches[3]; // 查询字符串的格式 key=val&key=val....., 先以 & 符号进行分割,得到各个字串 Util::Split(query_string, "&", &query_string_arry); // 针对各个字串,以 = 符号进行分割,得到key 和val, 得到之后也需要进行URL解码 for (auto &str : query_string_arry) { size_t pos = str.find("="); if (pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // BAD REQUEST return false; } std::string key = Util::UrlDecode(str.substr(0, pos), true); std::string val = Util::UrlDecode(str.substr(pos + 1), true); _request.SetParam(key, val); } return true; } bool RecvHttpLine(Buffer *buf) { if (_recv_statu != RECV_HTTP_LINE) return false; // 1. 获取一行数据,带有末尾的换行 std::string line = buf->GetLineAndPop(); // 2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大 if (line.size() == 0) { // 缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的 if (buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } // 缓冲区中数据不足一行,但是也不多,就等等新数据的到来 return true; } if (line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } bool ret = ParseHttpLine(line); if (ret == false) { return false; } // 首行处理完毕,进入头部获取阶段 _recv_statu = RECV_HTTP_HEAD; return true; } bool RecvHttpHead(Buffer *buf) { if (_recv_statu != RECV_HTTP_HEAD) return false; // 一行一行取出数据,直到遇到空行为止, 头部的格式 key: val\r\nkey: val\r\n.... while (1) { std::string line = buf->GetLineAndPop(); // 2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大 if (line.size() == 0) { // 缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的 if (buf->ReadAbleSize() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } // 缓冲区中数据不足一行,但是也不多,就等等新数据的到来 return true; } if (line.size() > MAX_LINE) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 414; // URI TOO LONG return false; } if (line == "\n" || line == "\r\n") { break; } bool ret = ParseHttpHead(line); if (ret == false) { return false; } } // 头部处理完毕,进入正文获取阶段 _recv_statu = RECV_HTTP_BODY; return true; } bool ParseHttpHead(std::string &line) { // key: val\r\nkey: val\r\n.... if (line.back() == '\n') line.pop_back(); // 末尾是换行则去掉换行字符 if (line.back() == '\r') line.pop_back(); // 末尾是回车则去掉回车字符 size_t pos = line.find(": "); if (pos == std::string::npos) { _recv_statu = RECV_HTTP_ERROR; _resp_statu = 400; // return false; } std::string key = line.substr(0, pos); std::string val = line.substr(pos + 2); _request.SetHeader(key, val); return true; } bool RecvHttpBody(Buffer *buf) { if (_recv_statu != RECV_HTTP_BODY) return false; // 1. 获取正文长度 size_t content_length = _request.ContentLength(); if (content_length == 0) { // 没有正文,则请求接收解析完毕 _recv_statu = RECV_HTTP_OVER; return true; } // 2. 当前已经接收了多少正文,其实就是往 _request._body 中放了多少数据了 size_t real_len = content_length - _request._body.size(); // 实际还需要接收的正文长度 // 3. 接收正文放到body中,但是也要考虑当前缓冲区中的数据,是否是全部的正文 // 3.1 缓冲区中数据,包含了当前请求的所有正文,则取出所需的数据 if (buf->ReadAbleSize() >= real_len) { _request._body.append(buf->ReadPostion(), real_len); buf->MoveReadOffset(real_len); _recv_statu = RECV_HTTP_OVER; return true; } // 3.2 缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来 _request._body.append(buf->ReadPostion(), buf->ReadAbleSize()); buf->MoveReadOffset(buf->ReadAbleSize()); return true; } public: HttpContext() : _resp_statu(200), _recv_statu(RECV_HTTP_LINE) {} void ReSet() { _resp_statu = 200; _recv_statu = RECV_HTTP_LINE; _request.ReSet(); } int RespStatu() { return _resp_statu; } HttpRecvStatu RecvStatu() { return _recv_statu; } HttpRequest &Request() { return _request; } // 接收并解析HTTP请求 void RecvHttpRequest(Buffer *buf) { // 不同的状态,做不同的事情,但是这里不要break, 因为处理完请求行后,应该立即处理头部,而不是退出等新数据 switch (_recv_statu) { case RECV_HTTP_LINE: RecvHttpLine(buf); case RECV_HTTP_HEAD: RecvHttpHead(buf); case RECV_HTTP_BODY: RecvHttpBody(buf); } return; } };
下面对上述的主要成员函数的作用进行简单讲解一下:
- ParseHttpLine(const std::string &line): 解析HTTP请求行,根据正则表达式提取请求方法、资源路径、查询参数和协议版本,并进行URL解码操作。
- RecvHttpLine(Buffer *buf): 接收并解析HTTP请求行,从缓冲区中获取一行数据,判断缓冲区数据是否足够一行,如果不足则等待新数据,如果超过最大行限制则返回错误状态码,否则调用ParseHttpLine()函数解析请求行。
- ParseHttpHead(std::string &line): 解析HTTP请求头部,根据键值对格式提取键和值,并保存到请求头部对象中。
- RecvHttpHead(Buffer *buf): 接收并解析HTTP请求头部,从缓冲区中逐行获取数据,直到遇到空行为止,每行都调用ParseHttpHead()函数解析并保存到请求头部对象中。
- RecvHttpBody(Buffer *buf): 接收并处理HTTP请求正文,根据Content-Length头部字段获取正文长度,然后从缓冲区中读取对应长度的数据保存到请求正文对象中。
5、5 HttpServer模块
这个模块是最终给组件使用者提供的HTTP服务器模块了,用于以简单的接口实现HTTP服务器的搭建。HttpServer模块内部包含有一个TcpServer对象:TcpServer对象实现服务器的搭建。HttpServer模块内部包含有两个提供给TcpServer对象的接口∶连接建立成功设置上下文接口,数据处理接口。HttpServer模块内部包含有一个hash-map表存储请求与处理函数的映射表,这个表由组件使用者向HttpServer设置哪些请求应该使用哪些函数进行处理,等TcpServer收到对应的请求就会使用对应的区数进行处理。
我们再来看一下请求路由表:
表中记录了针对哪个请求,应该使用哪个函数来进行业务处理的映射关系。当服务器收到了一个请求,就在请求路由表中,查找有没有对应请求的处理函数,如果有,则执行对应的处理函数即可。说白了,什么请求,怎么处理,由用户来设定,服务器收到了请求只需要执行函数即可。这样做的好处:用户只需要实现业务处理函数,然后将请求与处理函数的映射关系,添加到服务器中。而服务器只需要接收数据,解析数据,查找路由表映射关系,执行业务处理函数。说白了就是用户只需要启动服务器,把请求所需要执行的方法告诉服务器即可。
我们再来看一下要实现简便的搭建HTTP服务器,所需要的要素和提供的功能和要素。
所需要苏:
- GET请求的路由映射表;
- POST请求的路由映射表;
- PUT请求的路由映射表;
- DELETE请求的路由映射表 —— 路由映射表记录对应请求方法的请求的处理函数映射关系;
- 高性能TCP服务器—— 进行连接的IO操作;
- 静态资源相对根目录 —— 实现静态资源的处理。
服务器的处理流程:
- 从socket接受数据放到接受缓冲区;
- 调用OnMessage回调函数进行业务处理;
- 对请求进行解析,得到了一个HTTPREQUEST结构,包含了所有的请求要素!
- 进行请求的路由映射 —— 找到对应请求的处理方法
- 静态资源请求 —— 一些实体文件资源的请求 html,image,将静态资源文件的数据读取出来,填充到HTTPresponse结构中
- 功能性请求 —— 在请求路由映射表中查找处理函数,找到了则执行函数,具体的业务请求,并进行HTTPREsponse结构的数据填充
- 对静态资源请求——功能性请求处理完毕后,得到一个填充了相应信息的httpResponse 的对象,组织http响应格式进行发送!
所需接口如下:
- 添加请求-处理函数映射信息(GET/POST/PUT/DELETE);
- 设置静态资源根目录;
- 设置是否启动超时连接关闭;
- 设置线程池中线程数量;
- 启动服务器;
- OnConnected ---用于给TcpServer设置协议上下文;
- OnMessage -----用于进行缓冲区数据解析处理;
- 请求的路由查找(静态资源请求查找和处理功能性请求的查找和处理);
- 组织响应进行回复。
#define DEFAULT_TIMEOUT 30 class HttpServer { using Handler = std::function<void(const HttpRequest &, HttpResponse *)>; using Handlers = std::vector<std::pair<std::regex, Handler>>; private: void ErrorHandler(const HttpRequest &req, HttpResponse* rsp) { // 组织一个错误展示页面 std::string body; body += "<html>"; body += "<head>"; body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>"; body += "</head>"; body += "<body>"; body += "<h1>"; body += std::to_string(rsp->_statu); body += " "; body += Util::StatuDesc(rsp->_statu); body += "</h1>"; body += "</body>"; body += "</html>"; // 2. 将页面数据,当作响应正文,放入rsp中 rsp->SetContent(body, "text/html"); } //将HttpResponse中的要素按照http协议格式进行组织,发送 void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) { if(req.Close() == true) { rsp.SetHeader("Connection", "close"); } else { rsp.SetHeader("Connection", "keep-alive"); } if(rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false) { rsp.SetHeader("Content-Length", std::to_string(rsp._body.size())); } if(rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false) { rsp.SetHeader("Content-Type", "application/octet-stream"); } if(rsp._redirect_flag == true) { rsp.SetHeader("Location", rsp._redirect_url); } std::stringstream rsp_str; rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu); for(auto &head : rsp._headers) { rsp_str << head.first << ": " << head.second << "\r\n"; } rsp_str << "\r\n"; rsp_str << rsp._body; conn->Send(rsp_str.str().c_str(), rsp_str.str().size()); } bool IsFileHandler(const HttpRequest& req) { // 1. 必须设置了静态资源根目录 if (_basedir.empty()) { return false; } // 2. 请求方法,必须是GET / HEAD请求方法 if (req._method != "GET" && req._method != "HEAD") { return false; } // 3. 请求的资源路径必须是一个合法路径 if (Util::ValidPath(req._path) == false) { return false; } // 4. 请求的资源必须存在,且是一个普通文件 // 有一种请求比较特殊 -- 目录:/, /image/, 这种情况给后边默认追加一个 index.html // index.html /image/a.png // 不要忘了前缀的相对根目录,也就是将请求路径转换为实际存在的路径 /image/a.png -> ./wwwroot/image/a.png std::string req_path = _basedir + req._path; // 为了避免直接修改请求的资源路径,因此定义一个临时对象 if (req._path.back() == '/') { req_path += "index.html"; } if (Util::IsRegular(req_path) == false) { return false; } return true; } void FileHandler(const HttpRequest &req, HttpResponse *rsp) { std::string req_path = _basedir + req._path; if (req._path.back() == '/') { req_path += "index.html"; } bool ret = Util::ReadFile(req_path, &rsp->_body); if (ret == false) { return; } std::string mime = Util::ExtMime(req_path); rsp->SetHeader("Content-Type", mime); return; } // 功能性请求的分类处理 void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers) { // 在对应请求方法的路由表中,查找是否含有对应资源请求的处理函数,有则调用,没有则发挥404 // 思想:路由表存储的时键值对 -- 正则表达式 & 处理函数 // 使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理 // /numbers/(\d+) /numbers/12345 for (auto &handler : handlers) { const std::regex &re = handler.first; const Handler &functor = handler.second; bool ret = std::regex_match(req._path, req._matches, re); if (ret == false) { continue; } return functor(req, rsp); // 传入请求信息,和空的rsp,执行处理函数 } rsp->_statu = 404; } void Route(HttpRequest &req, HttpResponse *rsp) { // 1. 对请求进行分辨,是一个静态资源请求,还是一个功能性请求 // 静态资源请求,则进行静态资源的处理 // 功能性请求,则需要通过几个请求路由表来确定是否有处理函数 // 既不是静态资源请求,也没有设置对应的功能性请求处理函数,就返回405 if (IsFileHandler(req) == true) { // 是一个静态资源请求, 则进行静态资源请求的处理 return FileHandler(req, rsp); } if (req._method == "GET" || req._method == "HEAD") { return Dispatcher(req, rsp, _get_route); } else if (req._method == "POST") { return Dispatcher(req, rsp, _post_route); } else if (req._method == "PUT") { return Dispatcher(req, rsp, _put_route); } else if (req._method == "DELETE") { return Dispatcher(req, rsp, _delete_route); } rsp->_statu = 405; // Method Not Allowed return; } // 设置上下文 void OnConnected(const PtrConnection &conn) { conn->SetContext(HttpContext()); DBG_LOG("NEW CONNECTION %p", conn.get()); } // 缓冲区数据解析+处理 void OnMessage(const PtrConnection &conn, Buffer *buffer) { while (buffer->ReadAbleSize() > 0) { // 1. 获取上下文 HttpContext *context = conn->GetContext()->get<HttpContext>(); // 2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象 // 1. 如果缓冲区的数据解析出错,就直接回复出错响应 // 2. 如果解析正常,且请求已经获取完毕,才开始去进行处理 context->RecvHttpRequest(buffer); HttpRequest &req = context->Request(); HttpResponse rsp(context->RespStatu()); if (context->RespStatu() >= 400) { // 进行错误响应,关闭连接 ErrorHandler(req, &rsp); // 填充一个错误显示页面数据到rsp中 WriteReponse(conn, req, rsp); // 组织响应发送给客户端 context->ReSet(); buffer->MoveReadOffset(buffer->ReadAbleSize()); // 出错了就把缓冲区数据清空 conn->Shutdown(); // 关闭连接 return; } if (context->RecvStatu() != RECV_HTTP_OVER) { // 当前请求还没有接收完整,则退出,等新数据到来再重新继续处理 return; } // 3. 请求路由 + 业务处理 Route(req, &rsp); // 4. 对HttpResponse进行组织发送 WriteReponse(conn, req, rsp); // 5. 重置上下文 context->ReSet(); // 6. 根据长短连接判断是否关闭连接或者继续处理 if (rsp.Close() == true) conn->Shutdown(); // 短链接则直接关闭 } return; } public: HttpServer(int port, int timeout = DEFAULT_TIMEOUT) : _server(port) { _server.EnableInactiveRelease(timeout); _server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1)); _server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2)); } void SetBaseDir(const std::string &path) { assert(Util::IsDirectory(path) == true); _basedir = path; } /*设置/添加,请求(请求的正则表达)与处理函数的映射关系*/ void Get(const std::string &pattern, const Handler &handler) { _get_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Post(const std::string &pattern, const Handler &handler) { _post_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Put(const std::string &pattern, const Handler &handler) { _put_route.push_back(std::make_pair(std::regex(pattern), handler)); } void Delete(const std::string &pattern, const Handler &handler) { _delete_route.push_back(std::make_pair(std::regex(pattern), handler)); } void SetThreadCount(int count) { _server.SetThreadCount(count); } void Listen() { _server.Start(); } private: Handlers _get_route; Handlers _post_route; Handlers _put_route; Handlers _delete_route; std::string _basedir; TcpServer _server; };
六、对服务器进行测试
6、1 长连接测试
我们知道一个长连接,当请求完一次资源后,并不会直接断开连接,而是仍然可以向服务器请求资源。短连接则就是请求一次资源后直接断开连接。对长连接测试思路:一个连接中每隔3s向服务器发送一个请求,查看是否会收到响应,同时直到超过超时时间看看是否正常。测试代码如下:
int main() { Socket cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); /*长连接测试:创建一个客户端持续给服务器发送数据,直到超过超时时间看看是否正常*/ std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG("[%s]", buf); sleep(3); } cli_sock.Close(); return 0; }
超时时间是10s,如下图:
6、2 不完整报文请求
我们知道,再给服务器发送数据时,都会携带一个Content-length属性,表示有效数据的长度。当时我们现在给服务器发送一个数据,告诉服务器要发送1024字节的数据,但是实际发送的数据不足1024,查看服务器处理结果。其实我们也能想出来结果:
如果数据只发送一次,服务器将得不到完整请求,就不会进行业务处理,客户端也就得不到响应,最终超时关闭连接。
连着给服务器发送了多次小的请求,服务器会将后边的请求当作前边请求的正文进行处理,而后便处理的时候有可能就会因为处理错误而关闭连接。
测试代码如下:
int main() { Socket cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nGgggggtm"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); // assert(cli_sock.Send(req.c_str(), req.size()) != -1); // assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG("[%s]", buf); sleep(3); } cli_sock.Close(); return 0; }
这里结果就不再给大家展示,大家可自行测试。
6、3 业务处理超时测试
接收请求的数据,但是业务处理的时间过长,超过了设置的超时销毁时间(服务器性能达到瓶颈),观察服务端的处理。预期结果:在一次业务处理中耗费太长时间,导致其他连接被连累超时,导致其他的连接有可能会超时释放。
假设有12345描述符就绪了,在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度,则存在两种可能处理结果:
- 如果接下来的2345描述符都是通信连接描述符,恰好本次也都就绪了事件,则并不影响,因为等1处理完了,接下来就会进行处理并刷新活跃度。
- 如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉,这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误)。
因此,在任意的事件处理中,都不应该直接对连接进行释放,而应该将释放操作压入到任务池中,等所有连接事件处理完了,然后执行任务池中的任务的时候再去进行释放。
测试代码如下:
int main() { signal(SIGCHLD, SIG_IGN); for (int i = 0; i < 10; i++) { pid_t pid = fork(); if (pid < 0) { DBG_LOG("FORK ERROR"); return -1; }else if (pid == 0) { Socket cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG("[%s]", buf); } cli_sock.Close(); exit(0); } } while(1) sleep(1); return 0; }
我们只需要将业务处理休眠上15秒,即超过超时即可。具体如下去:
6、4 一次发送多条数据测试
给服务器发送的一条数据中包含有多个HTTP请求,观察服务器的处理。预期结果:每一条请求都有其对应的响应。
测试代码如下:
int main() { Socket cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n"; while(1) { assert(cli_sock.Send(req.c_str(), req.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG("[%s]", buf); sleep(3); } cli_sock.Close(); return 0; }
6、5 大文件传输测试
使用put请求上传一个大文件进行保存,大文件数据的接收会被分在多次请求中接收,然后计算源文件和上传后保存的文件的MD5值,判断请求的接收处埋是否存在问题。(这里主要观察的是上下文的处理过程是否正常。)测试代码如下:
int main() { Socket cli_sock; cli_sock.CreateClient(8080, "127.0.0.1"); std::string req = "PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n"; std::string body; Util::ReadFile("./hello.txt", &body); req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n"; assert(cli_sock.Send(req.c_str(), req.size()) != -1); assert(cli_sock.Send(body.c_str(), body.size()) != -1); char buf[1024] = {0}; assert(cli_sock.Recv(buf, 1023)); DBG_LOG("[%s]", buf); sleep(3); cli_sock.Close(); return 0; }
我们还需要将put方法的处理进行修改,如下图:
6、6 性能测试
首先说明一下服务器测试环境:云服务器。配置为:CPU 2核 - 内存2GB,带宽:4Mbps。服务器程序采用1主3从reactor模式。具体如下图:
正常情况下,客户端应该不再使用该同一台服务器,因为会抢占云服务器资源。我们先来看一下在该服务器上进行本地还会测试。具体测试怎么进行呢?
我们采用了webbench工具。其原理是:创建大量的进程,在进程中创建客户端连接服务器发送请求,收到响应后关闭连接,开始下一个连接的建立。我们先使用webbench进行500并发量如下:
运行完后的结果:
QPS(每秒处理的包的数量)为2050。处理失败的包并没有。也就是500并发量没有任何问题。
接下来我们再来看一下处理5000并发量如何呢。如下图:
我们再来看一下结果:
QPS(每秒处理的包的数量)大概为2000左右。处理失败的包也并没有。也就是5000并发量没有任何问题。
我们再来看一下处理10000的并发量如何。具体如下图:
运行结果如下:
QPS(每秒处理的包的数量)大概为2000左右。处理失败的包也并没有。也就是轻轻松松可处理上万的并发量。
以上测试中,使用浏览器访问服务器,均能流畅获取请求的页面。但是根据测试结果能够看出,虽然并发量一直在提高,但是总的请求服务器的数量并没有增加,反而有所降低,侧面反馈了处理所耗时间更多了,基本上可以根据12w/min左右的请求量计算出10000并发量时服务器的极限了,但是这个测试其实意义不大,因为测试客户端和服务器都在同一台机器上,专输的速度更快,但同时抢占cpu也影响了处理,最好的方式就是在两台不同的机器上进行测试,这里只是通过这个方法告诉大家该如何对服务器进行性能测试。