文章目录
- select
- timeval结构体
- fd_set
- 优缺点分析
- 完整代码
本节要介绍的主题是多路转接式IO
select
先说结论,这个select是做什么的呢?
select是负责在Linux系统中,让一个人可以有多个鱼竿,可以不停的进行轮询,只要有一个准备好了就可以进行等待,先看一下它的函数参数:
这个函数参数还是有点复杂的,下面对于这些函数的参数进行解析:
首先是第一个参数nfds,这个参数的值是最大的文件描述符的值加一,比如现在有1234,对于这四个文件描述符来说,要填写的第一个参数的值就是5
下面看一下返回值:
简单来说,对于返回值n来说,如果n是大于0的,表示的是有n个fd已经就绪了,如果n是等于0的,表示的是超时,虽然没有错误,但是也没有资源就绪,如果n是小于0的,表示的是出错了,比如可能文件描述符被关了等等
timeval结构体
下面的参数是这个timeval结构体:
对于这个结构体来说,首先有两个成员,一个代表的是秒,一个代表的是微妙,这个参数的主要目的是给select设置一个等待的方式,比如可以进行一些合适的设置,使得这个select可以在规律的周期性醒来,如果要是把这个参数设置为0,表示的就是立马返回,其实就是一个非阻塞,不过一般也不这么设置,不过是可以这样设置的
同时需要注意的是,对于select当中,这个参数是一个输入输出参数,它不仅是输入,而且还会输出,输出的信息是剩余的时间,比如输入的是五秒钟,但是经过2秒钟资源就已经就绪了,那么就会返回3秒钟,表示还剩下3秒钟
fd_set
下面要进入的是select当中最重要的一个模块,fd_set类型的参数,这个参数是一个内核的数据类型,其实就是所谓的位图,这个参数主要是设置要监听什么事件,正常来说我们比较关心的是这个文件描述符的读写事件
比如现在要设置文件描述符是012的这三个内容,我们要关心它的写事件,那么就可以把位图的信息从0000 0000设置为0000 0111,而其中比特位的位置,表示的是文件描述符的编号,而其中的比特位的内容,表示的是这个东西内核是否需要关心
这个参数也是一个输入输出型的参数,在进行输入的时候,用户告诉内核,我要关心的是一个或者多个fd,你来帮我进行检测一下上面的读时间,如果要是检测到了,你要告诉我,而进行输出的时候,是内核告诉用户,你让我关心的这些事件当中,已经有xxx已经就绪了,你来进行读取吧,这就是这个位图可以带给用户的信息
说白了,这个位图的意义就是来让用户和内核进行交互,来查看fd是否已经就绪的信息的,这就意味着在进行select的操作当中,是有很多的位图操作的
那么下面,就用代码来对于这些内容进行验证:
// socket.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "Log.hpp"
enum
{
SocketErr = 2,
BindErr,
ListenErr,
};
// TODO
const int backlog = 10;
class Sock
{
public:
Sock()
{
}
~Sock()
{
}
public:
void Socket()
{
sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd_ < 0)
{
lg(Fatal, "socker error, %s: %d", strerror(errno), errno);
exit(SocketErr);
}
int opt = 1;
setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
}
void Bind(uint16_t port)
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0)
{
lg(Fatal, "bind error, %s: %d", strerror(errno), errno);
exit(BindErr);
}
}
void Listen()
{
if (listen(sockfd_, backlog) < 0)
{
lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
exit(ListenErr);
}
}
int Accept(std::string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);
if(newfd < 0)
{
lg(Warning, "accept error, %s: %d", strerror(errno), errno);
return -1;
}
char ipstr[64];
inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));
*clientip = ipstr;
*clientport = ntohs(peer.sin_port);
return newfd;
}
bool Connect(const std::string &ip, const uint16_t &port)
{
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
peer.sin_family = AF_INET;
peer.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));
int n = connect(sockfd_, (struct sockaddr*)&peer, sizeof(peer));
if(n == -1)
{
std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;
return false;
}
return true;
}
void Close()
{
close(sockfd_);
}
int Fd()
{
return sockfd_;
}
private:
int sockfd_;
};
// selectserver.hpp
#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include "Socket.hpp"
using namespace std;
const uint16_t defaultport = 8888;
class selectserver
{
public:
selectserver(uint16_t port = defaultport) : _port(port)
{
}
~selectserver()
{
}
bool Init()
{
_listensock.Socket();
_listensock.Bind(_port);
_listensock.Listen();
return true;
}
void Start()
{
int listensock = _listensock.Fd();
for (;;)
{
fd_set rfds;
FD_ZERO(&rfds);
// 设置监听
FD_SET(listensock, &rfds);
struct timeval timeout = {1, 0};
int n = select(5, &rfds, nullptr, nullptr, /*&timeout*/nullptr);
switch (n)
{
case 0:
cout << "timeout : " << timeout.tv_sec << "." << timeout.tv_usec << endl;
break;
case -1:
cerr << "select error" << endl;
break;
default:
cout << "get a new link" << endl;
// 对select进行处理
break;
}
}
}
private:
Sock _listensock;
uint16_t _port;
};
对上述代码进行运行,进行链接后会发现,确实可以监听到效果
但是会非常快的打满整个屏幕,这告诉我们下面的结论
- 如果事件就绪了,但是上层不处理,select会一直通知用户
- select告诉就绪了,那么在接下来的一次读取的时候不会阻塞,因为事件已经就绪了
现在的这份代码注定是不完全的,起码对于建立的链接没有进行处理,所以下一步对于这样的链接要进行后续的处理,那现在的问题是,在进行处理的时候该如何进行处理?
由上面的结论可以看出,的确在select就绪的时候,说明下一次的读取是不会进行阻塞,可以直接进行读取的,因此在建立链接这件事上,是可以直接accept的,但是accept之后的内容呢?比如accept之后要进行接受数据,可以直接read吗?答案是不可以的,因为在建立链接之后用户未必会给你发消息,所以此时作为服务端要做的是要继续进行下一轮等待,再次进行read等待
所以等待也是要进行区分的,等的是accept还是read?所以在进行处理等到了的函数中,必然要对于等待的内容进行区分,如果等待的是accept,那么就建立链接,然后去等read,如果等待的是read,那么就可以直接去调用read了,所以下面继续对于这部分内容进行完善,我们要添加一个数组用来描述的建立的一个一个的文件描述符,位图的大小*8即可
void Dispatcher(fd_set &rfds)
{
// 对于等待的信息进行循环等待
for(int i = 0; i < fd_num_max; i++)
{
int fd = fd_array[i];
// 如果这个fd没被使用过,就跳过它
if(fd == defaultfd)
continue;
if(FD_ISSET(fd, &rfds))
{
// 如果是建立链接的select
if(fd == _listensock.Fd())
Accepter();
// 如果是等待读取信息的select
else
Recver(fd, i);
}
}
}
如上所示的是一个基本的逻辑,对于要建立链接的select,就让他去建立链接,如果是要建立读取的select,就让他去执行读取的逻辑
那我们先处理建立链接的select:
void Accepter()
{
// 接收客户端的ip和端口号
string clientip;
uint16_t clientport = 0;
int sock = _listensock.Accept(&clientip, &clientport);
if(sock < 0)
return;
lg(Info, "accept new link, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);
// 对于建立好的链接要去让它们进行等待select
int pos = 1;
// 建立链接要进行判断select还有没有空余位置,如果select都满了,那对于建立新的链接就无能为力了
for(; pos < fd_num_max; pos++)
{
if(fd_array[pos] != defaultfd)
continue;
else
break;
}
// 如果当前select已经满了,说明已经不能再建立新的链接了
if(pos == fd_num_max)
{
lg(Warning, "server is full, close %d", sock);
close(sock);
}
// 如果当前select没有满,那么就说明此时可以去进行等待了
else
{
fd_array[pos] = sock;
}
}
那如果当前识别到时要进行读取的select,说明接下来就可以直接进行读取了,不会进行阻塞了,下层已经把数据送上来了:
void Recver(int fd, int pos)
{
char buffer[1024];
// 此时可以直接进行读取,不会阻塞,因为已经是就绪了才会加到select当中的
ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
cout << "get message " << buffer << endl;
}
else if (n == 0)
{
// 如果是0,就说明客户端已经退出了,那么服务端也就不用维护这段链接了
lg(Info, "client quit, server quit, close fd is %d", fd);
close(fd);
// 将对应的信息重新设置为-1,表示的是这个位置可以接收新的select了
fd_array[pos] = defaultfd;
}
else
{
// 如果是这样,就是接收失败了,这里也把这个链接直接关掉就可以了
lg(Warning, "read error, close fd is %d", fd);
close(fd);
// 将对应的信息重新设置为-1,表示的是这个位置可以接收新的select了
fd_array[pos] = defaultfd;
}
}
测试一下上面的代码
这样我们就完成了一个基本的select的多路转接
优缺点分析
优点
select有什么优点和缺点呢?对于select服务器来说,它的优点是比较明显的,因为它已经实现了一种多路转接的方案,在用单进程的方式实现了处理多个用户的请求,只要有内容就绪,那么就可以设置为就绪,用了一个辅助数组来标记到底有哪些数据已经就绪了
缺点
select的缺点也比较明显
- 等待的fd是有上限的,在我们当前这个版本来说,它能等待的最大值是1024,也就是说超过来了这个1024我们的处理方式是直接把链接的这个socket丢弃
- 输入输出型参数比较多,数据拷贝的频率比较高
- 输入输出型参数比较多,每次都要对关心的fd进行事件重置
- 在用户层来说,在使用第三方数组进行管理fd的时候,要进行很多次的遍历,在内核中检测fd的事件就绪的时候,也要进行遍历
完整代码
#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include "Socket.hpp"
using namespace std;
const uint16_t defaultport = 8888;
const int fd_num_max = sizeof(fd_set) * 8;
int defaultfd = -1;
class selectserver
{
public:
selectserver(uint16_t port = defaultport) : _port(port)
{
for (int i = 0; i < fd_num_max; i++)
{
fd_array[i] = defaultfd;
}
}
~selectserver()
{
}
bool Init()
{
_listensock.Socket();
_listensock.Bind(_port);
_listensock.Listen();
return true;
}
void Accepter()
{
// 接收客户端的ip和端口号
string clientip;
uint16_t clientport = 0;
int sock = _listensock.Accept(&clientip, &clientport);
if (sock < 0)
return;
lg(Info, "accept new link, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);
// 对于建立好的链接要去让它们进行等待select
int pos = 1;
// 建立链接要进行判断select还有没有空余位置,如果select都满了,那对于建立新的链接就无能为力了
for (; pos < fd_num_max; pos++)
{
if (fd_array[pos] != defaultfd)
continue;
else
break;
}
// 如果当前select已经满了,说明已经不能再建立新的链接了
if (pos == fd_num_max)
{
lg(Warning, "server is full, close %d", sock);
close(sock);
}
// 如果当前select没有满,那么就说明此时可以去进行等待了
else
{
fd_array[pos] = sock;
}
}
void Recver(int fd, int pos)
{
char buffer[1024];
// 此时可以直接进行读取,不会阻塞,因为已经是就绪了才会加到select当中的
ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
cout << "get message " << buffer << endl;
}
else if (n == 0)
{
// 如果是0,就说明客户端已经退出了,那么服务端也就不用维护这段链接了
lg(Info, "client quit, server quit, close fd is %d", fd);
close(fd);
// 将对应的信息重新设置为-1,表示的是这个位置可以接收新的select了
fd_array[pos] = defaultfd;
}
else
{
// 如果是这样,就是接收失败了,这里也把这个链接直接关掉就可以了
lg(Warning, "read error, close fd is %d", fd);
close(fd);
// 将对应的信息重新设置为-1,表示的是这个位置可以接收新的select了
fd_array[pos] = defaultfd;
}
}
void Dispatcher(fd_set &rfds)
{
// 对于等待的信息进行循环等待
for (int i = 0; i < fd_num_max; i++)
{
int fd = fd_array[i];
// 如果这个fd没被使用过,就跳过它
if (fd == defaultfd)
continue;
if (FD_ISSET(fd, &rfds))
{
// 如果是建立链接的select
if (fd == _listensock.Fd())
Accepter();
// 如果是等待读取信息的select
else
Recver(fd, i);
}
}
}
void Start()
{
int listensock = _listensock.Fd();
fd_array[0] = listensock;
for (;;)
{
fd_set rfds;
FD_ZERO(&rfds);
// 设置监听
int maxfd = fd_array[0];
// 循环判断有哪些需要被监听
for (int i = 0; i < fd_num_max; i++)
{
if (fd_array[i] == defaultfd)
continue;
FD_SET(fd_array[i], &rfds);
if (maxfd < fd_array[i])
{
maxfd = fd_array[i];
lg(Info, "max fd update, max fd is: %d", maxfd);
}
}
struct timeval timeout = {0, 0};
int n = select(5, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);
switch (n)
{
case 0:
cout << "timeout : " << timeout.tv_sec << "." << timeout.tv_usec << endl;
break;
case -1:
cerr << "select error" << endl;
break;
default:
cout << "get a new link" << endl;
// 对select进行处理
Dispatcher(rfds);
break;
}
}
}
private:
Sock _listensock;
uint16_t _port;
int fd_array[fd_num_max];
};