一、 铺垫
- I,即input为输入;O,即output为输出,IO,即input + output为输入输出。
- IO一般是基于网卡,磁盘,光盘,U盘,磁盘,磁带等毫秒级别的外存,相较于纳秒级别的内存效率很低。
- 用户IO的本质,即等待读写事件就绪,然后拷贝数据给操作系统,或者从操作系统拷贝数据,即「等+拷贝」。
- IO效率已经是很低了,还要需要花费时间进行等待,那么就出现了高效的IO,但归根结底就是降低等待的时间,甚至避免等待和拷贝,那么等所占时间的总比重越小,我们就认为IO的效率越高。
- 非阻塞IO,纪录锁,系统V流机制,I/O多路转接(也叫I/O多路复用),readv和writev函数以及存储映射IO(mmap)等统称为高级IO。
补充:
- 读写事件就绪,就是等的状态结束,可以进行读写;
- 用户有用户缓存区,操作系统有内核级缓存区; 可以从安全性的角度简单理解。
- 读就绪就是操作系统将数据拷贝从内核缓存区拷贝到了用户缓存区,用户可以从用户缓存区读数据。
- 写就绪就是将数据从用户缓存区拷贝到了内核缓存区,用户可以向用户缓存区写数据。
- 拷贝到内核的数据,操作系统如何处理,则跟用户无关,用户只需要相信操作系统即可。
二、模型
总的来说,IO灰常地形象,就拿钓鱼举例,「鱼竿」就可理解为「文件描述符」,「鱼」就类似于「数据」,「鱼上钩」就类似于「IO事件就绪」,「把鱼放到桶里」就可理解为「拷贝数据」的过程。根据「你」参不参与IO的过程,通常可以分为同步IO和异步IO。
同步IO
- 阻塞式IO,说白话就是一个人,拿着一根鱼竿钓鱼,一直在等待,直到有鱼上钩为止。更专业点就是一个线程或者一个进程,拿着一个文件描述符,调用相应的阻塞式IO函数,直到数据可以读写为止。
- 图解:
- 非阻塞式IO,说白话就是一个人,拿着一根鱼竿固定在河边,每过一会儿就来看看是否有鱼上钩,中间刷个剧,吃点零食什么的消遣一会儿。更专业点就是一个线程或者一个进程,拿着一个文件描述符,调用相应的非阻塞式IO函数看一眼,如果返回读写事件没有就绪,就利用这点时间干点别的事情;就绪之后,进行读写事件即可。
- 图解:
- 信号驱动式IO,说白话就是一个人,拿着一根鱼竿固定在河边,鱼竿上有报警器,有鱼上钩之后,就里面把鱼钓上来,否则就干别的。更专业点就是一个线程或者一个进程,用文件描述符设置信号的发生和处理函数,当读写事件发生时,就会发送信号调用对应的处理函数,拷贝数据。当读写事件没就绪时,就一直干别的事情。
- 图解:
- 多路复用,也叫多路转接式IO,说白话就是一个人,拿着几百根鱼竿钓鱼,遍历检查看是否有鱼上钩。更专业点就是一个进程/线程,拿着大量的文件描述符,轮询检查看是否有IO事件就绪,有就处理,没有就检查下一个。
我们简单的讨论一下这几种IO模式的效率——
- 阻塞式IO,非阻塞式IO,信号驱动式IO参与在傻傻等鱼上钩的时间是逐步递减的,不考虑其它的消耗,可以简单认为IO效率逐步递增。
- 多路复用时IO,从概率的角度理解,假设一根鱼竿上钩的概率为千分之一,那么一百根鱼竿,遍历一次有鱼上钩的概率就为十分之一,因此从理论上看就是扩大基数,等待时间并行来不断进行检测,进而提高IO效率。实际过程中我们通常采用这种方式实现更加高效的IO。
异步IO
- 简单的来讲就是张三,请另一个人拿着一根鱼竿钓鱼,直到钓上鱼之后,通知张三将鱼拿走,整个调用的过程张三 只是发起了"请"这个动作,对于钓鱼的全过程,即等和拷贝并没有参与。专业点讲,一个线程或者一个进程,用文件描述符,等待和处理IO事件时,直接交给比如操作系统或者其它的线程帮你处理,等到事件完成之后,再通知进程IO事件的结果,进程没有参与IO的全过程,只是发起IO和判断结果。
- 图解:
- 一般来讲,由于「别人」可能并不靠谱,所以交给别人执行的逻辑是十分混乱且复杂的,面试常考的协程,可以在一定程度上简化执行逻辑。因为一般来说线程的切换是不确定的,即并不知道CPU当前运转哪一个线程,因此哪个线程谁先执行是不确定的,但是协程,简单可以看做一个轻量化的线程,可以让用户控制切换的顺序,减少了不确定性,因此也极大的提高了效率。
简单总结一下,同步IO参与等和拷贝的过程;而异步IO不参与等和拷贝,只参与发起和接收执行完毕之后的结果的过程。
三、函数
- 声明一下:下面的全部实现代码可在博主的Gitee链接中进行查看,以下内容注重思路的讲解。
1.fcntl
接口说明
/*
头文件:
*/
#include <unistd.h>
#include <fcntl.h>
/*
函数声明:
*/
int fcntl(int fd, int cmd, ... /* arg */ );
/*
参数:
1.文件描述符。
2.命令选项,通常的有F_GETFL(获取属性),F_SETFL(设置属性),
3.选项,可变参数列表。
其它常见的命令选项:
1.F_DUPFD,返回一个新的文件描述符,但与原文件描述符指向同一个文件。
2.F_GETFD或F_SETFD,用于获取与设置描述符标记,目前只有FD_CLOEXEC,意味在execl时,自动将文件描述符进行关闭。
3.F_GETFL或F_SETFL,用于获取与设置描述符状态属性,常见的有O_APPEND,O_NONBLOCK,O_CREAT,O_RDONLY,O_WRONLY,O_RDWR。
4.F_GETOWN或F_SETOWN,获取与设置异步IO进程拥有者的pid。
5.F_GETLK用于获取锁,F_SETLK用于设置锁,F_SETLKW用于表示没有冲突的锁存在。
返回值:
1. 设置失败,返回一个小于0的数表示。
2. 否则表示成功。
*/
下面我们通过代码实现来进一步了解IO模型,并熟练使用此函数。
非阻塞IO
#include<iostream>
#include<stdio.h>
#include<unistd.h>
#include<cstring>
int main()
{
char buff[128];
while(true)
{
printf("please enter@");
fflush(stdout);
//按下ctrl + D,结束输入。
int n = read(0,buff,sizeof(buff) - 1);
//1.返回值为0时,这里我们是读到了终端,需要根据。
//2.返回值为-1时,可能读取出错
//3.返回值大于0时,读取成功的字节数。
if(n == 0)
{
std::cout << "read end of file." << std::endl;
break;
}
else if(n > 0)
{
//实际上我们按下回车时,本质上就是输入了一个\n。
buff[n-1] = '\0';
std::cout <<"echo: "<< buff << std::endl;
}
else
{
std::cout << "error reason is " << strerror(errno) << ", errno code is " << errno << std::endl;
break;
}
}
return 0;
}
- 实验结果:
非阻塞式
#include<iostream>
#include<stdio.h>
#include<unistd.h>
#include<fcntl.h>
#include<cstring>
void SetNoBlock(int fd)
{
//第一步:获取当前文件描述符的状态。
int n = fcntl(fd, F_GETFL);
if(n < 0)
{
perror("fcntl:F_GETFL:");
return;
}
//第二步:添加非阻塞模式到当前文件描述符。
n = fcntl(fd,F_SETFL,n | O_NONBLOCK);
if(n < 0)
{
perror("fcntl:F_SETFL:");
}
std::cout << "set none block done" << std::endl;
}
int main()
{
char buff[128];
SetNoBlock(0);
while(true)
{
//按下ctrl + D,结束输入。
int n = read(0,buff,sizeof(buff) - 1);
//1.返回值为0时,读到了文件的结尾。
//2.返回值为-1时,可能读取出错
//3.返回值大于0时,读取成功的字节数。
if(n == 0)
{
std::cout << "read end of file." << std::endl;
break;
}
else if(n > 0)
{
//实际上读取时,按下\n会刷新缓存区。
buff[n-1] = '\0';
std::cout <<"echo: "<< buff << std::endl;
}
else
{
/*
1.这里当缓存区没有资源时,会出现11号报错,即Resource temporarily unavailable,
这其实不算是报错,只是缓存区暂时还没有资源。
2.11号用宏来表示就是EWOULDBLOCK
*/
if(errno == EWOULDBLOCK)
{
std::cout << "do other things...." << std::endl;
sleep(1);
}
else if(errno == EINTR)
{
//被信号中断了。
continue;
}
else
{
std::cout << "read error,error code is " << errno << ", reason is " << strerror(errno) << std::endl;
break;
}
}
}
return 0;
}
实验结果:
说明:
- F_GETFL:返回的文件描述符状态属性,失败返回-1。
- F_SETFL:设置文件描述符属性,失败返回 -1。
- O_NONBLOCK:使用或运算,因此本质上是计算机二进制的特定的位置,表示某种属性。
- EWOULDBLOCK:非阻塞模式下,read读取小于的返回值时的错误码为11,表示缓存区没有资源可读,这并不算错误,需要特殊处理一下。
- EINTR:表示调用函数的过程中被信号中断了。
信号驱动式
#include<iostream>
#include<cstring>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
#include<signal.h>
int fd = 0;
void handler(int sig)
{
char buffer[1024];
int n = read(fd,buffer,sizeof(buffer));
if(n == 0)
{
std::cout << "read end of file" << std::endl;
}
else if(n > 0)
{
buffer[n-1] = '\0';
std::cout << "echo: " << buffer << std::endl;;
}
else
{
if(errno != EWOULDBLOCK)
perror("read:");
}
}
void SetSigDive(int fd)
{
//Centos 7下必须设置读取为非阻塞模式,因为打印信息也会向进程发送信号。
//fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
//ubuntu下要开启异步模式,可以不用设置为非阻塞
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_ASYNC);
//设置文件描述符的所有者为当前进程,以便接收信号
int n = fcntl(fd, F_SETOWN, getpid());
if(n < 0)
{
perror("fcntl:");
return;
}
//将信号设置进操作系统中
struct sigaction act;
memset(&act,0,sizeof(act));//清空信号集。
act.sa_handler = handler;
if(sigaction(SIGIO,&act,nullptr) == -1)
std::cout << "fail to add signal..." << std::endl;
else
std::cout << "Set Signal Dive IO Success." << std::endl;
}
void Sleep(int sec);
int main()
{
SetSigDive(fd);
while(true)
{
std::cout << "do nothing...." << std::endl;
Sleep(1);
}
return 0;
}
void Sleep(int sec)
{
sigset_t mask,pre_mask;
sigemptyset(&mask);
sigaddset(&mask,SIGIO);
sigprocmask(SIG_BLOCK,&mask,&pre_mask);
sleep(sec);
sigprocmask(SIG_SETMASK,&pre_mask,nullptr);
}
- ubuntu下的实验结果:
说明一下:
- 不同的Linux机器上具体实现的细节可能不同,这一段时间博主由于由阿里云的Cenos7切换到了ubuntu的云服务上之后,观察到执行上述的代码结果就不同。
- Centos 7上就可以观察到,当cout时也是在给SIGIO发信号,也会调用信号处理函数进行处理,就不得不将文件描述符设置为非阻塞的状态,从而避免因误调而陷入阻塞的情况,并且我们也可以看到sleep(1)刹不住车的现象,这是由于信号中断导致的,因此就有了上面的自定义的休眠函数,添加了阻塞信号的功能,确保调用过程中不被信号打扰。
- 在ubuntu下,使用原先的代码就啥也观察不到,还必须得把O_ASYN异步模式打开才能观察到现象,而且cout的输出也不会再触发信号了,而且信号设置为阻塞的,也不会有啥问题。因此两者虽然都是Linux系统,但是在一些细节的处理上可能各不相同。
异步式
#include<iostream>
#include<cstring>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
#include<signal.h>
int fd = 0;
void handler(int sig)
{
char buffer[1024];
int n = read(fd,buffer,sizeof(buffer));
if(n == 0)
{
std::cout << "read end of file" << std::endl;
}
else if(n > 0)
{
buffer[n - 1] = '\0';
std::cout << "echo: " << buffer << std::endl;
}
else
{
if(errno != EWOULDBLOCK)
perror("read:");
}
}
int main()
{
signal(SIGIO,handler);//跟上述的sigaction函数的功能一样,不过用起来更为简单。
fcntl(fd,F_SETFL,fcntl(fd, F_GETFL) | O_ASYNC);
pid_t pid = fork();
if(pid == 0)
{
while(true)
{
sleep(1);
}
}
else
{
fcntl(fd,F_SETOWN,pid);
while(true)
{
std::cout << "do other things..." << std::endl;
sleep(1);
}
}
return 0;
}
- 实验结果:
如果信号驱动设置为异步没有明白的话,想必这个例子更浅显易懂,父进程获取到子进程的pid之后,将读取信息的任务交给了子进程进行读取,完成任务的发起动作,在之后都是子进程在收到信号并对数据进行拷贝和处理,而父进程则没有参与到这个过程。这是一个非常典型的异步demo,结合之前的理论知识希望能够帮助读者进一步理解异步IO。
2.多路转接
2.1 select
接口说明
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
struct timeval
{
time_t tv_sec; //seconds,秒
suseconds_t tv_usec; //microseconds,毫秒
};
/*
参数:
1:nfds,最大的文件描述符加1,当做左闭右开理解式记忆。
2,3,4: readfds,writefds,exceptfds,输入输出型参数,输入要关心的文件描述符,输出已经就绪的文件描述符。
5:输入输出型参数,输入设置的秒数,输出剩下的秒数,最低为0.
返回值:
1.等于0,表示等待超时,超过设定的时间,且没有文件描述符就绪。
2.大于0,表示已经就绪的文件描述符的个数,不过没啥用。
3.小于0,表示等待过程中出错。
*/
//位图操作函数:
void FD_CLR(int fd, fd_set *set);//从位图中移除文件描述符。
int FD_ISSET(int fd, fd_set *set);//查看文件描述符在位图中是否存在。
void FD_SET(int fd, fd_set *set);//在位图中设置文件描述符。
void FD_ZERO(fd_set *set);//清空或者初始化位图。
//内核中的fd_set
#define __NFDBITS (8 * sizeof(unsigned long))
#define __FD_SETSIZE 1024
#define __FDSET_LONGS (__FD_SETSIZE/__NFDBITS) // 1024 / (8 * 4 or 8) => 2^10^ / 2^5^ or 2^6^ => 32 or 16
typedef struct
{
unsigned long fds_bits [__FDSET_LONGS];
//32 * 4 or 16 * 8,但总大小都是128*8 = 1024比特位,即最大能存放这么多个文件描述符。
} __kernel_fd_set;
设计思路
- 成员变量设置一个中间存储数组,便于成员函数之间获取并进行相应的处理。
- 最开始时,在这个数组中初始化监听套接字,即创建,绑定,监听这三个步骤。
- 每次循环中,需要遍历查找最大的文件描述符,值得注意的是最大的文件描述符加一才是第一个参数。
- 初始化,要关心的fd_set结构,下面的实现代码只关心读事件就绪的位图,其它的类似。
- select函数执行完毕时,对返回值进行分析,对大于0的情况调用相应的事件处理函数进行处理。
- 处理时,我们需要首先判断是否是有新连接来临了,有,就将连接拿到上层,且放到数组中。
- 然后查看其它文件描述符,看是否有读事件就绪,如果有,使用相应的函数直接进行读取相应的内容即可。
实现代码:
- SelectServer.hpp
#include<unistd.h>
#include<sys/types.h>
#include<sys/time.h>
#include<algorithm>
#include "../Tools/Log.hpp"
#include "../Tools/Socket.hpp"
const int max_fd_nums = 1024;
const int max_buffer_size = 10240;
class SelectServer
{
public:
SelectServer(uint16_t port = 8080,string ip = "0.0.0.0")
:list_fd(port,ip)
{
//初始化时,将文件描述符数组,全置为-1
memset(fd_array,-1,sizeof(fd_array));
}
void Init()
{
list_fd.Socket();
list_fd.Bind();
list_fd.Listen();
}
void Accepter()
{
//如果就绪直接获取新连接即可
//输出型参数:
sockaddr_in client_msg;
socklen_t len;
int new_fd = list_fd.Accept(&client_msg,&len);
if(new_fd < 0) return;
//这里就要解决第一个问题:即将关心的读事件添加到下次调用的rset中,这就需要一个中间数组进行存储。
//进行遍历找到空位,即找到值为-1的下标即可。
int pos = 1;
while(pos < max_fd_nums)
{
if(fd_array[pos] == -1) break;
}
if(pos == max_fd_nums)
{
//说明没位置了,只能将获取的套接字进行关闭
close(new_fd);
}
else
{
fd_array[pos] = new_fd;
lg(INFORE,"add a new fd...");
}
}
void Recver(int fd,int i)
{
//说明就绪了,直接进行读取即可
char buffer[max_buffer_size] = {0};
int n = read(fd,buffer,sizeof(buffer) - 1);
if(n > 0)
{
buffer[n - 1] = '\0';
//读取成功,打印查看读取的内容
cout <<"echo: "<< buffer << endl;
}
else if(n == 0)
{
//说明,读取到文件的结尾,或者客户端的连接关闭了,关闭套接字,将所在位置的值置为-1,让出来即可。
close(fd);
fd_array[i] = -1;
lg(INFORE,"close fd: %d",fd);
}
else
{
//说明读取出错,查看错误原因即可。
lg(WARNNING,"read error, fd is %d ,errno code is %d, reason is %s",fd,errno,strerror(errno));
}
}
void Dispatcher(fd_set& rset)
{
//1.首先看监听事件是否就绪。
if(FD_ISSET(list_fd.GetSocket(),&rset))
{
Accepter();
}
//其次遍历其余就绪的事件进行读数据即可
for(int i = 1; i < max_fd_nums; i++)
{
int cur_fd = fd_array[i];
if(FD_ISSET(cur_fd,&rset))
{
Recver(cur_fd,i);
}
}
}
void Run()
{
//起初就将监听套接字放在0号下标处
fd_array[0] = list_fd.GetSocket();
for(;;)
{
fd_set rset;
//内核提供的128字节的一个定长数组,当做位图进行使用,最多能够容纳128 * 8 = 1024个文件描述符
//1.初始化,将位图清零,并添加初始套接字。
FD_ZERO(&rset);
for(auto fd : fd_array)
{
if(fd != -1)
{
FD_SET(fd,&rset);
}
}
//select首参数,文件描述符的最大值加1
int nfds = *max_element(fd_array,fd_array + max_fd_nums) + 1;
//遍历更新最大值,或者直接用接口
/*
struct timeval {
time_t tv_sec; //seconds
suseconds_t tv_usec; //microseconds
};
*/
//设置等待时间,这里设置3秒
struct timeval w_time = {3,0};
int n = select(nfds,&rset,nullptr,nullptr,&w_time);
/*
返回值:
1.等于0,表示等待时间超时,没有事件就绪。
2.大于0,表示就绪的事件个数。
3.小于0,表示出错。
第二到第四个参数:
1.输入时,为要关心的文件描述符。
2.输出时,为已经就绪的文件描述符。
第五个参数:
1.输入时,为等待的时间,设置为0,表示非阻塞等待。
2.输出时,为剩余的时间。
*/
switch (n)
{
case 0:
cout << "time out...." << endl;
//下一个循环会重新刷新等待的时间。
break;
case -1:
cout << "select errnor...." << endl;
break;
default:
//事件就绪,进行处理即可。
cout << "Event is already...." << endl;
Dispatcher(rset);
break;
}
}
}
private:
Sock list_fd;
int fd_array[max_fd_nums];
};
- Main.cc
#include<iostream>
#include<memory>
#include"SelectServer.hpp"
int main()
{
std::unique_ptr<SelectServer> ss_ptr(new SelectServer());
ss_ptr->Init();
ss_ptr->Run();
return 0;
}
注意:
- 遍历查找最大的文件描述符之后,我们还要对其加一才为select的第一个参数。
- 输入输出型参数,我们要注意对其进行重复的刷新和进行初始化。
- 链接放不下时,即在数组中找不到位置时,我们需要将连接及时地释放。
缺陷:
- 最重要的是select中的fd_set位图,是一个内置的自定义类型。最多只能存储1024个文件描述符。
- 使用select在内核查找和用户设置参数需要多次遍历,进而导致效率的降低。
- 输入输出型参数需要多次重新设置,拷贝次数多,进而导致效率的降低。
2.2 poll
接口说明:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd
{
int fd; /* 文件描述符 */
short events; /* 所关心的事件 */
short revents; /* 返回时已经就绪的事件 */
};
/*
参数:
1.fds,指的是数组的指针。
2.nfds,指的时数组的最大大小。
3.timeout,等待的时长。
返回值:
1.等于0,表示等待超时,超过设定的时间,且没有文件描述符就绪。
2.大于0,表示已经就绪的文件描述符的个数,不过没啥用。
3.小于0,表示等待过程中出错。
*/
/*
常见的事件就绪的情况~
POLLIN:读事件
POLLOUT:写事件
POLLPRI:表示有紧急数据可读取。
POLLNVAL表示无效请求:文件描述符未打开。
当写出错时:
POLLERR:发生了错误条件,具体要根据情况分析。
POLLHUP:表示发生了挂断的情况,通常表示被监视的文件描述符已经断开连接,或者管道关闭。
当Linux定义了,_XOPEN_SOURCE时,如下的宏可使用:
POLLRDNORM:等效于POLLIN,表示普通数据可读取。
POLLRDBAND:表示优先级带数据可读取,在Linux上通常未使用。
POLLWRNORM:等效于POLLOUT,表示普通数据可写入。
POLLWRBAND:表示可以写入优先级带数据。
当定义了_GNU_SOURCE时,可以使用:
POLLRDHUP(自Linux 2.6.17起):表示流套接字的对等端关闭了连接,或者关闭了连接的写一半。
*/
设计思路:
- 设置一个指针,指向struct pollfd,是一个动态数组,以及维护这个数组的最大大小。
- 当运行服务器时,先将监听套接字添加到这个动态数组中,当连接放不下时可以动态进行扩容。
- 设置适合的等待时间,然后使用这个指针,以及其最大大小,调用poll函数。
- 对返回值进行判断,如果返回值大于0,说明有事件就绪了。
- 进行事件的处理,可能是有新连接来临,需要设置对新的文件描述符关心的事件,也有可能是其它事件就绪。
说明:如果我们实现过上面的select的话,其实这里的实现就简单了很多。
基本实现:
- PollServer.hpp
#include<unistd.h>
#include<sys/types.h>
#include<sys/time.h>
#include<poll.h>
#include<algorithm>
#include "../Tools/Log.hpp"
#include "../Tools/Socket.hpp"
const int max_buffer_size = 1024;
const int default_event = 0;
const int default_fd = -1;
const int default_sz = 2;
class PollServer
{
public:
PollServer(uint16_t port = 8080,string ip = "0.0.0.0")
:list_fd(port,ip),sz(default_sz)
{
// event_fds = (struct pollfd*)malloc(sizeof(struct pollfd) * sz);
event_fds = new struct pollfd[sz];
//进行初始化
for(int i = 0; i < sz; i++)
{
event_fds[i].events = event_fds[i].revents = default_event;
event_fds[i].fd = default_fd;
}
}
~PollServer()
{
delete[] event_fds;
}
void Init()
{
list_fd.Socket();
//在绑定监听之前使用有效!
int opt = 1;
int fd = list_fd.GetSocket();
setsockopt(fd,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT\
,&opt,sizeof(opt));
list_fd.Bind();
list_fd.Listen();
}
void Accepter()
{
//如果就绪直接获取新连接即可
//输出型参数:
sockaddr_in client_msg;
socklen_t len;
int new_fd = list_fd.Accept(&client_msg,&len);
event_fds[0].revents = default_event;
if(new_fd < 0) return;
//这里就要解决第一个问题:即将关心的读事件添加到下次调用的rset中,这就需要一个中间数组进行存储。
//进行遍历找到空位,即找到值为-1的下标即可。
int pos = 1;
while(pos < sz)
{
if(event_fds[pos].fd == default_fd)
break;
else
pos++;
}
if(pos == sz)
{
//说明没位置了,进行扩容
int new_sz = sz * 2;
struct pollfd* tmp = new struct pollfd[new_sz];
memcpy(tmp,event_fds,sizeof(struct pollfd) * sz);
delete[] event_fds;
//扩容。
event_fds = tmp;
//进行后续的初始化
for(int i = sz; i < new_sz; i++)
{
event_fds[i].events = event_fds[i].revents = default_event;
event_fds[i].fd = default_fd;
}
//更新sz
sz = new_sz;
cout << "扩容成功....." << endl;
}
event_fds[pos].fd = new_fd;
event_fds[pos].events = POLLIN;
event_fds[pos].revents = default_event;
lg(INFORE,"add a new fd...");
}
void Recver(int fd,int i)
{
//说明就绪了,直接进行读取即可
char buffer[max_buffer_size];
int n = read(fd,buffer,sizeof(buffer) - 1);
if(n > 0)
{
buffer[n - 1] = '\0';
//读取成功,打印查看读取的内容
cout <<"echo: "<< buffer << endl;
}
else if(n == 0)
{
//说明,读取到文件的结尾,或者客户端的连接关闭了,关闭套接字让出来即可。
close(fd);
event_fds[i].fd = default_fd;
event_fds[i].events = event_fds[i].revents = default_event;
lg(INFORE,"close fd: %d",fd);
}
else
{
//说明读取出错,查看错误原因即可。
lg(WARNNING,"read error, fd is %d ,errno code is %d, reason is %s",fd,errno,strerror(errno));
}
}
void Dispatcher()
{
if(event_fds[0].revents & POLLIN)
{
Accepter();
}
for(int i = 1; i < sz; i++)
{
if(event_fds[i].revents & POLLIN)
{
Recver(event_fds[i].fd,i);
}
}
}
void Run()
{
//起初就将监听套接字放在0号下标处
event_fds[0].fd = list_fd.GetSocket();
event_fds[0].events = POLLIN;
int wait_time = 3000;//3000ms == 3s
for(;;)
{
int n = poll(event_fds,sz,wait_time);
switch (n)
{
case 0:
cout << "time out...." << endl;
break;
case -1:
cout << "select errnor...." << endl;
break;
default:
//事件就绪,进行处理即可。
cout << "Event is already...." << endl;
Dispatcher();
break;
}
}
}
private:
Sock list_fd;
struct pollfd* event_fds;
int sz;
};
- Main.cc
#include<iostream>
#include<memory>
#include"PollServer.hpp"
int main()
{
std::unique_ptr<PollServer> ps_ptr(new PollServer());
ps_ptr->Init();
ps_ptr->Run();
return 0;
}
优点:
- 相比较select,更为简单,且可以实现动态扩容的效果。
- 其次,将要关心的事件与已经就绪的事件进行了分离,减少了拷贝次数。
缺点:
- 还是要多次对遍历数组,进而确认文件描述符的就绪情况。
2.3 epoll
接口说明:
int epoll_create(int size);
/*
参数:设置内核结构的大小,从Linux2.6.8之后参数已经是无效的了,但需要传入一个大于0的数。
返回值:成功返回epfd,用于之后的传参控制要关心的事件。失败返回-1,并设置合适的错误码。
*/
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
/*
参数:
1.epfd,eppoll_create创建成功之后的返回值。
2.events,输出型参数,返回已经就绪的事件。
3.maxevents,设置的返回就绪事件的最大个数。
4.timeout,调用函数的等待的时间,单位为ms。
返回值:
1.等于0,表示等待超时,超过设定的时间,且没有文件描述符就绪。
2.大于0,表示已经就绪的文件描述符的个数,不过没啥用。
3.小于0,表示等待过程中出错,设置合适的错误码。
*/
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/*
参数:
1.epfd,eppoll_create创建成功之后的返回值。
2.op,要对,对应事件进行的可能操作,如下:
EPOLL_CTL_ADD:添加事件。
EPOLL_CTL_MOD:修改事件。
EPOLL_CTL_DEL:删除事件。
3.fd,要操作的文件描述符。
4.events,输入型参数,所要关心的事件。
返回值:
1.成功返回0.
2.失败返回-1,并设置合适的错误码。
*/
typedef union epoll_data
//1.这是一个联合体,也就意味着,epoll_data可能不仅是描述符,也可能是一个指针,变量。
//2.这样设计拓宽了epoll的使用场景。
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
基本原理:
//相关的结构体信息:
struct eventpoll
{
rwlock_t lock;//读写锁
struct rw_semaphore sem;//信号量
wait_queue_head_t wq;//一个被sys_epoll_wait()接口使用的等待队列。
wait_queue_head_t poll_wait;
//一个被file->poll()使用的等待队列。
struct list_head rdllist;
//已经就绪的文件描述符的队列。
struct rb_root rbr;
//红黑树的根结点。
};
struct epitem
{
struct rb_node rbn;//红黑树的结点。
struct list_head rdllink;//用于将该结构体链接到事件轮询的准备就绪队列。
struct epoll_filefd ffd;//文件指针以及文件描述符的结构体。
int nwait;//等待队列的数量
struct list_head pwqlist;//用于存储轮询等待队列
struct eventpoll *ep;//指向该结构体所属的事件的指针
struct epoll_event event;//关心的事件的相关信息。
atomic_t usecnt;//引用计数。
struct list_head fllink;//文件的链表头,便于访问文件实例。
struct list_head txlink;//文本信息的链表头,便于访问传输信息实例。
unsigned int revents;//收集就绪的事件。
};
形象图解:
说明:
- 首先信息,也就是读写事件就绪之后,会触发对应的硬件中断信号,进而操作系统可以立马进行获取并处理。
- 当操作系统获取之后,会触发对应的回调函数,继续向上交付,直到交给上层的接收队列,上层进行相应的处理。
- 上层会使用其相关信息,比如文件描述符,在红黑树中查找对应的结点,将文件描述符所在的结构体中事件的属性标记为就绪。
- 然后将结点进行拷贝,并连接到已经就绪的文件描述符的队列当中,最终epoll_wait从这个队列中等待获取事件即可。
设计思路:
- 为了后续实现服务器方便,先对epoll三个接口进行封装,形成一个类,方便进行调用,其次由于在整个过程中只需初始化一次,即调用一次epoll_create。除此之外成员变量当中,我们可以进行封装即一个_epfd(epoll_create),一个struct event的数组,以及这个数组的大小,对epoll_wait进行再度封装。
- 在服务器的成员变量,设置为封装的套接字,以及Epoll类即可,如果设计为unique_ptr的智能指针,博主觉着更好。
- 启动函数中,我们需要将监听套接字设置进内核的红黑树当中,再进行对应的等待事件就绪的操作。
- 事件来临时,我们需要进行判断,第一个为新连接来临,对其进行接收和添加,第二个为其它事件就绪,进行相应的处理。
- 事件处理完毕之后,如果要对关心的事件进行删除,我们需要注意应先调用epoll_ctl函数,再关闭文件描述符。
实现代码:
- 封装Epoller类,实现多路转接。
#include<unistd.h>
#include<sys/epoll.h>
#include<cstring>
#include"../Tools/nocopy.hpp"
#include"../Tools/Log.hpp"
enum
{
EPOLL_CREATE_FAIL = 3,
};
class Epoller : public Nocpy
{
public:
Epoller(int capacity = 64)
:_capacity(capacity)
{
_events = new struct epoll_event[capacity];
_epfd = epoll_create(_init_size);
if(_epfd == -1)
{
lg(CRIT,"epoll_create fail....");
exit(EPOLL_CREATE_FAIL);
}
}
~Epoller()
{
delete[] _events;
}
pair<struct epoll_event*,int> Epoll_Wait()
{
int sz = epoll_wait(_epfd,_events,_capacity,_timeout);
struct epoll_event* ret = sz > 0 ? _events : nullptr;
return {ret,sz};
}
pair<int,int> Epoll_Parse(const struct epoll_event& event)
{
return {event.data.fd,event.events};
}
struct epoll_event Epoll_Create(int fd,uint32_t event)
{
struct epoll_event obj;
obj.data.fd = fd;
obj.events = event;
return obj;
}
void Set_Wait_Way(int choice)
{
//0为非阻塞,-1为阻塞式等待
_timeout = choice;
}
int Epoll_Ctl(int fd,int oper,struct epoll_event* event = nullptr)
{
int ret;
if(oper == EPOLL_CTL_DEL)
{
//这里只是为了强调删除操作,event参数传入空即可,这也是缺省参数给空的原因,外面无需再传。
ret = epoll_ctl(_epfd,oper,fd,event);
}
else
{
ret = epoll_ctl(_epfd,oper,fd,event);
}
if(ret == -1)
{
lg(ERRO,strerror(errno));
}
return ret;
}
private:
int _epfd;
struct epoll_event* _events;
int _capacity;
static const int _init_size = 1;
int _timeout = 3000;
};
说明
成员变量:
- _epfd,用来管理连接的文件描述符,这里我们可以理解为通过此结点可以找到红黑树进而可以实现对红黑树增删改查。
- _events,指向对堆空间的数组的指针,类构造时,用来管理struct epoll_event类型的数组。
- _capacity,初始化_events时,new空间的大小,这里我们默认设置为64。
- _init_size,epoll_create的参数,只需要设置一个大于0的整数即可,别的倒是没有啥意义。
- _timeout,设置默认的超时时间,单位为毫秒,这里设置为3000ms,也就是3s。
方法:
- Epoll_Wait,我们这里进行了再度封装,将所有的参数都设置为了成员变量,只需要通过对象无参调用即可,返回值我们设置为pair类型的,更加符合写C++的程序员的胃口。
- Epoll_Ctl,这里当删除时,我们实际传空指针即可,因此第三个参数直接给为缺省参数,实际删除无需再传,使用缺省参数即可。
- Epoll_Parse和Epoll_Create,我们可以不用epoll_event,输入对应的参数,然后只需要关心结果即可。
- Set_Wait_Way,用于更改循环等待的方式,0为非阻塞式等待,-1为阻塞式等待。
- 构造和析构,自动完成对资源的初始化,以及在对象生命周期结束时,自动完成资源的清理工作。
- EpollServer.hpp
#include<unistd.h>
#include<sys/types.h>
#include<sys/time.h>
#include<poll.h>
#include<algorithm>
#include"Epoller.hpp"
#include "../Tools/Log.hpp"
#include "../Tools/Socket.hpp"
const int max_buffer_size = 1024;
const int default_event = 0;
const int default_fd = -1;
const int default_capcity = 64;
class EpollServer
{
public:
EpollServer(uint16_t port = 8080,string ip = "0.0.0.0",int cap = default_capcity)
:_list_fd_ptr(new Sock(port,ip)),_epoller_ptr(new Epoller(cap))
{}
void Init()
{
_list_fd_ptr->Socket();
//在绑定监听之前使用有效!
int opt = 1;
_listen_fd = _list_fd_ptr->GetSocket();//初始化_listen_fd的同时,也将端口设为重复使用。
setsockopt(_listen_fd,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));
_list_fd_ptr->Bind();
_list_fd_ptr->Listen();
}
void Accepter()
{
sockaddr_in client_msg;
socklen_t len;
int fd = _list_fd_ptr->Accept(&client_msg,&len);
if(fd < 0)
{
lg(ERRO,strerror(errno));
}
else
{
auto event = _epoller_ptr->Epoll_Create(fd,EPOLLIN);
_epoller_ptr->Epoll_Ctl(fd,EPOLL_CTL_ADD,&event);
}
}
void Recver(int fd)
{
char buffer[1024] = {0};
int n = read(fd,buffer,sizeof(buffer) - 1);
if(n == 0)
{
lg(INFORE,"read end of file or client closed connect...");
//注意:在文件描述符有效之前先删除内核中的结构。再关闭文件描述符。
_epoller_ptr->Epoll_Ctl(fd,EPOLL_CTL_DEL);
close(fd);
}
else if(n > 0)
{
buffer[n-1] = '\0';
lg(INFORE,buffer);
}
else
{
lg(ERRO,strerror(errno));
}
}
void Dispatcher(int sz,struct epoll_event* events)
{
for(int i = 0; i < sz; i++)
{
auto[fd,revents] = _epoller_ptr->Epoll_Parse(events[i]);
if(revents & EPOLLIN)
{
if(fd == _listen_fd)
{
//新连接到了。
Accepter();
}
else
{
//处理就绪事件。
Recver(fd);
}
}
}
}
void Run()
{
//将监听套接字加入到内核的红黑树当中
auto event = _epoller_ptr->Epoll_Create(_listen_fd,EPOLLIN);
_epoller_ptr->Epoll_Ctl(_listen_fd,EPOLL_CTL_ADD,&event);
for(;;)
{
auto [events,n] = _epoller_ptr->Epoll_Wait();
if(n == 0)
{
lg(INFORE,"time out...");
}
else if(n > 0)
{
Dispatcher(n,events);
sleep(1);
}
else
{
lg(ERRO,strerror(errno));
}
}
}
private:
unique_ptr<Sock> _list_fd_ptr;
int _listen_fd;
unique_ptr<Epoller> _epoller_ptr;
};
- Main.cc
#include<iostream>
#include<functional>
#include<memory>
#include"EpollServer.hpp"
int main()
{
std::unique_ptr<EpollServer> eptr(new EpollServer());
eptr->Init();
eptr->Run();
return 0;
}
优点:
- 接口使用方便。如果在依据这三个接口,封装成一个对象,那就更个性化,更简单了。
- 拷贝轻量化。只是在使用EPOLL_CTL_ADD选项时,只会在红黑树中添加一个结点,发送一次拷贝。
- 无需检测就绪。检测是否就绪会使用对应的回调机制将就绪的结点链入到就绪队列中,上层直接进行拷贝即可。
- 数量理论上无限制。实际上受内存空间的影响。
- 注意:在epoll_wait时,是要发生内存拷贝的,因为是将数据放在我们开辟空间内,网上说使用mmap映射到用户态减少拷贝的说法是不准确的,甚至是不正确的。
重点:
- 水平触发,只要有事件就绪,且没有处理完毕,就一直提醒用户。如果用户一直不处理,那就会一直提醒用户,这在一定程度上,降低了IO处理的效率。
- 边沿触发,只要就绪事件的状态发生变化,就提醒用户进行处理。如果用户不处理完,那么下一次就不知道得等到什么猴年马月的时候处理了,这就倒逼用户(程序员)实现代码时,一次将数据读取完毕,因此在一定程度上提高了IO处理的效率。
- 水平触发是有系统通知作为大保底的,而边沿触发则没有,因此从程序实现的角度上考虑,边沿触发会让程序员在实现时更加地小心。但并不代表水平触发就一定比边沿触发的效率低,程序实现时也可以一次将事件全部处理完毕,只是程序员通常在实现过程中可能会由于有大保底而实现的比较随意。
- LT是默认设置,如果想要设置为ET,需要手动在epoll_ctl添加事件时设置EPOLLET,且如果要将事件处理完毕,那我们在读数据时就得将方式设置为非阻塞的,避免最后一次读取陷入阻塞。
- 联系TCP延迟应答,既然边沿触发倒逼着将数据处理完毕,那么就会有一个更大的窗口,对方就能发送更多的数据。
下面的内容我们将基于ET模式与封装的Epoller类简单实现一个Recator模式的服务器。
3.Recator
3.1理论铺垫
- Reactor模式称为反应器模式或应答者模式,是基于事件驱动的设计模式,拥有一个或多个并发输入源,有一个服务处理器和多个请求处理器,服务处理器会同步的将输入的请求事件以多路复用的方式分发给相应的请求处理器。
- 事件驱动(Event-Driven)编程是一种编程范式,它基于事件的发生和处理来组织程序的逻辑。在事件驱动编程中,程序的执行流程是由事件的发生和处理来驱动的。
- 在事件驱动的应用中,将一个或多个客户端的请求分离和调度给应用程序,同步有序地接收并处理多个服务请求。对于高并发系统经常会使用到Reactor模式,用来替代常用的多线程处理方式以节省系统资源并提高系统的吞吐量。
在事件驱动编程中,主要包含以下几个概念:
- 事件(Event):事件是程序中可以触发或监听的某种动作或状态变化,例如用户点击鼠标、键盘输入等。
- 事件源(Event Source):事件源是产生事件的对象或组件,比如说网络连接中的客户端。
- 事件处理器(Event Handler):事件处理器是负责处理特定类型事件的函数或方法。当事件发生时,事件处理器会被调用来处理事件。
- 事件循环(Event Loop):事件循环负责监听各种事件的发生,并调用相应的事件处理器来处理事件。它通常是一个无限循环,不断地检查事件队列中是否有待处理的事件。
- 从网络的角度简单理解,事件源就是服务端保存的客户端的连接。事件就是客户端是否向服务器发起连接,或者发送请求。事件处理器就是建立连接或者处理请求并发送应答。事件循环就是使用多路转接函数,死循环不断检查事件是否就绪。
一般的处理模式
特点:
- 采用阻塞式IO的方式处理每一个线程的输入输出。
- 每一个请求应用程序都会为其创建一个新的线程,使用完毕之后会将线程进行释放。
缺点:
- 采用阻塞式会导致等待的时间过长,进而导致线程资源的利用率降低,浪费线程资源。
- 大量的连接到来时,可能会创建大量的线程,过度占用系统资源;其次反复的申请和释放线程也会消耗一定的系统资源。
单Recator单线程的处理模式
特点:
- Recator通常采用IO多路复用的方式,提高IO处理的效率。
- 只采用一个线程,全权负责对事件的等待,派发,处理等工作,实现较为简单。
缺点:
- 对于多核CPU来说,一次能跑多个线程,只用一个线程,不利于释放CPU的性能。
- 从安全性的角度来讲,只要这一个线程出异常,那么整个程序都将无法正常运转,进而导致程序的瘫痪。
单Recator多线程的处理模式
特点:
- 在单Recator单线程的处理模式的基础之上,引入了多线程即线程池,提高了并发度,有利于释放CPU的性能。
缺点: - 多线程的处理逻辑一般是比较复杂的,不易于调试。
3.2设计与实现
基本设计思路
- 首先为了方便对连接事件进行管理,我们将创建Connection对连接进行封装。Connection主要包含连接的相关信息,比如ip地址,端口号等,以及事件到来时的对应的处理方法。
- 其次为了让监听套接字与Recator解耦,将使用Listener类对其进行封装。主要包含接收连接,以及设置监听套接字的成员变量和方法。
- 然后为了更好的管理连接,将创建Timer类与新创建的连接建立联系,以便不断的处理历史连接,并对连接最近一次调用的时间进行更新。
- 接着为了使服务器具备异步处理的功能,提高运行的并发度,我们还将连接事件再封装形成了Task类,并引入线程池,方便Recator不断的Push任务,线程池中的线程异步获取任务,并进行处理。
- 最终以上的步骤完成之后,我们将完成Recator框架的基本搭建和快速实现,然后为了以更加现实的角度的测试实现的服务器,博主还将之前写的网络版本计算器引入,以便更好地进行体验和测试。
时间检查类
实现框架:
#pragma once
#include<time.h>
#include<memory>
class Connection;
class RecatorSvr;
enum
{
MAX_WAIT_TIME = 6,
MAX_TIME = 15,
};
using s_c_t = std::shared_ptr<Connection>;
using fun_t = RecatorSvr*;
struct Timer
{
Timer(fun_t rsvr ,s_c_t con ,time_t outdate)
:_rsvr(rsvr),_con(con),_out_date(outdate),_recent_time(outdate)
{}
void OutOfTimeHandler();
void UpDateAccpetTime();
void UpDateRunTime();
bool IsOutTime();
bool operator < (time_t cur_time);
s_c_t _con;//建立连接时,设置引用指向连接。
time_t _recent_time;//最近一次被调用的时间 + MAX_WAIT_TIME,用于堆结构中重新更新时间,。
time_t _out_date;//初始化超时时间戳,后续如果调用的话就更新,超时的话执行对应的异常处理函数即可。
fun_t _rsvr;//指向Recator的回指指针。
};
成员变量:
- _con,用于指向与之绑定的Connection,便于在超时处理时,直接执行其对应的异常方法即可。
- _recent_time,最近一次更新的超时时间戳,当事件就绪且被处理方法调用时就会更新。
- _out_date,可能的超时时间戳,当前的时间戳大于outdate我们认为可能超时,需要使用_recent_time进一步判断。
- _rsvr,指向Recator类型的指针,可以使用其指向的方法,对当前类的方法进行补充和实现,达成进一步的封装。
方法:
- OutOfTimeHandler,用于连接超时进行相应的异常处理。
- UpDateAccpetTime,用于更新最新一次接收连接请求的超时时间戳。
- UpDateRunTime,用于更新最新一次被读写事件被调用的超时时间,默认为当前时间加上6秒。
- IsOutTime,用于确认是否真的超时,即与_recent_time进行比较。
- operator <,用于判断当前的超时时间戳,即与_out_date进行比较看是否可能超时。
说明:博主这里是维护了一个小根堆管理的Timer指针,且排序是根据_out_date进行的,如果在调用时直接进行更新,那么小根堆就失效了,因此我们需要_recent_time保留最新一次的超时时间戳,等到所有事件处理完毕之后,我们再根据_out_date和_recent_time进行统一的更新。具体详见最后给出的Recator关于Timer的实现方法。
功能实现代码:
void OutOfTimeHandler()
{
_con->_excep(_con);
}
void UpDateAccpetTime()
{
_recent_time = time(nullptr) + MAX_TIME;
}
void UpDateRunTime()
{
_recent_time = time(nullptr) + MAX_WAIT_TIME;
}
bool IsOutTime()
{
//如果不大于,自动将_out_date更新为_recent_time
if(_recent_time > time(nullptr))
{
_out_date = _recent_time;
return false;//说明不超时。
}
return true;
}
bool operator < (time_t cur_time)
{
return _out_date < cur_time;
}
连接管理类
实现框架
//...
class Connection;
class RecatorSvr;
class Timer;
using cal_t = std::function<void(std::shared_ptr<Connection>)>;
using s_c_t = std::shared_ptr<Connection>;
typedef RecatorSvr* fun_t;
//链接管理器
class Connection
{
public:
Connection()
{}
~Connection()
{}
//默认的是服务器的配置。
Connection(int fd,fun_t rsvr,cal_t recv = nullptr,cal_t send = nullptr\
,cal_t excep = nullptr,int port = 8888,const std::string& ip = "0.0.0.0")
:_fd(fd),_recv(recv),_send(send),_excep(excep),_port(port),_ip(ip),_rsvr(rsvr)
{}
cal_t _recv;
cal_t _send;
cal_t _excep;
fun_t _rsvr;
Timer* _tp;//用来更新对应的时间。
//初始化_tp成员变量
void InitTime(Timer* tm);
//获取成员变量_fd
int Getfd();
//追加字符串到_inbuffer之后。
void Append(const std::string& str);
//获取_inbuffer,注意这里是引用。
std::string& GetInbuf();
//获取_outbuffer,同理。
std::string& GetOutBuf();
private:
//输入输出,一次性存放的地方。
std::string _inbuffer;//a bug: 二进制流的处理问题。
std::string _outbuffer;
int _fd;
std::string _ip;
int _port;
};
成员变量:
- _recv,收到读就绪事件的处理方法;_send,写事件就绪的处理方法;_excep,异常事件的处理方法。
- _fd,文件描述符;_ip,绑定的ip地址,_port,端口号,两者可以标识唯一一台主机。
- inbuffer,由于设计的是ET模式,即在读事件就绪时,我们要将所有的数据都拿到用户层,即存放在inbuffer中。
- outbuffer,同理,我们要把数据都写给内核,写不完就等到下一回写事件就绪再写,写不完的数据存放在outbuffer中。
- _rsvr,一个指向Recator的回指指针,借着这个指针可以使用指向Recator的方法和成员变量,实现自身的方法,进一步完成封装。
- _tp,其指向的Timer用于管理和检测连接是否超时,以及完成连接的最近调用时间的封装的功能。
方法:
- GetOutBuf(),GetInbuf(),Append(),Getfd(),InitTime(Timer* tm),以及构造和析构等方法,很简单,其实都是完成对成员变量的初始化与获取和对变量修改的操作,这里就不过多介绍了。
方法实现:
void InitTime(Timer* tm)
{
_tp = tm;
}
int Getfd()
{
return _fd;
}
void Append(const std::string& str)
{
// std::cout << inbuffer.size() << std::endl;
inbuffer = inbuffer + str;
// std::cout << inbuffer.size() << std::endl;
// std::cout << inbuffer << std::endl;
}
std::string& GetInbuf()
{
return inbuffer;
}
std::string& GetOutBuf()
{
return outbuffer;
}
连接监听类
实现框架
#pragma once
#include<iostream>
#include<functional>
#include"TcpServer.hpp"
//监听器。
class Listener
{
public:
//构造函数初始化
Listener(fun_t svr)
:_lisptr(new Sock()),_rsvr(svr);
//进行连接的获取
void Accepter(s_c_t con);
private:
std::shared_ptr<Sock> _lisptr;
fun_t _rsvr;//回指指针,方便添加连接。
};
成员:
- Sock类,使用_listenptr的智能指针进行管理,用于类初始化时创建,绑定,监听套接字。
- _rsvr,一个指向Recator的回指指针,借着这个指针可以使用指向Recator的方法和成员变量,实现自身的方法,这里指的是Accepter和Listent方法,下面的实现我们都使用了_rsvr变量辅助进行完成,完成进一步的封装。
方法:
- 构造函数,Listener,完成监听套接字的创建,绑定,监听。除此之外,我们还设置对接口的复用。
- Accepter,连接管理器,主要是完成对到来连接的处理并将连接添加到Recator中,因为是ET模式且为非阻塞的,所以这里是一个死循环,等将全部连接获取完毕之后错误码会出现 EWOULDBLOCK,从这里break。
方法实现代码
Listener(fun_t svr)
:_lisptr(new Sock()),_rsvr(svr)
{
//1.创建绑定监听套接字。
_lisptr->Socket();
int fd = _lisptr->GetSocket();
//对端口号进行复用
int opt = 1;
setsockopt(fd,SOL_SOCKET,SO_REUSEADDR|SO_REUSEPORT,&opt,sizeof(opt));
_lisptr->Bind();
_lisptr->Listen();
//2.将连接添加到服务器管理的队列中。
if(_rsvr != nullptr)
{
_rsvr->AddConnection(fd,std::bind(&Listener::Accepter,this,std::placeholders::_1),
nullptr,std::bind(&RecatorSvr::Exceper,_rsvr,std::placeholders::_1));
lg(INFORE,"listen_fd is added into Recator successfully.");
}
}
void Accepter(s_c_t con)
{
//设置超时时间为15秒,保证监听套接字是最后退出的。
con->_tp->_recent_time = time(nullptr) + 60;
while(true)
{
sockaddr_in client_msg;
socklen_t len;
int fd = con->Getfd();
int sock = accept(fd,(sockaddr*)&client_msg,&len);
if(sock > 0)
{
//首先,将网络序列转为主机序列。
uint32_t ip = client_msg.sin_addr.s_addr;
uint16_t port = ntohs(client_msg.sin_port);
char ip_buff[100] = {0};
inet_ntop(AF_INET,&ip,ip_buff,sizeof(ip_buff) - 1);
lg(INFORE,"accept a new link,the user is %s:%d",ip_buff,port);
//其次,添加到对应的哈希表和放入到内核当中。
_rsvr->AddConnection(sock,std::bind(&RecatorSvr::Recver,_rsvr,std::placeholders::_1),\
std::bind(&RecatorSvr::Sender,_rsvr,std::placeholders::_1)\
,std::bind(&RecatorSvr::Exceper,_rsvr,std::placeholders::_1),port,ip_buff);
}
else
{
if(errno == EWOULDBLOCK)
{
//说明连接获取完了。
break;
}
else if(errno == EINTR)
{
//说明被异常信号唤醒了。
continue;
}
lg(ERRO,strerror(errno));
//执行对应的异常处理逻辑。
con->_excep(con);
return;
}
}
}
任务类
实现代码:
#pragma once
#include<functional>
#include<memory>
#include"../../Tools/Log.hpp"
#include"Connection.hpp"
class Connection;
struct Task
{
Task(s_c_t con,cal_t handler = nullptr)
:_con(con),_handler(handler)
{}
void Service()
{
_handler(_con);
}
void operator()()
{
Service();
};
s_c_t _con;
cal_t _handler;
};
成员变量:
- _con为指向对应连接的指针,_handler为指向连接的处理方法,可以是读事件的方法,写事件的方法,也可以是异常事件的处理方法。
- 处理方法是在初始化由外层传参进行确定的,而_con指向调用的方法不止一种,因此不能通过_con找对应的处理方法。
方法: - 我们对Service使用了 operator()进行了再度封装,那么只需要通过函数对象 + ()即可调用,更加简便。
线程池
说明:
- 我们采用的是基于互斥和同步的机制,以阻塞队列的形式设计的线程池,采用的是生产消费者模型,具体接口使用以及相关细节详见文章:线程。
- 除此之外,我们设计了一个懒汉的单例模式,便于获取线程池,我们可以在Recator实现中添加一个ThreadPool类型的指针用于获取单例。
实现代码:
#pragma once
#include<vector>
#include<queue>
#include<unistd.h>
#include<string>
#include<vector>
#include<strings.h>
#include<cstring>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include"Task.hpp"
using std::vector;
using std::queue;
typedef void(*cal)();
class Task;
class ThreadPool
{
public:
static ThreadPool* GetInstance()
{
if(tpool == nullptr)
{
UnLock();
if(tpool == nullptr)
{
tpool = new ThreadPool();
}
Lock();
}
return tpool;
}
void Lock()
{
pthread_mutex_lock(&_t_mutex);
}
void UnLock()
{
pthread_mutex_unlock(&_t_mutex);
}
~ThreadPool()
{
for(int i = 0; i < _capacity; i++)
{
pthread_join(tids[i],nullptr);
}
}
void start()
{
for(int i = 0; i < _capacity; i++)
{
pthread_create(&tids[i],nullptr,handler,this);
}
}
void Push(const Task& data)
{
//push这里只有主线程在push,因此没必要加锁。
_que.push(data);
pthread_cond_signal(&_t_cond);
}
static void* handler(void* args)
{
ThreadPool* ptr = static_cast<ThreadPool*>(args);
ptr->_handler();
return nullptr;
}
void _handler()
{
while(true)
{
Lock();
while(_que.empty())
{
pthread_cond_wait(&_t_cond,&_t_mutex);
}
_que.front()();_que.pop();
UnLock();
}
}
private:
static ThreadPool* tpool;
ThreadPool(int num = defaultnum)
:_capacity(num),tids(num)
{
pthread_mutex_init(&_t_mutex,nullptr);
pthread_cond_init(&_t_cond,nullptr);
}
const static int defaultnum = 5;
//线程的锁和条件变量
pthread_cond_t _t_cond;
pthread_mutex_t _t_mutex;
queue<Task> _que; //任务的场所
vector<pthread_t> tids;
int _capacity;
int cnt = 0;
};
ThreadPool* ThreadPool::tpool = nullptr;
Reactor
框架:
#pragma once
#include"Connection.hpp"
#include"Timer.hpp"
#include"Task.hpp"
#include"threadpool.hpp"
namespace cmp
{
template<class T>
struct Greater
{
bool operator()(const Timer*x,const Timer* y)
{
return x->_out_date > y->_out_date;
}
};
}
//Reactor服务器
class RecatorSvr
{
public:
//获取单例并给予数据的处理方法。
RecatorSvr(cal_t handler = nullptr)
:_eptr(new Epoller()),_thptr(ThreadPool::GetInstance());
~RecatorSvr();
//添加连接
void AddConnection(int fd,cal_t recv = nullptr,、
cal_t send = nullptr,cal_t excep = nullptr,int port = 8888,const string& ip = "0.0.0.0");
//默认的用户处理信息的方法
void DefaultUsrHandler(s_c_t con);
//接收信息的处理函数
void Recver(s_c_t con);
//发送信息的处理函数
void Sender(s_c_t con);
//对事件进行派发,简称事件派发器。
void Dispatcher(struct epoll_event* events,int sz);
//异常处理器
void Exceper(s_c_t con);
//用于更新与处理超时时间
void UpDate();
//运行Recator的主逻辑
void Run();
private:
//用于实现多路转接
std::shared_ptr<Epoller> _eptr;
//用于快速根据文件描述符查找对应的连接
std::unordered_map<int,std::shared_ptr<Connection>> hash;
//维护小根堆,用于更新与处理超时时间。
std::priority_queue<Timer*,vector<Timer*>,cmp::Greater<Timer*>> heap;
//用户的处理接收信息的函数。
cal_t _usr_handler;
//指向线程池的指针用于获取单例
ThreadPool* _thptr;
};
说明
成员变量:
- _eptr,主要指向的是前面实现epoll代码时封装的类Epoller,以更加简便的方式使用接口。
- hash,一个键为文件描述符,值为指向的连接的哈希表,用于快速根据文件描述符找到对应的连接。
- heap,一个维护连接的超时时间的最小堆,当堆顶元素指向的连接超时,就可以执行对应的超时处理方法,并从堆顶弹出,不超时更新为最近一次调用时间,为了后续的连接也能更新,因此需要用vector之类的容器保存一下在最后再进行入堆。
- _usr_handler,用户处理接收信息的函数,交由用户进行自主实现。
- _thptr,获取指向单例线程池的指针,在构造函数调用时,初始化列表获取单例时进行初始化。
方法:
- AddConnection,用于创建Connection对象,并初始化与绑定对应的Timer类。并且要将创建的Connection添加到hash成员变量中。
- DefaultUsrHandler,用于默认处理接收缓存区的信息,这里设置为将收到的消息再发出去。
- Recver,读取信息的处理方法,跟之前的多路转接代码类似。
- Sender,发送信息的处理方法,需要注意由于写事件通常是一直就绪的,并不需要一直关心,我们只需要当写不进去时关心即可。
- Dispatcher,事件的派发器,与之前的多路转接代码类似,不同的是我们需要通过文件描述符找到对应的连接,然后直接将对应连接以及处理方法推送到线程池即可。
- UpDate,用于更新与处理超时连接,这时我们需要先判断是否有可能超时,即_out_date是否小于当前的时间,然后再判断是否真的超时了,即比较_recent_time是否小于当前的时间,如果不是真的超时,那就将_out_date更新为_recent_time,并保存在一个vector,反之直接将调用对应的异常处理即可。更新完毕我们再将vector中的元素再入堆即可。
- Run,用于不断的进行事件检查,并进行循环等待,与之前的多路转接的代码的处理逻辑相似。
方法实现代码
//Reactor服务器
RecatorSvr(cal_t handler = nullptr)
:_eptr(new Epoller()),_thptr(ThreadPool::GetInstance())
{
_thptr->start();
if(handler == nullptr)
{
_usr_handler = std::bind(&RecatorSvr::DefaultUsrHandler,this,std::placeholders::_1);
}
else
{
_usr_handler = handler;
}
}
~RecatorSvr()
{
lg(INFORE,"~RecatorSvr:%d",__LINE__);
}
void AddConnection(int fd,cal_t recv = nullptr,cal_t send = nullptr,\
cal_t excep = nullptr,int port = 8888,const string& ip = "0.0.0.0")
{
if(!hash.count(fd))
{
//将文件描述符设置为非阻塞的。
SetNoneBlock(fd);
std::shared_ptr<Connection> res(new Connection(fd,this,recv,send,excep,port,ip));
//初始化对应文件描述符的超时时间。这里的time
//这里就会出现内存泄漏的问题。
Timer* tp = new Timer(this,res,time(nullptr) + MAX_WAIT_TIME);
res->InitTime(tp);
heap.push(tp);//这里设置超时时间为6
//在哈希表中生成对应的结构
hash.insert({fd,res});
//在Epoller中进行添加fd。
auto event = _eptr->Epoll_Create(fd,EPOLLIN | EPOLLET);//添加ET模式
_eptr->Epoll_Ctl(fd,EPOLL_CTL_ADD,&event);
//将时间更新出来,并添加到堆中。
}
else
{
lg(ERRO,"did not clear completly");
}
}
void DefaultUsrHandler(s_c_t con)
{
//将输入缓存区的内容放入到输出缓冲区
string& inbuf = con->GetInbuf();
string& outbuf = con->GetOutBuf();
outbuf += inbuf;
inbuf.resize(0);
//然后调用Recver进行处理数据
con->_send(con);
}
void Recver(s_c_t con)
{
con->_tp->UpDateRunTime();
//更新最近一次调用的时间
for(;;)
{
char buffer[1024] = {0};
int n = recv(con->Getfd(),buffer,sizeof(buffer) - 1,0);
if(n == 0)
{
lg(INFORE,"client has closed the connect.");
//执行对应的异常事件
con->_excep(con);
return;
}
else if(n < 0)
{
if(errno == EWOULDBLOCK)
{
lg(INFORE,"nothing to read in Linux");
break;
}
else if(errno == EINTR)
{
//说明被异常信号唤醒了。
continue;
}
lg(ERRO,strerror(errno));
con->_excep(con);
return;
}
else
{
//将数据追加到输入缓存区当中。
//使用telnet时需要对数据做处理,因为回车是\r\n而不是\n。
const string& str = buffer;
// auto pos = str.find("\r\n");
// con.Append(str.substr(0,pos));
con->Append(str);
lg(INFORE,"echo: %s",con->GetInbuf().c_str());
}
}
//读完之后,对数据进行处理。。。。
_usr_handler(con);
}
//a small Bug?
void Sender(s_c_t con)
{
con->_tp->UpDateRunTime();
for(;;)
{
int fd = con->Getfd();
string& outbuf = con->GetOutBuf();
int n = send(con->Getfd(),outbuf.c_str(),outbuf.size(),0);
if(n < 0)
{
if(errno == EWOULDBLOCK)
{
lg(INFORE,"the buffer in Linux has no space.");
return;
}
else if(errno == EINTR)
{
//说明被异常信号唤醒了。
lg(INFORE,"Sender is called by a signal");
continue;
}
else
{
lg(ERRO,strerror(errno));
//执行对应的异常处理函数
con->_excep(con);
return;
}
}
else
{
lg(INFORE,"write %d bytes to Linux buffer in fact.",n);
//将已经写入的数据进行丢弃,继续写。
outbuf.erase(0,n);//for debug
if(outbuf.size() == 0)
{
//说明写完了:取消对写事件的关心
auto event = _eptr->Epoll_Create(fd,EPOLLIN | EPOLLET);
_eptr->Epoll_Ctl(fd,EPOLL_CTL_MOD,&event);
break;
}
else
{
lg(INFORE,"the buffer in Linux has no space.");
//我们设置对写事件的关心
auto event = _eptr->Epoll_Create(fd,EPOLLOUT | EPOLLET);
_eptr->Epoll_Ctl(fd,EPOLL_CTL_MOD,&event);
}
}
}
}
void Dispatcher(struct epoll_event* events,int sz)
{
for(int i = 0; i < sz; i++)
{
auto[fd,event] = _eptr->Epoll_Parse(events[i]);
//更新超时时间会出问题的,那小根堆就失效了,我们这里只更新最近一次的调用的时间。
//将异常事件统一转换为读写事件。
if((event & EPOLLERR) || (event & EPOLLHUP))
event |= (EPOLLIN | EPOLLOUT);
if(event & EPOLLIN)
{
// hash[fd]->_recv(hash[fd]);//fix a bug
//交给线程池去执行。
_thptr->Push(Task(hash[fd],hash[fd]->_recv));
//
}
else if(event & EPOLLOUT)
{
_thptr->Push(Task(hash[fd],hash[fd]->_send));
}
}
}
void Exceper(s_c_t con)
{
//首先将fd在内核的结构删除。
int fd = con->Getfd();
lg(INFORE,"%d Exceper:delete a fd in rb_tree of Linux.",__LINE__);
auto it = hash.find(fd);
if(it != hash.end())
{
//其次删除在哈希表中的相应的结构
_eptr->Epoll_Ctl(fd,EPOLL_CTL_DEL);
hash.erase(it);
//最后关闭文件描述符
close(fd);
lg(INFORE,"%d Exceper:closed a %d in Linux.",__LINE__,fd);
}
else
{
lg(INFORE,"%d Exceper: fd: %d had been delted.",__LINE__,fd);
return;
}
}
void UpDate()
{
vector<Timer*> ans;
while(!heap.empty())
{
Timer* top = heap.top(); heap.pop();
time_t tim = time(nullptr);
//第一次判断是用于更新超时时间
if(*top < tim)
{
//第二次判断是判断是否真正超时
if(top->IsOutTime())
{
//bug,可能客户端断开连接的更新的时候也进行了异常的处理,这里再进行处理就不太合适。
top->OutOfTimeHandler();//执行对应的处理即可。
lg(INFORE,"fd:%d is time out, will be closed.",top->_con->Getfd());
//处理完毕释放对应的空间。
delete top;
}
else
{
//最后判断完之后再统一入堆即可
ans.push_back(top);
}
}
else
{
//说明没有超时的,再将其入堆,直接返回即可。
heap.push(top);
break;
}
}
//将更新之后的超时再进行入堆。
for(auto& val : ans)
{
heap.push(val);
}
lg(INFORE,"heap’s size is %d.",heap.size());
}
void Run()
{
for(;;)
{
auto[events,n] = _eptr->Epoll_Wait();
if(n == 0)
{
//首先在等待超时时处理数据。
lg(INFORE,"time was out....");
}
else if(n > 0)
{
lg(INFORE,"%d events is already.",n);
Dispatcher(events,n);
//其次在处理完毕时处理数据。
}
else
{
lg(ERRO,strerror(errno));
}
//将连接进行更新。
UpDate();
//没有连接了。
if(heap.size() == 0) break;
}
}
测试模块
- 接入网络版本计算器
说明:文章:网络 —— “?“ (下),中对网络版本计算器的代码进行分模块以及原理的讲解,这里就直接将相关的代码贴出,便于最终的测试。
自定义协议代码:
#pragma once
#include<iostream>
#include<string>
#include<jsoncpp/json/json.h>
using std::to_string;
using std::stoi;
using std::cout;
using std::endl;
using std::string;
// #define SELF 1
char space = ' ';
char newline = '\n';
//解码
string Decode(string& str)
{
int pos = str.find(newline);
if(pos == string::npos) return "";
int len = stoi(str.substr(0,pos));
int totalsize = pos + len + 2;
if(totalsize > str.size())
{
return "";
}
//将有效载荷截取出来
string actual_load = str.substr(pos + 1,len);
//将完整的报文丢弃,便于下一次进行读取。
str.erase(0,totalsize);
return actual_load;
}
//编码
string InCode(const string& str)
{
string text = to_string(str.size()) + newline + str + newline;
return text;
}
struct Request
{
Request(int x,int y,char oper)
:_x(x),_y(y),_oper(oper)
{}
Request()
{}
bool Deserialize(string& str)
{
cout << "+++++++++++++++++++++++++++++" << endl;
//首先把字符串的报头和有效载荷进行分离
string content = Decode(str);
if(content == "") return false;
#ifdef SELF
//解析字符串:字符 + 空格 + 字符
int left = content.find(space);
int right = content.rfind(space);
if(left + 1 != right - 1)
{
//说明是无效的字符
return false;
}
_x = stoi(content.substr(0,left));
_y = stoi(content.substr(right + 1));
_oper = content[left + 1];
#else
Json::Value root;
Json::Reader r;
r.parse(content,root);
_x = root["_x"].asInt();
_y = root["_y"].asInt();
_oper = root["_oper"].asInt();
#endif
cout << "解析的字符串:"<< _x << _oper << _y << endl;
cout <<"待读取的字符串:" << endl << str << endl;
cout << "-------------------------------" << endl;
return true;
}
string Serialize()
{
// cout << "+++++++++++++++++++++++++++++" << endl;
string package;
#ifdef SELF
//首先对结构体进行编码
//编码格式:字符 + 空格 + 操作符 + 空格 + 字符
package = to_string(_x) + space + _oper + space + to_string(_y);
// cout << "有效载荷:" << package << endl;
//对报文再进行封装
#else
Json::Value root;
root["_x"] = _x;
root["_y"] = _y;
root["_oper"] = _oper;
Json::FastWriter w;
// Json::StyledWriter sw;
package = w.write(root);
#endif
package = InCode(package);
// cout << "报文:"<< package << endl;
// cout << "-------------------------------" << endl;
return package;
}
int _x;
int _y;
char _oper;
};
struct Response
{
Response(int res,int code)
:_res(res),_code(code)
{}
Response()
{}
bool Deserialize(string& str)
{
string content = Decode(str);
if(content == "") return false;
cout << "+++++++++++++++++++++++++++++++" << endl;
#ifdef SELF
//编码格式:字符 + 空格 + 字符
int pos = content.find(space);
_res = stoi(content.substr(0,pos));
_code = stoi(content.substr(pos + 1));
#else
Json::Value root;
Json::Reader r;
r.parse(content,root);
_res = root["_res"].asInt();
_code = root["_code"].asInt();
#endif
cout <<"转换结果:"<< _res << " " << _code << endl;
cout << "待读取的字符串:" << endl << str << endl;
cout << "-------------------------------" << endl;
return true;
}
string Serialize()
{
// cout << "+++++++++++++++++++++++++++++++" << endl;
#ifdef SELF
//首先对结构体进行编码
//编码格式:字符 + 空格 + 字符
string package = to_string(_res) + space + to_string(_code);
#else
string package;
Json::Value root;
root["_res"] = _res;
root["_code"] = _code;
Json::FastWriter w;
package = w.write(root);
#endif
// cout << "有效载荷:" << endl << package << endl;
//对报文再进行封装
package = InCode(package);
// cout << "报文:" << endl << package << endl;
// cout << "-------------------------------" << endl;
return package;
}
int _res;
int _code;
};
计算逻辑实现代码:
#pragma once
#include<iostream>
#include"Log.hpp"
#include"protocol.hpp"
enum CAL
{
DIV_ZERO = 1,
MOD_ZERO,
};
struct CalHelper
{
string Calculation(string& str)
{
Request req;
if(req.Deserialize(str) == false) return "";
int x = req._x;
int y = req._y;
char op = req._oper;
int res = 0, code = 0;
switch(op)
{
case '+':
res = x + y;
break;
case '-':
res = x - y;
break;
case '*':
res = x * y;
break;
case '/':
if(!y)
{
code = DIV_ZERO;
break;
}
res = x / y;
break;
case '%':
if(!y)
{
code = MOD_ZERO;
break;
}
res = x % y;
break;
default:
break;
}
return Response(res,code).Serialize();
}
};
主程序Main.cc实现代码:
#include<iostream>
#include<memory>
#include<functional>
#include"TcpServer.hpp"
#include"Listener.hpp"
#include"../../Tools/servercal.hpp"
CalHelper usr_cal;
void UsrHandler(s_c_t con)
{
string& inbuff = con->GetInbuf();
string res = usr_cal.Calculation(inbuff);
string& outbuff = con->GetOutBuf();
outbuff += move(res);
con->_send(con);
}
int main()
{
std::unique_ptr<RecatorSvr> rsvr(new RecatorSvr(UsrHandler));
std::unique_ptr<Listener> lsr(new Listener(&(*rsvr)));
rsvr->Run();
return 0;
}
- 实验结果:
尾序
恭喜你看到这里!我是舜华,期待与你的下一次相遇!