目录
一、进程通信介绍
1.目的
2.发展
3.进程通信是什么,怎么通信?
二、管道
1.介绍
2.匿名管道
1.单向通信管道原理
2.代码实现
3.管道特征
4.管道的四种情况
5.管道的应用场景
使用管道实现一个简易版本的进程池
3.命名管道
1.思考
2.创建一个命名管道
3.匿名管道与命名管道的区别
4.命名管道的打开规则
4.日志
日志等级:
日式时间相关函数
日志代码实现
5.总结
三、system V共享内存
1.原理
2.代码书写
1.相关函数
1.shmget
返回值:
key:
size:
shmfig:
2.shmat
3.shmdt
4.shmctl
2.代码
3.共享内存特性
一、进程通信介绍
1.目的
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
2.发展
- 管道
- 匿名管道pipe
- 命名管道
简单经典的通信使用的一种方式
- System V IPC
- System V 消息队列
- System V 共享内存
- System V 信号量
单独设计了一套接口,与文件无关。 只能本地使用,本地通信,在网络阶段,有很多替代方案。
- POSIX IPC
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
网络和多线程时使用
3.进程通信是什么,怎么通信?
1.是什么?
两个或多个进程实现数据层面的交互。
因为进程独立性的存在,进程通信的成本较高 -> 进程通信是有成本的
2.怎么办?
- 进程间通信的本质:必须让不同的进程看到同一份"资源"
- "资源"?:特定形式的内存空间
- 这个"资源"谁提供?一般是操作系统
- 为什么不是我们两个进程中的一个呢?假设一个进程提供,这个资源属于谁?这个进程独有,破环进程独立性。来自第三方空间
- 我们进程访问这个空间,进行通信,本质就是访问操作系统!进程代表的就是用户,"资源"从创建,使用,释放 --- 出自系统调用接口! --- 1.从底层设计,从接口设计,都要由操作系统独立设计;2.一般操作系统,会有一个独立的通信模块 -- 隶属于文件系统 -- IPC通信模块定制标准 -- 进程间通信是有标准的 -- 就是上述的system V(本机内部) 和 POSIX(网络通信)
二、管道
1.介绍
基于文件级别的进程通信方式
- 管道是Unix中最古老的进程间通信的形式。
- 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
2.匿名管道
#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码
1.单向通信管道原理
2.代码实现
创建管道函数 pipe
#include <iostream>
#include <string>
#include <cstdlib> //stdlib.h
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
using namespace std;
#define N 2
#define NUM 1024
//child
void Writer(const int& wfd)
{
string str = "hello, i am child!";
pid_t self = getpid();
int number = 0;
char buffer[NUM];
while(true)
{
//构建发送字符串
buffer[0] = 0; //字符串清空,只是为了提醒阅读代码的人,把这个字符数组当作字符串了。
snprintf(buffer, sizeof(buffer), "%s-%d-%d", str.c_str(), self, number++);
//发送/写入给父进程
write(wfd, buffer, strlen(buffer));
sleep(1);
}
}
//father
void Reader(const int& rfd)
{
char buffer[NUM];
while(true)
{
ssize_t n = read(rfd, buffer, sizeof(buffer));
if(n > 0)
{
buffer[n] = 0; // 0 == '\0'
cout << "father get some message[" << getpid() << "]:" << buffer << endl;
}
}
}
int main()
{
int pipefd[N] = {0}; //输出型参数
int n = pipe(pipefd); //申请管道
if(n < 0)
return -1;
// cout << "pipefd[0]:" << pipefd[0] << ",pipefd[1]" << pipefd[1] << endl;
// father -> r ; child -> w;
pid_t id = fork(); //创建子进程
if(id < 0)
return 2;
if(id == 0)
{
//child
close(pipefd[0]);
//IPC code
Writer(pipefd[1]);
close(pipefd[1]);
exit(0);
}
//father
close(pipefd[1]);
// IPC code
Reader(pipefd[0]);
close(pipefd[0]);
return 0;
}
3.管道特征
- 具有血缘关系的进程会进行进程间通信
- 管道只能单向通信
- 父子进程是会进程协同的,同步和互斥的 --- 保护管道文件的数据安全
- 管道是面向字节流的。
- 管道是基于文件的,而文件的生命周期是跟随进程的
4.管道的四种情况
- 读写端正常,管道如果为空,读端就要阻塞
- 读写端正常,管道如果被写满,写端就要阻塞
- 读端正常读,写端关闭,读端就会读到0,表面读到了文件(pipe)结尾,不会被阻塞
- 读端关闭,写端正常写,操作系统就要杀掉正在写入的进程。如何杀掉--通过信号杀掉
5.管道的应用场景
使用管道实现一个简易版本的进程池
原由:创建进程需要调用fork函数,而fork函数这个系统调用是有成本的!
"Task.hpp"
#pragma once
#include <iostream>
#include <vector>
using namespace std;
//函数指针
typedef void (*task_t)();
void task1()
{
std::cout << "lol : 刷新野怪" << std::endl;
}
void task2()
{
std::cout << "lol : 刷新蓝条" << std::endl;
}
void task3()
{
std::cout << "lol : 刷新血量" << std::endl;
}
void task4()
{
std::cout << "lol : 更新系统" << std::endl;
}
void LoadTasks(std::vector<task_t> *tasks)
{
tasks->push_back(task1);
tasks->push_back(task2);
tasks->push_back(task3);
tasks->push_back(task4);
}
"ProcessPool.cc"
#include "Task.hpp"
#include <string>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/wait.h>
const int ProcessNum = 10;
std::vector<task_t> tasks;
//先描述 -- 管道
struct channel
{
int _cmdfd; //发送任务的文件描述符
pid_t _slaverid; //子进程的pid
std::string _processname; //子进程的名字,方便我们打印日志
channel(int cmdfd, pid_t slaverid, const std::string &processname)
: _cmdfd(cmdfd)
, _slaverid(slaverid)
, _processname(processname)
{}
};
void slaver()
{
while(true)
{
int cmdcode = 0;
int n = read(0, &cmdcode, sizeof(int));
if(n == sizeof(int))
{
std::cout << " child get a command: " << getpid() << " cmdcode: " << cmdcode << std::endl;
if(cmdcode >= 0 && cmdcode < tasks.size()) tasks[cmdcode]();
}
if(!n)
break;
}
}
void InitProcessPool(std::vector<channel> *channels)
{
std::vector<int> oldfd;
// 1.初始化 --- bug
for (size_t i = 0; i < ProcessNum; ++i)
{
int pipefd[2]; //临时空间
int n = pipe(pipefd); //
if(n != 0)
{
perror("pipe create file");
return;
}
pid_t id = fork();
if(id == 0)
{
std::cout << "child " << getpid() << " close history fd :";
for (auto &e : oldfd)
{
std::cout << e << " ";
close(e);
}
std::cout << std::endl;
// child
close(pipefd[1]);
dup2(pipefd[0], 0);
close(pipefd[0]);
slaver();
std::cout << "process " << getpid() << ":quit!" << std::endl;
exit(0);
}
//father
close(pipefd[0]);
//开始添加channel字段
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
oldfd.push_back(pipefd[1]);
sleep(1);
}
}
void Debug(const std::vector<channel> &channels)
{
// test
for(const auto& e:channels)
{
std::cout << "pid: " << getpid() << " " << e._cmdfd << " " << e._slaverid << " " << e._processname << std::endl;
}
}
void Menu()
{
std::cout << "############################################" << std::endl;
std::cout << "########1.刷新野怪 2.刷新蓝条 #############" << std::endl;
std::cout << "########3.刷新血量 4.更新系统 0.退出######" << std::endl;
std::cout << "############################################" << std::endl;
}
void ctrlSlaver(const std::vector<channel> &channels)
{
srand(time(0));
int which = 0;
//int cnt = 0;
while (true)
{
Menu();
std::cout << "Please enter@:";
int n;
std::cin >> n;
if(n <= 0 || n >= 5)
break;
// 1.选择任务
// int cmdcode = rand() % tasks.size();
int cmdcode = n - 1;
// 2.选择子进程
// int processpos = rand() % channels.size(); //随机方法
std::cout << "father say:"
<< "cmdcode: " << cmdcode
<< " already sendto " << channels[which]._slaverid
<< " processname: " << channels[which]._processname
<< endl;
// 3.发送任务
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
++which;
which %= channels.size(); //轮转法
sleep(1);
}
}
void QuitProcess(const std::vector<channel>& channels)
{
for(const auto& e: channels)
{
close(e._cmdfd);
waitpid(e._slaverid, nullptr, 0);
}
//version1
// for (int i = channels.size() - 1; i >= 0; --i)
// {
// close(channels[i]._cmdfd);
// waitpid(channels[i]._slaverid, nullptr, 0);
// }
//sleep(5);
// 有bug
// for (const auto &e : channels)
// close(e._cmdfd);
// sleep(5);
// for (const auto &e : channels)
// waitpid(e._slaverid, nullptr, 0);
// sleep(5);
}
int main()
{
//再组织
//将对子进程结构的增删查改转化为对数据结构vector的增删查改
std::vector<channel> channels;
// 1.初始化
LoadTasks(&tasks);
InitProcessPool(&channels);
//test
Debug(channels);
// 2.开始控制子进程
ctrlSlaver(channels);
// 3.清理收尾
QuitProcess(channels);
return 0;
}
结果
注:
这是我们原先创建子进程的代码,但是这份代码会造成一个问题,就是子进程会继承父进程对上一个子进程管道读端。
void InitProcessPool(std::vector<channel> *channels)
{
// 1.初始化 --- bug
for (size_t i = 0; i < ProcessNum; ++i)
{
int pipefd[2]; //临时空间
int n = pipe(pipefd); //
if(n != 0)
{
perror("pipe create file");
return;
}
pid_t id = fork();
if(id == 0)
{
// child
close(pipefd[1]);
dup2(pipefd[0], 0);
close(pipefd[0]);
slaver();
std::cout << "process " << getpid() << ":quit!" << std::endl;
exit(0);
}
//father
close(pipefd[0]);
//开始添加channel字段
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
}
}
而解决办法是,把上一个写端记录下来,在创建子进程时,顺便把子进程的继承自父进程的写端给close掉。
3.命名管道
1.思考
我们上面使用的匿名管道只能在有共同祖先/血缘相近的进程间使用,而我们想在不同进程间进行管道通信时,应该怎么做呢?
我们可以使用FIFO文件在做这项工作,它经常被称作命名管道。
2.创建一个命名管道
1.命令行创建
mkfifo filename
2.程序中创建
int mkfifo(const char *filename,mode_t mode)
3.匿名管道与命名管道的区别
- 匿名管道由pipe函数创建并打开。
- 命名管道由mkfifo函数创建,打开用open
- FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完成之后,它们具有相同的语义。
4.命名管道的打开规则
如果当前打开操作是为读而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
- O_NONBLOCK enable:立刻返回成功
如果当前打开操作是为写而打开FIFO时
- O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
- O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
4.日志
此内容与管道无关,只是需在当前练习中打印日志,所以做一个笔记
日志包含:日志时间,日志等级,日志内容,文件的名称和行号
日志等级:
- Info:常规消息
- Warning:报警信息
- Error:必要严重的问题,可能需要立即处理
- Fatel:致命的
- Debug:调试
日式时间相关函数
time:打印时间戳
time_t time(time_t *t);
当前时间戳传nullptr
gettimeofday:
int gettimeofday(struct timeval *tv, struct timezone *tz /*时区*/);
struct timezone *tz:时区,缺省为nullptr即可
localtime:
struct tm *localtime(const time_t *timep);
注意:这里年是从1900年开始的,所以要加上1900.月是从0开始的,所以要加1.
日志格式可变参数部分
int vsnprintf(char *str, size_t size, const char *format, va_list ap);
日志代码实现
#pragma once
#include <iostream>
#include <string>
#include <stdarg.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define SIZE 1024
//日志等级
#define Info 0
#define Debug 1
#define Warning 3
#define Error 4
#define Fatel 5
//打印方式
#define Screen 1
#define Onefile 2
#define Classfile 3
#define Logfile "log.txt"
class Log
{
public:
Log()
{
PrintMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
PrintMethod = method;
}
std::string levelToString(int level)
{
switch(level)
{
case Info:
return "Info";
case Debug:
return "Debug";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatel:
return "Fatel";
default:
return "None";
}
}
//日志函数
// void logmessage(int level, const char* format, ...)
// {
// time_t t = time(nullptr);
// struct tm *ctime = localtime(&t);
// char leftbuffer[SIZE];
// snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
// ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
// ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
// va_list s;
// va_start(s, format);
// char rightbuffer[SIZE];
// vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
// va_end(s);
// //格式:默认部分+自定义部分
// char logtxt[SIZE];
// snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
// //printf("%s", logtxt);
// printlog(level, logtxt);
// }
void printlog(int level, const std::string &logtxt)
{
switch(PrintMethod)
{
case Screen:
std::cout << logtxt;
break;
case Onefile:
PrintOneFile(Logfile, logtxt);
break;
case Classfile:
PrintClassFile(level, logtxt);
break;
default:
break;
}
}
void PrintOneFile(const std::string &filename, const std::string &logtxt)
{
std::string logname = path + filename;
int fd = open(logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666);
if(fd < 0)
return;
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void PrintClassFile(int level, const std::string &logtxt)
{
std::string filename = Logfile;
filename += ".";
filename += levelToString(level);
PrintOneFile(filename, logtxt);
}
void operator()(int level, const char* format, ...)
{
time_t t = time(nullptr);
struct tm *ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
//格式:默认部分+自定义部分
char logtxt[SIZE];
snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
//printf("%s", logtxt);
printlog(level, logtxt);
}
private:
int PrintMethod;
std::string path;
};
5.总结
总的来说,匿名管道和命名管道都是面向字节流的,会进行同步和互斥,生命周期随进程,使用时需要打开文件,单向通行。让不同进程看到同一份资源 -- 文件。
三、system V共享内存
1.原理
共享内存是在物理地址空间上申请的,通过页表挂接到不同进程程序地址空间的一种通信方式。
那么这块物理内存是进程申请的还是操作系统来申请的呢?
答案是操作系统,因为进程具有独立性,进程申请的资源归进程所有。
ipcs -m :查看所有的共享内存
共享内存的生命周期是跟随的内核的,用户不主动释放,共享内存会一直存在,除非内核关闭(用户释放)。
ipcrm -m shmid :删除shmid对应的共享内存
2.代码书写
1.相关函数
1.shmget
申请一块共享内存
int shmget(key_t key, size_t size, int shmflg);
返回值:
共享内存标识符
key:
1.key是一个数字,这个数字是几不重要。关键在于它必须在内核中具有唯一性,能够让不同的进程进行唯一性标识
2.第一个进程可以通过key创建共享内存,第二个之后的进程,只要拿着同一个key,就可和第一个进程看到同一个共享内存!
3.对于一个已经创建好的共享内存,key在哪?key在共享内存的描述对象中!
4.第一次创建的时候,必须有一个key了,怎么有?
ftok - convert a pathname(路径) and a project identifier(项目id) to a System V IPC key
key_t ftok(const char *pathname, int proj_id);
ftok是一套算法,将路径字符串和整形id进行了数值计算。
5.key -- 类似 -- 路径 -- 唯一性
size:
创建共享内存的大小,单位是字节
shmfig:
如何创建,获取 。。
IPC_CREAT | 单独使用,如果你申请的共享内存不存在,就创建,存在,就获取并返回 |
IPC_CREAT | IPC_EXCL | 如果你申请的共享内存不存在,就创建,存在,就出错返回。确保我们如果申请成功了一个共享内存,这个共享内存一定是一个新的 |
IPC_EXCL | 不单独使用 |
注:
key与shmid
key:操作系统内标定唯一性。
shmid:只在你的进程内,用来表示资源的唯一性。
2.shmat
void *shmat(int shmid, const void *shmaddr, int shmflg);
将申请的共享内存挂接到进程的虚拟地址空间
3.shmdt
int shmdt(const void *shmaddr);
取消挂接到进程的虚拟地址空间的共享内存
4.shmctl
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
cmd:
IPC_STAT: 获取
IPC_RMID:删除
2.代码
comm.hpp
#ifndef __COMM_HPP__
#define __COMM_HPP__
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include "log.hpp"
using namespace std;
class comm
{
public:
comm()
{
//log.Enable(Classfile);
}
//获取key
key_t Getkey()
{
key_t k = ftok(pathname.c_str(), proj_id);
if(k < 0)
{
log(Fatel, "ftok error string: %s, ftok error code: %d", strerror(errno), errno);
exit(1);
}
log(Info, "ftok success, key is : 0x%x", k);
return k;
}
int GetShareMemHelper(int flag)
{
int shmid = shmget(Getkey(), size, flag);
if(shmid < 0)
{
log(Fatel, "create share memory error string: %s, error code: %d", strerror(errno), errno);
exit(1);
}
log(Info, "create share memory success, shmid: %d", shmid);
return shmid;
}
int CreateShm()
{
return GetShareMemHelper(IPC_CREAT | IPC_EXCL | 0666);
}
int GetShm()
{
return GetShareMemHelper(IPC_CREAT);
}
private:
Log log;
const string pathname = "/home/shen";
const int proj_id = 0x6666;
//共享内存大小一般是4096的整数倍, 如果我们申请4097的话,操作系统实际给出4096*2的大小
const int size = 4096;
};
#endif
processa.cc
#include "comm.hpp"
Log log;
int main()
{
comm co;
int shmid = co.CreateShm();
log(Debug, "create shm done");
char *straddr = (char*)shmat(shmid, nullptr, 0);
log(Debug, "attach shm done");
while(true)
{
cout << "client say#:" << straddr << endl; //直接访问共享内存
sleep(1);
}
shmdt(straddr);
log(Debug, "detach shm done");
shmctl(shmid, IPC_RMID, nullptr);
log(Debug, "delete shm done");
return 0;
}
processb.cc
#include "comm.hpp"
Log log;
int main()
{
comm co;
int shmid = co.GetShm();
log(Debug, "Get shm done");
char *straddr = (char*)shmat(shmid, nullptr, 0);
log(Debug, "attach shm done");
while(true)
{
cout << "Please Enter@ ";
fgets(straddr, 4096, stdin);
}
shmdt(straddr);
log(Debug, "detach shm done");
return 0;
}
3.共享内存特性
- 共享内存没有同步互斥之类的保护机制
- 共享内存是所有的进程间通信中,速度是最快的! --- 原因:拷贝次数少
- 共享内存内部的数据,由用户自己维护!