目录
Reactor完整代码连接
前置知识:
1.普通的epoll读写有什么问题?
2.Connection内的回调函数是什么
3.服务器的初始化(Connection只是使用的一个结构体)
4.等待就绪事件:有事件就绪,对使用Connection的不同对象(封装fd,回调方法)调用对应的回调方法;
5._listenSock读取 :Accepter函数获取新链接怎么处理的
6.普通套接字读取:Recver
7.对写事件的关心是按需关系的:
8.执行效果:
9.Reactor的优势:
Reactor完整代码连接
前置知识:
Reactor叫做反应堆模式;反应:对已经就绪的事件(读、写、异常)进行处理;
我们使用epoll来实现,select、poll、epoll是多路转接的发展史,epoll完善了select、poll的缺点;
- 需要程序员维护数组 select/poll都有这个缺点
- 有大量的遍历 select/poll都有这个缺点
- 大量参数为输入输出型参数,需要重新设置 select有
- 管理的fd有上限 select有
1.普通的epoll读写有什么问题?
- 使用的是一个静态数组读取;报文长,读到就是不完整报文,报文短,一次读取可能有多个报文,最后一个报文可能不是完整的;
- 这样的错误报文没法分析和处理,就不能构造响应报文;
综上所述:问题为没法保证读取到的是完整报文,导致没法分析和处理、构建响应报文;
void Read(int fd)
{
char buff[1024];
ssize_t s = read(fd, buff, 1023);
if(s > 0){
buff[s] = 0;
LOG2(INFO, buff, fd);
}
解决办法:将文件描述符封装,并且有接受发送缓冲区,使用string就可以;
- 使用静态数组读取,然后添加到接受缓冲区保存;
- 读取完毕,对接受缓冲区分析是否有完整报文;
- 处理完整请求报文,构建响应报文添加到发送缓冲区;
using func_t = std::function<void(Connection*)>;
using cals_t = std::function<void(std::string &, Connection*)>;
class Connection{
public:
Connection(int fd = -1 ):_fd(fd), _ts(nullptr)
{}
~Connection()
{
if(_fd >= 0)
close(_fd);
}
public:
int _fd;
//读写异常回调方法
func_t _recver;
func_t _sender;
func_t _exception;
//接受缓冲区
std::string _outbuff;
//发送缓冲区
std::string _inbuff;
//TcpServer的回指指针,对写事件的关心是按需打开
TcpServer *_ts;
//连接最近活跃活动的时间
time_t _times;
};
2.Connection内的回调函数是什么
一个包装器;返回值为void,参数为Connection*,的函数指针、仿函数、lamada表达,它都可以接受;
using func_t = std::function<void(Connection*)>;
优势:
- _listenSock是读方法是接受新连接,普通是读取请求报文
- 初始化时设置读写异常回调方法(回调:使用函数指针执行的函数),不需要判断是_listenSock还是普通套接字,统一使用con->_recver;
3.服务器的初始化(Connection只是使用的一个结构体)
- 套接字创建,bind,监听;
- 构建epoll模型,epoll函数也封装了,_epfd封装在Epoll类内;
- 初始化_listenSock Connection结构体;读取回调方法Accept是类函数,多一个this指针,需要使用std::bind来改变参数个数,进行传递给包装器;
- epoll_wait的事件就绪队列初始化;
class TcpServer{
const static int gport = 8080;
const static int gnum = 128;
TcpServer(int port = gport, int num = gnum):_port(gport), _evts_num(num)
{
//套接字,创建bind监听
_listenSock = Sock::Socket();
Sock::Bind(_listenSock,_port);
Sock::Listen(_listenSock);
//构建epoll模型
_epoll.CreateEpoll();
//listensock添加epoll模型和_connections管理起来
AddConnection(_listenSock, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
//epoll_wait就绪队列,获取已就绪的事件
_evts = new epoll_event[_evts_num];
}
~TcpServer()
{
if(_listenSock >= 0)
close(_listenSock);
if(_evts != nullptr)
delete[] _evts;
for(auto &pr : _connections)
{
_connections.erase(pr.first);
delete pr.second;
}
}
private:
int _listenSock;
//epoll
Epoll _epoll;
//就绪队列
epoll_event* _evts;
int _evts_num;
//管理connection对象
std::unordered_map<int, Connection*> _connections;
int _port;
//业务处理的回调指针
cals_t _cb;
};
4.等待就绪事件:有事件就绪,对使用Connection的不同对象(封装fd,回调方法)调用对应的回调方法;
void LoopOnce()
{
int n = _epoll.WaitEpoll(_evts, _evts_num);
//有事件就绪
if(n > 0)
{
//LOG2(INFO, "epoll wait success",fd);
for(int i=0; i<n; i++)
{
int fd = _evts[i].data.fd;
int events = _evts->events;
//连接关闭或者错误,改为读写统一处理,读写失败调异常处理;
if( events & EPOLLHUP){
LOG2(INFO,"连接关闭",fd);
events |= (EPOLLIN | EPOLLOUT);
}
if( events & EPOLLERR){
LOG2(INFO,"错误",fd);
events |= (EPOLLIN | EPOLLOUT);
}
if(_connections.count(fd) && events & EPOLLIN)
{
if(IsConnectionExits(fd) && _connections[fd]->_recver != nullptr){
_connections[fd]->_recver(_connections[fd]);
}
}
if(_connections.count(fd) && events & EPOLLOUT)
{
if(IsConnectionExits(fd) && _connections[fd]->_sender != nullptr)
_connections[fd]->_sender(_connections[fd]);
}
}
}
else if(n == 0)
{
LOG(INFO, "timeout");
}
else
{
LOG(FATAL,"epoll wait fail");
exit(4);
}
}
void Dispatcher(cals_t cb)
{
_cb = cb;
while(true)
{
//去除不活跃的连接
DeleteInactivity();
LoopOnce();
}
}
5._listenSock读取 :Accepter函数获取新链接怎么处理的
- 得到新连接,如果新的fd是合法的,设置对应的读写异常回调方法,读:读取请求报文,写:发送响应报文,异常:出现错误进行处理;
- 所有的套接字都是使用ET模式(通知效率高,只支持非阻塞读写),EPOLLET因该被设置;
void Accepter(Connection * con)
{
while(true)
{
con->_times = time(nullptr);
struct sockaddr_in tmp;
socklen_t tlen = sizeof(tmp);
int new_sock = accept(con->_fd, (struct sockaddr *)&tmp, &tlen);
if(new_sock < 0)
{
//所以事件处理完毕
if(errno == EAGAIN || errno == EWOULDBLOCK)
break;
else if(errno == EINTR)//可能被信号中断,概率极小
continue;
else
{
std::cout << "accept fail , errno :" << errno << strerror(errno) << std::endl;
break;
}
}
else//添加到epoll模型和_connections中管理;
{
if(AddConnection(new_sock, std::bind(&TcpServer::Recver, this, std::placeholders::_1),
std::bind(&TcpServer::Sender, this, std::placeholders::_1),
std::bind(&TcpServer::Exception, this, std::placeholders::_1)
))
LOG2(INFO, "add connection success",new_sock);
else
LOG2(RISK, "add connection fail", new_sock);
}
}
}
6.普通套接字读取:Recver
- 一直读取,直到错误或者读取完毕;每次读取的结果都放到接受缓冲区;
- 读取完毕,对接受缓冲区处理,拿出一个个完整的报文,对请求报文进行业务处理;
void Recver(Connection *con)
{
con->_times = time(nullptr);
const int buff_size = 1024;
char buff[buff_size];
while(true)
{
ssize_t s = recv(con->_fd, buff, buff_size - 1, 0);
if(s > 0)
{
buff[s] = 0;
con->_outbuff += buff;
}
else if(s == 0)
{
LOG2(INFO, "写端关闭", con->_fd);
con->_exception(con);
return;
}
else
{
//读取完毕
if(errno == EAGAIN || errno == EWOULDBLOCK )
{
LOG2(INFO, "处理完毕", con->_fd);
break;
}
else if(errno == EINTR)
continue;
else
{
LOG2(ERROR, "recv fail ,fd: ", con->_fd);
con->_exception(con);
return;
}
}
}
std::cout << "fd: " << con->_fd << "outbuff: " << con->_outbuff <<std::endl;
//对outbuff内的完整报文,进行处理
std::vector<std::string> out;
//分隔报文,函数在protocol.hpp
SplitMessage(out, con->_outbuff);
for(auto &s : out)
_cb(s, con);//业务逻辑回调指针,在主函数
}
7.对写事件的关心是按需关系的:
- 如果开启关心,还没有数据发送,写事件会一直就绪;所以按需关心;
- 请求报文业务处理完毕,构建好响应报文,一定有响应,打开对写事件的关心;
- 也是Connection为什么封装一个Tcperver指针的原因,这里开启写事件的关心;
//业务处理
void CalArguments(std::string &str, Connection *con)
{
//请求报文反序列化
Request req;
//std::cout<<str <<std::endl;
if(!req.Deserialize(str)){
LOG2(ERROR, "deseroalize fail" ,con->_fd);
return;
}
//对数据处理
Response res;
calculator(req, res);
//响应报文序列化
std::string s = res.Serialize();
//添加到inbuff
con->_inbuff += s;
//一定有响应报文,打开写事件的关系
con->_ts->EnableReadWrite(con->_fd, true, true);
}
8.执行效果:
- 我写的协议是对任意两个数加减乘除;
- 每个请求或者响应都是用 x 做为分割符的;
9.Reactor的优势:
和进程/线程做比较:
- 它是一个单进程的就可以并发处理请求的服务器,比进程/线程减少了创建、销毁、调度的时间;
- 它的等待一批fd,减少了单位等待时间;一个线程等待对应一个fd;
- 有很高的复用性,替换业务逻辑就行了;