全是干货~
文章目录
- 前言
- 一、多进程版
- 二、多线程版
- 三、线程池版
- 四、Tcp服务器日志的改进
- 五、将Tcp服务器守护进程化
- 总结
前言
在上一篇文章中,我们实现了Tcp服务器,但是为了演示多进程和多线程的效果,我们将服务器与客户通通信写成了一下死循环,导致只要有一个用户没有和服务器通信完毕其他用户就都无法和服务器通信,这也体现了单进程面对网络时的一些不足,下面我们用多进程,多线程,线程池来解决这个问题。
一、Tcp服务器多进程版
上一篇文章中我们写的serviceIO接口是死循环所以导致无法多个用户通信,那么如何用多进程解决这个问题呢?其实很简单,我们只需要创建一个子进程,因为子进程会继承父进程的文件描述符,所以他们一定是可以指向同一个sock文件的,然后让子进程去调用serviceIO,我们的父进程就阻塞的等待子进程就好。
pid_t id = fork();
if (id == 0)
{
// 子进程
// 子进程不需要listensock,既然不需要我们就关闭
close(_listensock);
serviceID(sock);
close(sock);
exit(0);
}
// 父进程
pid_t ret = waitpid(id, nullptr, 0);
if (ret > 0)
{
cout << "waitsuccess: " << ret << endl;
}
首先我们子进程也会继承父进程的listensock文件描述符,但是这个文件描述符是父进程用来监听的,我们子进程的工作只是与客户端通信,所以我们应该关闭不用的文件描述符,这就像老司机一样,每次出远门前都会检查轮胎等设备是否完好。当客户端退出后我们的子进程就将sock文件描述符关闭,然后退出该子进程,因为我们让父进程去等待了子进程,所以无需害怕子进程退出变成孤儿进程,但是这样的代码总感觉不对劲,是不是有什么问题呢?没错!我们一旦让父进程等待了子进程,那么这个代码不还是串行的吗?我们要实现多用户通信,那么必然是父进程会创建多个子进程,因为start本质是一个死循环,所以每次父进程都会创建一个子进程去和用户通信,如果我们让父进程去等待子进程,那就和之前一样,只有当子进程处理完一个用户的通信,另一个用户才能与服务端通信,那么该如何解决这个问题呢?看下面代码:
pid_t id = fork();
if (id == 0)
{
// 子进程
// 子进程不需要listensock,既然不需要我们就关闭
close(_listensock);
// 子进程创建孙进程,如果成功将子进程关闭让孙进程处理任务,由于孙进程的父进程退出所以变成孤儿进程最终会被
// 操作系统领养,不需要进程等待
if (fork() > 0)
{
exit(0);
}
serviceID(sock);
close(sock);
exit(0);
}
// 父进程
pid_t ret = waitpid(id, nullptr, 0);
if (ret > 0)
{
cout << "waitsuccess: " << ret << endl;
}
我们让子进程关闭不用的文件描述符后,立刻让子进程再创建一个子进程,一旦创建成功我们就让原先的子进程退出,让原先的子进程的子进程去执行与客户通信的代码,这样做的好处就是我们不用等待原先子进程的子进程。因为原先子进程的子进程已经退出了,所以这个进程就变成了孤儿进程,我们都知道,一旦进程变成孤儿进程就会被操作系统领养,所以我们不用担心这个孤儿进程的退出问题,只有某一个客户端退出了这个孙进程才会退出然后被操作系统领养,这样就解决我们的刚刚的问题,那么我们运行起来看看:
可以看到是没有问题的,这次不管哪个客户端退出都不会影响其他用户。我们上面日志的打印是进过修改的,后面我们会讲。我们可以看到文件描述符是4和5,现在我们重新启动一下看一下文件描述符有没有正确关闭:
可以看到我们重新启动后文件描述符还是4和5这就说明我们之前正确关闭了文件描述符,如果是从其他数字开始那么一定是之前的文件描述符泄漏了。
当然我们上面频繁的创建子进程肯定是不好的,所以我们还有第二种办法:信号忽略版:
void start()
{
//忽略17号信号
signal(SIGCHLD,SIG_IGN);
for (;;)
{
//4.server获取新链接 未来真正使用的是accept返回的文件描述符
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// sock是和client通信的fd
int sock = accept(_listensock,(struct sockaddr*)&peer,&len);
//accept失败也无所谓,继续让accept去获取新链接
if (sock<0)
{
logMessage(ERROR,"accept error,next");
continue;
}
logMessage(NORMAL,"accept a new link success");
cout<<"sock: "<<sock<<endl;
pid_t id = fork();
if (id==0)
{
close(_listensock);
serviceID(sock);
close(sock);
exit(0);
}
//父进程
//已经对17号信号做忽略,父进程不用等待子进程,子进程会自动退出,但是需要父进程关闭文件描述符
close(sock);
}
}
首先我们对17号信号SIGCHLD做忽略,什么是17号信号呢?当我们子进程退出时会给父进程发送17号信号告诉父进程自己要退出了,而我们对这个信号做忽略父进程就不会等待子进程了,然后我们让子进程关闭不需要的文件描述符,去执行与客户通信的代码,如果客户端退出了,我们就让子进程关闭与客户端通信的文件描述符并且让子进程退出,因为已经对17号信号做忽略,所以子进程退出后直接会被操作系统领养,而父进程要干的事就是关闭文件描述符,这里可能有人会问如果客户端没有退出,子进程就不会关闭文件描述符,那么父进程把文件描述符关闭了不会影响子进程吗?实际上并不会,这就像引用计数一样,子进程和父进程都指向一个文件描述符,那么这个文件描述符的引用计数就是2,只有引用计数减为0才会真的关闭,所以我们如果不然父进程关闭文件描述符,那么子进程只会将引用计数减为1这个文件描述符就一直不会关闭就造成文件描述符泄漏了,所以需要父进程关闭文件描述符。
我们可以看到文件描述符都是4,这是因为我们的cpu的运算速度太快了,在accept接口中刚申请到4号文件描述符,然后创建子进程(子进程继承4号描述符),然后父进程直接关闭了sock,关闭后又到了accept这里申请所以还是4,如果在用户很多的情况下出现多个4,多个5多个6都是有可能的我们可以看到这种方法要比第一种方法更好,因为这种方法不用频繁的创建子进程。
二、Tcp服务器多线程版
由于创建进程的工作量非常大,所以我们用多线程给用户提供服务。多线程的原理和多进程是一样的,我们只需要创建一个线程,让这个新线程去执行与客户通信的代码,但是与客户通信就必须用到文件描述符和serviceIO方法,而我们的serviceIO是一个成员函数,我们多线程执行的回调函数必须是静态的,所以我们可以写一个类,里面存放this指针和sock,然后写一个静态成员函数,在函数中调用回调方法:
class TcpServer;
struct ThreadData
{
ThreadData(TcpServer* self,int sock)
:_self(self)
,_sock(sock)
{
}
TcpServer* _self;
int _sock;
};
static void* threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadData *td = static_cast<ThreadData*>(args);
td->_self->serviceID(td->_sock);
close(td->_sock);
delete td;
return nullptr;
}
在上图中可以看到,首先我们创建了一个线程,然后实现了一个struct ThreadData的类,类内成员有Tcpserver的指针和sock文件描述符,然后我们new了一个ThreadData的指针,用this和sock初始化,在创建线程的时候将这个指针传给回调函数,进入回调函数首先将线程分离,一旦分离我们就不需要等待新线程了(如果等待新线程就又变成了串行的),分离后线程结束会被操作系统处理,我们只需要调用serviceIO方法,一旦调用完毕就关闭文件描述符,然后将指针释放置为空。为什么多线程这里不需要让主线程关闭文件描述符呢?因为所有线程都会共享进程的文件描述符,这里不像多进程那样子进程会指向父进程的文件描述符一旦指向那么文件描述符的引用计数必然会+1,但是多线程看到的文件描述符就是进程中打开的那个,所以我们让线程关闭了文件描述符就意味着进程中的那个文件描述符也关闭了。下面我们运行起来(运行前记得将makefile中的服务端加上-pthread选项,否则无法编译):
下面我们验证一下文件描述符是否正确关闭,重新打开服务器:
清屏后我们重新打开发现文件描述符依然从4开始,这就说明我们的文件描述符没有泄露。
三、Tcp服务器线程池版
还记得我们之前写的线程池吗,线程池的好处就是可以一次创建多个线程并且去执行任务,只不过我们今天的任务是与客户端通信,所以我们把之前线程池的代码拿过来:
#include <pthread.h>
#include <iostream>
#include <vector>
#include <queue>
#include <unistd.h>
#include <mutex>
#include "lockguard.hpp"
#include "log.hpp"
using namespace std;
const int gnum = 5;
template <class T>
class ThreadPool
{
public:
static ThreadPool<T>* getInstance()
{
if (_tp == nullptr)
{
_mtx.lock();
if (_tp == nullptr)
{
_tp = new ThreadPool<T>();
}
_mtx.unlock();
}
return _tp;
}
static void* handerTask(void* args)
{
ThreadPool<T>* threadpool = static_cast<ThreadPool<T>*>(args);
while (true)
{
T t;
{
// threadpool->lockQueue();
LockGuard lock(threadpool->getMutex());
while (threadpool->IsQueueEmpty())
{
threadpool->condwaitQueue();
}
// 获取任务队列中的任务
t = threadpool->popQueue();
}
t();
}
return nullptr;
}
void Push(const T& in)
{
LockGuard lock(&_mutex);
//pthread_mutex_lock(&_mutex);
_task_queue.push(in);
pthread_cond_signal(&_cond);
//pthread_mutex_unlock(&_mutex);
}
void start()
{
for (const auto& t: _threads)
{
pthread_create(t,nullptr,handerTask,this);
logMessage(DEBUG,"线程%p创建成功",t);
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
for (auto& t: _threads)
{
delete t;
}
}
public:
void lockQueue()
{
pthread_mutex_lock(&_mutex);
}
void unlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void condwaitQueue()
{
pthread_cond_wait(&_cond,&_mutex);
}
bool IsQueueEmpty()
{
return _task_queue.empty();
}
T popQueue()
{
T t = _task_queue.front();
_task_queue.pop();
return t;
}
pthread_mutex_t* getMutex()
{
return &_mutex;
}
private:
ThreadPool(const int &num = gnum)
: _num(num)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
for (int i = 0;i<_num;i++)
{
_threads.push_back(new pthread_t);
}
}
ThreadPool(const ThreadPool<T>& tp) = delete;
ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete;
int _num;
vector<pthread_t *> _threads;
queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T>* _tp;
static mutex _mtx;
};
template <class T>
ThreadPool<T>* ThreadPool<T>::_tp = nullptr;
template <class T>
mutex ThreadPool<T>::_mtx;
除了一个单例模式的线程池还有一个锁我们也拿进来:
#include <iostream>
#include <pthread.h>
class Mutex //自己不维护锁,有外部传入
{
public:
Mutex(pthread_mutex_t *mutex)
:_pmutex(mutex)
{
}
void lock()
{
pthread_mutex_lock(_pmutex);
}
void unlock()
{
pthread_mutex_unlock(_pmutex);
}
~Mutex()
{
}
private:
pthread_mutex_t *_pmutex;
};
class LockGuard //自己不维护锁,由外部传入
{
public:
LockGuard(pthread_mutex_t *mutex)
:_mutex(mutex)
{
_mutex.lock();
}
~LockGuard()
{
_mutex.unlock();
}
private:
Mutex _mutex;
};
然后我们开始写线程池版本,首先我们要启动线程池,因为是在start接口中通信所以我们就在start接口中启动线程池:
当然我们还需要写一个任务,这个任务也很简单就是我们之前通信的代码:
void start()
{
// 4.线程池初始化
ThreadPool<Task>::getInstance()->start();
logMessage(NORMAL,"ThreadPool init success");
for (;;)
{
//4.server获取新链接 未来真正使用的是accept返回的文件描述符
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// sock是和client通信的fd
int sock = accept(_listensock,(struct sockaddr*)&peer,&len);
//accept失败也无所谓,继续让accept去获取新链接
if (sock<0)
{
logMessage(ERROR,"accept error,next");
continue;
}
logMessage(NORMAL,"accept a new link success,get new sock: %d",sock);
//5.用sock和客户端通信,面向字节流的,后续全部都是文件操作
//serviceID(sock);
//对于一个已经使用完毕的sock,我们要关闭这个sock,要不然会导致文件描述符泄漏
//close(sock);
// 4.线程池版本
ThreadPool<Task>::getInstance()->Push(Task(sock,serviceID));
}
}
因为我们上次将线程池修改为单例模式,所以我们就以单例的方式启动,启动后就给我们创建了多个线程,然后我们去写一个任务:
#include <iostream>
class Task
{
using func_t=std::function<void(int)>;
public:
Task()
{
}
Task(int sock,func_t func)
:_sock(sock)
,_callback(func)
{
}
void operator()()
{
_callback(_sock);
}
~Task()
{
}
private:
int _sock;
func_t _callback;
};
这个任务只需要知道线程要调用的回调方法和文件描述符,然后我们写一个仿函数重载()符号。
有了任务我们直接构造一个匿名对象然后Push一个任务,在线程池中会直接通过仿函数来调用serviceIO函数,如下图:
下面我们将线程池运行起来:
可以看到是没有问题的,以上就是我们Tcp服务器的3个版本,下面我们讲解一下如何给日志添加更多好玩的功能,就像我演示的那样可以打印数据。
四、Tcp服务器日志的改进
我们在原先日志代码的基础上加上可变参数列表,那么如何提取可变参数呢?
要使用可变参数我们首先要知道va_last是什么,然后如何使用va_last.要使用va_last需要用到三个宏,va_start(),va_arg(),va_end().
如上图,va_last要指向上面3.14,10,'c'这些参数,比如现在va_last指向第一个参数3.14,要指向第二个参数10只需要让va_last这个指针偏移一定的字节数。那么如何让va_last指向第一个参数呢,直接用va_start(start)就可以让va_last指向第一个参数。va_arg()可以让指针向后移动特定的类型,比如刚刚要从3.14指向10,那么只需要va_arg(start,int)即可,va_end()就是让start指针变成nullptr。
下面我们就用vsprintf接口做演示:
void logMessage(int level,const char* format, ...)
{
//[日志等级][时间戳/时间][pid][message]
//std::cout<<message<<std::endl;
char logprefix[1024]; //日志前缀
snprintf(logprefix,sizeof(logprefix),"[%s][%ld][pid:%d]",to_levelstr(level),(long int)time(nullptr),getpid());
char logcontent[1024]; //日志内容
va_list arg;
va_start(arg,format);
vsprintf(logcontent,format,arg);
std::cout<<logprefix<<logcontent<<std::endl;
}
首先我们的日志是有固定格式的,我们写的日志前缀必须是[日志等级][][][],而日志内容就是我们日志中出现的字符串,所以我们需要两个缓冲区,prefix代表前缀,content代表日志内容。在前缀我们需要将等级,时间,pid打印出来,而message就是我们的日志内容了需要定义va_last,然后让arg指针指向format的位置,vsprintf就可以将参数中的字符串和要打印的参数读出来放到缓冲区,然后我们将两个缓冲区拼接为一个字符串就完成了日志中带参数的打印,就比如我们上面的演示中直接在日志中打印创建了几号文件描述符,这就是利用了可变参数实现的。
五、将Tcp服务器守护进程化
我们现在写的tcp服务器是受xshell客户端影响的,一旦我们将xshell客户端退出那么我们的服务器也退出了,而事实上一款服务器是不能受影响的,下面我们先讲讲linux前台后台的原理,在实现一个守护进程。
如上图,首先我们登陆xshell后,linux会给我们一个会话,这个会话包含一个前台进程和多个后台进程,注意:不管是什么时刻,前台进程都只能有一个。而我们输入指令的命令行就是bash,查看后台进程的指令是jobs:
比如当我们创建睡眠10000任务后,会给我们一个序号1,代表这是1号作业,我们也可以多加几个:
在一个程序后面加&符号就代表将这个进程放到后台:
首先我们看到我们创建的sleep的ppid都是29324,因为我们是在命令行启动的,所以他们的父进程都是bash,然后我们观察PGID,我们用 I 写在一起的sleep进程他们的PGID是相同的,PGID相同的表示在同一个进程组,而同一个进程组要完成一个作业,就像刚刚的2号作业和3号作业,而相同PGID的第一个进程就是这个进程组的组长。SID表示会话ID,会话ID一样表示这些进程都在一个会话当中。
这个时候我们把1号放前台让大家看看现象:
fg这个指令代表将某个任务放到前台。我们将sleep任务放到前台后,发现bash不工作了,这就验证了每个会话只能有一个前台任务,就像下图:
那么如何将bash切换回来呢?ctrl +z 即可,ctrl +z 可以暂停一个任务,一旦任务被暂停就会被放到后台中:
暂停后如何让进程再继续运行呢?用bg命令:
理解了以上原理后我们就可以实现一个守护进程了。
我们从刚刚的演示中可以看到,一旦任务被暂停就会被切换到后台,而我们的服务器不管是在前台还是后台,一旦有人登陆xshell,默认就会将bash切换为前台进程,这个时候我们的服务器是有可能受到用户登陆的影响的,而如果我们的进程自成一个会话,自己就是自己进程组的组长,那么就不会受到我们所说的那些影响,如下图:
下面我们开始实现守护进程:
大家记住上面的接口,这个接口可以将一个非进程组组长的进程变成一个独立的会话,注意:一定不可以是一个进程组的组长,只能是普通组员,后面我们会有办法让一个组长变成不是组长。
实现守护进程就3个步骤:
1.让调用进程忽略掉异常的信号
就比如我们的服务器,如果客户端已经将文件描述符关闭了服务端还在向文件描述符写入,这个时候操作系统就会给这个进程发送SIGPIP信号(表示对管道异常写入),因为我们不能让进程收到操作系统的影响而退出,所以我们将这个信号忽略。
2.让自己这个进程不是组长,setsid()
这一步其实很简单我们只需要创建一个子进程,一旦创建成功那么就将原来的进程退出掉。原理是:如果我们本身就是组长,那么创建的子进程一定是组员,一旦将原来是组长的进程退出,那么新的组长就是原来进程组组长的下一个进程,这个时候我们对创建的子进程进行setsid(),这个进程就自成进程组,PID和PGID和SID就都是一样的了。
3.因为守护进程是脱离终端的,即使我们关闭xshell,只要远程服务器没有关机那么我们的守护进程就会一直运行,除非我们用kill -9杀死这个进程,既然是脱离终端的,那么我们就必须将默认打开的三个文件描述符关闭,当然强制关闭也不好,我们可以先看是否可以将默认的三个文件描述符重定向了,如果可以就不用关闭,在linux中有一个万能垃圾桶/dex/null,所有向这个路径写入的东西都是看不到的无效的,所以我们可以重定向到这个路径。
4.(可选)我们的进程会默认打开一个cwd指令,这个指令会记录我们进程当前所在的路径,这也就可以证明为什么我们不指明路径的时候默认创建的文件在当前路径,而我们实际上可以对这个路径做更改,比如我们的守护进程不想放在当前路径,可以放在其他路径。
#include <unistd.h>
#include <signal.h>
#include <cstdlib>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define DEV "/dev/null"
void daemonSelf(const char* currPath = nullptr)
{
//1.让调用进程忽略掉异常的信号
signal(SIGPIPE,SIG_IGN);
//2.如何让自己不是组长,setsid
if (fork()>0)
{
exit(0);
}
//只剩子进程
pid_t n = setsid();
assert(n != -1);
//3.守护进程是脱离终端的,关闭或者重定向以前进程默认打开的文件
int fd = open(DEV,O_RDWR);
if (fd>0)
{
dup2(fd,0);
dup2(fd,1);
dup2(fd,2);
close(fd);
}
else
{
close(0);
close(1);
close(2);
}
//4.可选:进程执行路径发生更改
if (currPath) chdir(currPath);
}
如果打开dev成功我们就重定向,失败就关闭0,1,2文件描述符。重定向函数我们讲过,第一个参数是old,第二个参数是new,我们要将0,1,2描述符重定向到dev/null中,所以old是dev/null所在的文件描述符。因为这个借口很绕所以我们当时说过:记住第一个参数是重定向的目的地即可。重定向完成我们就关闭以前的文件描述符防止文件描述符泄漏。
chdir就是修改默认路径的接口。下面我们演示一下:
首先包含头文件,我们在服务器初始化之后将服务器变成守护进程,下面我们运行起来:
如上图所示,PID和PGID和SID相同已经守护进程化了,现在我们给服务器发送消息试试:
可以看到是没有问题的,只要消息回显了就说明服务器在运行。
因为我们的服务器一旦守护进程化,那么原先在默认打开的文件描述符写的日志消息就会被重定向到dev/null中,所以我们只能看到服务器初始化的日志信息,一旦启动就看不到了,下面我们将日志修改一下,直接将日志信息给我们打印到两个文件中:
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <ctime>
#include <unistd.h>
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
#define LOG_ERR "log.error"
#define LOG_NORMAL "log.txt"
const char* to_levelstr(int level)
{
switch(level)
{
case DEBUG:return "DEBUG";
case NORMAL:return "NORMAL";
case WARNING:return " WARNING";
case ERROR:return "ERROR";
case FATAL:return "FATAL";
default : return nullptr;
}
}
void logMessage(int level,const char* format, ...)
{
//[日志等级][时间戳/时间][pid][message]
char logprefix[1024]; //日志前缀
snprintf(logprefix,sizeof(logprefix),"[%s][%ld][pid:%d]",to_levelstr(level),(long int)time(nullptr),getpid());
char logcontent[1024]; //日志内容
va_list arg;
va_start(arg,format);
vsprintf(logcontent,format,arg);
//文件版
FILE* log = fopen(LOG_NORMAL,"a");
FILE* err = fopen(LOG_ERR,"a");
if (log!=nullptr && err!=nullptr)
{
FILE* tep = nullptr;
if (level==DEBUG || level==NORMAL || level==WARNING)
{
tep = log;
}
else
{
tep = err;
}
if (tep)
{
fprintf(tep,"%s%s\n",logprefix,logcontent);
}
fclose(log);
fclose(err);
}
}
首先我们将日志分类,一个文件存放等级为0,1,2的日志,一个文件存放等级为4,5的日志,然后我们以读的方式打开这两个文件,如果都打开成功了,我们就定义一个文件指针,当日志等级为0,1,2的时候我们就让新的文件指针指向log.txt这个文件,否则就是error这个文件,确认了要写入的是哪个文件后,我们就像这个文件写入我们之前的日志前缀+日志内容,写完就关闭这两个文件。
下面我们看看成果:
可以看到是没有问题的,只要有新用户登录就会有日志上传。
总结
本篇文章中最重要的是网络知识与系统知识融合在一起,就比如多进程版和多线程版中多进程需要关闭两次文件描述符而多线程只需要一次,要理解这些概念就必须知道进程和线程的概念,所以网络的学习很考验系统的基本功。