【Linux-进程间通信】匿名管道的应用-进程池实现+命名管道的应用ClientServer通信

匿名管道的应用--进程池/C++实现

当系统中需要处理众多任务时,可以将这些任务分配给多个子进程来分担工作。然而,频繁地创建和销毁进程会导致较高的时间成本。为减少这种开销,可以采取预先创建一组子进程的策略(以避免在任务分配时进行即时创建),并在子进程空闲时让它们处于阻塞状态(以防止子进程完成任务后被立即终止,从而在下一次任务分配时无需重新创建进程)。

为了实现这一目标,我们需要使用匿名管道这一技术。通过在父进程与子进程之间创建匿名管道,我们可以实现父进程向特定的子进程发送任务指令。

但需要注意的是,父进程不应该集中大量的将某个任务派发给同一个子进程,让其它进程处于空闲状态,这样会使得整机效率低下。我们可以通过

轮询或者随机派发的方式给子进程派发任务,实现负载均衡

我们要实现子进程对不同任务的管理,怎么做到呢??先描述再组织

既然随机派发任务,首先要有任务

#pragma once

#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>

#define TaskNum 3

typedef void (*task_t)(); // task_t 函数指针类型

void Print()
{
    std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
    std::cout << "I am a download task" << std::endl;
}
void Flush()
{
    std::cout << "I am a flush task" << std::endl;
}

task_t tasks[TaskNum];

void LoadTask()
{
    srand(time(nullptr) ^ getpid() ^ 17777);
    tasks[0] = Print;
    tasks[1] = DownLoad;
    tasks[2] = Flush;
}

void ExcuteTask(int number)
{
    if (number < 0 || number > 2)
        return;
    tasks[number]();
}

int SelectTask()
{
    return rand() % TaskNum;
}
void work()
{
    while (true)
    {
        int command = 0;
        int n = read(0, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is : " << getpid() << " handler task" << std::endl;
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            std::cout << "sub process : " << getpid() << " quit" << std::endl;
            break;
        }
    }
}

先描述


class Channel
{
public:
    Channel(int wfd, pid_t id, const std::string &name)
        : _wfd(wfd), _subprocessid(id), _name(name)
    {
    }
    int GetWfd() { return _wfd; }
    pid_t GetProcessId() { return _subprocessid; }
    std::string GetName() { return _name; }
    void CloseChannel()
    {
        close(_wfd);
    }
    void Wait()
    {
        pid_t rid = waitpid(_subprocessid, nullptr, 0);
        if (rid > 0)
        {
            std::cout << "wait " << rid << " success" << std::endl;
        }
    }
    ~Channel()
    {
    }

private:
    int _wfd;
    pid_t _subprocessid;
    std::string _name;
};

创建信道和子进程

// 形参类型和命名规范
// const &: 输出
// & : 输入输出型参数
// * : 输出型参数
//  task_t task: 回调函数
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{
    for (int i = 0; i < num; i++)
    {
        // 1.创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if (n < 0)
            exit(1);

        // 2.创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // child - read
            close(pipefd[1]);
            dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入
            task();             // 执行任务
            close(pipefd[0]);
            exit(0);
        }

        // 3.构建一个channel的名称
        std::string channel_name = "Channel-" + std::to_string(i);
        // 父进程 -- write
        close(pipefd[0]);
        // a. 子进程的pid b. 父进程关心的管道的w端
        channels->push_back(Channel(pipefd[1], id, channel_name));
    }
}

通过channel控制子进程

void ctrlProcessOnce(std::vector<Channel> &channels)
{
    sleep(1);
    // a.选择一个任务
    int taskcommand = SelectTask();
    // b.选择一个信道和进程
    int channel_index = NextChannel(channels.size());
    // c.发送任务
    SendTaskCommand(channels[channel_index], taskcommand);
    std::cout<< std::endl;
    std::cout << "taskcommand: " << taskcommand << " channel: "
              << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}

void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{
    if (times > 0)
    {
        while (times--)
        {
            ctrlProcessOnce(channels);
        }
    }
    else
    {
        while (true)
        {
            ctrlProcessOnce(channels);
        }
    }
}

回收管道和子进程--->关闭所有的写端、回收子进程

void CleanUpChannel(std::vector<Channel> &channels)
{
    for (auto &channel : channels)
    {
        channel.CloseChannel();

    }
    // 注意
    for (auto &channel : channels)
    {
        channel.Wait();
    }
}

设计主函数

// ./processpool 5
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " processnum " << std::endl;
    }

    int num = std::stoi(argv[1]);
    // 加载任务
    LoadTask();

    // 再组织
    std::vector<Channel> channels;

    // 1. 创建信道和子进程
    CreateChannelAndSub(num, &channels, work1);

    // 2. 通过channel控制子进程
    ctrlProcess(channels, num);

    // 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程
    CleanUpChannel(channels);
    return 0;
}

测试结果

 

测试成功!但是我们发现在回收管道和子进程的过程中我们是先把所有的管道关闭结束后,再进行等待;那我们能不能关闭一个等待一个呢?

void CleanUpChannel(std::vector<Channel> &channels)
{
    for (auto &channel : channels)
    {
        channel.CloseChannel();
        channel.Wait();
    }
}

 

原理看下图:

 

 

 

随着子进程越来越多,那么前面管道的写端就会越来越多;

所以,如果我们关闭一个文件描述符,仅仅只是关闭了父进程的一个,但子进程继承的写端都没有关闭;所以此时这种情况,不能关一个,退一个;在管道内有一个引用计数属性,只要引用计数不为0,不会真正关闭管道,这样子进程也不会真正退出,进程就阻塞了;

  • 为什么分开关闭,先关闭完,再等待就可以了?

因为我们关闭是从上往下的,最后一个管道先释放,最后一个管道释放了,那么它曾经对应指向上一个写端也就自动关闭了,类似于递归,从上往下关,然后从下往上不断读到0的

  • 所以我们也有了新的方式关闭,我们倒着关闭就可以了

  • void CleanUpChannel(std::vector<Channel> &channels)
    {
        int num = channels.size() -1;
        while(num >= 0)
        {
            channels[num].CloseChannel();
            channels[num--].Wait();
        }
    }

  • 我就想一个一个关闭呢?

  • 因为我们是在创建子进程的时候出现了问题,所以我们要修改一下创建子进程的逻辑

    因为第二次创建管道开始,第一个管道就会多出写端,因此我们只需要在第二次创建时作出修改即可;

    因为第一次创建后已经被push_back了,所以在第二次创建的时候,可以把第一次的关闭了;同样后面创建依次如此

  • void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
    {
        // BUG? --> fix bug
        for (int i = 0; i < num; i++)
        {
            // 1. 创建管道
            int pipefd[2] = {0};
            int n = pipe(pipefd);
            if (n < 0)
                exit(1);
    
            // 2. 创建子进程
            pid_t id = fork();
            if (id == 0)
            {
                if (!channels->empty())
                {
                    // 第二次之后,开始创建的管道
                    for(auto &channel : *channels) channel.CloseChannel();
                }
                // child - read
                close(pipefd[1]);
                dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入
                task();
                close(pipefd[0]);
                exit(0);
            }
    
            // 3.构建一个channel名称
            std::string channel_name = "Channel-" + std::to_string(i);
            // 父进程
            close(pipefd[0]);
            // a. 子进程的pid b. 父进程关心的管道的w端
            channels->push_back(Channel(pipefd[1], id, channel_name));
        }
    }

 

命名管道的应用——使用Client&Server通信

namedPipe.hpp

#pragma once

#include <iostream>
#include <cstdio>
#include <cerrno>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

const std::string comm_path = "./myfifo";
#define DefaultFd -1
#define Creater 1
#define User 2
#define Read O_RDONLY
#define Write O_WRONLY
#define BaseSize 4096

class NamePiped
{
private:
    const std::string _fifo_path;
    int _id;
    int _fd;

    bool OpenNamedPipe(int mode)
    {
        _fd = open(_fifo_path.c_str(), mode);
        if (_fd < 0)
            return false;
        return true;
    }

public:
    NamePiped(const std::string &path, int who)
        : _fifo_path(path), _id(who), _fd(DefaultFd)
    {
        if (_id == Creater)
        {
            int res = mkfifo(_fifo_path.c_str(), 0666);
            if (res != 0)
            {
                perror("mkfifo");
            }
            std::cout << "creater create named pipe" << std::endl;
        }
    }
    bool OpenForRead()
    {
        return OpenNamedPipe(Read);
    }
    bool OpenForWrite()
    {
        return OpenNamedPipe(Write);
    }

    int ReadNamedPipe(std::string *out)
    {
        char buffer[BaseSize];
        int n = read(_fd, buffer, sizeof(buffer));
        if (n > 0)
        {
            buffer[n] = 0;
            *out = buffer;
        }
        return n;
    }

    int WriteNamedPipe(const std::string &in)
    {
        return write(_fd, in.c_str(), in.size());
    }
    
    ~NamePiped()
    {
        if (_id == Creater)
        {
            int res = unlink(_fifo_path.c_str());
            if (res != 0)
            {
                perror("unlink");
            }
            std::cout << "creater free named pipe" << std::endl;
        }
        if (_fd != DefaultFd)
            close(_fd);
    }
};
  • client.cc

#include "namedPipe.hpp"

// write
int main()
{
    NamePiped fifo(comm_path, User);
    if (fifo.OpenForWrite())
    {
        std::cout << "client open namd pipe done" << std::endl;
        while (true)
        {
            std::cout << "Please Enter> ";
            std::string message;
            std::getline(std::cin, message);
            fifo.WriteNamedPipe(message);
        }
    }

    return 0;
}

server.cc

#include "namedPipe.hpp"

// server read: 管理命名管道的整个生命周期
int main()
{
    NamePiped fifo(comm_path, Creater);
    // 对于读端而言,如果我们打开文件,但是写还没来,我会阻塞在open调用中,直到对方打开
    // 进程同步
    if (fifo.OpenForRead())
    {
        std::cout << "server open named pipe done" << std::endl;

        sleep(3);
        while (true)
        {
            std::string message;
            int n = fifo.ReadNamedPipe(&message);
            if (n > 0)
            {
                std::cout << "Client Say> " << message << std::endl;
            }
            else if(n == 0)
            {
                std::cout << "Client quit, Server Too!" << std::endl;
                break;
            }
            else
            {
                std::cout << "fifo.ReadNamedPipe Error" << std::endl;
                break;
            }
        }
    }

    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/905580.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java设计模式之创建者模式(5种)

设计模式 软件设计模式&#xff0c;又称为设计模式&#xff0c;是一套被反复利用&#xff0c;代码设计经验的总结&#xff0c;他是在软件设计过程中的一些不断发生的问题&#xff0c;以及该问题的解决方案。 **创建者模式又分为以下五个模式&#xff1a;**用来描述怎么“将对象…

数据库->数据库约束

目录 一、数据库约束 1.定义 2.约束类型 3.NOT NULL 非空约束 4. UNIQUE 唯一约束 5.PRIMARY KEY 主键约束 1.主键的使用 2.把表中的主键交给数据库自己维护 2.1主键列设置为null 则使用自增 2.2插入除了主键以外的所有非空列&#xff08;推荐方法&#xff09; 2.3自…

ValueError: Object arrays cannot be loaded when allow_pickle=False

文章目录 问题解决方法1&#xff1a;allow_pickleTrue解决方法2&#xff1a;降低numpy版本错误原因&#xff1a;python和numpy版本不兼容 问题 Traceback (most recent call last): File “D:\project\test_st\retrieval\read_npy.py”, line 4, in data np.load(‘mosi0__le…

HTML CSS

目录 1. 什么是HTML 2. 什么是CSS ? 3. 基础标签 & 样式 3.1 新浪新闻-标题实现 3.1.1 标题排版 3.1.1.1 分析 3.1.1.2 标签 3.1.1.3 实现 3.1.2 标题样式 3.1.2.1 CSS引入方式 3.1.2.2 颜色表示 3.1.2.3 标题字体颜色 3.1.2.4 CSS选择器 3.1.2.5 发布时间字…

Prometheus和Grafana的安装部署

初识Prometheus和Grafana 通常来说&#xff0c;对于一个运行时的复杂系统&#xff0c;如果系统出了问题是很难排查的。因为你是不太可能在运行时一边检查代码一边调试的。因此&#xff0c;你需要在各种关键点加上监控&#xff0c;通过监控获取的数据&#xff0c;指导我们进一步…

ubuntu20.04 加固方案-设置用户缺省UMASK

一、编辑/etc/profile配置文件 打开终端。 查看当前umask 使用文本编辑器&#xff08;如vim&#xff09;编辑/etc/profile文件。 sudo vim /etc/profile 二、添加配置参数 在打开的配置文件的末尾中&#xff0c;添加或修改以下参数&#xff1a; umask 027 三、保存并退出…

liunx网络套接字 | 实现基于tcp协议的echo服务

前言&#xff1a;本节讲述linux网络下的tcp协议套接字相关内容。博主以实现tcp服务为主线&#xff0c;穿插一些小知识点。以先粗略实现&#xff0c;后精雕细琢为思路讲述实现服务的过程。下面开始我们的学习吧。 ps&#xff1a;本节内容建议了解网络端口号的友友们观看哦。 目录…

【果蔬识别】Python+卷积神经网络算法+深度学习+人工智能+机器学习+TensorFlow+计算机课设项目+算法模型

一、介绍 果蔬识别系统&#xff0c;本系统使用Python作为主要开发语言&#xff0c;通过收集了12种常见的水果和蔬菜&#xff08;‘土豆’, ‘圣女果’, ‘大白菜’, ‘大葱’, ‘梨’, ‘胡萝卜’, ‘芒果’, ‘苹果’, ‘西红柿’, ‘韭菜’, ‘香蕉’, ‘黄瓜’&#xff09;…

isp框架代码理解

一、整体框架如下&#xff1a; 1 外层的src中 1.1 从camera.c->task.c&#xff1a;封装了3层&#xff0c;透传到某个功能的本级。 1.2 core.c和capability.c中实现&#xff1a;开机初始化加载参数。2. plat/src中 2.1 fun.c中继task.c又透传了一层&#xff1b;以及最后功能…

VuePress文档初始化请求过多问题探讨

1. 背景 公司有部门使用VuePress 1.0搭建平台的帮助文档&#xff0c;前期文档不是很多&#xff0c;所以没有暴露出特别明显的问题。但随着文档的逐步迭代和内容的增多&#xff0c;出现了大量的并发请求&#xff0c;总共有218个请求&#xff0c;导致服务器带宽耗尽、响应速度下降…

Paimon x StarRocks 助力喜马拉雅构建实时湖仓

作者&#xff1a;王琛 喜马拉雅数仓专家 小编导读&#xff1a; 本文将介绍喜马拉雅直播的业务现状及数据仓库架构的迭代升级&#xff0c;重点分享基于 Flink Paimon StarRocks 实现实时湖仓的架构及其成效。我们通过分钟级别的收入监控、实时榜单生成、流量监测和盈亏预警&am…

Virtuoso使用layout绘制版图、使用Calibre验证DRC和LVS

1 绘制版图 1.1 进入Layout XL 绘制好Schmatic后&#xff0c;在原理图界面点击Launch&#xff0c;点击Layout XL进入版图绘制界面。 1.2 导入元件 1、在Layout XL界面左下角找到Generate All from Source。 2、在Generate Layout界面&#xff0c;选中“Instance”&#…

vscode插件-08 Golang

文章目录 Go安装其他必须软件 Go Go语言环境&#xff0c;只需安装这一个插件。然后通过vscode命令下载安装其他go环境需要的内容。 程序调试&#xff0c;需要创建.vscode文件夹并编写launch.json文件。 安装其他必须软件 ctrlshiftp&#xff0c;调出命令面板&#xff0c;输入…

Linux系列-vim的使用

&#x1f308;个人主页&#xff1a;羽晨同学 &#x1f4ab;个人格言:“成为自己未来的主人~” vim的使用 vim是多模式编辑器&#xff0c;不同的是vim是vi的升级版本&#xff0c;它不仅兼容vi的所有指令&#xff0c;而且还有一些新的特性在里面&#xff0c;比如语法加亮&am…

强化学习DQN实践(gymnasium+pytorch)

Pytorch官方教程中有强化学习教程&#xff0c;但是很多中文翻译都太老了&#xff0c;里面的代码也不能跑了 这篇blog按照官方最新教程实现&#xff0c;并加入了一些个人理解 工具 gymnasium&#xff1a;由gym升级而来&#xff0c;官方定义&#xff1a;An API standard for rei…

2024快手面试算法题-生气传染

问题描述 思路分析 生气只会向后传播&#xff0c;最后一个生气的人一定是最长连续没有生气的人中的最后一个人&#xff0c;前提是前面得有一个人生气。 注意&#xff0c;一次只能传播一个人&#xff0c;比如示例1&#xff0c;第一次只会传播给第一个P&#xff0c;不会传播给第…

入门 | Kafka数据使用vector消费到Loki中使用grafana展示

一、Loki的基本介绍 1、基本介绍 Loki 是由 Grafana Labs 开发的一款水平可扩展、高性价比的日志聚合系统。它的设计初衷是为了有效地处理和存储大量的日志数据&#xff0c;与 Grafana 生态系统紧密集成&#xff0c;方便用户在 Grafana 中对日志进行查询和可视化操作。 从架构…

分类算法——逻辑回归 详解

逻辑回归&#xff08;Logistic Regression&#xff09;是一种广泛使用的分类算法&#xff0c;特别适用于二分类问题。尽管名字中有“回归”二字&#xff0c;逻辑回归实际上是一种分类方法。下面将从底层原理、数学模型、优化方法以及源代码层面详细解析逻辑回归。 1. 基本原理 …

【Spring MVC】DispatcherServlet 请求处理流程

一、 请求处理 Spring MVC 是 Spring 框架的一部分&#xff0c;用于构建 Web 应用程序。它遵循 MVC&#xff08;Model-View-Controller&#xff09;设计模式&#xff0c;将应用程序分为模型&#xff08;Model&#xff09;、**视图&#xff08;View&#xff09;和控制器&#x…

现代数字信号处理I--最佳线性无偏估计 BLUE 学习笔记

目录 1. 最佳线性无偏估计的由来 2. 简单线性模型下一维参数的BLUE 3. 一般线性模型下一维参数的BLUE 4. 一般线性模型下多维参数的BLUE 4.1 以一维情况说明Rao论文中的结论 4.2 矢量参数是MVUE的本质是矢量参数中的每个一维参数都是MVUE 4.3 一般线性模型多维参数BLUE的…