进程间的通信–匿名管道
进程池的概念
进程池是一种将多个进程组织起来以执行特定任务的机制。它由多个预先创建好的资源进程和一个管理进程组成。这些资源进程被管理进程负责分配和调度,用于处理任务。
当有新的任务提交时,管理进程会从进程池中取出一个资源进程,将任务分发给它来处理。任务完成后,资源进程并不会被关闭,而是被放回进程池中,等待处理其他的任务。这样可以避免频繁地创建和销毁进程,节省了时间和资源。
进程池的主要优势包括减少操作系统的调度难度,节省创建和销毁进程的时间,并在一定程度上实现并发效果。通过有效地管理资源进程和任务分配,进程池可以提高系统的整体性能和效率。
下面我们来模拟实现进程池,来加深对进程池的理解。
模拟场景
通过主程序的创建,当做管理进程;然后再创建几个子进程作为资源进程,在主函数模拟n个任务来临时,轮盘调用可用的资源进程。
代码
task.hpp
:这是一个任务头文件,包含了有关任务的模拟,任务的处理,任务的派发的任务有关函数。
#pragma once
#include<iostream>
#include<unistd.h>
using namespace std;
typedef void(*work_t)(int);
typedef void(*task_t)(int,pid_t);
void PrintLog(int fd,pid_t pid)
{
cout << "sub process: " << pid << ", fd: " << fd<< ", task is : printf log task" << endl;
}
void ReloadConf(int fd,pid_t pid)
{
cout << "sub process: " << pid << ", fd: " << fd<< ", task is : reload conf task" << endl;
}
void ConnectMysql(int fd,pid_t pid)
{
cout << "sub process: "<< pid <<", fd: "<<fd<< ",task is: connact mysql task"<<endl;
}
task_t task[3]={PrintLog,ReloadConf,ConnectMysql};
uint32_t NextTask()
{
return rand()%3;
}
void worker(int fd)
{
while(true)
{
uint32_t commond_code=0;
ssize_t n=read(0,&commond_code,sizeof(commond_code));
if(n==sizeof(commond_code))
{
if(commond_code >=3)continue;
task[commond_code](fd,getpid());
}
else if(n==0)
{
cout<<"get sub_process:"<<getpid()<<" quit now:"<<endl;
break;
}
cout<<"I am worker : "<<getpid()<<endl;
sleep(1);
}
}
processpool.cpp
: 这里面包含了进程池(创建资源进程、发送任务码、回收资源进程、资源进程的管理)、进程池的控制(进程通道的选择、任务的选择、任务的选择)、进程间的通道(包含父进程的文件标识符、子进程的进程ID、通道的名称)以及主函数;
#include<iostream>
#include<string>
#include<cstdlib>
#include<unistd.h>
#include<vector>
#include<ctime>
#include<sys/wait.h>
#include"task.hpp"
using namespace std;
enum
{
UsageError=1,
ArgError,
PipeError
};
void Usage(const std::string &proc)
{
cout<<"Usage: "<<proc<<"subprocess-num"<< endl;
}
class channel
{
public:
channel(int wfd,pid_t sub_id,const std::string& name)
:_wfd(wfd),
_sub_process_id(sub_id),
_name(name)
{}
void PrintDebug()
{
cout<<"_wfd:"<<_wfd<<" ";
cout<<"_sub_process_id:"<<_sub_process_id<<" ";
cout<<"_name:"<<_name<<" ";
cout<<endl;
}
string name(){return _name;}
int wfd(){ return _wfd;}
pid_t pid() { return _sub_process_id;}
void Close() { close(_wfd); }
~channel()
{}
private:
int _wfd;
pid_t _sub_process_id;
string _name;
};
class ProcessPool
{
public:
//构造函数初始化
ProcessPool(int sub_process_num)
:_sub_process_num(sub_process_num)
{}
int CreateProcess(work_t work)
{
vector<int> fds;
for(int number=0;number<_sub_process_num;number++)
{
int pipefd[2]{0};
int n=pipe(pipefd);
if(n<0)
{
return PipeError;
}
pid_t id=fork();
if(id==0)
{
if(!fds.empty())
{
cout<<"close w fd:";
for(auto fd:fds)
{
close(fd);
cout<<fd<<" ";
}
cout<<endl;
}
//child
close(pipefd[1]);
//执行任务
dup2(pipefd[0],0);
work(pipefd[0]);
exit(0);
}
string cname="channel-"+ to_string(number);
close(pipefd[0]);
//保存对应的子进程通道信息
_channels.push_back(channel(pipefd[1],id,cname));
//将父进程的fd进行保存
fds.push_back(pipefd[1]);
}
return 0;
}
int NextChannel()
{
static int next =0;
int c=next;
next++;
next%=_channels.size();
return c;
}
void SendTaskCode(int index,uint32_t code)
{
cout<<"send code: "<<code<<" to "<<_channels[index].name()<<" sub process id: "<<_channels[index].pid()<<endl;
write(_channels[index].wfd(),&code,sizeof(code));
}
void KillAll()
{
for(auto& C : _channels)
{
C.Close();
pid_t pid=C.pid();//获取对应子进程id
//waitpid()成功返回子进程的ID,失败返回-1;
pid_t rid=waitpid(pid,nullptr,0);
if(rid==pid)
{
cout<<"wait sub_process: "<<pid<<"sucess..."<<endl;
}
cout<<C.name()<<" close done"<<" sub process quit now"<<C.pid()<<endl;
}
}
void Debug()
{
for(auto& channel: _channels)
{
channel.PrintDebug();
}
}
~ProcessPool()
{}
private:
int _sub_process_num;
vector<channel> _channels;
};
void CtrlProcessPool(ProcessPool* processpool_ptr,int cnt)
{
while(cnt)
{
//选择一个进程和通道
int channel=processpool_ptr->NextChannel();
//选择一个任务
uint32_t code=NextTask();
//发送任务
processpool_ptr->SendTaskCode(channel,code);
sleep(1);
cnt--;
}
}
//,/processpool 5
int main(int argc,char* argv[])
{
if(argc!=2)
{
Usage(argv[0]);
return UsageError;
}
int sub_process_num=std::stoi(argv[1]);
if(sub_process_num <=0) return ArgError;
srand((uint64_t)time(nullptr));
//创建通信和子进程
ProcessPool *processpool_ptr=new ProcessPool(sub_process_num);
processpool_ptr->CreateProcess(worker);
processpool_ptr->Debug();
//控制子进程
CtrlProcessPool(processpool_ptr,sub_process_num);
cout<<"task run done"<<endl;
//sleep(100);
//回收子进程
processpool_ptr->KillAll();
delete processpool_ptr;
return 0;
}
讲解
如果匿名管道有多个写端,情况会比较复杂。匿名管道的特性决定了它只能实现一对一的通信,即一个写端对应一个读端。如果多个写端同时写入数据,可能会导致以下问题:
- 1.写入顺序混乱:由于多个写端并发写入,写入的顺序可能会混乱,导致数据的顺序不可预测。
- 2.写入内容被覆盖:多个写端同时写入时,如果没有采取合适的同步机制,可能会发生数据覆盖的情况,即后面的写入会覆盖前面的写入结果。
- 3.数据丢失:如果某个写端写入速度快于其他写端,读端可能无法及时消费所有写入的数据,导致数据丢失。
为了避免这些问题,通常需要在使用匿名管道时进行适当的同步操作,例如使用互斥锁、条件变量等机制来保证多个写端之间的互斥访问和顺序执行。