构建与优化自定义进程池

1. 什么是进程池?

简单来说,进程池就是预先创建固定数量的工作进程,通过设计任务队列或调度算法来分配任务给空闲的进程 —— 实现“负载均衡”。

2. 进程池框架设计

枚举错误返回值:

enum
{
    UsageError = 1,
    ArgError,
    PipeError
};
i. 设定子进程数量与格式验证

设置进程池的默认调用方式: ./processpool sub_process_nums

#include <iostream>
using namespace std;

void Usage(char* argv)
{
    cout << "Usage:" << endl;
    cout << "\t" << "argv sub_process_nums" << endl;
}

int main(int argc, char* argv[])
{
    if (argc < 2) // 命令行指令的本质是字符串, 字符串个数 < 2 时返回
    {
        Usage(argv[0]);
        return UsageError;
    }
    // ...
    
    return 0;
}
ii. 控制与管理子进程 之 " 创建通信管道与进程 "

在这部分内容开始之前,不妨先明晰一个问题: **“先创建管道再创建子进程,还是先创建子进程再创建管道?” **

要回答这个问题,我们需要了解 通信管道 的本质 及 fork 函数

  • 通信管道 (通信信道)

当你调用 pipe() 时,操作系统会在 文件描述符表 中为管道分配两个条目,一个用于写入 —— “写端”,一个用于读取 —— “读端”;

管道的 读端写端 本质上都是 文件描述符

  • fork 函数

fork() 的主要功能是,从当前进程(父进程)中创建一个新的进程(子进程);

子进程继承了父进程的资源限制、环境变量、打开的文件描述符表、工作目录等,但它们是独立的实体。

根据以上信息,不难得出:先创建管道、再创建子进程,子进程会继承父进程打开的文件描述符表,接着只需要关闭父进程的读端、子进程的写端,即可实现父子进程间的通信(反之亦然)。

#include <unistd.h>

int main(int argc, char* argv[])
{
    int sub_process_nums = stoi(argv[1]); // c标准库中的函数,将字符串转整型
    if (sub_process_nums <= 0)	
        return ArgError;
    
    // 1. 创建通信管道与进程
    for (int i = 0; i < sub_process_nums; i++)
    {
        int pipefd[2];
        int n = pipe(pipefd);
        
        // 创建管道成功返回 0,失败返回 -1
        if (n < 0)
            return PipeError;
        
        pid_t id = fork();
        if (id == 0)
        {
            // child 负责读
            close(pipefd[1]); // 关闭写端
            // todo
            
            exit(0); // 执行完退出
        }
        
        // father 负责写
        close(pipefd[1]); // 关闭读端
        
    }
    
    // ...
}

int pipefd[2];

int n = pipe(pipefd); ——> 管道创建成功后,pipefd[0] 为 读端文件描述符,pipefd[1] 为 写端文件描述符。

3. 封装通信管道与进程池
i. class Channel

为了保存循环创建的通信管道和子进程信息,我们封装一个 通信管道 类型。

#include <string>

class Channel
{
public:
    Channel(int wfd, pid_t process_id, const string& name)
    	:_wfd(wfd), _sub_process_id(process_id), _name(name)
    {}
    
    // 观察父进程创建子进程时的现象
    void Debug()
    {
        cout << "_wfd: " << _wfd;
        cout << ", _sub_process_id: " << _sub_process_id;
        cout << ", _name: " << _name << endl;
    }
    
    // 增加获取管道各种信息的接口
    int Wfd() { return _wfd; }
    pid_t Pid() { return _sub_processs_id; }
    string Name() { return _name; }
    
    ~Channel() {}
private:
    int _wfd; // 写端文件描述符
    pid_t _sub_process_id;
    string _name;
};

我们并未在 Channel 中封装读端文件描述符,因为我们将在每次循环中对 stdin 做重定向 —— dup2(pipefd[0], 0) ,之后子进程在运行时,只需要向 标准输入stdin —— 0 中读取任务指令即可。

通信管道本质上是文件,管道的读端和写端本质上是文件描述符;

dup2() 的工作原理,是将第一个参数指定的文件描述符,复制到第二个参数指定的位置。

ii. class ProcessPool

封装进程池,是为了更好地控制与管理子进程。

#include <vector>

class ProcessPool
{
public:
    ProcessPool(int sub_process_num) 
    	:_sub_process_num(sub_process_num)
    {} 
    ~ProcessPool() {}
    
    int CreatChannels(work_t work) // 回调函数
    {
        // 1. 创建通信信道和进程
        for (int i = 0; i < _sub_process_num; i++)
        {
            // 先创建管道
            int pipefd[2];
            int n = pipe(pipefd);
            if (n < 0)
            {
                return PipeError;
            }

            // 再创建子进程,确保父进程和子进程读写同一根管道
            pid_t id = fork();
            if (id == 0)
            {
                // child -> r
                close(pipefd[1]);

                // TODO
                dup2(pipefd[0], 0); // 将 pipefd[0] 重定向
                work(pipefd[0]); // 方便后续在子进程中观察每个管道读端的文件描述符

                // sleep(100);
                exit(0);
            }

            // father
            close(pipefd[0]);
            string cname = "channel--" + to_string(i);

            _channels.push_back(Channel(pipefd[1], id, cname));
        }

        return 0;
    }
    
private:
    vector<Channel> _channels;
    int _sub_process_num;
};

int CreatChannels(work_t work) { } 中有一两个细节:

一为前文提到过的,重定向;

第二,即这个函数的参数 —— 这种编程模式也叫做 回调函数 —— 将函数作为参数传递给另一个函数,以便特定条件发生时供后者调用。

我们将子进程待执行的函数,作为参数传入 CreatChannels() 中供子进程调用,后续我们只需对传入参数(传入不同的函数)进行修改就可以让子进程执行不同的任务而不用对 CreadChannels() 函数体进行修改。

4. 负载均衡式任务调度
#include <stdlib.h>
#include <time.h>

void CtrlProcess(ProcessPool* ptr, int cnt)
{
    while (cnt)
    {
        // a. 选择一个通道和进程
        int channel = ptr->NextChannel();
        // b. 选择一个任务
        int task = NextTask();
        // c. 发送任务
        ptr->SendTask(channel, task);
        
        sleep(1); // 每隔 1s 发送一次任务
        --cnt;
    }
}

int main()
{
    // ...
    // 1. 创建通信管道与进程
    ProcessPool *processpool_ptr = new ProcessPool(sub_process_num); // sub_process_num 为要创建子进程的个数
    processpool_ptr->CreatChannels(worker); // worker() 待补充
    
    srand(time(nullptr));
    
    // 2. 任务调度
    CtrlProcess(processpool_ptr, 10); // 假定 10 个任务
    cout << "task run done" << endl;
    
    // 3. 回收进程
    
    
    delete processpool_ptr;
    return 0;
}
  • 选择一个通道和进程
class ProcessPool
{
public:
    int NextChannel()
    {
        static int next = 0;
        int c = next;
        next++;
        next %= _channels.size(); // 防止越界
        return c;
    }
};
  • 选择一个任务
typedef void(*task_t)(int, pid_t); // 函数指针类型

// 模拟任务
void PrintLog(int fd, pid_t id)
{
    cout << "channel rfd: " << fd << ", sub process: " << id << ", task: Print log task" << endl << endl;
}

void ConnectMysql(int fd, pid_t id)
{
    cout << "channel rfd: " << fd << ", sub process: " << id << ", task: Connect mysql task" << endl << endl;
}

void ReloadConf(int fd, pid_t id)
{
    cout << "channel rfd: " << fd << ", sub process: " << id << ", task: Reload conf task" << endl << endl;
}

task_t tasks[3] = {PrintLog, ConnectMysql, ReloadConf};

int NextTask()
{
    return rand() % 3; 
}
  • 发送任务
class ProcessPool
{
public:
    void SendTask(int index, int command)
    {
        cout << "Send task to " << _channels[index].Name() << ", pid: " << _channels[index].Pid() << endl;
        write(_channels[index].Wfd(), &command, sizeof(command));
    }
};
5. 子进程任务执行:通过 worker() 读取父进程指令
typedef void(*work_t)(int);// 函数指针类型

void worker(int fd)
{
    while (1)
    {
        int code = 0;
        ssize_t n = read(0, &code, sizeof(code));
        if (n == sizeof(code)) // read 成功,返回值为读取到内容的大小/字节个数
        {
            if (code >= 3) continue;
            tasks[code](fd, getpid());
        }
        else if (n == 0) // 父进程关闭写端后,继续读,read 返回 0
        {
            cout << "sub process id: " <<  getpid() << " is to quit ..." << endl;
            break;
        }
        sleep(1);
    }
}
6. 回收子进程

设计 KillAll() ,完成子进程和管道的回收 —— 遍历进程池中的 _channels ,关闭管道的写端,读端将管道中的数据读完后,会读到返回值 0,表示读结束。

#include <sys/wait.h>

class ProcessPool
{
public:
    void KillAll()
    {
        for (auto& channel : _channels)
        {
            pid_t pid = channel.Pid(); // 子进程(管道读端进程)的 pid
            close(channel.Wfd());
            
            pid_t rid = waitpid(pid, nullptr, 0);
            if (rid == pid) // wait 成功
            {
                cout << "wait sub process: " << pid << "success..." << endl;
            }
            cout << "close channel: " << channel.Name() << ", sub process is to quit.." << endl;
        }
    }
};
int main()
{
    // ... 
    
    // 3. 回收进程
    processpool_ptr->KillAll();

    delete processpool_ptr;
    return 0;
}

程序运行情况如下:

在这里插入图片描述

从图中可以观察到两点信息:1. 每个读端文件描述符都是 3; 2. task run done 后,子进程并没有退出。

原因是,

1. 当父进程关闭管道的读端后, 原先分配给读端的文件描述符(3 号文件描述符)就会被释放;再次调用 pipe() 创建新管道时,OS 会重新分配这个最小未使用的文件描述符(3 号文件描述符)给新创建的管道

2.子进程通过 fork() 创建时,它会继承父进程所有打开的文件描述符。回收进程调用 KillAll() 时,尽管关闭了父进程的写端,子进程仍持有对原管道写端的引用,使得读端无法按预期读到返回值 0 ,进而无法关闭子进程。

要解决 子进程持有对原管道写端的引用 的问题,我们需要定义一个 vector<int> —— 用于保存父进程对所有管道的写端,接着让子进程在执行分配任务之前关闭所有写端 —— 修改 CreatProcess 函数。

int CreatChannels(work_t work)
    {
        vector<int> fds;
        for (int i = 0; i < _sub_process_num; i++)
        {

            int pipefd[2];
            int n = pipe(pipefd);
            if (n < 0)
            {
                return PipeError;
            }
            fds.push_back(pipefd[1]); // 保存管道的写端

            pid_t id = fork();
            if (id == 0)
            {
                // child -> r
                // close(pipefd[1]); // 不再需要单独关闭对应管道的写端
                if (!fds.empty())
                {
                    for (auto& fd : fds)
                    {
                        close(fd);
                    }
                }

                // TODO
                dup2(pipefd[0], 0);
                work(pipefd[0]);

                // sleep(100);
                exit(0);
            }

            // father
            close(pipefd[0]);
            string cname = "channel--" + to_string(i);

            _channels.push_back(Channel(pipefd[1], id, cname));
        }

        return 0;
    }

在这里插入图片描述

子进程正常退出,程序正常结束…

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/881715.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于51单片机的汽车倒车防撞报警器系统

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 本课题基于微控制器控制器&#xff0c; 设计一款汽车倒车防撞报警器系统。 要求&#xff1a; 要求&#xff1a;1.配有距离&#xff0c; 用于把车和障碍物之间的距离信号送入控制器。 2.配有报警系…

如何安装和注册 GitLab Runner

如何安装和注册 GitLab Runner GitLab Runner 是一个用于运行 GitLab CI/CD (Continuous Integration/Continuous Deployment) 作业。它是一个与 GitLab 配合使用的应用程序&#xff0c;可以在本地或云中运行。Runner 可以执行不同类型的作业&#xff0c;例如编译代码、运行测…

传统软件应用技术的价值转换率越来越低

为什么感觉到卷&#xff1f;可能的一个原因是大家都在进步&#xff0c;用户和竞争对手也在进步&#xff0c;而自己却没有进步&#xff0c;也谈不上思维模式的改变。 我们不谈理论、不谈理想、不谈市场环境不好&#xff0c;就谈与用户接触过程的案例&#xff0c;这是最有说服力的…

传输层协议(TCP和UDP)

目录 一、UDP 1、UDPAPI 2、UDPAPI的使用 二、TCP 1、TCPAPI 2、TCP的相关特性 2.1 确认应答 2.2 超时重传 2.3 连接管理&#xff08;三次握手&#xff0c;四次挥手&#xff09; 2.4 滑动窗口 2.5 流量控制 2.6 拥塞控制 2.7 延时应答 2.8 捎带应答 2.9 面向字节…

1.3 计算机网络的分类

欢迎大家订阅【计算机网络】学习专栏&#xff0c;开启你的计算机网络学习之旅&#xff01; 文章目录 前言一、按分布范围分类二、按传输技术分类三、按拓扑结构分类四、按使用者分类五、按传输介质分类 前言 计算机网络根据不同的标准可以被分为多种类型&#xff0c;本章从分布…

SqlSugar的where条件中使用可空类型报语法错误

SQLServer数据表中有两列可空列&#xff0c;均为数值类型&#xff0c;同时在数据库中录入测试数据&#xff0c;Age和Height列均部分有值。   使用SqlSugar的DbFirst功能生成数据库表类&#xff0c;其中Age、Height属性均为可空类型。   当Where函数中的检索条件较多时&a…

【Elasticsearch系列四】ELK Stack

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

idea中java及java web项目的常见问题

1、乱码问题&#xff0c;主要有几处地方&#xff0c;需要检查。 ①确保文件编码&#xff0c;其实主要就是在idea启动文件中&#xff0c;增加了 -Dfile.encodingUTF-8的设置 ②编辑器默认编码&#xff0c;都改为UTF-8 ③Tomcat的运行配置&#xff0c;编码也改为UTF-8,同样使用…

Wpf使用NLog将日志输出到LogViewer

1 LogViewer LogViewer是通过UDP传输的高性能实时log查看器。 具有一下特性&#xff1a; 通过UDP读取日志通过文件导入日志导出日志到一个文件中排序、过滤&#xff08;日志树&#xff0c;日志等级&#xff09;和查找突出显示搜索文本从UPD接收日志时忽略IP地址列表多接收器支…

RabbitMQ08_保证消息可靠性

保证消息可靠性 一、生产者可靠性1、生产者重连机制&#xff08;防止网络波动&#xff09;2、生产者确认机制Publisher Return 确认机制Publisher Confirm 确认机制 二、MQ 可靠性1、数据持久化交换机、队列持久化消息持久化 2、Lazy Queue 惰性队列 三、消费者可靠性1、消费者…

【吊打面试官系列-MySQL面试题】MySQL_fetch_array 和 MySQL_fetch_object 的区别是什么?

大家好&#xff0c;我是锋哥。今天分享关于【MySQL_fetch_array 和 MySQL_fetch_object 的区别是什么&#xff1f;】面试题&#xff0c;希望对大家有帮助&#xff1b; MySQL_fetch_array 和 MySQL_fetch_object 的区别是什么&#xff1f; 以下是 MySQL_fetch_array 和 MySQL_fe…

[数据结构与算法·C++] 笔记 1.4 算法复杂性分析

1.4 算法复杂性分析 算法的渐进分析 数据规模 n 逐步增大时, f(n)的增长趋势当 n 增大到一定值以后&#xff0c;计算公式中影响最大的就是 n 的幂次最高的项其他的常数项和低幂次项都可以忽略 大O表示法 函数f&#xff0c;g定义域为自然数&#xff0c;值域非负实数集定义: …

C++/Qt 集成 AutoHotkey

C/Qt 集成 AutoHotkey 前言AutoHotkey 介绍 方案一&#xff1a;子进程启动编写AutoHotkey脚本准备 AutoHotkey 运行环境编写 C/Qt 代码 方案二&#xff1a;显式动态链接方案探索编译动态链接库集成到C工程关于AutoHotkeyDll.dll中的函数原型 总结 前言 上一篇介绍了AutoHotkey…

动手学深度学习(李沐)PyTorch 第 1 章 引言

在线电子书 深度学习介绍 安装 使用conda环境 conda create -n d2l-zh python3.8 pip安装需要的包 pip install jupyter d2l torch torchvision下载代码并执行 wget https://zh-v2.d2l.ai/d2l-zh.zip unzip d2l-zh.zip jupyter notebookpip install rise如果不想使用jupyt…

Ubuntu20-xrdp与Windows-mstsc远程桌面连接

前期准备 两者在同一网段&#xff0c;网线连接。ubuntu端 sudo adduser yu //输入密码和确认密码&#xff0c;后面一路回车&#xff0c;新建用户yu&#xff0c;确保用户没有被登录 sudo apt install xrdp //安装xrdp sudo systemctl status xrdp //查看xrdp服务状态 sudo…

《飞机大战游戏》实训项目(Java GUI实现)(设计模式)(简易)

目录 一、最终实现后&#xff0c;效果如下。 &#xff08;1&#xff09;简单介绍本游戏项目&#xff08;待完善&#xff09; &#xff08;2&#xff09;运行效果图&#xff08;具体大家自己可以试&#xff09; 初始运行情况。 手动更换背景图。 通过子弹攻击敌机&#xff0c;累…

828华为云征文|Flexus云服务器X实例部署宝塔运维面板

本次华为云Flexus云服务器X实例部署宝塔运维面板教学&#xff0c;这次是推陈出新啊 之前的云耀云服务器L实例已经很不错了&#xff0c;大力赞叹华为云的 同时感谢华为云提供优惠卷&#xff0c;只能说白嫖真是太棒了 华为云近期正在筹办华为云828企业节活动&#xff0c;90款免…

HarmonyOS Next开发----使用XComponent自定义绘制

XComponent组件作为一种绘制组件&#xff0c;通常用于满足用户复杂的自定义绘制需求&#xff0c;其主要有两种类型"surface和component。对于surface类型可以将相关数据传入XComponent单独拥有的NativeWindow来渲染画面。 由于上层UI是采用arkTS开发&#xff0c;那么想要…

【RabbitMQ】消息分发、事务

消息分发 概念 RabbitMQ队列拥有多个消费者时&#xff0c;队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅该队列订阅列表里的一个消费者。这种方式非常适合扩展&#xff0c;如果现在负载加重&#xff0c;那么只需要创建更多的消费者来消费处理消息即可。 默…

深度学习01-概述

深度学习是机器学习的一个子集。机器学习是实现人工智能的一种途径&#xff0c;而深度学习则是通过多层神经网络模拟人类大脑的方式进行学习和知识提取。 深度学习的关键特点&#xff1a; 1. 自动提取特征&#xff1a;与传统的机器学习方法不同&#xff0c;深度学习不需要手动…