前言
- IO = 等待 + 数据拷贝,比如read/recv,write/send
- 只要在单位事件里,让等的比重减低,IO的效率就越高
五种IO模型
钓鱼小案例
阻塞式
- 阻塞式: 张三拿着一根鱼竿,一直在岸边钓鱼,期间一直盯着鱼竿,等待鱼上钩
非阻塞式轮询式
- 非阻塞式轮询式: 李四拿着一根鱼竿,在岸边钓鱼,期间一会看手机,一会看鱼竿,等待鱼上钩
信号驱动
- 信号驱动: 王五拿着一根鱼竿,并鱼竿上挂着一个铃铛,在岸边钓鱼,期间一直是否有铃铛声,等待鱼上钩
多路转接
- 多路复用,多路转接: 赵六拿着一堆鱼竿,在岸边钓鱼,期间一直盯着一堆鱼竿,等待鱼上钩
异步IO
- 异步IO : 田七也钓鱼,但是他叫小王给他钓鱼,不要过程,只要结果
这五种IO效率最高的是 多路复用,多路转接
- 在单位事件里,等的比重最低,I
如果一个进程/线程要参与IO,我们就称之为同步IO
- IO = 等 + 拷贝,所谓的"参与",就只有三种情况: 等,拷贝,等 && 拷贝
fcntl
fcntl函数有5种功能:
-
复制一个现有的描述符(cmd=F_DUPFD).
-
获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
-
获得/设置文件状态标记(cmd= F_GETFL 或 F_SETFL ).
-
获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
-
获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW
#include <iostream>
#include <cstring>
#include <ctime>
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
bool SetNonBlock(int fd)
{
int fl = fcntl(fd, F_GETFL); // 在底层获取当前fd对应的文件读写标志位
if (fl < 0)
return false;
fcntl(fd, F_SETFL, fl | O_NONBLOCK); // 设置非阻塞
return true;
}
int main()
{
// 0
SetNonBlock(0); //只要设置一次,后续就都是非阻塞了
char buffer[1024];
while (true)
{
sleep(1);
errno = 0;
ssize_t s = read(0, buffer, sizeof(buffer) - 1);
// 因为将0的文件读写标志位设置成了非阻塞,将不会等待输入了,read返回0,执行else语句
if (s > 0)
{
buffer[s-1] = 0;
std::cout << "echo# " << buffer << " errno[---]: " << errno << " errstring: " << strerror(errno) << std::endl;
}
else
{
// 如果失败的errno值是11,就代表其实没错,只不过是底层数据没就绪
//std::cout << "read \"error\" " << " errno: " << errno << " errstring: " << strerror(errno) << std::endl;
if(errno == EWOULDBLOCK || errno == EAGAIN)
{
std::cout << "当前0号fd数据没有就绪, 请下一次再来试试吧" << std::endl;
continue;
}
else if(errno == EINTR)
{
std::cout << "当前IO可能被信号中断,在试一试吧" << std::endl;
continue;
}
else
{
//进行差错处理
}
}
}
return 0;
}
- 让IO非阻塞,打开的时候,就可以指定非阻塞接口
- 我们用统一的方式来进行非阻塞设置fcntl()
I/O多路转接之select
解释timeval类型
select等待多个fd,等待策略可以选择:
- 阻塞式 nullptr
- 非阻塞式 {0,0}
- 可以设置timeout时间,时间内阻塞,时间到,立马返回{5,0}
- 等待时间内,如果有fd就绪,则timeout输出距离下一次timeout,剩余的时间
解释fd_set类型
- fd_set是一个位图结构,就是一个文件描述符集
- void FD_CLR(int fd,fd_set * set);
- int FD_ISSET(int fd,fd_set *set);
- void FD_SET(int fd,fd_set *set);
- void FD_ZERO(fd_set *set);
比如说: readfds参数
- 输入时: 用户->内核,比特位中,比特位的位置: 表示文件描述符,比特位的内容表示:是否关心
- 0000 1010,从右向左: 关心1号和3号文件描述符的读
- 输出时: 内核->用户,比特位中,比特位的文件: 表示文件描述符,比特位的内容表示:是否就绪
- 0000 1000,从右向左: 用户可以直接读取3号文件描述符,而不会被阻塞
注意: 用户和内核都会修改同一个位图结构,所以参数用了一次之后,一定需要进行重新设定
快速编写select代码
selectServer.hpp
#ifndef __SELECT_SVR_H__
#define __SELECT_SVR_H__
#include <iostream>
#include <string>
#include <vector>
#include <sys/select.h>
#include <sys/time.h>
#include "Log.hpp"
#include "Sock.hpp"
#define BITS 8
#define NUM (sizeof(fd_set)*BITS)
#define FD_NONE -1
using namespace std;
// select 我们只完成读取,写入和异常不做处理 -- epoll(写完整)
class SelectServer
{
public:
SelectServer(const uint16_t &port = 8080) : _port(port)
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
logMessage(DEBUG,"%s","create base socket success");
for(int i = 0; i < NUM; i++) {
_fd_array[i] = FD_NONE;
}
// 规定 : _fd_array[0] = _listensock;
_fd_array[0] = _listensock;
}
void Start()
{
while (true)
{
DebugPrint();
fd_set rfds;
FD_ZERO(&rfds);
int maxfd = _listensock;
for(int i = 0; i < NUM; i++)
{
if(_fd_array[i] == FD_NONE) continue;
FD_SET(_fd_array[i], &rfds);
if(maxfd < _fd_array[i]) maxfd = _fd_array[i];
}
// rfds未来,一定会有两类sock,listensock,普通sock
// 我们select中,就绪的fd会越来越多!
int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
// printf("hello select ...\n");
logMessage(DEBUG, "%s", "time out...");
break;
case -1:
logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));
break;
default:
// 成功的
logMessage(DEBUG, "get a new link event...");
// 为什么会一直打印连接到来呢?连接已经建立完成,就绪了,但是你没有取走,select要一直通知你!
HandlerEvent(rfds);
break;
}
}
}
~SelectServer()
{
if (_listensock >= 0)
close(_listensock);
}
private:
void HandlerEvent(const fd_set &rfds) // fd_set 是一个集合,里面可能会存在多个sock
{
for(int i = 0; i < NUM; i++)
{
// 1. 去掉不合法的fd
if(_fd_array[i] == FD_NONE) continue;
// 2. 合法的就一定就绪了?不一定
if(FD_ISSET(_fd_array[i], &rfds))
{
//指定的fd,读事件就绪
// 读事件就绪:连接时间到来,accept
if(_fd_array[i] == _listensock) Accepter();
else Recver(i);
}
}
}
void Accepter()
{
string clientip;
uint16_t clientport = 0;
// listensock上面的读事件就绪了,表示可以读取了
// 获取新连接了
int sock = Sock::Accept(_listensock, &clientip, &clientport); // 这里在进行accept会不会阻塞?不会!
if(sock < 0)
{
logMessage(WARNING, "accept error");
return;
}
logMessage(DEBUG, "get a new line success : [%s:%d] : %d", clientip.c_str(), clientport, sock);
int pos = 1;
for(; pos < NUM; pos++){
if(_fd_array[pos] == FD_NONE) break;
}
if(pos == NUM){
logMessage(WARNING, "%s:%d", "select server already full,close: %d", sock);
close(sock);
}else{
_fd_array[pos] = sock;
}
}
void Recver(int pos)
{
// 读事件就绪:INPUT事件到来、recv,read
logMessage(DEBUG, "message in, get IO event: %d", _fd_array[pos]);
// 暂时先不做封装, 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞
// 这样读取有bug吗?有的,你怎么保证以读到了一个完整包文呢?
char buffer[1024];
int n = recv(_fd_array[pos], buffer, sizeof(buffer)-1, 0);
if(n > 0){
buffer[n] = 0;
logMessage(DEBUG, "client[%d]# %s", _fd_array[pos], buffer);
}
else if(n == 0){
logMessage(DEBUG, "client[%d] quit, me too...", _fd_array[pos]);
// 1. 我们也要关闭不需要的fd
close(_fd_array[pos]);
// 2. 不要让select帮我关心当前的fd了
_fd_array[pos] = FD_NONE;
}
else{
logMessage(WARNING, "%d sock recv error, %d : %s", _fd_array[pos], errno, strerror(errno));
// 1. 我们也要关闭不需要的fd
close(_fd_array[pos]);
// 2. 不要让select帮我关心当前的fd了
_fd_array[pos] = FD_NONE;
}
}
void DebugPrint()
{
cout << "_fd_array[]: ";
for(int i = 0; i < NUM; i++)
{
if(_fd_array[i] == FD_NONE) continue;
cout << _fd_array[i] << " ";
}
cout << endl;
}
private:
uint16_t _port;
int _listensock;
int _fd_array[NUM];
};
#endif
select优缺点
- 优点 --任何一个多路转接方案,都具备:
- 效率高
- 应用场景: 有大量的连接,但是只有少量是活跃的!,省资源
- 缺点:
- 为了维护第三方数组,select服务器会充满大量的遍历,OS底层帮我们关心fd的时候,也要遍历
- 每一次都要对select输出参数进行重新设定
- 能够同时管理的fd的个数是有上限的
- 因为几乎每一个参数都是输入输出型的,select一定会频繁的进行用户到内核,内核到用户的参数数据拷贝
- 编码比较复杂
快速编写 poll代码
参数说明
- fds是一个poll函数监听的结构列表. 每一个元素中,
包含了三部分内容: 文件描述符, 监听的事件集合, 返回的事件集合. - nfds表示fds数组的长度.
- timeout表示poll函数的超时时间, 单位是毫秒(ms)
events和revents的取值:
返回结果
-
返回值小于0, 表示出错;
-
返回值等于0, 表示poll函数等待超时;
-
返回值大于0, 表示poll由于监听的文件描述符就绪而返回.
pollServer.hpp
#ifndef __POLL_SVR_H__
#define __POLL_SVR_H__
#include <iostream>
#include <string>
#include <vector>
#include <poll.h>
#include <sys/time.h>
#include "Log.hpp"
#include "Sock.hpp"
#define FD_NONE -1
using namespace std;
// select 我们只完成读取,写入和异常不做处理 -- epoll(写完整)
class PollServer
{
public:
static const int nfds = 100;
public:
PollServer(const uint16_t &port = 8080) : _port(port), _nfds(nfds)
{
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
logMessage(DEBUG,"%s","create base socket success");
_fds = new struct pollfd[_nfds];
for(int i = 0; i < _nfds; i++) {
_fds[i].fd = FD_NONE;
_fds[i].events = _fds[i].revents = 0;
}
_fds[0].fd = _listensock;
_fds[0].events = POLLIN;
_timeout = 1000;
}
void Start()
{
while (true)
{
int n = poll(_fds, _nfds, _timeout);
switch (n)
{
case 0:
logMessage(DEBUG, "%s", "time out...");
break;
case -1:
logMessage(WARNING, "select error: %d : %s", errno, strerror(errno));
break;
default:
// 成功的
HandlerEvent();
break;
}
}
}
~PollServer()
{
if (_listensock >= 0)
close(_listensock);
if (_fds) delete [] _fds;
}
private:
void HandlerEvent() // fd_set 是一个集合,里面可能会存在多个sock
{
for(int i = 0; i < _nfds; i++)
{
// 1. 去掉不合法的fd
if(_fds[i].fd == FD_NONE) continue;
// 2. 合法的就一定就绪了?不一定
if(_fds[i].revents & POLLIN)
{
//指定的fd,读事件就绪
// 读事件就绪:连接事件到来,accept
if(_fds[i].fd == _listensock) Accepter();
else Recver(i);
}
}
}
void Accepter()
{
string clientip;
uint16_t clientport = 0;
// listensock上面的读事件就绪了,表示可以读取了
// 获取新连接了
int sock = Sock::Accept(_listensock, &clientip, &clientport); // 这里在进行accept会不会阻塞?不会!
if(sock < 0)
{
logMessage(WARNING, "accept error");
return;
}
logMessage(DEBUG, "get a new line success : [%s:%d] : %d", clientip.c_str(), clientport, sock);
int pos = 1;
for(; pos < _nfds; pos++){
if(_fds[pos].fd == FD_NONE) break;
}
if(pos == _nfds){
// 对struct pollfd进行自动扩容
logMessage(WARNING, "%s:%d", "poll server already full,close: %d", sock);
close(sock);
}else{
_fds[pos].fd = sock;
_fds[pos].events = POLLIN;
}
}
void Recver(int pos)
{
// 读事件就绪:INPUT事件到来、recv,read
logMessage(DEBUG, "message in, get IO event: %d", _fds[pos]);
// 暂时先不做封装, 此时select已经帮我们进行了事件检测,fd上的数据一定是就绪的,即 本次 不会被阻塞
// 这样读取有bug吗?有的,你怎么保证以读到了一个完整包文呢?
char buffer[1024];
int n = recv(_fds[pos].fd, buffer, sizeof(buffer)-1, 0);
if(n > 0){
buffer[n] = 0;
logMessage(DEBUG, "client[%d]# %s", _fds[pos].fd, buffer);
}
else if(n == 0){
logMessage(DEBUG, "client[%d] quit, me too...", _fds[pos].fd);
// 1. 我们也要关闭不需要的fd
close(_fds[pos].fd);
// 2. 不要让select帮我关心当前的fd了
_fds[pos].fd = FD_NONE;
_fds[pos].events = 0;
}
else{
logMessage(WARNING, "%d sock recv error, %d : %s", _fds[pos].fd, errno, strerror(errno));
// 1. 我们也要关闭不需要的fd
close(_fds[pos].fd);
// 2. 不要让select帮我关心当前的fd了
_fds[pos].fd = FD_NONE;
_fds[pos].events = 0;
}
}
void DebugPrint()
{
cout << "_fd_array[]: ";
for(int i = 0; i < _nfds; i++)
{
if(_fds[i].fd == FD_NONE) continue;
cout << _fds[i].fd << " ";
}
cout << endl;
}
private:
uint16_t _port;
int _listensock;
struct pollfd *_fds;
int _nfds;
int _timeout;
};
#endif
poll的优点:
- 效率高
- 有大量的连接,但是只有少量的是活跃的,节省资源
- 输入输入参数分离,不需要进行大量的重置
- poo参数级别没有可以管理fd的上限
poll的缺点:
- poll依旧需要不少的遍历,在用户层检测时间就绪,与内核检测fd就绪,都是一样的(用户还是要维护数组)
- poo需要内核到用户的拷贝--少不了的
- poll的代码也比较复杂--比select容易
小结
-
无论是select还是poll,都是需要用户自己维护一个数组,来进行保存fd与特定事件的 -- 成本 -- 用户程序员
-
select or poll都要遍历
-
select or poll 工作模式
-
通过select or poll,用户告诉内核,你要帮我关系那些fd上的那些event
-
通过select or poll 返回,内核告诉用户,那些fd上的那些event已经发生了
-
I/O多路转接之epoll
epoll的相关系统调用
epoll_create
int epoll_create(int size);
创建一个epoll的句柄.
- 自从linux2.6.8之后,size参数是被忽略的.
- 用完之后, 必须调用close()关闭
epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event) ;
epoll的事件注册函数
-
它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型.
-
第一个参数是epoll_create()的返回值(epoll的句柄).
-
第二个参数表示动作,用三个宏来表示.
-
第三个参数是需要监听的fd.
-
第四个参数是告诉内核需要监听什么事
第二个参数的取值:
-
EPOLL_CTL_ADD :注册新的fd到epfd中;
-
EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
-
EPOLL_CTL_DEL :从epfd中删除一个fd
struct epoll_event结构如下
typedef union epoll_data
{
void* ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;
struct epoll_event
{
uint32_t events;// Epoll events
epoll_data_t data;// user data varibale
}__EPOLL_PACKED;
- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
- EPOLLOUT : 表示对应的文件描述符可以写;
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
- EPOLLERR : 表示对应的文件描述符发生错误;
-
EPOLLHUP : 表示对应的文件描述符被挂断;
-
EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
-
EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里.
epoll_wait
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
收集在epoll监控的事件中已经发送的事件
-
参数events是分配好的epoll_event结构体数组.
-
epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存).
-
maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size.
-
参数timeout是超时时间 (毫秒,0会立即返回,-1是永久阻塞).
-
如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败
epoll的工作原理
-
红黑树的时候,是要用key值的,文件描述符
-
用户只需要设置关系,获取结果即可,不用在关心任何对fd与event管理
-
epoll比select和poll的优势是,不需要维护数组,不需要遍历,所以高效
- 底层只要有fd就绪了,OS自己会给我构建节点,连入到就绪队列中
上层只需要不断的从就绪队列中将数据拿走,就完成了获取就绪事件的任务- 生产者消费者模型!共享资源 - epoll已经保证所有的epoll接口都是线程安全的
- 如果底层没有就绪事件,我们的上层就只能阻塞等待了
Epoll.hpp
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include <unistd.h>
class Epoll
{
public:
static const int gsize = 256;
public:
static int CreateEpoll()
{
int epfd = epoll_create(gsize);
if(epfd > 0) return epfd;
exit(5);
}
static bool CtlEpoll(int epfd, int oper, int sock, uint32_t events)
{
struct epoll_event ev;
ev.events = events;
ev.data.fd = sock;
int n = epoll_ctl(epfd, oper, sock, &ev);
return n == 0;
}
static int WaitEpoll(int epfd, struct epoll_event revs[], int num, int timeout)
{
// 细节1:如果底层就绪的sock非常多,revs承装不下,怎么办??不影响!一次拿不完,就下一次再拿
// 细节2:关于epoll_wait的返回值问题:有几个fd上的事件就绪,就返回几,epoll返回的时候,会将所有
// 就绪的event按照顺序放入到revs数组中!一共有返回值个!
return epoll_wait(epfd, revs, num, timeout);
}
};
EpollServer.hpp
#ifndef __EPOLL_SERVER_HPP__
#define __EPOLL_SERVER_HPP__
#include <iostream>
#include <string>
#include <functional>
#include <cassert>
#include "Log.hpp"
#include "Sock.hpp"
#include "Epoll.hpp"
namespace ns_epoll
{
const static int default_port = 8080;
const static int gnum = 64;
//只处理读取
class EpollServer
{
using func_t = std::function<void(std::string)>;
public:
EpollServer(func_t HandlerRequest, const int &port = default_port)
: _port(port), _revs_num(gnum), _HandlerRequest(HandlerRequest)
{
// 0. 申请对应的空间
_revs = new struct epoll_event[_revs_num];
// 1. 创建listensock
_listensock = Sock::Socket();
Sock::Bind(_listensock, _port);
Sock::Listen(_listensock);
// 2. 创建epoll模型
_epfd = Epoll::CreateEpoll();
logMessage(DEBUG, "init success, listensock: %d, epfd: %d", _listensock, _epfd); // 3, 4
// 3. 将listensock,先添加到epoll中,让epoll帮我们管理起来
if (!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, _listensock, EPOLLIN))
exit(6);
logMessage(DEBUG, "add listensock to epoll success."); // 3, 4
}
void Accepter(int listensock)
{
std::string clientip;
uint16_t clientport;
int sock = Sock::Accept(listensock, &clientip, &clientport);
if(sock < 0)
{
logMessage(WARNING, "accept error!");
return;
}
// 能不能直接读取?不能,因为你并不清楚,底层是否有数据!
// 将新的sock,添加给epoll
if (!Epoll::CtlEpoll(_epfd, EPOLL_CTL_ADD, sock, EPOLLIN)) return;
logMessage(DEBUG, "add new sock : %d to epoll success", sock);
}
void Recver(int sock)
{
// 1. 读取数据
char buffer[10240];
ssize_t n = recv(sock, buffer, sizeof(buffer)-1, 0);
if(n > 0)
{
//假设这里就是读到了一个完整的报文 // 如何保证??
buffer[n] = 0;
_HandlerRequest(buffer); // 2. 处理数据
}
else if(n == 0)
{
// 1. 先在epoll中去掉对sock的关心
bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0);
assert(res);
(void)res;
// 2. 在close文件
close(sock);
logMessage(NORMAL, "client %d quit, me too...", sock);
}
else
{
// 1. 先在epoll中去掉对sock的关心
bool res = Epoll::CtlEpoll(_epfd, EPOLL_CTL_DEL, sock, 0);
assert(res);
(void)res;
// 2. 在close文件
close(sock);
logMessage(NORMAL, "client recv %d error, close error sock", sock);
}
}
void HandlerEvents(int n)
{
assert(n > 0);
for(int i = 0; i < n; i++)
{
uint32_t revents = _revs[i].events;
int sock = _revs[i].data.fd;
// 读事件就绪
if(revents & EPOLLIN)
{
if(sock == _listensock) Accepter(_listensock); // 1. listensock 就绪
else Recver(sock); // 2. 一般sock 就绪 - read
}
if(revents & EPOLLOUT)
{
//TODO?
}
}
}
void LoopOnce(int timeout)
{
int n = Epoll::WaitEpoll(_epfd, _revs, _revs_num, timeout);
//if(n == _revs_num) //扩容
switch (n)
{
case 0:
logMessage(DEBUG, "timeout..."); // 3, 4
break;
case -1:
logMessage(WARNING, "epoll wait error: %s", strerror(errno));
break;
default:
// 等待成功
logMessage(DEBUG, "get a event");
HandlerEvents(n);
break;
}
}
void Start()
{
int timeout = -1;
while(true)
{
LoopOnce(timeout);
}
}
~EpollServer()
{
if (_listensock >= 0)
close(_listensock);
if (_epfd >= 0)
close(_epfd);
if (_revs)
delete[] _revs;
}
private:
int _listensock;
int _epfd;
uint16_t _port;
struct epoll_event *_revs;
int _revs_num;
func_t _HandlerRequest;
};
} // namespace name
#endif
main.cc
#include "EpollServer.hpp"
#include <memory>
using namespace std;
using namespace ns_epoll;
void change(std::string request)
{
//完成业务逻辑
std::cout << "change : " << request.c_str() << std::endl;
}
int main()
{
unique_ptr<EpollServer> epoll_server(new EpollServer(change));
epoll_server->Start();
return 0;
}
epoll的工作模式
- LT模式: 有数据就会一直通知
- ET模式: 有数据就只会通知一次
为什么ET模式更高效:
- 更少的返回次数
- ET模式会倒逼程序员尽快将接收缓冲区中的数据全部取走,
应用层尽快的取走了缓冲区的数据,那么在单位时间下,该模式下工作的服务器,
就可以给发送方一个更大的接收窗口,所以对方就可以有更大的滑动窗口,一次向我们发送更多的数据,提高IO吞吐
程序员要一次把数据全部拿走,就必须一直循环读取,在最后一次正常读取完毕,我们势必还要进行下一次读取(无法确定是否读取完成),则必须把sock设置成非阻塞
而为了保证正确读取,则每一个sock都需要有属于自己的缓冲区
相关代码:Reactor