文章目录
- 什么叫进程池
- 进程池的优点
- 创建进程池
- 代码实现:
什么叫进程池
我们知道,一个进程创建子进程通常是为了让这个子进程去为它完成某个任务。例如我们使用的指令,其实就是bash进程创建子进程让子进程去执行的。但是我们需要考虑这样一个问题:是不是遇到问题之后才创建子进程呢?
频繁的创建和销毁进程都是一项较大的开销,涉及到内存分配、上下文切换等操作。于是我们可以提前创建出一批进程,当有任务要做的时候就从这一批子进程中拿出一个空闲的去执行,一旦某个子进程把任务执行完之后也不立即销毁进程,而是等待下一个任务的来临。
我们把这些预先创建的一批子进程就叫做进程池。我们把进程池中的进程也称为工作进程。
进程池的优点
- 通过复用已经创建的进程,减少创建和销毁进程的开销,提高了系统资源的利用与率和系统的性能。
- 动态调整进程池中工作进程的数量。可以让系统根据实际的负载和任务需求,动态的增加或者减少工作进程的数量,以适应不同的工作负载。提高了系统的工作的灵活性。
- 简化编程。进程池提供了一种高级抽象,隐藏了底层进程管理的细节,使得并发进程更加简单和可靠。
创建进程池
现在我们尝试自己去实现一个进程池。
创建进程好很简单,如何管理这些进程呢?每创建一个进程我们都用一个结构体(或者类)维护,于是对工作进程的管理就变成了对结构体的管理,整个进程池从代码角度上来说就是一个结构体数组。
那我们如何将任务分配到工作进程呢?这个问题的本质其实是在问进程之间通信的方式。于是我们很容易的想到使用管道。父进程通过管道,将任务信息传递给子进程。
下面是使用进程池的一个简单模型:
代码实现:
Process.cpp文件
#include <iostream>
#include <cerrno>
#include <unistd.h>
#include <string>
#include <vector>
#include <sys/types.h>
#include <wait.h>
#include <sys/stat.h>
#include "Task.hpp"
using namespace std;
// master
class Channel // 维护管道信息
{
public:
Channel(int wfd, int subprocessid, string name)
: _wfd(wfd), _subprocessid(subprocessid), _name(name)
{
}
int GetWfd() { return _wfd; }
int GetProcessId() { return _subprocessid; }
string GetName() { return _name; }
void CloseChannel()
{ // 关闭信道的写端
close(_wfd);
}
void Wait()
{ // 关闭工作进程
int status = 0;
pid_t rid = waitpid(_subprocessid, NULL, 0);
if (rid > 0)
{
cout << "wait " << rid << " success" << endl;
}
}
~Channel()
{
}
private:
int _wfd; // 信道以写方式打开的文件描述符
int _subprocessid; // 读端进程的pid
string _name; // 信道的命名
};
// 创建信道和子进程
void CreatChannelAndSub(int num, vector<Channel> &Channels, task_t task)
{ // task是一个回调函数
for (int i = 0; i < num; i++)
{
// 1.创建管道
int pipefd[2];
int n = pipe(pipefd);
if (n < 0)
exit(0);
// 创建子进程
pid_t pid = fork();
if (pid == 0)
{
// 关闭写端
close(pipefd[1]);
dup2(pipefd[0], 0); // 将管道的读端重定向到标准输入
task(); // 子进程执行的任务
close(pipefd[0]);
exit(0);
}
// 父进程关闭读端,只用来输出任务
close(pipefd[0]);
string name = "Channel-" + to_string(i);
// 将创建的信道存入输出型参数中
Channels.push_back({pipefd[1], pid, name});
}
}
int NextChannel(int channelnum)
{
static int next = 0;
int channel = next;
next = (next + 1) % channelnum;
return channel;
}
void SendTask(int taskcommand, Channel &channel)
{
write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}
// 具体任务
void CtrlProcessOnce(vector<Channel> &Channels)
{
// 1.选择一个任务
int taskcommand = SelectTask();
// 2.选择一个空信道
int taskchannel = NextChannel(Channels.size());
// 3.发送任务编号给信道
SendTask(taskcommand, Channels[taskchannel]);
cout << "taskcommand: " << taskcommand << " channel: " << Channels[taskchannel].GetName() << " sub process: " << Channels[taskchannel].GetProcessId() << endl;
}
// 控制子进程执行任务
void CtrlProcess(vector<Channel> &Channels, int time = -1)
{ // time表示分配多少次任务给子进程去执行,默认是无限
if (time == -1)
{ // 默认
while (true)
{
CtrlProcessOnce(Channels);
sleep(1);
}
}
else if (time > 0)
{
while (time--)
{
CtrlProcessOnce(Channels);
sleep(1);
}
}
}
void CleanUpChannel(vector<Channel> &channels)
{
for (auto &it : channels)
{
it.CloseChannel();
}
for (auto &it : channels)
{
it.Wait();
}
}
int main(int argc, char *argv[])
{ // 参数列表,argv[1]表示创建的工作进程的数量,即内存池大小
if (argc != 2)
{
cerr << "Usage: " << argv[0] << " Processnum" << endl;
return 1;
}
int num = stoi(argv[1]);
// 加载任务
LoadTask();
vector<Channel> Channels; // 进程池
// 创建信道和子进程
CreatChannelAndSub(num, Channels, work1);
// 通过Channel控制子进程
CtrlProcess(Channels, 10);
// 关闭子进程和信道的写端
CleanUpChannel(Channels);
return 0;
}
Task.hpp文件
#include <iostream>
#include <ctime>
#include <sys/stat.h>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#define TASKNUM 3
using namespace std;
typedef void (*task_t)(); // task_t函数指针类型
void Print()
{ // 任务1
cout << "Printf task" << endl;
}
void DownLoad()
{ // 任务2
cout << "DownLoad task" << endl;
}
void Flush()
{ // 任务3
cout << "Flush task" << endl;
}
task_t tasks[TASKNUM]; // 用来存放函数指针
void LoadTask()
{
srand(time(nullptr));
tasks[0] = Print;
tasks[1] = DownLoad;
tasks[2] = Flush;
}
void ExcuteTask(int n)
{
if (n < 0 || n > 2)
return;
tasks[n]();
}
int SelectTask()
{
int n = rand() % TASKNUM;
return n;
}
void work1()
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof command); // 读取管道中的任务编号(int)
if (n == sizeof(int))
{
ExcuteTask(command);
}
else if (n == 0)
{
cout << "sub process: " << getpid() << " quit " << endl;
break;
}
}
}
观察运行结果: