一、进程池的概念
1.1、什么是进程池
进程池是一种并发编程模式,用于管理和重用多个处理任务的进程。它通常用于需要频繁创建和销毁进程的情况,以避免因此产生的开销。
进程池的优点包括:
- 减少进程创建销毁的开销:避免频繁创建和销毁进程所带来的系统资源开销。
- 提高系统响应速度:由于进程已经初始化并且一直保持在内存中,可以立即分配执行任务,减少了任务等待时间。
- 控制资源使用:通过限制进程池中的进程数量,可以控制系统资源的使用情况,避免资源过度消耗。
1.2、管理进程
预先创建一些空闲进程,管理进程会把工作分发到空闲进程来处理,空闲进程处理结束后,通知管理进程。
管理进程需要将任务发给空闲的工作进程,这里就涉及到进程之间的通信,进程间的通信有以下三种方式,我们之前都做了详细的解释
管道:https://blog.csdn.net/weixin_43903639/article/details/138155634?spm=1001.2014.3001.5501
消息队列:https://blog.csdn.net/weixin_43903639/article/details/138155723?spm=1001.2014.3001.5501
共享内存:https://blog.csdn.net/weixin_43903639/article/details/138189200?spm=1001.2014.3001.5501
二、进程池模型
对于一个进程池,我们需要维护一个进程队列,如果进程在忙就等待,如果进程空闲,那么就给空闲进程发任务让进程去处理。
三、进程通信
我们这里采用了有名管道的方式。实际上使用匿名管道是一样的。为了方便管理,我们这里创建了一个Fifo的类,通过类将管道视为一个个的对象。
// 权限
#define Mode 0666
// 文件地址
#define Path "./default"
class Fifo
{
public:
Fifo(string path = Path, int mode = Mode) : _path(path), _mode(mode)
{
int return_mkfifo_val = mkfifo(_path.c_str(), _mode);
if (return_mkfifo_val < 0)
{
cout << "mkfifo error:" << errno << " reason :" << strerror(errno) << endl;
exit(1);
}
cout << "mkfifo success" << endl;
}
~Fifo()
{
int return_unlink_val = unlink(_path.c_str());
if (return_unlink_val < 0)
{
cout << "unlink error:" << errno << " reason :" << strerror(errno) << endl;
}
cout << "unlink namepipe success" << endl;
}
private:
string _path;
int _mode;
};
管道的默认权限是 0666
,管道的默认文件地址是 ./default
,这样管理的优势是便于管道的创建与销毁。
三、进程对象
我们将一个进程也视为一个对象,那么一个进程就需要以下的元素
fd0
: 管道,通过这个管道接收主进程的数据fd1
:管道,通过这个管道给主进程发数据pid
:子进程的pidisbusy
:此子进程是否在忙
class Process
{
public:
Process(int fd0, int fd1, pid_t process_pid) : _fd0(fd0), _fd1(fd1), _process_pid(process_pid), _isbusy(false) {}
~Process() {}
// 获取父进程要写入的管道
int get_fd0() { return _fd0; }
// 获取子进程要写入的管道
int get_fd1() { return _fd1; }
// 获取子进程pid
pid_t get_process_pid() { return _process_pid; }
// 获取是否忙碌标志位
bool get_isbusy() { return _isbusy; }
// 修改标志为
void set_isbusy(bool flag) { _isbusy = flag; }
private:
int _fd0; // 父进程要写入的管道
int _fd1; // 父进程要读入的管道
pid_t _process_pid; // 子进程pid
bool _isbusy; // 是否忙碌标志位
};
四、进程池
进程池就是同时管理管道和进程的。其中包含多个进程对象,每个进程对象又要包含两个管道。
vector<string> pipe0
主进程写入,子进程读的管道名vector<string> pipe1
子进程写入,主进程读的管道名vector<Fifo *> fifo0
主进程写入,子进程读的管道vector<Fifo *> fifo1
子进程写入,主进程读的管道vector<Process> _processpool
进程池管理的进程int _processnum
进程池管理的进程数量
// 进程池
class ProcessPool
{
public:
ProcessPool(int processnum) : _processnum(processnum) {}
~ProcessPool()
{
for (int i = 0; i < _processnum; i++) {
delete fifo0[i];
delete fifo1[i];
}
// 要释放所有的子进程
for (int i = 0; i<_processnum; i++) {
// 通知子进程结束,通知失败的话直接杀死子进程
if (kill(_processpool[i].get_process_pid(), SIGTERM) != 0) {
// 杀死子进程
kill(_processpool[i].get_process_pid(), SIGUSR1);
}
}
}
// 生成管道的名字
void makePipeName()
{
pipe0.clear();
pipe1.clear();
for (int i = 0; i < _processnum; i++)
{
string s0;
s0 += "pipe0_" + to_string(i + 1);
pipe0.push_back(s0);
string s1;
s1 += "pipe1_" + to_string(i + 1);
pipe1.push_back(s1);
}
}
// 创建进程池,接收一个参数的函数指针
void CreateProcessPool(work_t work = worker)
{
for (int i = 0; i < _processnum; i++)
{
// 创建命名管道
fifo0.push_back(new Fifo(pipe0[i]));
fifo1.push_back(new Fifo(pipe1[i]));
int id = fork();
if (id == 0)
{
// 子进程
int fd0 = open(pipe0[i].c_str(), O_RDONLY);
int fd1 = open(pipe1[i].c_str(), O_WRONLY);
work(fd0, fd1);
exit(0);
}
// 父进程打开管道,未发送任务
int fd0 = open(pipe0[i].c_str(), O_WRONLY);
int fd1 = open(pipe1[i].c_str(), O_RDONLY | O_NONBLOCK);
// 设置为非阻塞模式
fcntl(fd1, F_SETFL, O_NONBLOCK);
_processpool.push_back({fd0, fd1, id});
}
}
// 找到空闲进程返回进程在_processpool中的序号,没找到返回-1
int getAChannal()
{
for (int i = 0; i < _processnum; i++)
{
if (_processpool[i].get_isbusy() == false) {
_processpool[i].set_isbusy(true);
return i;
}
}
return -1;
}
// 发送任务,任务就是数据
void SendTask(char *data, int datasize)
{
// 随机选中管道
int luck_process = getAChannal();
while (luck_process == -1) {
// 看一下哪些进程是空闲的。每个进程结束后都会发送自己的pid。
for (int i=0; i<_processnum; i++) {
pid_t pid = 0;
ssize_t bytes_read = read(_processpool[i].get_fd1(), &pid, sizeof(pid_t));
if (bytes_read == -1) continue;
// 哪个进程返回了数据,就认为Ta结束了。
else {
_processpool[i].set_isbusy(false);
}
}
luck_process = getAChannal();
}
// 父进程向管道发送任务,发送任务就是向相应的管道写入数据
write(_processpool[luck_process].get_fd0(), data, datasize);
}
static void worker(int fd0, int fd1)
{
char buf[BUFSIZ];
while (1)
{
int read_return_value = read(fd0, buf, BUFSIZ);
// 处理读取到的数据
if (read_return_value > 0)
{
cout << "my data is : " << buf << " my pid is : " << getpid() << endl;
// 模拟处理读取到的数据
sleep(1);
}
// 向消息队列传递自己的pid,表示已经完成任务
pid_t pid = getpid();
write(fd1, &pid, sizeof(pid_t));
}
}
private:
int _processnum;
vector<string> pipe0; // 父进程写入,子进程读
vector<string> pipe1; // 子进程写入,父进程读
vector<Fifo *> fifo0;
vector<Fifo *> fifo1;
vector<Process> _processpool;
};
五、仿真
这里我们的工作函数是默认的工作函数,也就是打印传入的数据。
#include "ProcessPool.h"
#include <iostream>
int main(int argc, char* argv[])
{
ProcessPool* processpool = new ProcessPool(5);
processpool->makePipeName();
processpool->CreateProcessPool();
char buf[3];
for (int i=0;i<20;i++) {
sprintf(buf,"%d",i+100);
processpool->SendTask(buf,3);
}
delete processpool;
return 0;
}
看一下仿真结果