前言:本节讲述linux网络下的tcp协议套接字相关内容。博主以实现tcp服务为主线,穿插一些小知识点。以先粗略实现,后精雕细琢为思路讲述实现服务的过程。下面开始我们的学习吧。
ps:本节内容建议了解网络端口号的友友们观看哦。
目录
实现内容
线程池版本整体代码
准备文件
makefile
tcpserver.hpp
main.cc
tcpclient
version1运行结果
version2版本
version3版本
version4版本
实现内容
本篇内容将要实现一个服务端, 一个客户端。 然后客户端用来链接服务端, 向服务端发送消息, 然后服务端能够接收到消息并将消息返回给客户端。
实现的版本有四个:
- version1:实现单执行流的客户服务echo服务, 就是服务端只为一个服务端进行服务。
- version2:在version1的版本上, 添加进程, 实现多进程的客户服务echo服务, 就是服务端为多个客户端进行服务, 但是因为是多进程,所以开销大。
- version3:改进version2版本, 将多进程改成多线程。实现多线程的echo服务。 但是当用户很多的时候, 线程量太大, 无法控制。
- version4:终极版本, 改进version3, 以线程池为基础, 实现可控的多线程echo服务。 控制线程个数, 既保证了并发性, 又防止了用户太多,线程爆满的问题。
博主先实现version1, 然后在version1的基础上进行改版。下面开始实现:
线程池版本整体代码
tcpserver
#pragma once
#include "Log.hpp"
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "ThreadPool.h"
#include "Task.h"
#include <sys/wait.h>
#include <unistd.h>
using namespace std;
const int defaultfd = -1;
const int defaultport = 8080;
const string defaultip = "0.0.0.0";
const int backlog = 10; // 直接用,一般不要设置的太大。
class TcpServer; //声明
class ThreadData
{
public:
ThreadData(int fd, string ip, uint16_t port, TcpServer* const t)
: sockfd_(fd), clientip_(ip), clientport_(port), t_(t)
{
}
public:
int sockfd_;
string clientip_;
uint16_t clientport_;
public:
TcpServer* const t_;
};
Log lg;
enum
{
SockError = 2,
BindError,
ListenError
};
class TcpServer
{
public:
TcpServer(int port = defaultport, string ip = defaultip, int sockfd = defaultfd)
: listensockfd_(sockfd), ip_(ip), port_(port)
{
}
void InitServer()
{
listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);
if (listensockfd_ < 0)
{
lg(Fatal, "create socket, errno: %d, strerror: %s", errno, strerror(errno));
exit(SockError);
}
//
lg(Info, "create socket success, sockfd: %d", listensockfd_);
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port_);
inet_aton(ip_.c_str(), &(local.sin_addr)); // 主机序列转网络学列。 inet_aton是一个线程安全的函数。
// 绑定
if (bind(listensockfd_, (sockaddr *)&local, sizeof(local)) < 0)
{
lg(Fatal, "bind error, errno: %d, strerror: %s", errno, strerror(errno));
exit(BindError);
}
// tcp面向字节流, 是被动的, 所以要将对应的socket设置为监听状态。
if (listen(listensockfd_, backlog) < 0) // backlock表示的是底层全连接队列的长度。 这个参数对意思, 不做解释。
{
lg(Fatal, "Listen error, errno: %d, strerror: %s", errno, strerror(errno));
exit(ListenError);
}
lg(Info, "Listen has success");
}
void Start()
{
ThreadPool<Task>::GetInstance()->Start();
lg(Info, "tcpServer is running...");
for (;;) // tcp协议也是一种一直处于运行的服务
{
// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置
// 1、获取新连接,
struct sockaddr_in client; // 获取的是客户端的addr
socklen_t len = sizeof(client);
int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。
if (sockfd < 0) // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。
{
lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));
continue;
}
uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列
char clientip[32];
inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));
// 2、根据新连接进行通信
lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);
//version--4线程池版本
Task task_(sockfd, clientip, clientport);
ThreadPool<Task>::GetInstance()->Push(task_);
}
}
~TcpServer()
{
}
private:
int listensockfd_; // 监听套接字, 只用来升起服务器, 接收链接
uint16_t port_;
string ip_;
};
main.cc
#include"tcpserver.hpp"
#include<memory>
int main(int argc, char* argv[])
{
if (argc != 2)
{
cout << "has return" << endl;
return 1;
}
//
uint16_t port = stoi(argv[1]);
unique_ptr<TcpServer> tcpsvr(new TcpServer(port));
tcpsvr->InitServer();
tcpsvr->Start();
return 0;
}
tcpclient
#include<iostream>
using namespace std;
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<string.h>
#include<netinet/in.h>
int main(int argc, char* argv[])
{
//处理argc, argv[]
if (argc != 3)
{
cout << "has return " << endl;
return 1;
}
//
uint16_t serverport = stoi(argv[2]);
string serverip = argv[1];
//创建addr结构体, 设置端口号ip地址
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
cout << "socket error" << endl;
return 1;
}
sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));
//1、客户端要绑定端口号, 但是不需要显示的绑定, 而是系统进行随机端口的绑定。
int n = connect(sockfd,(sockaddr*)&server, sizeof(server));
if (n < 0)
{
cerr << "connect error..." << endl;
return 2;
}
//2、发送信息, 接收信息。
string message;
while (true)
{
cout << "Please Enter# ";
getline(cin, message);
//
write(sockfd, message.c_str(), message.size());
char inbuffer[4096];
int n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);
if (n > 0)
{
inbuffer[n] = 0;
cout << inbuffer << endl;
}
}
return 0;
}
ThreadPool
#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>
//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{
pthread_t tid_;
string name_;
};
template<class T>
class ThreadPool
{
static const int defaultnum = 5; //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)
private:
//加锁解锁
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
//唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
public:
//线程要执行的函数
static void* HandlerTask(void* args)
{
ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
while(true)
{
tp->Lock();
while (tp->tasks_.empty())
{
tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。
}
//否则就去拿到tasks里面的任务
T t = tp->tasks_.front();
tp->tasks_.pop();
//
tp->Unlock();
t(); //每一个线程先对任务进行消费, 消费完成之后处理任务。
}
}
//运行这个线程池, 也就是先将线程创建出来。 然后去运行线程
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name_ = "thread-";
threads_[i].name_ += to_string(i);
pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);
}
}
//主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理
void Push(const T& t)
{
Lock();
tasks_.push(t);
Wakeup();
Unlock();
}
//获取单例
//改编成单例的步骤里面只有这里要说一下, 就是为什么我们要套双层判断。 其实这里的最外面的一层的判断是我们另外加上去的。 为什么
//要加这个判断呢? 就是如果我们不加最外层这一层判断。 那么每一个线程获取单例都要申请所,加锁。 不就是相当于所有的线程都在串行执行? 这就有效率问题。
//解决方案就是这个再加一层判断。 这样假如有四个线程。 那么一开始四个线程都在判断, 那么它们四个线程都进入了if里面。 然后就都申请锁, 但是只有第一个线程能够
//进入第二层里面, 其他的进入不了。 那么当这一轮的四个线程都申请一次锁候就都退出了函数, 然后就都去做自己的事情了。 问题是, 当下次它们再来申请单例对象的时候它们连
//第一层判断都成功不了了, 也就都不用加锁解锁了, 这就大大提高了效率!!!
static ThreadPool<T>* GetInstance(int num = defaultnum)
{
if (tp_ == nullptr)
{
pthread_mutex_lock(&tp_->lock_);
if (tp_ == nullptr)
{
tp_ = new ThreadPool<T>(num);
}
pthread_mutex_unlock(&tp_->lock_);
}
return tp_;
}
private:
//构造函数私有化, 只有Getinstance里面才能创建。
ThreadPool(int num = defaultnum)
:threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
//单例模式只有一个对象, 所以要将拷贝构造和拷贝赋值封住, 为了防止有人在外部重新拷贝一个对象。
ThreadPool(const ThreadPool<T>& tp) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
vector<ThreadInfo> threads_; //线程都维护在vector当中, 这个就是线程池里面的线程的个数,
queue<T> tasks_ ; //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。
pthread_mutex_t mutex_; //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用
pthread_cond_t cond_; //条件变量, 用来没有任务的时候,消费者要挂起。
static pthread_mutex_t lock_; //锁, 这个锁是为了在获取单例的时候能够让线程原子性的访问if (tp_ == nullptr)。
static ThreadPool<T>* tp_; //tp指针, 这就是唯一个单例对象。
};
template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
Task
#include <iostream>
using namespace std;
#include"Log.hpp"
#include <vector>
#include <string>
extern Log lg;
// Task.h文件里面包含了任务类, 这个是我们线程池要执行的任务
class Task
{
public:
// 构造函数, 第一个参数data1, 第二个参数data2, 第三个参数是加减乘除的符号。 这个任务就是进行四则运算
Task(int sockfd, string clientip, uint16_t clientport)
:sockfd_(sockfd), clientip_(clientip), clientport_(clientport)
{
}
~Task() {}
// 执行任务的接口run(), 这个方法对三个变量进行判断, 然后进行运算。
void run()
{
char buffer[4096];
string temp = clientip_;
while (true)
{
ssize_t n = read(sockfd_, buffer, sizeof(buffer));
if (n > 0)
{
buffer[n] = 0;
cout << "client say#: " << buffer << endl;
string echo_string("tcpserver echo# " + (string)buffer);
write(sockfd_, echo_string.c_str(), echo_string.size());
}
else if (n == 0)
{
lg(Info, "quit sockfd:%d ", sockfd_);
exit(1);
}
else
{
lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd_, clientport_, temp.c_str());
}
//
}
}
// 仿函数, 为了方便我们的对象能够像函数一样使用。
void operator()()
{
run();
}
private:
// 每一个任务对象里面都有三个参数, 一个data1, 一个data2, 最后一个op_
int sockfd_;
string clientip_;
uint16_t clientport_;
};
准备文件
先准备好文件。 tcpserver.hpp实现服务的接口, main.cc运行服务端。 然后tcpclient.cc运行客户端。
makefile
makefile不解释, 直接上代码(这里加上-g是为了后续方便调试, 也可以不加)
.PHONY:all
all: tcpserver.exe tcpclient.exe
tcpserver.exe:main.cc
g++ -o $@ $^ -std=c++11 -lpthread -g
tcpclient.exe:tcpclient.cc
g++ -o $@ $^ -std=c++11 -lpthread -g
.PHONY:clean
clean:
rm -rf tcpserver.exe tcpclient.exe
tcpserver.hpp
先来看框架
#pragma once
#include "Log.hpp"
#include <iostream>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include"ThreadPool.h"
#include"Task.h"
#include <sys/wait.h>
#include <unistd.h>
using namespace std;
const int defaultfd = -1;
const int defaultport = 8080;
const string defaultip = "0.0.0.0";
const int backlog = 10; // 直接用,一般不要设置的太大。
Log lg;
enum
{
SockError = 2,
BindError,
ListenError
};
class TcpServer
{
public:
//构造函数, 将服务端的端口号, ip地址传过来
TcpServer(int port = defaultport, string ip = defaultip, int sockfd = defaultfd)
: listensockfd_(sockfd), ip_(ip), port_(port)
{}
//初始化服务端,分两步:绑定和监听
void InitServer()
{
//绑定
//监听
}
//运行服务端
void Start()
{
}
//执行相应的服务
void Service(int sockfd, const string &clientip, uint16_t clientport)
{
}
//析构函数
~TcpServer()
{}
private:
int listensockfd_; // 监听套接字, 只用来升起服务器, 接收链接
uint16_t port_;
string ip_;
};
然后初始化就是创建sockaddr结构体, 创建套接字, 然后绑定。 因为tcp是面向字节流的。 所以还要对网卡进行监听。下面是初始化服务。
//对服务端进行初始化
void InitServer()
{
listensockfd_ = socket(AF_INET, SOCK_STREAM, 0);
if (listensockfd_ < 0)
{
lg(Fatal, "create socket, errno: %d, strerror: %s", errno, strerror(errno));
exit(SockError);
}
//
lg(Info, "create socket success, sockfd: %d", listensockfd_);
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port_);
inet_aton(ip_.c_str(), &(local.sin_addr)); // 主机序列转网络学列。 inet_aton是一个线程安全的函数。
// 绑定
if (bind(listensockfd_, (sockaddr *)&local, sizeof(local)) < 0)
{
lg(Fatal, "bind error, errno: %d, strerror: %s", errno, strerror(errno));
exit(BindError);
}
// tcp面向字节流, 是被动的, 所以要将对应的socket设置为监听状态。
if (listen(listensockfd_, backlog) < 0) // backlock表示的是底层全连接队列的长度。 这个参数对意思, 不做解释。
{
lg(Fatal, "Listen error, errno: %d, strerror: %s", errno, strerror(errno));
exit(ListenError);
}
lg(Info, "Listen has success");
}
然后是运行服务, 运行服务也是分两步: accept与客户端建立连接。 然后执行服务。
void Start()
{
lg(Info, "tcpServer is running...");
for (;;) // tcp协议也是一种一直处于运行的服务
{
// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置
// 1、获取新连接,
struct sockaddr_in client; // 获取的是客户端的addr
socklen_t len = sizeof(client);
int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。
if (sockfd < 0) // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。
{
lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));
continue;
}
uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列
char clientip[32];
inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));
// 2、根据新连接进行通信
lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);
//version--1 单进程版
Service(sockfd, clientip, clientport);
close(sockfd);
}
}
然后执行的服务是echo服务, 就是先接受客户端发来的信息, 然后将信息加工一下发回去。
void Service(int sockfd, const string &clientip, uint16_t clientport)
{
char buffer[4096];
while (true)
{
ssize_t n = read(sockfd, buffer, sizeof(buffer) - 1);
if (n > 0)
{
buffer[n] = 0;
cout << "client say#: " << buffer << endl;
string message("tcpserver echo# " + string(buffer));
//
write(sockfd, message.c_str(), message.size());
}
else if (n == 0)
{
lg(Info, "quit sockfd:%d ", sockfd);
exit(1);
}
else
{
lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip.c_str());
}
}
}
上面就是整个的代码。
我们这里说一下accept这个代码
accept代码的第一个参数是sockfd, 就是网卡的文件fd。 然后第二个参数和第三个参数都是输出型参数。 能够将对方也就是客户端的sockaddr带出来。
main.cc
主函数就是接收到传来的端口号, 创建服务端然后初始化并运行起来。
#include"tcpserver.hpp"
#include<memory>
int main(int argc, char* argv[])
{
if (argc != 2)
{
cout << "has return" << endl;
return 1;
}
//
uint16_t port = stoi(argv[1]);
unique_ptr<TcpServer> tcpsvr(new TcpServer(port));
tcpsvr->InitServer();
tcpsvr->Start();
return 0;
}
tcpclient
先看客户端的框架, 就是先链接服务端。 然后就给服务端发信息,接收信息。 (接收到的这个信息是被服务端处理过的)
#include<iostream>
using namespace std;
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<string.h>
#include<netinet/in.h>
int main(int argc, char* argv[])
{
//处理argc, argv[]
//先连接
//再发数据接受数据
return 0;
}
处理argc的时候, 因为我们的参数一定是三个。即, 一个程序名一个端口号, 一个IP地址。 所以如果argc不是等于3的话直接返回。 为3的话就将端口号以及ip地址保存一下。 其中ip地址是要连接到的服务端的ip地址, 端口号是要连接到服务端的端口号。
//处理argc, argv[]
if (argc != 3)
{
cout << "has return " << endl;
return 1;
}
//
uint16_t serverport = stoi(argv[2]);
string serverip = argv[1];
然后创建addr结构,连接到服务端。
//创建addr结构体, 设置端口号ip地址
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
cout << "socket error" << endl;
return 1;
}
sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));
//1、客户端要绑定端口号, 但是不需要显示的绑定, 而是系统进行随机端口的绑定。
int n = connect(sockfd,(sockaddr*)&server, sizeof(server));
if (n < 0)
{
cerr << "connect error..." << endl;
return 2;
}
最后就是收发消息, 这里创建循环, 让我们可以执行多次服务, 多次收发消息。
//2、发送信息, 接收信息。
string message;
while (true)
{
cout << "Please Enter# ";
getline(cin, message);
//
write(sockfd, message.c_str(), message.size());
char inbuffer[4096];
int n = read(sockfd, inbuffer, sizeof(inbuffer) - 1);
if (n > 0)
{
inbuffer[n] = 0;
cout << inbuffer << endl;
}
}
然后我们就能够运行了。下面是运行结果。
version1运行结果
可以看到已经可以收发消息。
version2版本
version2版本是创建多进程。 但是, 要知道, 我们的父进程创建子进程后要进行等待。 否则会造成内存泄漏。 但是我们父进程等待, 父进程又不能向下运行代码了, 就不能继续创建子进程了。 所以, 为了解决这个问题。 我们就可以创建一个孤儿进程。 让子进程创建好孙子进程后直接退出, 将孙子进程托孤。 父进程等待子进程后就继续向下执行。 这样就能创建一批孙子进程并发访问!!!
void Start()
{
lg(Info, "tcpServer is running...");
for (;;) // tcp协议也是一种一直处于运行的服务
{
// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置
// 1、获取新连接,
struct sockaddr_in client; // 获取的是客户端的addr
socklen_t len = sizeof(client);
int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。
if (sockfd < 0) // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。
{
lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));
continue;
}
uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列
char clientip[32];
inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));
// 2、根据新连接进行通信
lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);
// // version--1 单进程版
// Service(sockfd, clientip, clientport);
// close(sockfd);
//version--2 多进程版
pid_t id = fork();
if (id == 0)
{
close(listensockfd_);
if (fork() > 0)
{
exit(0);
}
Service(sockfd, clientip, clientport);
close(sockfd);
exit(0);
//child
}
close(sockfd);
pid_t rid = waitpid(id, nullptr, 0);
}
}
version3版本
第三版本是多线程版本, 多线程同样有主线程等待的问题,我们的主线程一旦要等待子线程, 那么就不能向后执行了。 所以为了能够并发, 就要对子线程进行分离。
下面是代码的改动:
第一个改动地方就是创建一个ThreadData类:
class TcpServer; //声明
//创建这个类是为了能够将服务端对象传给线程去执行
class ThreadData
{
public:
ThreadData(int fd, string ip, uint16_t port, TcpServer* const t)
: sockfd_(fd), clientip_(ip), clientport_(port), t_(t)
{
}
public:
int sockfd_;
string clientip_;
uint16_t clientport_;
public:
TcpServer* const t_;
};
然后第二个改动的地方就是线程要执行的动作:
static void *pthrun(void *args)
{
pthread_detach(pthread_self()); // 子线程直接分离
// 一个进程打开的所有的文件描述符表, 其他进程能看到呢?
ThreadData* td = static_cast<ThreadData*>(args);
td->t_->Service(td->sockfd_, td->clientip_, td->clientport_);
}
第三个改动就是start函数里面执行服务的代码部分:
void Start()
{
lg(Info, "tcpServer is running...");
for (;;) // tcp协议也是一种一直处于运行的服务
{
// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置
// 1、获取新连接,
struct sockaddr_in client; // 获取的是客户端的addr
socklen_t len = sizeof(client);
int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。
if (sockfd < 0) // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。
{
lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));
continue;
}
uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列
char clientip[32];
inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));
// 2、根据新连接进行通信
lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);
// // version--1 单进程版
// Service(sockfd, clientip, clientport);
// close(sockfd);
//version--2 多进程版
// pid_t id = fork();
// if (id == 0)
// {
// close(listensockfd_);
// if (fork() > 0)
// {
// exit(0);
// }
// Service(sockfd, clientip, clientport);
// close(sockfd);
// exit(0);
// //child
// }
// close(sockfd);
// pid_t rid = waitpid(id, nullptr, 0);
// version--3多线程版本
ThreadData* td = new ThreadData(sockfd, clientip, clientport, this);
pthread_t tid;
pthread_create(&tid, nullptr, pthrun, td);
}
}
运行结果
一开始一个server服务, 然后我们添加一个client就增加一个server服务, 增加一个client就增加一个服务。
version4版本
version4线程池版本, version4的线程池版本就是我们创建好线程池。 然后创建一个任务类, 将我们要执行的服务作为任务类的一个结构。 这就要求我们的这个任务类里面必须有我们的ip, port, sockfd这样的。 字段。 并且我们还要引入几个头文件, 下面为代码:
首先要有两个新文件, 一个包含task类, 一个包含线程池类。
我们让task类里面包含服务的方法。就是run方法。
#include <iostream>
using namespace std;
#include"Log.hpp"
#include <vector>
#include <string>
extern Log lg;
// Task.h文件里面包含了任务类, 这个是我们线程池要执行的任务
class Task
{
public:
// 构造函数, 第一个参数data1, 第二个参数data2, 第三个参数是加减乘除的符号。 这个任务就是进行四则运算
Task(int sockfd, string clientip, uint16_t clientport)
:sockfd_(sockfd), clientip_(clientip), clientport_(clientport)
{
}
~Task() {}
// 执行任务的接口run(), 这个方法对三个变量进行判断, 然后进行运算。
void run()
{
char buffer[4096];
string temp = clientip_;
while (true)
{
ssize_t n = read(sockfd_, buffer, sizeof(buffer));
if (n > 0)
{
buffer[n] = 0;
cout << "client say#: " << buffer << endl;
string echo_string("tcpserver echo# " + (string)buffer);
write(sockfd_, echo_string.c_str(), echo_string.size());
}
else if (n == 0)
{
lg(Info, "quit sockfd:%d ", sockfd_);
exit(1);
}
else
{
lg(Waring, "Waring, sockfd:%d, clientport: %d, clientip: %s", sockfd_, clientport_, temp.c_str());
}
//
}
}
// 仿函数, 为了方便我们的对象能够像函数一样使用。
void operator()()
{
run();
}
private:
// 每一个任务对象里面都有三个参数, 一个data1, 一个data2, 最后一个op_
int sockfd_;
string clientip_;
uint16_t clientport_;
};
然后线程池代友友们如果没写过可以自己实现一个,或者直接用博主的, 如下:
#pragma once
#include<iostream>
#include<pthread.h>
#include<vector>
#include<string>
using namespace std;
#include<queue>
#include<ctime>
#include<unistd.h>
//对线程的属性做一下封装, 有利于线程池的保存以及后面的处理
struct ThreadInfo
{
pthread_t tid_;
string name_;
};
template<class T>
class ThreadPool
{
static const int defaultnum = 5; //默认的线程池的大小(线程池的大小就是里面包含的线程的数量)
private:
//加锁解锁
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
//唤醒线程, 线程是可以被挂起的(就比如信号量)。 当任务没有的时候,线程就要被挂起, 有任务后再唤醒
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
public:
//线程要执行的函数
static void* HandlerTask(void* args)
{
ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
while(true)
{
tp->Lock();
while (tp->tasks_.empty())
{
tp->ThreadSleep(); //如果队列里面没有任务了, 就让线程去休眠。
}
//否则就去拿到tasks里面的任务
T t = tp->tasks_.front();
tp->tasks_.pop();
//
tp->Unlock();
t(); //每一个线程先对任务进行消费, 消费完成之后处理任务。
}
}
//运行这个线程池, 也就是先将线程创建出来。 然后去运行线程
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name_ = "thread-";
threads_[i].name_ += to_string(i);
pthread_create(&(threads_[i].tid_), nullptr, HandlerTask, this);
}
}
//主线程给线程池发送任务, 注意, 这个任务一定是可以被储存起来的。 因为当我们的任务很多很多的时候, 我们的线程池内的线程要一个一个地对这些任务进行处理
void Push(const T& t)
{
Lock();
tasks_.push(t);
Wakeup();
Unlock();
}
//获取单例
//改编成单例的步骤里面只有这里要说一下, 就是为什么我们要套双层判断。 其实这里的最外面的一层的判断是我们另外加上去的。 为什么
//要加这个判断呢? 就是如果我们不加最外层这一层判断。 那么每一个线程获取单例都要申请所,加锁。 不就是相当于所有的线程都在串行执行? 这就有效率问题。
//解决方案就是这个再加一层判断。 这样假如有四个线程。 那么一开始四个线程都在判断, 那么它们四个线程都进入了if里面。 然后就都申请锁, 但是只有第一个线程能够
//进入第二层里面, 其他的进入不了。 那么当这一轮的四个线程都申请一次锁候就都退出了函数, 然后就都去做自己的事情了。 问题是, 当下次它们再来申请单例对象的时候它们连
//第一层判断都成功不了了, 也就都不用加锁解锁了, 这就大大提高了效率!!!
static ThreadPool<T>* GetInstance(int num = defaultnum)
{
if (tp_ == nullptr)
{
pthread_mutex_lock(&tp_->lock_);
if (tp_ == nullptr)
{
tp_ = new ThreadPool<T>(num);
}
pthread_mutex_unlock(&tp_->lock_);
}
return tp_;
}
private:
//构造函数私有化, 只有Getinstance里面才能创建。
ThreadPool(int num = defaultnum)
:threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
//单例模式只有一个对象, 所以要将拷贝构造和拷贝赋值封住, 为了防止有人在外部重新拷贝一个对象。
ThreadPool(const ThreadPool<T>& tp) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
private:
vector<ThreadInfo> threads_; //线程都维护在vector当中, 这个就是线程池里面的线程的个数,
queue<T> tasks_ ; //向线程池中发送任务, 这个队列里面保存的就是我们的任务的数目。
pthread_mutex_t mutex_; //锁,用来生产者线程(本份代码只是主线程)给线程池发送任务时候加锁使用以及消费者线程抢夺任务时加锁使用
pthread_cond_t cond_; //条件变量, 用来没有任务的时候,消费者要挂起。
static pthread_mutex_t lock_; //锁, 这个锁是为了在获取单例的时候能够让线程原子性的访问if (tp_ == nullptr)。
static ThreadPool<T>* tp_; //tp指针, 这就是唯一个单例对象。
};
template<class T>
ThreadPool<T>* ThreadPool<T>::tp_ = nullptr;
template<class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
然后就是start运行函数
void Start()
{
ThreadPool<Task>::GetInstance()->Start();
lg(Info, "tcpServer is running...");
for (;;) // tcp协议也是一种一直处于运行的服务
{
// tcp是面向连接的, 所以他比udp还多了一步accept, 先将客户端与服务端连接起来。accept的返回值成功返回整数文件描述符,否则-1被返回, 错误码被设置
// 1、获取新连接,
struct sockaddr_in client; // 获取的是客户端的addr
socklen_t len = sizeof(client);
int sockfd = accept(listensockfd_, (sockaddr *)&client, &len); // accept成功, 就能知道是谁连接的我。
if (sockfd < 0) // 关于这两个套接字, sockfd_的核心工作就只是把链接获取上来, 未来进行IO操作, 看的是sockfd。
{
lg(Waring, "listen error, errno: %d, strerror: %s", errno, strerror(errno));
continue;
}
uint16_t clientport = ntohs(client.sin_port); // 网络序列转主机序列
char clientip[32];
inet_ntop(sockfd, &client.sin_addr, clientip, sizeof(clientip));
// 2、根据新连接进行通信
lg(Info, "get a new link..., sockfd:%d, clientport: %d, clientip: %s", sockfd, clientport, clientip);
// // version--1 单进程版
// Service(sockfd, clientip, clientport);
// close(sockfd);
//version--2 多进程版
// pid_t id = fork();
// if (id == 0)
// {
// close(listensockfd_);
// if (fork() > 0)
// {
// exit(0);
// }
// Service(sockfd, clientip, clientport);
// close(sockfd);
// exit(0);
// //child
// }
// close(sockfd);
// pid_t rid = waitpid(id, nullptr, 0);
// // version--3多线程版本
// ThreadData* td = new ThreadData(sockfd, clientip, clientport, this);
// pthread_t tid;
// pthread_create(&tid, nullptr, pthrun, td);
//version--4线程池版本
Task task_(sockfd, clientip, clientport);
ThreadPool<Task>::GetInstance()->Push(task_);
}
}
运行结果:
我们可以看到, 只要已启动服务端的瞬间, 就能创建出6个线程(一个主线程, 五个分线程)
——————以上就是本节全部内容哦, 如果对友友们有帮助的话可以关注博主, 方便学习更多知识哦!!!