【Linux】匿名管道通信场景——进程池

🔥 个人主页:大耳朵土土垚
🔥 所属专栏:Linux系统编程

这里将会不定期更新有关Linux的内容,欢迎大家点赞,收藏,评论🥳🥳🎉🎉🎉

文章目录

  • 1. 初始化进程池
  • 2. 进程池执行任务
    • 2.1 任务管理
    • 2.2 执行任务
  • 3. 清理进程池
  • 4. 封装与完整实现
  • 5. 结语

1. 初始化进程池

  进程池的实现是依靠匿名管道,通过进程间通信使得父进程能够管理多个进程任务,相当于父进程拥有了很多个进程——进程池,通过不同的进程完成指定的任务。
  所以我们需要创建多个匿名管道和子进程,进行进程间通信,发送信息给子进程让它们根据接收到的信息处理相关任务。
  因为有多个管道和子进程,为了方便父进程使用不同管道发送对应信息给子进程,我们需要将管道的文件描述符以及对应子进程的pid保存起来,我们选择将它们封装在一个Channel类中。又因为有多个匿名管道和子进程,所以将多个Channel类对象储存在C++STL中的容器vector中来方便父进程进行管理进程池。

代码如下:

int InitProcesspool(int num,std::vector<Channel>& channels)
{
    for(int i = 0; i < num; i++)//使用循环创建多个匿名管道和子进程
    {
        //1.创建匿名管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if(n < 0) return 2;//根据不同的返回值判断原因,也可以使用枚举来约定返回值代表的内容

        //2.创建子进程
        pid_t id = fork();
        if(id < 0) return 3;

        //3.建立通信管道,父子进程关闭读端或写端
        if(id == 0)//子进程
        {
            //子进程读取,关闭写端
            ::close(pipefd[1]);
            //dup2
            dup2(pipefd[0],0);
            //子进程需要执行的内容
            Work();
            ::exit(0);
        }

        //父进程
        //父进程写入,关闭读端
        ::close(pipefd[0]);
        channels.emplace_back(pipefd[1],id);//保存在channel对象中并存入vector
    }
    return 0;
}

对子进程内部,我们使用dup2系统调用将匿名管道读端文件描述符与标准输入stdin交换,这样我们就不需要保存不同进程对应匿名管道的读端文件描述符,只需要统一从0号文件描述符中读取内容即可。

对于Channel类

class Channel{
public:
    Channel(int fd,pid_t who):_fd(fd),_who(who)
    {
        _name = "Channel-"+std::to_string(fd)+"-"+std::to_string(who);
    }

    std::string GetName()
    {
        return _name;
    }
    int GetFd()
    {
        return _fd;
    }
    pid_t GetWho()
    {
        return _who;
    }
    void Send(int num)//父进程往匿名管道发送信息
    {
        ::write(_fd,&num,sizeof(num));
    }
    ~Channel()
    {

    }
private:
    int _fd;//保存匿名管道通信的文件描述符
    std::string _name;//名字(自己取的)
    pid_t _who;//子进程pid
};

对于父进程发送给子进程的信息我们选择约定一个数字对应一个任务,不同数字对应不同需要完成的任务,子进程接收到信息后就可以根据数字来确定不同的任务。

2. 进程池执行任务

2.1 任务管理

  执行任务之前我们需要先确定有哪些任务,怎么执行…所以我们需要进行任务管理,同样我们也是使用一个类TaskManager来进行任务管理:

#include<iostream>
#include<unordered_map>
#include<functional>
#include<ctime>

using task_t = std::function<void()>;//函数指针


//不同任务函数
    void Load()
    {
        std::cout<<"正在执行加载任务..."<<std::endl;
    }
    void Del()
    {
        std::cout<<"正在执行删除任务..."<<std::endl;
    }
    void Log()
    {
        std::cout<<"正在执行日志任务..."<<std::endl;
    }

static int number = 0;
class TaskManager
{
public:
    TaskManager()
    {
        srand(time(nullptr));
        InsertTask(Load);
        InsertTask(Del);
        InsertTask(Log);
    }
    int SelectTask()
    {
        return rand()%number;
    }
    void InsertTask(task_t t)
    {
        m[number++] = t;
    }
    void Excute(int num)
    {
        if(m.find(num) == m.end())
            return;
        m[num]();//执行任务
    }
    ~TaskManager()
    {
    }
private:
    std::unordered_map<int,task_t> m;//使用map封装数字与对应的任务函数指针
};

TaskManager tm;

  选择新创建一个源文件Task.hpp来封装上述内容,上述任务管理类中我们使用map来保存数字与任务函数指针的相关关系,这样通过数字就可以确定是哪一个任务函数;此外选择任务使用的方法是随机数的方法,大家可以根据自己的想法确定不同的方式。

2.2 执行任务

  • 发送任务

使用按顺序轮询的方式派发任务给不同的子进程——设置10次任务循环,先通过任务管理类中的选择函数获取任务编号,然后通过父进程进程池管理类将任务编号发送给子进程。

void ExcuteTask(std::vector<Channel>& channels)
{
    int n = 0;
    int count = 10;
    while(count--)//执行10次任务
    {
        //1.选择任务,获取任务编号
        int tasknum = tm.SelectTask();

        //2.选择子进程,使用轮询选择,派发任务
        channels[n++].Send(tasknum);
        n%=channels.size();

        std::cout<<std::endl;
        std::cout<<"*****成功发送"<<10-count<<"个任务*****"<<std::endl;
        
        sleep(2);//每个2s发送一个任务

    }
}

  • 接受并执行任务

子进程接受并执行任务——先通过匿名管道接受父进程发送的任务编号,然后通过任务管理类对象执行任务编号所对应的任务函数。

//子进程接受并执行任务
void Work()
{
    while(true)
    {
        int num = 0;
        int n = ::read(0,&num,sizeof(num));
        if(n == sizeof(num))//读取成功
            tm.Excute(num);//不要发成n
        else if(n == 0)
        {
            break;
        }
        else
        {
            break;
        }
    }
}

3. 清理进程池

我们需要回收匿名管道的文件描述符和子进程

void CleanProcesspool(std::vector<Channel>& channels)
{
    for(auto& c : channels)
        ::close(c.GetFd());
    for(auto& c : channels)
    {
        pid_t rid = ::waitpid(c.GetWho(),nullptr,0);
        if(rid <= 0)
        {
            std::cout<<std::endl;
            std::cout<<"清理子进程失败..."<<std::endl;
            return;
        }
    }

    std::cout<<std::endl;
    std::cout<<"成功清理子进程..."<<std::endl;
}

  注意这里不能使用一个循环来进行清理,如下面代码是错误的:

void CleanProcesspool(std::vector<Channel>& channels)
{
    for(auto& c : channels)//只使用一次循环
    {
    	::close(c.GetFd());
        pid_t rid = ::waitpid(c.GetWho(),nullptr,0);
        if(rid <= 0)
        {
            std::cout<<std::endl;
            std::cout<<"清理子进程失败..."<<std::endl;
            return;
        }
    }

    std::cout<<std::endl;
    std::cout<<"成功清理子进程..."<<std::endl;
}

这是因为在创建子进程时,子进程会继承父进程的文件描述符表,因此在第一个匿名管道创建后,例如父进程的4号文件描述符指向该匿名管道写端,那么在创建第二个子进程时,该子进程会继承4号文件描述符也指向第一个匿名管道写端,因此创建的子进程越多,前面匿名管道写端被指向的就越多,所以仅仅关闭一个进程的写端指向,还有其他的写端指向,所以读端无法读到0,也就无法退出,如下图所示:
在这里插入图片描述
在这里插入图片描述

当创建2个子进程时,第一个匿名管道写端就有两个进程指向,当创建的进程越多,该写端指向的也就越多。

如果要使用一个循环来清理回收子进程,我们可以从后往前关闭文件描述符,因为最后一个管道写端只有父进程指向。

4. 封装与完整实现

  对于父进程管理进程池我们可以封装一个类来更好的进行管理与实现:

#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include <sys/wait.h>

#include "Task.hpp"
#include "Channel.hpp"
// master


class ProcessPool
{
public:
    int InitProcesspool(int num)
    {
        for (int i = 0; i < num; i++)
        {
            // 1.创建匿名管道
            int pipefd[2] = {0};
            int n = pipe(pipefd);
            if (n < 0)
                return 2;

            // 2.创建子进程
            pid_t id = fork();
            if (id < 0)
                return 3;

            // 3.建立通信管道,父子进程关闭读端或写端
            if (id == 0) // 子进程
            {
                // 子进程读取,关闭写端
                ::close(pipefd[1]);
                // dup2
                dup2(pipefd[0], 0);
                Work();
                ::exit(0);
            }

            // 父进程
            // 父进程写入,关闭读端
            ::close(pipefd[0]);
            channels.emplace_back(pipefd[1], id);
        }
        return 0;
    }
    void ExcuteTask()
    {
        int n = 0;
        int count = 10;
        while (count--) // 执行10次任务
        {
            // 1.选择任务,获取任务编号
            int tasknum = tm.SelectTask();

            // 2.选择子进程,使用轮询选择,派发任务
            channels[n++].Send(tasknum);
            n %= channels.size();

            std::cout << std::endl;
            // std::cout<<"**************************"<<std::endl;
            std::cout << "*****成功发送" << 10 - count << "个任务*****" << std::endl;
            // std::cout<<"**************************"<<std::endl;
            // std::cout<<std::endl;

            sleep(3);
        }
    }

    void CleanProcesspool()
    {
        for (auto &c : channels)
            ::close(c.GetFd());

        for (auto &c : channels)
        {
            pid_t rid = ::waitpid(c.GetWho(), nullptr, 0);
            if (rid <= 0)
            {
                std::cout << std::endl;
                std::cout << "清理子进程失败..." << std::endl;
                return;
            }
        }

        std::cout << std::endl;
        std::cout << "成功清理子进程..." << std::endl;
    }

private:
    std::vector<Channel> channels;
};

main函数:

#include "ProcessPool.hpp"

int main(int argc, char* argv[])
{
    //0.获取应该创建管道个数num个
    if(argc!=2)
    {
        std::cout<<"请输入管道个数."<<std::endl;
        return 1;
    }
    int num = std::stoi(argv[1]);
    std::vector<Channel> channels;
    
    ProcessPool* pp = new ProcessPool;

    //1.初始化进程池——创建进程池
    pp->InitProcesspool(num);
    
    //2.执行任务
    pp->ExcuteTask();

    //3.任务执行完成,回收子进程
    pp->CleanProcesspool();

    delete pp;
    
    return 0;
}

运行结果如下:

在这里插入图片描述

5. 结语

  以上就是基于匿名管道通信实现的进程池,子进程会根据接受到的信息执行不同的任务,父进程可以看作Master来进行管理创建的多个进程。关键点在于对进程管理的封装以及回收子进程时会有多个进程指向匿名管道的读端,所以回收时要注意可能会出现的bug。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/926777.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

FUSU: 多源多时相土地利用变化分割数据集

FUSU是首个针对细粒度城市语义理解的多时态、多源地类变化分割数据集&#xff0c;其提供高分辨率双时态图像和每月时序观测&#xff0c;支持对城市动态变化的高频率监测。FUSU-Net是统一的时序架构&#xff0c;可同时进行变化检测和分割任务。结合光学和SAR数据&#xff0c;通过…

LLM学习笔记(13)分词器 tokenizer

由于神经网络模型不能直接处理文本&#xff0c;因此我们需要先将文本转换为数字&#xff0c;这个过程被称为编码 (Encoding)&#xff0c;其包含两个步骤&#xff1a; 使用分词器 (tokenizer) 将文本按词、子词、字符切分为 tokens&#xff1b;将所有的 token 映射到对应的 tok…

Unity中让光点跟图片填充区的末尾一起移动

一、实现效果展示 想要实现的效果如下,就是要让白色光点图片跟随绿色圆形图片填充区末尾一起移动。 二、代码如下: using UnityEngine; using System.Collections; using UnityEngine.UI; using DG.Tweening;public class IconCircle : MonoBehaviour {public float ti…

给定一个整数可能为正,0,负数,统计这个数据的位数.

题目描述 给定一个整数可能为正,0,负数,统计这个数据的位数. 例如1234567输出7位; -12345678输出8位;0输出1位 代码实现 int main() { long long m; long long n; scanf("%lld",&n); mn; int count0;//位数 do { count; n/10;//舍弃个位 }while(n!0); printf(&…

LLamafactory API部署与使用异步方式 API 调用优化大模型推理效率

文章目录 背景介绍第三方大模型API 介绍LLamafactory 部署API大模型 API 调用工具类项目开源 背景介绍 第三方大模型API 目前&#xff0c;市面上有许多第三方大模型 API 服务提供商&#xff0c;通过 API 接口向用户提供多样化的服务。这些平台不仅能提供更多类别和类型的模型…

【关闭or开启电脑自带的数字键盘】

目录 一、按数字键盘左上角的按键【NumLK Scroll】 二、修改注册表中数字键盘对应的数值【InitialKeyboardIndicators】 1、步骤&#xff1a; 2、知识点&#xff1a; 一、按数字键盘左上角的按键【NumLK Scroll】 这是最简单快捷的方法。 关闭后若想开启&#xff0c;再按一…

【FAQ】使用Node.js 镜像 构建本地项目

在nodejs官方并没有提供使用node.js构建本地项目的方法&#xff0c;但是通过阅读官方文档&#xff0c;可以发现&#xff0c;官方在包管理器界面提供了如下语句 所以node.js容器是可以执行语句的 下面通过docker 的 -w 、-v 参数设置容器工作目录和目录映射&#xff08;实现本…

深度学习 | pytorch + torchvision + python 版本对应及环境安装

Hi&#xff0c;大家好&#xff0c;我是半亩花海。要让一个基于 torch 框架开发的深度学习模型正确运行起来&#xff0c;配置环境是个重要的问题&#xff0c;本文介绍了 pytorch、torchvision、torchaudio 及 python 的对应版本以及环境安装的相关流程。 目录 一、版本对应 二…

4399大数据面试题及参考答案(数据分析和数据开发)

对数据分析的理解 数据分析是一个从数据中提取有价值信息以支持决策的过程。它涵盖了数据收集、清洗、转换、建模和可视化等多个环节。 首先&#xff0c;数据收集是基础。这包括从各种数据源获取数据&#xff0c;例如数据库、文件系统、网络接口等。这些数据源可以是结构化的数…

fastdds:编译、安装并运行helloworld

fastdds安装可以参考官方文档&#xff1a; 3. Linux installation from sources — Fast DDS 3.1.0 documentation 从INSTALLATION MANUAL这一节可以看出来&#xff0c;fastdds支持的操作系统包括linux、windows、qnx、MAC OS。本文记录通过源码和cmake的方式来安装fastdds的…

Istio笔记01--快速体验Istio

Istio笔记01--快速体验Istio 介绍部署与测试部署k8s安装istio测试istio 注意事项说明 介绍 Istio是当前最热门的服务网格产品&#xff0c;已经被广泛应用于各个云厂商和IT互联网公司。企业可以基于Istio轻松构建服务网格&#xff0c;在接入过程中应用代码无需更改&#xff0c;…

ipad项目 蓝湖宽度

ipad项目 横屏状态时 蓝湖宽度设置930px media screen and (orientation: portrait) {/* 竖屏时的样式 */ } media screen and (orientation: landscape) {/* 默认是 横屏时的样式 */ }

14、保存与加载PyTorch训练的模型和超参数

文章目录 1. state_dict2. 模型保存3. check_point4. 详细保存5. Docker6. 机器学习常用库 1. state_dict nn.Module 类是所有神经网络构建的基类&#xff0c;即自己构建一个深度神经网络也是需要继承自nn.Module类才行&#xff0c;并且nn.Module中的state_dict包含神经网络中…

在鸿蒙应用中 Debug 对开发者的帮助

文章目录 摘要引言Debug 的意义与挑战案例&#xff1a;页面渲染性能优化中的 Bug 排查Debug 过程详解问题定位问题解决优化布局与渲染逻辑 代码详细讲解示例代码详细讲解1. 导入必要模块2. 数据生成3. 使用虚拟列表组件items 属性itemHeight 属性renderItem 属性 4. 返回完整组…

基于多VSG独立微网的多目标二次控制MATLAB仿真模型

“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 模型简介 本文将一致性算法引入微电网的二次频率和电压控制&#xff0c;自适应调节功率参考值和补偿电压&#xff0c;同时实现频率电压恢复、有功 无功功率的比例均分以及功率振荡抑制&#xff0c;提高系统的暂态和稳…

洛谷 P2415 集合求和 C语言

题目&#xff1a; https://www.luogu.com.cn/problem/P2415 思路从大佬学来的思路。 如图&#xff1a; 我们可以发现&#xff0c;集合最后出现过的数字是2的&#xff08;n-1&#xff09;次方&#xff0c;所以就很好计算了。 代码如下&#xff1a; #include <iostream&g…

leaflet 的基础使用

目录 一、创建dom节点 二、创建地图 三、添加底图&#xff08;天地图&#xff09;&#xff0c;在地图创建完成后添加底图 本章主要讲述leaflet在vue中的使用&#xff1a; leaflet 详情总目录&#xff1a;传送 一、创建dom节点 <div class"map" id"map_…

Springboot 2.x升级到3.x

运维在扫描项目的时候发现了官方发布的漏洞&#xff0c;https://spring.io/security/cve-2024-38816 我们使用的是spring框架的2.x系列&#xff0c;WebMvc依赖于5.3系列&#xff0c;描述说需要更新到5.3.40&#xff0c;但是官方迟迟不再更新。同时发现官方说5.3系列也就更新到…

【HarmonyOS】@Observed和@ObjectLink嵌套对象属性更改UI不刷新问题

【HarmonyOS】Observed和ObjectLink嵌套对象属性更改UI不刷新问题 一、问题背景 使用了Observed和ObjectLink&#xff0c;修改嵌套对象的属性&#xff0c;UI还是不刷新&#xff0c;常见的问题有以下三种形式&#xff1a; 1.多级嵌套&#xff0c;嵌套对象的类并没有添加Observ…

【rustdesk】客户端和服务端的安装和部署(自建服务器,docker,远程控制开源软件rustdesk)

【rustdesk】客户端和服务端的安装和部署&#xff08;自建服务器&#xff0c;docker&#xff09; 一、官方部署教程 https://rustdesk.com/docs/zh-cn/client/mac/ 官方服务端下载地址 https://github.com/rustdesk/rustdesk-server/releases 我用的docker感觉非常方便&am…