Linux:使用匿名管道对进程池的模拟实现

目录

一、Makefile

二、processpool.cc

2.1创建通信管道和子进程

 2.2控制子进程

 2.3回收进程

三、task.hpp

 四、完整代码


接下来我们将模拟实现一个进程池,进程池广泛应用与各个领域和方向,比如我们打开电脑后同时打开很多个进程(也就是软件),此时就要操作系统就要对进程进行管理,并且让每个进程去运行各自所对应的功能,今天我们无法实现如此庞大的功能,但可以通过模拟来熟悉和了解进程池是什么有什么作用以及其底层的运行原理。博主将通过匿名管道的方式,经过层层封装、层层调用来一步步实现一个小型的迷你进程池。

一、Makefile

processpool:processpool.cc
	g++ -o $@ $^ -std=c++11 -g
.PHONY:clean
clean:
	rm -f processpool

二、processpool.cc

首先需要构建出我们整个进程池的主体框架结构和功能。我们生成.exe文件后通过./processpool来进行运行,此时我们可以通过在main函数中显示写命令行参数列表来进行传参从而控制创建进程的数量,argv中存放我们输入的命令,在输入时用空格作为分隔符可以通过./process num传参给main函数,num就是我们要创建的进程的个数,此时argc所对应的就是argv中元素的个数,我们判断符合要求后就去创建通信信道和子进程。

int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return UsageError;
    }
    int sub_process_num=stoi(argv[1]);
    if(sub_process_num<=0)
    return ArgError;
    srand((uint64_t)time(nullptr));

    //1.创建通信信道和子进程
    ProcessPool *processpool_ptr=new ProcessPool(sub_process_num);
    processpool_ptr->CreateProcess(worker);
    //2.控制子进程
    CtrlProcessPool(processpool_ptr,10);

    cout<<"task run done"<<endl;

    sleep(100);

    //3.回收进程

    delete processpool_ptr;

    return 0;
}

        管道由父进程创建并打开两个文件描述符,012都被占用的情况下,在第一次进行创建时pipe打开的fd是3和4,子进程继承父进程时会将文件描述符表也一并继承下来,父进程要写就必须关闭读端,子进程要读就需要关闭写端。

 

 可此时父进程在循环的进行创建管道和子进程, 而文件描述符fd的分配规则也是从前往后找有空位的地方依次往后进行分配,而刚刚父进程已经关闭了读端即fd为3所指向的文件,所以现在3号下标是空的,而父进程再次去创建下一个管道和进程时就会出现如下的情况:

pipe[0]即读端中的值依然是3 ,pipe[1]即写端中的值则是5,子进程依旧会继承父进程的两个读写端,然后依旧是父进程关闭读端,子进程关闭写端。

所以接下来继续创建就会反复出现子进程读端是3的情况。而父进程的写端则会依次4567创建下去。 同样的,子进程在继承父进程时会连同文件描述符表一起进行继承:同样,上一个管道的写端也会被新创建的子进程继承下来

此时的管道就不是真正意义上的单向管道了,随着父进程打开的文件描述符越来越多,就会导致后面创建的子进程就都会将之前打开的写端都继承下去。

而想要解决这种情况可以通过两种方式:

1、倒着关闭管道写端描述符,根据操作系统的规则,写端(父进程)没有被关闭时,读端(子进程)就会阻塞等待,而写端一旦被完全关闭,读端就会立马退出。而关闭写端时关闭的实际上时写端的引用计数,当引用计数为0时,写端才会被完全关闭。而之所以倒着关闭就是因为最后一个创建的channel的写端一定只有一个,此时倒着关闭写端的文件描述符后子进程就会退出,退出时连带着子进程的文件描述符表也会一并销毁,此时原本子进程中指向之前从父进程继承下来的其他管道的写端描述符的引用计数都会--,父进程依次倒着关闭写端,到第一个管道时,它就只剩下一个写端了。

2、在创建子进程时就关闭父进程曾经历史打开的写端描述符,这样每个子进程就只对应一个读端,每个管道也只对应一个写端。(博主用的是第二种方法)。

2.1创建通信管道和子进程

//1.创建通信信道和子进程
class ProcessPool
{
public:
    ProcessPool(int sub_process_num):_sub_process_num(sub_process_num)
    {}
    int CreateProcess(work_t work)//回调函数
    {
        vector<int> fds;
        for(int number=0;number<_sub_process_num;number++)
        {
            int pipefd[2]{0};
            int n=pipe(pipefd);
            if(n<0) return PipeError;

            pid_t id=fork();
            //子进程执行关闭写端,去读
            if(id==0)
            {
                //把父进程历史打开的写端关闭
                if(!fds.empty())
                {
                    cout<<"close w_fd:";
                    for(auto fd:fds) 
                    {
                        close(fd);
                        cout<<fd<<" ";
                    }
                    cout<<endl;
                }
                //child -> r
                close(pipefd[1]);
                //执行任务

                //为什么要dup2重定向:
                //原本从标准输入中读,变成从管道中读,
                //也就是此时当前子进程fd0中存放的就是当前管道的地址
                //之后通过访问标准输入就可以读到管道中的内容。
                dup2(pipefd[0],0);
                work(pipefd[0]);
                exit(0);
            }

            string cname="channel-"+to_string(number);
            //father 父进程执行
            close(pipefd[0]);
            channels.push_back(Channel(pipefd[1],id,cname));

            //让父进程此次创建的wfd保存,便于下次创建子进程继承时关闭之前的文件描述符
            fds.push_back(pipefd[1]);
        }
        return 0;
    }
    int NexChannel()
    {
        static int next=0;
        int c=next;
        next++;
        next%=channels.size();
        return c;
    }
    void SendTaskCode(int index,uint32_t code)
    {
        cout<<"send code: "<<code<<"to "<<channels[index].name()<<"sub process id: "<<channels[index].pid()<<endl;
        write(channels[index].wfd(),&code,sizeof(code));
    }

    //让所有子进程全部退出,只需要关闭所有的Channel w(写端)即可
    void KillAll()
    {
        for(auto &channel:channels)
        {
            channel.Close();
             pid_t pid=channel.pid();
            pid_t rid=waitpid(pid,nullptr,0);
            if(rid==pid)
            {
                cout<<"wait sub process: "<<pid<<" success..."<<endl;
            }
            cout<<channel.name()<<"close done"<<" sub process quit now: "<<channel.pid()<<endl;
            cout<<endl;
        }
    }
    void wait()
    {
        for(auto &channel:channels)
        {
            pid_t pid=channel.pid();
            pid_t rid=waitpid(pid,nullptr,0);
            if(rid==pid)
            {
                cout<<"wait sub process: "<<pid<<" success..."<<endl;
            }
        }
    }
    void Debug()
    {
        for(auto &channel:channels)
        {
            channel.PrintDebug();
        }
    }
    ~ProcessPool()
    {
    }
private:
    int _sub_process_num;
    vector<Channel> channels;
};

 2.2控制子进程

void CtrlProcessPool(ProcessPool* processpool_ptr,int cnt)
{
    while(cnt)
    {
        //a.选择一个进程和通道
        int channel=processpool_ptr->NexChannel();
        //b.选择一个任务
        uint32_t code=NextTask();
        //c.发送任务
        processpool_ptr->SendTaskCode(channel,code);

        sleep(1);
        cnt--;
    }
}

 2.3回收进程

 //让所有子进程全部退出,只需要关闭所有的Channel w(写端)即可
    void KillAll()
    {
        for(auto &channel:channels)
        {
            channel.Close();
            pid_t pid=channel.pid();
            pid_t rid=waitpid(pid,nullptr,0);
            if(rid==pid)
            {
                cout<<"wait sub process: "<<pid<<" success..."<<endl;
            }
            cout<<channel.name()<<"close done"<<" sub process quit now: "<<channel.pid()<<endl;
            cout<<endl;
        }
    }

三、task.hpp

在task.hpp中我们可以模拟一些我们需要执行的进程任务并对其进行封装处理,父进程负责通过向管道write向子进程发送需要执行的具体任务信息,子进程则通过read读到需要用到的进程和其需要执行的具体任务,前面的代码中我们已经把创建的子进程所对应的管道的读端重定向到0,所以worker在读取任务时直接从0中读取即可。

#pragma once
#include <iostream>
#include <unistd.h>

using namespace std;

typedef void(*work_t)(int);//函数指针类型
typedef void(*task_t)(int,pid_t);//函数指针类型

void PrintLog(int fd,pid_t pid)
{
    cout<<"sub process:"<<pid<<",fd: "<<fd<<",task is:printf log task"<<endl<<endl;
}

void ReloadConf(int fd,pid_t pid)
{
    cout<<"sub process:"<<pid<<",fd: "<<fd<<",task is:reload conf task"<<endl<<endl;
}

void ConnectMysql(int fd,pid_t pid)
{
    cout<<"sub process:"<<pid<<",fd: "<<fd<<",task is:connect mysql task"<<endl<<endl;
}

task_t task[3]={PrintLog,ReloadConf,ConnectMysql};

uint32_t NextTask()
{
    return rand()%3;
}

void worker(int fd)
{
    //从0中读取任务
    while(true)
    {
        uint32_t command_code=0;
        ssize_t n=read(0,&command_code,sizeof(command_code));
        if(n==sizeof(command_code))
        {
            if(command_code>=3) continue;
            task[command_code](fd,getpid());
        }
        else if(n==0)
        {
            cout<<"sub process: "<<getpid()<<" quit now.."<<endl;
            break;
        }
    }
}

 父进程可以随机的向特定管道写任务码,然后子进程再从各自相连接的管道中读取任务码,没有任务码时子进程就处于阻塞状态,有任务时就去读取并执行worker去执行我们提前构建好的任务列表中的特定任务。

 四、完整代码

#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <unistd.h>
#include <ctime>
#include <sys/wait.h>
#include "task.hpp"
using namespace std;
enum
{
    UsageError=1,
    ArgError,
    PipeError
};

void Usage(const string &proc)
{
    cout<<"Usage:"<<proc<<"subprocess-num"<<endl;
}


class Channel//保存管道的信息和写端fd
{
public:
    Channel(int wfd,pid_t sub_id,const string &name)
    :_wfd(wfd),_sub_process_id(sub_id),_name(name)
    {}
    void PrintDebug()
    {
        cout<<"_wfd:"<<_wfd;
        cout<<",_sub_process_id:"<<_sub_process_id;
        cout<<",_name:"<<_name<<endl;
    }
    string name(){return _name;}
    int wfd(){return _wfd;}
    pid_t pid(){return _sub_process_id;}
    void Close(){ close(_wfd); }
    ~Channel()
    {}
private:
    int _wfd;
    pid_t _sub_process_id;
    string _name;
};
//1.创建通信信道和子进程
class ProcessPool
{
public:
    ProcessPool(int sub_process_num):_sub_process_num(sub_process_num)
    {}
    int CreateProcess(work_t work)//回调函数
    {
        vector<int> fds;
        for(int number=0;number<_sub_process_num;number++)
        {
            int pipefd[2]{0};
            int n=pipe(pipefd);
            if(n<0) return PipeError;

            pid_t id=fork();
            //子进程执行关闭写端,去读
            if(id==0)
            {
                //把父进程历史打开的写端关闭
                if(!fds.empty())
                {
                    cout<<"close w_fd:";
                    for(auto fd:fds) 
                    {
                        close(fd);
                        cout<<fd<<" ";
                    }
                    cout<<endl;
                }
                //child -> r
                close(pipefd[1]);
                //执行任务

                //为什么要dup2重定向:
                //原本从标准输入中读,变成从管道中读,
                //也就是此时当前子进程fd0中存放的就是当前管道的地址
                //之后通过访问标准输入就可以读到管道中的内容。
                dup2(pipefd[0],0);
                work(pipefd[0]);
                exit(0);
            }

            string cname="channel-"+to_string(number);
            //father 父进程执行
            close(pipefd[0]);
            channels.push_back(Channel(pipefd[1],id,cname));

            //让父进程此次创建的wfd保存,便于下次创建子进程继承时关闭之前的文件描述符
            fds.push_back(pipefd[1]);
        }
        return 0;
    }
    int NexChannel()
    {
        static int next=0;
        int c=next;
        next++;
        next%=channels.size();
        return c;
    }
    void SendTaskCode(int index,uint32_t code)
    {
        cout<<"send code: "<<code<<"to "<<channels[index].name()<<"sub process id: "<<channels[index].pid()<<endl;
        write(channels[index].wfd(),&code,sizeof(code));
    }

    //让所有子进程全部退出,只需要关闭所有的Channel w(写端)即可
    void KillAll()
    {
        for(auto &channel:channels)
        {
            channel.Close();
            pid_t pid=channel.pid();
            pid_t rid=waitpid(pid,nullptr,0);
            if(rid==pid)
            {
                cout<<"wait sub process: "<<pid<<" success..."<<endl;
            }
            cout<<channel.name()<<"close done"<<" sub process quit now: "<<channel.pid()<<endl;
            cout<<endl;
        }
    }
    void wait()
    {
        for(auto &channel:channels)
        {
            pid_t pid=channel.pid();
            pid_t rid=waitpid(pid,nullptr,0);
            if(rid==pid)
            {
                cout<<"wait sub process: "<<pid<<" success..."<<endl;
            }
        }
    }
    void Debug()
    {
        for(auto &channel:channels)
        {
            channel.PrintDebug();
        }
    }
    ~ProcessPool()
    {
    }
private:
    int _sub_process_num;
    vector<Channel> channels;
};
void CtrlProcessPool(ProcessPool* processpool_ptr,int cnt)
{
    while(cnt)
    {
        //a.选择一个进程和通道
        int channel=processpool_ptr->NexChannel();
        //b.选择一个任务
        uint32_t code=NextTask();
        //c.发送任务
        processpool_ptr->SendTaskCode(channel,code);

        sleep(1);
        cnt--;
    }
}
int main(int argc,char* argv[])
{
    if(argc!=2)
    {
        Usage(argv[0]);
        return UsageError;
    }
    int sub_process_num=stoi(argv[1]);
    if(sub_process_num<=0)
    return ArgError;
    srand((uint64_t)time(nullptr));

    //1.创建通信信道和子进程
    ProcessPool *processpool_ptr=new ProcessPool(sub_process_num);
    processpool_ptr->CreateProcess(worker);
    //2.控制子进程
    CtrlProcessPool(processpool_ptr,10);

    cout<<"task run done"<<endl;

    //3.回收进程
    processpool_ptr->KillAll();

    //防止僵尸出现,需要父进程对其进行资源回收
    //processpool_ptr->wait();

    
    delete processpool_ptr;

    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/587575.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot对接前端传递的base64编码的图片信息,转成图片以Get请求进行浏览器文件下载,不下载到本地。

一、问题描述 1.1需求描述。 前端将浏览器展示的图片以base64编码的形式传递给后端&#xff0c;以此实现文件下载的功能&#xff0c;在浏览器弹出文件下载框。效果如下 1.2实现思路 将前端传递的base64进行解码&#xff0c;设置响应头返回响应体&#xff0c;代码如下。 pu…

如何安全的使用密码登录账号(在不知道密码的情况下)

首先&#xff0c;需要用到的这个工具&#xff1a; 度娘网盘 提取码&#xff1a;qwu2 蓝奏云 提取码&#xff1a;2r1z 1、打开工具&#xff0c;进入账号密码模块&#xff0c;如图 2、看到鼠标移动到密码那一栏有提示&#xff0c;按住Ctrl或者Alt点击或者双击就能复制内容&…

【大前端】ECharts 绘制立体柱状图

立体柱状图分为&#xff1a; 纯色立体柱状图渐变立体柱状图 常用实现方式 纯色立体柱状图 纯色立体柱状图&#xff0c;使用MarkPoint和颜色渐变就实现&#xff0c;如下代码 import * as echarts from "echarts";var chartDom document.getElementById("main&…

Window(Qt/Vs)软件添加版本信息

Window&#xff08;Qt/Vs&#xff09;软件添加版本信息 文章目录 Window&#xff08;Qt/Vs&#xff09;软件添加版本信息VS添加版本信息添加资源文件添加版本定义头自动更新版本添加批处理脚本设置生成事件 Qt添加版本信息添加资源文件文件信息修改自动更新版本 CMake添加版本信…

rust疑难杂症

rust疑难杂症解决 边碰到边记录&#xff0c;后续可能会逐步增加&#xff0c;备查 cargo build时碰到 Blocking waiting for file lock on package cache 原因是Cargo 无法获取对包缓存的文件锁&#xff0c; 有时vscode中项目比较多&#xff0c;如果其中某些库应用有问题&…

环形链表的经典问题

环形链表 环形链表的介绍链表中是否带环返回链表开始入环的第一个节点 本文主要介绍如何判断一个链表是否是环形链表&#xff0c;以及如何得到环形链表中的第一个节点。 环形链表的介绍 环形链表是一种链表数据结构&#xff0c;环形链表是某个节点的next指针指向前面的节点或指…

【linux学习指南】linux 环境搭建

文章目录 &#x1f4dd;前言&#x1f320; 云服务器的选择&#x1f320;阿里云&#x1f320;腾讯云&#x1f320;华为云 &#x1f320;使用 XShell 远程登陆到 Linux&#x1f309;下载 XShell &#x1f320;查看 Linux 主机 ip&#x1f309; XShell 下的复制粘贴&#x1f309; …

大数据信用花了,一般多久能正常?

在当今数字化时代&#xff0c;大数据技术被广泛应用于各个领域&#xff0c;包括金融、电商、社交等。然而&#xff0c;随着大数据技术的普及&#xff0c;个人信用问题也日益凸显&#xff0c;其中“大数据信用花”现象尤为引人关注。那么&#xff0c;大数据信用花究竟是什么?一…

【linuxC语言】exec函数族

文章目录 前言一、exec函数族二、示例代码2.1 代码12.2 代码22.3 代码3 总结 前言 在Linux环境下&#xff0c;C语言提供了一组强大的函数族&#xff0c;即exec函数族&#xff0c;用于执行其他程序。这些函数允许程序在运行时加载并执行不同的程序&#xff0c;从而实现了程序之…

Android(Java)项目支持Kotlin语言开发

Android&#xff08;Java&#xff09;项目通过相关Kotlin设置后&#xff0c;允许同时使用Java语言和Kotlin语言进行开发代码的。 示例环境&#xff1a; Android Studio Giraffe | 2022.3.1 Patch 3 Java 8 Kotlin 1.9.20 设置Kotlin选项&#xff1a; 第一步&#xff1a;在项…

AI大模型探索之路-训练篇9:大语言模型Transformer库-Pipeline组件实践

系列篇章&#x1f4a5; AI大模型探索之路-训练篇1&#xff1a;大语言模型微调基础认知 AI大模型探索之路-训练篇2&#xff1a;大语言模型预训练基础认知 AI大模型探索之路-训练篇3&#xff1a;大语言模型全景解读 AI大模型探索之路-训练篇4&#xff1a;大语言模型训练数据集概…

Sentinel 控制台学习

引言 上篇文章已经讲过 SpringCloud Sentinel集成到微服务项目中&#xff0c;接下来我们继续学习怎么使用sentinel控制台对微服务进行限流&#xff0c;熔断&#xff0c;降级等一系列操作。 控制台 接下来我们单独讲解每一个菜单按钮 实时监控 实时监控&#xff1a; 可以看到…

必应广告投放怎么做?怎么开户推广?

今天搜索引擎广告依旧是企业提升品牌知名度、吸引潜在客户的关键渠道之一&#xff0c;必应Bing&#xff0c;作为全球第二大搜索引擎&#xff0c;不仅拥有庞大的用户基础&#xff0c;更以其精准的定向能力和高效的转化效率&#xff0c;成为众多企业拓展市场的优选平台。 一、必…

【Java探索之旅】包管理精粹 Java中包的概念与实践

文章目录 &#x1f4d1;前言一、封装1.1 封装的概念1.2 访问限定修饰符 二、封装扩展&#xff08;包&#xff09;2.1 包的概念2.2 带入包中的类2.3 自定义包2.4 常见的包 &#x1f324;️全篇总结 &#x1f4d1;前言 在Java编程中&#xff0c;封装是面向对象编程的核心概念之一…

PotatoPie 4.0 实验教程(32) —— FPGA实现摄像头图像浮雕效果

什么是浮雕效果&#xff1f; 浮雕效果是一种图像处理技术&#xff0c;用于将图像转换为看起来像浮雕一样的效果&#xff0c;给人一种凸起或凹陷的立体感觉&#xff0c;下面第二张图就是图像处理实现浮雕效果。 不过这个图是用Adobe公司的PS人工P图实现的&#xff0c;效果比较…

【R语言数据分析】数据类型与数据结构

R的数据类型有数值型num&#xff0c;字符型chr&#xff0c;逻辑型logi等等。 R最常处理的数据结构是&#xff1a;向量&#xff0c;数据框&#xff0c;矩阵&#xff0c;列表。 向量有数值型向量&#xff0c;字符型向量&#xff0c;逻辑型向量等&#xff0c;字符型向量就是反应…

二维码门楼牌管理应用平台建设:实现用户权限的高效管理

文章目录 前言一、用户权限管理的重要性二、用户管理中心的构建三、用户权限管理的实施策略四、用户权限管理的挑战与应对五、结语 前言 随着信息技术的飞速发展&#xff0c;二维码门楼牌管理应用平台已成为城市管理的重要组成部分。本文将深入探讨如何通过用户权限管理&#…

基于SpringBoot+Vue外卖系统设计和实现(源码+LW+部署讲解)

&#x1f339;作者简介&#xff1a;✌全网粉丝10W&#xff0c;csdn特邀作者、博客专家、Java领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战&#xff0c;高校老师/讲师/同行前辈交流✌ &#x1f339; 主要…

STM32(c语言基础)

1.硬件部分&#xff1a;按键&#xff0c;传感器 传感器模块&#xff1a;光敏电阻&#xff0c;热敏电阻&#xff0c;红外接收管 光敏电阻&#xff1a;光线越强&#xff0c;光敏电阻的阻值就越小&#xff1b; 热敏电阻&#xff1a;温度越高&#xff0c;热敏电阻的阻值越小&…

Vitis HLS 学习笔记--AXI4 主接口

目录 1. 简介 2. 认识MAXI 3. MAXI突发操作 3.1 全局/本地存储器 3.2 MAXI优势与特点 3.3 查看MAXI报告 3.3.1 HW Interfaces 3.3.2 M_AXI Burst Information 3.4 MAXI 资源消耗 4. 理解 Volatile 4.1 标准C/C中的 volatile 4.2 HLS 中的 volatile 5. 总结 1. 简介…