目录
一,理解进程之间的通信
1. 进程间通信目的
2. 进程间通信的技术背景
3,常见的进程间通信
二,管道
1. 尝试建立一个管道
管道的特点:
管道提供的访问控制:
2. 扩展:进程池
阶段一: 创建多个子进程
阶段二:构建命令方法
ProcessPool.cpp
task.hpp
下一期:进程通信基础知识
结语
一,理解进程之间的通信
首先,系统在设计时,秉持这相互独立的原则,因此要想实现进程之间的通信是比较困难的。而进程之间的通信本质上是:不同的进程能访问同一份数据。
1. 进程间通信目的
2. 进程间通信的技术背景
1)进程是具有独立性的,虚拟地址 + 页表 保证进程之间的独立性 (内核中数据结构 , 代码以及数据)
2)通信成本比较高。
3,常见的进程间通信
1. Linux系统——管道
2. SystemV——单机通信(多进程)
3. posix——网络通信
二,管道
而这个 “ 管道 ”虽然是一个文件,但它不向硬盘写入,管道里的数据是内存级的临时数据。
fd 这个变量就是我们父进程创建的那个pipefile(管道文件描述符存储数组),在经过pipe创建后,将管道的文件描述符填入fd数组中。
1. 尝试建立一个管道
#include <iostream>
#include <unistd.h>
#include <string>
#include <assert.h>
#include <sys/types.h>
using namespace std;
int main()
{
// 用数组记录,读写端
int pipefd[2] = {0}; // pipefd[0],是读端,0就如一张口;pipefd[1],是写端,1就如一只笔。
int ret = pipe(pipefd);
assert(ret != -1);
pid_t pd = fork();
// 假设父进程需要:写。
if (pd == 0)
{ // 子进程创建成功
// 子进程则需要的是:读
close(pipefd[1]);
cout << "chail success" << endl;
char red[1024];
// 循环接收并打印数据
while (1)
{
read(pipefd[0], red, sizeof (red) -1);
cout << red << "### " << "我是子进程,pid:" << getpid() << "父进程: " << getppid() << endl;
}
exit(1);
}
// 父进程, 功能:写
close(pipefd[0]);
string base_str = "我是父进程, 正在给你发消息:";
char buffer[1024]; // 输出缓冲区
int count = 0;
while (1) // 不断地向子进程发送数据
{
// 向缓冲区里,不断输入变换的消息
int len = snprintf(buffer, sizeof buffer, "%s pid = [%d], 消息数量: %d", base_str.c_str(), getpid(), count++);
// 向管道写入
write(pipefd[1], buffer, len);
sleep(1); // 一秒秒的刷新
}
return 0;
}
发现现象:
我们是否注意到,在父进程写数据时,是每隔一秒写一份到管道,而我们没有限制子进程打印的间隔,但从结果来看,向管道内取数据(子进程打印过程)似乎受到了什么控制。
我们尝试将父进程的写入间隔设置为默认0,将子进程的读出时间设置为10秒一次读。结果上,我们能看到父进程在写入到一定次数后,停止了写入;子进程经过10秒后,会打印一大堆数据,直到打印完为止。从这个现象我们可以发现:
管道提供访问控制。管道本质上也是一个文件,那么也有其最大容量,当即将满时,暂停写入,等待空间;当管道为空时,读操作将进行等待,等待数据写入。
管道的特点:
1. 管道是用来父子关系的通信管道,是具有继承性的。
2. 管道可以提供进程间通信,提供访问控制。
3. 管道提供面向流试的通信服务(字节流——需要协议进行数据区分,后面我们再聊)。
4. 管道是基于文件的,文件的生命周期是随进程的,因此管道的生命周期也是随进程的。
5. 管道是单向通信,属于半双工通信的一种(半双工意思是永远只有一方在写,一方在读;全双工则可以双方在写,双方在读)
管道提供的访问控制:
2. 扩展:进程池
相关知识:
1.)unin32_t 类型
uint32_t是一个无符号32位整数类型,其意义在于表示一个无符号的32位整数,范围为0到4294967295。使用uint32_t类型可以确保数据的范围和位数符合要求,同时提高代码的可移植性和可读性。这样不管在任何一种型号的机器上sizeof取得的字节数都是4字节。ssize_t类型表示有符号的大小类型,通常用于表示某些系统调用的返回值或参数。它的大小通常与机器的字长相同,即32位机器上为4字节,64位机器上为8字节。ssize_t类型通常用于表示读取或写入的字节数。
2.)functiaonal 头文件
头文件 functional 是 C++ 标准库中的一个头文件,它包含了一些模板类和函数对象,用于支持函数式编程和泛型编程。这些类和函数对象可以帮助程序员编写更加灵活和抽象的代码,提高代码的可读性和可维护性。它还提供了一些算法和函数,如 std::function、std::bind、std::mem_fn 等,用于处理函数对象和函数指针,以及实现函数的组合、绑定和适配等功能。
因此,头文件 functional 的意义在于为 C++ 程序员提供了一些强大的工具,帮助他们更好地利用函数式编程和泛型编程的特性,提高代码的质量和效率。
首先什么是进程池??
通俗的理解为,子进程替父进程进行数据处理。
父进程为了在主程序中完成任务,需要让子进程去处理数据,整理,整合数据,但为了提高创建父进程的效率,在父进程还未向子进程派发任务前,提前创建多个子进程,然后通过管道进行命令传递。
阶段一: 创建多个子进程
通过for循环创建多个子进程,并通过管道联系!
阶段二:构建命令方法
我们将方法单独一个文件,向管道中读取命令信息,然后根据命令信息调用对应的方法。
ProcessPool.cpp
#include <iostream>
#include <assert.h>
#include <vector>
#include <unistd.h>
#include "Task.hpp" //导入的方法文件
#include <time.h>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
#define NAM_OF_PRO 5
// 对命令进行处理
int waitcommand(int fd, bool& quit)
{
uint32_t command = 0; // 取个32位的类型,用来获取4字节的数据
ssize_t byte = read(fd, &command, sizeof command);
// 如果父进程没有发送
if (byte == 0)
{
quit = true; // 命令管道开启 //你将他开启了
return 0;
}
// 检测是否是4字节的命令消息1
assert(byte == sizeof (uint32_t));
return command;
}
int main()
{
// 阶段一 ; 创建多个子进程
// 放pid 与 创建时所在的端口——fd
vector<pair<pid_t, int>> slots; // slots 位置的意思
// 创建多个进程
for (int i = 0; i < NAM_OF_PRO; i++)
{
// 建立管道
int pipefil[2] = {0};
int ret = pipe(pipefil);
assert(ret != -1); // 管道检测
// 创建子进程
pid_t fd = fork();
if (fd == 0) // 子进程
{
// 关闭写端
close(pipefil[1]);
while (1)
{
// 处理子进程任务
//未接收数据,子进程进入阻塞状态
// 阶段二:为子进程设置任务
bool quit = false; // 管道是否被使用
int k = waitcommand(pipefil[0], quit); // 进入等待命令函数
// cout << "K:" << k << " quit " << quit << endl;
if (quit)
break; // 目的:当我们开始关闭管道时,read()会读取0字节,这时让子进程退出
tasklist[k](); // 利用函数容器直接调用,方法函数
}
exit(1);
}
// 父进程,关闭读端
close(pipefil[0]);
// 同时记录子进程,pid数据
slots.push_back({fd, pipefil[1]});
}
// 阶段二: 父进程向子进程传递命令信息
// 要想达到子进程接受命令较合理——负载均衡。
srand((unsigned)time(0) ^ getpid() * 233333); // 位运算与相乘2333333,目的是:让数据更随机
uint32_t command = 0;
Taskloading(); // 对方法进行加载
while (true)
{
cout << "###########################################################\n";
cout << "## 0. 退出 1. 查看方法表 2. 输入命令 #############\n";
cout << "###########################################################\n";
cout << "请输入:";
cin >> command;
//你往管道里面写数据的逻辑在哪里,定位到那里先
//子进程正常执行了,你的问题是什么
if (command == 1 )
{
print_task();
// continue;
} else if ( command == 2)
{
cout << "Please imput command : ";
cin >> command;
if (!(command >= 0 && command < tasklist.size()))
{
cout << "无效命令" << endl;
continue;
}
size_t n = rand() % slots.size(); // 随机数选择子进程
ssize_t ret = write(slots[n].second, &command, sizeof command);
if ( ret != sizeof command)
{
cout << " write fail " << endl;
}else
{
cout << " write success " << " child pid:[" << slots[n].first << "] " << "执行:"
<< command << " " << task_inf[command]<< endl;
}
}else if (command == 0)
{
cout << "退出成功" << endl;
break;
}
else
{
cout << "无效命令,请重新输入" << endl;
}
sleep(1);
}
// 完成分配后,关闭管道
for (const auto& e : slots)
{
close(e.second);
}
// 回收子进程
for (const auto& e : slots)
{
waitpid(e.first, nullptr, 0); // 由于auto只遍历一次,所以不能用轮询回收
}
return 0;
}
task.hpp
#pragma once
#include <iostream>
#include <assert.h>
#include <vector>
#include <string>
#include <unordered_map>
#include <functional>
#include <unistd.h>
using namespace std;
typedef function<void()> func;
vector<func> tasklist; // 记录方法数
unordered_map<int, string> task_inf; // 记录方法信息。
void readMySQL()
{
cout << " process : " << getpid() << " 访问数据库";
}
void val()
{
cout << " process : " << getpid() << " 进行数据运算";
}
void save()
{
cout << " process : " << getpid() << " 进行数据持久化";
}
void cal()
{
cout << " process : " << getpid() << " 进行数据加密";
}
// 加载任务表
void Taskloading()
{
task_inf.insert({tasklist.size(), "readMySQL"});
tasklist.push_back(readMySQL);
task_inf.insert({tasklist.size(), "val"});
tasklist.push_back(val);
task_inf.insert({tasklist.size(), "save"});
tasklist.push_back(save);
task_inf.insert({tasklist.size(), "cal"});
tasklist.push_back(cal);
cout << "方法加载成功!" << endl;
}
// 打印方法表
void print_task()
{
for (const auto& e : task_inf)
{
cout << e.first << " " << e.second << endl;
}
}
int tasksize()
{
return tasklist.size();
}
我给大家留了个坑,不知道大家有没有发现呢? 我们在最后的时候调用不了函数
解析: 是进程的独立性导致的。 我们知道我们在已经对方法容器进行了加载,但是在fork之后父进程的加载,父进程进行了实时拷贝,子进程的tasklist任然是未加载状态,这时你就会问了,既然子进程访问的是一个空容器,那我们不就是非法访问了吗?? 是的,我们没有看到报错是因为我们在父进程,子进程的报错不会导致父进程退出,我们通过 查看进程状态表即可发现,由于子进程的非法访问,导致子进程崩溃,成为了僵尸进程。
解决方法:
1, fork前,进行方法表加载。
2. 每个子进程都加载。
分析:父子进程,对共享的全局变量,都只是只读的权限,修改则要写实拷贝,因此,要么全局变量在fork之前就数据加载好;要么,在每个子进程都加载一份方法表。如果是我,我会一开始就加载好,这样就只要保存一份数据。
下一期:进程通信基础知识
结语
本小节就到这里了,感谢小伙伴的浏览,如果有什么建议,欢迎在评论区评论,如果给小伙伴带来一些收获请留下你的小赞,你的点赞和关注将会成为博主创作的动力。