之前的内容我们基本掌握了基础IO,如套接字,文件描述符,重定向,缓冲区等知识都是文的基本认识,而高级IO则是指更加高效的IO。
对于应用层,在读写的时候,本质就是把数据写给OS,若一方为空,就会进行阻塞式等待,读写的本质也就是数据的拷贝。
所以高效的 IO就需要在多线程的情况下,单位时间的的拷贝的数据更多,等待的比重越小。
目录
五种IO模型
非阻塞式IO
多路复用(多路转接)
IO多路转接之select
初识select
select函数原型
IO多路转接之epoll
1.认识有关epoll的接口
2.epoll原理
3.编写epoll
4.epoll的工作模式
注意:
读写的改进 reactor
五种IO模型
当前有五种IO模型:
我们以钓鱼佬钓鱼为例:(鱼竿:文件描述符) (上钩:读写就绪)(鱼:数据)
1.钓鱼佬张三,在完成打窝,之后选择好地点就开始钓鱼,在等待的过程中,雷打不动,任何人都干扰不到他,他死死的盯着鱼漂,一有动静,钩被咬了就提竿上鱼。这种等待的方式我们称为阻塞式IO,这也是大部分常用的读写接口的方式。
2.钓鱼佬李四是一个资深的钓鱼佬,在完成准备工作时,李四就先观察了鱼漂,发现没动静,就拿起了手机刷视频,看一会儿之后再检查鱼漂发现上钩了,于是提杆上鱼。李四每隔一段时间进行检查,如果满足条件就读写,反之就干自己的事。这种方式称为非阻塞式IO--非阻塞轮询。
3.钓鱼佬王五,拥有较为先进的鱼竿,完成准备工作后,直接就把鱼竿插入地上,一心一意的干其它事,鱼竿上有报警装置,一旦上鱼,立刻发送警报告知王五,所以王五只需要看有没有报警信号。这种方式被称为信号驱动式IO。
4.赵六,身家万贯的热爱钓鱼的钓鱼佬,秉持着杆多鱼多的准则,直接准备了一卡车鱼竿,依次完成准备工作,入水后准备开钓,由于鱼竿数量多,赵六在岸上来回检查看哪只鱼竿上货了。这种方式称为多路复用(多路转接)。
5.田七,以吃鱼为爱好的世界五百强上市公司CEO,司机小王拉着田七来钓鱼,刚准备钓鱼,由于公司业务繁忙,对钓鱼不是很感冒,田七又回到了公司处理,且告知小王车上装备齐全,你在这里钓鱼,掉满了之后再给我打电话,我来接你。对于田七来说,这种方式称为异步IO。
首先我们来看看阻塞式与非阻塞式 IO:
两者在效率上基本一样(有人觉得非阻塞式效率会高一点,这指的是非阻塞可以去干别的事,但是对于IO效率(等+拷贝)是一样的),区别是等的方式不同,一个一直等,一个间隔等一会儿。
再看同步与异步:
同步:参与等或者参数与了拷贝,都可以是同步IO。异步:两者都没参与,只拿数据。
那么综上:哪一种效率更高呢?
实际上就是田七,多路复用IO。我们学习的重点也就是多路复用.
非阻塞式IO
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
传入的 cmd 的值不同 , 后面追加的参数也不相同 .fcntl 函数有 5 种功能 :复制一个现有的描述符( cmd=F_DUPFD ) .获得 / 设置文件描述符标记 (cmd=F_GETFD 或 F_SETFD).获得 / 设置文件状态标记 (cmd=F_GETFL 或 F_SETFL).获得 / 设置异步 I/O 所有权 (cmd=F_GETOWN 或 F_SETOWN).获得 / 设置记录锁 (cmd=F_GETLK,F_SETLK 或 F_SETLKW).我们此处只是用第三种功能 , 获取 / 设置文件状态标记 , 就可以将一个文件描述符设置为非阻塞
void SetNoBlock(int fd) {
int fl = fcntl(fd, F_GETFL);
if (fl < 0) {
perror("fcntl");
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
了解到这两个主要的接口,我们就可以简单的实现一下非阻塞:
#include<iostream>
#include<string.h>
#include<unistd.h>
#include<fcntl.h>
#include<cstdio>
using namespace std;
//默认阻塞,我们设置为非阻塞
void setNonBlock(int fd)
{
int f1=fcntl(fd,F_GETFL);
if(f1<0)
{
cerr<<"get failed"<<endl;
}
//设置标记位,实际上是在原基础上新增一个标记位
fcntl(fd,F_SETFL,f1 | O_NONBLOCK);//以非阻塞式打开
cout<<"set O_NONBLOCK succed"<<endl;
}
int main()
{
char buffer[1024];//缓冲区
while(true)
{
printf("please enter#");
fflush(stdout);
setNonBlock(0);
sleep(1);
ssize_t size=read(0,buffer,sizeof(buffer)-1);//从标准输入中读取
if(size>0)
{
buffer[size-1]=0;
cout<<"读取到数据 :"<<buffer<<endl;
}else if(size==0)
{
cout<<"read done"<<endl;
break;
}else{
cerr<<errno<<":"<<strerror(errno)<<endl;
}
}
return 0;
}
一般阻塞式,系统就会等待着我们输入数据后,才进行下一步工作(这里是打印输入的字符)。
我们设置为非阻塞式有几点需要注意:
1.设置为非阻塞,系统不会等待着我们输出,才打印,而是以读取到数据<0打印错误信息。
2,出错非两种情况:第一个就是fd真的异常 ,第二个就是底层还没就绪,你就返回了。(这里是第二种)。
3.非阻塞也能获取到数据并打印,不过给你就绪的时间非常短。
对于第二点,如何区分是还没就绪还是fd异常,可以通过打印错误码而知晓。
多路复用(多路转接)
实际上对于阻塞式,非阻塞式,非阻塞式轮询这些等待和拷贝一个函数就会搞定了,但是对于多路转接,因为要提高IO效率,因此等待与拷贝是分开的的,多路复用的本质就是非阻塞轮询,因此我们需要将非阻塞和轮询逐一实现。
IO多路转接之select
初识select
系统提供 select 函数来实现多路复用输入 / 输出模型 .select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的 ;程序会停在 select 这里等待,直到被监视的文件描述符有一个或多个发生了状态改变
select函数原型
select的函数原型如下: #include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
其中有三个参数类型为fd_set,位图类型,为输入输出型参数。
为输入型参数时:用户告诉内核,我给一些fd,你帮我注意观察一些fd的读事件,如果读事件就绪,你就告诉我
为输出型参数时:内核告诉用户,你给我传来的fd,我已经帮你知晓了有哪些已经就绪(进行位图修改),你可以读了。
至于为什么用位图,因为位图本质就是设置某个数的第几位为0/1,举例:如果此时内核发现0号和1号文件被描述符已经就绪,就会把第0位和第1位比特位置为1, (比特位的位置代表对应文件描述符,从右向左)。比特位的内容就可以表示内核是否要关心。
作为输入型位图:
比特位的位置代表文件描述符
比特位的内容代表是否内核需要关心
作为输出型位图:
比特位的位置还是文件描述符
比特位的内容代表用户关心的fd的读事件就绪了
于是就提供了一系列位图的接口用来修改为位图。
其中timeout为一个结构体,里面有两个成员:一个代表秒,一个代表微秒。、
如果设置了timeout,在该时间内如果有已经就绪的,立即返回,之后的timeout表示剩下的时间。
如果没设置,只要有一个就绪了,就返回。
select的返回值:
返回值:
- 正常情况下,返回已准备好的文件描述符的个数。
- 经过超时时长后仍无设备准备好,返回值为 0。
- 如果
select
被某个信号中断,返回 -1 并设置errno
为EINTR
。- 如果出错,返回 -1 并设置相应的
errno
。
对于服务端来说,使用多路复用可以提高IO效率,当没有用访问服务端使,此时select的个数为零,但一有客户端访问,就会有对应的文件描述符(接收缓冲区)被创建,并且添加进行状态检测。
select.hpp
#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
const int max_fd = (sizeof(fd_set)) * 8; // 8个位图最多存储的bit位
static const uint32_t defaultport = 8080;
class SelectServer
{
public:
SelectServer(const uint32_t Port = defaultport) : port(Port)
{
// 对辅助数组初始化为-1
for (int i = 0; i < 1024; i++)
{
fd_arry[i] = defaultfd;
}
}
~SelectServer()
{
_listenfd.Close();
}
bool Init()
{
_listenfd.Createsockfd();
_listenfd.Bind(port);
_listenfd.Listen();
return true;
}
void Accept()
{
// 连接就绪,获取新连接可以通信了
std::string clientip;
uint16_t clientport;
int sockfd = _listenfd.Accept(&clientip, &clientport); // 就绪了,不在阻塞
if (sockfd < 0)
return;
std::cout << "获取连接成功"
<< "描述符:" << sockfd << std::endl;
// 获取之后不能进行读取,而是设置select,不然还是会阻塞在select中
int pos = 1;
for (; pos < max_fd; pos++)
{
if (fd_arry[pos] != defaultfd)
{
continue;
}
else
{
break;
}
}
if (pos == max_fd)
{
std::cout << "can not handle" << std::endl;
close(sockfd); // 无法处理就关掉接收的文件描述符
}
else
{
// 找到了空闲位置,此时放到fd_arry里面管理
fd_arry[pos] = sockfd;
}
}
void recver(int fd,int i)
{
// 不在位图中代表的是接收后新的fd
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
std::cout << "read meesage:" << buffer << std::endl;
}else if(n==0)
{
std::cout<<"client closed" << std::endl;
//如果为空,说明要关闭连接了
close(fd);
fd_arry[i]=defaultfd;
}else{
std::cout<<"read warning"<<std::endl;
close(fd);
fd_arry[i]=defaultfd;
}
}
void Disapacher(fd_set fds)
{
// 为了确定就绪的文件描述符。需要遍历整个数组
for (int i = 0; i < max_fd; i++)
{
int fd = fd_arry[i];
if (fd == defaultfd)
{
continue;
}
// 先判断文件描述符是否在位图中
if (fd == _listenfd.Fd() && FD_ISSET(fd, &fds)) // 判断是否是位图中的,是就代表就绪
{
Accept();
}
else
{
recver(fd,i);
}
}
}
void Start()
{
int listensock = _listenfd.Fd(); // 获取套接字的文件描述符
fd_arry[0] = listensock;
fd_set rfds; // 读的文件描述符位图
FD_ZERO(&rfds); // 先进行清空
struct timeval timeout = {10, 0}; // 设置时间戳,每隔五秒检验一次
int fdmax = fd_arry[0];
while (true)
{
for (int i = 0; i < max_fd; i++)
{
if (fd_arry[i] == defaultfd)
{
continue;
}
else
{
// 如果不为-1,说明该位置有要检测的文件描述符
FD_SET(fd_arry[i], &rfds); // 将指定的文件描述符添加进来(交给select进行检测)
// 同时找出最大的文件描述符
if (fd_arry[i] > fdmax)
{
fdmax = fd_arry[i];
std::cout << "max has updated:" << fdmax << std::endl;
}
}
}
// 这里不能直接进行accept,accept的本质就是检测并获取listenfd上面的事件,accept只能阻塞等一个
// 因为我们要实现多路复用,所以这里是要去等待多个套接字并检测他们的状态,使用select
int n = select(fdmax + 1, &rfds, nullptr, nullptr, &timeout); // 通过内核的select帮我进行检测,刚开始启动只有一个listenfd
switch (n)
{
case 0:
std::cout << "time out" << std::endl;
break;
case -1:
std::cout << "select error" << std::endl;
default:
// 有链接了,如果对链接不处理,select会一直通知你,需要处理并设置位图
// 新连接到来我们认为是读事件,
Disapacher(rfds);//名为事件派发器
break;
}
}
}
private:
Sock _listenfd;
uint32_t port;
int fd_arry[max_fd]; // 辅助数组,统计所有描述符
};
main.cc
#include"SelectSever.hpp"
#include<memory>
#include<iostream>
void usehelp(char*s)
{
printf("use correct code:%s port[1024]\n",s);
}
int main(int argc,char*argv[])
{
std::unique_ptr<SelectServer> server(new(SelectServer));
server->Init();
server->Start();
return 0;
}
编写select的思路:
1.首先创建,绑定,监听套接字,之后创建文件描述符数组对所有文件描述符管理。
2.插入最开始的listenfd到数组里,遍历数组找出最大的文件描述符,设置位图用来标识就绪的文件描述符,之后select数组最大fd+1文。
2.select之后判断是否有新链接,有新的连接(这里我们默认是读事件)就会有新的文件描述符,对事件进行处理。
3.处理时,数组里的文件描述符分两种,一种使位图中的就绪的fd,一种是就绪后进行accept的新的fd,因此对整个数组进行判断,是位图中的,就行新接受新连接,是accept的进行读事件,反之是-1,continue。
但是select是有很明显的缺陷:
1.等待的fd是有上限的--1024
2.输入输出参数较多,数据拷贝频率高
3.输入输出参数多,每一次都对需要关心的fd重置以达到复用
4.管理数组fd,有太多次进行遍历
为了解决种种情况,我们使用了下一种接口(poll):
说明fds 是一个 poll 函数监听的结构列表 . 每一个元素中 , 包含了三部分内容 : 文件描述符 , 监听的事件集合 , 返 回的事件集合.nfds 表示 fds 数组的长度 .timeout 表示 poll 函数的超时时间 , 单位是毫秒 (ms).
返回结果返回值小于 0, 表示出错 ;返回值等于 0, 表示 poll 函数等待超时 ;返回值大于 0, 表示 poll 由于监听的文件描述符就绪而返回
这里我们介绍一下,重点放在select_poll上。
IO多路转接之epoll
epoll的说明是:为了处理大批量句柄而改进的poll,
1.认识有关epoll的接口
epoll_create 创建一个epoll模型
epoll_wait 等待多长时间进行一次事件检查 返回就绪的fd与event
返回值位已经准备就绪的个数。对于event的宏表示的就绪时间,类型如下:
EPOLLIN : 表示对应的文件描述符可以读 ( 包括对端 SOCKET 正常关闭 );EPOLLOUT : 表示对应的文件描述符可以写 ;EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 ( 这里应该表示有带外数据到来 );EPOLLERR : 表示对应的文件描述符发生错误 ;EPOLLHUP : 表示对应的文件描述符被挂断 ;EPOLLET : 将 EPOLL 设为边缘触发 (Edge Triggered) 模式 , 这是相对于水平触发 (Level Triggered) 来说的 .EPOLLONESHOT :只监听一次事件 , 当监听完这次事件之后 , 如果还需要继续监听这个 socket 的话 , 需要再次把这个 socket 加入到 EPOLL 队列里 .
epoll_ctl 作用是对系统的文件描述符的事件增加,删除,修改。即对文件描述符做管理
第一个参数就是epoll_create的返回值,op代表三个选项(增加,修改,删除),第三个代表那个fd对应的哪个事件。
对于epoll的使用时要使用这三个系统接口的。
2.epoll原理
当某一进程调用 epoll_create 方法时, Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成 员与epoll 的使用方式密切相关每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来 的事件.这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来 ( 红黑树的插 入时间效率是lgn ,其中 n 为树的高度 ).而所有添加到 epoll 中的事件都会与设备 ( 网卡 ) 驱动程序建立回调关系,也就是说,当响应的事件发生时 会调用这个回调方法.这个回调方法在内核中叫 ep_poll_callback, 它会将发生的事件添加到 rdlist 双链表中 .在 epoll 中,对于每一个事件,都会建立一个 epitem结构体
实际上epoll和poll是两个不同的模型,没太多关系,epoll是专门为用户设计的一个底层事件调度的模型。
3.编写epoll
对于select来说,我们使用一个系统调用接口加上文件描述符数组来进行非阻塞式的等待。
那么对于epoll来说就是通过三个系统调用接口,1.创建模型,3.进行非阻塞式等待(时间自己设置),2.使用epoll_ctl管理事件(本质上时使用红黑树的节点绑定事件进行高效的管理)。
每一个事件和她的描述符都被封装在一个结构体epoll_event当中,之后通过一个数组管理这里epol_event。
第一个不太完美的版本的eopp如下:
epoler.hpp
//封装epoll接口
#pragma once
#include <sys/epoll.h>
#include <unistd.h>
#include<iostream>
#include<string.h>
#include"Nocopy.hpp"
uint32_t EVENT_IN=(EPOLLIN); //读事件
uint32_t EVENT_OUT=(EPOLLOUT);//写事件
//因为我们不想该类被拷贝,所以继承不被拷贝的类
class Epoll :public nocopy
{
public:
static const int size=128;
Epoll():_timeout(3000) //设置检测时间3s一次,若为零就是非阻塞等待
{
//创建epoll模型
_epfd=epoll_create(size);
if(_epfd==-1)
{
std::cout<<"errno: "<<errno<<"error message:"<<strerror(errno)<<std::endl;
}else
{
std::cout<<"create succed ,epfd:"<<_epfd<<std::endl;
}
}
//使用epoll接口进行等待
int Epollwait(struct epoll_event events[],int num) //输入参数events表示就序的事件 num表示个数
{
int ret=epoll_wait(_epfd,events,num,_timeout);//就绪了就从这里拿
}
int Epollctl(int oper,int sock,uint32_t event)
{
//向指定模型根据oper添加修改删除事件
if(oper==EPOLL_CTL_DEL)
{
//这里对删除事件进行单独处理
int ret=epoll_ctl(_epfd,oper,sock,nullptr);
}else
{
//注册事件
struct epoll_event ev; //创建事件结构体并设置
ev.events=event;//设置事件
ev.data.fd=sock;//设置fd
int ret=epoll_ctl(_epfd,oper,sock,&ev);//进行什么操作呢 本质就是向红黑树新增节点,节点绑定事件
if(ret==-1)
{
std::cout<<"errno: "<<errno<<"error message:"<<strerror(errno)<<std::endl;
}else
{
std::cout<<"add succed ,epfd:"<<_epfd<<std::endl;
}
return ret;
}
}
~Epoll()
{
if(_epfd==-1)
{
close(_epfd);
std::cout<<"close succed ,epfd:"<<_epfd<<std::endl;
}
}
private:
int _epfd;
int _timeout;
};
epollserver.hpp
#pragma once
#include"Socket.hpp"
#include"Epoller.hpp"
#include"Nocopy.hpp"
#include<iostream>
#include<memory>
#include<sys/epoll.h>
//设置你要关心的时事件
const uint16_t defalutport=8080;
class EpollServer :public nocopy
{
public:
static const int num=64;//每次从就绪队列中取64个就绪事件
EpollServer(uint16_t port=defalutport):_port(port),_listensock(new Sock),_epoller(new Epoll)
{
}
bool Init()
{
_listensock->Createsockfd();
_listensock->Bind(_port);
_listensock->Listen();
return true;
}
void Start()
{
//将listensock添加到epoll中检测你关心的读写事件,这里就是添加
_epoller->Epollctl(EPOLL_CTL_ADD,_listensock->Fd(),EVENT_IN);
struct epoll_event evns[num];//num代表最多就绪的个数,若果等待的个数太多,就会分配批次取wait
while(true)
{
int n=_epoller->Epollwait(evns,num); //返回值代表就绪的个数
if(n>0)
{
//在事件插入之后,有事件就绪了
std::cout<<"检测对应的fd的事件,fd:"<<evns[0].data.fd<<std::endl;
//既然就绪,那就开始处理事件
HandlerEvent(evns,n);
}else if(n==0)
{
std::cout<<"time out"<<std::endl;
}else
{
std::cout<<"wait error"<<std::endl;
}
}
}
void Accepter()
{
std::string clientip;
uint16_t clientport;
int sock =_listensock->Accept(&clientip,&clientport);
if(sock>0)
{
//获取到新连接再插入到红黑树当中
_epoller->Epollctl(EPOLL_CTL_ADD,sock,EVENT_IN);
}else
{
std::cout<<"accept error"<<std::endl;
}
}
void Dispatcher(int fd)
{
char buffer[1024];
// 已经获取新连接,开始read
int n = read(fd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
std::cout << "读取到数据:" << buffer << std::endl;
//读完之后写回给客户端
std::string echo_string="server get message";
std::string content=echo_string+std::string(buffer);
write(fd,content.c_str(),content.size());
}else if(n==0)
{
std::cout << "client close:"<< std::endl;
//删除对应的文件描述符
_epoller->Epollctl(EPOLL_CTL_DEL,fd,0); //从红黑树中拿出去,再关闭fd
close(fd);
}else
{
std::cout << "read error!!" << std::endl;
_epoller->Epollctl(EPOLL_CTL_DEL,fd,0); //从红黑树中拿出去,再关闭fd
close(fd);
}
}
//这里的事件假设就两种,读 写
void HandlerEvent( struct epoll_event evns[],int num)
{
for(int i=0;i<num;i++)
{
uint32_t event=evns[i].events;
int fd=evns[i].data.fd;
//判断是哪一种事件
if(event&EVENT_IN)
{
//判断fd是等待的fd,还是已经获取新连接的fd
if(fd==_listensock->Fd())
{
Accepter();
}else
{
Dispatcher(fd);
}
}else if(event & EVENT_OUT)
{
//写事件就绪
}
}
}
~EpollServer()
{
_listensock->Close();
}
private:
std::shared_ptr<Sock> _listensock;
std::shared_ptr<Epoll> _epoller;
uint16_t _port;
};
4.epoll的工作模式
epoll的工作模式有利各种:ET 和LT
LT(level triggered)工作模式--水平工作模式
1.当 epoll 检测到 socket 上事件就绪的时候 , 可以不立刻进行处理 . 或者只处理一部分 .如上面的例子 , 由于只读了 1K 数据 , 缓冲区中还剩 1K 数据 , 在第二次调用 epoll_wait 时 , epoll_wait 仍然会立刻返回并通知socket 读事件就绪 .直到缓冲区上所有的数据都被处理完 , epoll_wait 才不 会立刻返回 .2.支持阻塞读写和非阻塞读写
通俗点说,如果上层有新的数据,你不拿走,上层就一直通知你让你取数据,只要有就拿走。
ET(Edge Triggered)工作模式 --边缘工作模式
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式.
当 epoll 检测到 socket 上事件就绪时 , 必须立刻处理 .如上面的例子 , 虽然只读了 1K 的数据 , 缓冲区还剩 1K 的数据 , 在第二次调用 epoll_wait 的时候 ,epoll_wait 不会再返回了 .也就是说 , ET 模式下 , 文件描述符上的事件就绪后 , 只有一次处理机会 .ET 的性能比 LT 性能更高 ( epoll_wait 返回的次数少了很多 ). Nginx 默认采用 ET 模式使用 epoll.只支持非阻塞的读写
通俗点说,只有发生变化(从没链接到有新连接,从一个变多个,从多个变没),就会进行通知取走数据,只有变化,才会拿走。
由于通知的次数少,且通知是有限的,所以一般而言ET的工作效率更高一些,但也因为之中特性,程序员必须每次把数据都取完(循环读取,直到读取出错),否则就会遗漏。
但是读完数据,我们此时的read是阻塞的,我们不能让她阻塞,这会导致服务器挂起(这是不好的),所以我们需要设置读是非阻塞的。直到非阻塞状态下还读完了,就会返回错误码。
且由于我们把缓冲区的数据都把走了,这一位则TCP通信时,窗口就更大了,一次拿去的数据也多了。
注意:
即使如此,一般情况ET比LT高效,但是你是一直比LT高效吗,而且既然你ET都能一次性非阻塞的读取完,我LT不行吗?只是我们一般去使用ET,如果可以,你也可以去使用LT修改。
读写的改进 reactor
所以我们的读写是不合理的,首先没有协议的定制,其次读取并不是非阻塞的。
这里就还是以编写计算器的客户端与服务端为例:
重要的读写部分在TcpServer.hpp中:
TcpServer.hpp
#pragma once
#include<string>
#include<iostream>
#include<memory>
#include<functional>
#include<unordered_map>
#include <arpa/inet.h>
#include"Nocopy.hpp"
#include"Socket.hpp"
#include"Epoller.hpp"
#include"Comen.hpp"
#include"Protocol.hpp"
#include"Calculator.hpp"
class Connection;
class TcpServer;
using funct=std::function<void (std::shared_ptr<Connection>)>; //定义了返回值为void,参数为td::shared_ptr<Connection>指针的包装器
const static int buffersize=1024;
const uint16_t defaultport=8080;
const int num=64;
class Connection
{
public:
void SetHandle(funct &recv_cb,funct &send_cb,funct &except_cb)
{
_recver_callback=recv_cb;
_send_callback=send_cb;
_except_callback=except_cb;
}
void Appendinbuffer(std::string buffer)
{
_inbuffer+=buffer;//输入缓冲区
}
void Appendoubuffer(std::string buffer)
{
_outbuffer+=buffer;
}
const std::string &inbuffer()
{
return _inbuffer;
}
Connection(int sock,std::shared_ptr<TcpServer> tcpserver_ptr):_sock(sock),_tcpserver_callback(tcpserver_ptr)//回值指针用来传递TCPserver
{
}
int getsock()
{
return _sock;
}
std::string& getinbuf()
{
return _inbuffer;
}
std::string& getoutbuf()
{
return _outbuffer;
}
~Connection(){}
private:
int _sock;
private:
std::string _inbuffer;//输入缓冲区
std::string _outbuffer;//输出缓冲区
public:
funct _recver_callback; //接收回调
funct _send_callback; //发送回调
funct _except_callback; //异常回调
//tcpserver回值指针
std::shared_ptr<TcpServer> _tcpserver_callback;
};
class TcpServer
{
public:
TcpServer(funct OnMessage,uint16_t port=defaultport):_epoll_server(new Epoll),_listensocket(new Sock),_port(port),_quit(true)//回调函数用来设置处理事件
{
_OnMessage=OnMessage;
_listensocket->Createsockfd();
_listensocket->Bind(_port);
_listensocket->Listen();
SetNonBlock(_listensocket->Fd()); //将文件设置为非阻塞等待
AddConnection(_listensocket->Fd(),EVENT_IN,std::bind(&TcpServer::Accepter,this,std::placeholders ::_1),nullptr,nullptr);
}
void Init()
{
}
void Accepter(std::shared_ptr<Connection> connection)
{
//连接到来开始接受
while(true)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int fd=::accept(connection->getsock(),(sockaddr*)&peer,&len); //获取新链接
if(fd>0)
{
uint16_t clientport=ntohs(peer.sin_port);
char ipbuffer[10];
inet_ntop(AF_INET,&peer.sin_addr,ipbuffer,sizeof(ipbuffer));
std::string ip=std::string(ipbuffer);
std::cout<<"get new link,ip:"<<ip<<",fd :"<<fd<<",port"<<clientport<<std::endl;
//获取的新连接设置非阻塞,现在是ET模式
SetNonBlock(fd);
//之后再添加到哈希表中,epoll_event中,
AddConnection(fd,EVENT_IN,\
std::bind(&TcpServer::Recver,this,std::placeholders::_1), //根据函数地址与参数bind成一个包装器对象
std::bind(&TcpServer::Sender,this,std::placeholders::_1),\
std::bind(&TcpServer::Exepter,this,std::placeholders::_1));
}else
{
//直到非阻塞被读到没有新的连接了
if(errno==EWOULDBLOCK)
{
break;
}else if(errno==EINTR)
{
continue;
}else{
//读出错
break;
}
}
}
}
void AddConnection(int sock,uint32_t event,funct recv_cb,funct send_cb,funct except_cb)
{
// 将listensock添加到epoll中检测你关心的读写事件,这里就是添加 并且设置为边缘工作模式
_epoll_server->Epollctl(EPOLL_CTL_ADD, sock, event);
//c除此之外,每一个fd都要构建成Connetcion对象,通过connect管理事件的处理
std::shared_ptr<Connection> new_connection=std::make_shared<Connection>(sock,std::shared_ptr<TcpServer>(this));
new_connection->SetHandle(recv_cb,send_cb,except_cb);
//第三步,就是添加到unordered_map
_connection_map[sock]=new_connection;
}
//事件管理,进行事件的处理
void Recver(std::shared_ptr<Connection> connection)
{
int sock=connection->getsock();
//ET模式读
while(true)
{
char buffer[buffersize];
memset(buffer,0,sizeof(buffer));
ssize_t n=recv(sock,buffer,sizeof(buffer)-1,0); //这里的数据默认为字符流,如果还有二进制,需要换为vector
if(n>0)
{
//读取成功
connection->Appendinbuffer(buffer);
}else if(n==0)
{
//连接关闭
std::cout<<"link closed !"<<std::endl;
connection->_except_callback(connection);
}else
{
//读取错误
if(errno==EWOULDBLOCK)
{
break;
}else if(errno==EINTR)
{
continue;
}else{
std::cout<<"read error!"<<std::endl;
connection->_except_callback(connection);
break;
}
}
}
//读完之后,将数据交给上层
_OnMessage(connection); //协议的定制
}
void Sender(std::shared_ptr<Connection> connection)
{
while(true)
{
std::string &buffer=connection->getoutbuf();
int sockfd=connection->getsock();
ssize_t n=send(sockfd,buffer.c_str(),buffer.size(),0);
if(n>0)
{
//清空缓冲区
buffer.erase(0,n);
if(buffer.empty())
break;
}else if(n==0)
{
return ;
}else
{
if(errno==EWOULDBLOCK)
{
break;
}else if(errno==EINTR)
{
continue;//信号中断,跳过本次
}else
{
//发失败了,进入异常处理
std::cout<<"send error"<<std::endl;
connection->_except_callback(connection);
return;
}
}
if(!buffer.empty())
{
//对写事件的关心
EnableEvent(connection->getsock(),true,true);
}else
{
//关闭对写事件的处理
EnableEvent(connection->getsock(),true,false);
}
}
}
void EnableEvent(int fd,bool readable,bool writeable )
{
uint32_t event=0;
event |=((readable ? EPOLLIN:0)|(writeable ? EPOLLOUT : 0)|EPOLLET);
_epoll_server->Epollctl(EPOLL_CTL_MOD,fd,event);//修改事件的读写,如果没发完,就继续发,通过修改事件为写
}
void Exepter(std::shared_ptr<Connection> connection)
{
//处理链接异常或着断开
std::cout<<"事件异常,断开连接!"<<std::endl;
//首先移除链接
//EnableEvent(connection->getsock(),false,false);//读写都设置为False
_epoll_server->Epollctl(EPOLL_CTL_DEL,connection->getsock(),0);//移出事件
//关闭异常的文件描述符
close(connection->getsock());
std::cout<<"已移除connection,已关闭文件描述符:"<<connection->getsock()<<std::endl;
//把map中的也删了
}
bool isConnectionsafe(int fd)
{
auto it=_connection_map.find(fd);
if(it==_connection_map.end())
{
return false;
}
return true;
}
void Disptcher(int timeout)
{
//进行等待就绪时间
int n=_epoll_server->Epollwait(events,num,timeout);
for(int i=0;i<n;i++)
{
uint32_t event=events[i].events;
int sock=events[i].data.fd;
//进行事件判断,并转化成读写事件
if((event & EVENT_IN)&& isConnectionsafe(sock))
{
//读就绪
if(_connection_map[sock]->_recver_callback)//函数存在
{
_connection_map[sock]->_recver_callback(_connection_map[sock]);//fd对应的connetcion信息,其中调用对应的方法,参数为connection对象
}
}
if((event & EPOLLOUT)&& isConnectionsafe(sock))
{
//写就绪
if(_connection_map[sock]->_recver_callback)
{
_connection_map[sock]->_send_callback(_connection_map[sock]);
}
}
if(event & EPOLLHUP)
{
//读关闭
event |=(EPOLLIN & EPOLLOUT);
}
if(event &EPOLLERR)
{
//错误
event |=(EVENT_IN & EVENT_OUT);
}
}
}
void Loop()
{
_quit=false;
while (!_quit)
{
Disptcher(3000);//3s
}
_quit=true;
}
~TcpServer(){}
private:
std::shared_ptr<Epoll> _epoll_server; //epoll接口
std::unordered_map<int,std::shared_ptr<Connection>> _connection_map; //连接池
std::shared_ptr<Sock> _listensocket; //套接字接口
struct epoll_event events[num];
uint32_t _port;
bool _quit; //是否退出
funct _OnMessage;//上层回调处理信息
};
剩余代码的所在地:reactor · 但成伟/编程学习 - 码云 - 开源中国 (gitee.com)
主要难点是对事件与链接的管理,事件的执行,读写非阻塞这几点。