目录
一、进程间通信
1、 为什么需要进程通信
2、发展和分类
二、管道
1、概念
2、特点
2、复制并共享
3、用fork来共享管道原理
4、站在文件描述符角度-深度理解管道
5、站在内核角度-管道本质
三、匿名管道
1、概念
2、创建
3、snprintf
4、父子进程中进行单向通信
四、实现简单的进程池模型
Task.hhp:任务函数
1、全局变量
2、任务函数
3、初始化函数
4、辅助函数
ProcessPool.cc:进程池
1、初始化任务和进程槽
2、创建子进程和管道
3、waitCommand函数
4、进行通信&执行任务
5、sendAndWakeup() 函数
6、父进程派发任务
运行示例
一、进程间通信
1、 为什么需要进程通信
进程运行的独立性意味着它们在默认情况下是隔离的,使得进程间通信成为一项挑战。进程间通信(IPC)的核心目的是允许不同的进程访问共享资源,例如内存空间,以便于交流、控制和协同工作。进行进程间通信的主要动机包括:
- 数据传输:实现数据的有效传递,允许一个进程将其数据发送至另一进程,促进信息的共享与处理。
- 资源共享:通过允许多个进程访问相同的系统资源,优化资源的使用效率和系统性能。
- 通知事件:使进程能够发送消息通知其他进程某些事件的发生,例如,一个进程在终止时需要告知其父进程。
- 进程控制:支持特定进程(如调试进程)对其他进程进行控制,包括拦截异常、监视状态变化等,以实现更细致的系统管理和错误调试。
进程间通信是现代操作系统中不可或缺的一部分,它不仅增强了进程之间的协作能力,也提升了系统的整体效率和灵活性。
2、发展和分类
管道、System V进程间通信(IPC)和POSIX进程间通信是操作系统中实现不同进程之间进行数据交换与同步的三种主要机制。
管道(Pipes)
- 管道是一种最早的进程间通信机制,最初出现在Unix系统中。它允许两个相关进程之间通过一个单向或双向的数据通道传递字节流。在Linux环境下,有两种类型的管道:
- 匿名管道(Anonymous Pipes):通常用于父子进程之间的通信,由
pipe()
系统调用创建,不具有文件系统的接口,生命周期依赖于创建它的进程。 - 命名管道(FIFOs, Named Pipes):也称为命名队列,它是一个存在于文件系统中的特殊文件,任何知道其路径名的进程都可以打开并使用它进行通信,由
mkfifo()
系统调用创建。
System V 进程间通信 (IPC)
- System V IPC 是一套较为复杂的进程间通信方法,主要用于多进程间的协作。它包括以下几种形式:
- 消息队列(Message Queues):提供了一种异步通信方式,进程可以通过发送和接收消息来交换数据,消息队列可以保证消息的顺序和可靠传输。
- 共享内存(Shared Memory Segments):允许多个进程直接访问同一块物理内存区域,从而实现高速的数据共享。
- 信号量(Semaphores):提供了一种进程间同步手段,用于控制对共享资源的并发访问,避免竞态条件。
POSIX 进程间通信
- POSIX(Portable Operating System Interface for UNIX)定义了一系列跨平台的标准API,为进程间通信提供了更为一致且易于移植的解决方案,主要包括:
- 消息队列(POSIX Message Queues):类似于System V的消息队列,但接口更符合POSIX标准,旨在提高可移植性。
- 共享内存(POSIX Shared Memory Objects):同样基于内存共享原理,但使用了不同的API如
shm_open()
、mmap()
等,提供了更多的灵活性。 - 信号量(POSIX Semaphores):不同于System V的信号量,POSIX信号量提供了更统一的操作接口,可通过
sem_open()
,sem_post()
,sem_wait()
等函数进行操作。 - 此外,POSIX还引入了其他同步机制,如互斥量(Mutexes)、条件变量(Condition Variables)、读写锁(Read-Write Locks),这些不仅适用于进程间同步,也是线程同步的重要工具。
二、管道
1、概念
具体实现时,一个管道拥有两端:读端(入口)和写端(出口)。为了创建并使用管道进行通信,通常采取以下步骤:
- 父进程通过调用特定的系统调用如
pipe()
来创建一个管道,这会返回一对文件描述符,分别对应管道的读端和写端。 - 父进程接着执行
fork()
系统调用以创建子进程,此时,由于父子进程共享同一文件描述符表,因此双方都能访问到这个刚创建的管道资源。 - 为了确保正确的数据流向,父进程关闭它不需要的管道读端(如果它只负责写入),子进程则关闭其不需要的管道写端(如果它只负责读取)。
2、特点
-
面向血缘进程通信:管道主要用于亲缘进程(如父子进程)之间的数据交换,通过调用
pipe()
系统调用创建,并在fork()后由父子进程共享同一管道资源。 -
访问控制与同步:管道的读写两端提供了自然的访问控制,确保了有序的数据流传递。当一个进程在管道的某一端进行读取或写入时,其他进程必须遵循适当的同步规则,以避免数据冲突或阻塞。
-
流式通信服务:管道提供的是面向字节流的通信服务,即数据是以连续的、无结构的字节序列进行传输的,协议相对简单且透明。
-
基于文件实现:尽管概念上管道类似于文件,但它是内核管理的临时内存区域,生命周期与关联进程紧密相关,随进程结束而自动清理。不同于磁盘上的普通文件,管道内容不会持久化存储。
-
半双工通信:管道通常只能单向传输数据,也就是说,它是一个半双工通信通道,同一时刻只允许单向的数据流动,要么从写端流向读端,要么反之。
-
需要双方通信时,需要建立起两个管道
-
-
管道容量与读写行为:
- 当写入速度超过读取速度时,管道内部的缓冲区会逐渐填满,直至达到上限,此时继续写入会导致写进程阻塞,直到有足够的空间。
- 反之,如果读取速度快于写入速度,当管道中的数据被完全读取后,读进程将阻塞等待新的数据到来。
- 如果写端关闭而读端仍在读取,当所有已写入的数据都被读出后,读取操作会返回0值,表示达到了逻辑上的“文件结尾”。
- 若读端先关闭,则写进程在尝试写入管道时,由于没有读者,操作系统可能会发送SIGPIPE信号终止写进程。
2、复制并共享
在创建子进程时,操作系统执行了一个“复制并共享”的过程
在Linux操作系统,当通过fork()
函数创建子进程时,子进程会继承父进程的文件描述符表。文件描述符(file descriptor)是一个指向内核中文件表项的索引,用于标识进程中打开的文件。每个进程都有自己的文件描述符表,但是这些文件描述符指向的是同一个内核文件表中的条目。
这里有几个关键点需要注意:
-
文件描述符的继承:当
fork()
被调用时,子进程获得父进程文件描述符表的副本。这意味着在fork()
调用时,父进程中打开的文件在子进程中也会处于打开状态,且具有相同的文件描述符号。因此,父子进程可以共享打开的文件状态,如当前文件偏移量(file offset)和文件打开模式(例如,读、写)。 -
共享内核文件表:父子进程中的同一文件描述符虽然各自存在于各自的文件描述符表中,但它们实际上指向的是同一个内核级别的文件对象。这意味着对文件的操作会影响到所有引用该文件的进程。例如,如果父进程在一个文件描述符上进行了读取或写入操作,那么文件内部的偏移量将同时影响子进程在同一文件描述符上的读取结果。
-
文件的实际复制并不发生:重要的是要理解,子进程创建时,并没有对打开的文件数据进行物理复制。相反,复制的是文件描述符表的条目,这些条目指向内核中的文件表。这种机制是高效的,因为它避免了不必要的数据复制。
-
独立操作文件描述符:尽管父进程和子进程共享打开的文件,但他们可以独立地操作自己的文件描述符。例如,子进程可以关闭或改变它继承的文件描述符指向的文件的某些属性,而不会影响父进程。然而,对于共享的文件本身(如文件偏移量),更改会影响到所有拥有该文件描述符的进程。
3、用fork来共享管道原理
- 当一个进程调用了 pipe 函数之后,它就会得到一对文件描述符(fd[0](读) 和 fd[1](写))用于访问管道。如果此时该进程又调用了 fork 函数创建了一个子进程,那么这个子进程也会继承这对文件描述符。
- 在这个过程中,父子进程共享同一个管道,也就是说它们都可以通过这两个文件描述符来访问管道。但是,每个进程只能看到自己打开的那部分管道,l例如:父进程只能看到管道的写端,而子进程只能看到管道的读端。
- 这样设计的原因是为了保证数据的安全性。因为管道是一个共享的数据结构,如果多个进程同时对它进行操作,就可能会出现数据冲突的问题。因此,操作系统规定,每个进程只能看到自己打开的那一部分管道,从而避免了这种问题的发生。
- 另外,当一个进程不再需要使用某个文件描述符时,它可以将其关闭。这样做的好处是可以释放相应的系统资源,提高系统的性能。
4、站在文件描述符角度-深度理解管道
图中展示的是一个父进程通过 fork 创建子进程,并且使用管道进行通信的过程。
- 首先,父进程创建了一个管道,这个管道有两个文件描述符:读端和写端。然后,父进程 fork 出了子进程。在 fork 的时候,子进程会继承父进程的所有资源,包括管道的两个文件描述符。
- 接着,父进程关闭了管道的读端(fd[0]),而子进程则关闭了管道的写端(fd[1])。这样做的目的是为了确保只有父进程可以向管道写入数据,而只有子进程可以从管道读取数据。
- 最后,父进程和子进程就可以通过管道进行通信了。父进程可以通过 write 函数将数据写入到管道的写端,而子进程则可以通过 read 函数从管道的读端读取数据。由于管道是全双工的,所以父进程也可以从管道读取数据,而子进程也可以向管道写入数据。
5、站在内核角度-管道本质
在图中,我们看到有两个进程分别对同一个文件进行了读写操作。在这个过程中,内核需要知道这个文件的相关信息,以便正确地处理这些操作。这就是inode的作用,它提供了所有必要的信息,让内核能够正确地处理文件的操作。
在图中,我们看到有两个进程通过管道进行通信。从 Linux 内核角度来看,管道是一种特殊的文件,它由内核维护,可以在不同的进程之间传递数据。管道的本质就是一个内存缓冲区,它被映射到了所有使用它的进程的地址空间中。它允许两个进程共享一个缓冲区,从而实现在不同进程中传递数据的功能。
- 当一个进程想要向管道写入数据时,它会先检查管道的缓冲区是否已满。如果缓冲区未满,则可以直接将数据写入缓冲区;否则,就需要等待其他进程从管道中读取数据,直到缓冲区有空闲的空间为止。
- 当另一个进程想要从管道中读取数据时,它会先检查管道的缓冲区是否有数据。如果有数据,则可以直接从缓冲区中读取;否则,就需要等待其他进程向管道中写入数据,直到缓冲区中有数据为止。
- 管道的读写操作都是原子性的,这意味着一次读或写操作要么全部完成,要么不完成。这样可以防止数据的丢失或者损坏。
- 总的来说,管道是一种非常有用的进程间通信机制,它可以帮助不同进程之间的数据交换变得更加简单和高效。
三、匿名管道
1、概念
匿名管道(Anonymous Pipe)是操作系统提供的一种简单的进程间通信机制,主要用于父子进程或者有直接亲缘关系的进程之间进行数据交换。它是一种半双工的通信方式,即数据只能单向流动,或从父进程流向子进程,或从子进程流向父进程。
特点:
-
内存中存在:匿名管道是在内存中开辟的一段缓冲区,而不是在文件系统中创建一个实际的文件对象。
-
无名称标识:与命名管道不同,匿名管道没有明确的名称标识,它由操作系统在创建时分配,并通过句柄(文件描述符)来引用和操作。
-
血缘关系限制:通常情况下,匿名管道只能在创建它的进程及其直接子进程中使用。也就是说,只有具有直接亲缘关系的进程才能共享同一匿名管道,其他无关进程无法访问。
-
读写模式:匿名管道中的数据传输遵循先进先出(FIFO)原则。一个进程负责写入数据到管道的一端,而另一个进程则从管道的另一端读取数据。
-
单向或双向通信:由于匿名管道的半双工特性,若要实现双向通信,需要创建两个管道,分别用于两个方向的数据传输。
-
阻塞行为:
- 当管道为空时,尝试从管道读取数据的操作会阻塞,直到有数据可读。
- 当管道已满时,尝试向管道写入数据的操作也会阻塞,直到有足够的空间可供写入。
2、创建
pipe()
函数是Unix和类Unix系统(包括Linux)中的一个用于创建匿名(无名)管道的系统调用。匿名管道是一种简单的进程间通信机制,允许父子进程或相关联的进程之间进行单向或双向数据传输。
#include <unistd.h>
int pipe(int fd[2]);
参数说明:
fd
: 一个大小为2的整数数组,类型为int[2]
。这个数组由pipe()
函数填充,并返回两个文件描述符。fd[0]
:指向管道的读端(Read End)。从这个文件描述符可以读取通过管道传递过来的数据。fd[1]
:指向管道的写端(Write End)。数据可以通过这个文件描述符写入到管道中,进而被连接到读端的进程读取。
功能描述: 当调用 pipe()
函数时,操作系统会在内存中创建一段缓冲区,作为管道的内部实现。任何进程将数据写入到管道的写端时,这些数据会暂时存储在缓冲区中,然后可以从管道的读端读取出来。
返回值:
- 成功时,
pipe()
函数返回0,并且已成功分配了两个文件描述符给fd
数组。 - 失败时,返回负值表示错误发生,错误原因可以通过
errno
获取。
使用注意事项:
- 管道是半双工的,即一次只能在一个方向上传输数据。虽然理论上可以创建两个管道来模拟全双工通信,但每个管道只支持单向数据流。
- 管道具有一定的容量限制,当写入端连续写入数据而读取端没有及时读取时,如果管道满载,后续的写操作将会阻塞,直到有空间可写。
- 当读取端关闭后,写入端继续写入数据时,将会收到SIGPIPE信号,通常默认行为是导致进程终止;当然,也可以捕获该信号并采取其他行动。
- 管道可用于父子进程间的通信,或者不同进程中需要同步和协作的部分。在多进程编程中,常结合
fork()
和exec()
家族函数使用,以实现在多个进程间传递信息的目的。
3、snprintf
学习使用管道前,先拓展一下会用到的函数:
#include <stdio.h>
int snprintf(char *str, size_t size, const char *format, ...);
snprintf
是C语言标准库中的一个函数,用于格式化输出到字符串,与 printf
类似,但它的输出受限于指定的缓冲区大小,能够防止缓冲区溢出的安全风险。
参数:
str
:指向目标缓冲区的指针,用于存放格式化后的字符串。size
:指定缓冲区str
的大小(以字节为单位),包括结束符\0
所需的空间。如果生成的字符串长度小于或等于size-1
,则在字符串末尾添加\0
结束符;若生成的字符串过长,则按size
字节截断,并确保仍能正确终止(即至少包含一个结束符\0
)。format
:是一个格式字符串,其中可能包含转换说明符(如%d
、%s
等),它们将与可变参数列表中的相应数据匹配并进行格式化。...
:是可变参数列表,包含了与format
中转换说明符相匹配的数据项。
返回值:
- 如果成功且未发生截断,返回实际写入
str
缓冲区的字符数(不包括结束符\0
)。 - 如果发生截断,返回需要写入的总字符数(即使它大于
size
参数),此时字符串仍然会被适当地截断并在缓冲区中填充了\0
结束符。
char buffer[50];
int value = 12345;
snprintf(buffer, sizeof(buffer), "The value is %d", value);
// 如果buffer足够大,例如大于"The value is 12345" + '\0'所需的长度,则结果将是:
// buffer == "The value is 12345"
// 如果buffer太小,例如只有5个字符,则结果可能是:
// buffer == "The v"
// 返回值为10,表示如果没有截断的话,完整的字符串应该是10个字符(包括'\0')。
4、父子进程中进行单向通信
//Makefile
mypipe:mypipe.cc
g++ -o $@ $^ #-DDEBUG
.PHONY:clean
clean:
rm -f mypipe
// 引入必要的头文件
#include <iostream>
#include <string>
#include <cstdio>
#include <cstring>
#include <cassert>
#include <unistd.h> // 提供fork、close、write、read等系统调用
#include <sys/types.h> // 定义pid_t等类型
#include <sys/wait.h> // 提供waitpid函数
using namespace std;
// 主要功能:创建管道并在父子进程中实现单向通信
int main()
{
// 1. 创建管道,pipefd[0]是读端口,pipefd[1]是写端口
int pipefd[2];
int n = pipe(pipefd); // 创建无名管道
assert(n != -1); // 断言检查管道创建是否成功
(void)n; //消除编译器可能发出的未使用变量n的警告
#ifdef DEBUG
// 输出调试信息,显示管道两端的文件描述符
cout << "pipefd[0]: " << pipefd[0] << endl;
cout << "pipefd[1]: " << pipefd[1] << endl;
#endif
// 2. 创建子进程
pid_t child_pid = fork(); // 调用fork创建子进程
assert(child_pid != -1); // 断言检查fork是否成功
if (child_pid == 0)
{
// 子进程部分(读端)
// 3.1 子进程关闭不需要的管道写端
close(pipefd[1]);
// 缓冲区,用于接收父进程发送的消息
char buffer[8 * 1024];
// 循环读取管道中的数据,直到读到文件结束符(表示父进程已关闭写端口)
while (true)
{
// 从管道读取数据,返回读取的字节数,如果没有数据则阻塞等待
ssize_t bytes_read = read(pipefd[0], buffer, sizeof(buffer) - 1);
if (bytes_read > 0)
{
// 将读取的数据转为C风格字符串
buffer[bytes_read] = '\0';
// 输出接收到的消息
cout << "Child [" << getpid() << "] received a message: " << buffer << endl;
}
else if (bytes_read == 0) // 读取到文件结束,父进程已关闭写端
{
cout << "Writer quit (Father), child quitting too!" << endl;
break;
}
}
// close(pipefd[0]); // 实际上在while循环条件中可以判断并在此处关闭读端
// 子进程完成任务后退出
exit(0);
}
else
{
// 父进程部分(写端)
// 3.1 父进程关闭不需要的管道读端
close(pipefd[0]);
// 初始化一条要发送的消息
string message = "我是父进程,我正在给你发消息";
int count = 0;
char send_buffer[8 * 1024];
// 循环向管道写入消息,直到达到指定次数
while (true)
{
// 构造要发送的消息内容
snprintf(send_buffer, sizeof(send_buffer), "%s[%d] : %d", message.c_str(), getpid(), count++);
// 向管道写入数据
write(pipefd[1], send_buffer, strlen(send_buffer));
// 模拟延时,每次发送消息后等待1秒
sleep(1);
cout << count << endl;
if (count == 5) // 发送指定数量的消息后退出循环
{
cout << "Writer quit (Father)" << endl;
break;
}
}
// 发送完毕所有消息后,关闭写端
close(pipefd[1]);
// 4. 父进程等待子进程结束,并获取其退出状态
pid_t result = waitpid(child_pid, nullptr, 0); // 等待子进程结束
cout << "Child PID: " << child_pid << ", Return Value from waitpid: " << result << endl;
assert(result > 0); // 断言检查waitpid是否成功
(void)result;//消除编译器可能发出的未使用变量的警告
}
return 0;
}
[hbr@VM-16-9-centos mypipe]$ ./mypipe
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 0
1
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 1
2
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 2
3
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 3
4
child get a message[30197] Father# 我是父进程,我正在给你发消息[30196] : 4
5
writer quit(father)
writer quit(father), me quit!!!
id : 30197 ret: 30197
四、实现简单的进程池模型
Task.hhp:任务函数
Task.hpp
是一个头文件,定义了一系列任务(函数)和全局变量,用于定义和管理在进程池模型中执行的任务。通过全局的任务列表和描述,父进程可以根据索引分配任务给子进程执行。
下面我们来逐一分析每个部分:
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <unistd.h>// 提供fork、close等系统调用
#include <functional>// 用于定义函数对象
// 定义函数对象类型
typedef std::function<void()> func;
std::vector<func> callbacks;// 存储回调函数的全局向量
std::unordered_map<int, std::string> desc;// 存储回调函数描述的哈希表
// 示例回调函数
void readMySQL()
{
std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}
void execuleUrl()
{
std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
}
void cal()
{
std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
}
void save()
{
std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
}
void load()// 加载回调函数到全局向量和描述符
{
desc.insert({callbacks.size(), "readMySQL: 读取数据库"});
callbacks.push_back(readMySQL);
desc.insert({callbacks.size(), "execuleUrl: 进行url解析"});
callbacks.push_back(execuleUrl);
desc.insert({callbacks.size(), "cal: 进行加密计算"});
callbacks.push_back(cal);
desc.insert({callbacks.size(), "save: 进行数据的文件保存"});
callbacks.push_back(save);
}
void showHandler()// 显示回调函数列表
{
for(const auto &iter : desc )
{
std::cout << iter.first << "\t" << iter.second << std::endl;
}
}
int handlerSize()// 获取回调函数数量
{
return callbacks.size();
}
1、全局变量
std::function
是一个模板类,用于封装几乎任何可调用的实体,包括普通函数、Lambda表达式、函数对象以及成员函数指针。std::function
的一个重要特性是其类型安全,同时提供了足够的灵活性来存储不同类型的可调用实体。
typedef std::function<void()> func;
typedef std::function<void()> func;
这行代码定义了一个类型别名func
。这里,std::function<void()>
是一个特化形式,表示它可以封装任何没有参数并且返回void
的可调用实体。
std::vector<func> callbacks;
std::unordered_map<int, std::string> desc;
std::vector<func> callbacks;
:一个函数指针的向量,用于存储可执行的任务。这些任务在运行时被添加到向量中,并且可以通过索引来调用。std::unordered_map<int, std::string> desc;
:一个哈希表,用于存储任务的描述。键是任务在callbacks
向量中的索引,值是对任务的文字描述。
2、任务函数
void readMySQL()
{
std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}
void execuleUrl()
{
std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
}
void cal()
{
std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
}
void save()
{
std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
}
- 文件中定义了几个任务函数,例如
readMySQL
、execuleUrl
、cal
和save
。这些函数模拟了不同的任务,如访问数据库、解析URL、执行计算和数据持久化。每个函数都打印出它正在执行的任务和当前子进程的ID。
3、初始化函数
void load()
{
desc.insert({callbacks.size(), "readMySQL: 读取数据库"});
callbacks.push_back(readMySQL);
desc.insert({callbacks.size(), "execuleUrl: 进行url解析"});
callbacks.push_back(execuleUrl);
desc.insert({callbacks.size(), "cal: 进行加密计算"});
callbacks.push_back(cal);
desc.insert({callbacks.size(), "save: 进行数据的文件保存"});
callbacks.push_back(save);
}
void load()
:这个函数初始化任务列表和任务描述。它将每个任务函数添加到callbacks
向量中,并且在desc
哈希表中为每个任务添加一个描述。这样,每个任务都有一个唯一的索引和描述。
4、辅助函数
void showHandler()
{
for(const auto &iter : desc )
{
std::cout << iter.first << "\t" << iter.second << std::endl;
}
}
int handlerSize()
{
return callbacks.size();
}
void showHandler()
:遍历desc
哈希表,并打印出所有任务的索引和描述。这个函数可以用来显示当前可用的任务列表。int handlerSize()
:返回当前任务列表callbacks
的大小,即可用任务的数量。
ProcessPool.cc:进程池
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"
#define PROCESS_NUM 5
using namespace std;
int waitCommand(int waitFd, bool &quit) //如果对方不发,我们就阻塞
{
uint32_t command = 0;
ssize_t s = read(waitFd, &command, sizeof(command));
if (s == 0)
{
quit = true;
return -1;
}
assert(s == sizeof(uint32_t));
return command;
}
void sendAndWakeup(pid_t who, int fd, uint32_t command)
{
write(fd, &command, sizeof(command));
cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}
int main()
{
// 代码中关于fd的处理,有一个小问题,不影响我们使用,但是你能找到吗??
load();
// pid: pipefd
vector<pair<pid_t, int>> slots;
// 先创建多个进程
for (int i = 0; i < PROCESS_NUM; i++)
{
// 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n == 0);
(void)n;
pid_t id = fork();
assert(id != -1);
// 子进程我们让他进行读取
if (id == 0)
{
// 关闭写端
close(pipefd[1]);
// child
while (true)
{
// pipefd[0]
// 等命令
bool quit = false;
int command = waitCommand(pipefd[0], quit); //如果对方不发,我们就阻塞
if (quit)
break;
// 执行对应的命令
if (command >= 0 && command < handlerSize())
{
callbacks[command]();
}
else
{
cout << "非法command: " << command << endl;
}
}
exit(1);
}
// father,进行写入,关闭读端
close(pipefd[0]); // pipefd[1]
slots.push_back(pair<pid_t, int>(id, pipefd[1]));
}
// 父进程派发任务
srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机
while (true)
{
// 选择一个任务, 如果任务是从网络里面来的?
int command = rand() % handlerSize();
// 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡
int choice = rand() % slots.size();
// 把任务给指定的进程
sendAndWakeup(slots[choice].first, slots[choice].second, command);
sleep(1);
}
// 关闭fd, 所有的子进程都会退出
for (const auto &slot : slots)
{
close(slot.second);
}
// 回收所有的子进程信息
for (const auto &slot : slots)
{
waitpid(slot.first, nullptr, 0);
}
}
1、初始化任务和进程槽
int main()
{
load();
vector<pair<pid_t, int>> slots;
- 加载任务:通过调用
load()
函数,初始化全局的任务列表callbacks
和任务描述desc
。 - 定义进程槽:使用
vector<pair<pid_t, int>> slots;
定义一个容器来存储子进程ID和对应的管道写端文件描述符。
2、创建子进程和管道
for (int i = 0; i < PROCESS_NUM; i++)
{
// 创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n == 0);
(void)n;
pid_t id = fork();
assert(id != -1);
- 循环创建子进程:通过
for
循环,创建PROCESS_NUM
个子进程。每次循环中,都会创建一个管道(pipe(pipefd)
),用于父子进程间的通信。- 使用
assert(n == 0)
来确保管道创建成功(在Linux系统中,成功时返回0) (void)n;
是为了Release模式下消除编译器可能产生的未使用变量警告。
- 使用
- 管道文件描述符:
pipefd[0]
是管道的读端,pipefd[1]
是管道的写端。 - 使用
fork()
系统调用来创建一个新的进程。fork()
调用后,会生成一个与父进程几乎完全相同的子进程。assert(id != -1);
来检查fork()
是否成功执行,如果失败则fork()
会返回-1,程序将终止执行并输出错误信息。
3、waitCommand函数
waitCommand
函数是这段代码中自定义的一个用于从管道读取命令的函数,其作用是阻塞等待父进程通过管道发送过来的命令,并在接收到特定信号时决定是否退出循环。
int waitCommand(int waitFd, bool &quit) //如果对方不发,我们就阻塞
{
uint32_t command = 0;
ssize_t s = read(waitFd, &command, sizeof(command));
if (s == 0)
{
quit = true;
return -1;
}
assert(s == sizeof(uint32_t));
return command;
}
参数:
int waitFd
: 这是管道的读端文件描述符(如上文中的pipefd[0]
),用来从管道中读取数据。bool &quit
: 这是一个引用类型的布尔变量,用来标记子进程是否需要退出。
函数逻辑:
- 首先定义一个整型变量
command
用于存储从管道读取到的命令。 - 使用
read()
系统调用尝试从给定的管道读取指定大小的数据(这里是4字节,假设命令是一个无符号32位整数)。 - 如果
read()
返回0,这意味着管道另一端关闭了连接,通常这表示父进程打算结束与子进程的通信,因此将quit
设置为true
并返回一个非正常值(这里为-1
),以指示子进程应该退出其任务执行循环。 - 如果
read()
成功读取到了4个字节的数据(等于sizeof(uint32_t)
),则断言成功,并将读取到的命令作为整数值返回。
4、进行通信&执行任务
这段代码描述的是子进程和父进程各自执行的任务,基于之前创建的管道进行通信:
if (id == 0)
{
// 关闭写端
close(pipefd[1]);
// child
while (true)
{ // 等命令
bool quit = false;
int command = waitCommand(pipefd[0], quit); //如果对方不发,我们就阻塞
if (quit)
break;
// 执行对应的命令
if (command >= 0 && command < handlerSize())
{
callbacks[command]();
}
else
{
cout << "非法command: " << command << endl;
}
}
exit(1);
}
// father,进行写入,关闭读端
close(pipefd[0]); // pipefd[1]
slots.push_back(pair<pid_t, int>(id, pipefd[1]));
子进程部分(if (id == 0)
):
- 关闭写端: 子进程中不需要向管道写入数据,所以它会关闭管道的写端
close(pipefd[1])
。 - 循环等待命令: 子进程进入无限循环,不断从管道读端口
pipefd[0]
等待接收来自父进程的命令。
接收并执行命令
-
waitCommand()
用于从管道读取并解析命令,并在接收到特定命令表示退出时设置quit
为true
。 -
处理命令: 当接收到父进程发来的命令值
command
之后,执行callbacks[command]();
就是调用预先注册到向量中的对应任务函数,完成实际的工作内容。如果命令非法,则输出错误信息。 -
退出循环: 当检测到
quit
为true
,即接收到父进程发送的退出命令时,子进程跳出循环并调用exit(1)
结束自身。
父进程部分
-
关闭读端: 父进程不需要从管道读取数据,因此关闭管道的读端
close(pipefd[0])
。 -
存储子进程信息: 将子进程的ID (
id
) 和该子进程对应的管道写端口 (pipefd[1]
) 保存在一个结构体(这里是一个pair<pid_t, int>
类型的对象)中,并将其添加到名为slots
的容器(如向量或列表)中。这样父进程可以管理多个子进程及其对应的管道写端口,以便将来向每个子进程发送不同的命令。
5、sendAndWakeup()
函数
sendAndWakeup()函数
用于向指定进程通过管道发送一个命令,并在控制台上打印相关信息。具体说明如下:
void sendAndWakeup(pid_t who, int fd, uint32_t command)
{
write(fd, &command, sizeof(command));
cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}
参数:
pid_t who
:表示接收命令的目标子进程的进程ID。int fd
:这是管道的写端文件描述符,父进程通过这个描述符将命令写入管道,以通知目标子进程执行任务。uint32_t command
:要发送的命令编号,对应于之前定义的任务函数。
函数实现:
-
使用
write(fd, &command, sizeof(command))
将命令写入到管道中。这里的command
是一个整数索引,指向存储在全局变量callbacks
中的任务函数列表。 -
在控制台输出一条消息,显示主进程正在通过管道
fd
呼叫进程who
执行任务desc[command]
。这里的desc
是一个无序_map(std::unordered_map<int, std::string> desc; ),键是命令索引,值是对应的描述信息。
6、父进程派发任务
父进程持续地以随机方式向各个子进程派发任务,并在完成任务调度后有序地回收子进程资源的功能。
srand((unsigned long)time(nullptr) ^ getpid() ^ 23323123123L); // 让数据源更随机
while (true)
{
// 选择一个任务
int command = rand() % handlerSize();
// 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡
int choice = rand() % slots.size();
// 把任务给指定的进程
sendAndWakeup(slots[choice].first, slots[choice].second, command);
sleep(1);
}
// 关闭fd, 所有的子进程都会退出
for (const auto &slot : slots)
{
close(slot.second);
}
// 回收所有的子进程信息
for (const auto &slot : slots)
{
waitpid(slot.first, nullptr, 0);
}
}
初始化随机数种子:
- 这行代码使用当前时间戳、当前进程ID以及一个常数值来初始化随机数生成器(
srand()
函数)。 - 异或操作符 (
^
) 将多个不同的随机源混合起来,以提高生成种子的随机性。这样可以确保不同时间启动的进程或同一进程中多次调用rand()
都能得到不同的随机数。
无限循环派发任务:
- 使用
rand() % handlerSize()
从所有可用的任务中随机选择一个任务索引。 - 使用
rand() % slots.size()
从所有已创建的子进程中随机选择一个子进程。 - 调用
sendAndWakeup()
函数,向选定的子进程发送选中的任务命令。这里通过管道写端口将命令传递给子进程,并唤醒其执行相应任务。
这一行代码是调用sendAndWakeup(slots[choice].first, slots[choice].second, command);
sendAndWakeup()
函数并传入三个参数,来向一个子进程发送命令。详细讲解如下:-
slots[choice].first
:slots
是一个存储了子进程信息的vector<pair<pid_t, int>>
类型容器,在循环中每个元素代表一个子进程及其管道写端口的文件描述符。choice
是通过随机数生成器确定的一个随机索引,用于从slots
容器中选择一个子进程。 -
slots[choice].first
就是根据这个随机索引获取到的子进程ID(pid_t
类型),它将作为sendAndWakeup()
函数的第一个参数传递给函数,用来标识要唤醒执行任务的具体子进程。 -
同样地,
slots[choice].second
代表与所选子进程对应的管道写端口的文件描述符(int
类型)。这是第二个参数,用于在函数内部调用write()
系统调用,将命令通过管道写入到选定子进程,从而通知该子进程开始执行相应的任务。 -
command
:command
是之前通过rand() % handlerSize();
随机生成的任务编号,它是一个整数值(uint32_t
类型)。此值作为第三个参数传递给sendAndWakeup()
函数,表示要派发给子进程执行的具体任务。
-
- 每次派发完任务后,让父进程休眠1秒(
sleep(1)
),模拟任务之间的间隔。
关闭管道写端口:
for (const auto &slot : slots)
{
close(slot.second);
}
- 当不再需要向子进程发送任务时,父进程遍历
slots
容器,关闭与每个子进程关联的管道写端口。这会导致读取端(在子进程中)检测到 EOF 或异常,进而促使子进程退出其等待命令的循环。
回收子进程信息:
for (const auto &slot : slots)
{
waitpid(slot.first, nullptr, 0);
}
- 父进程再次遍历
slots
容器,对每个子进程调用waitpid()
函数,用于等待子进程结束并回收其资源。传入参数nullptr
表示不关心子进程的退出状态码,0
表示阻塞直到子进程结束。通过这种方式,父进程能够确保所有的子进程都已正常结束,并正确释放系统资源。
运行示例
[hbr@VM-16-9-centos ProcessPoll]$ ./ProcessPool
main process: call process 32283 execute readMySQL: 读取数据库 through 6
sub process[32283 ] 执行访问数据库的任务
main process: call process 32281 execute execuleUrl: 进行url解析 through 4
sub process[32281 ] 执行url解析
main process: call process 32285 execute readMySQL: 读取数据库 through 8
sub process[32285 ] 执行访问数据库的任务
main process: call process 32282 execute cal: 进行加密计算 through 5
sub process[32282 ] 执行加密任务
main process: call process 32283 execute execuleUrl: 进行url解析 through 6
sub process[32283 ] 执行url解析
main process: call process 32283 execute execuleUrl: 进行url解析 through 6
sub process[32283 ] 执行url解析
main process: call process 32283 execute save: 进行数据的文件保存 through 6
sub process[32283 ] 执行数据持久化任务
main process: call process 32281 execute execuleUrl: 进行url解析 through 4
sub process[32281 ] 执行url解析
main process: call process 32281 execute readMySQL: 读取数据库 through 4
sub process[32281 ] 执行访问数据库的任务
main process: call process 32281 execute cal: 进行加密计算 through 4
sub process[32281 ] 执行加密任务
main process: call process 32281 execute execuleUrl: 进行url解析 through 4
sub process[32281 ] 执行url解析
^C