匿名管道的应用--进程池/C++实现
当系统中需要处理众多任务时,可以将这些任务分配给多个子进程来分担工作。然而,频繁地创建和销毁进程会导致较高的时间成本。为减少这种开销,可以采取预先创建一组子进程的策略(以避免在任务分配时进行即时创建),并在子进程空闲时让它们处于阻塞状态(以防止子进程完成任务后被立即终止,从而在下一次任务分配时无需重新创建进程)。
为了实现这一目标,我们需要使用匿名管道这一技术。通过在父进程与子进程之间创建匿名管道,我们可以实现父进程向特定的子进程发送任务指令。
但需要注意的是,父进程不应该集中大量的将某个任务派发给同一个子进程,让其它进程处于空闲状态,这样会使得整机效率低下。我们可以通过
轮询或者随机派发的方式给子进程派发任务,实现负载均衡
我们要实现子进程对不同任务的管理,怎么做到呢??先描述再组织
既然随机派发任务,首先要有任务
#pragma once
#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#define TaskNum 3
typedef void (*task_t)(); // task_t 函数指针类型
void Print()
{
std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
std::cout << "I am a download task" << std::endl;
}
void Flush()
{
std::cout << "I am a flush task" << std::endl;
}
task_t tasks[TaskNum];
void LoadTask()
{
srand(time(nullptr) ^ getpid() ^ 17777);
tasks[0] = Print;
tasks[1] = DownLoad;
tasks[2] = Flush;
}
void ExcuteTask(int number)
{
if (number < 0 || number > 2)
return;
tasks[number]();
}
int SelectTask()
{
return rand() % TaskNum;
}
void work()
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof(command));
if (n == sizeof(int))
{
std::cout << "pid is : " << getpid() << " handler task" << std::endl;
ExcuteTask(command);
}
else if (n == 0)
{
std::cout << "sub process : " << getpid() << " quit" << std::endl;
break;
}
}
}
先描述
class Channel
{
public:
Channel(int wfd, pid_t id, const std::string &name)
: _wfd(wfd), _subprocessid(id), _name(name)
{
}
int GetWfd() { return _wfd; }
pid_t GetProcessId() { return _subprocessid; }
std::string GetName() { return _name; }
void CloseChannel()
{
close(_wfd);
}
void Wait()
{
pid_t rid = waitpid(_subprocessid, nullptr, 0);
if (rid > 0)
{
std::cout << "wait " << rid << " success" << std::endl;
}
}
~Channel()
{
}
private:
int _wfd;
pid_t _subprocessid;
std::string _name;
};
创建信道和子进程
// 形参类型和命名规范
// const &: 输出
// & : 输入输出型参数
// * : 输出型参数
// task_t task: 回调函数
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{
for (int i = 0; i < num; i++)
{
// 1.创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0)
exit(1);
// 2.创建子进程
pid_t id = fork();
if (id == 0)
{
// child - read
close(pipefd[1]);
dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入
task(); // 执行任务
close(pipefd[0]);
exit(0);
}
// 3.构建一个channel的名称
std::string channel_name = "Channel-" + std::to_string(i);
// 父进程 -- write
close(pipefd[0]);
// a. 子进程的pid b. 父进程关心的管道的w端
channels->push_back(Channel(pipefd[1], id, channel_name));
}
}
通过channel控制子进程
void ctrlProcessOnce(std::vector<Channel> &channels)
{
sleep(1);
// a.选择一个任务
int taskcommand = SelectTask();
// b.选择一个信道和进程
int channel_index = NextChannel(channels.size());
// c.发送任务
SendTaskCommand(channels[channel_index], taskcommand);
std::cout<< std::endl;
std::cout << "taskcommand: " << taskcommand << " channel: "
<< channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{
if (times > 0)
{
while (times--)
{
ctrlProcessOnce(channels);
}
}
else
{
while (true)
{
ctrlProcessOnce(channels);
}
}
}
回收管道和子进程--->关闭所有的写端、回收子进程
void CleanUpChannel(std::vector<Channel> &channels)
{
for (auto &channel : channels)
{
channel.CloseChannel();
}
// 注意
for (auto &channel : channels)
{
channel.Wait();
}
}
设计主函数
// ./processpool 5
int main(int argc, char *argv[])
{
if (argc != 2)
{
std::cerr << "Usage: " << argv[0] << " processnum " << std::endl;
}
int num = std::stoi(argv[1]);
// 加载任务
LoadTask();
// 再组织
std::vector<Channel> channels;
// 1. 创建信道和子进程
CreateChannelAndSub(num, &channels, work1);
// 2. 通过channel控制子进程
ctrlProcess(channels, num);
// 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程
CleanUpChannel(channels);
return 0;
}
测试结果
测试成功!但是我们发现在回收管道和子进程
的过程中我们是先把所有的管道关闭结束后,再进行等待;那我们能不能关闭一个等待一个呢?
void CleanUpChannel(std::vector<Channel> &channels)
{
for (auto &channel : channels)
{
channel.CloseChannel();
channel.Wait();
}
}
原理看下图:
随着子进程越来越多,那么前面管道的写端就会越来越多;
所以,如果我们关闭一个文件描述符,仅仅只是关闭了父进程的一个,但子进程继承的写端都没有关闭;所以此时这种情况,不能关一个,退一个;在管道内有一个引用计数属性,只要引用计数不为0,不会真正关闭管道,这样子进程也不会真正退出,进程就阻塞了;
-
为什么分开关闭,先关闭完,再等待就可以了?
因为我们关闭是从上往下的,最后一个管道先释放,最后一个管道释放了,那么它曾经对应指向上一个写端也就自动关闭了,类似于递归,从上往下关,然后从下往上不断读到0的
-
所以我们也有了新的方式关闭,我们倒着关闭就可以了
-
void CleanUpChannel(std::vector<Channel> &channels) { int num = channels.size() -1; while(num >= 0) { channels[num].CloseChannel(); channels[num--].Wait(); } }
-
我就想一个一个关闭呢?
-
因为我们是在创建子进程的时候出现了问题,所以我们要修改一下创建子进程的逻辑
因为第二次创建管道开始,第一个管道就会多出写端,因此我们只需要在第二次创建时作出修改即可;
因为第一次创建后已经被push_back了,所以在第二次创建的时候,可以把第一次的关闭了;同样后面创建依次如此
-
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task) { // BUG? --> fix bug for (int i = 0; i < num; i++) { // 1. 创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if (n < 0) exit(1); // 2. 创建子进程 pid_t id = fork(); if (id == 0) { if (!channels->empty()) { // 第二次之后,开始创建的管道 for(auto &channel : *channels) channel.CloseChannel(); } // child - read close(pipefd[1]); dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入 task(); close(pipefd[0]); exit(0); } // 3.构建一个channel名称 std::string channel_name = "Channel-" + std::to_string(i); // 父进程 close(pipefd[0]); // a. 子进程的pid b. 父进程关心的管道的w端 channels->push_back(Channel(pipefd[1], id, channel_name)); } }
命名管道的应用——使用Client&Server通信
namedPipe.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cerrno>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
const std::string comm_path = "./myfifo";
#define DefaultFd -1
#define Creater 1
#define User 2
#define Read O_RDONLY
#define Write O_WRONLY
#define BaseSize 4096
class NamePiped
{
private:
const std::string _fifo_path;
int _id;
int _fd;
bool OpenNamedPipe(int mode)
{
_fd = open(_fifo_path.c_str(), mode);
if (_fd < 0)
return false;
return true;
}
public:
NamePiped(const std::string &path, int who)
: _fifo_path(path), _id(who), _fd(DefaultFd)
{
if (_id == Creater)
{
int res = mkfifo(_fifo_path.c_str(), 0666);
if (res != 0)
{
perror("mkfifo");
}
std::cout << "creater create named pipe" << std::endl;
}
}
bool OpenForRead()
{
return OpenNamedPipe(Read);
}
bool OpenForWrite()
{
return OpenNamedPipe(Write);
}
int ReadNamedPipe(std::string *out)
{
char buffer[BaseSize];
int n = read(_fd, buffer, sizeof(buffer));
if (n > 0)
{
buffer[n] = 0;
*out = buffer;
}
return n;
}
int WriteNamedPipe(const std::string &in)
{
return write(_fd, in.c_str(), in.size());
}
~NamePiped()
{
if (_id == Creater)
{
int res = unlink(_fifo_path.c_str());
if (res != 0)
{
perror("unlink");
}
std::cout << "creater free named pipe" << std::endl;
}
if (_fd != DefaultFd)
close(_fd);
}
};
-
client.cc
#include "namedPipe.hpp"
// write
int main()
{
NamePiped fifo(comm_path, User);
if (fifo.OpenForWrite())
{
std::cout << "client open namd pipe done" << std::endl;
while (true)
{
std::cout << "Please Enter> ";
std::string message;
std::getline(std::cin, message);
fifo.WriteNamedPipe(message);
}
}
return 0;
}
server.cc
#include "namedPipe.hpp"
// server read: 管理命名管道的整个生命周期
int main()
{
NamePiped fifo(comm_path, Creater);
// 对于读端而言,如果我们打开文件,但是写还没来,我会阻塞在open调用中,直到对方打开
// 进程同步
if (fifo.OpenForRead())
{
std::cout << "server open named pipe done" << std::endl;
sleep(3);
while (true)
{
std::string message;
int n = fifo.ReadNamedPipe(&message);
if (n > 0)
{
std::cout << "Client Say> " << message << std::endl;
}
else if(n == 0)
{
std::cout << "Client quit, Server Too!" << std::endl;
break;
}
else
{
std::cout << "fifo.ReadNamedPipe Error" << std::endl;
break;
}
}
}
return 0;
}