本篇博客整理了进程间通信的方式管道、 system V IPC的原理,结合大量的系统调用接口,和代码示例,旨在让读者透过进程间通信去体会操作系统的设计思想和管理手段。
目录
一、进程间通信
二、管道
1.匿名管道
1.1-通信原理
1.2-系统调用 pipe()
1.3-管道的容量
1.4-管道通信时的特殊情况
1.5-管道的特征总结
补- 匿名管道模拟简易的进程池
2.命名管道
2.1-指令 mkfifo
2.2-系统调用 mkfifo()
补- 命名管道实现简单的本地聊天程序
三、共享内存
1.基本原理
2.相关系统调用
2.1-创建 shmget() 和 ftok()
2.2-挂接 shmat()
2.3-取消关联 shmdt()
2.4-释放 shmctl()
3.相比管道,通信效率更高
补.共享内存实现简单的本地聊天程序
四、消息队列
1.基本原理
2.相关系统调用
2.1-创建 msgget() 和 ftok()
2.2-释放 msgctl()
2.3-发送数据 msgsnd()
2.4-获取数据 msgrcv()
五、信号量
1.基本原理
2.相关系统调用
六、内核对 IPC 资源的管理
一、进程间通信
进程间通信(Interprocess communication,简称IPC),是两个或多个进程实现数据层面的交互,传播或交换信息。
【Tips】目的:数据传输、资源共享、通知事件、进程控制。
- 数据传输:一个进程需要将它的数据发送给另外一个进程。
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或者一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时需要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如 Debug 进程),此时控制进程希望及时知道它的状态改变。
【Tips】本质:让不同的进程看到同一份资源。
进程用于通信的资源是一种特定形式的内存空间,为了不破坏进程的独立性,这份资源由操作系统提供,所以,进程使用通信资源进行进程间的通信,本质就是在访问操作系统。
进程所代表的是用户,但操作系统是不相信用户的,也就是说,进程不能直接去使用操作系统提供给进程的资源,必须在进程内部通过系统调用去使用。
在操作系统内部,可能会存在多组进程需要通信,因此资源可能有多份。操作系统会将这多份资源管理起来,例如,一般操作系统会有一个独立的通信模块(IPC通信模块),隶属于文件系统。 通信模块有 system V 和 posix 两个标准,前者主要是针对本机内部通信,后者是针对网络通信。在这两个标准发布之前(也就是操作系统还没有通信模块的时候),进程之间是通过基于文件级别的通信方式——管道来进行通信的。
【Tips】分类:管道、 system V 、 posix。
- 管道:匿名地址 pipe、命名管道。
- System V IPC:System V 消息队列、System V 共享内存、System V 信号量。
- POSIX IPC:消息队列、共享内存、信号量、互斥量、条件变量、读写锁。
二、管道
管道是 Unix 中最古老的进程间通信的形式,从一个进程连接到另一个进程的一个数据流就被称为一个“管道”,本质是一种不进行IO的内存级文件。
例如,使用指令来统计当前登录的用户个数:
who 指令用于当前登录的用户名, wc -l 指令用于显示文件的行数,它们都是两个可执行程序,在加载后成为两个进程。
who 进程通过 stdout 将数据输出至“管道”当中,wc 进程再通过 stdin 从“管道”当中读取数据,至此便完成了数据的传输,进而可以完成数据的进一步加工处理。
1.匿名管道
匿名管道,顾名思义,就是没有名字的文件,仅用于本地的父子进程间、或由同一个父进程创建的兄弟进程之间的通信。
【Tips】匿名管道的特点
- 具有血缘关系的进程之间的通信。
- 只能单向通信。
- 父子进程是会协同的,进行同步与互斥,以保证管道文件中数据的安全。
- 管道是面向字节流的。
- 管道是基于文件的,而文件的生命周期是随进程的,进程如果退出了,管道也会被自动关闭掉。
1.1-通信原理
匿名管道实现父子进程间通信的原理就是,让两个父子进程能够看到同一份文件资源,然后父子进程就可以对同一个文件进行写入或读取操作,进而实现父子进程间通信。
当父进程打开一个文件,操作系统就会在内存上创建一个 struct file 结构体,里面包含文件的各种属性、对磁盘文件的操作方法、inode 结构等。被父进程创建的子进程,会和父进程一起访问这个由父进程打开的文件,具体的方式是,它们的 struct file 中封装了同一个 inode 结构,而这个 inode 结构指向了从磁盘加载到文件页缓冲区里的一个由父进程打开的文件。
由于父子进程看到的同一份文件资源是由操作系统来维护的,因此在父子进程分别对这个文件进行写入时,文件页缓冲区中的数据并不会发生写时拷贝。父进程可以向文件中写内容,写完后可以继续干自己的事,且不破坏父进程的独立性。子进程可以向文件中读内容,读完后可以继续干自己的事,也不破坏子进程的独立性。这样一读一写,父子进程就完成了一次进程间通信,而这种通信模式是单向的。
对文件进行IO操作时,需要访问硬盘,从这个外设上读取数据,因此IO的速度非常慢。但父子进程进行通信,显然,磁盘中文件的内容并不重要,重要的是父进程写了什么,子进程又读到了什么。于是,操作系统为了提高效率,关闭了内存中 struct file 与硬盘的 IO 通道, 而让父子进程在内存的文件页缓冲区中,一个无名的文件中分别进行读写。父进程会把数据写到文件页缓冲区的这个无名文件中,子进程会从文件页缓冲区的这个无名文件中读取数据。此时,父子间通信不仅正常进行,效率还非常高,且对进程之间的独立性没有影响。
这种不进行IO的文件就叫做内存级文件。也就是说,其实磁盘文件和内存文件未必一一对应,有的文件只在内存中存在,而不在磁盘中存在。
而这种操作系统为了支持进程间通信而为进程提供的匿名文件资源,就叫匿名管道。
1.2-系统调用 pipe()
#include<unistd.h>
int pipe(int pipefd[2]);
参数:一个至少有两个元素的数组,实际上传参传的是数组名。
这里的pipefd[0]是管道读端的文件描述符,pipefd[1]是管道写端的文件描述符。
返回值:管道创建成功则返回0;创建失败则返回-1,并设置合适的错误码。
【Tips】创建匿名管道的一般步骤
在创建匿名管道实现父子进程间通信的过程中,需要 pipe() 和 fork() 搭配使用。
- step1:父进程调用 pipe() 创建管道
- step2:父进程调用 fork() 创建子进程
- step3:父/子进程调用 close() 关闭 pipe() 的写端 fd[0],子/父进程关闭 pipe() 的读端 fd[1]
为了演示 pipe() 的用法,此处引入以下代码:
//子进程向匿名管道当中写入10行数据,父进程从匿名管道当中将数据读出。
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
int main()
{
//创建匿名管道
int fd[2] = { 0 };
if (pipe(fd) < 0){
perror("pipe");
return 1;
}
//创建子进程
pid_t id = fork();
if (id == 0){
//子进程
//关闭读端
close(fd[0]);
//向管道写入数据
const char* msg = "hello father, I am child...";
int count = 10;
while (count--){
write(fd[1], msg, strlen(msg));
sleep(1);
}
//写入完毕,关闭文件(写端)
close(fd[1]);
exit(0);
}
//父进程
//关闭写端
close(fd[1]);
//从管道读取数据
char buff[64];
while (1){
ssize_t s = read(fd[0], buff, sizeof(buff)-1);
if (s > 0){
buff[s] = '\0';
printf("child send to father:%s\n", buff);
}
else if (s == 0){
printf("read file end\n");
break;
}
else{
printf("read error\n");
break;
}
}
//读取完毕,关闭文件(读端)
close(fd[0]);
waitpid(id, NULL, 0);
return 0;
}
【Tips】 命令行中的匿名管道
命令行中的 | ,其底层就是通过 pipe() 来创建管道的。
输入管道相关的指令后, bash 会对输入的指令做分析,统计出指令中 | 的个数,创建出对应数量的管道,然后通过 fork() 创建出一批子进程进行重定向工作,将管道左边进程的输出,重定向到管道文件中,将管道右边进程的输入,重定向到管道文件中,最终通过程序替换去执行指令(程序替换不会影响预先设置好的重定向)。
//【补】pipe() 的升级版:系统调用 pipe2()
#include<unistd.h>
int pipe2(int pipefd[2], int flags);
参数:1.pipefd[2]:一个至少有两个元素的数组,实际上传参传的是数组名。
这里的pipefd[0]是管道读端的文件描述符,pipefd[1]是管道写端的文件描述符。
2.flags:用于设置选项
1)当管道为空,没有数据可读时:
· O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来为止。
· O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
2)当管道被写满时:
· O_NONBLOCK disable:write调用阻塞,直到有进程读走数据。
· O_NONBLOCK enable:write调用返回-1,errno值为EAGAIN。
3)若管道写端被关闭,则read返回0。
4)若读端被关闭,则write操作会产生信号SIGPIPE,进而可能导致write进程退出。
5)若写入的数据量不大于PIPE_BUF(内核管道缓冲区的容量)时,Linux将保证写入的原子性,
数据会被连续地写入管道。
6)若写入的数据量大于PIPE_BUF(内核管道缓冲区的容量)时,Linux将不再保证写入的原子性。
返回值:管道创建成功则返回0;创建失败则返回-1,并设置合适的错误码。
1.3-管道的容量
管道的容量是有限的,具体与内核管道缓冲区的容量、缓冲条目的数量有关,如果管道已满,那么写端将阻塞或失败。要查看管道的容量,有以下方法:
- 方法一:指令 man 7 pipe
——“在2.6.11之前的Linux版本中,管道的最大容量与系统页面大小相同,从Linux 2.6.11往后,管道的最大容量是65536字节。”
小编使用的 Linux 是2.6.11之后的版本,因此小编使用的Linux下,管道的容量是65536字节。
【ps】65536字节 = 4096(内核管道缓冲区的容量) x 16(缓冲条目的数量)
- 方法二:指令 ulimit -a
指令 ulimit -a 可以查看内核管道缓冲区的容量。
只要得知内核管道缓冲区的容量和缓冲条目的数量,就能推导出管道的容量。
(内核管道缓冲区的容量 x 缓冲条目的数量 = 管道的容量)
小编的Linux下,管道的容量是 512 × 8 = 4096 字节 = 4KB。
如果写入数据的大小,小于内核管道缓冲区的容量(这里为4kb),那么写入操作就是原子性的,数据会被连续地写入管道。
- 方法三:通过代码粗暴测试
当管道被写满,写端的进程就会被挂起。可以利用这一点,让读端的进程一直不读取管道的数据,而写端的进程一直向管道写入数据,写端的进程被挂起时,就能得知管道的最大容量。
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
int main()
{
//创建匿名管道
int fd[2] = { 0 };
if (pipe(fd) < 0){
perror("pipe");
return 1;
}
//创建子进程
pid_t id = fork();
if (id == 0){
//子进程
//关闭读端
close(fd[0]);
char c = 'a';
int count = 0;
//一直进行写入,每次写入一个字节
while (1){
write(fd[1], &c, 1);
count++;
//每写入一次,就打印已写入的字节数
printf("%d\n", count);
}
close(fd[1]);
exit(0);
}
//父进程
//关闭写端
close(fd[1]);
//但不进行读取
//...
waitpid(id, NULL, 0);
close(fd[0]);
return 0;
}
由演示图,以上代码运行后在显示出 65536 后挂起了,这说明管道的容量就是65536字节。
1.4-管道通信时的特殊情况
在使用匿名管道通信时,可能会出现四种特殊情况:
- 读写端正常,若管道为空,则读端堵塞;
- 读写端正常,若管道被写满,则写端阻塞;
- 读端正常,写端关闭,读端就会读到0,表示读到了管道文件的结尾,不会被阻塞;
- 写端正常,读端关闭,操作系统会通过 13 号信号(SIGPIPE)把正在写入的进程 kill 掉。
1.5-管道的特征总结
(1)管道的生命周期取决于进程的创建和终止
管道本质上也是文件,依赖于文件系统,由于所有打开文件的进程都退出后,文件资源也就被释放掉了,因此,管道的生命周期与进程的生命周期有关。
(2)管道内部自带同步与互斥机制
一次只允许一个进程使用的资源,被称为临界资源,而管道在同一时刻只允许一个进程进行写入或读取操作,因此,管道其实就是一种临界资源。
临界资源是需要被保护的,如果不对临界资源进行任何保护,就可能出现同一时刻有多个进程对同一临界资源进行操作,导致同时读写、交叉读写、读取数据不一致等问题。
保护临界资源的手段一般是同步与互斥机制,于是就有内核会对管道的操作进行同步与互斥:
- 同步: 两个或两个以上的进程在运行过程中协同步调,按预定的先后次序运行,例如,A任务的读取操作依赖于B任务因写入操作而产生的数据。
- 互斥: 一个公共资源同一时刻只能被一个进程使用,多个进程不能同时使用公共资源。
其实,同步是一种复杂的互斥,互斥则是一种特殊的同步。互斥具有唯一性和排它性,且不限制任务的运行顺序,而同步的任务之间则有明确的顺序关系。
对于管道来说,互斥就是两个进程不能同时对管道进行操作,它们必须等其中一个进程操作完毕,另外一个才能操作。同步也是指两个进程不能同时对管道进行操作,而必须要按照某种次序来对管道进行操作。
(3)管道提供流式服务
数据的读取分为流式服务和数据报服务:
- 流式服务: 数据没有明确的分割,并不分固定的报文段。
- 数据报服务: 数据有明确的分割,读取数据必须按固定的报文段来读取。
而管道提供的是流式服务,具体来说就是,进程A写入管道中的数据,进程B每次想读多少都可以。
(4)管道中的数据传输方式属于半双工通信
数据在线路上的传输方式可以分为单工通信、半双工通信、全双工通信:
- 单工通信:数据传输是单向的,在通信双方中,一方固定为发送端,另一方固定为接收端。
- 半双工通信:数据可以在一个信号载体的两个方向上传输,但是不能同时传输。
- 全双工通信:数据在两个方向上同时传输,相当于两个单工通信的结合,全双工可以同时/瞬时进行信号的双向传输。
显然,管道中数据的传输方式属于半双工通信。
【补】由管道引申出的一些概念:
- 临界资源:多个进程/执行流看到的公共的一份资源。
- 临界区:进程访问临界资源的代码。
- 同步:每个进程按预定的先后次序进入临界区。
- 互斥:在任何时刻,都只能有一个进程进入临界区。
- 原子性:要么就不做,要么就做完,没有中间状态。
补- 匿名管道模拟简易的进程池
- Makefile:
ProcessPool:ProcessPool.cpp
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f ProcessPool
- Task.hpp:
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <unordered_map>
#include <unistd.h>
#include <functional>
typedef std::function<void()> func;
std::vector<func> callbacks; // 存放若干个回调
std::unordered_map<int, std::string> desc; // 查看有多少方法用的
void readMySQL()
{
std::cout << "sub process[" << getpid() << " ] 执行访问数据库的任务\n" << std::endl;
}
void execuleUrl()
{
std::cout << "sub process[" << getpid() << " ] 执行url解析\n" << std::endl;
}
void cal()
{
std::cout << "sub process[" << getpid() << " ] 执行加密任务\n" << std::endl;
}
void save()
{
std::cout << "sub process[" << getpid() << " ] 执行数据持久化任务\n" << std::endl;
}
void load() // 操作表,先插入描述再插入方法,下标就对齐了
{
desc.insert({ callbacks.size(), "readMySQL: 读取数据库" });
callbacks.push_back(readMySQL);
desc.insert({ callbacks.size(), "execuleUrl: 进行url解析" });
callbacks.push_back(execuleUrl);
desc.insert({ callbacks.size(), "cal: 进行加密计算" });
callbacks.push_back(cal);
desc.insert({ callbacks.size(), "save: 进行数据的文件保存" });
callbacks.push_back(save);
}
void showHandler() // 查看有多少方法
{
for (const auto& iter : desc)
{
std::cout << iter.first << "\t" << iter.second << std::endl; // \t制表符
}
}
int handlerSize() // 直接返回有多少个任务的方法
{
return callbacks.size();
}
- ProcessPool.cpp:
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"
#define PROCESS_NUM 5 // 创建的子进程数目
using namespace std;
int waitCommand(int waitFd, bool& quit) //如果对方不发,我们就阻塞
{
uint32_t command = 0; // uint32_t四个字节
ssize_t s = read(waitFd, &command, sizeof(command)); // 期望读取四个字节
if (s == 0) // 读到0让子进程退出
{
quit = true;
return -1;
}
assert(s == sizeof(uint32_t)); // 不是四个字节就报错
return command;
}
void sendAndWakeup(pid_t who, int fd, uint32_t command) // 通过文件描述符,向哪一个文件发什么命令
{ // who给哪个进程,这个进程的id
write(fd, &command, sizeof(command));
cout << "main process: call process " << who << " execute " << desc[command] << " through " << fd << endl;
}
int main()
{
// 代码中关于fd的处理,有一个小问题,不影响我们使用,但是你能找到吗??
load();
vector<pair<pid_t, int>> slots; // 存放子进程pid和子进程写端id(pipefd)
vector<int> deleteFd; // 存放要删除的子进程写端fd(不删除也不会出问题)
for (int i = 0; i < PROCESS_NUM; i++) // 先创建多个进程
{
int pipefd[2] = { 0 };
int ret = pipe(pipefd); // 创建管道
assert(ret == 0); // 等于0才创建成功
(void)ret;
pid_t id = fork();
assert(id != -1);
if (id == 0) // 子进程,进行读取
{
close(pipefd[1]); // 关闭写端
for (int i = 0; i < deleteFd.size(); i++) // 关闭所以继承下来的写端fd
{
close(deleteFd[i]);
}
while (true)
{
// 等命令
bool quit = false; // 默认不退出
int command = waitCommand(pipefd[0], quit); // 如果对方不发,我们就阻塞
if (quit) // 读到0就退出关闭所有进程
{
break;
}
if (command >= 0 && command < handlerSize()) // 执行对应的命令
{ // handlerSize任务方法的个数
callbacks[command]();
}
else
{
cout << "非法command: " << command << endl;
}
}
exit(1);
}
close(pipefd[0]); // 父进程,进行写入,关闭读端
slots.push_back(pair<pid_t, int>(id, pipefd[1])); // 把此次循环得到的子进程id和子进程写端的id保存
deleteFd.push_back(pipefd[1]); // 把要被继承下去的子进程写端fd保存起来
}
// 父进程均衡地派发任务(单机版的负载均衡)
srand((unsigned long)time(nullptr) ^ getpid() ^ 2335643123L); // 仅仅让数据源更随机
while (true)
{
// 选择一个任务
int command = rand() % handlerSize();
// 选择一个进程 ,采用随机数的方式,选择进程来完成任务,随机数方式的负载均衡
int choice = rand() % slots.size();
// 把任务给指定的进程
sendAndWakeup(slots[choice].first, slots[choice].second, command);
sleep(1);
}
for (const auto& slot : slots) // 关闭fd, 所有的子进程都会退出
{
close(slot.second);
}
for (const auto& slot : slots) // 回收所有的子进程信息
{
waitpid(slot.first, nullptr, 0);
}
}
2.命名管道
命名管道,顾名思义,就是有名字的管道,也是系统中的一个内存级文件。和匿名管道一样,命名管道的不会向磁盘中刷新数据,且它的通信原理也和匿名管道大致相同。
要找到一个文件一般有两种方法,一种是通过文件的 inode 号,另一种则是通过路径和文件名。要找到一个命名管道,显然是通过后者,“路径 + 文件名”唯一地标识了一个命名管道。
2.1-指令 mkfifo
使用指令 mkfifo 可以创建一个命名管道:
mkfifo 命名管道名
2.2-系统调用 mkfifo()
#include<sys/type.h>
#include<sys/stat.h>
int mkfifo(const char* pathname,mode_t mode);
参数:1. pathname:命名管道所在路径或命名管道名
若pathname以路径的方式给出,则将命名管道文件创建在pathname路径下;
若pathname以文件名的方式给出,则将命名管道文件默认创建在当前路径下。
2. mode:权限
返回值:管道创建成功则返回0;失败则返回-1,并设置合适的错误码。
【补】命名管道的打开规则
1)以读而打开命名管道时:
· O_NONBLOCK disable:阻塞直到有相应进程以写而打开命名管道。
· O_NONBLOCK enable:立刻返回成功。
2)以写而打开命名管道时:
· O_NONBLOCK disable:阻塞直到有相应进程以读而打开命名管道。
· O_NONBLOCK enable:立刻返回失败,错误码为ENXIO。
为演示 mkfifo() 的用法,此处引入以下代码:
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#define FILE_NAME "myfifo"
int main()
{
//将文件默认掩码设置为0
umask(0);
//创建命名管道文件
if (mkfifo(FILE_NAME, 0666) < 0){
perror("mkfifo");
return 1;
}
//create success...
return 0;
}
补- 命名管道实现简单的本地聊天程序
- makefile:
.PHONY:all
all:client server
client:client.cc
g++ -o $@ $^ -std=c++11
server:server.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f client server
- comm.hpp:
//comm.hpp
#pragma once
#include <sys/types.h>
#include <iostream>
#include <unistd.h>
#include <cstdlib>
#include <cstdio>
#include <sys/stat.h>
#include <fcntl.h>
#include <string>
#define FIFO_FILE "./myfifo"
#define MODE 0664
enum
{
FIFO_CREAT_ERR = 1,
FIFO_DELET_ERR,
FIFO_OPEN_ERR,
FIFO_CLOSE_ERR,
FIFO_WRITE_ERR,
FIFO_READ_ERR
};
class Init
{
public:
Init()
{
int n = mkfifo(FIFO_FILE, MODE); // 创建成功返回0,创建失败返回-1
if (n == -1)
{
// 创建失败
perror("mkfifo");
exit(FIFO_CREAT_ERR);
}
}
~Init()
{
int m = unlink(FIFO_FILE); // unlink 可以删除任意文件
if (m == -1)
{
perror("unlink");
exit(FIFO_DELET_ERR);
}
}
};
- 服务端:
//server.cc
#include "comm.hpp"
using namespace std;
int main()
{
// 创建管道
Init init;
// 打开管道
int fd = open(FIFO_FILE, O_RDONLY); // 等待写入方打开之后,自己才会打开文件,向后执行, open 会阻塞
if(fd < 0)
{
// 文件打开失败
perror("open fifo");
exit(FIFO_OPEN_ERR);
}
cout << "server open file done" << endl;
// 开始通信
while(true)
{
char buffer[1024];
int x = read(fd, buffer, sizeof(buffer));
if(x > 0)
{
buffer[x] = 0;
cout << "client say@ " << buffer << endl;
}
else if(x == 0)
{
cout << "client quit, me too!" << endl;
break;
}
else
{
perror("read");
exit(FIFO_READ_ERR);
}
}
// 关闭管道
int p = close(fd);
if(p == -1)
{
perror("close");
exit(FIFO_CLOSE_ERR);
}
return 0;
}
- 客户端:
// client.cc
#include "comm.hpp"
using namespace std;
int main()
{
// 打开管道
int fd = open(FIFO_FILE, O_WRONLY);
if(fd < 0)
{
perror("open");
exit(FIFO_OPEN_ERR);
}
cout << "client open file done" << endl;
// 开始通信
string message;
int num = 0;
while (true)
{
cout << "Please Enter@ ";
getline(cin, message);
int ret = write(fd, message.c_str(), message.size());
if(ret == -1)
{
perror("write");
exit(FIFO_WRITE_ERR);
}
}
// 关闭管道
int m = close(fd);
if (m == -1)
{
perror("close");
exit(FIFO_CLOSE_ERR);
}
return 0;
}
三、共享内存
【补】system V IPC
system V IPC是操作系统中的一种通信模块,与实现管道通信的目的类似,都是要让不同的进程看到同一份资源。
system V IPC 所提供的通信方式有三种:
- system V共享内存
- system V消息队列
- system V信号量
其中,共享内存和消息队列用于传送数据,信号量用于保证进程间的同步与互斥。
1.基本原理
共享内存的原理与动态库加载的原理基本一致。
操作系统会在物理内存中取一块内存空间,然后将其分别与各个进程之间建立页表映射,使共享内存与进程地址空间的共享区存在对应关系,从而让不同的进程看到了同一份内存资源。
在操作系统中,可能存在大量的进程正在通信,于是就可能同时存在大量的共享内存,那么操作系统就需要对这些内存资源做管理,因此,操作系统除了要从物理内存中取一块内存空间,还得为其维护相关的内核数据结构。
//【补】共享内存的相关数据结构
struct ipc_perm{
__kernel_key_t key; //标识系统中共享内存的唯一性
__kernel_uid_t uid;
__kernel_gid_t gid;
__kernel_uid_t cuid;
__kernel_gid_t cgid;
__kernel_mode_t mode;
unsigned short seq;
};
struct shmid_ds {
struct ipc_perm shm_perm; /* operation perms */
int shm_segsz; /* size of segment (bytes) */
__kernel_time_t shm_atime; /* last attach time */
__kernel_time_t shm_dtime; /* last detach time */
__kernel_time_t shm_ctime; /* last change time */
__kernel_ipc_pid_t shm_cpid; /* pid of creator */
__kernel_ipc_pid_t shm_lpid; /* pid of last operator */
unsigned short shm_nattch; /* no. of current attaches */
unsigned short shm_unused; /* compatibility */
void *shm_unused2; /* ditto - used by DIPC */
void *shm_unused3; /* unused */
};
//...
//ps:shmid_ds和ipc_perm结构体在/usr/include/linux/shm.h和/usr/include/linux/ipc.h下可以找到
【Tips】申请共享内存的大致过程:
- 操作系统在物理内存上申请一块空间;
- 将申请到的空间,通过页表挂接到进程地址空间的共享区;
- 返回起始虚拟地址,供程序中使用。
【Tips】释放共享内存的大致过程:
- 取消共享内存与地址空间之间关联;
- 释放空间,将内存资源归还。
【ps】申请、挂接、去关联、释放这些动作都是由操作系统来完成的。
2.相关系统调用
2.1-创建 shmget() 和 ftok()
/* 接口1 */
#include<sys/ipc.h>
#include<sys/shem.h>
int shmget((key_t key, size_t size, int shmflg);
功能:申请共享内存
参数:1. key:共享内存的内核标识符。
2. size:共享内存的开辟的字节数。
3. shmflg:共享内存的创建方式,其中包括:
1)IPC_CREAT:没有创建,有则返回。
2)IPC_EXIT: 有则出错返回。若要使用也得跟上方式1(IPC_CREAT | IPC_EXCL)
返回值:成功,返回一个有效的共享内存标识符(用户层标识符);失败,返回-1,并设置合适的错误码。
/* 接口2 */
#include<sys/ipc.h>
#include<sys/shm.h>
key_t ftok(const char *pathname, int proj_id);
功能:将一个已存在的路径名pathname和一个整数标识符proj_id转换成一个key值(IPC键值),
在使用shmget()获取共享内存时,这个key值会被填充进维护共享内存的数据结构当中。
参数:1. pathname:任意的文件路径,但pathname所指定的文件必须存在且可存取,
一般都写成当前路径"."。
2. proj_id:整数标识符/项目ID,可自定义,但不能是0。
说明:这两个参数都是为了生成key_t类型的内核的标识符。
返回值:成功,返回独一无二的key值;失败,返回-1
[ps]1.ftok()生成的key值可能存在冲突,此时修改ftok()的参数即可。
2.需要通信的各个进程,在使用ftok()获取key值时,
都需要采用同样的路径名和和整数标识符,进而生成同一种key值,然后才能找到同一个共享内存。
[ps]key值为什么是由用户传参来指定生成的,而非操作系统直接生成的?
这是因为,具体哪两个进程需要通信,取决于用户,而非操作系统。
其实ftok()就相当于是两个通信进程之间的一种约定,只要它们约
定好同一个pathname和proj_id,那么这两个进程就能得到同一个
key,从而找到同一个共享内存。
【Tips】shmget() 与 ftok() 之间的关系:
为演示 shmget() 和 ftok() 的用法,此处引入以下代码:
#include<iostream>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<string>
using namespace std;
const string pathname = "/home/CVEer";
const int proj_id = 0xFFFF;
int main()
{
key_t key = ftok(pathname.c_str(),proj_id);
if(key == -1) return 1;
int ud = shmget(key,4096,IPC_CREAT);
//int ud = shmget(key,4096,IPC_CREAT | 0666);//设置共享内存的权限为0666
if(ud == -1) return 1;
cout << "创建成功!" << endl;
return 0;
}
【ps】由于共享内存是由操作系统进行管理的,因此在没有调用相应的系统调用时,就算进程退出了,操作系统也不会释放共享内存。
【补】指令 ipcs
- 参数 -m:列出共享内存的相关信息。
- 参数 -q:列出消息队列的相关信息。
- 参数 -s:列出信号量的相关信息。
ipcs -m 所列出的信息含义:
【补】删除一个共享内存的指令:ipcrm -m + 共享内存的用户层id(shmid)
2.2-挂接 shmat()
#include<sys/shm.h>
void *shmat(int shmid, const void *shmaddr, int shmflg);
功能:将共享内存挂接到进程地址空间的共享区中
参数:1.shmid:共享内存的用户层id/标识符。
2.shmaddr:指向挂接在进程地址空间中的共享区地址,
一般设为空指针即可,系统会自动分配地址,
3.shmflg:设置当前进程对挂接的共享内存的权限,
一般设置成0,表示采用共享内存自身的权限。
1)SHM_RDONLY:关联共享内存后只进行读取操作
2)SHM_RND:若shmaddr不为NULL,则关联地址自动向下调整为SHMLBA的整数倍。
(公式:shmaddr - (shmaddr % SHMLBA))
3)0:默认为读写权限
返回值:成功,返回共享内存挂接在进程空间中的共享区地址;失败,返回(void*)-1,并设置合适的错误码。
为演示 shmmat() 的用法,此处引入以下代码:
#include<iostream>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<string>
#include <unistd.h>
using namespace std;
const string pathname = "/home/CVEer";
const int proj_id = 0xFFFF;
int main()
{
//创建
key_t key = ftok(pathname.c_str(),proj_id);
if(key == -1) return 1;
int shmid = shmget(key,4096,IPC_CREAT | IPC_EXCL | 0666);
if(shmid == -1) return 1;
cout << "创建成功!" << endl;
sleep(2);
//挂接
int* shmptr = (int*)shmat(shmid ,NULL,0);
if(*shmptr == -1) return 1;
cout << "挂接成功"<< endl;
sleep(5);
return 0;
}
2.3-取消关联 shmdt()
#include<sys/shm.h>
int shmdt(const void *shmaddr);
功能:取消共享内存与进程地址空间中共享区的映射关系
参数:shmaddr:共享内存挂接在进程地址空间中的共享区地址
返回值:成功则返回0;失败则返回-1,并设置合适的错误码。
为演示 shmmdt() 的用法,此处引入以下代码:
#include<iostream>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<string>
#include <unistd.h>
using namespace std;
const string pathname = "/home/CVEer";
const int proj_id = 0xFFFF;
int main()
{
//创建
key_t key = ftok(pathname.c_str(),proj_id);
if(key == -1) return 1;
int shmid = shmget(key,4096,IPC_CREAT | IPC_EXCL | 0666);
if(shmid == -1) return 1;
cout << "创建成功" << endl;
sleep(2);
//挂接
int* shmptr = (int*)shmat(shmid ,NULL,0);
if(*shmptr == -1) return 1;
cout << "挂接成功"<< endl;
sleep(5);
//取消关联
int ret = shmdt((const void *)shmptr);
if(ret < 0) return 1;
cout << "关联已取消" << endl;
sleep(5);
return 0;
}
2.4-释放 shmctl()
#include<sys/ipc.h>
#include<sys/shm.h>
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
功能:释放一个共享内存
参数:1.shmid:共享内存的用户层id/标识符。
2.cmd:操作选项,常见有:
1)IPC_STAT,获取共享内存的状态信息,放在buf指向的变量中。
2)IPC_SET,设置共享内存的状态,需要将buf变量传进去,方便修改
3)IPC_RMID,删除共享内存。
4)SHM_LOCK,锁定共享内存。
5)SHM_UNLOCK,解锁共享内存。
3.buf:语言层面用来描述一个共享内存的结构体,里面保存了共享内存的部分属性
返回值:如果操作是 IPC_RMID ,那么删除成功,则返回0;失败则返回-1.并设置合适的错误码。
为演示 shmctl() 的用法,此处引入以下代码:
#include<iostream>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<string>
#include <unistd.h>
using namespace std;
const string pathname = "/home/CVEer";
const int proj_id = 0xFFFF;
int main()
{
//创建
key_t key = ftok(pathname.c_str(),proj_id);
if(key == -1) return 1;
int shmid = shmget(key,4096,IPC_CREAT | 0666);
if(shmid == -1) return 1;
cout << "创建成功" << endl;
//挂接
char* shmptr = (char*)shmat(shmid,NULL,0);
if(shmptr == (void*)(-1)) return 1;
cout << "挂接成功"<< endl;
sleep(5);
//去关联
int ret = shmdt(shmptr);
if(ret < 0) return 1;
cout << "关联已取消" << endl;
sleep(5);
//释放
ret = shmctl(shmid,IPC_RMID,NULL);
if(ret == -1)
{
cout << "释放失败" << endl;
return 1;
}
cout << "释放成功"<< endl;
sleep(5);
return 0;
}
3.相比管道,通信效率更高
管道创建好后,通信仍需要调用 read()、write() 等系统接口,而共享内存创建好后,通信无需再调用系统接口。
通信双方,一端写入,一端读取,会发生数据的拷贝。
对于管道来说,一次通信会发生四次数据拷贝。
而对于共享内存来说,一次通信仅发生两次数据拷贝。
所以相较于管道,共享内存通信效率更高。
但这并不意味着共享内存就全面优于管道。管道是自带同步与互斥,对共享的内存资源有保护机制,而共享内存并没有为共享的内存资源提供任何保护机制,包括同步与互斥。所以,共享内存的安全性和稳定性要劣于管道。
补.共享内存实现简单的本地聊天程序
- makefile:
.PHONY:all
all:client server
client:client.cc
g++ -o $@ $^ -std=c++11
server:server.cc
g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
rm -f client server
- comm.hpp:
#include <iostream>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <string>
#include <string.h>
#include <errno.h>
using namespace std;
const int size = 4096;
const string path_name = "/home/CVEer";
const int proj_id = 0x6666;
key_t GetKey() // 获取 key
{
key_t k = ftok(path_name.c_str(), proj_id);
if(k < 0)
{
perror("ftok fail");
exit(1);
}
return k;
}
int GetShareMem() // 创建共享内存
{
key_t key = GetKey();
int shmid = shmget(key, size, IPC_CREAT|IPC_EXCL|0666);
if(shmid < 0)
{
perror("shmid fail");
exit(2);
}
return shmid;
}
- 服务端:
//server.cc
#include "comm.hpp"
#include <unistd.h>
int main()
{
// 创建共享内存
int shmid = CreatMem();
// 挂接共享内存
char *shamem = (char*)shmat(shmid, NULL, 0);
// ipc-cod 通信代码
while(true)
{
cout << "client asy@ " << shamem << endl; // 直接访问共享内存
sleep(1);
}
// 去关联
shmdt(shamem);
// 释放共享内存
int ret = shmctl(shmid, IPC_RMID, NULL);
return 0;
}
- 客户端:
//client.cc
#include "comm.hpp"
#include <unistd.h>
int main()
{
// 获取共享内存
int shmid = GetMem();
// 挂接
char *shmaddr = (char*)shmat(shmid, NULL, 0);
// ipc-code 通信代码
while(true)
{
cout << "Please enter:";
fgets(shmaddr, size, stdin);
}
// 去关联
shmdt(shmaddr);
return 0;
}
四、消息队列
消息队列是 system V IPC 所提供的一种通信方式,用于数据的传输。
1.基本原理
消息队列是能让不同进程看到同一份资源的、一个在内核中维护的队列,队列中的每个成员都是一个数据块,这些数据块本质上是一个个结构体,都由类型和信息两部分构成,其中,类型字段用来标识一个数据块是由哪个进程发送的,信息就是进程通信的内容。
两个通信的进程,通过某种方式找到同一个消息队列,要发送数据时,都在消息队列的队尾添加数据块,要获取数据时,都在消息队列的队头取数据块。
系统中也可能会存在大量的消息队列,于是,内核也需要为消息队列维护相关的数据结构。
//消息队列的相关数据结构
struct ipc_perm{
__kernel_key_t key;
__kernel_uid_t uid;
__kernel_gid_t gid;
__kernel_uid_t cuid;
__kernel_gid_t cgid;
__kernel_mode_t mode;
unsigned short seq;
};
struct msqid_ds {
struct ipc_perm msg_perm;
struct msg *msg_first; /* first message on queue,unused */
struct msg *msg_last; /* last message in queue,unused */
__kernel_time_t msg_stime; /* last msgsnd time */
__kernel_time_t msg_rtime; /* last msgrcv time */
__kernel_time_t msg_ctime; /* last change time */
unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */
unsigned long msg_lqbytes; /* ditto */
unsigned short msg_cbytes; /* current number of bytes on queue */
unsigned short msg_qnum; /* number of messages in queue */
unsigned short msg_qbytes; /* max number of bytes on queue */
__kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */
__kernel_ipc_pid_t msg_lrpid; /* last receive pid */
};
//...
2.相关系统调用
2.1-创建 msgget() 和 ftok()
/* 接口1 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);
参数:1.key:消息队列的内核标识符。
2. msgflg:消息队列的创建方式,其中包括:
1)IPC_CREAT:没有创建,有则返回。
2)IPC_EXIT: 有则出错返回。若要使用也得跟上方式1(IPC_CREAT | IPC_EXCL)
返回值:创建成功,返回的一个有效的用户层标识符;失败,返回-1,并设置合适的错误码。
/* 接口2 */
#include<sys/ipc.h>
#include<sys/shm.h>
key_t ftok(const char *pathname, int proj_id);
功能:将一个已存在的路径名pathname和一个整数标识符proj_id转换成一个key值(IPC键值),
在使用msgget()获取消息队列时,这个key值会被填充进维护消息队列的数据结构当中。
参数:1. pathname:任意的文件路径,但pathname所指定的文件必须存在且可存取,
一般都写成当前路径"."。
2. proj_id:整数标识符/项目ID,可自定义,但不能是0。
说明:这两个参数都是为了生成key_t类型的内核的标识符。
返回值:成功,返回独一无二的key值;失败,返回-1
【补】消息队列的相关指令操作
- ipcs -q:查看当前操作系统中所有的消息队列。
- ipcrm -q + 用户层标识符(msqid):释放一个消息队列。
2.2-释放 msgctl()
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
参数:1.mspid:共享内存的用户层id/标识符。
2.cmd:操作选项,常见有:
1)IPC_STAT,获取消息队列的状态信息,放在buf指向的变量中。
2)IPC_SET,设置消息队列的状态,需要将buf变量传进去,方便修改
3)IPC_RMID,删除消息队列。
4)SHM_LOCK,锁定消息队列。
5)SHM_UNLOCK,解锁消息队列。
3.buf:语言层面用来描述一个消息队列的结构体,里面保存了消息队列的部分属性
返回值:如果操作是 IPC_RMID ,那么删除成功,则返回0;失败则返回-1.并设置合适的错误码。
2.3-发送数据 msgsnd()
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数:1.msqid:表示消息队列的用户级标识符。
2.msgp:表示待发送的数据块。必须为以下结构:
struct msgbuf{
long mtype; /* message type, must be > 0 */
char mtext[自定义大小]; /* message data */
};
3.msgsz:表示所发送数据块的大小
4.msgflg:表示发送数据块的方式,一般默认为0即可。
返回值:发生成功,返回0;发生失败,返回-1,并设置合适的错误码。
2.4-获取数据 msgrcv()
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
参数:1.msqid:表示消息队列的用户级标识符。
2.msgp:表示获取到的数据块,是一个输出型参数,必须为以下结构:
struct msgbuf{
long mtype; /* message type, must be > 0 */
char mtext[自定义大小]; /* message data */
};
3.msgsz:表示要获取数据块的大小
4.msgtyp:表示要接收数据块的类型。
返回值:获取成功,返回实际获取到 mtext 数组中的字节数;获取失败,返回-1,并设置合适的错误码。
五、信号量
信号量是 system V IPC 所提供的一种通信方式,用于保证进程间的同步与互斥。
1.基本原理
由于进程有共享资源的需要,而有的资源要求进程互斥使用,因此,使用这些资源的各进程就处于竞争关系,而这种竞争关系就叫做进程互斥。系统中,一次只允许一个进程使用的资源,被称为临界资源或互斥资源,在进程中涉及到临界资源的程序段就叫临界区。
例如,管道就是一种临界资源,自带同步与互斥机制,以保护进程共享的内存资源。而共享内存则不是临界资源,它没有同步与互斥机制,可能会出现 A 进程正在向共享内存中写入,还没有写完,B 进程就来读取,导致发方和收方的数据不完整,引起数据不一致问题。
为了方便管理和使用,系统中一大块的临界资源会被划分成多个小块的临界资源,当有进程申请使用时按需分配给其小块的临界资源。
信号量就是一种用于控制多个进程或线程访问临界资源的同步机制,它本质是一个计数器,用于记录临界资源的可用数量。
信号量保证的是,假设只有 n 个临界资源,不会出现 n+1 个执行流来访问临界资源,以防数据不一致问题的发生。如果在临界资源充足的情况下,出现多个进程/执行流访问同一个临界资源,这样的情况就属于编码 Bug ,而非数据不一致问题。
信号量(计数器)可以有效的保证访问临界资源的执行流的数量。一个进程/执行流成功申请到了信号量,就表示这个进程/执行流具有访问临界资源的权限了。申请到了信号量,但没有去访问临界资源,是对临界资源的一种预定机制,也就是说,每个进程/执行流想要访问临界资源的时候,并不是直接访问,而是先向系统申请信号量资源,再按需访问临界资源。那么,信号量其实也是一种共享资源。
既然信号量也是一种共享资源,那么就可能出现多个进程/执行流同时在申请同一个信号量。信号量本就是是用来保护临界资源的,如此,信号量得首先保证自身的安全。
申请信号量,本质是对计数器 --,被称为 P 操作(申请一个资源,如果资源不够就阻塞等待);释放共享资源,本质是对计数器 ++,被称为 V 操作(释放一个资源,如果有进程在等待该资源,则唤醒一个进程)。-- 和 ++ 操作转成汇编,一般会对应三条汇编指令——从内存中读取数据到 CPU 中、CPU 内进行操作、CPU 将结果写回内存——进程在运行的时候,随时可能被替换,于是,在多进程同时在申请同一个信号量、共享信号量的前提下, -- 和 ++ 操作可能会导致信号量的值发生错乱,引发数据不一致问题。而PV操作,经过互斥机制的保护,具有原子性,只对应一条汇编指令,确保了信号量的安全性。
在系统中也可能存在大量的信号量,内核也为信号量维护了相关的数据结构。
//信号量的相关数据结构
struct ipc_perm{
__kernel_key_t key;
__kernel_uid_t uid;
__kernel_gid_t gid;
__kernel_uid_t cuid;
__kernel_gid_t cgid;
__kernel_mode_t mode;
unsigned short seq;
};
struct semid_ds {
struct ipc_perm sem_perm; /* permissions .. see ipc.h */
__kernel_time_t sem_otime; /* last semop time */
__kernel_time_t sem_ctime; /* last change time */
struct sem *sem_base; /* ptr to first semaphore in array */
struct sem_queue *sem_pending; /* pending operations to be processed */
struct sem_queue **sem_pending_last; /* last pending operation */
struct sem_undo *undo; /* undo requests on this array */
unsigned short sem_nsems; /* no. of semaphores in array */
};
//...
如果临界资源只有一份,那么相应信号量(计数器)的值只能是 1 或者 0,且在任何时候只允许一个进程/执行流访问共享资源,这种只能为 1、0 两态的计数器就叫做二元信号量。二元信号量主要用于实现进程/执行流对临界资源的互斥访问,本质是一把锁,计数器的值最大为 1,意味着临界资源只有一份,换句话说,临界资源不会分成很多块,而是当做一个整体,整体申请,整体释放,以实现互斥。
【Tips】信号量
- 信号量本质是一个计数器,申请和释放涉及PV操作,具有原子性。
- 一个执行流要申请临界资源,必须先申请信号量资源,只有申请到信号量资源,才能访问临界资源。
- 申请信号量,本质是临界资源的预定机制。
- 二元信号量是值只有0、1两态的特殊信号量,本质是一把互斥锁。
2.相关系统调用
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
//创建信号量/信号量集:
int semget(key_t key, int nsems, int semflg);
//控制信号量:
int semctl(int semid, int semnum, int cmd, ...);
//申请或释放信号量(PV操作):
int semop(int semid, struct sembuf *sops, unsigned nsops);
六、内核对 IPC 资源的管理
共享内存、消息队列、信号量,统称为操作系统中的 IPC 资源。为了管理这些资源,操作系统分别为它们维护了三个结构体:struct shmid_kernel、struct msg_queue、struct sem_array,然后通过一个 struct kern_ipc_perm* 类型的柔性数组,将所有的 IPC 资源管理起来。
在这三个结构体中,第一个成员变量都是 struct kern_ipc_perm 类型的。可以理解为,struct kern_ipc_perm 是一个基类,struct shmid_kernel、struct msg_queue、struct sem_array 是继承了 struct kern_ipc_perm 的三个子类。
如何在维护 IPC 资源的柔性数组中找到一个 IPC 对象呢?
在 struct kern_ipc_perm 中,字段 “key_t key;” 键值用于标识一个 kern_ipc_perm 对象属于哪种 IPC资源。只要有了一个 kern_ipc_perm 的地址,再将该地址通过强制类型转换,转换成这个 kern_ipc_perm 对象所属的 IPC 对象的类型,就可以由转换后的结果,访问到这个 kern_ipc_perm 对象所属的 IPC 对象了。也就是说,用户层上使用的 shmid(共享内存标识符)、msqid(消息队列标识符)、semid(信号量标识符)本质上就是维护 IPC 资源的柔性数组的下标。这其中包含了多态的思想。
维护 IPC 资源柔性数组被封装在一个名为 ipc_id_ary 结构体对象中。ipc_id_ary 是一张顺序表,隶属于操作系统,不属于任何进程。它使得柔性数组的下标是一直线性递增的,这个递增属性不会因为 IPC 资源的释放而改变,例如,如果此时操作系统中最后一个 IPC 资源的下标是 12,释放掉这个 IPC 资源,下一次再创建 IPC 资源,新创建的 IPC 资源的下标是会是 13,直到递增到一定值的时候,才会回归0。