Linux:进程池制作 & 匿名管道 & 命名管道
- 前言
- 一、匿名管道制作进程池
- 一、进程池框架
- 二、创建管道、创建进程、工作进程执行任务
- 2.1 创建管道、创建进程
- 2.2 工作进程执行任务
- 三、主进程向子进程发送任务
- 3.1 任务封装
- 3.2 主进程向子进程发送任务
- 四、回收资源
- 五、所有源代码
- 5.1源码
- 5.2 运行结果(包含调试信息)
- 六、命名管道实现进程池
- 6.1 实现思路
- 6.2 源码
前言
创建进程是有时间成本的。当计算机要执行任务时才创建进程,势必会影响执行任务的性能。所以我们可以通过提前创建一批进程,当有任务需要被执行时直接喂给这些进程即可。我们把这些提前创建好的进程称为进程池!!
下面我们会通过一个主进程(父进程)通过匿名管道和一批工作进程(子进程)进行通信。父进程通过不断派发任务给子进程,子进程通过读取管道文件中的任务码来执行对应的任务,从而模拟进程池的整个行为!!
一、匿名管道制作进程池
一、进程池框架
父进程创建一批子进程,并建立单向通信通道。我们这里规定父进程向匿名管道中一次只能写4字节数据,子进程一次只能从管道文件中读取4字节数据。由于匿名管道的性质,父进程只需向目标子进程所对应的管道中发送任务码即可。如果管道文件中存在数据,子进程会不断读取任务码,执行对应的任务;否则子进程进入阻塞状态,等待主进程向管道中写入数据!
但主进程需要知道向那个管道中发送任务码,向那个匿名管道中进行写入,以及子进程信息。即我们需要对管道进行管理!先描述,在组织!这里我们通过一个类对管道进行描述,其中保存着:主进程控制写端文件描述符、工作进程id和名字!由于我们需要执行快速随机访问,所以选择vector
进行组织管理!所以这个进程池的制作大致框架如下:
【描述结构体】:
#define NUM 5//任务进程个数
int number = 1;//管道编号
class channel
{
public:
channel(int fd, pid_t id)
:ctrlfd(fd)
,workerid(id)
{
name = "channle-" + std::to_string(number++);
}
public:
int ctrlfd;
pid_t workerid;
std::string name;
};
【框架】:
int main()
{
std::vector<channel> channels;//用于管理管道
//1、创建管道,创建进程,工作进程工作方式
CreaterChannel(&channels);
//2、主进程向工作进程发送任务
// 这里特殊设计,我们通过g_always_loop来判断主进程是一直发送任务,还是发送指定次后就结束退出
const bool g_always_loop = true;
SendTask(channels, !g_always_loop, 10);
// 回收资源: 关闭写端,子进程自动退出
ReleaseChannels(channels);
return 0;
}
二、创建管道、创建进程、工作进程执行任务
2.1 创建管道、创建进程
我们可以通过父进程循环NUM
次,每次先创建管道,然后创建子进程。此时关闭父进程和子进程中不需要的读写段,建立单向通信通道。此时就可以建立如下关系:
但上述简单关闭管道文件的读写段会存在问题的!我们需要特殊处理:
上述这些多余的指向管道读端是fork创建子进程时,子进程继承父进程的信息之一(红线)!所以我们每次创建出的子进程还需将所有继承父进程多余的读端全部关闭,否则无法回收子进程导致内存泄漏!!
其中最简单的解决办法就是:我们将父进程的写端文件描述符全部记录下来,每次创建出子进程时,子进程所继承的多余读写信息已经全部保存。我们只需依次将其关闭即可!!
void CreaterChannel(std::vector<channel> *channels)
{
std::vector<int> old;//记录子进程继承的多余读端
for(int i = 0; i < NUM; i++)//创建NUM个工作进程
{
// 1. 创建管道
int pipefd[2];
int n = pipe(pipefd);
assert(n == 0);
(void)n;//防止编译器报警
//2. 创建子进程
pid_t id = fork();
if(id < 0)
{
perror("fork");
return;
}
// 3. 形成单向通信
if(id == 0)//子进程,工作进程
{
if(!old.empty())//将子进程继承父进程的多余读端全部关闭
{
for(auto rfd : old)
{
close(rfd);
}
}
close(pipefd[1]);
dup2(pipefd[0], 0);//将读端重定向到标准输入
work();//子进程执行任务
exit(0);
}
//父进程,主进程
close(pipefd[0]);
channels->push_back(channel(pipefd[1], id));//主进程对应写端和工作进程信息
old.push_back(pipefd[1]);
}
}
2.2 工作进程执行任务
当管道中有数据时,子进程只需读取相关任务码,然后不断执行即可!但如果此时父进程退出时,由于匿名管道的特性,read的返回值会被设为0,此时子进程在进行读取就没有任何意义了,子进程退出!!(检查任务码和执行任务码实现后续会统一分析)
void work()
{
while(1)
{
int code = 0;
ssize_t n = read(0, &code, sizeof(code));
if(n == 0)
{//主进程写端退出,子进程也退出
break;
}
else if(n > 0)
{
if(!init.CheckSafe(code))//检查任务码是否合法
continue;
init.RunTask(code);//执行任务码
}
else
{
//nothing
}
}
}
三、主进程向子进程发送任务
3.1 任务封装
下面我们仅仅通过一些打印数据来子进程待执行的所有模拟任务
【待执行任务】:
void Download()
{
std::cout << "我是一个下载任务" << std::endl;
}
void Printflog()
{
std::cout << "我是一个打印日志任务" << std::endl;
}
void PushVideoStream()
{
std::cout << "我是一个推送视频流任务" << std::endl;
}
【任务封装】:
我们通过包装器function
将上述指针函数进行统一。同时我们向管道中读取和写入的是任务码,所以下面我们给出了相应的任务码,并将上述如何通过vector
容器进行管理,下标对应任务码信息,并封装成了类!
除此之外,还提供选择任务接口(随机选择)、检查任务码是否合理、任务码转对应任务名、运行特定任务码对应任务等接口!
具体如下:
using task_t = std::function<void()>;
class Init
{
public:
//任务码
static const int g_Download_code = 0;
static const int g_Printflog_code = 1;
static const int g_PushVideoStream_code = 2;
std::vector<task_t> tasks;
public:
Init()
{
tasks.push_back(Download);
tasks.push_back(Printflog);
tasks.push_back(PushVideoStream);
srand(time(nullptr));
}
int SelectTask()//选择任务接口
{
return rand() % tasks.size();
}
std::string CodeToName(int code)//任务码转对应任务名
{
switch(code)
{
case 0:
return "Download";
case 1:
return "Printflog";
case 2:
return "PushVideoStream";
default:
return "Nothing";
}
}
bool CheckSafe(int code)//检查任务码是否合理
{
return code >= 0 && code < tasks.size();
}
void RunTask(int code)//行特定任务码对应任务
{
tasks[code]();
}
};
Init init;
3.2 主进程向子进程发送任务
考虑到子进程完成任务的负载均衡,我们通过循环依次向每一个子进程发送任务码,较为平均的将任务分配给子进程!
向子进程发送任务码,首先要确定待发送的任务,信道。然后将任务码写入信道!
前面已经提到过,我们这里所设计的发送任务接口支持:死循环一直发送任务、发送指定次数任务后退出!所以最后我们需要判断是否发送任务需求结束,退出!具体如下:
void SendTask(const std::vector<channel> &channels, bool flag, int num)
{
int pos = 0;//所选择信道所在数组位置下标
while(true)
{
// 1. 选择任务
int commit = init.SelectTask();
if(init.CheckSafe(commit))
{
// 2. 选择信道, 发送任务码
channel c = channels[pos++];
pos %= channels.size();
write(c.ctrlfd, &commit, sizeof(commit));
}
//判断是否需要退出
if(!flag)
{
if(--num == 0)
break;
}
}
}
四、回收资源
当父进程发送任务信息退出后,我们仅需将写端关闭即可此时子进程执行read
的返回值为0。子进程此时就能识别到写端已经退出,此时子进程也退出!最后让父进程等待子进程,防止子进程僵尸导致内存泄漏即可!!
void ReleaseChannels(std::vector<channel> &channels)
{
for(auto & c : channels)
{
close(c.ctrlfd);
pid_t rid = waitpid(c.workerid, nullptr, 0);
if(rid == -1)
{
perror("waitpid");
return;
}
std::cout << "wait " << c.name << " " << c.workerid << " success" << std::endl;
}
}
五、所有源代码
emsp;到处为止,进程池的简单制作到处就结束了,下面是所有源码,其中还包含调试代码!
5.1源码
【ProcessPool.chh】
#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
using task_t = std::function<void()>;
void Download()
{
std::cout << "我是一个下载任务" << std::endl;
}
void Printflog()
{
std::cout << "我是一个打印日志任务" << std::endl;
}
void PushVideoStream()
{
std::cout << "我是一个推送视频流任务" << std::endl;
}
class Init
{
public:
//任务码
static const int g_Download_code = 0;
static const int g_Printflog_code = 1;
static const int g_PushVideoStream_code = 2;
std::vector<task_t> tasks;
public:
Init()
{
tasks.push_back(Download);
tasks.push_back(Printflog);
tasks.push_back(PushVideoStream);
srand(time(nullptr));
}
int SelectTask()
{
return rand() % tasks.size();
}
std::string CodeToName(int code)
{
switch(code)
{
case 0:
return "Download";
case 1:
return "Printflog";
case 2:
return "PushVideoStream";
default:
return "Nothing";
}
}
bool CheckSafe(int code)
{
return code >= 0 && code < tasks.size();
}
void RunTask(int code)
{
tasks[code]();
}
};
Init init;
【processPool.cpp】
#include <iostream>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <string>
#include <vector>
#include <cstdio>
#include <sys/wait.h>
#include "ProcessPool.chh"
#define NUM 5//任务进程个数
int number = 1;//管道编号
class channel
{
public:
channel(int fd, pid_t id)
:ctrlfd(fd)
,workerid(id)
{
name = "channle-" + std::to_string(number++);
}
public:
int ctrlfd;
pid_t workerid;
std::string name;
};
void work()
{
while(1)
{
int code = 0;
ssize_t n = read(0, &code, sizeof(code));
if(n == 0)
{//主进程写端退出,子进程也退出
break;
}
else if(n > 0)
{
if(!init.CheckSafe(code))
continue;
init.RunTask(code);
}
else
{
//nothing
}
}
}
void CreaterChannel(std::vector<channel> *channels)
{
std::vector<int> old;//记录主进程的读端
for(int i = 0; i < NUM; i++)
{
// 1. 创建管道
int pipefd[2];
int n = pipe(pipefd);
assert(n == 0);
(void)n;//防止编译器报警
//2. 创建子进程
pid_t id = fork();
if(id < 0)
{
perror("fork");
return;
}
// 3. 形成单向通信
if(id == 0)//子进程,工作进程
{
if(!old.empty())//将子进程继承父进程多余读端全部关闭
{
for(auto rfd : old)
{
close(rfd);
}
for(auto id : old)
{
std::cout << "creater quit id:" << id << " " << std::endl;
}
}
close(pipefd[1]);
dup2(pipefd[0], 0);//将读端重定向到标准输入
work();
exit(0);
}
//父进程,主进程
close(pipefd[0]);
channels->push_back(channel(pipefd[1], id));
old.push_back(pipefd[1]);
}
}
//Debug
void PrintChannel(const std::vector<channel> &channels)
{
std::cout << channels.size() << std::endl;
for(int i = 0; i < channels.size(); i++)
{
std::cout << "name:" << channels[i].name << ", 写端描述符:" << channels[i].ctrlfd
<<", 工作进程id:" << channels[i].workerid << std::endl;
}
std::cout << "Creater success" << std::endl;
}
void SendTask(const std::vector<channel> &channels, bool flag, int num)
{
int pos = 0;//所选择信道所在数组位置下标
while(true)
{
// 1. 选择任务,返回任务码
int commit = init.SelectTask();
if(init.CheckSafe(commit))
{
// 2. 选择信道, 发送任务码
channel c = channels[pos++];
pos %= channels.size();
write(c.ctrlfd, &commit, sizeof(commit));
// Debug
std::cout << "select channel: " << c.name << ", send task:" << init.CodeToName(commit) << "[" << commit << "]"
<< ", workerid:" << c.workerid << std::endl;
}
//判断是否需要退出
if(!flag)
{
if(--num == 0)
break;
}
}
}
void ReleaseChannels(std::vector<channel> &channels)
{
for(auto & c : channels)
{
close(c.ctrlfd);
pid_t rid = waitpid(c.workerid, nullptr, 0);
if(rid == -1)
{
perror("waitpid");
return;
}
std::cout << "wait " << c.name << " " << c.workerid << " success" << std::endl;
}
}
int main()
{
std::vector<channel> channels;
//创建管道,创建进程
CreaterChannel(&channels);
std::cout << "Creater end" << std::endl;
PrintChannel(channels);
//主进程向工作进程发送任务
const bool g_always_loop = true;
SendTask(channels, !g_always_loop, 10);
// 回收资源: 关闭写端,子进程自动退出
//sleep(3);
ReleaseChannels(channels);
return 0;
}
5.2 运行结果(包含调试信息)
六、命名管道实现进程池
6.1 实现思路
命名管道实现进程池,我们可以创建多个命名管道,同时父进程打开写端,子进程打开读端!(注意此时创建的子进程还是会出现基础多余的读端文件描述符。所以子进程在打开命名管道读端前,同样需要先将多余的写端描述符关闭)
发生任务码,和子进程执行任务和匿名管道实现方式一样,就不多说了。
至于回收资源,那就更简单了。我们只需将命名管道写端描述符关闭,此时子进程会获取的读端退出信息(即read返回值为0),将管道读端也关闭!!最后父进程等待回收子进程,防止子进程僵尸即可!!
6.2 源码
【ProcessPool.cpp】
#include <iostream>
#include <unistd.h>
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>
#include <cstdio>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "ProcessPool.chh"
#define NUM 5//任务进程个数
class fifo
{
public:
fifo(std::string str, pid_t id, int fd)
:FILENAME(str.c_str())
,workerid(id)
,ctrlfd(fd)
{ }
public:
const char *FILENAME;
pid_t workerid;
int ctrlfd;
};
int Work()
{
int code = 0;
while(true)
{
ssize_t n = read(0, &code, sizeof(code));
std::cout << "read size:" << n << std::endl;
if(n == 0)
{
std::cout << "write quit, mee to!" << std::endl;
break;
}
else if(n > 0)
{
init.RunTask(code);
}
else
{
std::cerr << "errno: " << errno << "errstring: " << strerror(errno) << std::endl;
return 1;
}
}
return 0;
}
bool Createrfifo(std::vector<fifo> *fifos)
{
std::vector<int> old;
for(int i = 1; i <= NUM; i++)
{
std::string filename = "fifo-" + std::to_string(i);
int s = mkfifo(filename.c_str(), 0644);
if(s == -1)
{
std::cout << "errno:" << errno << " strerror" << strerror(errno) << std::endl;
return false;
}
pid_t id = fork();
if(id == 0)//child
{
if(!old.empty())
{
for(auto& fd : old)
close(fd);
}
int rfd = open(filename.c_str(), O_RDONLY);
dup2(rfd, 0);
Work();
std::cout << "read close!!!!!!!!!!!!!!!!!!!!!" << std::endl;
close(rfd);
exit(0);
}
//parent
int wfd = open(filename.c_str(), O_WRONLY);
fifos->push_back(fifo(filename, id, wfd));
old.push_back(wfd);
}
return true;
}
void SendCommit(const std::vector<fifo> &fifos, int flag, int num)
{
int pos = 0;
while(true)
{
// 1. 选择任务
int code = init.SelectTask();
if(init.CheckSafe(code))
{
//2. 发送任务码
write(fifos[pos].ctrlfd, &code, sizeof(code));
pos = (pos + 1) % fifos.size();
}
// 3.判断是否退出
if(!flag)
{
if(--num == 0)
{
break;
}
}
}
std::cout << "write finish!!!" << std::endl;
}
void Releasefifo(std::vector<fifo> &fifos)
{
std::cout << "start release" << std::endl;
for(auto& fifo : fifos)
{
std::cout << "colse wfd: " << fifo.ctrlfd << std::endl;
close(fifo.ctrlfd);
pid_t id = waitpid(fifo.workerid, nullptr, 0);
}
std::cout << "release success!" << std::endl;
}
int main()
{
std::vector<fifo> fifos;
// 1.创建命名管道,进程
if(!Createrfifo(&fifos))
return 0;
sleep(2);
for(const auto& fifo : fifos)
std::cout << fifo.FILENAME <<":" << fifo.workerid << ":" << fifo.ctrlfd << std::endl;
// 2.发送任务
sleep(2);
bool g_always_loop = true;
SendCommit(fifos, !g_always_loop, 10);
// 3. 回收资源
sleep(2);
Releasefifo(fifos);
return 0;
}
【ProcessPool.chh】
#pragma once
#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
using task_t = std::function<void()>;
void Download()
{
std::cout << "我是一个下载任务" << std::endl;
}
void Printflog()
{
std::cout << "我是一个打印日志任务" << std::endl;
}
void PushVideoStream()
{
std::cout << "我是一个推送视频流任务" << std::endl;
}
class Init
{
public:
//任务码
static const int g_Download_code = 0;
static const int g_Printflog_code = 1;
static const int g_PushVideoStream_code = 2;
std::vector<task_t> tasks;
public:
Init()
{
tasks.push_back(Download);
tasks.push_back(Printflog);
tasks.push_back(PushVideoStream);
srand(time(nullptr));
}
int SelectTask()
{
return rand() % tasks.size();
}
std::string CodeToName(int code)
{
switch(code)
{
case 0:
return "Download";
case 1:
return "Printflog";
case 2:
return "PushVideoStream";
default:
return "Nothing";
}
}
bool CheckSafe(int code)
{
return code >= 0 && code < tasks.size();
}
void RunTask(int code)
{
tasks[code]();
}
};
Init init;