目录
- 理解五种IO模型
- 非阻塞IO的设置
- 多路转接之select
- 实现一个简易的select服务器
- select服务器的优缺点
- 多路转接之poll
- 实现一个简易的poll服务器
- poll服务器的优缺点
- 多路转接之epoll
- epoll原理
- epoll的优势
- 用epoll实现一个简易的echo服务器
- epoll的LT和ET工作模式
- 什么是LT和ET
- 实现一个简易的reactor服务器
五种IO模型
如何理解五种IO模型:
应用层调用read/recv和write/send的时候,本质就是数据在用户层和操作系统的缓冲区的拷贝交互。要进行拷贝,必须先判断条件成立,该条件就是读写事件就绪。如果条件不成立,这些函数就会被阻塞住,等待资源就绪。
所以输入输出IO可以分为两个步骤:等待 + 拷贝
给大家举个例子帮助大家理解五种IO模型:
钓鱼 = 等待 + 钓(动作)
河边的张三在钓鱼,张三的钓鱼动作:一直盯着鱼漂,直到鱼漂抖动就说明有鱼来了。
李四也在钓鱼,李四的钓鱼动作:一边做自己的事,一边时不时观察鱼漂。
王五有个高级鱼竿,鱼漂上带着铃铛,王五的钓鱼动作:一直做自己的事,铃铛鱼漂响了,就去钓鱼
赵六买了20个鱼竿钓鱼,赵六的钓鱼动作:一直检查每个鱼漂,任何一个鱼漂动了就钓鱼。
小王开车载田七去公司的时候,田七路过的时候看见别人钓鱼,突然自己想吃野生鱼了,就叫小王去钓鱼,钓到了就给田七送过去。
人:进程/线程。鱼:数据。河:内核空间。鱼漂:数据就绪的事件。鱼竿:文件描述符(都是通过文件描述符进行操作)。钓鱼:recv/read
张三对应的是阻塞式IO。李四对应的是非阻塞IO,王五对应的是信号阻塞式IO(其中王五提前是知道铃铛响了该怎么办,该钓鱼),赵六对应的是多路转接/多路复用IO。田七对应的是异步IO(小王(操作系统)有鱼了告诉就告诉田七,田七直接用即可)
在这几个人中,你会觉得哪个人的钓鱼效率是最高的(钓的鱼最多的)?很明显是赵六,因为河中的鱼对每个钩子的咬钩概率是相同的,而赵六的钩子占比是最大的,所以必定单位时间内钓鱼的数量是最多的。
阻塞式IO:在内核将数据准备好之前,系统调用会一直等待,所有的文件描述符,默认都是阻塞方式。阻塞IO是最常见的IO模型,因为简单易上手。
非阻塞IO:如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回EWOULDBLOCK错误码。非阻塞IO往往需要程序员循环的方式反复尝试读写该文件描述符,这个过程称为轮询,这对CPU来说是较大的浪费,一般只有特定场景下才使用。
信号驱动IO:内核将数据准备好的时候,使用SIGIO信号通知应用程序进行IO操作
IO多路转接:虽然从流程上看起来和阻塞IO类似,实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态
异步IO:由内核在数据拷贝完成时,通知应用程序(而信号驱动是告诉进程何时可以开始拷贝数据,需要进程自己拷贝)
什么叫做高效IO呢?
任何IO过程,都包含两个步骤,等待 + 拷贝。而且在实际的应用场景中,等待消耗的时间往往都远远高于拷贝的时间。让IO更高效,最核心的办法就是让等待的时间尽量少。在单位时间内,IO过程中,等待对于拷贝的比重越小,IO效率越高。
阻塞式IO和非阻塞IO有什么区别?
区别:等待时的状态不同。
阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在到得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
同步IO和异步IO有什么区别?
同步,就是在发出一个调用时,在没有得到该调用的结果之前,该调用就不会返回。但是一旦调用返回,就得到返回值了。换句话说,就是有调用者主动等待这个调用结果。
异步,调用在发出之后,这个调用就直接返回,所以没有返回结果。直到被动的通过状态/信号来通知来处理
非阻塞IO的设置
如何设置非阻塞IO?
以前我们在写网络编程的时候,recv的flag选项填的是0
如果我们想非阻塞等待,则可以设flags为MSG_DONTWAIT。也可以用open非阻塞的方式打开,方法有很多。
但提供一种更通用的做法,文件描述符本质就是下标,每一个下标指向的就是内核里的文件对象,文件对象中是有文件描述符标志的。
传入的cmd值不同,后面追加的参数也不相同
fcntl函数有5种功能:
复制一个现有的描述符(cmd = F_DUPFD)
获得/设置文件描述符标记(cmd = F_GETFD或F_SETFD)
获得/设置文件状态标记(cmd = F_GETFL或F_SETFL)
获得/设置异步I/O所有权(cmd = F_GETOWN或F_SETOWN)
获得/设置记录锁(cmd = F_GETLK, F_SETLK或F_SETLKW)
实现非阻塞轮询方式读取标准输入
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
void SetNoBlock(int fd)
{
int flag = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
int main()
{
SetNoBlock(0);
char buffer[4096];
while (1)
{
std::cout << "Enter#";
fflush(stdout);
int n = read(0, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << buffer << std::endl;
}
else if (n == 0)
{
std::cout << "read done" << std::endl;
break;
}
else
{
std::cout << "read error: " << strerror(errno) <<std::endl;
break;
}
}
return 0;
}
为什么还没有输入就直接读出错了?错误信息是资源暂时还没有就绪。
结论:
设置成为非阻塞,如果底层fd数据没有就绪,recv/read/write/send,返回值会以出错的形式返回。这样会有两种情况:a、真的出错 b、底层资源没有就绪
我们怎么区分呢? – 通过errno区分
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
void SetNoBlock(int fd)
{
int flag = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
}
int main()
{
SetNoBlock(0);
char buffer[4096];
while (1)
{
int n = read(0, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << buffer << std::endl;
}
else if (n == 0)
{
std::cout << "read done" << std::endl;
break;
}
else
{
if (errno == EWOULDBLOCK)
{
//只是底层没有数据就绪,这种错误是可以容忍的
//do_other_thing(); //也可以做其他事情,设置一些方法,就不演示了
continue;
}
std::cout << "read error: " << strerror(errno) <<std::endl;
break;
}
}
return 0;
}
多路转接之select
select是干什么的?select负责多路转接IO中的等待,一次可以等待多个文件描述符,并不负责拷贝。
select系统调用是用来让我们的程序监视多个文件描述符的状态变化的,程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态变化。
nfds:填写要监视的所有要监视的fd中的最大值 + 1。这与底层内核有关系,内核会从0一直遍历到nfds。
timeout:为输入输出型参数
如果设置为nullptr,则为阻塞式。
若设置timeout = {0, 0};为非阻塞,会一直函数返回。
若设置timeout = {5, 0};如果一直收不到消息:阻塞5s,每过5s就超时(函数返回)一次,这样反复循环。如果过2s收到了消息,则该参数会返回{3, 0},表示还有3s才超时。
return返回值:大于0,有几个fd就绪了。等于0,超时/非阻塞返回了。小于0,select调用失败了
readfds:为输入输出参数
做输入参数时:表示用户告诉内核,你要帮我关心的文件描述符的读,fd_set表示位图,可以通过位图设置多个文件描述符。
做输出参数时:表示内核告诉用户,你要关心的文件描述中有哪些已经就绪了。
writefds和exceptfds同readfds参数
该位图既然是一种类型,必然有大小,所以能够想关心的fd的个数一定是有上限的。
int main()
{
std::cout << sizeof(fd_set) << std::endl;
return 0;
}
128 * 8 = 1024bit,最多只能关心1024个fd。bit位的位置代表fd是多少,为0则代表没设置该fd,为1则代表设置了该fd。
实现一个简易的select服务器
为了节省篇幅,后面再贴完整代码
#include "Socket.hpp"
class SelectServer
{
public:
SelectServer(uint16_t port)
:_port(port)
{}
void Init()
{
_listensock.CreateSocket();
_listensock.Setsockopt(); //设置端口复用
_listensock.Bind(_port);
_listensock.Listen();
}
void Start()
{
while (1)
{
//能否直接调用accept?不能直接accept,因为会阻塞!我们的目标就是写多路转接IO,所以不能出现阻塞
}
}
~SelectServer()
{
_listensock.Close();
}
private:
Socket _listensock;
uint16_t _port;
};
更新代码
#include "Socket.hpp"
class SelectServer
{
public:
SelectServer(uint16_t port)
:_port(port)
{}
void Init()
{
_listensock.CreateSocket();
_listensock.Setsockopt();//设置地址复用
_listensock.Bind(_port);
_listensock.Listen();
}
void HandleEvent()
{}
void Start()
{
while (1)
{
//能否直接调用accept?不能直接accept,因为会阻塞!
fd_set readfd;
FD_SET(_listensock.Fd(), &readfd);
struct timeval timeout = {3, 0};
int n = select(_listensock.Fd() + 1, &readfd, nullptr, nullptr, &timeout);//暂时这样设置
if (n < 0)
{
std::cout << "select error: " << strerror(errno) << std::endl;
break;
}
if (n == 0)
{
lg(Debug, "[%d: %d]", timeout.tv_sec, timeout.tv_usec);
}
else
{
std::cout << "Get a New Link" << std::endl;
sleep(2);
HandleEvent();
}
}
}
~SelectServer()
{
_listensock.Close();
}
private:
Socket _listensock;
uint16_t _port;
};
运行结果如下:
timeout设置为{3, 0}可以看到select每隔3s就超时一次,同理如果timeout设置为{0, 0},则会一直超时,如果设置为null,则会阻塞。
客户端连接上后,我们也看到了,他一直在打印Get a New Link,为什么一直打印?因为上层没有把底层数据拿上去处理,所以select就会一直提醒事件就绪了。
怎么处理?
更新代码
#include "Socket.hpp"
class SelectServer
{
public:
SelectServer(uint16_t port)
:_port(port)
{}
void Init()
{
_listensock.CreateSocket();
_listensock.Setsockopt();
_listensock.Bind(_port);
_listensock.Listen();
}
void HandleEvent(fd_set* rfd)
{
//代码走到这里就说明我们的连接事件就绪了
if (FD_ISSET(_listensock.Fd(), rfd) == true)
{
string clientIp; uint16_t clientPort;
int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
if (newSockfd < 0)
{
return;
}
lg(Info, "accept success: %s, %d\n", clientIp.c_str(), clientPort);
}
}
void Start()
{
while (1)
{
//能否直接调用accept?不能直接accept,因为会阻塞!
fd_set readfd;
FD_SET(_listensock.Fd(), &readfd);
struct timeval timeout = {3, 0};
int n = select(_listensock.Fd() + 1, &readfd, nullptr, nullptr, /*&timeout*/0);
if (n < 0)
{
std::cout << "select error: " << strerror(errno) << std::endl;
break;
}
if (n == 0)
{
lg(Debug, "[%d: %d]", timeout.tv_sec, timeout.tv_usec);
}
else
{
std::cout << "Get a New Link" << std::endl;
sleep(1);
HandleEvent(&readfd);
}
}
}
~SelectServer()
{
_listensock.Close();
}
private:
Socket _listensock;
uint16_t _port;
};
代码运行结果:
更新HandleEvent函数的代码
void HandleEvent(fd_set* rfd)
{
//代码走到这里就说明我们的连接事件就绪了
if (FD_ISSET(_listensock.Fd(), rfd) == true)
{
string clientIp; uint16_t clientPort;
int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
if (newSockfd < 0)
{
return;
}
lg(Info, "accept success: %s, %d\n", clientIp.c_str(), clientPort);
char buffer[4096];
ssize_t n = read(newSockfd, buffer, sizeof(buffer) - 1);
/*我们已经得到了新的sock,可以直接用read接口吗?不可以,因为我们这里是单线程/单进程,直接read可能会被阻塞住,以前我们的代码是直接托管给了
多线程/多进程,他们的阻塞不会影响主进程。我们这个单进程应该将新到来的newSocketfd交给select,让select帮我们来管理*/
}
}
所以此时,我们需要将HandleEvent函数里面的newSocketfd传递给Start函数。如何做呢?可以使用数组作为该类的成员 – 这个数组也叫做辅助数组,这也是select最大的特点之一:使用辅助数组,让文件描述符在函数之间互相传递。
#include "Socket.hpp"
class SelectServer
{
static const int MaxFdNum = sizeof(fd_set) * 8; //因为位图fd_set有大小,所以最多只能管理1024个fd
static const int DefaultNum = -1;
public:
SelectServer(uint16_t port)
:_port(port)
{
//初始化辅助数组
for (int i = 0; i < MaxFdNum; ++i)
{
read_fd_array[i] = DefaultNum;
}
}
void Init()
{
_listensock.CreateSocket();
_listensock.Setsockopt();
_listensock.Bind(_port);
_listensock.Listen();
}
void HandleEvent(fd_set* rfd)
{
for (int i = 0; i < MaxFdNum; ++i)//遍历所有的read_fd_array中是否有满足条件FD_ISSET
{
int socket = read_fd_array[i];
if (socket == DefaultNum)
continue;
if (FD_ISSET(socket, rfd) == true && _listensock.Fd() == socket)
{
//代码走到这里就说明我们的连接事件就绪了
string clientIp; uint16_t clientPort;
int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
if (newSockfd < 0)
{
return;
}
lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);
//添加新的文件描述符到辅助数组
for (int i = 0; i < MaxFdNum; ++i)
{
if (read_fd_array[i] == DefaultNum)
{
if (_maxfd < newSockfd)
_maxfd = newSockfd;
read_fd_array[i] = newSockfd;
break;
}
if (i == MaxFdNum - 1)
{
lg(Warning, "server is full, close sock: %d", newSockfd);
close(newSockfd);
}
}
}
else if (FD_ISSET(socket, rfd) == true)
{
//说明是其他的套接字读事件就绪了
char buffer[4096];
int n = read(socket, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
lg(Info, "client say:%s", buffer);
}
else if (n == 0)
{
//说明对端把连接关闭了
lg(Info, "client closed...");
//把该socket就可以移除关心状态了
read_fd_array[i] = DefaultNum;
close(socket);
}
else
{
//读出错了
lg(Error, "read error");
//把该socket就可以移除关心状态了
read_fd_array[i] = DefaultNum;
close(socket);
}
}
}
}
void Start()
{
//将lisntensockfd设置进辅助数组
read_fd_array[0] = _listensock.Fd();
_maxfd = read_fd_array[0];
while (1)
{
//因为select的参数为输入输出参数,所以我们每次调用select的时候都需要重新设置一遍select的参数,所以需要将下面的语句写入循环里
fd_set readfd;
FD_ZERO(&readfd); //一定要设置为0,否则可能会出现select失败的问题
for (int i = 0; i < MaxFdNum; ++i)
{
if (read_fd_array[i] != DefaultNum)
{
if (_maxfd < read_fd_array[i])
_maxfd = read_fd_array[i];
//将辅助数组中所要关心的文件描述符全部都设置进去
FD_SET(read_fd_array[i], &readfd);
}
}
int n = select(_maxfd + 1, &readfd, nullptr, nullptr, 0);//为了易于观察,我们将timout参数设置为0
if (n < 0)
{
std::cout << "select error: " << strerror(errno) << std::endl;
break;
}
if (n == 0)
{
//超时返回(非阻塞返回)
lg(Info, "timeout...");
}
else
{
HandleEvent(&readfd);
}
}
}
~SelectServer()
{
_listensock.Close();
}
private:
int _maxfd; //最大的文件描述符是什么
Socket _listensock;
uint16_t _port;
int read_fd_array[MaxFdNum]; //记录的是需要关心的读事件的文件描述符,-1表示不关心,非-1表示需要关心的文件描述符是什么
// int write_fd_array[MaxFdNum]; //仅为了理解多路转接IO,不考虑这两种情况,情况简单化,只考虑读事件
// int except_fd_array[MaxFdNum];
};
代码运行结果:
注意,我们的可是单进程哦。实现了并发处理多个请求。
整理代码,完整代码(允许结果和上面演示效果相同):
Log.hpp文件 – 往期文章实现过
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
using namespace std;
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define Onefile 2
#define Classfile 3
#define fileName "log.txt"
//使用前需要创建log目录
class Log
{
public:
Log()
{
printMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
printMethod = method;
}
void printOneFile(string logname, const string& logtxt)
{
logname = path + logname;
int fd = open(logname.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);//open只会创建文件不会创建目录
if (fd < 0)
{
perror("open failed");
return;
}
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const string& logtxt)
{
string filename = fileName;
filename += ".";
filename += leveltoString(level);
printOneFile(filename, logtxt);
}
void printLog(int level, const string& logtxt)
{
if (printMethod == Screen)
{
cout << logtxt << endl;
return;
}
else if (printMethod == Onefile)
{
printOneFile(fileName, logtxt);
return;
}
else if (printMethod == Classfile)
{
printClassFile(level, logtxt);
return;
}
}
const char* leveltoString(int level)
{
if (level == Info) return "Info";
else if (level == Debug) return "Debug";
else if (level == Error) return "Error";
else if (level == Fatal) return "Fatal";
else return "default";
}
void operator()(int level, const char* format, ...)
{
time_t t = time(nullptr);
struct tm* st = localtime(&t);
char leftbuffer[4096];
snprintf(leftbuffer, sizeof(leftbuffer), "year: %d, month: %d, day: %d, hour: %d, minute: %d, second: %d\n\
[%s]:",
st->tm_year + 1900, st->tm_mon + 1, st->tm_mday, st->tm_hour, st->tm_min, st->tm_sec, leveltoString(level));
char rightbuffer[4096];
va_list start;
va_start(start, format);
vsnprintf(rightbuffer, sizeof(rightbuffer), format, start);
va_end(start);
char logtxt[4096 * 2];
snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
printLog(level, logtxt);
}
private:
int printMethod;
string path;//路径与文件名解耦,最后将路径和文件粘合起来,再用open打开即可
};
Sock.hpp文件 – 往期文章实现过
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <cstring>
#include "Log.hpp"
Log lg;
class Socket
{
public:
Socket()
{}
~Socket()
{}
void CreateSocket()
{
_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
lg(Fatal, "create socket failed:%s", strerror(errno));
exit(1);
}
}
void Bind(uint16_t serverPort)
{
struct sockaddr_in server;
server.sin_family = AF_INET;
server.sin_port = htons(serverPort);
server.sin_addr.s_addr = INADDR_ANY;
int n = bind(_sockfd, (struct sockaddr*)&server, sizeof(server));
if (n < 0)
{
lg(Fatal, "bind socket failed:%s", strerror(errno));
exit(2);
}
}
void Listen()
{
int n = listen(_sockfd, 5);
if (n < 0)
{
lg(Fatal, "listen socket failed:%s", strerror(errno));
exit(3);
}
}
int Accept(string* clientIp, uint16_t* clinetPort)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int newsocket = accept(_sockfd, (struct sockaddr*)&peer, &len);
if (newsocket < 0)
{
lg(Error, "accept error:%s", strerror(errno));
return -1;
}
*clientIp = inet_ntoa(peer.sin_addr);
*clinetPort = ntohs(peer.sin_port);
return newsocket;
}
int Connect(const string& serverIp, uint16_t serverPort)
{
struct sockaddr_in server;
server.sin_family = AF_INET;
server.sin_addr.s_addr = inet_addr(serverIp.c_str());
server.sin_port = htons(serverPort);
int n = connect(_sockfd, (struct sockaddr*)&server, sizeof(server));
if (n < 0)
{
lg(Error, "connect error:%s", strerror(errno));
return -1;
}
return 0;
}
void Setsockopt()
{
int opt = 1;
if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt)) < 0)
lg(Error, "%s\n", strerror(errno));
}
void Close()
{
close(_sockfd);
}
int Fd()
{
return _sockfd;
}
private:
int _sockfd;
};
SelectServer.hpp文件
#include "Socket.hpp"
class SelectServer
{
static const int MaxFdNum = sizeof(fd_set) * 8; //因为位图fd_set有大小,所以最多只能管理1024个fd
static const int DefaultNum = -1;
public:
SelectServer(uint16_t port)
:_port(port)
{
//初始化辅助数组
for (int i = 0; i < MaxFdNum; ++i)
{
read_fd_array[i] = DefaultNum;
}
}
void Init()
{
_listensock.CreateSocket();
_listensock.Setsockopt();
_listensock.Bind(_port);
_listensock.Listen();
}
void Recver(int socket, int pos)
{
char buffer[4096];
int n = read(socket, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
lg(Info, "client say:%s", buffer);
}
else if (n == 0)
{
//说明对端把连接关闭了
lg(Info, "client closed...");
//把该socket就可以移除关心状态了
read_fd_array[pos] = DefaultNum;
close(socket);
}
else
{
//读出错了
lg(Error, "read error");
//把该socket就可以移除关心状态了
read_fd_array[pos] = DefaultNum;
close(socket);
}
}
void Acceptor()
{
string clientIp; uint16_t clientPort;
int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
if (newSockfd < 0)
{
return;
}
lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);
//添加新的文件描述符到辅助数组
for (int i = 0; i < MaxFdNum; ++i)
{
if (read_fd_array[i] == DefaultNum)
{
if (_maxfd < newSockfd)
_maxfd = newSockfd;
read_fd_array[i] = newSockfd;
break;
}
if (i == MaxFdNum - 1)
{
lg(Warning, "server is full, close sock: %d", newSockfd);
close(newSockfd);
}
}
}
void Dispatcher(fd_set* rfd) //事件派发器
{
for (int i = 0; i < MaxFdNum; ++i)//遍历所有的read_fd_array中是否有满足条件FD_ISSET
{
int socket = read_fd_array[i];
if (socket == DefaultNum)
continue;
if (FD_ISSET(socket, rfd) == true && _listensock.Fd() == socket)
{
//代码走到这里就说明我们的连接事件就绪了
Acceptor();
}
else if (FD_ISSET(socket, rfd) == true)
{
//说明是其他的套接字读事件就绪了
Recver(socket, i);
}
}
}
void EventLoop() //事件循环
{
//将lisntensockfd设置进辅助数组
read_fd_array[0] = _listensock.Fd();
_maxfd = read_fd_array[0];
while (1)
{
//因为select的参数为输入输出参数,所以我们每次调用select的时候都需要重新设置一遍select的参数,所以需要将下面的语句写入循环里
fd_set readfd;
FD_ZERO(&readfd); //一定要设置为0,否则可能会出现select失败的问题
for (int i = 0; i < MaxFdNum; ++i)
{
if (read_fd_array[i] != DefaultNum)
{
if (_maxfd < read_fd_array[i])
_maxfd = read_fd_array[i];
//将辅助数组中所要关心的文件描述符全部都设置进去
FD_SET(read_fd_array[i], &readfd);
}
}
int n = select(_maxfd + 1, &readfd, nullptr, nullptr, 0);//为了易于观察,我们将timout参数设置为0
if (n < 0)
{
std::cout << "select error: " << strerror(errno) << std::endl;
break;
}
if (n == 0)
{
//超时返回(非阻塞返回)
lg(Info, "timeout...");
}
else
{
Dispatcher(&readfd); //事件派发器
}
}
}
~SelectServer()
{
_listensock.Close();
}
private:
int _maxfd; //最大的文件描述符是什么
Socket _listensock;
uint16_t _port;
int read_fd_array[MaxFdNum]; //记录的是需要关心的读事件的文件描述符,-1表示不关心,非-1表示需要关心的文件描述符是什么
// int write_fd_array[MaxFdNum]; //仅为了理解多路转接IO,不考虑这两种情况,情况简单化,只考虑读事件
// int except_fd_array[MaxFdNum];
};
main.cc文件
#include "SelectServer.hpp"
int main()
{
SelectServer svr(8080);
svr.Init();
svr.EventLoop();
return 0;
}
select服务器的优缺点
优点:能实现成多路转接的服务器,即单进程也能处理多用户的请求
缺点:
1.同时等待的fd是有上限的,因为位图fd_set是有大小的。
2.用户必须借助第三方数组来维护合法的fd
3.使用select接口设置参数时,每次都要对关心的fd进行事件重置。
4.数据拷贝的频率比较高,select函数需要每次都需要重新设置参数,每传一次位图fd_set内核就需要拷贝一次。
5.如果只有一个fd就绪,用户层也需要全部遍历(虽然可以改进但没必要)
6.内核中检测位图fd_set的事件就绪也要遍历(这也就是为什么select的第一个参数传的是文件描述符的最大值,因为内核要以这个范围来遍历)
多路转接之poll
为了解决上面一些的问题,poll就出现了
fds:struct pollfd数组的首地址
nfds:数组元素的个数
timeout:设置超时时间,单位是ms
填0:非阻塞等待
小于0:表示阻塞等待
大于0:每过timeout就超时一次。(理解同select)
fd:你要让内核关心的某一个fd
events:输入参数,用户让内核关心fd的事件
revents:输出参数,内核告诉用户,你要关心的fd哪些事件就绪了
从这里看出来了,poll将输入和输出参数分离,所以poll不需要像select一样每次都需要将参数重新设定。
short是一个16bit的位图,events和revents的取值如下:
事件 | 描 述 | 是否可作为输入 | 是否可作为输出 |
---|---|---|---|
POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
POLLRDNORM | 普通数据可读 | 是 | 是 |
POLLRDBAND | 优先级带数据可读(Linux不支持) | 是 | 是 |
POLLPRI | 高优先级数据可读,比如TCP带外数据 | 是 | 是 |
POLLOUT | 数据(包括普通数据和优先数据)可写 | 是 | 是 |
POLLWRNORM | 普通数据可写 | 是 | 是 |
POLLWRBAND | 优先级带数据可写 | 是 | 是 |
POLLRDHUP | POLLRDHUPTCP连接被对方关闭,或者对方关闭了写操作。它GNU 引入 | 是 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLHUP | 挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 | 否 | 是 |
POLLNVAL | 文件描述符没有打开 | 否 | 是 |
select只分为了读、写、异常事件,而poll将fd的事件分的更细了。
实现一个简易的poll服务器
参考代码:
Socket.hpp、Log.hpp文件同上
PollerServer.hpp文件
#include "Socket.hpp"
#include <poll.h>
#include <vector>
class PollServer
{
static const int DefaultNum = -1;
public:
PollServer(uint16_t port)
:_port(port)
{}
void Init()
{
_listensock.CreateSocket();
_listensock.Setsockopt();
_listensock.Bind(_port);
_listensock.Listen();
}
void Recver(int socket, int pos)
{
char buffer[4096];
int n = read(socket, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
lg(Info, "client say:%s", buffer);
}
else if (n == 0)
{
//说明对端把连接关闭了
lg(Info, "client closed...");
//把该socket就可以移除关心状态了
_event_fds.erase(_event_fds.begin() + pos);
close(socket);
}
else
{
//读出错了
lg(Error, "read error");
//把该socket就可以移除关心状态了
_event_fds.erase(_event_fds.begin() + pos);
close(socket);
}
}
void Acceptor()
{
string clientIp; uint16_t clientPort;
int newSockfd = _listensock.Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
if (newSockfd < 0)
{
return;
}
lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);
//添加新的文件描述符到辅助数组
struct pollfd event;
event.fd = newSockfd;
event.events |= POLLIN;
_event_fds.push_back(move(event));
}
void Dispatcher() //事件派发器
{
for (int i = 0; i < _event_fds.size(); ++i)//遍历所有的_event_fds中是否有revents事件就绪的
{
int socket = _event_fds[i].fd;
if ((_event_fds[i].revents & POLLIN) == true)
{
if (_listensock.Fd() == socket)
{
//代码走到这里就说明我们的连接事件就绪了
Acceptor();
}
else
{
//说明是其他的套接字读事件就绪了
Recver(socket, i);
}
}
else if ((_event_fds[i].revents & POLLOUT) == true)
{
//写事件就绪
}
else
{
//...
}
}
}
void EventLoop() //事件循环
{
//将lisntensockfd设置进辅助数组
struct pollfd event;
event.fd = _listensock.Fd();
event.events |= POLLIN;
_event_fds.push_back(move(event)); //将listensock添加到_event_fds数组里
while (1)
{
int n = poll(_event_fds.data(), _event_fds.size(), -1); //易于观察就设置为-1
if (n < 0)
{
std::cout << "poll error: " << strerror(errno) << std::endl;
break;
}
if (n == 0)
{
//超时返回(非阻塞返回)
lg(Info, "timeout...");
}
else
{
Dispatcher(); //事件派发器
}
}
}
~PollServer()
{
_listensock.Close();
}
private:
Socket _listensock;
uint16_t _port;
std::vector<struct pollfd> _event_fds;
};
main.cc文件
#include "PollServer.hpp"
int main()
{
PollServer svr(8080);
svr.Init();
svr.EventLoop();
return 0;
}
运行结果
poll服务器的优缺点
优点:
pollfd结构包含了要监视的event事件和就绪的revent事件,输入与输出分离了,接口使用比select方便。
poll解除了fd有上限的问题,数组为vector了,vector最大能有多大取决于操作系统了,不关poll函数的事了。
缺点:
用户层和内核层都需要遍历该数组,都是o(n)的效率,如果用户要关心的fd有上万个,频繁的遍历会影响效率问题。
每次调用poll,都需要把大量的pollfd结果从用户态拷贝到内核中。
有大量fd,若只有少量的fd处于就绪状态,也需要全部线性遍历一遍,随着监视描述符数量的增长,其效率也会线性下降。
总结:解决了select的1、3问题
多路转接之epoll
按照man手册的说法:是为处理大批量的socketfd而作了改进的poll。它是在2.5.44内核中被引进的,linux2.6下,它几乎具备了之前所说的一切优点,被公认为性能最好的多路转接IO。现在只要涉及服务器组件的,都用的是epoll。
size:该参数已经被废弃,但是要填大于0的值
该函数会创建一个epoll模型,返回epoll模型的文件描述符,什么是epoll模型,后面会讲。
epfd:epoll模型的描述符
events:输出型参数,struct epoll_event数组的首地址
maxevents:该数组的个数
timeout:意义同epoll参数的timeout
返回值和select、poll一样
fd:当事件就绪时,能知道该事件是哪个fd就绪了events可以是以下几个宏的集合
事件 | 描 述 |
---|---|
EPOLLIN | 表示对应的文件描述符可以读(包括对端SOCKET正常关闭) |
EPOLLOUT | 表示对应的文件描述符可以写 |
POLLPRI | 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来) |
POLLERR | 表示示对应的文件描述符发生错误 |
POLLPRI | 表示对应的文件描述符被挂断 |
POLLPRI | 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的 |
EPOLLONESHOT | 只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里 |
epfd:epoll模型
op、fd、event:用户告诉内核,要对fd的event事件进行op处理
op:
EPOLL_CTL_ADD添加事件
EPOLL_CTL_MOD修改事件
EPOLL_CTL_DEL删除事件
epoll解决了select的所有缺点,epoll是怎样设计的呢?它的原理是什么?
epoll原理
如图所示,内核中的三个部分(红黑树,回调函数,就绪队列)组成了epoll模型。epoll模型并且由struct_file结构体所指向,想管理epoll模型,就能通过fd找到epoll模型。
红黑树管理已注册的文件描述符:epoll_ctl对红黑树的增删改操作时间复杂度为O(logN)
就绪事件的通知机制:每当有fd就绪了,网卡驱动就会通知(调用回调函数)epoll,并执行内核曾经注册好的回调函数。有了这个就绪事件通知机制,内核就不用再O(N)的遍历所有的文件描述符是否已经就绪
就绪列表的管理:epoll_wait所捞取的都是已经就绪的
epoll的优势
1.用户层不再需要每次调用接口时都将关心fd拷贝给内核了
2.管理fd个数没有上限
3.不再需要辅助数组了,因为内核中的红黑树就代替了曾经用户要管理的数组
4.内核中通过事件通知机制O(1)就能将就绪fd放入就绪队列,不再需要遍历所有的fd,来判断是否就绪
5.对事件的管理更方便,只需要对epoll_ctl进行操作
select和poll的底层如图所示
用epoll实现一个简易的echo服务器
参考代码:
Log.hpp文件 和 Socket.hpp文件和之前一样
Epoller.hpp文件
#include <sys/epoll.h>
#include "Log.hpp"
#include <cstring>
class Epoller
{
public:
Epoller()
{}
void EpollerCreate()
{
_epollfd = epoll_create(64);
if (_epollfd < 0)
{
lg(Fatal, "Create epoll failed: %s", strerror(errno));
abort();
}
}
void EpollerUpate(int oper, int fd, uint32_t event)
{
int n = 0;
if (oper == EPOLL_CTL_ADD || oper == EPOLL_CTL_MOD)
{
struct epoll_event epEvent;
epEvent.data.fd = fd;
epEvent.events = 0;
epEvent.events |= event;
n = epoll_ctl(_epollfd, oper, fd, &epEvent);
if (n < 0)
{
lg(Error, "epoll ctl failed: %s", strerror(errno));
}
}
else if (oper == EPOLL_CTL_DEL)
{
n = epoll_ctl(_epollfd, oper, fd, 0);
if (n < 0)
{
lg(Error, "epoll ctl failed: %s", strerror(errno));
}
}
}
int Wait(struct epoll_event* events, int n, int timeout)
{
int m = epoll_wait(_epollfd, events, n, timeout);
if (m < 0)
{
lg(Error, "epoll wait failed: %s", strerror(errno));
}
else if (m == 0)
{
//超时返回(非阻塞返回)
lg(Info, "timeout...");
}
return m;
}
int Fd()
{
return _epollfd;
}
private:
int _epollfd;
};
nocpy.hpp文件
//编程技巧:写防拷贝的类的时候直接继承,就不用自己再手动写了
class nocpy
{
public:
nocpy(){}
~nocpy(){}
private:
nocpy(const nocpy&) = delete;
nocpy& operator=(const nocpy&) = delete;
};
EpollServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
class EpollServer : public nocpy
{
static const int num = 64; //一次性最多捞取上来多少个就绪fd, 一次捞取不完下一次可以继续捞取
public:
EpollServer(uint16_t port)
:_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller)
{}
void Init()
{
_listensock_ptr->CreateSocket();
_listensock_ptr->Setsockopt();
_listensock_ptr->Bind(_port);
_listensock_ptr->Listen();
lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());
_epoller_ptr->EpollerCreate();
lg(Info, "create epoller success");
}
void Recver(int socket, int pos)
{
char buffer[4096];
int n = read(socket, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
lg(Info, "client say:%s", buffer);
string s = "server echo:";
s += buffer;
int m = write(socket, s.c_str(), s.size());
if (m < 0)
{
lg(Error, "write failed: %s", strerror(errno));
}
}
else if (n == 0)
{
//说明对端把连接关闭了
lg(Info, "client closed...");
//把该socket就可以移除关心状态了
_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, socket, 0);
close(socket);
}
else
{
//读出错了
lg(Error, "read error");
//把该socket就可以移除关心状态了
_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, socket, 0);
close(socket);
}
}
void Acceptor()
{
string clientIp; uint16_t clientPort;
int newSockfd = _listensock_ptr->Accept(&clientIp, &clientPort); //会不会阻塞在这里?不会,因为走到这里说明资源已经就绪了
if (newSockfd < 0)
{
return;
}
lg(Info, "accept success, sock: %d, clientIp: %s, clientPort: %d\n", newSockfd, clientIp.c_str(), clientPort);
//添加新的文件描述符到Epoller里面
_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, newSockfd, EPOLLIN);
}
void Dispatcher(struct epoll_event* recvs, int n) //事件派发器
{
for (int i = 0; i < n; ++i)//recvs数组里面全部都是就绪的fd
{
int socket = recvs[i].data.fd;
if ((recvs[i].events & EPOLLIN) == true)
{
if (_listensock_ptr->Fd() == socket)
{
//代码走到这里就说明我们的连接事件就绪了
Acceptor();
}
else
{
//说明是其他的套接字读事件就绪了
Recver(socket, i);
}
}
else if ((recvs[i].events & EPOLLOUT) == true)
{
//写事件就绪
}
else
{
//...
}
}
}
void EventLoop() //事件循环
{
//添加_listensock到Epoller里面
_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, _listensock_ptr->Fd(), EPOLLIN);
struct epoll_event recvs[num];
while (1)
{
int n = _epoller_ptr->Wait(recvs, num, -1); //易于观察就设置为-1
Dispatcher(recvs, n); //捞取上来了n个fd
}
}
~EpollServer()
{
_listensock_ptr->Close();
}
private:
std::shared_ptr<Socket> _listensock_ptr; //智能指针优势:1.可共享拷贝资源 - 不怕浅拷贝 2.灵活的生命周期,可随时释放 3.可延迟初始化 4.异常安全
std::shared_ptr<Epoller> _epoller_ptr;
uint16_t _port;
};
main.cc文件
#include "EpollServer.hpp"
int main()
{
std::unique_ptr<EpollServer> svr(new EpollServer(8080 ));
svr->Init();
svr->EventLoop();
return 0;
}
代码运行结果:
epoll的LT和ET工作模式
什么是LT和ET
一个现象:
我们将这行代码注释掉,上层不处理新连接到来的这个事件
代码运行结果:
发现底层会一直通知上层,有新事件到来,请上层处理。
一旦有新的连接到来或者有新的数据到来,上层如果你不取走,底层会一直通知你去取走哦,这种模式就叫做LT
LT:水平触发Level Triggered ,ET:边缘触发Edge Triggered
LT、ET,和示波器很像
LT一直处于高电平,表示为真。ET只有从低点到高点变化的时候,才为真
epoll默认模式:LT模式。事件到来,但是上层不处理,高电平,一直有效
ET:底层数据或者连接,从无到有,从有到多,变化的时候,才会通知我们一次。
给大家举一个例子
张三是个快递员,是一个比较负责的快递员。假如你买的快递到了,有6个快递,下楼取一下,你可能在打游戏没时间取,过一会儿,张三又给你打电话:你有6个快递,你下来取一下。你游戏没打完就是不取,但是呢,张三期间也一直打电话。张三的同事李四,李四也有你的快递,路过,张三就给李四说:我帮你派发吧。张三现在一共有十个快递,让你下来取,你不下来取张三就一直会给你打电话,张三送快递的模式就叫做LT模式。
过了一周,又要派发你的快递。但是是李四来派发你的快递,李四给你打电话说:你不来取,我就不给你打电话了,我只通知你一次。你一听:那我还是下去取吧,要是走了我快递里就找不到了,我的快递在一堆快递里,除了快递员不知道哪个是我的快递。你取的时候,发现你只能拿走6个当中的4个,一次拿不完,你把快递拿上去之后剩下的快递你不知道在哪了,索性就不要了,你又去打你的游戏了。李四看到张三也有你的快递,还人情就帮张三派发,又给你打电话:你有新的快递到了,你不下来取,我就走了。你一听,这人只通知一次,你就又下去取了,这次就把上次没取完的一并给取走了。
总结:
张三是有快递就一直给你打电话。
李四是从无到有,从有到多,只给你打一次电话,你没取干净,我也不再通知你。
你认为两个快递员谁的工作效率更高呢?
ET,因为这个快递员在单位时间内通知的人数是更多的。ET一小时内可以通知50人,而LT可能只通知了10人。主要是ET的通知效率高。
ET:因为只会通知一次,所以会倒逼程序员使用ET工作模式的时候,每次通知,都必须要把本轮数据全部取走。你怎么知道你把本次就绪底层的数据读取完毕了呢?循环读取,知道读取不到数据了。一般的fd,是阻塞式的fd,如果没有数据了会阻塞,所以在ET模式下,我们的fd必须是非阻塞的。
ET的通知效率高,不仅仅如此,ET的IO效率也更高,原因在于,每通知一次就要求程序员把本轮数据全部取走,这意味着tcp会向对方通告一个更大的窗口,从而概率上让对方一次能给我发送更多的数据,提高了网络的吞吐量,则提高IO效率。
ET的效率也不是一定比LT高,LT也可以将所有的fd设置成为非阻塞,然后循环读取,通知第一次的时候就全部取走,不就和ET一样了嘛。LT是epoll的默认行为,使用ET能够减少epoll触发的次数,但是代价就是强逼着程序员一次就绪响应就把所有的数据都处理完。所以说,如果LT情况下如果也能做到每次就绪的文件描述符都立即处理,不让这个就绪被重复提示的话,其实性能也是一样的。那为什么LT不代替ET呢?因为程序员不能保证完全的替代,会可能写出bug。
实现一个简易的reactor服务器
我们之前read函数只读一次,因为tcp是面向字节流的,所以不能保证读上来的是一个完整的报文。
如果读一次没有读完,那需要读第二次,也就是循环读,所以读到的数据需要存到一个缓冲区里。
下面我们就用epoll来实现一个ET模式的reactor服务器
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
const int num = 64; //一次性最多捞取上来多少个fd
class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
Connection(int fd, TcpServer* tcp_server_ptr)
:_sockfd(fd), _tcp_server_ptr(tcp_server_ptr)
{}
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
private:
int _sockfd;
std::string _inbuffer;
std::string _outbuffer;
func_t _recv_cb; //读回调函数
func_t _send_cb; //写回调函数
func_t _except_cb; //异常回调函数
std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};
class TcpServer : public nocpy
{
public:
TcpServer(uint16_t port)
:_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller)
{}
void Init()
{
_listensock_ptr->CreateSocket();
_listensock_ptr->Setsockopt();
_listensock_ptr->Bind(_port);
_listensock_ptr->SetNonBlock(); //设置非阻塞
_listensock_ptr->Listen();
_epoller_ptr->EpollerCreate();
lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());
}
void Dispatcher(int n)
{
for (int i = 0; i < n; ++i)
{
}
}
void Start()
{
_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, _listensock_ptr->Fd(), EVENT_IN);
while (1)
{
int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
if (n < 0)
{
lg(Error, "epoll wait failed: %s", strerror(errno));
break;
}
else if (n == 0)
{
lg(Info, "timeout...");
}
else
{
Dispatcher(n);
}
}
}
~TcpServer()
{}
private:
std::shared_ptr<Socket> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
struct epoll_event recvs[num]; //捞取就绪事件的数组
std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接
int _port;
};
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>
const int num = 64; //一次性最多捞取上来多少个fd
class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
Connection(int fd, TcpServer* tcp_server_ptr)
:_sockfd(fd), _tcp_server_ptr(tcp_server_ptr)
{}
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
public:
int _sockfd;
std::string _inbuffer;
std::string _outbuffer;
func_t _recv_cb; //读回调函数
func_t _send_cb; //写回调函数
func_t _except_cb; //异常回调函数
std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};
class TcpServer : public nocpy
{
public:
TcpServer(uint16_t port)
:_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller)
{}
void Init()
{
_listensock_ptr->CreateSocket();
_listensock_ptr->Setsockopt();
_listensock_ptr->Bind(_port);
_listensock_ptr->SetNonBlock(); //设置非阻塞
_listensock_ptr->Listen();
_epoller_ptr->EpollerCreate();
lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());
AddConnection(_listensock_ptr->Fd(), EVENT_IN, nullptr, nullptr, nullptr);
}
void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
{
//建立连接的同时,挂到epoller中
//1.将listensock添加到Connection中,同时,将listen和Connection放入Connections中
std::shared_ptr<Connection> conn(new Connection(_listensock_ptr->Fd(), this));
conn->SetHandler(recv_cb, send_cb, except_cb);
_connections.insert(make_pair(_listensock_ptr->Fd(), conn));
//2.添加对应的事件,放入到epoller中
_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
}
void Dispatcher(int n)
{
//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
for (int i = 0; i < n; ++i)
{
uint32_t event = recvs[i].events;
int fd = recvs[i].data.fd;
auto pos = _connections.find(fd);
assert(pos != _connections.end());
//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
if (event & EPOLLERR)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLHUP)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLIN)
{
if (pos->second->_recv_cb)
pos->second->_recv_cb(pos->second);
}
if (event & EPOLLOUT)
{
if (pos->second->_send_cb)
pos->second->_send_cb(pos->second);
}
}
}
void Start()
{
while (1)
{
int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
if (n < 0)
{
lg(Error, "epoll wait failed: %s", strerror(errno));
break;
}
else if (n == 0)
{
lg(Info, "timeout...");
}
else
{
Dispatcher(n);
}
}
}
~TcpServer()
{}
private:
std::shared_ptr<Socket> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
struct epoll_event recvs[num]; //捞取就绪事件的数组
std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接
int _port;
};
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>
const int num = 64; //一次性最多捞取上来多少个fd
class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
Connection(int fd, TcpServer* tcp_server_ptr)
:_sockfd(fd), _tcp_server_ptr(tcp_server_ptr)
{}
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
public:
int _sockfd;
std::string _inbuffer;
std::string _outbuffer;
func_t _recv_cb; //读回调函数
func_t _send_cb; //写回调函数
func_t _except_cb; //异常回调函数
std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
};
class TcpServer : public nocpy
{
public:
TcpServer(uint16_t port)
:_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller)
{}
void Init()
{
_listensock_ptr->CreateSocket();
_listensock_ptr->Setsockopt();
_listensock_ptr->Bind(_port);
_listensock_ptr->SetNonBlock(); //设置非阻塞
_listensock_ptr->Listen();
_epoller_ptr->EpollerCreate();
lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());
AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);
}
void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
{
//建立连接的同时,挂到epoller中
//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中
std::shared_ptr<Connection> conn(new Connection(fd, this));
conn->SetHandler(recv_cb, send_cb, except_cb);
_connections.insert(make_pair(fd, conn));
//2.添加对应的事件,放入到epoller中
_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
}
void Acceptor(std::shared_ptr<Connection> conn)
{
while (1)
{
std::string clientIp;
uint16_t clientPort;
Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);
if (newSocket.Fd() < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//如果底层没有数据了
break;
}
else if (errno == EINTR)
{
//被信号打断了
continue;
}
lg(Error, "listensock accept failed: %s", strerror(errno));
break;
}
lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);
newSocket.SetNonBlock();
AddConnection(newSocket.Fd(), EVENT_IN, nullptr, nullptr, nullptr);
}
}
void Dispatcher(int n)
{
//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
for (int i = 0; i < n; ++i)
{
uint32_t event = recvs[i].events;
int fd = recvs[i].data.fd;
auto pos = _connections.find(fd);
assert(pos != _connections.end());
//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
if (event & EPOLLERR)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLHUP)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLIN)
{
if (pos->second->_recv_cb)
pos->second->_recv_cb(pos->second);
}
if (event & EPOLLOUT)
{
if (pos->second->_send_cb)
pos->second->_send_cb(pos->second);
}
}
}
void Start()
{
while (1)
{
int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
if (n < 0)
{
lg(Error, "epoll wait failed: %s", strerror(errno));
break;
}
else if (n == 0)
{
lg(Info, "timeout...");
}
else
{
Dispatcher(n);
}
}
}
~TcpServer()
{}
private:
std::shared_ptr<Socket> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
struct epoll_event recvs[num]; //捞取就绪事件的数组
std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接
int _port;
};
用三个客户端连接,代码运行结果
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>
const int num = 64; //一次性最多捞取上来多少个fd
class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
Connection(int fd, TcpServer* tcp_server_ptr)
:_sockfd(fd), _tcp_server_ptr(tcp_server_ptr)
{}
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
public:
int _sockfd;
std::string _inbuffer;
std::string _outbuffer;
func_t _recv_cb; //读回调函数
func_t _send_cb; //写回调函数
func_t _except_cb; //异常回调函数
std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
std::string _ip;
uint16_t _port;
};
class TcpServer : public nocpy
{
public:
TcpServer(uint16_t port, func_t OnMessage)
:_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage)
{}
void Init()
{
_listensock_ptr->CreateSocket();
_listensock_ptr->Setsockopt();
_listensock_ptr->Bind(_port);
_listensock_ptr->SetNonBlock(); //设置非阻塞
_listensock_ptr->Listen();
_epoller_ptr->EpollerCreate();
lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());
AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);
}
void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
{
//建立连接的同时,挂到epoller中
//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中
std::shared_ptr<Connection> conn(new Connection(fd, this));
conn->SetHandler(recv_cb, send_cb, except_cb);
_connections.insert(make_pair(fd, conn));
//2.添加对应的事件,放入到epoller中
_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
}
void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可
{
lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);
auto it = _connections.find(conn->_sockfd);
assert (it != _connections.end());
_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放
_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);
close(conn->_sockfd);
}
void Recver(std::shared_ptr<Connection> conn)
{
int fd = conn->_sockfd;
char buffer[4096];
while (1)
{
int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取
if (n < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//如果底层没有数据了
break;
}
else if (errno == EINTR)
{
//被信号打断了
continue;
}
lg(Error, "read failed: %s", strerror(errno));
conn->_except_cb(conn);
return;
}
else if (n == 0)
{
lg(Info, "server closed...");
conn->_except_cb(conn);
return;
}
buffer[n] = 0;
conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理
}
_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理
}
void Acceptor(std::shared_ptr<Connection> conn)
{
while (1)
{
std::string clientIp;
uint16_t clientPort;
Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);
if (newSocket.Fd() < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//如果底层没有数据了
break;
}
else if (errno == EINTR)
{
//被信号打断了
continue;
}
lg(Error, "listensock accept failed: %s", strerror(errno));
break;
}
lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);
newSocket.SetNonBlock();
AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
nullptr, std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
}
}
void Dispatcher(int n)
{
//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
for (int i = 0; i < n; ++i)
{
uint32_t event = recvs[i].events;
int fd = recvs[i].data.fd;
auto pos = _connections.find(fd);
assert(pos != _connections.end());
//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
if (event & EPOLLERR)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLHUP)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLIN)
{
if (pos->second->_recv_cb)
pos->second->_recv_cb(pos->second);
}
if (event & EPOLLOUT)
{
if (pos->second->_send_cb)
pos->second->_send_cb(pos->second);
}
}
}
void Start()
{
while (1)
{
int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
if (n < 0)
{
lg(Error, "epoll wait failed: %s", strerror(errno));
break;
}
else if (n == 0)
{
lg(Info, "timeout...");
}
else
{
Dispatcher(n);
}
}
}
~TcpServer()
{}
private:
std::shared_ptr<Socket> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
struct epoll_event recvs[num]; //捞取就绪事件的数组
std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接
int _port;
func_t _OnMessage; //让上层处理信息
};
main.cc文件
#include "TcpServer.hpp"
void DefaultOnMessage(std::shared_ptr<Connection> conn)
{
std::cout << "sock: " << conn->_sockfd <<"的缓冲区里的数据为:" << conn->_inbuffer << std::endl;
}
int main()
{
std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));
svr->Init();
svr->Start();
return 0;
}
代码运行结果如下
但是发现了一个问题:类似于循环引用,异常事件关闭连接时,导致程序出错。
更新代码
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>
const int num = 64; //一次性最多捞取上来多少个fd
class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
Connection(int fd)
:_sockfd(fd)
{}
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
public:
int _sockfd;
std::string _inbuffer;
std::string _outbuffer;
func_t _recv_cb; //读回调函数
func_t _send_cb; //写回调函数
func_t _except_cb; //异常回调函数
std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
std::string _ip;
uint16_t _port;
};
class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer> //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:
TcpServer(uint16_t port, func_t OnMessage)
:_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage)
{}
void Init()
{
_listensock_ptr->CreateSocket();
_listensock_ptr->Setsockopt();
_listensock_ptr->Bind(_port);
_listensock_ptr->SetNonBlock(); //设置非阻塞
_listensock_ptr->Listen();
_epoller_ptr->EpollerCreate();
lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());
AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);
}
void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
{
//建立连接的同时,挂到epoller中
//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中
std::shared_ptr<Connection> conn(new Connection(fd));
conn->SetHandler(recv_cb, send_cb, except_cb);
_connections.insert(make_pair(fd, conn));
if (fd == _listensock_ptr->Fd())
conn->_tcp_server_ptr = shared_ptr<TcpServer>(this); //如果是listensock连接,就构造智能指针来管理TcpServer
else
conn->_tcp_server_ptr = shared_from_this(); //如果是其他连接,就共享该资源来管理
//2.添加对应的事件,放入到epoller中
_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
}
void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可
{
lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);
auto it = _connections.find(conn->_sockfd);
assert (it != _connections.end());
_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放
_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);
close(conn->_sockfd);
}
void Recver(std::shared_ptr<Connection> conn)
{
int fd = conn->_sockfd;
char buffer[4096];
while (1)
{
int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取
if (n < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//如果底层没有数据了
break;
}
else if (errno == EINTR)
{
//被信号打断了
continue;
}
lg(Error, "read failed: %s", strerror(errno));
conn->_except_cb(conn);
return;
}
else if (n == 0)
{
lg(Info, "server closed...");
conn->_except_cb(conn);
return;
}
buffer[n] = 0;
conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理
}
_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理
}
void Acceptor(std::shared_ptr<Connection> conn)
{
while (1)
{
std::string clientIp;
uint16_t clientPort;
Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);
if (newSocket.Fd() < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//如果底层没有数据了
break;
}
else if (errno == EINTR)
{
//被信号打断了
continue;
}
lg(Error, "listensock accept failed: %s", strerror(errno));
break;
}
lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);
newSocket.SetNonBlock();
AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
nullptr, std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
}
}
void Dispatcher(int n)
{
//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
for (int i = 0; i < n; ++i)
{
uint32_t event = recvs[i].events;
int fd = recvs[i].data.fd;
auto pos = _connections.find(fd);
assert(pos != _connections.end());
//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
if (event & EPOLLERR)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLHUP)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLIN)
{
if (pos->second->_recv_cb)
pos->second->_recv_cb(pos->second);
}
if (event & EPOLLOUT)
{
if (pos->second->_send_cb)
pos->second->_send_cb(pos->second);
}
}
}
void Start()
{
while (1)
{
int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
if (n < 0)
{
lg(Error, "epoll wait failed: %s", strerror(errno));
break;
}
else if (n == 0)
{
lg(Info, "timeout...");
}
else
{
Dispatcher(n);
}
}
}
~TcpServer()
{}
private:
std::shared_ptr<Socket> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
struct epoll_event recvs[num]; //捞取就绪事件的数组
std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接
int _port;
func_t _OnMessage; //让上层处理信息
};
运行结果如下:
每一个socket都配套有一个读写缓冲区,为什么要有读缓冲区?因为你不能保证一次read上来的数据就是一个完整的数据。为什么要有写缓冲区?因为你不能保证内核缓冲区是有空间的,也就是你无法保证发送条件是否就绪。
异常和读事件我们都处理了,如何处理写事件呢?
通常情况下,发送缓冲区一般都是有空间的,写事件一般都是就绪的,如果我们设置对EPOLLOUT关心,那EPOLLOUT几乎每次都是就绪,会导致epoll_wait经常返回,浪费CPU资源。
结论:对于读事件,设置常关心,对于写事件,按需设置。什么是按需设置?直接写入,如果写入完成就结束。如果没有将这一轮的数据写完,outbuffer里还有数据,我们就需要对写事件进行关心了,如果写完了,就去掉对写事件的关心。
更新代码
TcpServer.hpp文件
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>
const int num = 64; //一次性最多捞取上来多少个fd
class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
Connection(int fd)
:_sockfd(fd)
{}
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
public:
int _sockfd;
std::string _inbuffer;
std::string _outbuffer;
func_t _recv_cb; //读回调函数
func_t _send_cb; //写回调函数
func_t _except_cb; //异常回调函数
std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
std::string _ip;
uint16_t _port;
};
class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer> //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:
TcpServer(uint16_t port, func_t OnMessage)
:_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage)
{}
void Init()
{
_listensock_ptr->CreateSocket();
_listensock_ptr->Setsockopt();
_listensock_ptr->Bind(_port);
_listensock_ptr->SetNonBlock(); //设置非阻塞
_listensock_ptr->Listen();
_epoller_ptr->EpollerCreate();
lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());
AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);
}
void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
{
//建立连接的同时,挂到epoller中
//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中
std::shared_ptr<Connection> conn(new Connection(fd));
conn->SetHandler(recv_cb, send_cb, except_cb);
_connections.insert(make_pair(fd, conn));
if (fd == _listensock_ptr->Fd()) //如果是listensock连接,就构造智能指针来管理TcpServer
conn->_tcp_server_ptr = shared_ptr<TcpServer>(this);
else
conn->_tcp_server_ptr = shared_from_this(); //如果是其他连接,就共享该资源来管理
//2.添加对应的事件,放入到epoller中
_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
}
void Sender(std::shared_ptr<Connection> conn)
{
while (1)
{
int n = write(conn->_sockfd, conn->_outbuffer.c_str(), conn->_outbuffer.size());
if (n > 0)
{
conn->_outbuffer.erase(0, n);
if (conn->_outbuffer.empty() == true)
{
return;
}
}
else if (n == 0)
{
return;
}
else
{
if (errno == EWOULDBLOCK || errno == EAGAIN)
{
//说明底层数据不就绪,即内核写缓冲区没有空间了
break;
}
if (errno == EINTR)
{
continue;
}
lg(Error, "write failed, socket: %d", conn->_sockfd);
conn->_except_cb(conn);
return;
}
}
if (!conn->_outbuffer.empty())
{
//说明没有将数据发完,则需要设置写事件关心了
uint32_t event = EVENT_OUT | EVENT_IN;
_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);
}
else
{
//说明将数据发完了,则取消对写事件关心了
uint32_t event = EVENT_IN;
_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);
}
}
void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可
{
lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);
auto it = _connections.find(conn->_sockfd);
assert (it != _connections.end());
_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放
_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);
close(conn->_sockfd);
}
void Recver(std::shared_ptr<Connection> conn)
{
int fd = conn->_sockfd;
char buffer[4096];
while (1)
{
int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取
if (n < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//如果底层没有数据了
break;
}
else if (errno == EINTR)
{
//被信号打断了
continue;
}
lg(Error, "read failed: %s", strerror(errno));
conn->_except_cb(conn);
return;
}
else if (n == 0)
{
lg(Info, "server closed...");
conn->_except_cb(conn);
return;
}
buffer[n] = 0;
conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理
}
_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理
}
void Acceptor(std::shared_ptr<Connection> conn)
{
while (1)
{
std::string clientIp;
uint16_t clientPort;
Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);
if (newSocket.Fd() < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//如果底层没有数据了
break;
}
else if (errno == EINTR)
{
//被信号打断了
continue;
}
lg(Error, "listensock accept failed: %s", strerror(errno));
break;
}
lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);
newSocket.SetNonBlock();
AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
std::bind(&TcpServer::Sender, this, std::placeholders::_1),\
std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
}
}
void Dispatcher(int n)
{
//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
for (int i = 0; i < n; ++i)
{
uint32_t event = recvs[i].events;
int fd = recvs[i].data.fd;
auto pos = _connections.find(fd);
assert(pos != _connections.end());
//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
if (event & EPOLLERR)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLHUP)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLIN)
{
if (pos->second->_recv_cb)
pos->second->_recv_cb(pos->second);
}
if (event & EPOLLOUT)
{
if (pos->second->_send_cb)
pos->second->_send_cb(pos->second);
}
}
}
void Start()
{
while (1)
{
int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
if (n < 0)
{
lg(Error, "epoll wait failed: %s", strerror(errno));
break;
}
else if (n == 0)
{
lg(Info, "timeout...");
}
else
{
Dispatcher(n);
}
}
}
~TcpServer()
{}
private:
std::shared_ptr<Socket> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
struct epoll_event recvs[num]; //捞取就绪事件的数组
std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接
int _port;
func_t _OnMessage; //让上层处理信息
};
main.cc文件
#include "TcpServer.hpp"
std::string Handle(std::string& inbuffer) //业务处理
{
std::string tmp = inbuffer;
inbuffer.clear();
return tmp;
}
void DefaultOnMessage(std::shared_ptr<Connection> conn)
{
string response_str = Handle(conn->_inbuffer);
if (response_str.empty()) return;
conn->_outbuffer += response_str;
if (conn->_send_cb)
conn->_send_cb(conn);
}
int main()
{
std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));
svr->Init();
svr->Start();
return 0;
}
代码运行结果如下:
套用之前写的网络版本计算器业务逻辑 - 文末附完整代码
代码运行结果
reactor模型是半同步半异步的模型
半同步体现的是等:由epoll来做的:保证了就绪事件的通知和进行IO
半异步的体现:负责了业务处理,如果把业务处理放到线程池中去处理,就是半异步了。因为我们的业务处理并不耗时,所以就没有开多线程
还有一种模式叫做Proactor,纯异步的编写方式,在linux服务器设计里面没有,我们就不涉及了。
reactor也叫做反应堆,是什么意思呢?如同打地鼠的游戏
玩游戏的人就相当于是一个多路转接,我们要检测每一个洞口有没有地鼠出来,虽然没出来,但是我们知道一旦出来了我就要执行我的回调方法(来砸它) – 回调函数是提前设置好的。
游戏的面板就相当于我们的reactor,每一个洞就相当于Connection,老鼠上来了就叫做事件就绪,执行砸方法就是执行回调函数。这种就叫做反应堆
redis底层用的就是单reactor,处理用的是reactor的LT模式
完整版代码如下:
文件目录
Epoller.hpp文件
#include <sys/epoll.h>
#include "Log.hpp"
#include <cstring>
class Epoller
{
public:
Epoller()
{}
void EpollerCreate()
{
_epollfd = epoll_create(64);
if (_epollfd < 0)
{
lg(Fatal, "Create epoll failed: %s", strerror(errno));
abort();
}
}
void EpollerUpate(int oper, int fd, uint32_t event)
{
int n = 0;
if (oper == EPOLL_CTL_ADD || oper == EPOLL_CTL_MOD)
{
struct epoll_event epEvent;
epEvent.data.fd = fd;
epEvent.events = 0;
epEvent.events |= event;
n = epoll_ctl(_epollfd, oper, fd, &epEvent);
if (n < 0)
{
lg(Error, "epoll ctl failed: %s", strerror(errno));
}
}
else if (oper == EPOLL_CTL_DEL)
{
n = epoll_ctl(_epollfd, oper, fd, 0);
if (n < 0)
{
lg(Error, "epoll ctl failed: %s", strerror(errno));
}
}
}
int Wait(struct epoll_event* events, int n, int timeout)
{
int m = epoll_wait(_epollfd, events, n, timeout);
if (m < 0)
{
lg(Error, "epoll wait failed: %s", strerror(errno));
}
else if (m == 0)
{
//超时返回(非阻塞返回)
lg(Info, "timeout...");
}
return m;
}
int Fd()
{
return _epollfd;
}
private:
int _epollfd;
};
Log.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
using namespace std;
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define Onefile 2
#define Classfile 3
#define fileName "log.txt"
//使用前需要创建log目录
class Log
{
public:
Log()
{
printMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
printMethod = method;
}
void printOneFile(string logname, const string& logtxt)
{
logname = path + logname;
int fd = open(logname.c_str(), O_WRONLY | O_APPEND | O_CREAT, 0666);//open只会创建文件不会创建目录
if (fd < 0)
{
perror("open failed");
return;
}
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const string& logtxt)
{
string filename = fileName;
filename += ".";
filename += leveltoString(level);
printOneFile(filename, logtxt);
}
void printLog(int level, const string& logtxt)
{
if (printMethod == Screen)
{
cout << logtxt << endl;
return;
}
else if (printMethod == Onefile)
{
printOneFile(fileName, logtxt);
return;
}
else if (printMethod == Classfile)
{
printClassFile(level, logtxt);
return;
}
}
const char* leveltoString(int level)
{
if (level == Info) return "Info";
else if (level == Debug) return "Debug";
else if (level == Error) return "Error";
else if (level == Fatal) return "Fatal";
else return "default";
}
void operator()(int level, const char* format, ...)
{
time_t t = time(nullptr);
struct tm* st = localtime(&t);
char leftbuffer[4096];
snprintf(leftbuffer, sizeof(leftbuffer), "year: %d, month: %d, day: %d, hour: %d, minute: %d, second: %d\n\
[%s]:",
st->tm_year + 1900, st->tm_mon + 1, st->tm_mday, st->tm_hour, st->tm_min, st->tm_sec, leveltoString(level));
char rightbuffer[4096];
va_list start;
va_start(start, format);
vsnprintf(rightbuffer, sizeof(rightbuffer), format, start);
va_end(start);
char logtxt[4096 * 2];
snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
printLog(level, logtxt);
}
private:
int printMethod;
string path;//路径与文件名解耦,最后将路径和文件粘合起来,再用open打开即可
};
nocpy.hpp文件
#pragma once
//编程技巧:写防拷贝的类的时候直接继承,就不用自己再手动写了
class nocpy
{
public:
nocpy(){}
~nocpy(){}
private:
nocpy(const nocpy&) = delete;
nocpy& operator=(const nocpy&) = delete;
};
Socket.hpp文件
#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <cstring>
#include "Log.hpp"
Log lg;
class Socket
{
public:
Socket(int fd = -1)
:_sockfd(fd)
{}
~Socket()
{}
void CreateSocket()
{
_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
lg(Fatal, "create socket failed:%s", strerror(errno));
exit(1);
}
}
void Bind(uint16_t serverPort)
{
struct sockaddr_in server;
server.sin_family = AF_INET;
server.sin_port = htons(serverPort);
server.sin_addr.s_addr = INADDR_ANY;
int n = bind(_sockfd, (struct sockaddr*)&server, sizeof(server));
if (n < 0)
{
lg(Fatal, "bind socket failed:%s", strerror(errno));
exit(2);
}
}
void Listen()
{
int n = listen(_sockfd, 5);
if (n < 0)
{
lg(Fatal, "listen socket failed:%s", strerror(errno));
exit(3);
}
}
int Accept(string* clientIp, uint16_t* clinetPort)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int newsocket = accept(_sockfd, (struct sockaddr*)&peer, &len);
if (newsocket < 0)
{
return -1;
}
*clientIp = inet_ntoa(peer.sin_addr);
*clinetPort = ntohs(peer.sin_port);
return newsocket;
}
int Connect(const string& serverIp, uint16_t serverPort)
{
struct sockaddr_in server;
server.sin_family = AF_INET;
server.sin_addr.s_addr = inet_addr(serverIp.c_str());
server.sin_port = htons(serverPort);
int n = connect(_sockfd, (struct sockaddr*)&server, sizeof(server));
if (n < 0)
{
return -1;
}
return 0;
}
void Setsockopt()
{
int opt = 1;
if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (const void*)&opt, sizeof(opt)) < 0)
lg(Error, "%s\n", strerror(errno));
}
void SetNonBlock()
{
int flag = fcntl(_sockfd, F_GETFL);
fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
void Close()
{
close(_sockfd);
}
int Fd()
{
return _sockfd;
}
private:
int _sockfd;
};
TcpServer.hpp文件如下
#include "Socket.hpp"
#include "Epoller.hpp"
#include "nocpy.hpp"
#include <memory>
#include <functional>
#include <unordered_map>
#include <cassert>
const int num = 64; //一次性最多捞取上来多少个fd
class Connection;
using func_t = function<void(std::shared_ptr<Connection>)>;
class TcpServer;
uint32_t EVENT_IN = (EPOLLIN | EPOLLET);
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET);
//每一个文件描述符都有一套自己的缓冲区和回调函数,每当该文件描述符就绪了,就调用自己的回调函数
class Connection
{
public:
Connection(int fd)
:_sockfd(fd)
{}
void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}
public:
int _sockfd;
std::string _inbuffer;
std::string _outbuffer;
func_t _recv_cb; //读回调函数
func_t _send_cb; //写回调函数
func_t _except_cb; //异常回调函数
std::shared_ptr<TcpServer> _tcp_server_ptr; //回指指针
std::string _ip;
uint16_t _port;
};
class TcpServer : public nocpy, public std::enable_shared_from_this<TcpServer> //继承此的作用仅是为了能获取到this对应的shared_ptr
{
public:
TcpServer(uint16_t port, func_t OnMessage)
:_port(port), _listensock_ptr(new Socket), _epoller_ptr(new Epoller), _OnMessage(OnMessage)
{}
void Init()
{
_listensock_ptr->CreateSocket();
_listensock_ptr->Setsockopt();
_listensock_ptr->Bind(_port);
_listensock_ptr->SetNonBlock(); //设置非阻塞
_listensock_ptr->Listen();
_epoller_ptr->EpollerCreate();
lg(Info, "create listen socket success, socket: %d", _listensock_ptr->Fd());
AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Acceptor, this, std::placeholders::_1), nullptr, nullptr);
}
void AddConnection(int fd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)
{
//建立连接的同时,挂到epoller中
//1.将fd添加到Connection中,同时,将fd和Connection放入Connections中
std::shared_ptr<Connection> conn(new Connection(fd));
conn->SetHandler(recv_cb, send_cb, except_cb);
_connections.insert(make_pair(fd, conn));
if (fd == _listensock_ptr->Fd()) //如果是listensock连接,就构造智能指针来管理TcpServer
conn->_tcp_server_ptr = shared_ptr<TcpServer>(this);
else
conn->_tcp_server_ptr = shared_from_this(); //如果是其他连接,就共享该资源来管理
//2.添加对应的事件,放入到epoller中
_epoller_ptr->EpollerUpate(EPOLL_CTL_ADD, fd, event);
}
void Sender(std::shared_ptr<Connection> conn)
{
while (1)
{
int n = write(conn->_sockfd, conn->_outbuffer.c_str(), conn->_outbuffer.size());
if (n > 0)
{
conn->_outbuffer.erase(0, n);
if (conn->_outbuffer.empty() == true)
{
return;
}
}
else if (n == 0)
{
return;
}
else
{
if (errno == EWOULDBLOCK || errno == EAGAIN)
{
//说明底层数据不就绪,即内核写缓冲区没有空间了
break;
}
if (errno == EINTR)
{
continue;
}
lg(Error, "write failed, socket: %d", conn->_sockfd);
conn->_except_cb(conn);
return;
}
}
if (!conn->_outbuffer.empty())
{
//说明没有将数据发完,则需要设置写事件关心了
uint32_t event = EVENT_OUT | EVENT_IN;
_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);
}
else
{
//说明将数据发完了,则取消对写事件关心了
uint32_t event = EVENT_IN;
_epoller_ptr->EpollerUpate(EPOLL_CTL_MOD, conn->_sockfd, event);
}
}
void Excepter(std::shared_ptr<Connection> conn) //异常事件触发了,转为读事件,就会调用读函数,读的时候肯定会异常,直接调用异常函数即可
{
lg(Warning, "Excepter hander sockefd: %d, client info: %s:%d", conn->_sockfd, conn->_ip.c_str(), conn->_port);
auto it = _connections.find(conn->_sockfd);
assert (it != _connections.end());
_connections.erase(it); //因为此时connection被_connections管理着的,一旦删了it,则connection也会被释放
_epoller_ptr->EpollerUpate(EPOLL_CTL_DEL, conn->_sockfd, 0);
close(conn->_sockfd);
}
void Recver(std::shared_ptr<Connection> conn)
{
int fd = conn->_sockfd;
char buffer[4096];
while (1)
{
int n = recv(fd, buffer, sizeof(buffer) - 1, 0);//虽然这里的最后一个参数时阻塞读取,但是之前设置了非阻塞,所以还是非阻塞读取
if (n < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//如果底层没有数据了
break;
}
else if (errno == EINTR)
{
//被信号打断了
continue;
}
lg(Error, "read failed: %s", strerror(errno));
conn->_except_cb(conn);
return;
}
else if (n == 0)
{
lg(Info, "server closed...");
conn->_except_cb(conn);
return;
}
buffer[n] = 0;
conn->_inbuffer += buffer; //因为read读上来的数据可能不完整,所以读后要放到_inbuffer来统一处理
}
_OnMessage(conn); //统一处理:1.数据有了,但是不一定全,所以要检测数据是否全。2.如果检测到完整数据,就处理
}
void Acceptor(std::shared_ptr<Connection> conn)
{
while (1)
{
std::string clientIp;
uint16_t clientPort;
Socket newSocket = _listensock_ptr->Accept(&clientIp, &clientPort);
if (newSocket.Fd() < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
//如果底层没有数据了
break;
}
else if (errno == EINTR)
{
//被信号打断了
continue;
}
lg(Error, "listensock accept failed: %s", strerror(errno));
break;
}
lg(Info, "get a new socket: %d, clientIp: %s, clientPort: %d", newSocket.Fd(), clientIp.c_str(), clientPort);
newSocket.SetNonBlock();
AddConnection(newSocket.Fd(), EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1),\
std::bind(&TcpServer::Sender, this, std::placeholders::_1),\
std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
}
}
void Dispatcher(int n)
{
//对比之前,我现在还没写Acceptor和Recv函数就已经把Dispatcher函数完成了,这就是代码的解耦
for (int i = 0; i < n; ++i)
{
uint32_t event = recvs[i].events;
int fd = recvs[i].data.fd;
auto pos = _connections.find(fd);
assert(pos != _connections.end());
//统一把事件异常转换成为读写问题,这样就只用考虑读和写即可
if (event & EPOLLERR)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLHUP)
event |= (EPOLLIN | EPOLLOUT);
if (event & EPOLLIN)
{
if (pos->second->_recv_cb)
pos->second->_recv_cb(pos->second);
}
if (event & EPOLLOUT)
{
if (pos->second->_send_cb)
pos->second->_send_cb(pos->second);
}
}
}
void Start()
{
while (1)
{
int n = _epoller_ptr->Wait(recvs, num, -1); //为了方便观察,设置为-1
if (n < 0)
{
lg(Error, "epoll wait failed: %s", strerror(errno));
break;
}
else if (n == 0)
{
lg(Info, "timeout...");
}
else
{
Dispatcher(n);
}
}
}
~TcpServer()
{}
private:
std::shared_ptr<Socket> _listensock_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
struct epoll_event recvs[num]; //捞取就绪事件的数组
std::unordered_map<int, std::shared_ptr<Connection>> _connections; //管理所有的连接
int _port;
func_t _OnMessage; //让上层处理信息
};
main.cc文件
#include "TcpServer.hpp"
#include "ServerCal.hpp"
void DefaultOnMessage(std::shared_ptr<Connection> conn)
{
ServerCal calculator;
std::string response_str = calculator.Calculator(conn->_inbuffer);
if (response_str.empty()) return;
conn->_outbuffer += response_str;
if (conn->_send_cb)
conn->_send_cb(conn);
}
int main()
{
std::unique_ptr<TcpServer> svr(new TcpServer(8080, DefaultOnMessage));
svr->Init();
svr->Start();
return 0;
}
Protocol.hpp文件
#pragma once
#include <iostream>
#include <string>
#include <jsoncpp/json/json.h>
using namespace std;
const string blank_space_sep = " ";
const string protocol_sep = "\n";
//添加报头:"内容" 转为 "长度\n内容\n"
bool Encode(string *package, const string& content)
{
package->clear();
*package += to_string(content.size());
*package += protocol_sep;
*package += content;
*package += protocol_sep;
return true;
}
//去除报头:"长度\n内容\n" 转为 "内容" -- 可能存在本来是5\n1 + 1\n 但接受到的是5\n1 +
bool Decode(string &package, string *content)
{
content->clear();
int pos = package.find(protocol_sep);
if (pos == string::npos) return false;
int len = stoi(package.substr(0, pos));
int totalLen = pos + len + 2;
if (package.size() < totalLen) return false;//如果报文不完整,说明
*content = package.substr(pos + protocol_sep.size(), len);
package.erase(0, totalLen);
return true;
}
//提取字符串
bool Extract(const string& in, int& x, int& y, char& oper)
{
int pos = in.find(blank_space_sep);
if (pos == string::npos) return false;
x = stoi(in.substr(0, pos));
oper = *in.substr(pos + blank_space_sep.size(), 1).c_str();
int pos2 = in.rfind(blank_space_sep);
if (pos2 == string::npos) return false;
y = stoi(in.substr(pos2 + blank_space_sep.size()));
return true;
}
//请求协议
class Request
{
public:
Request(int x = 0, int y = 0, char oper = '+')
:_x(x), _y(y), _oper(oper)
{}
//序列化 -- 将结构体转为"_x + _y"
bool Serialize(string* out)
{
#ifdef MySelf
out->clear();
*out += to_string(_x);
*out += blank_space_sep;
*out += _oper;
*out += blank_space_sep;
*out += to_string(_y);
return true;
#else
Json::Value root;
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
Json::StyledWriter w;
*out = w.write(root);
return true;
#endif
}
//反序列化 -- 将"_x + _y"转为结构体
bool Deserialize(const string& in)
{
#ifdef MySelf
return Extract(in, _x, _y, _oper);
#else
Json::Reader r;
Json::Value root;
r.parse(in, root);
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();
return true;
#endif
}
public:
int _x;
int _y;
char _oper;
};
//响应协议
class Response
{
const string blank_space_sep = " ";
public:
Response(int result = 0, int code = 0)
:_result(), _code(code)
{}
//序列化 -- 将结构体转为"_result _code"
bool Serialize(string* out)
{
#ifdef MySelf
*out = to_string(_result) + blank_space_sep + to_string(_code);
return true;
#else
Json::Value root;
root["code"] = _code;
root["result"] = _result;
Json::StyledWriter w;
*out = w.write(root);
return true;
#endif
}
//反序列化 -- 将"_result _code"转为结构体
bool Deserialize(const string& in)
{
#ifdef MySelf
int pos = in.find(blank_space_sep);
if (pos == string::npos) return false;
_result = stoi(in.substr(0, pos));
_code = stoi(in.substr(pos + blank_space_sep.size()));
return true;
#else
Json::Reader r;
Json::Value root;
r.parse(in, root);
_result = root["result"].asInt();
_code = root["code"].asInt();
return true;
#endif
}
public:
int _result;
int _code; // 0,可信,否则!0具体是几,表明对应的错误原因
};
ServerCal.hpp文件
#pragma once
#include "Protocol.hpp"
//服务器的计算服务
class ServerCal
{
public:
ServerCal()
{}
Response CalculatorHelper(const Request &req)
{
int x = req._x;
char oper = req._oper;
int y = req._y;
Response rsp(0, 0);
switch (oper)
{
case '+':
{
rsp._result = x + y;
rsp._code = 0;
break;
}
case '-':
{
rsp._result = x - y;
rsp._code = 0;
break;
}
case '*':
{
rsp._result = x * y;
rsp._code = 0;
break;
}
case '/':
{
if (y == 0)
{
rsp._result = 0;
rsp._code = -1;
break;
}
rsp._result = x / y;
rsp._code = 0;
break;
}
case '%':
{
if (y == 0)
{
rsp._result = 0;
rsp._code = -1;
break;
}
rsp._result = x % y;
rsp._code = 0;
break;
}
default:
break;
}
return rsp;
}
string Calculator(string& s)
{
string content;
if(!Decode(s, &content))//将"长度/n内容/n" -> "内容"
return "";
Request rq;
rq.Deserialize(content);//将"内容"反序列化
Response rsp = CalculatorHelper(rq);//得到答案内容
content.clear();
rsp.Serialize(&content);//序列化答案内容
string ret;
Encode(&ret, content);//将"内容" -> "长度/n内容/n"
return ret;
}
};
TcpClient.cc文件
#include "Protocol.hpp"
#include "Socket.hpp"
static void Usage(const std::string &proc)
{
std::cout << "\nUsage: " << proc << "\tserverIp\tport\n" << std::endl;
}
int main(int argc, char* argv[])
{
if (argc != 3)
{
Usage(argv[0]);
exit(0);
}
string serverIp = argv[1];
uint16_t serverPort = atoi(argv[2]);
Socket skt;
skt.CreateSocket();
skt.Connect(serverIp, serverPort);
string streamBuffer;
while (1)
{
cout << "Enter#";
fflush(stdout);
string content;
getline(cin, content);
//提取字符串,得到”请求反序列化“
Request rq;
Extract(content, rq._x, rq._y, rq._oper);
string s;
rq.Serialize(&s);
string ret;
Encode(&ret, s);
write(skt.Fd(), ret.c_str(), ret.size());
char readBuffer[4096];
int n = read(skt.Fd(), readBuffer, sizeof(readBuffer) - 1);
if (n > 0)
{
readBuffer[n] = 0;
streamBuffer += readBuffer;
string ret;
Decode(streamBuffer, &ret);
Response rsp;
rsp.Deserialize(ret);
cout << "code: " << rsp._code << " result: " << rsp._result << endl;
}
else if (n == 0)
{
cerr << "Server closed..." << endl;
break;
}
else
{
cerr << "read failed:" << strerror(errno) << endl;
exit(3);
}
}
return 0;
}
Makefile文件
1=main
.PHONY:all
all:$1 TcpClient
$1:$1.cc
g++ -o $@ $^ -std=c++11 -ljsoncpp
TcpClient:TcpClient.cc
g++ -o $@ $^ -std=c++11 -ljsoncpp
.PHONY:clean
clean:
rm -rf $1 TcpClient