项目gitee:仿muduo: 仿muduo
一:项目目的
1.1项目简介
通过咱们实现的⾼并发服务器组件,可以简洁快速的完成⼀个⾼性能的服务器搭建。
并且,通过组件内提供的不同应⽤层协议⽀持,也可以快速完成⼀个⾼性能应⽤服务器的搭建(当前项⽬中提供HTTP协议组件的⽀持)。
在这⾥,要明确的是咱们要实现的是⼀个⾼并发服务器组件,因此当前的项⽬中并不包含实际的业务容。
1.2HTTP服务器
概念:
HTTP(Hyper Text Transfer Protocol),超⽂本传输协议是应⽤层协议,是⼀种简单的请求-响应协 议(客⼾端根据⾃⼰的需要向服务器发送请求,服务器针对请求提供服务,完毕后通信结束)。 协议细节在之前的博客上已经讲过,这⾥不再赘述。但是需要注意的是HTTP协议是⼀个运⾏在TCP协议之上 的应⽤层协议,这⼀点本质上是告诉我们,HTTP服务器其实就是个TCP服务器,只不过在应⽤层基于 HTTP协议格式进⾏数据的组织和解析来明确客⼾端的请求并完成业务处理。
因此实现HTTP服务器简单理解,只需要以下⼏步即可
1.
搭建⼀个TCP服务器,接收客⼾端请求。
2.
以HTTP协议格式进⾏解析请求数据,明确客⼾端⽬的。
3.
明确客⼾端请求⽬的后提供对应服务。
4.
将服务结果⼀HTTP协议格式进⾏组织,发送给客⼾端
实现⼀个HTTP服务器很简单,但是实现⼀个⾼性能的服务器并不简单,这个单元中将讲解基于
Reactor模式的⾼性能服务器实现。
当然准确来说,因为我们要实现的服务器本⾝并不存在业务,咱们要实现的应该算是⼀个⾼性能服务 器基础库,是⼀个基础组件。
1.3Reactor模型:
概念
Reactor 模式,是指通过⼀个或多个输⼊同时传递给服务器进⾏请求处理时的事件驱动处理模式。
服务端程序处理传⼊多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫
Dispatcher 模式。
简单理解就是使⽤
I/O
多路复⽤ 统⼀监听事件,收到事件后分发给处理进程或线程,是编写⾼性能
⽹络服务器的必备技术之⼀。
分类:
单Reactor单线程:单I/O多路复⽤+业务处理
1.
通过IO多路复⽤模型进⾏客⼾端请求监控
2.
触发事件后,进⾏事件处理
a.
如果是新建连接请求,则获取新建连接,并添加⾄多路复⽤模型进⾏事件监控。
b.
如果是数据通信请求,则进⾏对应数据处理(接收数据,处理数据,发送响应)。
优点:所有操作均在同⼀线程中完成,思想流程较为简单,不涉及进程/线程间通信及资源争抢问题。
缺点:⽆法有效利⽤CPU多核资源,很容易达到性能瓶颈。
适⽤场景:适⽤于客⼾端数量较少,且处理速度较为快速的场景。(处理较慢或活跃连接较多,会导
致串⾏处理的情况下,后处理的连接⻓时间⽆法得到响应)。
单Reactor多线程:单I/O多路复⽤+线程池(业务处理)
1.
Reactor线程通过I/O多路复⽤模型进⾏客⼾端请求监控
2.
触发事件后,进⾏事件处理
a.
如果是新建连接请求,则获取新建连接,并添加⾄多路复⽤模型进⾏事件监控。
b.
如果是数据通信请求,则接收数据后分发给Worker线程池进⾏业务处理。
c.
⼯作线程处理完毕后,将响应交给Reactor线程进⾏数据响应
优点:充分利⽤CPU多核资源
缺点:多线程间的数据共享访问控制较为复杂,单个Reactor 承担所有事件的监听和响应,在单线程中
运⾏,⾼并发场景下容易成为性能瓶颈。
多Reactor多线程:多I/O多路复⽤+线程池(业务处理)
1.
在主Reactor中处理新连接请求事件,有新连接到来则分发到⼦Reactor中监控
2.
在⼦Reactor中进⾏客⼾端通信监控,有事件触发,则接收数据分发给Worker线程池
3.
Worker线程池分配独⽴的线程进⾏具体的业务处理
a.
⼯作线程处理完毕后,将响应交给⼦Reactor线程进⾏数据响应
优点:充分利⽤CPU多核资源,主从Reactor各司其职
1.4⽬标定位:
One Thread One Loop主从Reactor模型⾼并发服务器
咱们要实现的是主从Reactor模型服务器,也就是主Reactor线程仅仅监控监听描述符,获取新建连
接,保证获取新连接的⾼效性,提⾼服务器的并发性能。
主Reactor获取到新连接后分发给⼦Reactor进⾏通信事件监控。⽽⼦Reactor线程监控各⾃的描述符的 读写事件进⾏数据读写以及业务处理。
One Thread One Loop的思想就是把所有的操作都放到⼀个线程中进⾏,⼀个线程对应⼀个事件处理 的循环。 当前实现中,因为并不确定组件使⽤者的使⽤意向,因此并不提供业务层⼯作线程池的实现,只现 主从Reactor,⽽Worker⼯作线程池,可由组件库的使⽤者的需要⾃⾏决定是否使⽤和实现。
二:功能模块划分:
基于以上的理解,我们要实现的是⼀个带有协议⽀持的Reactor模型⾼性能服务器,因此将整个项⽬的 实现划分为两个⼤的模块:
•
SERVER模块:实现Reactor模型的TCP服务器;
•
协议模块:对当前的Reactor模型服务器提供应⽤层协议⽀持。
SERVER模块:
SERVER模块就是对所有的连接以及线程进⾏管理,让它们各司其职,在合适的时候做合适的事,最终 完成⾼性能服务器组件的实现。
⽽具体的管理也分为三个⽅⾯:
•
监听连接管理:对监听连接进⾏管理。
•
通信连接管理:对通信连接进⾏管理。
•
超时连接管理:对超时连接进⾏管理。
基于以上的管理思想,将这个模块进⾏细致的划分⼜可以划分为以下多个⼦模块:
Buffer模块:
Buffer模块是⼀个缓冲区模块,⽤于实现通信中⽤⼾态的接收缓冲区和发送缓冲区功能

Socket模块:
Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。

Channel模块:
Channel模块是对⼀个描述符需要进⾏的IO事件管理的模块,实现对描述符可读,可写,错误...事件的 管理操作,以及Poller模块对描述符进⾏IO事件监控就绪后,根据不同的事件,回调不同的处理函数功 能。

Connection模块
Connection模块是对Buffer模块,Socket模块,Channel模块的⼀个整体封装,实现了对⼀个通信套 接字的整体的管理,每⼀个进⾏数据通信的套接字(也就是accept获取到的新连接)都会使⽤
Connection进⾏管理。
•
Connection模块内部包含有三个由组件使⽤者传⼊的回调函数:连接建⽴完成回调,事件回调,
新数据回调,关闭回调。
•
Connection模块内部包含有两个组件使⽤者提供的接⼝:数据发送接⼝,连接关闭接⼝
•
Connection模块内部包含有两个⽤⼾态缓冲区:⽤⼾态接收缓冲区,⽤⼾态发送缓冲区
•
Connection模块内部包含有⼀个Socket对象:完成描述符⾯向系统的IO操作
•
Connection模块内部包含有⼀个Channel对象:完成描述符IO事件就绪的处理
具体处理流程如下:
1.
实现向Channel提供可读,可写,错误等不同事件的IO事件回调函数,然后将Channel和对应的描 述符添加到Poller事件监控中。
2.
当描述符在Poller模块中就绪了IO可读事件,则调⽤描述符对应Channel中保存的读事件处理函
数,进⾏数据读取,将socket接收缓冲区全部读取到Connection管理的⽤⼾态接收缓冲区中。然
后调⽤由组件使⽤者传⼊的新数据到来回调函数进⾏处理。
3.
组件使⽤者进⾏数据的业务处理完毕后,通过Connection向使⽤者提供的数据发送接⼝,将数据 写⼊Connection的发送缓冲区中。
4.
启动描述符在Poll模块中的IO写事件监控,就绪后,调⽤Channel中保存的写事件处理函数,将发 送缓冲区中的数据通过Socket进⾏⾯向系统的实际数据发送。

Acceptor模块:
Acceptor模块是对Socket模块,Channel模块的⼀个整体封装,实现了对⼀个监听套接字的整体的管 理。
•
Acceptor模块内部包含有⼀个Socket对象:实现监听套接字的操作
•
Acceptor模块内部包含有⼀个Channel对象:实现监听套接字IO事件就绪的处理
具体处理流程如下:
1.
实现向Channel提供可读事件的IO事件处理回调函数,函数的功能其实也就是获取新连接
2.
为新连接构建⼀个Connection对象出来。

TimerQueue模块:
TimerQueue模块是实现固定时间定时任务的模块,可以理解就是要给定时任务管理器,向定时任务管 理器中添加⼀个任务,任务将在固定时间后被执⾏,同时也可以通过刷新定时任务来延迟任务的执 ⾏。
这个模块主要是对Connection对象的⽣命周期管理,对⾮活跃连接进⾏超时后的释放功能。
TimerQueue模块内部包含有⼀个timerfd:linux系统提供的定时器。
TimerQueue模块内部包含有⼀个Channel对象:实现对timerfd的IO时间就绪回调处理

Poller模块:
Poller模块是对epoll进⾏封装的⼀个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接 功能。

EventLoop模块:
EventLoop模块可以理解就是我们上边所说的Reactor模块,它是对Poller模块,TimerQueue模块, Socket模块的⼀个整体封装,进⾏所有描述符的事件监控。
EventLoop模块必然是⼀个对象对应⼀个线程的模块,线程内部的⽬的就是运⾏EventLoop的启动函 数。
EventLoop模块为了保证整个服务器的线程安全问题,因此要求使⽤者对于Connection的所有操作⼀ 定要在其对应的EventLoop线程内完成,不能在其他线程中进⾏(⽐如组件使⽤者使⽤Connection发 送数据,以及关闭连接这种操作)。
EventLoop模块保证⾃⼰内部所监控的所有描述符,都要是活跃连接,⾮活跃连接就要及时释放避免 资源浪费。
•
EventLoop模块内部包含有⼀个eventfd:eventfd其实就是linux内核提供的⼀个事件fd,专⻔⽤于
事件通知。
•
EventLoop模块内部包含有⼀个Poller对象:⽤于进⾏描述符的IO事件监控。
•
EventLoop模块内部包含有⼀个TimerQueue对象:⽤于进⾏定时任务的管理。
•
EventLoop模块内部包含有⼀个PendingTask队列:组件使⽤者将对Connection进⾏的所有操作, 都加⼊到任务队列中,由EventLoop模块进⾏管理,并在EventLoop对应的线程中进⾏执⾏。
•
每⼀个Connection对象都会绑定到⼀个EventLoop上,这样能保证对这个连接的所有操作都是在
⼀个线程中完成的。
具体操作流程:
1.
通过Poller模块对当前模块管理内的所有描述符进⾏IO事件监控,有描述符事件就绪后,通过描述 符对应的Channel进⾏事件处理。
2.
所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作顺序进⾏执⾏。
3.
由于epoll的事件监控,有可能会因为没有事件到来⽽持续阻塞,导致任务队列中的任务不能及时得 到执⾏,因此创建了eventfd,添加到Poller的事件监控中,⽤于实现每次向任务队列添加任务的时 候,通过向eventfd写⼊数据来唤醒epoll的阻塞。

TcpServer模块:
这个模块是⼀个整体Tcp服务器模块的封装,内部封装了Acceptor模块,EventLoopThreadPool模
块。
•
TcpServer中包含有⼀个EventLoop对象:以备在超轻量使⽤场景中不需要EventLoop线程池,只
需要在主线程中完成所有操作的情况。
•
TcpServer模块内部包含有⼀个EventLoopThreadPool对象:其实就是EventLoop线程池,也就是
⼦Reactor线程池
•
TcpServer模块内部包含有⼀个Acceptor对象:⼀个TcpServer服务器,必然对应有⼀个监听套接
字,能够完成获取客⼾端新连接,并处理的任务。
•
TcpServer模块内部包含有⼀个std::shared_ptr<Connection>的hash表:保存了所有的新建连接
对应的Connection,注意,所有的Connection使⽤shared_ptr进⾏管理,这样能够保证在hash表
中删除了Connection信息后,在shared_ptr计数器为0的情况下完成对Connection资源的释放操
作。
具体操作流程如下:
1.
在实例化TcpServer对象过程中,完成BaseLoop的设置,Acceptor对象的实例化,以及EventLoop
线程池的实例化,以及std::shared_ptr<Connection>的hash表的实例化。
2.
为Acceptor对象设置回调函数:获取到新连接后,为新连接构建Connection对象,设置
Connection的各项回调,并使⽤shared_ptr进⾏管理,并添加到hash表中进⾏管理,并为
Connection选择⼀个EventLoop线程,为Connection添加⼀个定时销毁任务,为Connection添加
事件监控,
3.
启动BaseLoop。

下面是模块之间的关系图
模块关系图

时间轮思想:
上述的例⼦,存在⼀个很⼤的问题,每次超时都要将所有的连接遍历⼀遍,如果有上万个连接,效率 ⽆疑是较为低下的。
这时候⼤家就会想到,我们可以针对所有的连接,根据每个连接最近⼀次通信的系统时间建⽴⼀个⼩ 根堆,这样只需要每次针对堆顶部分的连接逐个释放,直到没有超时的连接为⽌,这样也可以⼤⼤提 ⾼处理的效率。
上述⽅法可以实现定时任务,但是这⾥给⼤家介绍另⼀种⽅案:时间轮
时间轮的思想来源于钟表,如果我们定了⼀个3点钟的闹铃,则当时针⾛到3的时候,就代表时间到
了。 同样的道理,如果我们定义了⼀个数组,并且有⼀个指针,指向数组起始位置,这个指针每秒钟向后 ⾛动⼀步,⾛到哪⾥,则代表哪⾥的任务该被执⾏了,那么如果我们想要定⼀个3s后的任务,则只需 要将任务添加到tick+3位置,则每秒中⾛⼀步,三秒钟后tick⾛到对应位置,这时候执⾏对应位置的任 务即可
但是,同⼀时间可能会有⼤批量的定时任务,因此我们可以给数组对应位置下拉⼀个数组,这样就可
以在同⼀个时刻上添加多个定时任务了。

当然,上述操作也有⼀些缺陷,⽐如我们如果要定义⼀个60s后的任务,则需要将数组的元素个数设置 为60才可以,如果设置⼀⼩时后的定时任务,则需要定义3600个元素的数组,这样⽆疑是⽐较⿇烦 的。
因此,可以采⽤多层级的时间轮,有秒针轮,分针轮,时针轮, 60<time<3600则time/60就是分针轮 对应存储的位置,当tick/3600等于对应位置的时候,将其位置的任务向分针,秒针轮进⾏移动。 因为当前我们的应⽤中,倒是不⽤设计的这么⿇烦,因为我们的定时任务通常设置的30s以内,所以简 单的单层时间轮就够⽤了。
但是,我们也得考虑⼀个问题,当前的设计是时间到了,则主动去执⾏定时任务,释放连接,那能不 能在时间到了后,⾃动执⾏定时任务呢,这时候我们就想到⼀个操作-类的析构函数。
⼀个类的析构函数,在对象被释放时会⾃动被执⾏,那么我们如果将⼀个定时任务作为⼀个类的析构 函数内的操作,则这个定时任务在对象被释放的时候就会执⾏。
但是仅仅为了这个⽬的,⽽设计⼀个额外的任务类,好像有些不划算,但是,这⾥我们⼜要考虑另⼀ 个问题,那就是假如有⼀个连接建⽴成功了,我们给这个连接设置了⼀个30s后的定时销毁任务,但是
在第10s的时候,这个连接进⾏了⼀次通信,那么我们应该时在第30s的时候关闭,还是第40s的时候关 闭呢?⽆疑应该是第40s的时候。也就是说,这时候,我们需要让这个第30s的任务失效,但是我们该 如何实现这个操作呢?
这⾥,我们就⽤到了智能指针shared_ptr,shared_ptr有个计数器,当计数为0的时候,才会真正释放 ⼀个对象,那么如果连接在第10s进⾏了⼀次通信,则我们继续向定时任务中,添加⼀个30s后(也就
是第40s)的任务类对象的shared_ptr,则这时候两个任务shared_ptr计数为2,则第30s的定时任务 被释放的时候,计数-1,变为1,并不为0,则并不会执⾏实际的析构函数,那么就相当于这个第30s的 任务失效了,只有在第40s的时候,这个任务才会被真正释放。
上述过程就是时间轮定时任务的思想了,当然这⾥为了更加简便的实现,进⾏了⼀些⼩⼩的调整实
现。
具体模块实现代码如下
#include <iostream>
#include <vector>
#include <unordered_map>
#include <functional>
#include <cstdint>
#include <memory>
#include <unistd.h>
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
uint64_t _id; // 定时器的ID
uint64_t _timeout; // 超时时间
bool _canceled; //取消定时任务
TaskFunc _task_cb; // 回调函数的任务
ReleaseFunc _release; // 清楚时间轮中的记录
public:
TimerTask(uint64_t id, uint64_t timeout, TaskFunc cb)
: _id(id),
_timeout(timeout),
_task_cb(cb),
_canceled(false)
{
}
void SetRelease(ReleaseFunc release)
{
_release = release;
}
~TimerTask()
{
if(!_canceled)
_task_cb();
_release();
}
void Cancel()
{
_canceled=true;
}
uint32_t DelayTime()
{
return _timeout;
}
};
class TimeWheel
{
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
int _tick; // 秒针
int _capacity; // 最大定时时间
std::vector<std::vector<PtrTask>> _timeWheel; // 时间轮
std::unordered_map<uint32_t, WeakTask> _timers; // 用来保存定时器的信息
private:
void RemoveTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it != _timers.end())
{
_timers.erase(it);
}
}
public:
TimeWheel()
: _tick(0),
_capacity(60),
_timeWheel(_capacity)
{
}
void TimerAdd(uint64_t id, uint64_t timeout, TaskFunc cb)
{
PtrTask pt(new TimerTask(id, timeout, cb)); // 创建任务
pt->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id)); // 设置删除函数回调
int pos = (_tick + pt->DelayTime()) % _capacity; // 找到要插入到时间轮的位置
_timeWheel[pos].push_back(pt); // 插入时间轮
_timers[id] = WeakTask(pt); // 保存定时器的信息
}
void TimerRefresh(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没找着定时任务,没法刷新,没法延迟
}
PtrTask pt = it->second.lock();
int pos = (_tick + pt->DelayTime()) % _capacity; // 找到要插入到时间轮的位置
_timeWheel[pos].push_back(pt); // 插入时间轮
}
void TimerCancel(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没找着定时任务,没法刷新,没法延迟
}
PtrTask pt=it->second.lock();
pt->Cancel();
}
void RunTimerTask()
{
_tick=(_tick+1)%_capacity;
_timeWheel[_tick].clear();
}
};
//测试代码
/*
class Test {
public:
Test() {std::cout << "构造" << std::endl;}
~Test() {std::cout << "析构" << std::endl;}
};
void DelTest(Test *t) {
delete t;
}
int main()
{
TimeWheel tw;
Test *t = new Test();
std::cout<<1<<std::endl;
tw.TimerAdd(888, 5, std::bind(DelTest, t));
for(int i = 0; i < 5; i++) {
sleep(1);
//std::cout<<1<<std::endl;
tw.TimerRefresh(888);//刷新定时任务
//std::cout<<2<<std::endl;
tw.RunTimerTask();//向后移动秒针
std::cout << "刷新了一下定时任务,重新需要5s中后才会销毁\n";
}
//tw.TimerCancel(888);
while(1) {
sleep(1);
std::cout << "-------------------\n";
tw.RunTimerTask();//向后移动秒针
}
return 0;
}*/
正则库的简单使⽤:
正则表达式(regular expression)描述了⼀种字符串匹配的模式(pattern),可以⽤来检查⼀个串是否 含有某种⼦串、将匹配的⼦串替换或者从某个串中取出符合某个条件的⼦串等。
正则表达式的使⽤,可以使得HTTP请求的解析更加简单(这⾥指的时程序员的⼯作变得的简单,这并 不代表处理效率会变⾼,实际上效率上是低于直接的字符串处理的),使我们实现的HTTP组件库使⽤ 起来更加灵活。

例如下图就是一个关于正则表达式使用的案例,更多的规则可以根据需求去查询。

通⽤类型any类型的实现:
每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在
Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合
度,这个协议接收解析上下⽂就不能有明显的协议倾向,它可以是任意协议的上下⽂信息,因此就需 要⼀个通⽤的类型来保存各种不同的数据结构。
在C语⾔中,通⽤类型可以使⽤void*来管理,但是在C++中,boost库和C++17给我们提供了⼀个通⽤ 类型any来灵活使⽤,如果考虑增加代码的移植性,尽量减少第三⽅库的依赖,则可以使⽤C++17特性中的any,或者⾃⼰来实现。⽽这个any通⽤类型类的实现其实并不复杂,以下是简单的部分实现。
主要设计思路如下图

⾸先Any类肯定不能是⼀个模板类,否则编译的时候 Any<int> a, Any<float>b,需要传类型作
为模板参数,也就是说在使⽤的时候就要确定其类型
这是⾏不通的,因为保存在Content中的协议上下⽂,我们在定义any对象的时候是不知道他们的协
议类型的,因此⽆法传递类型作为模板参数
因此考虑Any内部设计⼀个模板容器holder类,可以保存各种类型数据
⽽因为在Any类中⽆法定义这个holder对象或指针,因为any也不知道这个类要保存什么类型的数
据,因此⽆法传递类型参数
所以,定义⼀个基类placehoder,让holder继承于placeholde,⽽Any类保存⽗类指针即可
当需要保存数据时,则new⼀个带有模板参数的⼦类holder对象出来保存数据,然后让Any类中的⽗ 类指针,指向这个⼦类对象就搞定了
#include <iostream>
#include <typeinfo>
#include <string>
class Any
{
private:
class holder
{
public:
holder(){}
virtual const std::type_info &type() = 0; // 返回保存数据的类型
virtual holder *clone() = 0; // 克隆自身
};
template <class T>
class placeHolder : public holder
{
public:
placeHolder(const T &val)
: _val(val)
{
}
virtual const std::type_info &type()
{
return typeid(_val);
}
virtual placeHolder *clone()
{
return new placeHolder(_val);
}
T _val;
};
holder *_content;
public:
//无参构造
Any() : _content(nullptr) {}
//有参构造
template <class T>
Any(const T val)
: _content(new placeHolder<T>(val))
{
}
//克隆函数
Any(const Any &other)
: _content(other._content == nullptr ? nullptr : other._content->clone()) {} // 这个地方用到了多态,父类指针调用子类的clone函数
//析构
~Any()
{
delete _content;
}
//交换自身和传入对象保存的内容
Any &Swap(Any &other)
{
std::swap(_content, other._content);
return *this;
}
template <class T>
T *get()
{
if (typeid(T) != _content->type())
return nullptr;
else
return &(((placeHolder<T>*)_content)->_val);
}
Any &operator=(const Any &other)
{
// 为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放
Any(other).Swap(*this);
return *this;
}
template <class T>
Any &operator=(const T val)
{
Any(val).Swap(*this);
return *this;
}
};
//测试用例
/*
class Test{
public:
Test() {std::cout << "构造" << std::endl;}
Test(const Test &t) {std::cout << "拷贝" << std::endl;}
~Test() {std::cout << "析构" << std::endl;}
};
int main()
{
Any a;
a = 10;
int *pa = a.get<int>();
std::cout << *pa << std::endl;
a = std::string("nihao");
std::string *ps = a.get<std::string>();
std::cout << *ps << std::endl;
return 0;
}*/
HTTP协议模块:
HTTP协议模块⽤于对⾼并发服务器模块进⾏协议⽀持,基于提供的协议⽀持能够更⽅便的完成指定协 议服务器的搭建。
⽽HTTP协议⽀持模块的实现,可以细分为以下⼏个模块。
Util模块:
这个模块是⼀个⼯具模块,主要提供HTTP协议模块所⽤到的⼀些⼯具函数,⽐如url编解码,⽂件读 写....等。 HttpRequest模块:
这个模块是HTTP请求数据模块,⽤于保存HTTP请求数据被解析后的各项请求元素信息。
HttpResponse模块:
这个模块是HTTP响应数据模块,⽤于业务处理后设置并保存HTTP响应数据的的各项元素信息,最终 会被按照HTTP协议响应格式组织成为响应信息发送给客⼾端。
HttpContext模块:
这个模块是⼀个HTTP请求接收的上下⽂模块,主要是为了防⽌在⼀次接收的数据中,不是⼀个完整的 HTTP请求,则解析过程并未完成,⽆法进⾏完整的请求处理,需要在下次接收到新数据后继续根据上 下⽂进⾏解析,最终得到⼀个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要⼀ 个上下⽂来进⾏控制接收和处理节奏。
HttpServer模块:
这个模块是最终给组件使⽤者提供的HTTP服务器模块了,⽤于以简单的接⼝实现HTTP服务器的搭 建。
HttpServer模块内部包含有⼀个TcpServer对象:TcpServer对象实现服务器的搭建
HttpServer模块内部包含有两个提供给TcpServer对象的接⼝:连接建⽴成功设置上下⽂接⼝,数据处 理接⼝。
HttpServer模块内部包含有⼀个hash-map表存储请求与处理函数的映射表:组件使⽤者向
HttpServer设置哪些请求应该使⽤哪些函数进⾏处理,等TcpServer收到对应的请求就会使⽤对应的函 数进⾏处理。
三:模块实现
SERVER服务器模块实现:
缓冲区Buffer类实现:
Buffer类⽤于实现⽤⼾态缓冲区,提供数据缓冲,取出等功能。


class Buffer
{
#define BUFFER_DEFAULT_SIZE 1024
private:
std::vector<char> _buffer;
uint64_t _reader_idx;//读位置索引
uint64_t _writer_idx;//写位置索引
public:
Buffer()
: _reader_idx(0),
_writer_idx(0),
_buffer(BUFFER_DEFAULT_SIZE)
{
}
// 返回起始位置
char *Begin()
{
return &*_buffer.begin();
}
// 获取写入位置
char *WritePosition()
{
return Begin() + _writer_idx;
}
// 获取读取位置
char *ReadPosition()
{
return Begin() + _reader_idx;
}
// 获取头剩余空间
uint64_t HeadIdleSize()
{
return _reader_idx;
}
// 获取尾部剩余空间
uint64_t TailIdleSize()
{
return _buffer.size() - _writer_idx;
}
// 获取可读数据大小
uint64_t ReadAbleSize()
{
return _writer_idx - _reader_idx;
}
// 将读偏移向后移动
void MoveReadOffset(uint64_t len)
{
if (len == 0)
return;
// 向后移动的大小,必须小于可读数据大小
if(len <= ReadAbleSize())
_reader_idx += len;
else ERR_LOG("MoveReadOffset ERROR");
}
// 将写偏移向后移动
void MoveWriteOffset(uint64_t len)
{
// 向后移动的大小,必须小于当前后边的空闲空间大小
if(len <= TailIdleSize())
_writer_idx += len;
else ERR_LOG("MoveWriteOffset ERROR");
}
// 确保可以空间足够,否则就扩容
void EnsureWriteSpace(uint64_t len)
{
if (TailIdleSize() >= len)
{
return;
}
else
{
if (len <= HeadIdleSize() + TailIdleSize())
{
// 如果头尾空间足够,就尽量节约空间不扩容,只将缓冲区重新整理即可
// 将数据移动到头部,将头部和尾部剩余空间结合
uint64_t rsz = ReadAbleSize();
std::copy(ReadPosition(), ReadPosition() + rsz, Begin());
_reader_idx = 0;
_writer_idx = rsz;
}
else
{
// 总体空间不够就进行扩容
_buffer.resize(_writer_idx + len);
}
}
}
// 写入数据
void Write(const void *data, int len)
{
// 1.保证有足够空间 2.将数据拷入缓冲区
EnsureWriteSpace(len);
const char *d = (const char *)data;
std::copy(d, d + len, WritePosition());
}
void WriteAndPush(const void *data, int len)
{
Write(data, len);
MoveWriteOffset(len);
}
// 写入字符串
void WriteString(std::string &data)
{
Write(data.c_str(), data.size());
}
void WriteStringAndPush(std::string &data)
{
WriteString(data);
MoveWriteOffset(data.size());
}
// 从其他缓冲区写入
void WriteBuffer(Buffer &buf)
{
Write((const void *)buf.ReadPosition(), buf.ReadAbleSize());
}
void WriteBufferAndPush(Buffer &buf)
{
Write((const void *)buf.ReadPosition(), buf.ReadAbleSize());
MoveWriteOffset(buf.ReadAbleSize());
}
// 读取数据
void Read(void *rec, int len)
{
if (len <= ReadAbleSize())
{
// std::copy(Begin() + _reader_size, Begin() + _writer_size, (char *)rec);
std::copy(ReadPosition(), ReadPosition() + len, (char*)rec);
}
else
{
ERR_LOG("读取数据出错");
}
}
void ReadAndPop(void *rec, int len)
{
if (len <= ReadAbleSize())
{
Read(rec, len);
MoveReadOffset(len);
}
else
{
ERR_LOG("ReadAndPop ERROR");
}
}
// 以字符串的形式读取数据
std::string ReadAsString(int len)
{
if (len <= ReadAbleSize())
{
std::string str;
str.resize(len);
Read(&str[0], len);
return str;
}
return "";
}
std::string ReadAsStringAndpop(int len)
{
if(len <= ReadAbleSize())
{
std::string ret = ReadAsString(len);
MoveReadOffset(len);
return ret;
}
else
{
ERR_LOG("ReadAsStringAndpop ERROR");
return "";
}
}
// 查找一行数据
char *FindCRLF()
{
void *res = memchr((void *)ReadPosition(), '\n', ReadAbleSize());
return (char *)res;
}
// 以字符串形式获取一行数据
std::string GetLine()
{
char *pos = FindCRLF();
if (pos == nullptr)
{
return "";
}
//+1是把换行符也取出来,因为pos是下一个行的起始地址
return ReadAsString(pos - ReadPosition() + 1);
}
std::string GetLineAndPop()
{
// 调用GetLine函数获取一行字符串
std::string ret = GetLine();
// 根据获取的字符串长度移动读取偏移量
MoveReadOffset(ret.size());
// 返回获取到的字符串
return ret;
}
// 清空数据
void Clear()
{
_writer_idx = 0;
_reader_idx = 0;
}
};
⽇志宏的实现:
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG
#define LOG(level, format, ...) do{\
if (level < LOG_LEVEL) break;\
time_t t = time(NULL);\
struct tm *ltm = localtime(&t);\
char tmp[32] = {0};\
strftime(tmp, 31, "%H:%M:%S", ltm);\
fprintf(stdout, "[%p %s %s:%d] " format "\n", (void*)pthread_self(), tmp, __FILE__, __LINE__, ##__VA_ARGS__);\
}while(0)
#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)
套接字Socket类实现:
#define MAX_LISEN 1024
class Socket
{
private:
int _socketfd;
public:
Socket() : _socketfd(-1) {}
Socket(int fd) : _socketfd(fd) {}
~Socket() {}
int Fd()
{
return _socketfd;
}
// 创建套接字
bool Create()
{
_socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_socketfd < 0)
{
ERR_LOG("CREATED SOCKET ERROR");
return false;
}
return true;
}
// 绑定端口号
bool Bind(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = bind(_socketfd, (struct sockaddr *)&addr, len);
if (ret < 0)
{
ERR_LOG("BIND ERROR");
return false;
}
return true;
}
// 监听套接字
bool Listen(int backLog = MAX_LISEN)
{
int ret = listen(_socketfd, backLog);
if (ret < 0)
{
ERR_LOG("LISTEN ERROR");
return false;
}
return true;
}
// 向服务器发起连接
bool Connect(const std::string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = connect(_socketfd, (struct sockaddr *)&addr, len);
if (ret < 0)
{
ERR_LOG("CONNECT FAILED");
return false;
}
return true;
}
// 获取新连接
int Accept()
{
int newfd = accept(_socketfd, nullptr, nullptr);
if (newfd < 0)
{
ERR_LOG("ACCEPT FAILED");
return -1;
}
return newfd;
}
// 接收数据
ssize_t Recv(void *buf, size_t len, int flag = 0)
{
ssize_t ret = recv(_socketfd, buf, len, flag);
if (ret <= 0)
{
if (errno == EAGAIN || errno == EINTR)
{
ERR_LOG("RECEIVE AGAIN");
return 0;
}
ERR_LOG("RECEIVE FAILED:%d", errno);
return -1;
}
return ret;
}
ssize_t NoBlockRecv(void *buf, size_t len)
{
// 使用Recv函数接收数据,并设置标志位为MSG_DONTWAIT以实现非阻塞接收
return Recv(buf, len, MSG_DONTWAIT);
}
// 发送数据
int Send(const void *buf, size_t len, int flag = 0)
{
DBG_LOG("SEND:%ld", len);
ssize_t ret = send(_socketfd, buf, len, flag);
if (ret <= 0)
{
if (ret < 0)
{
if (errno == EAGAIN || errno == EINTR)
{
return 0;
}
}
ERR_LOG("SEND FAILED");
return -1;
}
return ret;
}
ssize_t NoBlockSend(void *buf, size_t len)
{
return Send(buf, len, MSG_DONTWAIT);
}
// 关闭套接字
void Close()
{
if (_socketfd != -1)
{
close(_socketfd);
_socketfd = -1;
}
}
// 创建一个服务端连接
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false)
{
if (Create() == false)
return false;
if (block_flag)
NoBlock();
if (Bind(ip, port) == false)
return false;
if (Listen() == false)
return false;
ReuseAdress();
return true;
}
// 创建一个客户端连接
bool CreateClient(uint16_t port, const std::string &ip)
{
if (Create() == false)
return false;
if (Connect(ip, port) == false)
return false;
return true;
}
// 开启地址端口重用
void ReuseAdress()
{
int val = 1;
// 设置套接字选项 SO_REUSEADDR,允许地址重用
setsockopt(_socketfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));
val = 1;
// 设置套接字选项 SO_REUSEPORT,允许端口重用
setsockopt(_socketfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));
}
// 套接字设置为非阻塞
void NoBlock()
{
//获取原本的flag值
int flag = fcntl(_socketfd, F_GETFL, 0);
fcntl(_socketfd, F_SETFL, flag | O_NONBLOCK);//新增非阻塞
}
};
事件管理Channel类实现:
/*
一个channel对应一个fd,去监管一个fd上发生的所有事件
poller负责监管所有channel,poller才具有实际的epoll,poller才是实际的监控者,负责监管所有channel
channel保存一个envent表和相应的回调函数,这也poller才可以根据channel进行相应的监控(根据event判断是否要监控读,写等)和正确的回调(读,写等触发怎么样的回调)
*/
class Poller;
class EventLoop;
class Channel
{
private:
int _fd;
EventLoop *_loop;
uint32_t _events; // 当前要监控的事件
uint32_t _revents; // 对触发事件的监控
using EventCallBack = std::function<void()>;
EventCallBack _read_callback; // 可读事件的回调函数
EventCallBack _write_callback; // 可写事件的回调函数
EventCallBack _error_callback; // 错误事件的回调函数
EventCallBack _close_callback; // 关闭事件的回调函数
EventCallBack _event_callback; // 任意事件触发的回调函数
public:
Channel(EventLoop *Loop, int fd)
: _loop(Loop),
_fd(fd),
_events(0),
_revents(0)
{
}
int Fd()
{
return _fd;
}
// 设置实际就绪的事件
void SetRevents(uint32_t events)
{
_revents = events;
}
// 获取想要监控的事件
uint32_t Event()
{
return _events;
}
// 设置可读事件回调
void SetReadCallBack(const EventCallBack cb)
{
_read_callback = cb;
}
// 设置可写事件回调
void SetWriteCallBack(const EventCallBack cb)
{
_write_callback = cb;
}
// 设置错误事件回调
void SetErrorCallBack(const EventCallBack cb)
{
_error_callback = cb;
}
// 设置关闭事件回调
void SetCloseCallBack(const EventCallBack cb)
{
_close_callback = cb;
}
// 设置任意事件回调
void SetEventCallBack(const EventCallBack cb)
{
_event_callback = cb;
}
// 当前是否可读
bool ReadAble()
{
return _events & EPOLLIN;
}
// 当前是否可写
bool WriteAble()
{
return _events & EPOLLOUT;
}
// 启动读事件监控
void EnableRead()
{
_events |= EPOLLIN;
Update();
}
// 启动写事件监控
void EnableWrite()
{
_events |= EPOLLOUT;
Update();
}
// 关闭写事件监控
void DisableWrite()
{
_events &= ~EPOLLOUT;
Update();
}
// 关闭读事件监控
void DisableRead()
{
_events &= ~EPOLLIN;
Update();
}
// 关闭所有事件监控
void DisableAll()
{
_events = 0;
Update();
}
// 移除对事件的监控
void Remove();
void Update();
// 事件处理,一旦触发就调用该函数,让触发的事件自己决定如何处理
void HandleEvent()
{
//可读可写事件由于可能耗时比较长,所以防止先刷新活跃度导致处理事件的时候超时,所以先处理事件再刷新活跃度防止超时
// 可读事件调用
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
if (_read_callback)
_read_callback();
// 任何事件都调用,刷新活跃度
if (_event_callback)
_event_callback();
}
// 可写事件调用
if (_revents & EPOLLOUT)
{
if (_write_callback)
_write_callback();
// 任何事件都调用,刷新活跃度
if (_event_callback)
_event_callback();
}
//错误或者关闭事件由于可能会导致连接关闭,所以先刷新活跃度防止连接被释放以后再刷新导致错误
//并且由于可能导致连接关闭,所以一次只处理一个
// 错误事件调用
if (_revents & EPOLLERR)
{
// 任何事件都调用,刷新活跃度
if (_event_callback)
_event_callback();
if (_error_callback)
_error_callback();
}// 关闭事件调用
else if (_revents & EPOLLHUP)
{
// 任何事件都调用,刷新活跃度
if (_event_callback)
_event_callback();
DBG_LOG("_CLOSE_CALLBACK");
if (_close_callback)
{
_close_callback();
}
}
}
};
#define MAX_EPOLLEVENTS 1024
//实际的监控者,负责监控所有事件(并根据channel进行事件管理)
class Poller
{
private:
int _epfd;
struct epoll_event _evs[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels;
private:
// 对epoll直接操作
void Update(Channel *channel, int op)
{
// 获取Channel对象的文件描述符
int fd = channel->Fd();
// 创建epoll事件结构体
struct epoll_event ev;
// 设置事件结构体中的文件描述符
ev.data.fd = fd;
// 设置事件结构体中的事件类型
ev.events = channel->Event();
// 调用epoll_ctl函数更新epoll事件
int ret = epoll_ctl(_epfd, op, fd, &ev);
// 如果epoll_ctl函数执行失败
if (ret < 0)
{
// 打印错误信息
DBG_LOG("EPOLL_CTL FAILED");
}
}
// 判断channel是否已经被poller管理
bool HasChannel(Channel *channel)
{
auto it = _channels.find(channel->Fd());
if (it != _channels.end())
return true;
else
return false;
}
public:
Poller()
{
_epfd = epoll_create(MAX_EPOLLEVENTS);
if (_epfd < 0)
{
DBG_LOG("EPOLL_CREATE ERROR");
abort();
}
}
// 添加,修改监控事件
void UpdateEvent(Channel *channel)
{
bool ret = HasChannel(channel);
if (ret)
{
// 存在就修改
Update(channel, EPOLL_CTL_MOD);
}
else
{
_channels[channel->Fd()] = channel;
Update(channel, EPOLL_CTL_ADD);
}
}
// 移除监控
void RemoveEvent(Channel *channel)
{
auto it = _channels.find(channel->Fd());
if (it != _channels.end())
{
_channels.erase(it);
Update(channel, EPOLL_CTL_DEL);
return;
}
DBG_LOG("Remove Event Error");
}
// 开始监控,获取活跃连接
// epoll会返回就绪事件的fd,然后根据fd再查找对应的channel,再由EventLoop调用HandleEvent
void Poll(std::vector<Channel *> *active)
{
int _nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
if (_nfds < 0)
{
if (errno == EINTR)
{
return;
}
ERR_LOG("EPOLL_WAIT FAILED");
abort();
}
for (int i = 0; i < _nfds; i++)
{
auto it = _channels.find(_evs[i].data.fd);
if (it != _channels.end())
{
it->second->SetRevents(_evs[i].events); // 设置实际就绪事件类型
active->push_back(it->second);
}
else
{
DBG_LOG("_CHANNELS FIND ERROR");
}
}
}
};
描述符事件监控Poller类实现:
#define MAX_EPOLLEVENTS 1024
//实际的监控者,负责监控所有事件(并根据channel进行事件管理)
class Poller
{
private:
int _epfd;
struct epoll_event _evs[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels;
private:
// 对epoll直接操作
void Update(Channel *channel, int op)
{
// 获取Channel对象的文件描述符
int fd = channel->Fd();
// 创建epoll事件结构体
struct epoll_event ev;
// 设置事件结构体中的文件描述符
ev.data.fd = fd;
// 设置事件结构体中的事件类型
ev.events = channel->Event();
// 调用epoll_ctl函数更新epoll事件
int ret = epoll_ctl(_epfd, op, fd, &ev);
// 如果epoll_ctl函数执行失败
if (ret < 0)
{
// 打印错误信息
DBG_LOG("EPOLL_CTL FAILED");
}
}
// 判断channel是否已经被poller管理
bool HasChannel(Channel *channel)
{
auto it = _channels.find(channel->Fd());
if (it != _channels.end())
return true;
else
return false;
}
public:
Poller()
{
_epfd = epoll_create(MAX_EPOLLEVENTS);
if (_epfd < 0)
{
DBG_LOG("EPOLL_CREATE ERROR");
abort();
}
}
// 添加,修改监控事件
void UpdateEvent(Channel *channel)
{
bool ret = HasChannel(channel);
if (ret)
{
// 存在就修改
Update(channel, EPOLL_CTL_MOD);
}
else
{
_channels[channel->Fd()] = channel;
Update(channel, EPOLL_CTL_ADD);
}
}
// 移除监控
void RemoveEvent(Channel *channel)
{
auto it = _channels.find(channel->Fd());
if (it != _channels.end())
{
_channels.erase(it);
Update(channel, EPOLL_CTL_DEL);
return;
}
DBG_LOG("Remove Event Error");
}
// 开始监控,获取活跃连接
// epoll会返回就绪事件的fd,然后根据fd再查找对应的channel,再由EventLoop调用HandleEvent
void Poll(std::vector<Channel *> *active)
{
int _nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
if (_nfds < 0)
{
if (errno == EINTR)
{
return;
}
ERR_LOG("EPOLL_WAIT FAILED");
abort();
}
for (int i = 0; i < _nfds; i++)
{
auto it = _channels.find(_evs[i].data.fd);
if (it != _channels.end())
{
it->second->SetRevents(_evs[i].events); // 设置实际就绪事件类型
active->push_back(it->second);
}
else
{
DBG_LOG("_CHANNELS FIND ERROR");
}
}
}
};
Reactor-EventLoop实现:
class EventLoop
{
using Functor = std::function<void()>;
std::thread::id _thread_id; // 线程Id
int _event_fd; // 唤醒由于IO事件可能导致的阻塞
std::unique_ptr<Channel> _event_channel;
Poller _poller; // 对描述符进行监控
std::vector<Functor> _tasks; // 任务池
std::mutex _mutex; // 保障对任务池操作的安全性
TimeWheel _timer_wheel;
public:
// 执行任务池中的所有操作
void RunAllTask()
{
std::vector<Functor> tasks;
{
//执行任务的时候直接将任务池中的任务置换出来,再处理,避免执行任务的时候阻塞任务池
std::unique_lock<std::mutex> _lock(_mutex);
tasks.swap(_tasks);
}
for (auto f : tasks)
{
f();
}
return;
}
static int CreateEventFd()
{
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (efd < 0)
{
ERR_LOG("EVENT FD CREATE FAILED");
abort();
}
return efd;
}
void ReadEventFd()
{
uint64_t res = 0;
res = read(_event_fd, &res, sizeof(res));
if (res < 0)
{
if (errno == EINTR)
{
return;
}
ERR_LOG("ReadEventFd ERROR");
}
}
/**
* @brief 唤醒事件文件描述符
*
* 通过向事件文件描述符写入数据来唤醒等待该描述符的线程。
*
* @return 无返回值
*/
void WakeUpEventFd()
{
// 设置写入的值
uint64_t val = 1;
// 将值写入到事件文件描述符
int ret = write(_event_fd, &val, sizeof(val));
if (ret < 0)
{
// 判断错误码是否为EINTR或EAGAIN
// EINTR:被信号打断 EAGAIN:无数据可读
if (errno == EINTR || errno == EAGAIN)
{
// 如果是这两种错误码之一,则直接返回
return;
}
// 如果不是这两种错误码,则记录错误日志
ERR_LOG("ReadEventFd ERROR");
}
}
public:
EventLoop()
: _thread_id(std::this_thread::get_id()),
_event_fd(CreateEventFd()),
_event_channel(new Channel(this, _event_fd)),
_timer_wheel(this)
{
// 给Event_fd添加回调函数读取eventfd,通过readEventfd来打断IO阻塞
//由于epoll的事件监控,有可能会因为没有事件到来⽽持续阻塞,导致任务队列中的任务不能及时得
//到执⾏,因此创建了eventfd,添加到Poller的事件监控中,⽤于实现每次向任务队列添加任务的时
//候,通过向eventfd写⼊数据来唤醒epoll的阻塞。
_event_channel->SetReadCallBack(std::bind(&EventLoop::ReadEventFd, this));
_event_channel->EnableRead();
}
// 判断要执行的任务是否在当前线程中,如果是就直接执行,否则就压入队列
void RunInLoop(const Functor &cb)
{
if (IsInLoop())
{
cb();
}
else
{
QueueInLoop(cb);
}
}
// 开始运行三步走--事件监控=》就绪事件处理=》执行任务
void Start()
{
while (1)
{
std::vector<Channel *> actives;
_poller.Poll(&actives);
//先处理事件触发的回调(读事件回调,写事件回调.....)
for (auto &channel : actives)
{
channel->HandleEvent();
}
//然后再执行任务池中的任务
RunAllTask();
}
}
// 将任务压入队列
void QueueInLoop(const Functor &cb)
{
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.push_back(cb);
}
// 唤醒因为没有事件就绪从而导致的epoll阻塞
// 其实就是给eventfd写入一个数据,从而触发可读事件打断阻塞
// 因为任务池的处理是在有IO事件被触发以后才会被处理的,所有如果完全没有IO事件导致阻塞就会导致任务池的任务也一直不被执行
WakeUpEventFd();
}
// 判断当前线程是否是EventLoop对应的线程
bool IsInLoop()
{
return (_thread_id == std::this_thread::get_id());
}
// 更新事件监控
void UpdateEvent(Channel *channel)
{
_poller.UpdateEvent(channel);
}
// 移除事件监控
void RemoveEvent(Channel *channel)
{
_poller.RemoveEvent(channel);
}
//对定时器的操作封装,方便上层调用
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)
{
return _timer_wheel.TimerAdd(id, delay, cb);
}
void TimerRefresh(uint64_t id)
{
return _timer_wheel.TimerRefresh(id);
}
void TimerCancel(uint64_t id)
{
return _timer_wheel.TimerCancel(id);
}
bool HasTimer(uint64_t id)
{
return _timer_wheel.HasTimer(id);
}
};
定时器模块的整合
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
uint64_t _id; // 定时器的ID
uint64_t _timeout; // 超时时间
bool _canceled; // 取消定时任务
TaskFunc _task_cb; // 回调函数的任务
ReleaseFunc _release; // 清除时间轮中的记录
public:
TimerTask(uint64_t id, uint64_t timeout, TaskFunc cb)
: _id(id),
_timeout(timeout),
_task_cb(cb),
_canceled(false)
{
}
uint64_t ID()
{
return _id;
}
void SetRelease(ReleaseFunc release)
{
_release = release;
}
~TimerTask()
{
if (!_canceled)
_task_cb();
_release();
}
void Cancel()
{
DBG_LOG("TIMER CANCEL");
_canceled = true;
}
//获取超时时间
uint32_t DelayTime()
{
return _timeout;
}
};
class TimeWheel
{
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
int _tick; // 秒针
int _capacity; // 最大定时时间
std::vector<std::vector<PtrTask>> _timeWheel; // 时间轮
std::unordered_map<uint32_t, WeakTask> _timers; // 用来保存定时器的信息
EventLoop *_loop;
int _timerfd; // 定时器描述符--可读事件回调就是读取计时器执行定时任务
std::unique_ptr<Channel> _timer_channel;
private:
void RemoveTimer(uint64_t id)
{
DBG_LOG("REMOVE TIMER");
auto it = _timers.find(id);
if (it != _timers.end())
{
_timers.erase(it);
}
}
//通过定时器来进行定时,触发一次读IO事件,保证一秒触发一次时间轮的走动
static int CreateTimerfd()
{
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
if (timerfd < 0)
{
ERR_LOG("CREATE TIMERFD FAILED");
abort();
}
struct itimerspec itime;
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0;
itime.it_value.tv_nsec = 0;
itime.it_value.tv_sec = 1;
timerfd_settime(timerfd, 0, &itime, nullptr);
DBG_LOG("CREATE TIMERFD SUCCESS :%d",timerfd);
return timerfd;
}
int ReadTimeFd()
{
uint64_t times;
int ret = read(_timerfd, ×, 8);
if (ret < 0)
{
ERR_LOG("READ TIME FD:%d FAILED %s",_timerfd, strerror(errno));
abort();
}
return times;
}
void RunTimerTask()
{
// DBG_LOG("时间轮被驱动");
_tick = (_tick + 1) % _capacity;
_timeWheel[_tick].clear();
}
//定时器的回调读事件,读事件回调会被IO事件触发引发读回调(设置为该OnTime函数),根据超时时间,移动时间轮去执行定时任务
void OnTime()
{
int ret = ReadTimeFd();
for (int i = 0; i < ret; i++)
RunTimerTask();
}
public:
TimeWheel(EventLoop *loop)
: _tick(0),
_capacity(60),
_timeWheel(_capacity),
_loop(loop),
_timerfd(CreateTimerfd()),
_timer_channel(new Channel(_loop, _timerfd))
{
_timer_channel->SetReadCallBack(std::bind(&TimeWheel::OnTime, this));
_timer_channel->EnableRead();
}
void TimerAdd(uint64_t id, uint64_t delay, TaskFunc cb);
void TimerRefresh(uint64_t id);
void TimerCancel(uint64_t id);
// 该接口存在线程安全问题,只能在对应的EventLoop线程中被调用
bool HasTimer(uint64_t id)
{
auto it = _timers.find(id);
if (it != _timers.end())
{
return true;
}
return false;
}
void TimerAddInLoop(uint64_t id, uint64_t timeout, TaskFunc cb)
{
PtrTask pt(new TimerTask(id, timeout, cb)); // 创建任务
pt->SetRelease(std::bind(&TimeWheel::RemoveTimer, this, id)); // 设置删除函数回调
int pos = (_tick + pt->DelayTime()) % _capacity; // 找到要插入到时间轮的位置
_timeWheel[pos].push_back(pt); // 插入时间轮
_timers[id] = WeakTask(pt); // 保存定时器的信息
}
void TimerRefreshInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没找着定时任务,没法刷新,没法延迟
}
PtrTask pt = it->second.lock();
int pos = (_tick + pt->DelayTime()) % _capacity; // 找到要插入到时间轮的位置
_timeWheel[pos].push_back(pt); // 插入时间轮
// DBG_LOG("时间轮被刷新");
}
void TimerCancelInLoop(uint64_t id)
{
auto it = _timers.find(id);
if (it == _timers.end())
{
return; // 没找着定时任务,没法刷新,没法延迟
}
PtrTask pt = it->second.lock();
if(pt)
pt->Cancel();
else DBG_LOG("TIMER CANCEL ERROR! PT IS NULL");
DBG_LOG("TIMER CANCEL IN LOOP");
}
};
Connection模块
// DISCONNECTED 已经关闭
// CONNECTING 连接建立成功-待处理状态
// CONNECTED 连接已经建立完成,各项设置已就绪,可以通信
// DISCONNECTING 准备断开连接
typedef enum
{
DISCONNECTED,
CONNECTING,
CONNECTED,
DISCONNECTING
} ConnStatu;
class Connection;
using PtrConnection = std::shared_ptr<Connection>;
class Connection : public std::enable_shared_from_this<Connection>
//继承自std::enable_shared_from_this<Connection>,这样就可以在Connection内部获取自己的shared_ptr管理对象
{
int _conn_id; // Connection的id
// _tiemr_id 定时器id,这里使用conn_id代替
int _sockfd; // 连接关联的描述符
ConnStatu _statu; // 连接状态
EventLoop *_loop; // 连接关联一个EventLoop
bool _enable_inactive_release = false; // 是否启动非活跃自动销毁,默认关闭
Socket _socket; // 对套接字的管理
Channel _channel; // 对连接事件的管理
Buffer _in_buffer; // 输入缓冲区
Buffer _out_buffer; // 输出缓冲区
Any _context; // 连接的上下文
using ConnectedCallBack = std::function<void(const PtrConnection &)>;
using MessageCallBack = std::function<void(const PtrConnection &, Buffer *)>;
using ClosedCallBack = std::function<void(const PtrConnection &)>;
using AnyEventCallBack = std::function<void(const PtrConnection &)>;
ConnectedCallBack _connected_callback;
MessageCallBack _message_callback;
ClosedCallBack _closed_callback;
AnyEventCallBack _event_callback;
// 服务器的连接关闭回调函数--组件内设置,服务器会把所有连接管理起来
// 一旦某个连接要关闭,就应该把自己的信息从管理的地方移除
ClosedCallBack _server_closed_callback;
private:
// 监控可读事件的回调函数,读取缓冲区的数据调用_read_callback
void HandleRead()
{
DBG_LOG("HANDLE READ");
// 1. 接收socket的数据,放到缓冲区
char buf[65536];
ssize_t ret = _socket.NoBlockRecv(buf, 65535);
if (ret < 0)
{
// 出错了,不能直接关闭连接
DBG_LOG("RECV ERROR AND SHUT DOWN");
return ShutDownInLoop();
}
// 这里的等于0表示的是没有读取到数据,而并不是连接断开了,连接断开返回的是-1
// 将数据放入输入缓冲区,写入之后顺便将写偏移向后移动
_in_buffer.WriteAndPush(buf, ret);
// 2. 调用message_callback进行业务处理
if (_in_buffer.ReadAbleSize() > 0)
{
DBG_LOG("_message_callback()");
// shared_from_this--从当前对象自身获取自身的shared_ptr管理对象
if (_message_callback)
return _message_callback(shared_from_this(), &_in_buffer);
DBG_LOG("_message_callback()");
}
DBG_LOG("HANDLE READ END");
}
// 监控可写事件的回调函数,发送缓冲区中的数据
void HandleWrite()
{
DBG_LOG("HANDLE WRITE");
int ret = _socket.NoBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
if (ret < 0)
{
// 发送错误就要关闭连接,但是要先把缓冲区接收到剩余数据处理完毕
if (_in_buffer.ReadAbleSize() > 0)
{
_message_callback(shared_from_this(), &_in_buffer);
}
return Release();
}
_out_buffer.MoveReadOffset(ret); // 这里应该是将读取数据位置向后偏移,而不是可写,可写是写入数据时移动的
DBG_LOG("剩余数据量:%ld,status:%d", _out_buffer.ReadAbleSize(), _statu);
if (_out_buffer.ReadAbleSize() == 0)
{
_channel.DisableWrite(); // 没有数据要发送,关闭写事件监控,避免被重复触发
// 如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放
if (_statu == DISCONNECTING)
{
return Release();
}
}
// 如果是待关闭连接状态,那么有数据就先处理数据,没有则直接释放
}
// 监控关闭事件的回调函数
void HandleClose()
{
DBG_LOG("HANDLE CLOSE");
if (_in_buffer.ReadAbleSize() > 0)
{
_message_callback(shared_from_this(), &_in_buffer);
}
return Release();
}
// 监控错误事件的回调函数
void HandleError()
{
return HandleClose();
}
// 监控任意事件的回调函数 1.刷新活跃度 2.调用回调
void HandleEvent()
{
if (_enable_inactive_release == true)
{
_loop->TimerRefresh(_conn_id);
}
if (_event_callback)
{
_event_callback(shared_from_this());
}
}
// 连接获取以后要进行各种设置(给channel设置回调,启动读监控)
void EstablishedInLoop()
{
// 1.修改连接状态 2.启动读事件监控 3.调用回调函数
// 当前步骤一定是处于半连接状态,否则必然是出错了
if (_statu != CONNECTING)
{
ERR_LOG("ESTABLISHED IN LOOP ERROR");
abort();
}
_statu = CONNECTED;
_channel.EnableRead();
if (_connected_callback)
_connected_callback(shared_from_this());
}
// 实际的释放接口
void ReleaseInLoop()
{
DBG_LOG("REALEASE IN LOOP");
// 1.修改连接状态,设为DISCONNECTED
_statu = DISCONNECTED;
// 2.移除连接事件监控
_channel.Remove();
// 3.关闭描述符
_socket.Close();
// DBG_LOG("REALEASE IN LOOP MIDDLE");
// 4.如果当前定时器队列中还有任务,则取消任务
if (_loop->HasTimer(_conn_id))
{
CancelInactiveReleaseInLoop();
}
DBG_LOG("REALEASE IN LOOP MIDDLE");
// 5.先调用关闭回调函数,避免先移除服务器内部信息导致Connection被销毁,再处理会出错
if (_closed_callback)
_closed_callback(shared_from_this());
// 移除服务器内部的连接信息
if (_server_closed_callback)
_server_closed_callback(shared_from_this());
DBG_LOG("REALEASE IN LOOP END");
}
// 并非实际的数据发送接口,只是将数据写入缓冲区,启动写事件
void SendInLoop(Buffer &buf)
{
if (_statu == DISCONNECTED)
return;
_out_buffer.WriteBufferAndPush(buf);
if (_channel.WriteAble() == false)
{
_channel.EnableWrite();
}
}
// 并非实际的关闭连接接口,还需要判断数据是否处理完毕
void ShutDownInLoop()
{
DBG_LOG("SHUT DWON IN LOOP");
_statu = DISCONNECTING;
if (_in_buffer.ReadAbleSize() > 0)
{
if (_message_callback)
_message_callback(shared_from_this(), &_in_buffer);
}
// 要么写入数据出错关闭,要么数据发送完然后再关闭
if (_out_buffer.ReadAbleSize() > 0)
{
if (_channel.WriteAble() == false)
{
_channel.EnableWrite();
}
}
if (_out_buffer.ReadAbleSize() == 0)
{
DBG_LOG("BUFFER EMPTY AND RELEASE IN LOOP");
Release();
}
DBG_LOG("SHUT DOWN IN LOOP:%ld", _out_buffer.ReadAbleSize());
}
// 启动非活跃连接超时
void EnableInactiveReleaseInLoop(int sec)
{
_enable_inactive_release = true;
// 如果定时销毁任务已经存在,则需要先刷新任务一下
if (_loop->HasTimer(_conn_id))
{
return _loop->TimerRefresh(_conn_id);
}
// 如果不存在定时销毁任务则新增
DBG_LOG("ENABLE INACTIVE RELEASE IN LOOP");
_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
}
void CancelInactiveReleaseInLoop()
{
_enable_inactive_release = false;
if (_loop->HasTimer(_conn_id))
{
return _loop->TimerCancel(_conn_id);
}
}
void UpgradeInLoop(const Any &context, const ConnectedCallBack &con, const MessageCallBack &msg, const ClosedCallBack &clo, const AnyEventCallBack &evt)
{
_context = context;
_connected_callback = con;
_message_callback = msg;
_closed_callback = clo;
_event_callback = evt;
}
public:
Connection(EventLoop *loop, uint64_t coon_id, int sockfd) : _conn_id(coon_id), _sockfd(sockfd), _enable_inactive_release(false), _loop(loop),
_statu(CONNECTING), _socket(sockfd), _channel(loop, _sockfd)
{
_channel.SetReadCallBack(std::bind(&Connection::HandleRead, this));
_channel.SetWriteCallBack(std::bind(&Connection::HandleWrite, this));
_channel.SetCloseCallBack(std::bind(&Connection::HandleClose, this));
_channel.SetErrorCallBack(std::bind(&Connection::HandleError, this));
_channel.SetEventCallBack(std::bind(&Connection::HandleEvent, this));
}
~Connection() { DBG_LOG("RELEASE CONNECTION :%p", this); }
// 发送数据,将数据放到缓冲区,启动写事件监控
void Send(const char *data, size_t len)
{
Buffer buf;
buf.WriteAndPush(data, len);
_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
}
int Fd()
{
return _sockfd;
}
int Id()
{
return _conn_id;
}
// 进行channel回调,启动读监控,调用_connected_callback
void Establish()
{
_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
}
// 是否处于连接状态
bool Connected()
{
return _statu;
}
// 设置上下文
void SetContext(const Any &context)
{
_context = context;
}
// 获取上下文
Any *GetContext()
{
return &_context;
}
// 设置回调
void SetConnectedCallBack(const ConnectedCallBack &cb)
{
_connected_callback = cb;
}
void SetMessageCallBack(const MessageCallBack &cb)
{
_message_callback = cb;
}
void SetClosedCallBack(const ClosedCallBack &cb)
{
_closed_callback = cb;
}
void SetAnyEventCallBack(const AnyEventCallBack &cb)
{
_event_callback = cb;
}
void SetSrvClosedCallBack(const ClosedCallBack &cb)
{
_server_closed_callback = cb;
}
// 提供给组件使用者的关闭接口,并不实际关闭,还需要判断数据实际是否处理完毕
void ShutDown()
{
_loop->RunInLoop(std::bind(&Connection::ShutDownInLoop, this));
}
void Release()
{
_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));
}
// 启动非活跃销毁
void EnableInactiveRelease(int sec)
{
_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
}
// 取消非活跃销毁
void CancelInactiveRelease()
{
_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));
}
// 切换协议-重置上下文及处理阶段性数据 非线程安全
void Upgrade(const Any &context, const ConnectedCallBack &con, const MessageCallBack &msg, const ClosedCallBack &clo, const AnyEventCallBack &evt)
{
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, con, msg, clo, evt));
}
};
Accepter模块
class Accepter
{
private:
Socket _socket; // 创建监听套接字
EventLoop *_loop; // 对监听套接字进行事件监控
Channel _channel; // 对监听事件进行事件管理
using AcceptCallBack = std::function<void(int)>;
AcceptCallBack _accept_callback;
private:
// 监听套接字读事件的回调--获取新连接 回调_accept_callback
void HandleRead()
{
int newfd = _socket.Accept();
if (newfd < 0)
{
return;
}
if (_accept_callback)
_accept_callback(newfd);
}
int CreateServer(int port)
{
bool ret = _socket.CreateServer(port);
if (ret == false)
{
abort();
}
return _socket.Fd();
}
public:
Accepter(EventLoop *loop, int port)
: _socket(CreateServer(port)),
_loop(loop),
_channel(loop, _socket.Fd())
{
_channel.SetReadCallBack(std::bind(&Accepter::HandleRead, this));
}
void SetAcceptCallBack(const AcceptCallBack &cb)
{
_accept_callback = cb;
}
/*不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动
否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏*/
void Listen()
{
_channel.EnableRead();
}
};
LoopThread模块
class LoopThread
{
private:
// 用于实现获取Loop的同步,避免线程已经被创建了,但是EventLoop没有被实例化之前就被获取导致的问题
std::mutex _mutex; // 互斥锁
std::condition_variable _cond; // 条件变量
EventLoop *_loop; // 实例对象需要在线程内再实例化
std::thread _thread; // EventLoop对应的线程
private:
// 实例化EventLoop对象,开始运行EventLoop模块的功能
void ThreadEntry()
{
// 实例化对象,唤醒所有被_cond.wait阻塞的线程
EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);
_loop = &loop;
_cond.notify_all();
}
loop.Start();
}
public:
// 创建线程,设定线程入口函数
LoopThread()
: _loop(nullptr),
_thread(std::thread(&LoopThread::ThreadEntry, this))
{
}
// 获取EventLoop
EventLoop *GetLoop()
{
// 等待_loop不为空,也就是线程已经被创建并且EventLoop对象已经实例化
// 例如主Reactor线程在创建了LoopThread之后就立刻获取EventLoop对象,这时候对象还没有实例化就会导致问题
EventLoop *loop = nullptr;
{
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [&](){ return _loop != nullptr; });
loop = _loop;
}
return loop;
}
};
结合上述的分析所以我们要构造一个新模块
LoopThreadPool模块
// 线程池,用于分发任务到不同的IO线程
// baseloop是主Reactor所在的线程的EventLoop对象,用于分发到其他IO线程
// _threads是所有从Reactor线程的LoopThread对象
// _loops是所有从Reactor线程的EventLoop对象
class LoopThreadPool
{
private:
int _thread_count;
int _next_idx;
EventLoop *_baseloop;
std::vector<LoopThread *> _threads;
std::vector<EventLoop *> _loops;
public:
LoopThreadPool(EventLoop *baseloop)
: _thread_count(0),
_next_idx(0),
_baseloop(baseloop)
{
}
void SetThreadCount(int count)
{
_thread_count = count;
}
void Create()
{
if (_thread_count > 0)
{
_threads.resize(_thread_count);
_loops.resize(_thread_count);
for (int i = 0; i < _thread_count; i++)
{
_threads[i] = new LoopThread();
_loops[i] = _threads[i]->GetLoop();
}
}
}
//采用公平轮转分配(RR轮转)
EventLoop *NextLoop()
{
if (_thread_count == 0)
{
return _baseloop;
}
_next_idx = (_next_idx + 1) % _thread_count;
return _loops[_next_idx];
}
};
TcpServer模块
class TcpServer
{
using Functor = std::function<void()>;
private:
uint64_t _next_id; // 自增id
int _port;
int _timeout; // 非活跃连接的超时时间
bool _Enable_Inactive_Release; // 是否启动非活跃消耗标志
Accepter _accepter; // 监听套接字的管理对象
EventLoop _baseloop; // 主线程的EventLoop
LoopThreadPool _pool; // 从线程loop的线程池
std::unordered_map<uint64_t, PtrConnection> _coons; // 保存管理所有对应连接的shared_ptr对象
using ConnectedCallBack = std::function<void(const PtrConnection &)>;
using MessageCallBack = std::function<void(const PtrConnection &, Buffer *)>;
using ClosedCallBack = std::function<void(const PtrConnection &)>;
using AnyEventCallBack = std::function<void(const PtrConnection &)>;
ConnectedCallBack _connected_callback;
MessageCallBack _message_callback;
ClosedCallBack _closed_callback;
AnyEventCallBack _event_callback;
private:
// 为新连接构建一个Connection进行管理
void NewConnection(int fd)
{
_next_id++;
PtrConnection coon(new Connection(_pool.NextLoop(), _next_id, fd));
coon->SetMessageCallBack(_message_callback);
coon->SetClosedCallBack(_closed_callback);
coon->SetConnectedCallBack(_connected_callback);
coon->SetAnyEventCallBack(_event_callback);
coon->SetSrvClosedCallBack(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
if (_Enable_Inactive_Release)
coon->EnableInactiveRelease(_timeout);
coon->Establish();
_coons[_next_id] = coon;
}
void RemoveConnectionInLoop(const PtrConnection &coon)
{
int id = coon->Id();
auto it = _coons.find(id);
if (it != _coons.end())
{
_coons.erase(it);
}
}
// 从管理Connection的_conns种移除连接信息
void RemoveConnection(const PtrConnection &coon)
{
_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, coon));
}
void RunAfterInLoop(const Functor &task, int delay)
{
_next_id++;
_baseloop.TimerAdd(_next_id, delay, task);
}
public:
TcpServer(int port)
: _port(port),
_next_id(0),
_Enable_Inactive_Release(false),
_accepter(&_baseloop, port),
_pool(&_baseloop)
{
_accepter.SetAcceptCallBack(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
_accepter.Listen(); // 将监听事件挂到baseloop上开始监控
}
void SetThreadCount(int count)
{
return _pool.SetThreadCount(count);
}
void SetConnectedCallBack(const ConnectedCallBack &cb)
{
_connected_callback = cb;
}
void SetMessageCallBack(const MessageCallBack &cb)
{
_message_callback = cb;
}
void SetClosedCallBack(const ClosedCallBack &cb)
{
_closed_callback = cb;
}
void SetAnyEventCallBack(const AnyEventCallBack &cb)
{
_event_callback = cb;
}
void EnableInactiveRelease(int timeout)
{
_timeout = timeout;
_Enable_Inactive_Release = true;
}
// 用于添加一个定时任务
void RunAfter(const Functor &task, int delay)
{
_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));
}
void Start()
{
_pool.Create(); // 创建从属线程
_baseloop.Start();
}
};
Http协议模块实现
Util模块
class Util
{
public:
// 字符串分割函数 将src按照sep进行分割,将结果放入array,返回分割之后的字符串数量
static size_t Split(const std::string &src, const std::string &sep, std::vector<std::string> *array)
{
size_t offset = 0;
while (offset < src.size())
{
// 在src偏移量offset处向后查找字符sep,返回查到的位置
size_t pos = src.find(sep, offset);
if (pos == std::string::npos)
{
if (pos == src.size())
break;
array->push_back(src.substr(offset));
return array->size();
}
if (pos == offset)
{
offset = pos + sep.size();
continue; // 空串不添加
}
array->push_back(src.substr(offset, pos - offset));
offset = pos + sep.size();
}
return array->size();
}
// 读取文件内容
static bool ReadFile(const std::string filename, std::string *buf)
{
std::ifstream ifs(filename, std::ios::binary);
if (ifs.is_open() == false)
{
ERR_LOG("FILE %s OPEN FAILED", filename.c_str());
return false;
}
size_t fsize = 0;
ifs.seekg(0, ifs.end);
fsize = ifs.tellg(); // 获取读写位置与文件开头的偏移量,就是文件大小
ifs.seekg(0, ifs.beg);
buf->resize(fsize);
ifs.read(&((*buf)[0]), fsize);
if (ifs.good() == false)
{
ERR_LOG("READ %s FILE FAILED", filename.c_str());
ifs.close();
return false;
}
ifs.close();
return true;
}
// 写入文件内容
static bool WriteFile(const std::string &filename, const std::string &buf)
{
std::ofstream ofs(filename, std::ios::binary | std::ios::trunc);
if (ofs.is_open() == false)
{
ERR_LOG("FILE %s OPEN FAILED", filename.c_str());
return false;
}
ofs.write(buf.c_str(), buf.size());
if (ofs.good() == false)
{
ERR_LOG("WRITE %s FILE FAILED", filename.c_str());
ofs.close();
return false;
}
ofs.close();
return true;
}
// Url编码 避免URL中资源路径与查询字符串中的特殊字符与http请求中的特殊字符产生歧义
// RFC3986文档规定,编码格式为%HH,其中也规定 . - _ ~ 数字,字母为绝对不编码字符 如C++ -> C%2B%2B
// W3C文档规定,查询字符串中的空格,需要编码为+,查询则是+转空格
static std::string UrlEncode(const std::string &url, bool convert_space_to_plus)
{
std::string ret;
for (auto c : url)
{
if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c))
{
ret += c;
continue;
}
if (c == ' ')
{
if (convert_space_to_plus == true)
{
ret += '+';
continue;
}
else
{
ret += ' ';
continue;
}
}
char tmp[4] = {0};
snprintf(tmp, 4, "%%%02X", c);
ret += tmp;
}
return ret;
}
static char HEXTOI(char c)
{
if (c >= '0' && c <= '9')
{
return c - '0';
}
if (c >= 'a' && c <= 'z')
{
return c - 'a' + 10;
}
if (c >= 'A' && c <= 'Z')
{
return c - 'A' + 10;
}
return -1;
}
// Url解码
static std::string UrlDecode(const std::string &url, bool convert_space_to_plus)
{
std::string ret;
// 遇到百分号就将其后的两个字符重新转化为数字,第一个字符向左移4位+第二个字符
for (int i = 0; i < url.size(); i++)
{
if (url[i] == '+' && convert_space_to_plus == true)
{
ret += ' ';
continue;
}
if (url[i] == '%' && i + 2 < url.size())
{
char val1 = url[i + 1];
char val2 = url[i + 2];
char v = (val1 << 4) + val2;
ret += v;
i += 2;
continue;
}
ret += url[i];
}
return ret;
}
// 响应状态码的描述信息获取
static std::string StatuDesc(int statu)
{
auto it = stu_msg.find(statu);
if (it == stu_msg.end())
{
return "UnKnow";
}
return it->second;
}
// 根据文件后缀名获取mime
/**
* @brief 根据文件名获取文件的MIME类型
*
* 根据提供的文件名,查找并返回该文件对应的MIME类型。
*
* @param filename 文件名
* @return 文件的MIME类型,如果无法确定则返回 "application/octet-stream"
*/
static std::string ExtMime(const std::string &filename)
{
size_t pos = filename.find_last_of('.');
if (pos == std::string::npos)
{
return "application/ocet-stream";
}
std::string ext = filename.substr(pos);
auto it = mime_msg.find(ext);
if (it == mime_msg.end())
{
return "application/ocet-stream";
}
return it->second;
}
// 判断一个文件是否是目录
static bool IsDirectory(const std::string &filename)
{
// 定义结构体stat变量st
struct stat st;
// 调用stat函数获取文件状态信息,将结果存储在st中
int ret = stat(filename.c_str(), &st);
// 如果stat函数返回小于0的值,说明获取文件状态信息失败
if (ret < 0)
{
// 返回false,表示该文件不是一个目录
return false;
}
// 返回S_ISDIR(st.st_mode)的结果,如果文件是一个目录,则返回true,否则返回false
return S_ISDIR(st.st_mode);
}
// 判断一个文件是否是普通文件
static bool IsRegular(const std::string &filename)
{
struct stat st;
int ret = stat(filename.c_str(), &st);
if (ret < 0)
{
return false;
}
return S_ISREG(st.st_mode);
}
// 判断http请求的资源路径有效性
// 防止用户访问到相对根目录之外
// 主要思路是控制目录深度
static bool ValidPath(const std::string &path)
{
int level = 0;
std::vector<std::string> subdir;
Split(path, "/", &subdir);
for (auto dir : subdir)
{
if (dir == "..")
{
level--;
if (level < 0)
{
return false;
}
}
else
{
level++;
}
}
return true;
}
};
class HttpRequest
{
public:
std::string _method; // 请求方法
std::string _path; // 请求路径
std::string _version; // 协议版本
std::string _body; // 正文
std::smatch _matches; // 资源路径的正则提取路径
std::unordered_map<std::string, std::string> _headers; // 头部字段
std::unordered_map<std::string, std::string> _params; // 查询字符串
public:
HttpRequest() : _version("HTTP/1.1") {}
void ReSet()
{
_method.clear();
_path.clear();
_version="HTTP/1.1";
_body.clear();
std::smatch tmp;
_matches.swap(tmp);
_headers.clear();
_params.clear();
}
// 插入头部字段
void SetHeader(const std::string &key, const std::string &val)
{
DBG_LOG("REQ SET HEADER KEY:%s,Val:%s",key.c_str(),val.c_str());
_headers[key] = val;
}
// 判断是否有头部字段
bool HasHeader(const std::string &key) const
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return false;
}
return true;
}
// 获取头部字段
std::string GetHeader(const std::string &key) const
{
auto it = _headers.find(key.c_str());
if (it == _headers.end())
{
DBG_LOG("NOT FIND:%s",key.c_str());
return "";
}
return it->second;
}
// 插入查询字符串
void SetParam(const std::string &key, const std::string &val)
{
_params[key] = val;
}
// 判断是否有某个查询字符串
bool HasParam(const std::string &key)
{
auto it = _params.find(key);
if (it == _params.end())
{
return false;
}
return true;
}
// 获取指定查询字符串
std::string GetParam(const std::string &key)
{
auto it = _params.find(key);
if (it == _params.end())
{
return "";
}
return it->second;
}
// 获取正文长度
size_t ContentLenth()
{
bool ret = HasHeader("Content-Length");
if (ret == false)
{
return 0;
}
std::string len = GetHeader("Content-Length");
return std::stol(len);
}
// 判断是否是短连接
bool Close () const
{
// 没有Connection 或者Connection的值为false都是短连接
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive")
{
DBG_LOG("CLOSE RETURN FALSE");
return false;
}
return true;
}
};
HttpResquest模块
class HttpRequest
{
public:
std::string _method; // 请求方法
std::string _path; // 请求路径
std::string _version; // 协议版本
std::string _body; // 正文
std::smatch _matches; // 资源路径的正则提取路径
std::unordered_map<std::string, std::string> _headers; // 头部字段
std::unordered_map<std::string, std::string> _params; // 查询字符串
public:
HttpRequest() : _version("HTTP/1.1") {}
void ReSet()
{
_method.clear();
_path.clear();
_version="HTTP/1.1";
_body.clear();
std::smatch tmp;
_matches.swap(tmp);
_headers.clear();
_params.clear();
}
// 插入头部字段
void SetHeader(const std::string &key, const std::string &val)
{
DBG_LOG("REQ SET HEADER KEY:%s,Val:%s",key.c_str(),val.c_str());
_headers[key] = val;
}
// 判断是否有头部字段
bool HasHeader(const std::string &key) const
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return false;
}
return true;
}
// 获取头部字段
std::string GetHeader(const std::string &key) const
{
auto it = _headers.find(key.c_str());
if (it == _headers.end())
{
DBG_LOG("NOT FIND:%s",key.c_str());
return "";
}
return it->second;
}
// 插入查询字符串
void SetParam(const std::string &key, const std::string &val)
{
_params[key] = val;
}
// 判断是否有某个查询字符串
bool HasParam(const std::string &key)
{
auto it = _params.find(key);
if (it == _params.end())
{
return false;
}
return true;
}
// 获取指定查询字符串
std::string GetParam(const std::string &key)
{
auto it = _params.find(key);
if (it == _params.end())
{
return "";
}
return it->second;
}
// 获取正文长度
size_t ContentLenth()
{
bool ret = HasHeader("Content-Length");
if (ret == false)
{
return 0;
}
std::string len = GetHeader("Content-Length");
return std::stol(len);
}
// 判断是否是短连接
bool Close () const
{
// 没有Connection 或者Connection的值为false都是短连接
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive")
{
DBG_LOG("CLOSE RETURN FALSE");
return false;
}
return true;
}
};
HttpResponse模块
class HttpResponse
{
public:
int _statu;
bool _redirect;
std::string _body;
std::string _redirect_url;
std::unordered_map<std::string, std::string> _headers;
public:
HttpResponse()
: _statu(200),
_redirect(false)
{
}
HttpResponse(int statu)
: _statu(statu),
_redirect(false)
{
}
void ReSet()
{
_statu = 200;
_redirect = false;
_body.clear();
_redirect_url.clear();
_headers.clear();
}
void SetHeader(const std::string &key, const std::string &val)
{
DBG_LOG("RES SET HEADER KEY:%s,Val:%s",key.c_str(),val.c_str());
_headers[key] = val;
}
bool HasHeader(const std::string &key)
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return false;
}
return true;
}
// 获取指定报头
std::string GetHeader(const std::string &key)
{
auto it = _headers.find(key);
if (it == _headers.end())
{
return "";
}
return it->second;
}
// 设置类型
void SetContent(const std::string &body, const std::string &type)
{
_body = body;
SetHeader("Content-Type", type);
}
// 设置重定向
void SetRedirect(const std::string &url, int statu = 302)
{
_statu = statu;
_redirect = true;
_redirect_url = url;
}
// 判断是否为短连接
bool Close()
{
DBG_LOG("JUDGE CLOSE");
// 没有Connection 或者Connection的值为false都是短连接
if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive")
{
return false;//长连接
}
DBG_LOG("TRUE:%s",GetHeader("Connection").c_str());
return true;
//短连接
}
};
HttpContext模块
typedef enum
{
RECV_HTTP_ERROR,
RECV_HTTP_LINE,
RECV_HTTP_HEAD,
RECV_HTTP_BODY,
RECV_HTTP_OVER
} HttpRecvStatu;
#define MAX_LINE 8 * 1024
class HttpContext
{
private:
int _resp_statu; // 响应状态码
HttpRecvStatu _recv_statu; // 接收及解析的状态
HttpRequest _request; // 已经解析到的请求信息
private:
// 首行接收
bool RecvHttpLine(Buffer *buf)
{
DBG_LOG("RecvHttpLine");
if (_recv_statu != RECV_HTTP_LINE) return false;
//1. 获取一行数据,带有末尾的换行
std::string line = buf->GetLineAndPop();
//2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大
if (line.size() == 0) {
//缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的
if (buf->ReadAbleSize() > MAX_LINE) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414;//URI TOO LONG
return false;
}
//缓冲区中数据不足一行,但是也不多,就等等新数据的到来
return true;
}
if (line.size() > MAX_LINE) {
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414;//URI TOO LONG
return false;
}
bool ret = ParseHttpLine(line);
if (ret == false) {
return false;
}
//首行处理完毕,进入头部获取阶段
_recv_statu = RECV_HTTP_HEAD;
return true;
}
// 首行解析
bool ParseHttpLine(const std::string &line)
{
DBG_LOG("ParseHttpLine 1 %s",line.c_str());
std::smatch matches;
std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase);
ERR_LOG("LINE=%s",line.c_str());
bool ret = std::regex_match(line, matches, e);
if (ret == false)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400; // BAD REQUEST
return false;
}
// 0 : GET /juhuo/login?user=xiaoming&pass=123123 HTTP/1.1
// 1 : GET
// 2 : /juhuo/login
// 3 : user=xiaoming&pass=123123
// 4 : HTTP/1.1
// 请求方法的获取
_request._method = matches[1];
std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);
// 资源路径的获取,需要进行URL解码操作,但是不需要+转空格
_request._path = Util::UrlDecode(matches[2], false);
// 协议版本的获取
_request._version = matches[4];
// 查询字符串的获取与处理
std::vector<std::string> query_string_arry;
std::string query_string = matches[3];
// 查询字符串的格式 key=val&key=val....., 先以 & 符号进行分割,得到各个字串
Util::Split(query_string, "&", &query_string_arry);
// 针对各个字串,以 = 符号进行分割,得到key 和val, 得到之后也需要进行URL解码
for (auto &str : query_string_arry)
{
size_t pos = str.find("=");
if (pos == std::string::npos)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400; // BAD REQUEST
return false;
}
std::string key = Util::UrlDecode(str.substr(0, pos), true);
std::string val = Util::UrlDecode(str.substr(pos + 1), true);
_request.SetParam(key, val);
}
DBG_LOG("ParseHttpLine 4");
return true;
}
// 解析报头
bool RecvHttpHead(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_HEAD)
//if (_recv_statu != RECV_HTTP_LINE)
return false;
// 一行一行解析数据,遇到空行结束
while (1)
{
std::string line = buf->GetLineAndPop();
if (line.size() == 0)
{
// 如果缓冲区数据不足一行,且缓冲区数据过大,则url可能存在问题
if (buf->ReadAbleSize() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // url too long
return false;
}
// 数据不足一行,但是数据不多则返回等待新数据
return true;
}
if (line.size() > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414; // url too long
return false;
}
// 解析完毕,结束解析
if (line == "\n" || line == "\r\n")
{
break;
}
bool ret = ParseHttpHead(line);
if (ret = false)
{
return false;
}
}
// 头部处理完毕进入正文阶段
_recv_statu = RECV_HTTP_BODY;
return true;
}
//解析报头
bool ParseHttpHead(std::string &line)
{
if (line.back() == '\n') line.pop_back();//末尾是换行则去掉换行字符
if (line.back() == '\r') line.pop_back();//末尾是回车则去掉回车字符
size_t pos = line.find(": ");
if (pos == std::string::npos)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400; // BAD REQUEST
return false;
}
std::string key = Util::UrlDecode(line.substr(0, pos), true);
std::string val = Util::UrlDecode(line.substr(pos + 2), true);
DBG_LOG("Parse Http Head:KEY:%s,Val:%s",key.c_str(),val.c_str());
_request.SetHeader(key, val);
return true;
}
bool RecvHttpBody(Buffer *buf)
{
if (_recv_statu != RECV_HTTP_BODY)
return false;
size_t content_length = _request.ContentLenth();
if (content_length == 0)
{
// 没有正文则请求结束
_recv_statu = RECV_HTTP_OVER;
return true;
}
// 获取正文长度
// 当前已经接收多少正文
size_t real_length = content_length - _request._body.size(); // 实际还要接收的数据长度
// 接收正文到body中,也要考虑当前的缓冲区数据是否是一个正文
// 若恰好包含则直接提取
if (buf->ReadAbleSize() >= real_length)
{
_request._body.append(buf->ReadPosition(), real_length);
buf->MoveReadOffset(real_length);
_recv_statu = RECV_HTTP_OVER;
return true;
}
// 若不足则等待新数据
_request._body.append(buf->ReadPosition(), buf->ReadAbleSize());
buf->MoveReadOffset(buf->ReadAbleSize());
return true;
}
public:
HttpContext()
: _resp_statu(200),
_recv_statu(RECV_HTTP_LINE)
{
}
void ReSet()
{
_resp_statu = 200;
_recv_statu = RECV_HTTP_LINE;
_request.ReSet();
}
int ResponseStatu()
{
return _resp_statu;
}
HttpRecvStatu RecvStatu()
{
return _recv_statu;
}
HttpRequest &Request()
{
return _request;
}
void RecvHttpRequest(Buffer *buf)
{
DBG_LOG("RecvHttpRequest");
switch (_recv_statu)
{
// 此处不需要break,本身就需要按照line,head,body依次接收解析!
case RECV_HTTP_LINE:
RecvHttpLine(buf);
case RECV_HTTP_HEAD:
DBG_LOG("RECV_HTTP_HEAD");
RecvHttpHead(buf);
case RECV_HTTP_BODY:
RecvHttpBody(buf);
}
}
};
HttpServer模块
class HttpServer {
private:
using Handler = std::function<void(const HttpRequest &, HttpResponse *)>;
using Handlers = std::vector<std::pair<std::regex, Handler>>;
Handlers _get_route;
Handlers _post_route;
Handlers _put_route;
Handlers _delete_route;
std::string _basedir; //静态资源根目录
TcpServer _server;
private:
void ErrorHandler(const HttpRequest &req, HttpResponse *rsp) {
//1. 组织一个错误展示页面
std::string body;
body += "<html>";
body += "<head>";
body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";
body += "</head>";
body += "<body>";
body += "<h1>";
body += std::to_string(rsp->_statu);
body += " ";
body += Util::StatuDesc(rsp->_statu);
body += "</h1>";
body += "</body>";
body += "</html>";
//2. 将页面数据,当作响应正文,放入rsp中
rsp->SetContent(body, "text/html");
}
//将HttpResponse中的要素按照http协议格式进行组织,发送
void WriteResponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) {
//1. 先完善头部字段
DBG_LOG("WRITE RESPONSE!!!");
if (req.Close() == true) {
rsp.SetHeader("Connection", "close");
}else {
rsp.SetHeader("Connection", "keep-alive");
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false) {
rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));
}
if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false) {
rsp.SetHeader("Content-Type", "application/octet-stream");
}
if (rsp._redirect == true) {
rsp.SetHeader("Location", rsp._redirect_url);
}
//2. 将rsp中的要素,按照http协议格式进行组织
std::stringstream rsp_str;
rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "\r\n";
for (auto &head : rsp._headers) {
rsp_str << head.first << ": " << head.second << "\r\n";
}
rsp_str << "\r\n";
rsp_str << rsp._body;
//3. 发送数据
conn->Send(rsp_str.str().c_str(), rsp_str.str().size());
}
/**
* @brief 判断请求是否是文件处理请求
*
* 该函数用于判断传入的 HTTP 请求是否是一个文件处理请求。
*
* @param req HTTP 请求对象
* @return 如果请求是文件处理请求,则返回 true;否则返回 false。
*/
bool IsFileHandler(const HttpRequest &req) {
// 1. 必须设置了静态资源根目录
if (_basedir.empty()) {
return false;
}
// 2. 请求方法,必须是GET / HEAD请求方法
if (req._method != "GET" && req._method != "HEAD") {
return false;
}
// 3. 请求的资源路径必须是一个合法路径
DBG_LOG("hahah: %s",req._path.c_str());
if (Util::ValidPath(req._path) == false) {
return false;
}
// 4. 请求的资源必须存在,且是一个普通文件
// 有一种请求比较特殊 -- 目录:/, /image/, 这种情况给后边默认追加一个 index.html
// index.html /image/a.png
// 不要忘了前缀的相对根目录,也就是将请求路径转换为实际存在的路径 /image/a.png -> ./wwwroot/image/a.png
std::string req_path = _basedir + req._path;//为了避免直接修改请求的资源路径,因此定义一个临时对象
if (req._path.back() == '/') {
req_path += "index.html";
}
if (Util::IsRegular(req_path) == false) {
return false;
}
return true;
}
//静态资源的请求处理 --- 将静态资源文件的数据读取出来,放到rsp的_body中, 并设置mime
void FileHandler(const HttpRequest &req, HttpResponse *rsp) {
std::string req_path = _basedir + req._path;
if (req._path.back() == '/') {
req_path += "index.html";
}
bool ret = Util::ReadFile(req_path, &rsp->_body);
if (ret == false) {
return;
}
std::string mime = Util::ExtMime(req_path);
rsp->SetHeader("Content-Type", mime);
return;
}
//功能性请求的分类处理
void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers) {
// 在对应请求方法的路由表中,查找是否含有对应资源请求的处理函数,有则调用,没有则发挥404
// 思想:路由表存储的时键值对 -- 正则表达式 & 处理函数
// 使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理
// /numbers/(\d+) /numbers/12345
DBG_LOG("ON Dispatcher");
// 遍历所有的处理器
for (auto &handler : handlers) {
// 获取正则表达式
const std::regex &re = handler.first;
// 获取处理函数
const Handler &functor = handler.second;
// 使用正则表达式对请求的资源路径进行正则匹配
bool ret = std::regex_match(req._path, req._matches, re);
// 如果匹配失败,则继续遍历下一个处理器
if (ret == false) {
continue;
}
// 匹配成功,调用处理函数,传入请求信息和空的响应对象
return functor(req, rsp);//传入请求信息,和空的rsp,执行处理函数
}
// 如果遍历完所有处理器都没有匹配到对应的处理函数,则设置响应状态为404
rsp->_statu = 404;
}
void Route(HttpRequest &req, HttpResponse *rsp) {
//1. 对请求进行分辨,是一个静态资源请求,还是一个功能性请求
// 静态资源请求,则进行静态资源的处理
// 功能性请求,则需要通过几个请求路由表来确定是否有处理函数
// 既不是静态资源请求,也没有设置对应的功能性请求处理函数,就返回405
DBG_LOG("Route");
if (IsFileHandler(req) == true) {
//是一个静态资源请求, 则进行静态资源请求的处理
return FileHandler(req, rsp);
}
if (req._method == "GET" || req._method == "HEAD") {
return Dispatcher(req, rsp, _get_route);
}else if (req._method == "POST") {
return Dispatcher(req, rsp, _post_route);
}else if (req._method == "PUT") {
return Dispatcher(req, rsp, _put_route);
}else if (req._method == "DELETE") {
return Dispatcher(req, rsp, _delete_route);
}
rsp->_statu = 405;// Method Not Allowed
return ;
}
//设置上下文
void OnConnected(const PtrConnection &conn) {
conn->SetContext(HttpContext());
DBG_LOG("NEW CONNECTION %p", conn.get());
}
//缓冲区数据解析+处理
void OnMessage(const PtrConnection &conn, Buffer *buffer) {
while(buffer->ReadAbleSize() > 0){
//1. 获取上下文
DBG_LOG("ON MESSAGE 1");
HttpContext *context = conn->GetContext()->get<HttpContext>();
//2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象
// 1. 如果缓冲区的数据解析出错,就直接回复出错响应
// 2. 如果解析正常,且请求已经获取完毕,才开始去进行处理
context->RecvHttpRequest(buffer);
HttpRequest &req = context->Request();
HttpResponse rsp(context->ResponseStatu());
if (context->ResponseStatu() >= 400) {
//进行错误响应,关闭连接
DBG_LOG("ON MESSAGE ERROR");
ErrorHandler(req, &rsp);//填充一个错误显示页面数据到rsp中
WriteResponse(conn, req, rsp);//组织响应发送给客户端
context->ReSet();
buffer->MoveReadOffset(buffer->ReadAbleSize());//出错了就把缓冲区数据清空
conn->ShutDown();//关闭连接
return;
}
if (context->RecvStatu() != RECV_HTTP_OVER) {
//当前请求还没有接收完整,则退出,等新数据到来再重新继续处理
return;
}
//3. 请求路由 + 业务处理
Route(req, &rsp);
//4. 对HttpResponse进行组织发送
WriteResponse(conn, req, rsp);
//5. 重置上下文
context->ReSet();
//6. 根据长短连接判断是否关闭连接或者继续处理
if (rsp.Close() == true)
{
DBG_LOG("rsp.close()");
conn->ShutDown();
//短链接则直接关闭
}
}
return;
}
public:
HttpServer(int port, int timeout = DEFALT_TIMEOUT):_server(port) {
_server.EnableInactiveRelease(timeout);
_server.SetConnectedCallBack(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));
_server.SetMessageCallBack(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
}
void SetBaseDir(const std::string &path) {
if(Util::IsDirectory(path) == true)
_basedir = path;
else abort();
}
/*设置/添加,请求(请求的正则表达)与处理函数的映射关系*/
void Get(const std::string &pattern, const Handler &handler) {
_get_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Post(const std::string &pattern, const Handler &handler) {
_post_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Put(const std::string &pattern, const Handler &handler) {
_put_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void Delete(const std::string &pattern, const Handler &handler) {
_delete_route.push_back(std::make_pair(std::regex(pattern), handler));
}
void SetThreadCount(int count) {
_server.SetThreadCount(count);
}
void Listen() {
_server.Start();
}
};
简单服务器搭建使用样例
#include "http.hpp"
#define WWWROOT "./wwwroot/"
std::string RequestStr(const HttpRequest &req)
{
std::stringstream ss;
ss << req._method << " " << req._path << " " << req._version << "\r\n";
for (auto &it : req._params)
{
ss << it.first << ": " << it.second << "\r\n";
}
for (auto &it : req._headers)
{
ss << it.first << ": " << it.second << "\r\n";
}
ss << "\r\n";
ss << req._body;
return ss.str();
}
void Hello(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestStr(req), "text/plain");
//sleep(15);
}
void Login(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestStr(req), "text/plain");
}
void PutFile(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestStr(req), "text/plain");
std::string pathname = WWWROOT + req._path;
Util::WriteFile(pathname, req._body);
}
void DelFile(const HttpRequest &req, HttpResponse *rsp)
{
rsp->SetContent(RequestStr(req), "text/plain");
}
int main()
{
HttpServer server(8085);
server.SetThreadCount(3);
server.SetBaseDir(WWWROOT); // 设置静态资源根目录,告诉服务器有静态资源请求到来,需要到哪里去找资源文件
server.Get("/hello", Hello);
server.Post("/login", Login);
server.Put("/1234.txt", PutFile);
server.Delete("/1234.txt", DelFile);
INF_LOG("服务器设置完毕");
server.Listen();
return 0;
}