多线程select并发
只需要在上面代码的基础上对服务器端做些更改, 主要逻辑如下:
主线程在检测到有新的客户端连接之后, 创建一个子线程完成accept
操作, 具体如下:
if(FD_ISSET(lfd, &rdtemp)){
auto* info = new fdInfo;
info->fd = lfd;
info->maxfd = &maxfd;
info->rdset = &rdset;
// 创建子线程
pthread_t tid;
pthread_create(&tid, nullptr, acceptConn, info);
pthread_join(tid, nullptr);
}
acceptConn
就是任务函数,info
是一个自定义的结构体
还有就是在检测到有通信描述符在rdsert集合中被返回时, 在创建一个子线程, 完成通信操作
for (int i = 0; i <= maxfd; ++i) {
// 判断从监听的文件描述符之后到maxfd这个范围内的文件描述符是否读缓冲区有数据
if(i != lfd && FD_ISSET(i, &rdtemp)){
// 创建子线程
auto* info = new fdInfo;
info->fd = i;
info->rdset = &rdset;
// 创建子线程
pthread_t tid;
pthread_create(&tid, nullptr, conmmunication, info);
pthread_join(tid, nullptr);
}
}
整体代码如下
//
// Created by 47468 on 2024/1/24.
//
#include "arpa/inet.h"
#include <cstdio>
#include "unistd.h"
#include "iostream"
#include "string"
#include "cctype"
using namespace std;
#include "pthread.h"
struct fdInfo{
int fd;
int* maxfd;
fd_set* rdset;
};
pthread_mutex_t mutex;
void* acceptConn(void* arg){
// cout << "线程id: " << pthread_self() << endl;
auto* info = (fdInfo*)arg;
sockaddr_in cliaddr{};
int len = sizeof(cliaddr);
int cfd = accept(info->fd, (struct sockaddr *) &cliaddr, (socklen_t *) (&len));
char ip[32];
cout << "有客户端成功连接, 客户端ip: "
<< inet_ntop(AF_INET, &cliaddr.sin_addr.s_addr, ip, sizeof(ip))
<< ", port: "
<< ntohs(cliaddr.sin_port)
<< endl;
// 得到了有效的文件描述符
// 通信的文件描述符添加到读集合
// 在下一轮select检测的时候, 就能得到缓冲区的状态
pthread_mutex_lock(&mutex);
FD_SET(cfd, info->rdset);
// 更新maxfd
*info->maxfd = max(*info->maxfd, cfd);
pthread_mutex_unlock(&mutex);
// 释放资源
delete info;
cout << "accept done" << endl;
return nullptr;
}
void* conmmunication(void* arg){
// cout << "线程id: " << pthread_self() << endl;
auto* info = static_cast<fdInfo*>(arg);
// 接收数据
char buf[10] = {0};
// 一次只能接收10个字节, 客户端一次发送100个字节
// 一次是接收不完的, 文件描述符对应的读缓冲区中还有数据
// 下一轮select检测的时候, 内核还会标记这个文件描述符缓冲区有数据 -> 再读一次
// 循环会一直持续, 直到缓冲区数据被读完位置
ssize_t len = read(info->fd, buf, sizeof(buf));
if(len == 0){
cout << "客户端断开了连接..." << endl;
pthread_mutex_lock(&mutex);
FD_CLR(info->fd, info->rdset);
pthread_mutex_unlock(&mutex);
close(info->fd);
delete info;
return nullptr;
}
else if(len > 0){
buf[len] = '\0';
// 收到了数据
cout << "客户端: " << buf << endl;
// 数据处理
for (int j = 0; j < len; ++j) {
buf[j] = toupper(buf[j]);
}
// 发送回去
write(info->fd, buf, len);
} else{
// 异常
perror("read");
}
delete info;
return nullptr;
}
int main(){
pthread_mutex_init(&mutex, nullptr);
// 1. 创建监听的套接字
int lfd = socket(AF_INET, SOCK_STREAM, 0);
// 2. 绑定
sockaddr_in saddr{};
saddr.sin_family = AF_INET;
saddr.sin_port = htons(9999);
saddr.sin_addr.s_addr = INADDR_ANY;
int res = bind(lfd, (struct sockaddr *) &saddr, sizeof(saddr));
if(res == -1){
perror("bind");
close(lfd);
return -1;
}
// 3.设置监听
res = listen(lfd, 128);
if(res == -1){
perror("listen");
close(lfd);
return -1;
}
// 将监听的fd的状态检测委托给内核检测
int maxfd = lfd;
// 初始化检测的读集合
fd_set rdset;
fd_set rdtemp;
// 初始化
FD_ZERO(&rdset);
// 将监听的lfd设置到检测的读集合中
FD_SET(lfd, &rdset);
// 通过select委托内核检测读集合中的文件描述符状态, 检测read缓冲区有没有数据
// 如果有数据, select解除阻塞返回
// 应该让内核持续检测
while (true){
// 默认阻塞
// rdset 中是委托内核检测的所有的文件描述符
pthread_mutex_lock(&mutex);
rdtemp = rdset;
pthread_mutex_unlock(&mutex);
// timeval* time;
// time->tv_sec = 0;
// time->tv_usec = 0;
select(maxfd + 1, &rdtemp, nullptr, nullptr, nullptr);
// rdset中的数据被内核改写了,
// 只保留了发生变化的文件描述的标志位上的1,
// 没变化的改为0
// 只要rdset中的fd对应的标志位为1 -> 缓冲区有数据了
// 判断
// 有没有新连接
if(FD_ISSET(lfd, &rdtemp)){
auto* info = new fdInfo;
info->fd = lfd;
info->maxfd = &maxfd;
info->rdset = &rdset;
// 创建子线程
pthread_t tid;
pthread_create(&tid, nullptr, acceptConn, info);
pthread_join(tid, nullptr);
}
// 再检测有没有通信的文件描述符
for (int i = 0; i <= maxfd; ++i) {
// 判断从监听的文件描述符之后到maxfd这个范围内的文件描述符是否读缓冲区有数据
if(i != lfd && FD_ISSET(i, &rdtemp)){
// 创建子线程
auto* info = new fdInfo;
info->fd = i;
info->rdset = &rdset;
// 创建子线程
pthread_t tid;
pthread_create(&tid, nullptr, conmmunication, info);
pthread_join(tid, nullptr);
}
}
}
pthread_mutex_destroy(&mutex);
return 0;
}
测试如图: