目录
一IO基本理解
二五种IO模型
1五种IO模型示意图
2同步IO和异步IO
二非阻塞IO
1fcntl
2实现非阻塞IO
三多路复用
1select
1.1定位和作用
1.2介绍参数
1.3编写多路复用代码
1.4优缺点
2poll
2.1作用和定位
2.2介绍参数
2.3修改select代码
3epoll
3.1介绍参数
3.2工作原理
理解数据到达主机
3.3编写epoll代码
3.4优点
3.5工作模式
理解LT和ET
四Reactor
完整源代码
一IO基本理解
前面学习了网络通信,我们认识到:网络通信本质上是进程间通信;而进程间通信本质上是IO;关于IO:I:Input;O:Output;这又是什么意思?
站在进程角度上:Input是外面交给进程的数据;Output是进程发出去的数据; 站在系统角度上:Input是数据从硬件交给OS;Output是数据从OS输出出去; 站在内存角度上:Input是内存数据交给磁盘;Output是磁盘数据交给内存
总之:IO本质上是外设与内设之间的交互
那对应我们来说:IO该如何理解呢??
C/C++解除到的IO接口:read/recv write/send 当底层有新数据到来时(前提接收/发送缓冲区未被占满的情况下),这些接口才能给我们返回fd:让我们读/写数据,否则会一直阻塞下去知道新数据的到来!
所以说:IO = 等 + 拷贝
那什么叫做高效的IO呢?<-> 在单位时间内,减少IO等的比率
二五种IO模型
在现实生活中,有很多人喜欢钓鱼,也被我们称之为钓鱼佬;它们钓鱼方式都是按照自己对钓鱼的理解去实现的,所以也就产生出不同的钓鱼方式;这也就总共可分为以下五种:(钓鱼 = 等 + 钓)
张三:一动不动,检测鱼漂,钓;(阻塞IO)
李四:一直在动,随便检测鱼漂,钓;(非阻塞IO)
王五:在鱼漂上挂铃铛(铃铛没响就刷抖音),钓;(信号驱动IO)
赵六:准备很多钓鱼竿,定期检测鱼竿,钓;(多路复用IO)
田七:找司机小王钓,自己去做别的事(田七没有参与调用,只是发起钓鱼);(异步IO)
在以上例子中:
人:系统调用;
鱼竿:sockfd;
湖:系统内部;
鱼:数据;
鱼漂浮动:数据就绪;
钓:接收数据
田七:发起IO 小王:操作系统
通过上面的例子可以基本对应5中IO模型,但又有问题了:
阻塞IO vs 非阻塞IO
IO = 等 + 拷贝:拷贝(钓)的方式是一样的,就是等的方式不同(一个一直等,一个一直动)
谁的钓鱼效率最高呢?(单位时间内等待的时间短)
你可以会说是王五或者田七:但是答案是赵六!因为如果你是鱼的话,湖面上有104个诱饵(光赵六就有100个),你吃到赵六的鱼饵的概率高,也就是赵六更容易有鱼咬钩,自然比别人效率高,而王五田七他们只有一个鱼竿(假设与咬钩概率相等),一天内钓上来鱼的数量是差不多的,只是多做了一些事情(多刷了几个视频,多参加了几个会(对于别人来说))
1五种IO模型示意图
阻塞 IO(常见): 在内核将数据准备好之前, 系统调用会一直等待;所有的套接字,默认都是阻塞方式
非阻塞 IO: 如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回错误码
信号驱动 IO: 内核将数据准备好的时候, 使用 SIGIO 信号通知应用程序进行 IO操作
IO 多路转接: 虽然从流程图上看起来和阻塞 IO 类似. 实际上最核心在于 IO 多路转接能够同时等待多个文件描述符的就绪状态
异步 IO: 由内核在数据拷贝完成时, 通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)
2同步IO和异步IO
我们也将IO模型分为两大类:同步IO(前四种IO模型)和异步IO
之前也有人为了信号驱动IO是同步还是异步争吵过,但这里把信号驱动IO归结为同步IO:看似王五在做别的事,但是鱼咬钩你是不是要去把鱼给钓上来呢?实际上还是受到IO影响(刷着视频突然鱼咬钩了你要立刻放下手机去钓鱼)!
两者区别:你有没有参与IO(等)的过程
二非阻塞IO
非阻塞 IO 往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询:这
对 CPU 来说是较大的浪费, 一般只有特定场景下才使用
1fcntl
作用:将fd设置成非阻塞IO
函数的第二个参数:
• 复制一个现有的描述符(cmd=F_DUPFD) .
• 获得/设置文件描述符标记(cmd=F_GETFD 或 F_SETFD).
• 获得/设置文件状态标记(cmd=F_GETFL 或 F_SETFL).
• 获得/设置异步 I/O 所有权(cmd=F_GETOWN 或 F_SETOWN).
• 获得/设置记录锁(cmd=F_GETLK,F_SETLK 或 F_SETLKW).
2实现非阻塞IO
#include <iostream>
#include <sys/fcntl.h>
#include <string>
#include <unistd.h>
// 设置非阻塞轮询方式
void NoBlock(int fd)
{
int f1 = fcntl(fd, F_GETFL);
if (f1 < 0)
{
std::cerr << "fcntl fail" << std::endl;
return;
}
fcntl(fd, F_SETFL, f1 | O_NONBLOCK);
}
int main()
{
char buffer[128];
NoBlock(0);
while (true)
{
ssize_t n = read(0, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "echo:" << buffer;
}
else if (n == 0)
{
std::cout << "read none" << std::endl;
break;
}
else
{
// 如何区别read是出错了还是非阻塞了? -> 通过errno来判断!
if (errno == EWOULDBLOCK)
{
std::cout << "Non blocking polling" << std::endl;
std::cout << "Do other things" << std::endl;
sleep(1);
continue;
}
else
{
std::cout << "read error" << std::endl;
break;
}
}
}
return 0;
}
现象:
为什么我输入的时候信息会与打印的信息混杂在一起?--> OS(默认)回显
如果read被信号中断了,循环就会结束,不往下进行读了:我们也要保证该情况不会影响我们读取
#include <iostream>
#include <sys/fcntl.h>
#include <string>
#include <unistd.h>
// 设置非阻塞轮询方式
void NoBlock(int fd)
{
int f1 = fcntl(fd, F_GETFL);
if (f1 < 0)
{
std::cerr << "fcntl fail" << std::endl;
return;
}
fcntl(fd, F_SETFL, f1 | O_NONBLOCK);
}
int main()
{
char buffer[128];
NoBlock(0);
while (true)
{
ssize_t n = read(0, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "echo:" << buffer;
}
else if (n == 0)//ctrl+d
{
std::cout << "read none" << std::endl;
break;
}
else
{
// 如何区别read是出错了还是非阻塞了? -> 通过errno来判断!
if (errno == EWOULDBLOCK)
{
std::cout << "Non blocking polling" << std::endl;
std::cout << "Do other things" << std::endl;
sleep(1);
continue;//未来是要break的!
}
else if(errno==EINTR)//read被信号中断
{
continue;
}
else
{
std::cout << "read error" << std::endl;
break;
}
}
}
return 0;
}
三多路复用
IO多路复用(Input/Output Multiplexing)是一种在单个线程中管理多个输入/输出通道的技术。它允许一个线程同时监听多个输入流(例如sockfd、fd等),并在有数据可读或可写时进行相应的处理,而不需要为每个通道创建一个独立的线程。
1select
系统提供 select 函数来实现多路复用输入/输出模型
1.1定位和作用
定位:它只负责进行等,不负责IO
作用:等待多个fd(等该fd上面的新事件就绪),通知用户新事件已经就绪,可以来IO了
1.2介绍参数
1.3编写多路复用代码
使用select要注意以下细节:
1.select后accept还是阻塞吗?
一定不会阻塞(事件已经就绪)
2.read能直接读吗?如果不能,谁最清楚底层fd的数据就绪了?
绝对不能读,读取的时候,条件不一定满足; 由select来统一监管(select知道)
3.select要正常工作,要借助辅助数组(fd_array),来保存所有合法的fd! (select的fd_set* 是输入输出参数,它会继续参数重置)
4.select统一监管是怎么做到的?
(accept后)将最新的fd添加到辅助数组(fd_array)就行了
5.就绪了循环处理所有事件
//SelectServer.hpp
#pragma once
#include <cstdint>
#include <memory>
#include <sys/select.h>
#include "Socket.hpp"
const static int gnum = sizeof(fd_set) * 8;
const static int gfd = -1;
class SelectServer
{
public:
SelectServer(uint16_t port) : _port(port), _st(std::make_unique<TcpSocket>())
{
_st->Tcp_ServerSocket(_port);
}
void Init()
{
for (int i = 0; i < gnum; i++)
{
fd_array[i] = gfd;
}
// 自己先设置
fd_array[0] = _st->Sockfd();
}
void Loop()
{
while (true)
{
fd_set ft;
FD_ZERO(&ft); // 使用前先清除
int fd_max = gfd;
// 合法的fd添加到ft中
for (int i = 0; i < gnum; i++)
{
if (fd_array[i] == gfd)
continue;
FD_SET(fd_array[i], &ft);
// 更新出最大值fd_max
if (fd_max < fd_array[i])
fd_max = fd_array[i];
}
struct timeval tl = {10, 0};
// 只关心读事件
int n = ::select(fd_max + 1, &ft, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
std::cout << "time done " << tl.tv_sec << ' ' << tl.tv_usec << std::endl;
break;
case -1:
std::cerr << "select fail" << std::endl;
break;
default:
std::cout << "have even: " << n << std::endl;
HandEvent(ft); // 处理事件(不出来会一直通知)
PrintFd_array();
break;
}
}
}
//一定会存在大量的不用类型的sockfd!
void HandEvent(fd_set &ft)
{
for (int i = 0; i < gnum; i++)
{
if (fd_array[i] == gfd)
continue;
// 一定是合法的fd
// 判断fd是否就绪
if (FD_ISSET(fd_array[i], &ft)) // 动态检测
{
// 事件就绪
// sockfd类型?
if (fd_array[i] == _st->Sockfd())
{
Link();
}
else
{
Read_Write(i);
}
}
}
}
void Link()
{
InetAddr ir;
int sockfd = _st->AcceptSocket(&ir); // 一定不会阻塞!
if (sockfd > 0)
{
// 新的sockfd -> fd_array 让select进行管理
bool tmp = false;
for (int i = 1; i < gnum; i++)
{
if (fd_array[i] == gfd)
{
tmp = true;
fd_array[i] = sockfd;
std::cout << "add:" << fd_array[i] << std::endl;
break;
}
}
// fd_array满了
if (!tmp)
{
std::cout << "fd_array full" << std::endl;
::close(sockfd);
}
}
}
void Read_Write(int i)
{
//没协议,可以直接读(这里考虑IO)
char buffer[1024];
ssize_t n = ::read(fd_array[i], buffer, sizeof(buffer) - 1); // 阻塞?No
if (n > 0)
{
buffer[n] = 0;
std::cout << "Server read:" << buffer << std::endl;
std::string content = "<html><body><h1>hello warld</h1></body></html>";
std::string ech_str = "HTTP/1.0 200 OK\r\n";
ech_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";
ech_str += content;
//(这里)默认是就绪的
::write(fd_array[i], ech_str.c_str(), ech_str.size());//临时
}
else if (n == 0)
{
std::cout << "user quit.." << std::endl;
::close(fd_array[i]);
//清理fd
fd_array[i] = gfd;
}
else
{
std::cout << "recv fail.." << std::endl;
::close(fd_array[i]);
//清理fd
fd_array[i] = gfd;
}
}
void PrintFd_array()
{
for (int i = 0; i < gnum; i++)
{
if (fd_array[i] == gfd)
continue;
std::cout << fd_array[i] << ' ';
}
std::cout << std::endl;
}
private:
uint16_t _port;
std::unique_ptr<Socket> _st;
// fd数组->select管理
int fd_array[gnum];
};
//Main.cc
#include <iostream>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <pthread.h>
#include <cstring>
#include <string>
#include"Socket.hpp"
#include"SelectServer.hpp"
int main(int args, char *argv[])
{
if (args != 2)
{
std::cerr << "./Server Localport" << std::endl;
exit(0);
}
uint16_t serverport = std::stoi(argv[1]);
std::unique_ptr<SelectServer> sr=std::make_unique<SelectServer>(serverport);
sr->Init();
sr->Loop();
return 0;
}
//InetAddr.hpp
#pragma once
#include<iostream>
#include<string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
class InetAddr
{
void ToHost()
{
//_ip=inet_ntoa(_addr.sin_addr);//4字节地址->char*
_port=ntohs(_addr.sin_port);
char buffer[124];
inet_ntop(AF_INET,&_addr.sin_addr,buffer,sizeof(buffer));
_ip=buffer;
}
public:
InetAddr()
{
}
InetAddr(const struct sockaddr_in& addr)
:_addr(addr)
{
ToHost();
}
std::string Ip()
{
return _ip;
}
uint16_t Port()
{
return _port;
}
struct sockaddr_in Addr()
{
return _addr;
}
bool operator==(const InetAddr& ad)
{
return (this->_ip==ad._ip&&this->_port==ad._port);
}
std::string User()
{
std::string tmp=_ip+" "+std::to_string(_port)+":";
return tmp;
}
private:
std::string _ip;
uint16_t _port;
struct sockaddr_in _addr;
};
//Socket.hpp
#pragma once
#include "InetAddr.hpp"
class Socket;
enum
{
SOCK_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
};
const int gbacklog = 8;
class Socket
{
public:
virtual void CreateSocket() = 0;
virtual void InitSocket(uint16_t port) = 0;
virtual int AcceptSocket(InetAddr *addr) = 0; // 对象/变量
virtual bool ConnectSocket(uint16_t port, const std::string &ip) = 0; // clinet连接成功与失败
virtual int Sockfd() = 0;
virtual void Close() = 0;
virtual ssize_t Recv(std::string *out) = 0;
virtual ssize_t Send(const std::string &in) = 0;
public:
void Tcp_ServerSocket(uint16_t port)
{
CreateSocket();
InitSocket(port);
}
bool Tcp_ClientSocket(uint16_t port, const std::string &ip)
{
CreateSocket();
return ConnectSocket(port, ip);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket()
{
}
TcpSocket(int sockfd)
: _sockfd(sockfd)
{
}
~TcpSocket()
{
}
virtual void CreateSocket() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
std::cerr << "socket fail" << std::endl;
exit(SOCK_ERROR);
}
std::cout << "socket sucess" << std::endl;
}
virtual void InitSocket(uint16_t port) override
{
struct sockaddr_in perr;
memset(&perr, 0, sizeof(perr));
perr.sin_family = AF_INET;
perr.sin_port = htons(port);
perr.sin_addr.s_addr = INADDR_ANY;
if (::bind(_sockfd, (struct sockaddr *)&perr, sizeof(perr)) < 0)
{
std::cerr << "bind fail" << std::endl;
exit(BIND_ERROR);
}
std::cout << "bind sucess" << std::endl;
if (::listen(_sockfd, gbacklog) < 0)
{
std::cerr << "listen fail" << std::endl;
exit(LISTEN_ERROR);
}
std::cout << "listen sucess" << std::endl;
}
virtual int AcceptSocket(InetAddr *addr) override // 外层要获取客户端信息 TODO
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
std::cerr << "accept fail" << std::endl;
return -1;
}
*addr = client;
std::cout << "get a new link " <<addr->User()<< std::endl;
return sockfd;
}
virtual bool ConnectSocket(uint16_t port, const std::string &ip) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &server.sin_addr);
int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
return false;
}
return true;
}
virtual int Sockfd() override
{
return _sockfd;
}
virtual void Close() override
{
if (_sockfd > 0)
{
::close(_sockfd);
}
}
virtual ssize_t Recv(std::string *out) override
{
char buffer[4096];
ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
*out += buffer; // 细节
}
return n;
}
virtual ssize_t Send(const std::string &in) override
{
return ::send(_sockfd, in.c_str(), in.size(), 0);
}
private:
int _sockfd; // 两个角色
};
1.4优缺点
优点: • 实现简单 • 兼容性好,可以实现跨平台 • 在一些老内核中就只能使用select来实现多路复用
缺点: • 需要手动设置 fd 集合(使用起来麻烦(fd 集合混杂着不同的事件类型))
• 需要把 fd 集合从用户态拷贝到内核态(OS不相信任何人)(无法避免)
• 需要在内核遍历传递进来的所有 fd
• select 支持的文件描述符数量太小(进程打开的文件描述符也是有上限)
2poll
poll主要来解决手动设置核fd太小的问题(其它缺点poll同样存在)
2.1作用和定位
作用:等待多个fd上面的事件就绪,通知用户事件已经就绪,可以进行IO了
定位:只负责等:等就绪后进行事件派发
2.2介绍参数
events和revents的取值
仅仅用一个结构体就解决了:用户让OS关心的事件(设置)和用户让我关心的事件(返回)
不用因为select设计的缺陷, 对fd和fd关心的事件进行重新设定!
而数量太小问题:poll就从此就给了用户,由用户定义fd数量后交给poll
2.3修改select代码
//PollServer.hpp
#pragma once
#include <cstdint>
#include <memory>
#include <sys/poll.h>
#include "Socket.hpp"
const static int gnum = sizeof(fd_set) * 8;
const static int gfd = -1;
class PollServer
{
public:
PollServer(uint16_t port) : _port(port), _st(std::make_unique<TcpSocket>())
{
_st->Tcp_ServerSocket(_port);
}
void Init()
{
for (int i = 0; i < gnum; i++)
{
fd_array[i].fd = gfd;
fd_array[i].events = 0;
fd_array[i].revents = 0;
}
// 自己先设置
fd_array[0].fd = _st->Sockfd();
fd_array[0].events = POLLIN;
}
void Link()
{
InetAddr ir;
int sockfd = _st->AcceptSocket(&ir);
if (sockfd > 0)
{
// 新的sockfd -> fd_array 让select进行管理
bool tmp = false;
for (int i = 1; i < gnum; i++)
{
if (fd_array[i].fd == gfd)
{
tmp = true;
fd_array[i].fd = sockfd;
fd_array[i].events = POLLIN;
std::cout << "add:" << fd_array[i].fd << std::endl;
break;
}
}
// fd_array满了
if (!tmp)
{
std::cout << "fd_array full" << std::endl;
}
}
}
void Read_Write(int i)
{
char buffer[1024];
ssize_t n = ::read(fd_array[i].fd, buffer, sizeof(buffer) - 1); // 阻塞?No
if (n > 0)
{
buffer[n] = 0;
std::cout << "Server read:" << buffer << std::endl;
std::string content = "<html><body><h1>hello warld</h1></body></html>";
std::string ech_str = "HTTP/1.0 200 OK\r\n";
ech_str += "Content-Type: text/html\r\n";
ech_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";
ech_str += content;
::write(fd_array[i].fd, ech_str.c_str(), ech_str.size());
}
else if (n == 0)
{
std::cout << "user quit.." << std::endl;
::close(fd_array[i].fd);
fd_array[i].fd = gfd;
fd_array[i].events = 0;
fd_array[i].revents = 0;
}
else
{
std::cout << "recv fail.." << std::endl;
::close(fd_array[i].fd);
fd_array[i].fd = gfd;
fd_array[i].events = 0;
fd_array[i].revents = 0;
}
}
void HandEvent()
{
for (int i = 0; i < gnum; i++)
{
if (fd_array[i].fd == gfd)
continue;
// 哪个读事件就绪
if (fd_array[i].revents & POLLIN)
{
// 处理特定sockfd
if (fd_array[i].fd == _st->Sockfd())
{
Link();
}
// 处理普通fd
else
{
Read_Write(i);
}
}
}
}
void Loop()
{
while (true)
{
// 不用设置啦 -> 优雅
int n = ::poll(fd_array, gnum, 1000);
switch (n)
{
case 0:
std::cout << "time done " << std::endl;
break;
case -1:
std::cerr << "select fail" << std::endl;
break;
default:
std::cout << "have even: " << n << std::endl;
HandEvent();
PrintFd_array();
break;
}
}
}
void PrintFd_array()
{
for (int i = 0; i < gnum; i++)
{
if (fd_array[i].fd == gfd)
continue;
std::cout << fd_array[i].fd << ' ';
}
std::cout << std::endl;
}
private:
uint16_t _port;
std::unique_ptr<Socket> _st;
// fd数组->select管理
pollfd fd_array[gnum];
};
poll的底层也是要OS遍历所有的fd,来获取fd和对应的事件(效率与select一样不高)
所以就有了下面的epoll~
3epoll
按照 man 手册的说法: 为处理大批量句柄而作了改进的 poll
它是在 2.5.44 内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)它几乎具备了之前所说的一切优点, 被公认为 Linux2.6 下性能最好的多路 I/O 就绪通知方法
3.1介绍参数
event可以是几个宏的集合
• EPOLLIN : 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭);
• EPOLLOUT : 表示对应的文件描述符可以写;
• EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外
数据到来);
• EPOLLERR : 表示对应的文件描述符发生错误;
• EPOLLHUP : 表示对应的文件描述符被挂断;
• EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平触发来说的.
• EPOLLONESHOT: 只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里
虽然说是epoll的加强版,但两者差别真的挺大的!
3.2工作原理
介绍完参数,此刻的你或许有很多问题:不急先来谈谈epoll的工作原理
理解数据到达主机
主机从网络中收数据时通过网卡来解收的,从而往上交给数据链路层;数据链路层是OS管的,也就是说OS怎么知道网卡有数据了需要进行拷贝了?
有人会说:简单,让OS定期检测一遍不就行了?OS要管理小到进程行大到外设,所有事情都要轮询OS不得忙死,所以OS是通过网卡给我发送信号(硬件中断)来得知网卡有数据啦!
epoll函数与OS的关系
但实际上,OS管理就绪队列时不用创建节点的!
OS也要对epoll模型作管理:先描述,在组织
3.3编写epoll代码
//ServerEpoll.hpp
#pragma once
#include "Socket.hpp"
const static int gnum = 128;
const static int gsize = 128;
class ServerEpoll
{
public:
ServerEpoll(uint16_t port)
: _port(port), _st(std::make_unique<TcpSocket>())
{
_st->Tcp_ServerSocket(_port);
_epfd = ::epoll_create(gsize);
if (_epfd < 0)
{
LOG(FATAL, "epoll_creat fail\n");
exit(1);
}
LOG(INFO, "epoll_creat sucess,_epfd:%d\n", _epfd);
}
~ServerEpoll()
{
if (_epfd > 0)
::close(_epfd);
_st->Close();
}
void Init()
{
epoll_event et;
et.data.fd = _st->Sockfd(); // 为了在事件就绪时知道fd就绪了
et.events = EPOLLIN;
int n = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, _st->Sockfd(), &et);
if (n < 0)
{
LOG(ERROR, "epoll_ctl fail\n");
exit(2);
}
LOG(INFO, "epoll_ctl sucess,add %d\n", _st->Sockfd());
}
void Start()
{
int time = -1;
while (true)
{
int n = ::epoll_wait(_epfd, ets, gnum, time);
switch (n)
{
case 0:
LOG(INFO, "epoll_wait time done\n");
break;
case -1:
LOG(INFO, "epoll_wait fail\n");
break;
default:
LOG(INFO, "epoll sucess total:%d\n", n);
Handler(n);
break;
}
}
}
void Handler(int n)
{
for (int i = 0; i < n; i++)
{
int sockfd = ets[i].data.fd;
uint32_t event = ets[i].events;
LOG(INFO, "%d 事件就绪 event:%s\n", sockfd, EventToString(event).c_str());
if (event & EPOLLIN)
{
if (sockfd == _st->Sockfd())
{
Accept();
}
else
{
HandlerIO(sockfd);
}
}
}
}
void Accept()
{
InetAddr ar;
int sockfd = _st->AcceptSocket(&ar);
if (sockfd < 0)
{
LOG(ERROR, "gain line error sockfd: %d\n", sockfd);
return;
}
LOG(INFO, "gain link:%d client:%s\n", sockfd, ar.User().c_str());
epoll_event et;
et.data.fd = sockfd;
et.events = EPOLLIN;
// OS在底层建立红黑树节点 -- select/poll要循环自己维护~
int n = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &et);
if (n < 0)
{
LOG(ERROR, "epoll_ctl fail\n");
exit(2);
}
LOG(INFO, "epoll_ctl sucess,add %d\n", sockfd);
}
void HandlerIO(int fd)
{
// 不保证读到的数据是正确的:要通过协议来保证~
char buffer[4096];
ssize_t n = ::recv(fd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
std::cout << buffer;
std::string content = "<html><body><h1>Hello Warld</h1></body></html>";
std::string ech_str = "HTTP/1.0 200 OK\r\n";
ech_str += "Content-Length:" + std::to_string(content.size()) + "\r\n";
ech_str += "\r\n";
ech_str += content;
::send(fd, ech_str.c_str(), ech_str.size(), 0);
}
else if (n == 0)
{
LOG(INFO, "Clinet quit fd:%d\n", fd);
// epoll移除 fd fd要保证:健康&&合法
::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
// 再关闭
::close(fd);
}
else
{
LOG(INFO, "read fail\n");
// epoll移除 fd fd要保证:健康&&合法
::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);
::close(fd);
}
}
std::string EventToString(uint32_t event)
{
std::string et;
if (event & EPOLLIN)
et += "EPOLLIN";
if (event & EPOLLOUT)
et += " | EPOLLOUT";
return et;
}
private:
uint16_t _port;
std::unique_ptr<Socket> _st;
int _epfd;
epoll_event ets[gnum]; // 不用自己维护
};
//Main.cc
#include<iostream>
#include<memory>
#include<sys/epoll.h>
#include"ServerEpoll.hpp"
int main(int args,char* argc[])
{
if(args!=2)
{
std::cout<<"./Main Port"<<std::endl;
exit(1);
}
uint16_t port=std::stoi(argc[1]);
std::unique_ptr<ServerEpoll> usr=std::make_unique<ServerEpoll>(port);
usr->Init();
usr->Start();
return 0;
}
//InetAddr.hpp
#pragma once
#include<iostream>
#include<string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
class InetAddr
{
void ToHost()
{
//_ip=inet_ntoa(_addr.sin_addr);//4字节地址->char*
_port=ntohs(_addr.sin_port);
char buffer[124];
inet_ntop(AF_INET,&_addr.sin_addr,buffer,sizeof(buffer));
_ip=buffer;
}
public:
InetAddr()
{
}
InetAddr(const struct sockaddr_in& addr)
:_addr(addr)
{
ToHost();
}
std::string Ip()
{
return _ip;
}
uint16_t Port()
{
return _port;
}
struct sockaddr_in Addr()
{
return _addr;
}
bool operator==(const InetAddr& ad)
{
return (this->_ip==ad._ip&&this->_port==ad._port);
}
std::string User()
{
std::string tmp=_ip+" "+std::to_string(_port)+":";
return tmp;
}
private:
std::string _ip;
uint16_t _port;
struct sockaddr_in _addr;
};
//Log.hpp
#pragma once
#include<iostream>
#include<string>
#include<cstring>
#include<fstream>
#include <stdarg.h>
#include <unistd.h>
#include <sys/syscall.h>
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string Getlevel(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
break;
case INFO:
return "INFO";
break;
case WARNING:
return "WARNING";
break;
case ERROR:
return "ERROR";
break;
case FATAL:
return "FATAL";
break;
default:
return "";
break;
}
}
std::string Gettime()
{
time_t now = time(nullptr);
struct tm *time = localtime(&now);
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
time->tm_year + 1900,
time->tm_mon + 1,
time->tm_mday,
time->tm_hour,
time->tm_min,
time->tm_sec);
return buffer;
}
struct log_message
{
std::string _level;
int _id;
std::string _filename;
int _filenumber;
std::string _cur_time;
std::string _message;
};
#define SCREAM 1
#define FILE 2
#define DEVELOP 3
#define OPERATION 4
const std::string gpath = "./log.txt";
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
class log
{
public:
log(const std::string &path = gpath, const int status = DEVELOP)
: _mode(SCREAM), _path(path), _status(status)
{
}
void SelectMode(int mode)
{
_mode = mode;
}
void SelectStatus(int status)
{
_status = status;
}
void PrintScream(const log_message &le)
{
printf("[%s][%d][%s][%d][%s] %s",
le._level.c_str(),
le._id,
le._filename.c_str(),
le._filenumber,
le._cur_time.c_str(),
le._message.c_str());
}
void PrintFile(const log_message &le)
{
std::fstream in(_path, std::ios::app);
if (!in.is_open())
return;
char buffer[1024];
snprintf(buffer, sizeof(buffer), "[%s][%d][%s][%d][%s] %s",
le._level.c_str(),
le._id,
le._filename.c_str(),
le._filenumber,
le._cur_time.c_str(),
le._message.c_str());
in.write(buffer, strlen(buffer)); // 不用sizeof
in.close();
}
void PrintLog(const log_message &le)
{
// 过滤
if (_status == OPERATION)
return;
// 线程安全
pthread_mutex_lock(&gmutex);
switch (_mode)
{
case SCREAM:
PrintScream(le);
break;
case FILE:
PrintFile(le);
break;
default:
break;
}
pthread_mutex_unlock(&gmutex);
}
void logmessage(int level, const std::string &filename, int filenumber, const char *message, ...)
{
log_message le;
le._level = Getlevel(level);
le._id = syscall(SYS_gettid);
le._filename = filename;
le._filenumber = filenumber;
le._cur_time = Gettime();
va_list vt;
va_start(vt, message);
char buffer[128];
vsnprintf(buffer, sizeof(buffer), message, vt);
va_end(vt);
le._message = buffer;
// 打印日志
PrintLog(le);
}
~log()
{
}
private:
int _mode;
std::string _path;
int _status;
};
// 方便上层调用
log lg;
// ##不传时可忽略参数
#define LOG(level, message, ...) \
do \
{ \
lg.logmessage(level, __FILE__, __LINE__, message, ##__VA_ARGS__); \
} while (0)
#define SleftScream() \
do \
{ \
lg.SelectMode(SCREAM); \
} while (0)
#define SleftFile() \
do \
{ \
lg.SelectMode(FILE); \
} while (0)
#define SleftDevelop() \
do \
{ \
lg.SelectStatus(DEVELOP); \
} while (0)
#define SleftOperation() \
do \
{ \
lg.SelectStatus(OPERATION); \
} while (0)
//#pragma once
#include "InetAddr.hpp"
#include "Log.hpp"
class Socket;
enum
{
SOCK_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
};
const int gbacklog = 8;
class Socket
{
public:
virtual void CreateSocket() = 0;
virtual void InitSocket(uint16_t port) = 0;
virtual int AcceptSocket(InetAddr *addr) = 0; // 对象/变量
virtual bool ConnectSocket(uint16_t port, const std::string &ip) = 0; // clinet连接成功与失败
virtual int Sockfd() = 0;
virtual void Close() = 0;
virtual ssize_t Recv(std::string *out) = 0;
virtual ssize_t Send(const std::string &in) = 0;
public:
void Tcp_ServerSocket(uint16_t port)
{
CreateSocket();
InitSocket(port);
}
bool Tcp_ClientSocket(uint16_t port, const std::string &ip)
{
CreateSocket();
return ConnectSocket(port, ip);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket()
{
}
TcpSocket(int sockfd)
: _sockfd(sockfd)
{
}
~TcpSocket()
{
}
virtual void CreateSocket() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(ERROR,"create socket fail\n");
exit(SOCK_ERROR);
}
LOG(INFO,"create socket sucess,sockfd:%d\n",_sockfd);
}
virtual void InitSocket(uint16_t port) override
{
struct sockaddr_in perr;
memset(&perr, 0, sizeof(perr));
perr.sin_family = AF_INET;
perr.sin_port = htons(port);
perr.sin_addr.s_addr = INADDR_ANY;
if (::bind(_sockfd, (struct sockaddr *)&perr, sizeof(perr)) < 0)
{
LOG(ERROR,"bind fail\n");
exit(BIND_ERROR);
}
LOG(INFO,"bind sucess\n");
if (::listen(_sockfd, gbacklog) < 0)
{
LOG(ERROR,"listen fail\n");
exit(LISTEN_ERROR);
}
LOG(INFO,"listen sucess\n");
}
virtual int AcceptSocket(InetAddr *addr) override // 外层要获取客户端信息 TODO
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
LOG(ERROR,"link fail\n");
return -1;
}
*addr = client;
return sockfd;
}
virtual bool ConnectSocket(uint16_t port, const std::string &ip) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &server.sin_addr);
int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
return false;
}
return true;
}
virtual int Sockfd() override
{
return _sockfd;
}
virtual void Close() override
{
if (_sockfd > 0)
{
::close(_sockfd);
}
}
virtual ssize_t Recv(std::string *out) override
{
char buffer[4096];
ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
*out += buffer; // 细节
}
return n;
}
virtual ssize_t Send(const std::string &in) override
{
return ::send(_sockfd, in.c_str(), in.size(), 0);
}
private:
int _sockfd; // 两个角色
};
3.4优点
• 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
• 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而 select/poll 都是每次循环都要进行拷贝)
• 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中(填写就绪队列的节点), epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度 O(1),拷贝到用户定义的数组虽然也是O(N),但它严格按顺序进行拷贝
3.5工作模式
在XX快递站中有两个快递员:张三和李四;张三工作相对负责,而李四工作较毛躁;有一天住在5楼的小王在网上买了5个快递,现在正在派发:有三个在张三手上,有两个在李四手上;现在张三先到了小王楼下,带电话给小王叫他下来去快递,但由于小王在玩游戏,也就没下去取;张三一看小王没下来就重复给小王打电话;等到小王游戏打完后就下楼拿快递了,但他只拿了两个就上楼了;现在李四到了问张三:你还有多少个快递没派完?“还有一个小王的”张三说;那正好我手上有2个小王的包裹,你帮我一起派发吧!张三也只好接下了2个小王新增的包裹,继续给小王打电话,知道他吧包裹全取走为止...
后来小王又在网上买了5个快递,派发是恰好3个在李四手上,2个在张三手上;现在轮到李四先到小王楼下;由于李四工作比较毛躁,打电话给小王说:我只打电话给你一次,如果你不下来取快递,派发完别人的我就先走了!小王一听对方语气有点强硬,也就刚上了就不下去取;此时张三到了小王楼下,看到李四问有小王的快递没?一听有,张三就把小王的快递交给了李四说:上次我帮了你,这次你帮我下;李四也是挺讲义气的,也就把包裹拿来了,这次手上新增了2个包裹,再次打给了小王,小王说:你不是说不再给我打电话的吗?“没办法,手上又新增了你的2个快递,你快点下来拿把,不然我就走了!”小王也就得毕竟是自己的快递没必要不下去拿,也就下去把5个快递都拿回家了~
张三:只要底层一直有快递(数据),就一直通知小王(上层)
李四:包裹从无到有,从有到多的时候,才会通知小王(上层)
张三和李四就对应着epoll的两种不同的工作模式:水平触发(LT)边缘触发(ET)
理解LT和ET
场景:客户端向(ET模式下)服务器发送10k请求数据,服务器只读了1k数据
在ET模式下只通知一次,本轮数据如果没读完不会再通知;如果服务器没读完数据,导致双方都要发数据给对方,对方才能进行处理数据(陷入死循环)
这就要求在ET模式下读事件一旦就绪,必须把数据读完;那我怎么知道我把数据读完了?
这就好比关系还不错的朋友来向你借钱,第一次借100,下一次再借100,那他下次会不会再向你借钱呢?肯定会(知道你有钱~);那他怎么知道你没钱了?第三次向你借钱时,你发信息给他:我只有36.66块钱了,这是你朋友就知道你没钱了!
也就是在设计时进行循环读取,只到读取不到数据就证明没数据了!
如果这时朋友发信息再次向你借钱,你没回复他,他也不知道是什么情况(没钱还是没看到信息),就会一直僵持着
也就是说:循环读取势必会引起阻塞问题,但服务器敢让你阻塞吗(一阻塞可能服务器就挂了)!所以我们要对fd设置非阻塞来解决该问题!
以上便是为了引出结论:在ET模式下,所有的fd必须是非阻塞的!LT模式阻塞非阻塞都行~
ET更高效,体现在:1通知效率更高;2IO效率更高(给对方通告一个更大的接收窗口,让对方能发送更多的数据,提高IO效率)
那LT也可以设置非阻塞,也可以循环读取所有数据啊!但它会有退路可言(设置成阻塞,一次一次读数据)而ET一定要让上层读取全部数据!这在设计上ET也比LT要复杂些;但虽然ET更高效,也并不意味着LT就没用了:在一些场景中如果要实现响应是有顺序的或者是要求设计简单的,就选LT~
四Reactor
编写Reactor的基本框架
在Reactor的代码中,我们要来解决读和写的问题
关于读:我们能够直接读吗? 不能!
我们要怎么知道接收缓冲区有数据了? 通过epoll等待读事件就绪!
我们能保证读到的就是一个完整的请求吗? 不能!
如何保证? 引入协议:对读到的内容进行报文解析
关于写:a当我们获取一个新的sockfd时,输入和输出缓冲区都是空的
b读事件就绪:就是输入缓冲区有了数据/底层有新连接到来
c写事件就绪:不是关心数据,而是关心发送缓冲区中有没有空间,有空间就证明写事件就绪;否则写事件不满足!
d把sockfd托管给select,poll,epoll:原因是sockfd上事件没就绪!
所以默认sockfd新建的情况下,读事件是不就绪的:接收缓冲区暂时没数据,所以我们要把它进行托管
而默认sockfd新建的情况下,写事件是就绪的:接收缓冲区本来就有空间,所以我们直接写
当写条件不满足时,我们才要按需开启对EPOLLOUT的关心!
什么情况下写条件不满足? 发送缓冲区写满了&&数据还没发完
设置EPOLLOUT之后,我们就不用管了!后续的剩余数据epoll会自动给我发送!
补充细节:
1如果我们直接开启EPOLLOUT关心,epoll就会一直就绪
2如果未来我们发完数据了,对EPOLLOUT的关心就会被关闭
3如果缓冲区没满&&数据发完了,我们就不用开启对EPOLLOUT关心
4如果我们设置对EPOLLOUT的关心,epoll默认只会就绪一次!
5直接发,我们怎么知道写入条件不满足? 我们写入时进行判断errno == EWOULDBLOCK就是缓冲区被写满了!
在代码中我们没有sockfd的概念,只有一个一个的connection到来:所以我们要对connection进行描述并构建成对象,有Reactor对象来管理connection对象(通过unordered_map数据结构来组织)
accept设置连接时并不知道有多少个连接即将到来,所以我们要把listensockfd设置成非阻塞,循环获取新连接!
获取新连接时,我们要对读 写 异常方法进行bind回调函数,我们不在listener类中进行bind,我们统一在Main.cc(外部)将方法bind,怎么实现? 在connection类中定义一个type:表示该连接是监听sockfd还是普通sockfd;在Reactor类中定义一批方法集(读 写 异常方法),对外提供接口设置这批方法(也就是先在外部bind);从此我们AddConnection()函数添加连接是通过type判断是那种连接需要添加从而进行注册对应的方法(注册进connection类的成员方法中 ,后面等新事件到来时就能通过connection找到要执行的方法从而回调出去!)
在connection类中定义一个Reactor* R指针? 进行回指找到Reacor中的AddConnection()函数(在获取连接时我们要把新连接添加到unordered_map中要通过Reactor对象才能实现)
完整源代码
未来服务器将不会有sockfd的概念,只有一个一个的Connection
//Connection.hpp
#pragma once
#include <string>
#include <functional>
#define ListenConnection 0
#define NornalConnection 1
class Connection;
class Reactor;
using handler = std::function<void(Connection *)>;
class Connection
{
public:
Connection(int sockfd)
: _sockfd(sockfd)
{
}
~Connection()
{
}
void SetEvent(uint32_t event)
{
_event = event;
}
void SetReactor(Reactor *R)
{
_R = R;
}
void SetType(int type)
{
_type = type;
}
void SetAddr(InetAddr &addr)
{
_addr = addr;
}
void AppendInbuffer(const std::string &in)
{
_inbuffer += in;
}
void AppendOutbuffer(const std::string &in)
{
_outbuffer += in;
}
int Sockfd()
{
return _sockfd;
}
uint32_t Event()
{
return _event;
}
handler Recver()
{
return _recver;
}
handler Sender()
{
return _sender;
}
handler Excepter()
{
return _excepter;
}
Reactor *R()
{
return _R;
}
int Type()
{
return _type;
}
std::string &Inbuffer() // 引用!
{
return _inbuffer;
}
InetAddr Addr()
{
return _addr;
}
std::string &Outbuffer()
{
return _outbuffer;
}
void RegisterHandler(handler recver, handler sender, handler excepter)
{
_recver = recver;
_sender = sender;
_excepter = excepter;
}
void DiscardOutbuffer(ssize_t n)
{
_outbuffer.erase(0, n);
}
void Close()
{
if (_sockfd >= 0)
::close(_sockfd);
}
private:
int _sockfd;
uint32_t _event;
std::string _inbuffer;
std::string _outbuffer;
handler _recver; // 处理读事件
handler _sender; // 处理写事件
handler _excepter; // 处理异常事件
Reactor *_R; // 定义了一个Reactor指针?
int _type;
InetAddr _addr;
};
Connection被Reactor进行管理
//Reactor.hpp
#pragma once
#include <unordered_map>
#include <memory>
#include "Multiplex.hpp"
#include "Connection.hpp"
// 只对Connection进行管理
class Reactor
{
const static int gnum = 128;
public:
Reactor()
: _epoller(std::make_unique<Epoller>()), _isrunning(false)
{
}
~Reactor()
{
}
// 我不想让外部去new Connection对象
// void AddConnection(int sockfd, uint32_t event, handler recver, handler sender, handler excepter)
void AddConnection(int sockfd, uint32_t event, InetAddr &addr, int type)
{
// 构建Connection -- 不要用智能指针!!!
Connection *conn = new Connection(sockfd);
conn->SetEvent(event);
conn->SetReactor(this); // 设置conn与R的回指
conn->SetType(type); // 设置connection的类型
conn->SetAddr(addr);
// 注册读写异常方法
// conn->RegisterHandler(recver, sender, excepter);
if (conn->Type() == ListenConnection)
conn->RegisterHandler(_OnAccepter, nullptr, nullptr);
if (conn->Type() == NornalConnection)
conn->RegisterHandler(_OnRecver, _OnSender, _OnExcepter);
// epoll托管 - epoll模型进行封装
if (_epoller->AddEvent(conn->Sockfd(), conn->Event()))
_conn.insert({sockfd, conn}); // TcpServer管理
}
void Dispatcher()
{
int timeout = -1;
_isrunning = true;
while (_isrunning)
{
LoopOnce(timeout);
// Do Other Thing
PrintDebug();
}
_isrunning = false;
}
void LoopOnce(int timeout)
{
int n = _epoller->WaitEvent(revs, gnum, timeout);
for (int i = 0; i < n; i++)
{
int sockfd = revs[i].data.fd;
uint32_t event = revs[i].events;
if (event & EPOLLERR)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLHUP)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLIN)
{
if (IsConnExist(sockfd) && _conn[sockfd]->Recver())
_conn[sockfd]->Recver()(_conn[sockfd]); // 写事件派发
}
if (event & EPOLLOUT)
{
if (IsConnExist(sockfd) && _conn[sockfd]->Sender())
_conn[sockfd]->Sender()(_conn[sockfd]); // 读事件派发
}
}
}
bool IsConnExist(int sockfd)
{
return _conn.find(sockfd) != _conn.end();
}
void SetListenConnection(handler OnAccepter)
{
_OnAccepter = OnAccepter;
}
void SetNornalConnection(handler OnRecver, handler OnSender, handler OnExcepter)
{
_OnRecver = OnRecver;
_OnSender = OnSender;
_OnExcepter = OnExcepter;
}
handler OnAccepter()
{
return _OnAccepter;
}
handler OnRecver()
{
return _OnRecver;
}
handler OnSender()
{
return _OnSender;
}
handler OnExcepter()
{
return _OnExcepter;
}
void PrintDebug()
{
std::string str;
for (auto &conn : _conn)
{
str += std::to_string(conn.second->Sockfd()) + " ";
}
LOG(DEBUG, "conn list:%s\n", str.c_str());
}
void EnableReadWriteEvent(int sockfd, bool read, bool write)
{
if (!IsConnExist(sockfd))
return;
uint32_t event = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0) | EPOLLET;
// 修改
_conn[sockfd]->SetEvent(event);
// 设置进内核
_epoller->ModEvent(_conn[sockfd]->Sockfd(), _conn[sockfd]->Event());
}
void DelConnection(int sockfd)
{
// 进行安全检测
if (!IsConnExist(sockfd))
return;
LOG(INFO, "client %s quit,server delete all sources\n", _conn[sockfd]->Addr().User().c_str());
// 先从epoller中移除->细节!epoller移除的sockfd必须是健康的
// EnableReadWriteEvent(sockfd,false,false);
_epoller->DelEvent(sockfd);
// 关闭sockfd
_conn[sockfd]->Close();
// 从_conn中去除
_conn.erase(sockfd);
}
private:
// sockfd与connection进行映射
std::unordered_map<int, Connection *> _conn;
std::unique_ptr<Multiplex> _epoller;
bool _isrunning;
epoll_event revs[gnum];
// 设置方法集进行统一处理
handler _OnAccepter;
handler _OnRecver;
handler _OnSender;
handler _OnExcepter;
};
epoll接口封装成对象
//Multiplex.hpp
#pragma once
#include <sys/epoll.h>
#include "Log.hpp"
#include "Comm.hpp"
using namespace log_ns;
class Multiplex
{
public:
virtual bool AddEvent(int sockfd, uint32_t event) = 0;
virtual int WaitEvent(struct epoll_event revt[], int num, int timeout) = 0;
virtual bool ModEvent(int sockfd, uint32_t event) = 0;
virtual bool DelEvent(int sockfd) = 0;
};
class Epoller : public Multiplex
{
const static int size = 128;
private:
bool EventHelper(int sockfd, uint32_t event, int oper)
{
struct epoll_event evt;
evt.data.fd = sockfd;
evt.events = event;
int n = ::epoll_ctl(_epfd, oper, sockfd, &evt);
if (n < 0)
{
LOG(ERROR, "epoll_ctl failed\n");
return false;
}
LOG(INFO, "epoll_ctl %d events is %s sucess\n", sockfd, EventToString(event).c_str());
return true;
}
public:
Epoller()
{
_epfd = ::epoll_create(size);
if (_epfd < 0)
{
LOG(FATAL, "epoll_create error\n");
exit(EPOLL_CTL_ERROR);
}
LOG(INFO, "epoll_create sucess efd: %d\n", _epfd);
}
~Epoller()
{
}
std::string EventToString(uint32_t event)
{
std::string str;
if (event & EPOLLIN)
str += "EPOLLIN";
if (event & EPOLLOUT)
str += "|EPOLLOUT";
if (event & EPOLLET)
str += "|EPOLLET";
return str;
}
virtual bool AddEvent(int sockfd, uint32_t event) override
{
return EventHelper(sockfd, event, EPOLL_CTL_ADD);
}
virtual bool ModEvent(int sockfd, uint32_t event) override
{
return EventHelper(sockfd, event, EPOLL_CTL_MOD);
}
virtual bool DelEvent(int sockfd) override
{
return EventHelper(sockfd, 0, EPOLL_CTL_DEL);
}
virtual int WaitEvent(epoll_event revs[], int num, int timeout) override
{
return ::epoll_wait(_epfd, revs, num, timeout);
}
private:
int _epfd;
};
// class Poller:private Multiplex
// {
// };
// class Select:private Multiplex
// {
// };
对listensockfd的封装,内含读事件(连接)方法
//Listener.hpp
#pragma once
#include <memory>
#include "Socket.hpp"
#include "Connection.hpp"
#include "Log.hpp"
#include "Reactor.hpp"
using namespace socket_ns;
using namespace log_ns;
class Listener
{
public:
Listener(uint16_t port)
: _port(port), _ListenSocket(std::make_unique<TcpSocket>())
{
_ListenSocket->Tcp_ServerSocket(port);
}
int Listensockfd()
{
return _ListenSocket->Sockfd();
}
// listensockfd 读方法
void Accepter(Connection *conn)
{
while (true)
{
errno = 0;
InetAddr addr;
int code = 0;
int sockfd = _ListenSocket->AcceptSocket(&addr, &code);
if (sockfd > 0)
{
LOG(INFO, "get a new link %s sockfd: %d\n", addr.User().c_str(), sockfd);
conn->R()->AddConnection(sockfd, EPOLLIN | EPOLLET, addr, NornalConnection);
}
else
{
if (code == EWOULDBLOCK)
{
LOG(INFO, "gain connection is finished\n");
break;
}
else if (code == EINTR)
{
continue;
}
else
{
LOG(ERROR, "gain connection failed\n");
break;
}
}
}
}
private:
uint16_t _port;
std::unique_ptr<Socket> _ListenSocket;
};
//Socket.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <functional>
#include <unistd.h>
#include <memory>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <pthread.h>
#include "Log.hpp"
#include "InetAddr.hpp"
#include "Comm.hpp"
namespace socket_ns
{
using namespace log_ns;
const static int gbacklog = 8;
// 模板方法模式
class Socket
{
public:
virtual void CreateSocket() = 0;
virtual void InitSocket(uint16_t port, int backlog = gbacklog) = 0;
virtual int AcceptSocket(InetAddr *addr, int *code) = 0; // 对象/变量
virtual bool ConnectSocket(uint16_t port, const std::string &ip) = 0; // clinet连接成功与失败
virtual int Sockfd() = 0;
virtual void Close() = 0;
virtual ssize_t Recv(std::string *out) = 0;
virtual ssize_t Send(const std::string &in) = 0;
virtual void ReuseAddr() = 0;
public:
void Tcp_ServerSocket(uint16_t port)
{
CreateSocket();
ReuseAddr();
InitSocket(port);
}
bool Tcp_ClientSocket(uint16_t port, const std::string &ip)
{
CreateSocket();
return ConnectSocket(port, ip);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket()
{
}
TcpSocket(int sockfd)
: _sockfd(sockfd)
{
}
~TcpSocket()
{
}
virtual void CreateSocket() override
{
_sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
LOG(FATAL, "socket fail\n");
exit(SOCK_ERROR);
}
LOG(INFO, "socket sucess sockfd: %d\n", _sockfd);
NoBlock(_sockfd); // 设置非阻塞
}
virtual void InitSocket(uint16_t port, int backlog) override
{
struct sockaddr_in perr;
memset(&perr, 0, sizeof(perr));
perr.sin_family = AF_INET;
perr.sin_port = htons(port);
perr.sin_addr.s_addr = INADDR_ANY;
if (::bind(_sockfd, (struct sockaddr *)&perr, sizeof(perr)) < 0)
{
LOG(FATAL, "bind fail\n");
exit(BIND_ERROR);
}
if (::listen(_sockfd, backlog) < 0)
{
LOG(ERROR, "listen fail\n");
exit(LISTEN_ERROR);
}
}
virtual int AcceptSocket(InetAddr *addr, int *code) override
{
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = ::accept(_sockfd, (struct sockaddr *)&client, &len);
*code = errno;
if (sockfd < 0)
{
return -1;
}
NoBlock(sockfd); // 设置非阻塞
*addr = client;
return sockfd;
}
virtual bool ConnectSocket(uint16_t port, const std::string &ip) override
{
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &server.sin_addr);
int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
return false;
}
return true;
}
virtual int Sockfd() override
{
return _sockfd;
}
virtual void Close() override
{
if (_sockfd > 0)
{
::close(_sockfd);
}
}
virtual ssize_t Recv(std::string *out) override
{
char buffer[4096];
ssize_t n = ::recv(_sockfd, buffer, sizeof(buffer) - 1, 0);
if (n > 0)
{
buffer[n] = 0;
*out += buffer; // 细节
}
return n;
}
virtual ssize_t Send(const std::string &in) override
{
return ::send(_sockfd, in.c_str(), in.size(), 0);
}
virtual void ReuseAddr() override
{
int opt = 1;
::setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
}
private:
int _sockfd; // 两个角色
};
// class UdpServer:public Socket
//{}
}
//Log.hpp
#pragma once
#include <pthread.h>
#include <fstream>
#include <syscall.h>
#include <stdarg.h>
#include <unistd.h>
#include <cstring>
namespace log_ns
{
enum
{
DEBUG = 1,
INFO,
WARNING,
ERROR,
FATAL
};
std::string Getlevel(int level)
{
switch (level)
{
case DEBUG:
return "DEBUG";
break;
case INFO:
return "INFO";
break;
case WARNING:
return "WARNING";
break;
case ERROR:
return "ERROR";
break;
case FATAL:
return "FATAL";
break;
default:
return "";
break;
}
}
std::string Gettime()
{
time_t now = time(nullptr);
struct tm *time = localtime(&now);
char buffer[128];
snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",
time->tm_year + 1900,
time->tm_mon + 1,
time->tm_mday,
time->tm_hour,
time->tm_min,
time->tm_sec);
return buffer;
}
struct log_message
{
std::string _level;
int _id;
std::string _filename;
int _filenumber;
std::string _cur_time;
std::string _message;
};
#define SCREAM_TYPE 1
#define FILE_TYPE 2
#define DEVELOP 3
#define OPERATION 4
const std::string gpath = "./log.txt";
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
class log
{
public:
log(const std::string &path = gpath, const int status = DEVELOP)
: _mode(SCREAM_TYPE), _path(path), _status(status)
{
}
void SelectMode(int mode)
{
_mode = mode;
}
void SelectStatus(int status)
{
_status = status;
}
void PrintScream(const log_message &le)
{
printf("[%s][%d][%s][%d][%s] %s",
le._level.c_str(),
le._id,
le._filename.c_str(),
le._filenumber,
le._cur_time.c_str(),
le._message.c_str());
}
void PrintFile(const log_message &le)
{
std::fstream in(_path, std::ios::app);
if (!in.is_open())
return;
char buffer[1024];
snprintf(buffer, sizeof(buffer), "[%s][%d][%s][%d][%s] %s",
le._level.c_str(),
le._id,
le._filename.c_str(),
le._filenumber,
le._cur_time.c_str(),
le._message.c_str());
in.write(buffer, strlen(buffer)); // 不用sizeof
in.close();
}
void PrintLog(const log_message &le)
{
// 过滤
if (_status == OPERATION)
return;
// 线程安全
pthread_mutex_lock(&gmutex);
switch (_mode)
{
case SCREAM_TYPE:
PrintScream(le);
break;
case FILE_TYPE:
PrintFile(le);
break;
default:
break;
}
pthread_mutex_unlock(&gmutex);
}
void logmessage(const std::string &filename, int filenumber, int level, const char *message, ...)
{
log_message le;
le._level = Getlevel(level);
le._id = syscall(SYS_gettid);
le._filename = filename;
le._filenumber = filenumber;
le._cur_time = Gettime();
va_list vt;
va_start(vt, message);
char buffer[128];
vsnprintf(buffer, sizeof(buffer), message, vt);
va_end(vt);
le._message = buffer;
// 打印日志
PrintLog(le);
}
~log()
{
}
private:
int _mode;
std::string _path;
int _status;
};
// 方便上层调用
log lg;
// ##不传时可忽略参数
#define LOG(level, message, ...) \
do \
{ \
lg.logmessage(__FILE__, __LINE__, level, message, ##__VA_ARGS__); \
} while (0)
#define SleftScream() \
do \
{ \
lg.SelectMode(SCREAM_TYPE); \
} while (0)
#define SleftFile() \
do \
{ \
lg.SelectMode(FILE_TYPE); \
} while (0)
}
//InetAddr.hpp
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
class InetAddr
{
void ToHost()
{
//_ip=inet_ntoa(_addr.sin_addr);//4字节地址->char*
_port = ntohs(_addr.sin_port);
char buffer[124];
inet_ntop(AF_INET, &_addr.sin_addr, buffer, sizeof(buffer));
_ip = buffer;
}
public:
InetAddr(const std::string &ip, uint16_t port)
{
_ip = ip;
_port = port;
_addr.sin_family = AF_INET6;
_addr.sin_port = htons(port);
_addr.sin_addr.s_addr = INADDR_ANY;
}
InetAddr()
{
}
InetAddr(const struct sockaddr_in &addr)
: _addr(addr)
{
ToHost();
}
std::string Ip()
{
return _ip;
}
uint16_t Port()
{
return _port;
}
struct sockaddr_in Addr()
{
return _addr;
}
bool operator==(const InetAddr &ad)
{
return (this->_ip == ad._ip && this->_port == ad._port);
}
std::string User()
{
std::string tmp = _ip + ":" + std::to_string(_port);
return tmp;
}
private:
std::string _ip;
uint16_t _port;
struct sockaddr_in _addr;
};
对读 写 异常 方法进行封装
//HandlerConnection.hpp
#pragma once
#include "Connection.hpp"
#include "Log.hpp"
using namespace log_ns;
class HandlerConnection
{
static const int gsize = 1024;
public:
HandlerConnection(handler server)
{
_server = server;
}
// 就绪的conn
void HandlerRecver(Connection *conn)
{
while (true)
{
errno = 0;
char bufferstr[gsize];
ssize_t n = ::recv(conn->Sockfd(), bufferstr, sizeof(bufferstr), 0);
if (n > 0)
{
bufferstr[n] = 0;
conn->AppendInbuffer(bufferstr);
}
else
{
if (errno == EWOULDBLOCK)
{
break;
}
else if (errno == EINTR)
{
continue;
}
else
{
conn->Excepter()(conn); // 统一处理异常
return; // 这里是返回!
}
}
}
// 读取完成 - 进行内容解析(处理粘包问题)
// 交给上层
_server(conn);
}
void HandlerSender(Connection *conn)
{
// 直接写
while (true)
{
errno = 0;
ssize_t n = ::send(conn->Sockfd(), conn->Outbuffer().c_str(), conn->Outbuffer().size(), 0);
if (n > 0)
{
conn->DiscardOutbuffer(n);
}
else if (n == 0)
{
break;
}
else
{
if (errno == EWOULDBLOCK)
{
break;
}
else if (errno == EINTR)
{
continue;
}
else
{
conn->Excepter()(conn);
return;
}
}
}
// 一定是读条件不就绪(发送缓冲区满了)
if (!conn->Outbuffer().empty())
{
// 开启读事件->剩下的数据epoll会自动帮我发完
conn->R()->EnableReadWriteEvent(conn->Sockfd(), true, true);
}
else
{
conn->R()->EnableReadWriteEvent(conn->Sockfd(), true, false);
}
}
void HandlerExcepter(Connection *conn)
{
conn->R()->DelConnection(conn->Sockfd());
}
private:
handler _server;
};
读事件要进行报文解析,读到完整请求后进行处理,生成应答后发出!
//Protocol.hpp
#pragma once
#include <iostream>
#include <memory>
#include <jsoncpp/json/json.h>
// #define SELF 1
// "len\r\n{json}\r\n" -- 自己设计出完整报文 len json的长度
// "\r\n" 第一个:区分len 和json边界 第二个:观察现象方便
const std::string sym = "\r\n";
const std::string space = " ";
// jsonstr变成完整报文
std::string Encode(const std::string &jsonstr)
{
int len = jsonstr.size();
return std::to_string(len) + sym + jsonstr + sym;
}
// 把json提取出来
// "len\r"
// "len\r\n{json}\r"
// "len\r\n{json}\r\n"
// "len\r\n{json}\r\n""len\r\n{json}\r\n""len\r\n{j"
std::string Decode(std::string &nameplate)
{
size_t pos = nameplate.find(sym);
if (pos == std::string::npos)
{
return "";
}
std::string lenstr = nameplate.substr(0, pos);
int len = std::stoi(lenstr);
// 计算出完整报文长度
int TotalLen = lenstr.size() + sym.size() + len + sym.size();
if (nameplate.size() < TotalLen)
{
return "";
}
// 读报文
std::string jsonstr = nameplate.substr(pos + sym.size(), len);
// 删报文
nameplate.erase(0, TotalLen);
return jsonstr;
}
class req
{
public:
req()
{
}
req(int x, int y, std::string &sym)
: _x(x), _y(y), _sym(sym)
{
}
// 结构化->字符串
void Serialize(std::string *out)
{
#ifdef SELF
// 1.自己做 -> "_x _sym _y"
// 2.使用现成库:jsoncpp
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["sym"] = _sym;
Json::FastWriter writer;
*out = writer.write(root);
#else
//"len\r\n {_x _sym _y} \r\n"
std::string x = std::to_string(_x);
std::string y = std::to_string(_y);
*out = x + space + _sym + space + y;
#endif
}
// 字符串->结构化
bool Deserialize(std::string &in)
{
#ifdef SELF
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if (!res)
return false;
_x = root["x"].asInt();
_y = root["y"].asInt();
_sym = root["sym"].asString();
return true;
#else
//"len\r\n {_x _sym _y} \r\n"
auto left_space = in.find(space);
if (left_space == std::string::npos)
return false;
auto right_space = in.rfind(space);
if (right_space == std::string::npos)
return false;
if (left_space + space.size() + 1 != right_space)
return false;
std::string x = in.substr(0, left_space);
if (x.empty())
return false;
std::string sym = in.substr(left_space + space.size(), right_space);
if (sym.empty())
return false;
std::string y = in.substr(right_space + space.size());
if (y.empty())
return false;
_x = std::stoi(x.c_str());
_sym = sym;
_y = std::stoi(y.c_str());
return true;
#endif
}
void SetValue(int x, int y, char sym)
{
_x = x;
_y = y;
_sym = sym;
}
~req() {}
int _x;
int _y;
std::string _sym; //-x +-*/ _y
};
class rep
{
public:
rep()
: _result(0), _code(0), _des("sucess")
{
}
void Serialize(std::string *out)
{
#ifdef SELF
Json::Value root;
root["result"] = _result;
root["code"] = _code;
root["des"] = _des;
Json::FastWriter writer;
*out = writer.write(root);
#else
std::string result = std::to_string(_result);
std::string code = std::to_string(_code);
*out = result + space + code + space + _des;
#endif
}
bool Deserialize(std::string &in)
{
#ifdef SELF
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if (!res)
return false;
_result = root["result"].asInt();
_code = root["code"].asInt();
_des = root["des"].asString();
return true;
#else
auto left_space = in.find(space);
if (left_space == std::string::npos)
return false;
auto right_space = in.rfind(space);
if (right_space == std::string::npos)
return false;
if (left_space + space.size() + 1 != right_space)
return false;
std::string result = in.substr(0, left_space);
if (result.empty())
return false;
std::string code = in.substr(left_space + space.size(), right_space);
if (code.empty())
return false;
std::string des = in.substr(right_space + space.size());
if (des.empty())
return false;
_result = std::stoi(result.c_str());
_code = std::stoi(code.c_str());
_des = des;
return true;
#endif
}
~rep() {}
int _result;
int _code; // 0-> sucess 1->div zero 2->mod zero 3->fail
std::string _des;
};
// 工厂模式
class Factory
{
public:
static std::shared_ptr<req> BuidRequest()
{
return std::make_shared<req>();
}
static std::shared_ptr<rep> BuidReponse()
{
return std::make_shared<rep>();
}
};
//PackageParse.hpp
#pragma once
#include <iostream>
#include <functional>
#include "Log.hpp"
#include "Protocol.hpp"
#include "Connection.hpp"
using namespace log_ns;
using protocol_t = std::function<std::shared_ptr<rep>(std::shared_ptr<req>)>;
class PackageParse
{
public:
PackageParse(protocol_t process)
: _process(process)
{
}
~PackageParse()
{
}
void server(Connection *conn)
{
while (true)
{
// 报文解析 -- 不能保证读到完整的报文
std::string message = ::Decode(conn->Inbuffer());
if (message == "")
break;
// 反序列化 -- 能保证读到完整报文
std::shared_ptr<req> q = Factory::BuidRequest();
q->Deserialize(message);
// 处理事务
auto p = _process(q);
// 序列化处理
std::string result;
p->Serialize(&result);
LOG(DEBUG, "client request finish: %s\n", result.c_str());
// 添加报头
result = Encode(result);
// 发送
conn->AppendOutbuffer(result);
// break;
}
// 至少处理了一个请求,有一个应答
// 方法1:直接发 - 单进程
if (!conn->Outbuffer().empty()) conn->Sender()(conn);
// 方法2:激活对事件的关心即可,IO全都是由Reactor处理 -> 解决线程安全
// 半同步半异步模式
//if (!conn->Outbuffer().empty()) conn->R()->EnableReadWriteEvent(conn->Sockfd(),true,true);
}
private:
protocol_t _process;
};
//Netcal.hpp
#pragma once
#include "Protocol.hpp"
// req -> rep
class Cal
{
public:
Cal() {}
~Cal() {}
std::shared_ptr<rep> Count(std::shared_ptr<req> q)
{
auto p = Factory::BuidReponse();
const char *a = q->_sym.c_str();
switch (*a)
{
case '+':
p->_result = q->_x + q->_y;
break;
case '-':
p->_result = q->_x - q->_y;
break;
case '*':
p->_result = q->_x * q->_y;
break;
case '/':
{
if (q->_y == 0)
{
p->_result = -1;
p->_code = 1;
p->_des = "div zero";
}
else
{
p->_result = q->_x / q->_y;
}
break;
}
case '%':
{
if (q->_y == 0)
{
p->_result = -1;
p->_code = 2;
p->_des = "mod zero";
}
else
{
p->_result = q->_x % q->_y;
}
break;
}
default:
p->_result = -1;
p->_code = 3;
p->_des = "illegal operation";
break;
}
return p;
}
};
现象
对单进程版Reactor进行改进:添加线程池
另外一种方案
实现思路
以上便是IO模型的全部内容,有错误欢迎在评论区指正,感谢观看!