1. 什么是进程间通信(进程间通信的目的)
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变
在之前的学习过程中我们了解到进程是具有独立性的,也就是说要实现通信成本一定不低
那我们为啥要实现通信呢?
是因为在使用操作系统的过程中是存在多进程协同的应用场景滴!完成某种业务需求 (例如:cat file | grep ’ hello ')
- 那么该如何理解通信的本质问题呢?
- 要实现通信,首先数据需要存放位置,如果将数据存放在某个进程当中,因为进程的独立性,直接由进程创建的资源其他进程无法看见
所以要由操作系统直接或者间接的给通信双方的进程提供"内存空间" - 要让不同的进程看到同一份公共的资源
通过资源是OS中的不同模块提供所得到不同的通信种类:如果是文件模块 – 管道通信 如果是SystemV通信模块 – System V通信…
2. 进程间通信发展
管道
- 匿名管道pipe
- 命名管道
System V IPC
- System V 消息队列
- System V 共享内存
- System V 信号量
POSIX IPC
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
其实在进程间通信的研究过程当中产生了一大堆的标准,但是主流的还是以上三种
POSIX — 让通信过程可以跨越主机
System V — 主要是聚焦在本地通信当中
管道 — 基于文件系统形成的
3. 管道
3.1 什么是管道
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道
文件存储在磁盘当中,当数据写入到内核缓冲区时,会刷新到磁盘上
进程间通信是否会采取数据写入到磁盘上再从磁盘上读取到进程的上下文环境呢? – 不会
因为这样太慢了,我们要实现的是两进程的通信(从内存到内存) ,而将数据写入磁盘(内存 -> 磁盘 -> 内存)效率非常低
对于管道文件而言,需不需要在磁盘上占据内存空间(在磁盘上打开文件) – 不需要
操作系统非常强大,可以直接在内存当中创建文件(管道文件 – 内存级文件)
内存级文件不需要进行磁盘刷新,大大的提高了进程间通信的效率
3.2 匿名管道
上述的实现过程中是如何让两个进程看到同一个管道文件? 通过fork创建子进程完成
子进程会继承父进程的文件描述符表 父子进程会指向同一文件,但是该文件没有名字(内存级文件) 我们将其称之为匿名管道
为啥父子进程要以读写的方式打开同一文件呢?
因为子进程会继承父进程的文件描述符表和文件的打开方式,若是父进程只以读/写的方式打开文件,那么子进程就会继承对应的读写方式,无法构成管道的需求。
这里的pipefd [ 2 ] 是输出型参数。
在之前的学习过程中我们了解到只要打开一个文件,操作系统会对应打开0,1,2(标准输入,标准输出,标准错误)
那么文件描述符就是要从3开始
创建管道文件,操作系统以读写的方式打开文件,将进程的文件描述符表填写到数组当中,再将数组输出返回调用该函数就可以创建管道文件
写管道文件的描述符必须通过fd数组的下标访问,不能直接使用具体的fd:3 / 4
(因为我们并不清楚现在文件描述符当中是否还存在其他描述符)
fd【0】读取 – 0 像嘴巴
fd【1】写入 – 1 像钢笔
基本框架如下:
#include <iostream>
#include <unistd.h>
// 当我们在进行C语言、C++混编的时候
// 推荐将C语言的头文件引用为<c..>的格式
#include <cassert>
#include <cstdlib>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
int main()
{
// 第一步:创建管道文件,打开读写端
int fds[2];
int n = pipe(fds);
// 管道创建成功,n返回0
assert(n == 0);
// 第二步:fork(创建子进程)
pid_t id = fork();
// 创建子进程成功
assert(id >= 0);
if (id == 0)
{
// 子进程模块
// 关闭读接口
close(fds[0]);
// 进行父子进程通信
close(fds[1]);
exit(0);
}
// 父进程模块
// 进行读取
// 关闭写接口
close(fds[1]);
// 父进程进行等待
n = waitpid(id, nullptr, 0);
assert(n == id);
close(fds[0]);
}
运行结果如下:
此时父进程完成读取,进入等待状态(R ——> S),在等待管道文件就会将父进程的PCB放入文件的等待队列当中
3.3 管道读写规则
- 当没有数据可读时
- O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
- O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
- 当管道满的时候
- O_NONBLOCK disable: write调用阻塞,直到有进程读走数据
- O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
- 如果所有管道写端对应的文件描述符被关闭,则read返回0
- 如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程
退出- 当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
- 当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。
3.4 管道特点
- 只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
- 管道提供流式服务
- 一般而言,进程退出,管道释放,所以管道的生命周期随进程
- 一般而言,内核会对管道操作进行同步与互斥
- 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
sleep10000 | sleep 20000 bash命令行解释器,会将其划分成为两个进程, 竖划线创建的就是匿名管道
3.5 匿名管道进程池(重点)
1) 理清思路
我们想要实现的功能是首先父进程跟多个子进程之间建立管道,创建任务集,随机挑选子进程完成随机任务。
子进程根据父进程传入管道中的commandCode完成对应的任务。
2) 代码实现
① 加载任务集
#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <cassert>
#include <ctime>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
/子进程要完成的某种任务 --- 模拟实现
// 函数指针 类型
typedef void (*func_t)();
// 这里就模拟实现了三种任务
void downLoadTask()
{
// 执行任务的同时获取一下子进程的pid -- 便于观察
std::cout << getpid() << ": 下载任务\n"
<< std::endl;
}
void ioTask()
{
std::cout << getpid() << ": IO任务\n"
<< std::endl;
}
void flushTask()
{
std::cout << getpid() << ": 刷新任务\n"
<< std::endl;
}
// 往out中插入任务集
void loadTaskFunc(std::vector<func_t> *out)
{
assert(out);
out->push_back(downLoadTask);
out->push_back(ioTask);
out->push_back(flushTask);
}
int main()
{
// 1. 加载任务集
std::vector<func_t> funcMap;
loadTaskFunc(&funcMap);
return 0;
}
② 创建子进程
// 2. 创建子进程
std::vector<subEp> subs;
createSubProcess(&subs, funcMap);
#define PROCESS_NUM 4
/子进程要完成的某种任务 --- 模拟实现
// 函数指针 类型
typedef void (*func_t)();
// 这里就模拟实现了三种任务
void downLoadTask()
{
// 执行任务的同时获取一下子进程的pid -- 便于观察
std::cout << getpid() << ": 下载任务\n"
<< std::endl;
}
void ioTask()
{
std::cout << getpid() << ": IO任务\n"
<< std::endl;
}
void flushTask()
{
std::cout << getpid() << ": 刷新任务\n"
<< std::endl;
}
// 往out中插入任务集
void loadTaskFunc(std::vector<func_t> *out)
{
assert(out);
out->push_back(downLoadTask);
out->push_back(ioTask);
out->push_back(flushTask);
}
///下面的代码模拟实现多进程程序
class subEp
{
public:
subEp(pid_t subId, int writeFd)
: subId_(subId), writeFd_(writeFd)
{
char nameBuffer[1024];
snprintf(nameBuffer, sizeof nameBuffer, "process-%d[pid(%d)-fd(%d)]", num++, subId_, writeFd_);
name_ = nameBuffer;
}
public:
static int num;
std::string name_;
pid_t subId_;
int writeFd_;
};
// 因为num为static类型 不属于任何一个对象(实例) 所以必须在这里进行初始化
int subEp::num = 0;
int recvTask(int readFd)
{
int code = 0;
// 判断是否读取到管道中的数据
ssize_t s = read(readFd, &code, sizeof code);
// 读取到 4个字节
if (s == 4)
return code;
// 父进程退出
else if (s <= 0)
return -1;
else
return 0;
}
void createSubProcess(std::vector<subEp> *subs, std::vector<func_t> &funcMap)
{
for (int i = 0; i < PROCESS_NUM; i++)
{
int fds[2];
int n = pipe(fds);
assert(n == 0);
(void)n;
pid_t id = fork();
if (id == 0)
{
close(fds[1]);
while (true)
{
// 获取管道中的任务编号,如果没有发送,子进程应该进行阻塞等待
int commandCode = recvTask(fds[0]);
if (commandCode >= 0 && commandCode < funcMap.size())
{
// 执行任务
funcMap[commandCode];
}
else if (commandCode == -1)
{
// 检测到父进程退出 跳出循环
break;
}
}
// 子进程退出
exit(0);
}
// 关闭读文件描述符 fds[0]
close(fds[0]);
subEp sub(id, fds[1]);
// 将创建出的子进程插入subEp对象当中
subs->push_back(sub);
}
}
这里构造的subEp(先描述再组织)用来管理子进程
③ 父进程控制子进程
// 3. 父进程控制子进程,负载均衡的向子进程发生任务码
int taskCnt = 3;
loadBlanceControl(subs, funcMap, taskCnt);
void sendTask(const subEp &process, int taskNum)
{
std::cout << "send task num: " << taskNum << " send to -> " << process.name_ << std::endl;
int n = write(process.writeFd_, &taskNum, sizeof(taskNum));
assert(n == sizeof(int));
(void)n;
}
void loadBlanceControl(const std::vector<subEp> &subs, const std::vector<func_t> &funcMap, int count)
{
int processnum = subs.size();
int tasknum = funcMap.size();
while (count)
{
// 这里就是负载均衡的体现
// 1. 选择一个子进程 --> std::vector<subEp> -> index - 随机数
int subIdx = rand() % processnum;
// 2. 选择一个任务 --> std::vector<func_t> -> index
int taskIdx = rand() % tasknum;
// 3. 将任务发送给选择的进程
sendTask(subs[subIdx], taskIdx);
// 每次发送完休息1秒
sleep(1);
count--;
}
// 当write写入退出 说明读到0了 不再需要给子进程发送任务
for (int i = 0; i < processnum; i++)
{
close(subs[i].writeFd_); // waitpid();
}
}
④ 回收子进程信息
// 4. 回收子进程信息
waitProcess(subs);
void waitProcess(std::vector<subEp> processes)
{
int processnum = processes.size();
for(int i =0;i<processnum;i++)
{
waitpid(processes[i].subId_,nullptr,0);
std::cout<<"wait sub process success ...: " << processes[i].subId_ << std::endl;
}
}
assert断言
意料之中用assert 意料之外用if判断
(void)n的作用 assert的作用只是在debug下,在release版本下该行代码会删除
那么之前定义的n变量就没有进行使用可能就会出现warning报错:定义了变量但未使用。
3.6 命名管道
- 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
- 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
- 命名管道是一种特殊类型的文件
创建一个命名管道
- 命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
$ mkfifo filename
- 命名管道也可以从程序里创建,相关函数有:
int mkfifo(const char *filename,mode_t mode);
匿名管道与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由mkfifo函数创建,打开用open
- FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义。
命名管道的打开规则
- 如果当前打开操作是为读而打开FIFO时
O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
O_NONBLOCK enable:立刻返回成功
- 如果当前打开操作是为写而打开FIFO时
O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
命名管道通信
逻辑分析
即便两个文件在进行通信,管道文件的大小仍是0
代码分析
// 头文件 + 创建/删除管道
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
// 定义宏 将命名管道放在当前目录下(便于观察)
#define NAMED_PIPE "./mypipe"
// 创建管道
bool createFifo(const std::string &path)
{
// 将文件掩码设置为0
umask(0);
int n = mkfifo(path.c_str(), 0600);
if (n == 0)
return true;
else
{
std::cout << "errno: " << errno << " err stirng: " << strerror(errno) << std::endl;
return false;
}
}
// 删除管道
void removeFifo(const std::string &path)
{
int n = unlink(path.c_str());
assert(n == 0);
(void)n;
}
mkfifo 创建管道文件,unlink 关闭文件
client端 – 客户端 往管道中写入数据
#include "comm.hpp"
int main()
{
std::cout << "client begin" << std::endl;
// 将管道文件以只写入的方式打开
int wfd = open(NAMED_PIPE, O_WRONLY);
std::cout << "client end" << std::endl;
// 管道文件打开失败
if (wfd < 0)
exit(1);
// write
char buffer[1024];
while (true)
{
std::cout << "Please say# ";
fgets(buffer, sizeof(buffer), stdin);
// 当管道中有数据 将最后一位置为0
// 这里的if判断buffer>0虽然没必要 因为输入数据至少要按下回车键 就一定有数据
// 但是逻辑正确
if (strlen(buffer) > 0)
buffer[strlen(buffer) - 1] = 0;
ssize_t n = write(wfd, buffer, strlen(buffer));
assert(n == strlen(buffer));
(void)n;
}
close(wfd);
return 0;
}
server端 — 服务端 从管道当中读出数据
#include "comm.hpp"
// sever作为主控制
int main()
{
// 创建命名管道
bool r = createFifo(NAMED_PIPE);
assert(r);
(void)r;
std::cout << "sever begin" << std::endl;
int rfd = open(NAMED_PIPE, O_RDONLY);
std::cout << "sever end" << std::endl;
if (rfd < 0)
exit(1);
// read
char buffer[1024];
while (true)
{
ssize_t s = read(rfd, buffer, sizeof(buffer) - 1);
if (s > 0)
{
buffer[s] = 0;
std::cout << "client->server#" << buffer << std::endl;
}
else if (s == 0)
{
std::cout << "client quit,me too!" << std::endl;
break;
}
else
{
std::cout << "err string: " << strerror(errno) << std::endl;
break;
}
}
close(rfd);
// 等待10秒 观察当前路径下的命名管道是否删除
// sleep(10);
removeFifo(NAMED_PIPE);
return 0;
}
细节1:
默认情况下 读取进程先运行,只有当写入打开后,读取进程才会继续往后进行
read进程会卡在open函数这, 因为此时管道文件只有读描述符,没有写入描述符(读没有意义,所以阻塞)