【Linux】 进程池 一主多从 管道通信

目录

1.代码介绍

2.channel 类

3.进程池类编写

4.主函数及其他

5. 源码


1.代码介绍

本文代码采用一主多从式(一个主进程(master)多个子进程(worker))通过管道进行通信,实现主进程分发任务,子进程完成任务,当主进程关闭管道时,子进程执行完任务后退出。

2.channel 类

创建一个channel类用于描述子进程,方便主进程与其通信。

成员变量:

int _wfd;  //记录管道写描述符用于主进程发送数据

pid_t _pid; //子进程pid

string _name; 

代码实现:

class Channel
{
public:
	Channel(int wfd, pid_t pid, const string &name) : _wfd(wfd),
														   _pid(pid), _name(name) {}
	~Channel() {}
	int wfd() { return _wfd; }
	string name() { return _name; }
	pid_t pid() { return _pid; }
 	void Close() { close(_wfd); }//关闭写描述符,子进程读取完毕后会退出
private:
	int _wfd;
	pid_t _pid;
	string _name;
};

3.进程池类编写

该类用于管理子进程,具体功能:

1.创建子进程。

2.获取子进程(轮转方式保证负载均衡)。

3.主进程发送任务码给子进程。

4.进程等待。

5.控制进程退出。

代码实现:

class ProcessPool
{
public:
	ProcessPool(int num) : _process_num(num) {}
	void createProcess(work_t work)
	{
		vector<int> fds;
		for (int i = 0; i < _process_num; ++i)
		{
			int pipefd[2]{0};
			pipe(pipefd);
			pid_t pid = fork();
			if (pid == 0)
			{
				if(!fds.empty())
				{
					for(auto& fd:fds)
					{
                        //关闭之前子进程管道的写端
						close(fd);
					}
				}
				close(pipefd[1]);
				dup2(pipefd[0], 0);
				work();
				exit(0);//子进程执行任务完毕会退出
			}

			close(pipefd[0]);
			string cname = "channel-" + to_string(i);
			_channels.push_back(Channel(pipefd[1], pid, cname));
			fds.push_back(pipefd[1]);
		}
	}
	int NextChannel()
	{
		static unsigned int index = 0;
		return (index++) % _process_num;
	}
	void SendTaskCode(int index, uint32_t code)
	{
		cout << "send code: " << code << " to " << _channels[index].name() << " sub prorcess id: " << _channels[index].pid() << endl;
		write(_channels[index].wfd(), &code, sizeof(code));
	}
	void Wait()
	{
		waitpid(-1, nullptr, 0);
	}
	void KillAll()
    {
        for (auto &channel : _channels)
        {
            channel.Close();
        }
    }
	~ProcessPool() {}

private:
	int _process_num;
	vector<Channel> _channels;
};

4.主函数及其他

主进程发送任务码,通过管道发送给子进程执行对应任务。

void CtrlProcessPool(const shared_ptr<ProcessPool> &processpool_ptr, int cnt)
{
	while (cnt)
	{
		// a. 选择一个进程和通道
		int channel = processpool_ptr->NextChannel();
		// cout << channel.name() << endl;

		// b. 你要选择一个任务
		uint32_t code = NextTask();

		// c. 发送任务
		processpool_ptr->SendTaskCode(channel, code);

		sleep(1);
		cnt--;
	}
}
int main(int argc, char *argv[])
{
	if (argc != 2)
	{
		printf("\n\t usage ./processPool num\n");
		return 1;
	}
	int process_num = stoi(argv[1]);
	shared_ptr<ProcessPool> process_ptr = make_shared<ProcessPool>(process_num);
	process_ptr->createProcess(worker);
	CtrlProcessPool(process_ptr, 5);

	process_ptr->KillAll();
	process_ptr->Wait();
	return 0;
}

模拟任务:

#pragma once

#include <iostream>
#include <unistd.h>
#include <functional>
using namespace std;

using work_t = function<void()>;
using task_t = function<void()>;

void PrintLog()
{
    cout << "printf log task" << endl;
}

void ReloadConf()
{
    cout << "reload conf task" << endl;
}

void ConnectMysql()
{
    cout << "connect mysql task" << endl;
}

task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};

uint32_t NextTask()
{
    return rand() % 3;
}

void worker()
{
    // 从0中读取任务即可!
    while(true)
    {
        uint32_t command_code = 0;
        ssize_t n = read(0, &command_code, sizeof(command_code));
        if(n == sizeof(command_code))
        {
            if(command_code >= 3) continue;
            tasks[command_code]();
        }
        else if(n == 0) //管道写端关闭读端返回值位0
        {
            cout << "sub process: " << getpid() << " quit now..." << endl;
            break;
        }
    }
}

5. 源码

processPool.cc

#include <iostream>
#include <string>
#include <cstdlib>
#include <vector>
#include <unistd.h>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include <memory>
#include "task.hpp"

using namespace std;
class Channel
{
public:
	Channel(int wfd, pid_t pid, const string &name) : _wfd(wfd),
														   _pid(pid), _name(name) {}
	~Channel() {}
	int wfd() { return _wfd; }
	string name() { return _name; }
	pid_t pid() { return _pid; }
 	void Close() { close(_wfd); }//关闭写描述符,子进程读取完毕后会退出
private:
	int _wfd;
	pid_t _pid;
	string _name;
};
class ProcessPool
{
public:
	ProcessPool(int num) : _process_num(num) {}
	void createProcess(work_t work)
	{
		vector<int> fds;
		for (int i = 0; i < _process_num; ++i)
		{
			int pipefd[2]{0};
			pipe(pipefd);
			pid_t pid = fork();
			if (pid == 0)
			{
				if(!fds.empty())
				{
					for(auto& fd:fds)
					{
						close(fd);
					}
				}
				close(pipefd[1]);
				dup2(pipefd[0], 0);
				work();
				exit(0);//子进程执行任务完毕会退出
			}

			close(pipefd[0]);
			string cname = "channel-" + to_string(i);
			_channels.push_back(Channel(pipefd[1], pid, cname));
			fds.push_back(pipefd[1]);
		}
	}
	int NextChannel()
	{
		static unsigned int index = 0;
		return (index++) % _process_num;
	}
	void SendTaskCode(int index, uint32_t code)
	{
		cout << "send code: " << code << " to " << _channels[index].name() << " sub prorcess id: " << _channels[index].pid() << endl;
		write(_channels[index].wfd(), &code, sizeof(code));
	}
	void Wait()
	{
		waitpid(-1, nullptr, 0);
	}
	void KillAll()
    {
        for (auto &channel : _channels)
        {
            channel.Close();
        }
    }
	~ProcessPool() {}

private:
	int _process_num;
	vector<Channel> _channels;
};

void CtrlProcessPool(const shared_ptr<ProcessPool> &processpool_ptr, int cnt)
{
	while (cnt)
	{
		// a. 选择一个进程和通道
		int channel = processpool_ptr->NextChannel();
		// cout << channel.name() << endl;

		// b. 你要选择一个任务
		uint32_t code = NextTask();

		// c. 发送任务
		processpool_ptr->SendTaskCode(channel, code);

		sleep(1);
		cnt--;
	}
}
int main(int argc, char *argv[])
{
	if (argc != 2)
	{
		printf("\n\t usage ./processPool num\n");
		return 1;
	}
	int process_num = stoi(argv[1]);
	shared_ptr<ProcessPool> process_ptr = make_shared<ProcessPool>(process_num);
	process_ptr->createProcess(worker);
	CtrlProcessPool(process_ptr, 5);

	process_ptr->KillAll();
	process_ptr->Wait();
	return 0;
}

task.hpp:

#pragma once

#include <iostream>
#include <unistd.h>
#include <functional>
using namespace std;

using work_t = function<void()>;
using task_t = function<void()>;

void PrintLog()
{
    cout << "printf log task" << endl;
}

void ReloadConf()
{
    cout << "reload conf task" << endl;
}

void ConnectMysql()
{
    cout << "connect mysql task" << endl;
}

task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};

uint32_t NextTask()
{
    return rand() % 3;
}

void worker()
{
    // 从0中读取任务即可!
    while(true)
    {
        uint32_t command_code = 0;
        ssize_t n = read(0, &command_code, sizeof(command_code));
        if(n == sizeof(command_code))
        {
            if(command_code >= 3) continue;
            tasks[command_code]();
        }
        else if(n == 0) //管道写端关闭读端返回值位0
        {
            cout << "sub process: " << getpid() << " quit now..." << endl;
            break;
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/930159.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

小红薯最新x-s 算法补环境教程12-06更新(下)

在上一篇文章中已经讲了如何去定位x-s生成的位置&#xff0c;本篇文章就直接开始撸代码吧 如果没看过的话可以看&#xff1a;小红薯最新x-s算法分析12-06&#xff08;x-s 56&#xff09;&#xff08;上&#xff09;-CSDN博客 1、获取加密块代码 首先来到参数生成的位置&…

Nacos源码学习-本地环境搭建

本文主要记录如何在本地搭建Nacos调试环境来进一步学习其源码&#xff0c;如果你也刚好刷到这篇文章&#xff0c;希望对你有所帮助。 1、本地环境准备 Maven: 3.5.4 Java: 1.8 开发工具&#xff1a;idea 版本控制工具: git 2、下载源码 官方仓库地址 &#xff1a;https://git…

视频码率到底是什么?详细说明

视频码率&#xff08;Video Bitrate&#xff09;是指在单位时间内&#xff08;通常是每秒&#xff09;传输或处理的视频数据量&#xff0c;用比特&#xff08;bit&#xff09;表示。它通常用来衡量视频文件的压缩程度和质量&#xff0c;码率越高&#xff0c;视频质量越好&#…

计算机网络复习——概念强化作业

物理层负责网络通信的二进制传输 用于将MAC地址解析为IP地址的协议为RARP。 一个交换机接收到一帧,其目的地址在它的MAC地址表中查不到,交换机应该向除了来的端口外的所有其它端口转发。 关于ICMP协议,下面的论述中正确的是ICMP可传送IP通信过程中出现的错误信息。 在B类网络…

【AI系统】感知量化训练 QAT

感知量化训练 QAT 本文将会介绍感知量化训练&#xff08;QAT&#xff09;流程&#xff0c;这是一种在训练期间模拟量化操作的方法&#xff0c;用于减少将神经网络模型从 FP32 精度量化到 INT8 时的精度损失。QAT 通过在模型中插入伪量化节点&#xff08;FakeQuant&#xff09;…

【AI系统】模型压缩基本介绍

基本介绍 随着神经网络模型的复杂性和规模不断增加&#xff0c;模型对存储空间和计算资源的需求越来越多&#xff0c;使得部署和运行成本显著上升。模型压缩的目标是通过减少模型的存储空间、减少计算量或提高模型的计算效率&#xff0c;从而在保持模型性能的同时&#xff0c;…

使用GO--Swagger生成文档

概述 在前后端分离的项目中&#xff0c;后端配置swagger可以很好的帮助前端人员了解后端接口参数和数据传输。go-swagger 是一个功能全面且高性能的Go语言实现工具包&#xff0c;用于处理Swagger 2.0&#xff08;即OpenAPI 2.0&#xff09;规范。它提供了丰富的工具集&#x…

排查bug的通用思路

⭐️前言⭐️ APP点击某个按钮没有反应/PC端执行某个操作后&#xff0c;响应较慢&#xff0c;通用的问题排查方法: 从多个角度来排查问题 &#x1f349;欢迎点赞 &#x1f44d; 收藏 ⭐留言评论 &#x1f349;博主将持续更新学习记录收获&#xff0c;友友们有任何问题可以在评…

2024年认证杯SPSSPRO杯数学建模C题(第一阶段)云中的海盐解题全过程文档及程序

2024年认证杯SPSSPRO杯数学建模 C题 云中的海盐 原题再现&#xff1a; 巴黎气候协定提出的目标是&#xff1a;在2100年前&#xff0c;把全球平均气温相对于工业革命以前的气温升幅控制在不超过2摄氏度的水平&#xff0c;并为1.5摄氏度而努力。但事实上&#xff0c;许多之前的…

oracle之用户的相关操作

&#xff08;1&#xff09;创建用户(sys用户下操作) 简单创建用户如下&#xff1a; CREATE USER username IDENTIFIED BY password; 如果需要自定义更多的信息&#xff0c;如用户使用的表空间等&#xff0c;可以使用如下&#xff1a; CREATE USER mall IDENTIFIED BY 12345…

ArcMap 处理河道坡度、计算污染区、三维爆炸功能

ArcMap 处理河道坡度、计算污染区、三维爆炸功能今天分析 一、计算河道方向坡度 1、折线转栅格 确定 2、提取河道高程值 确定后展示河流的高程值 3、计算坡向数据 确定后展示 4、计算坡度数据 确定后展示 二、计算上游集水区污染值 1、填挖处理 确定 2、计算流向 确定 3、计算…

一睹:微软最新发布的LazyGraphRAG

微软近期推出了一项革新性的技术——LazyGraphRAG&#xff0c;这是一种启用图谱的检索增强生成&#xff08;Retrieval Augmented Generation&#xff0c;RAG&#xff09;技术&#xff0c;它以其卓越的效率和成本效益&#xff0c;彻底颠覆了传统观念中对“懒惰”的刻板印象。 位…

linux_kernel_编程

内核报错信息查看 include/uapi/asm-generic/errno-base.h 设备树的读取操作 struct device_node *ncof_property_read_bool(nc, "spi-cpha")if (!of_node_name_eq(nc, "slave"))rc of_property_read_u32(nc, "reg", &…

arm64 UOS平台docker配置gitlab

arm64 UOS平台docker配置gitlab 加载或下载gitlab docker镜像配置 加载或下载gitlab docker镜像 docker load < gitlab.tar docker tag xxx gitlab_arm 配置 创建gitlab目录&#xff0c;在gitlab目录下创建etc log opt 目录创建启动文件start_gitlab.sh并增加执行权限 d…

【Homework】【8】Learning resources for DQ Robotics in MATLAB

作业任务 创建一个名为“VS050RobotDH”的类&#xff0c;该类代表Denso VS050机器人&#xff0c;其DH参数如下表所示&#xff0c;并且完全由旋转关节组成。&#xff08;请记住第6课的内容&#xff09; θ \theta θ d d d a a a α \alpha α − π -\pi −π0.3450 π 2 \fra…

Cannot resolve symbol ‘ActivityThread‘ | Android 语法

背景 ActivityThread 是 Android 系统内部使用的一个类,它位于 android.app 包中,但在 Android SDK 的公共 API 中并没有公开。 由于 ActivityThread 是隐藏的内部类,因此在编写单元测试或功能开发时,无法直接引用它。可以使用反射来访问内部 API,或者使用依赖注入的方式…

TSWIKI知识库软件

TSWIKI 知识库软件介绍 推荐一个适合本地化部署、自托管的知识库软件 TSWIKI介绍 tswiki 是一个适合小团队、个人的知识库、资料管理的软件&#xff0c;所有数据均本地化存储。可以本地化、私有云部署&#xff0c;安装简单。在线预览。 主要功能说明 1、简化的软件依赖和安…

Agent AI: Surveying the Horizons of Multimodal Interaction---医疗保健、视频音频、多模态

医疗保健领域 在医疗保健领域&#xff0c;大型语言模型&#xff08;LLMs&#xff09;和视觉语言模型&#xff08;VLMs&#xff09;可以作为诊断代理、患者护理助手&#xff0c;甚至是辅助治疗工具&#xff0c;但它们也伴随着独特的挑战和责任。AI代理在提高患者护理质量和拯救生…

Ajax:回忆与节点

一点回忆 面对我的Ajax学习&#xff0c;实现前后端交互&#xff0c;最开始我采用的使用网络寻找intellij IDEA Ultimate破解方法&#xff0c;然后最终成功&#xff0c;然后按照相关教程配置java ee项目&#xff0c;然后中间又去配置了Tomcat服务器&#xff0c;然后又去学习了一…

游戏引擎学习第35天

开场介绍 今天的任务是继续改进一个虚拟的瓦片地图系统&#xff0c;使其适合处理更大的世界。我们希望这个系统能管理大范围的游戏世界&#xff0c;其中包含按需存储的小区域。昨天&#xff0c;我们介绍了“内存区域”的概念&#xff0c;用于管理持久性存储。我们计划今天继续…