简易进程池的实现

什么是进程池?

        进程池(Process Pool)是一种用于管理和复用多个进程的技术或设计模式。在进程池中,一定数量的进程会被预先创建并保持在内存中,以便在需要时立即使用,而不是每次需要进程时都重新创建新的进程,这样可以提高系统的性能和效率。

        进程池通常用于需要频繁创建和销毁进程的场景,例如网络服务器等。通过预先创建一些进程并保持它们处于空闲状态,可以避免频繁创建和销毁进程所带来的开销,并且可以更好地控制同时进行的进程数量,以避免系统资源被耗尽。

        一般来说,进程池包括以下几个基本组件:
1. 进程池管理器(Process Pool Manager):负责创建、管理和维护进程池中的进程,包括池中进程的初始化、分配和回收等操作。
2. 进程队列(Process Queue):用于存放空闲进程的队列,当有任务需要处理时,可以从队列中取出一个空闲进程进行任务处理。
3. 任务队列(Task Queue):用于存放需要处理的任务,当一个进程空闲时,可以从任务队列中取出一个任务进行处理。
4. 进程间通信机制:用于进程之间的通信,例如管道、共享内存、消息队列等。

通过合理设计和使用进程池,可以提高系统的并发处理能力,降低系统资源消耗,同时也便于监控和管理进程。

以下是我们的简易进程池的框架。 

 

因此在本次项目中,在面向对象思想的指导下,我们需要创建一个进程池,管理进程的相关操作。

思路 

我们在写之前,首先需要明确项目的功能是什么?都需要实现哪些模块?

将大框架搭建好之后,逐步填充细节。

首先我们明确需要实现的功能是:一个父进程开辟进程池中的多个子进程,然后向子进程发送任务信息,由子进程执行任务。

实现的模块:进程池(包含子进程的创建,子进程的执行任务板块,子进程的销毁),任务模块,父进程控制块。

我们可以将进程封装为一个小类,再用进程池封装进程的类。利用匿名管道的特性实现父子进程间通信。 

需要注意进程与任务间的负载均衡。

一个超级大Bug 

我们知道,子进程是会继承父进程的文件信息列表的,因此当父进程以写端打开管道,其后创建的子进程将会继承当前父进程的所有wfd与rfd,但由于父进程rfd个数为0,但wfd会叠加,因此最后一个子进程将会继承前面父进程的所有wfd。也就是说,后面创建的进程,会保存前面创建的管道的写文件描述符。 因此倘若我们按从前往后的顺序关闭父进程写端同时进行wait等待,是没有结果的。

我们的解决方法是每创建一个子进程,都会关闭其从父进程那里继承来的所有写文件描述符。

当然也有别的办法,1.从后往前关闭管道,最后的管道只有父进程一个写端。

2. 将所有的写端全部结束之后再进行wait等待。

源码 

task.hpp 

任务模块,其内包含任务列表,与工作过程 

#pragma once
#include<iostream>
#include<unistd.h>
using namespace std;

typedef void (*work_t)(int);//函数指针类型
typedef void(*task_t)();

void task1()
{
    cout<<"i'm task11111, hello people! from sub process: "<<getpid()<<endl;
}

void task2()
{
    cout<<"i'm task22222, hello people! from sub process: "<<getpid()<<endl;
}

void task3()
{
    cout<<"i'm task33333, hello people! from sub process: "<<getpid()<<endl;
}

//任务的函数指针数组,存储任务列表
task_t taskarray[]={task1,task2,task3};

//寻找下一个派发的任务
int NextTask()
{
    //随机抽取任务
    return rand()%3;
}

//工作过程
void worker(int rfd)
{
    while(true)
    {
        sleep(1);
         int taskcode=0;
        //接收任务码
        int n=read(rfd,&taskcode,sizeof(taskcode));
        //当可以读取到任务信息执行任务
        if(n==sizeof(taskcode))
        {
            //执行任务
            taskarray[taskcode]();
               cout<<"task success excute... processid: "<<getpid()<<endl<<endl;
        }
        else//读取不到任务信息即退出
        {
            cout<<"no task can excute,exit...  processid: "<<getpid()<<endl<<endl;
            break;
        }
    }
   

}

processpool.cc 

完成进程池的创建,销毁与回收等待,同时保证任务的正确执行与退出。 

#include<iostream>
#include<string>
#include<vector>
#include<sys/types.h>
#include<sys/wait.h>
#include<unistd.h>
#include"task.hpp"
using namespace std;

//管理管道属性
class channel
{
public:
    channel(size_t wfd,size_t pid,string name)
    :_wfd(wfd)
    ,_process_id(pid)
    ,_channel_name(name)
    {}
    size_t wfd()
    {
        return _wfd;
    }
    size_t pid()
    {
        return _process_id;
    }
    string name()
    {
        return _channel_name;
    }
    void Close()
    {
        close(_wfd);
    }
    ~channel(){}
   
private:
    size_t _wfd;
    size_t _process_id;
    string _channel_name;
};

//管理进程池
class processpool
{
public:
    processpool(int sub_process_num)
    :_sub_process_num(sub_process_num)
    {}

    //创建进程池
    void CreatProcessPool(work_t worker)
    {
        for(int i=0;i<_sub_process_num;i++)//创建管道与进程
        {
            int pipefd[2];
            pipe(pipefd);
            pid_t pid=fork();
            vector<int> fds;//存放管道中除却父进程以外的写端,并在子进程中一一进行释放
            if(pid==0)//子进程读
            {
                close(pipefd[1]);
                for(int i=0;i<fds.size();i++)
                {
                    close(fds[i]);
                }
                //子进程接收父进程发送的任务并完成任务
                worker(pipefd[0]);
                exit(0);
            }

            //父进程为写端
            close(pipefd[0]);
            string name="channel-";
            name+=to_string(pid);
            _channels.push_back(channel(pipefd[1],pid,name));
            fds.push_back(pipefd[1]);//将父进程的wfd进行插入,当下一个子进程创建后会继承该文件信息
                                    //因此在子进程中需要关闭继承到的写端,以免管道出现多个写端的状况
                                    //多个写端造成后果,当父进程终止写入,管道仍旧有多个管道
            //父进程发送任务给子进程

        }
    }

    void PrintDebug()//打印进程池中进程相关信息
    {
        for(auto &e: _channels)
        {
            cout<<"_wfd: "<<e.wfd()<<"\t";
            cout<<"_process_pid: "<<e.pid()<<"\t";
            cout<<"_channel_name: "<<e.name()<<"\t";
            cout<<endl;
        }
    }

//寻找下一个分配任务的子进程
    int NextChannel()
    {
        static int cnt=0;
        int ret=cnt%_channels.size();
        cnt++;
        return ret;
    }

//发送任务信息码给子进程
    void SendTaskMessage(int index,int taskcode)
    {
        cout<<"taskcode:"<<taskcode<<"  channel id: "<<_channels[index].pid()<<endl;
        int n=write(_channels[index].wfd(),&taskcode,sizeof(taskcode));
    }

//杀死进程池中所有子进程
    void KillAll()
    {
        for(int i=0;i<_channels.size();i++)
        {
            _channels[i].Close();
        }
    }

//对所有子进程进行回收
    void Wait()
    {
        for(int i=0;i<_channels.size();i++)
        {
            pid_t pid=_channels[i].pid();
            int ret=waitpid(pid,nullptr,0);
            if(ret=pid)
            cout<<"sub process already recyle success... processid: "<<pid<<endl;
            else
            cout<<"sub process already recyle fail fail fail!!!  processid: "<<pid<<endl;
        }
    }
private:
    int _sub_process_num;
    vector<channel> _channels;
};

//控制进程池执行任务
void CtrlProcessPool(processpool Processpool,int cnt)
{
     while(cnt-->0)
    {
        //挑选进程
        int index=Processpool.NextChannel();
        //挑选任务
        int taskcode=NextTask();
        //发送任务给进程
        sleep(1);
        cout<<"第"<<cnt<<"个任务"<<endl;
        Processpool.SendTaskMessage(index,taskcode);
    }
}
int main(int argc,char* argv[])
{
    if(argc!=2)//规范启动法则
    {
        cout<<"Please Re-Enter!  Please enter subprocess numbers!"<<endl;
        return -1;
    }
    //启动成功
    int subprocess_num=stoi(argv[1]);

    //创建进程池
    processpool Processpool(subprocess_num);
    Processpool.CreatProcessPool(worker);
    //Processpool.PrintDebug();
    //控制子进程
    //挑选进程与任务,并将任务发送给对应进程
    CtrlProcessPool(Processpool,7);
    //结束后回收子进程

    //关闭写端,进而关闭子进程
     Processpool.KillAll();
     //父进程等待回收子进程
     Processpool.Wait();
    return 0;
}

代码中有详细注释。 

运行结果 

 

这里的任务与进程是整数倍的关系,因此显得比较规整。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/642436.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【算法】前缀和——寻找数组的中心下标

本节博客是用前缀和算法图解“寻找数组的中心下标”&#xff0c;有需要借鉴即可。 目录 1.题目2.题意3.前缀和求解4.示例代码5.细节6.总结 1.题目 题目链接&#xff1a;LINK 2.题意 我们以示例1为例来图解一下题意&#xff1a; 3.前缀和求解 根据已有经验&#xff0c;我…

前端vue项目遇到的问题01——那些初级问题

前端vue项目遇到的问题01——那些初级问题 1. npm install 问题1.1 依赖冲突1.1.1 详细问题1.1.2 报错原因1.1.3 解决问题1.1.3.1 方式1——无视冲突1.1.3.1 方式2——更换依赖版本 1.2 nodejs版本问题1.3 node版本正确的情况&#xff08;audit问题&#xff09;&#xff08;这个…

人类交互4 感觉输入和运动输出

人类感觉系统概述 人类感觉系统是由多个感觉器官和神经系统组成&#xff0c;负责感知外部世界的各种刺激和信息。人类感觉系统包括以下几个主要部分&#xff1a; 视觉系统&#xff1a;视觉系统由眼睛、视神经和大脑视觉皮层组成&#xff0c;负责感知光线、颜色和形状&#xff…

java:static关键字用法

在静态方法中不能访问类的非静态成员变量和非静态方法&#xff0c; 因为非静态成员变量和非静态方法都必须依赖于具体的对象才能被调用。 从上面代码里看出&#xff1a; 1.静态方法不能调用非静态成员变量。静态方法test2()中调用非静态成员变量address&#xff0c;编译失败…

Ant Design Vue中 a-table 嵌套子表格

需求&#xff1a;在父表格中嵌套子表格&#xff0c;当点击展开某一行时&#xff0c;有展开的关闭当前展开行。使用a-table中的expandedRowKeys 属性和expand 方法。链接&#xff1a;Ant Design Vue 一、属性说明&#xff1a; expandedRowKeys&#xff1a;这个主要是控制展开某行…

26计算机操作系统408考研-操作系统进程与线程篇章(三)

操作系统进程与线程篇章 ` 文章目录 操作系统进程与线程篇章前言一、进程概念进程控制块进程创建进程终止进程的阻塞和唤醒进程唤醒进程挂起和激活线程多线程线程实现与线程模型总结互斥和同步并发原理硬件同步信号量机制信号量的应用管程经典同步问题消息传递前言 一、进程概…

清华新突破||新研究揭示多智能体协作的秘密武器

获取本文论文原文PDF&#xff0c;请在公众号【AI论文解读】留言&#xff1a;论文解读点击订阅&#xff1a;人工智能论文解读合集 引言&#xff1a;多智能体协作中的挑战与机遇 在多智能体系统中&#xff0c;智能体需要通过协作来完成复杂的任务&#xff0c;这种协作涉及到通信…

Python高级进阶--slice切片

slice切片⭐⭐ 在 Python 中&#xff0c;切片操作是一种常见且方便的方式&#xff0c;用于从字符串、列表或元组中获取部分元素。这种操作通过指定起始索引、结束索引和步长来实现。下面我们来看一些关于切片的简单介绍以及一些常见用法。 1. 切片简介 取一个str、list、tup…

kafka跨地区跨集群同步工具MirrorMaker2 —— 筑梦之路

MM2简介 KIP-382: MirrorMaker 2.0 - Apache Kafka - Apache Software Foundation 有四种运行MM2的方法&#xff1a; As a dedicated MirrorMaker cluster.&#xff08;作为专用的MirrorMaker群集&#xff09; As a Connector in a distributed Connect cluster.&#xff08…

单片机设计注意事项

1.电源线可以30mil走线&#xff0c;信号线可以6mil走线 2.LDO推荐 SGM2019-3.3,RT9013,RT9193,1117-3.3V。 3.单片机VCC要充分滤波后再供电&#xff0c;可以接0.1uf的电容 4.晶振附件不要走其他元件&#xff0c;且放置完单片机后就放置晶振&#xff0c;晶振靠近X1,X2。

Mysql基础(七)DQL之select 语句(二)

一 select 语句续 WHERE子句后面跟着的是一个或多个条件,用于指定需要检索的行COUNT(): 多少条数据 where 11 和 count(1) 与 count(*) count(1)、count(*)和count(指定字段)之间的区别 ① order by 排序 mysql 之数据排序扩展 1、使用 order by 语句来实现排序2、排序可…

如何利用GitHubAction来发布自己的Python软件包

我们开发的python软件包如果想发布到网上&#xff0c;可以让其他人通过pip install下载&#xff0c;一般是把软件包发布到PYPI平台。 PYPI准备 我们要现在pypi注册登录一下 文件组织架构 一般的python软件包的文件组织架构为包名文件夹__init__.py程序&#xff0c;包文件夹的…

VBA即用型代码手册:删除Excel中空白行Delete Blank Rows in Excel

我给VBA下的定义&#xff1a;VBA是个人小型自动化处理的有效工具。可以大大提高自己的劳动效率&#xff0c;而且可以提高数据的准确性。我这里专注VBA,将我多年的经验汇集在VBA系列九套教程中。 作为我的学员要利用我的积木编程思想&#xff0c;积木编程最重要的是积木如何搭建…

Spring Cloud学习笔记(Nacos):配置中心基础和代码样例

这是本人学习的总结&#xff0c;主要学习资料如下 - 马士兵教育 1、Overview2、样例2.1、Dependency2.2、配置文件的定位2.3、bootstrap.yml2.4、配置中心新增配置2.5、验证 1、Overview 配置中心用于管理配置项和配置文件&#xff0c;比如平时写的application.yml就是配置文件…

计算机网络套接字知识(非常详细)从零基础入门到精通

本节重点 认识IP地址, 端口号, 网络字节序等网络编程中的基本概念; 学习socket api的基本用法; 一、预备知识 1.理解源IP地址和目的IP地址 ⭐在IP数据包头部中&#xff0c;有两个IP地址&#xff0c;分别叫做源IP地址和目的IP地址。 思考: 我们光有IP地址就可以完成通信了…

Linux Tcpdump抓包入门

Linux Tcpdump抓包入门 一、Tcpdump简介 tcpdump 是一个在Linux系统上用于网络分析和抓包的强大工具。它能够捕获网络数据包并提供详细的分析信息&#xff0c;有助于网络管理员和开发人员诊断网络问题和监控网络流量。 安装部署 # 在Debian/Ubuntu上安装 sudo apt-get install…

基于Perfetto 解读一帧的生产消费流程 Android >= S Qualcomm

广告 首先帮我朋友打个广告 我们一起在运营一个视频号 感兴趣的可以帮忙点击右边这个小铃铛 铃铛 序 1.这个流程里面的东西如果展开其实是有很多的 内容其实还是比较浅显的 sf处就不贴源码了 关一个Vsync就有的解释 当然笔者在流程上先形成一个思维闭环 2.如有小伙伴需要 笔…

C++完成特色旅游管理信息系统

背景&#xff1a; 继C完成淄博烧烤节管理系统后&#xff0c;我们来到了特色旅游管理信息系统的代码编写&#xff0c;历史链接点下方。 C完成淄博烧烤节管理系统_淄博烧烤总账管理系统的-CSDN博客 问题描述&#xff1a; 为了更好的管理各个服务小组&#xff0c;开发相应的管…

C# 拓展方法(涉及Linq)

拓展方法 定义一个扩展方法使用扩展方法例如再举个例子终极例子 注意事项与Linq 在C#中&#xff0c;扩展方法是一种特殊的静态方法&#xff0c;允许开发者向现有类型“添加”新的方法&#xff0c;而无需修改该类型的源代码或创建新的派生类型。这种机制提供了一种更为灵活的方式…

结构化开发方法(数据流图)

一、系统设计基本原理 二、系统总体结构设计 三、数据流图 数据流图