目录
一、信号量相关函数
1. 创建信号量集
2. 获取信号量集
3. 等待、通知信号量集
4. 控制信号量集
二、简单进程同步
1. 创建信号量集
2. P操作
3. V操作
4. 删除信号量集
5. 测试:
三、生产者与消费者
1. 创建、删除共享内存及信号量集
2. 单一生产者和消费者
2.1. 生产者
2.1. 消费者
3. 多生产者多消费者
4. 改进
四、总结
一、信号量相关函数
1. 创建信号量集
本次实验使用信号量机制来实现进程的同步与互斥,因此首先需要创建信号量集。
使用函数semget可以创建信号量集,它的原型如下:
int semget(key_t key, int nsems, int semflg)
其中,key是创建信号量集的键,nsems是信号量集中信号量量的数量;
semflg是指定的选项及其权限位,包含IPC_CREAT(创建新的信号量集)、IPC_EXCEL(如果信号量集已经存在,则返回错误)等。
这个函数创建的是一个信号量集,其中包含多个信号量,可以通过函数semop来访问信号量集中的某个信号量,或者使用semctl函数来对信号量进行操作,一般创建数据集后都要首先使用semctl对每个信号量设置初始值。semop和semctl函数的介绍在下面。
2. 获取信号量集
一个进程创建号信号量集后,另一个进程想要访问这个信号量集,可以使用semget函数传入KEY值来获取该信号量集的id,该函数原型如下:
int semid = semget(KEY, 0, 0);
上述代码只为获取一个已经存在的信号量集,不需要nsems和semflg,全部为0即可。获取成功会返回信号量集id,如果获取失败的话会返回-1 。
3. 等待、通知信号量集
使用semop函数可以访问一个信号量集,进行获取(P操作)和释放(V操作),其原型如下:
int semop(int semid,struct sembuf *sops,unsigned nsops)
semid是使用semget函数获取到的信号量集的id;
sops是一个sembuf类型的结构,用于描述信号量的操作:等待、通知等,其定义如下:
struct sembuf{
short sem_num; // 要访问的信号量在信号量集中的索引
short sem_op; // 对信号量的操作,为负数是P操作,正数是V操作
short sem_flg; // 操作标志,可以是0或IPC_NOWAIT(非阻塞方式)
}
nsops是指定信号量集中操作的信号量个数。
4. 控制信号量集
要对整个信号量集进行操作可以使用semctl函数,其原型如下:
int semctl(int semid, int semnum, int cmd, union semun arg)
semid是由semget函数返回的信号量集id
semnum是信号量在信号量集中的索引
cmd是控制命令,用于对信号量执行指定的操作,命令包括:
IPC_STAT:获取信号量集合的属性信息,将结果写入指定的结构体中。
IPC_SET:设置信号量集合的属性信息,使用指定的结构体中的值进行设置。
GETVAL:获取指定信号量的值。
SETVAL:设置指定信号量的值。
GETALL: 获取信号量集合中所有信号量的值
SETALL: 设置所有信号量的值
GETPID:获取最后一个执行 semop() 操作的进程 ID。
GETNCNT:获取等待该信号量值增加的进程数。
GETZCNT:获取等待该信号量值变为 0 的进程数。
IPC_RMID:删除信号量集合。
二、简单进程同步
现在使用上面给出的函数编写几个程序实现进程同步。
信号量机制实现进程同步需要使用一些简单的信号量集操作,包括创建信号量集,P操作,V操作,删除信号量集。
1. 创建信号量集
createSem.cpp
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/ipc.h>
using namespace std;
#define KEY 2002
int main(){
int semid = semget(KEY, 1, IPC_CREAT);
semctl(semid, 0, SETVAL, 0);
}
上面代码使用semid函数创建一个只包含一个信号量的信号量集,随后使用semctl将该信号量的值初始化为0 。
2. P操作
p1.cpp
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/ipc.h>
#define KEY 2002
using namespace std;
int main(){
int semid = semget(KEY, 0, 0);
struct sembuf *sops = new sembuf;
sops->sem_num = 0;
sops->sem_op = -1;
sops->sem_flg = 0;
if(semop(semid, sops, 1) == -1){
char err[] = "semop";
perror(err);
exit(1);
}
cout << "获取成功" << endl;
return 0;
}
上一个程序createSem创建完信号量集后,信号量集就保存在缓冲区中,此时另一个程序可以使用semget函数访问该信号量集,使用同样的键值KEY就能访问到上一个进程创建的信号量集。
上述代码首先获取已经创建好的信号量集,随后定义sembuf类型结构指针sops,设置访问信号量索引sem_num为0,信号量操作sem_op为-1(p操作,信号量减一),操作标志为0 。接着就使用semop函数传入sops对信号量进行操作,如果操作失败(如信号量集不存在或信号量索引非法等)就输出错误信息,结束程序。如果获取成功则会执行下面的语句打印“获取成功”,如果进行P操作时该信号量为0,此时进行会阻塞,等待其他进程释放信号,为了简化问题,一开始设置信号量为0,此时执行p1程序一定会阻塞。
3. V操作
p2.cpp
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/ipc.h>
#define KEY 2002
using namespace std;
int main(){
int semid = semget(KEY, 0, 0);
struct sembuf *sops = new sembuf;
sops->sem_num = 0;
sops->sem_op = 1;
sops->sem_flg = 0;
if(semop(semid, sops, 1) == -1){
char err[] = "semop";
perror(err);
exit(1);
}
return 0;
}
上面代码与p1基本时一样的,只是修改了sem_op,改为1(V操作,信号量加1),执行该程序后,对应信号量会加一,原本阻塞的程序就能结束等待,继续执行下去。
4. 删除信号量集
deletSem.cpp
信号量集使用完毕后需要删除信号量集,使用semctl函数实现,控制命令选择IPC_RMID。
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/ipc.h>
using namespace std;
#define KEY 2002
int main(){
int semid = semget(KEY, 0, 0);
semctl(semid, 0, IPC_RMID, 0);
}
5. 测试:
现在可以执行上面的4个程序了,执行的顺序是createSem(创建信号量集)、p1(P操作)、p2(V操作)、deleteSem(删除信号量集)
在终端执行如下命令创建各个文件并编译。
接着先执行createSem和p1程序:
可以看到执行p1程序后,不会输出“获取成功”,也没有结束,因为一开始设置信号量为0,因此此时p1获取资源(P操作)是会阻塞的。
在这时,我们再运行p2程序释放资源(V操作),释放后该信号量加一,此时p1程序就能正常获取资源了,程序会继续执行下去。
可以看到在另一个终端执行p2后,p1就能正常执行下去了,并打印“获取成功”。
最后不要忘了删除信号量集。
三、生产者与消费者
现在我们掌握了信号量的基本操作,现在我们使用上面的函数编写程序解决生产者与消费者问题。
按照生产者和消费者的问题描述,我们需要三个信号量,一个是互斥信号量mutex,用于实现各进程对缓冲池的互斥使用,一个是empty用于表示缓冲池中空缓冲区的数量,最后一个full表示缓冲池中满缓冲池的数量。在这里我们使用共享内存作为公用缓冲池,在上次实验我们在共享内存中写入数据是会覆盖掉原数据的,因此设置共享内存中最多只能写入一个数据,相当于公用缓冲池中只有一个可用的缓冲区,因此信号量empty和full最多为1,二者的取值范围都是{-1,0,1}。
其实共享内存也不是不能写入多个数据,它会覆盖原数据是因为获取的共享内存的地址都是首地址,同一个地址重复写入数据当然会覆盖原数据了。如果是字符串的话比较难实现写入多个数据,因为字符串的长度是不定的,这要区分每个字符串的话比较麻烦(使用\0),但如果是整型数据的话,是可以很容易实现写入多个数据的,因为整型是定长的,可以将公用缓冲池划分为多个整型大小的缓冲区,每个缓冲区存储一个数据,这样就能实现缓冲池有多个缓冲区了(缓冲区的意义是不是这个我不太确定,不过看书上空缓冲区满缓冲区的描述应该就是一个缓冲区存放一个数据),这个后面再说,先实现一个简单的只有一个缓冲区的生产者与消费者。
根据要求修改createSem程序,修改为创建一个有3个信号量的信号量集,并按照要求初始化各个信号量的值,再添加创建共享内存的操作,同时为创建信号量集和共享内存添加错误判断,在遇到错误时打印错误信息。
1. 创建、删除共享内存及信号量集
createSem.cpp
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#define KEY 2002
#define SHMKEY 2020
#define SIZE 256
using namespace std;
int main(){
// 创建信号量集
int semid = semget(KEY, 3, IPC_CREAT);
if (semid == -1) {
perror("Failed to create semaphore");
exit(1);
}
// 创建互斥信号量mutex实现各进程对共享内存的互斥使用
semctl(semid, 0, SETVAL, 1);
// 创建信号量empty表示空缓冲区数量(暂时只能覆盖写入,最多为1)
semctl(semid, 1, SETVAL, 1);
// 创建信号量full表示满缓冲区的数量(同样最多只能为1,初始为0)
semctl(semid, 2, SETVAL, 0);
// 创建共享内存
int shmid = shmget(SHMKEY, SIZE, IPC_CREAT);
if (shmid == -1) {
perror("Failed to create shared memory");
exit(1);
}
}
根据需要修改deleteSem程序,增加一个删除共享内存的操作,同时添加错误判断。
deleteSem.cpp
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#define KEY 2002
#define SHMKEY 2020
using namespace std;
int main(){
int semid = semget(KEY, 0, 0);
if (semid == -1) {
perror("Failed to get semaphore");
exit(1);
}
// 删除信号量集
semctl(semid, 0, IPC_RMID, 0);
int shmid = shmget(SHMKEY, 0, 0);
if (shmid == -1) {
perror("Failed to get shared memory");
exit(1);
}
// 删除共享内存
shmctl(shmid, IPC_RMID, NULL);
}
2. 单一生产者和消费者
2.1. 生产者
生产者程序生产一个产品后,需要先等待共享内存中有空缓冲区,接着在等待申请共享内存资源,获得共享内存资源后送入产品(写入数据),操作完成后释放共享内存资源(mutex信号量加一),同时full信号量加一,告知消费者现在缓冲池中已经有满缓冲区了。
流程如下:
produce an item nextp;
wait(empty);
wait(mutex);
*buffer = nextp;
signal(mutex);
signal(full);
buffer是共享内存地址
代码:
producer.cpp
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/shm.h>
#include<sys/ipc.h>
#include<ctime>
#include<unistd.h>
#define KEY 2002
#define SHMKEY 2020
using namespace std;
void simSemop(int semid, int sem_num, int sem_op);
int main(){
int semid = semget(KEY, 0, 0);
int i = 0;
while(1){
// 等待缓冲池中有空缓冲区,wait(empty)
simSemop(semid, 1, -1);
// 获取共享内存资源,wait(mutex)
simSemop(semid, 0, -1);
// 获取共享内存
int shmid = shmget(SHMKEY, 0, 0);
// 连接共享内存
int *shmadd = (int*) shmat(shmid, NULL, 0);
// 获取进程号
int pid = getpid();
// 写入数据(进程号)
*shmadd = pid;
shmdt(shmadd);
// 释放共享内存资源(signal(mutex))
simSemop(semid, 0, 1);
cout << "生产者进程" << getpid() << "生产成功 " << i << endl;
// 增加满缓冲区数量(signal(full))
simSemop(semid, 2, 1);
// 生产间隔1-9秒
srand(time(NULL));
int slptime = rand() % 9 + 1;
sleep(slptime);
i++;
}
}
void simSemop(int semid, int sem_num, int sem_op){
struct sembuf *sops = new sembuf;
sops->sem_num = sem_num;
sops->sem_op = sem_op;
sops->sem_flg = 0;
if(semop(semid, sops, 1) == -1){
perror("Semop fail");
exit(1);
}
}
由于要多次使用P操作和V操作,因此我将P操作和V操作都封装在simSemop函数中,可以直接调用该函数来简化步骤。
生产者是不断生产产品的(这里的产品是生产者的进程号),无限循环上面的流程,上面的代码实际上是没有生产产品的过程的,因此每次生产之后需要等待1到9秒来模拟生产的过程(其实这个等待过程应该放在最上面而不是末尾)。
2.1. 消费者
消费者等待共享内存中有满缓冲区,共享内存中有满缓冲区后消费者再申请获取共享内存资源,获取成功后从共享内存中取出数据,最后释放共享内存资源(mutex信号量加一),同时empty信号量加一,告知生产者现在缓冲池中已经有空缓冲区可以写入数据了。
流程如下:
wait(full);
wait(mutex);
nextc = *buffer;
signal(mutex);
signal(empty);
consume the item in nextc;
代码:
consumer.cpp
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<unistd.h>
#include<ctime>
#define KEY 2002
#define SHMKEY 2020
using namespace std;
void simSemop(int semid, int sem_num, int sem_op);
int main(){
int semid = semget(KEY, 0, 0);
struct sembuf *sops = new sembuf;
while(1){
// 等待缓冲池中有满缓冲区,wait(full)
simSemop(semid, 2, -1);
// 获取共享内存资源,wait(mutex)
simSemop(semid, 0, -1);
int shmid = shmget(SHMKEY, 0, 0);
int *shmadd = (int*) shmat(shmid, NULL, 0);
cout << "消费者进程" << getpid() << " 获取:" << *shmadd << endl;
// 释放共享内存资源,signal(mutex)
simSemop(semid, 0, 1);
// 增加空缓冲区数量,signal(empty)
simSemop(semid, 1, 1);
}
}
void simSemop(int semid, int sem_num, int sem_op){
struct sembuf *sops = new sembuf;
sops->sem_num = sem_num;
sops->sem_op = sem_op;
sops->sem_flg = 0;
if(semop(semid, sops, 1) == -1){
perror("Semop fail");
exit(1);
}
}
测试:
首先创建信号量集和共享内存,接着先运行消费者,此时共享内存中没有可用的数据,可用数据信号量full为0,消费者进程会阻塞,接着再运行生产者,空间区信号量empty初始为1,此时生产者可以正常运行,生产产品,消费者就能同步获取产品,接着生产者等待1-9秒后再生产,消费者阻塞,等待生产者再次生产。
3. 多生产者多消费者
现在增加一点点难度,运行多个生产者进程和多个消费者进程。
可以直接在生产者和消费者程序中加上创建子进程的代码(在while循环之前)
// 创建5个子进程
for(int i = 0; i < 5; i++){
// 创建子进程
pid_t child = fork();
// 如果是子进程就退出循环,防止子进程也创建子进程
if(child == 0)
break;
}
这里我创建了5个子进程,一共就是6个生产者进程和6个消费者进程。
运行测试:
可以看出,这样的程序是有些问题的,多个生产者是同时生产产品的,因为time函数获取的时间是秒级的,而一秒对进程来说还是太长了,在一秒内足够6个进程都创建出来并且生产完毕,此时6个生产者进程处在同一秒内,获取的时间都是一样的,而随机种子一样,生成的随机数也都是一样的,因此它们会等待同样长的时间,之后又会同时生产产品。为了解决这个问题,我们可以将使用精度更高的函数来获取时间,比如clock_gettime。
改进代码:
producer.cpp
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/shm.h>
#include<sys/ipc.h>
#include<ctime>
#include<unistd.h>
#define KEY 2002
#define SHMKEY 2020
using namespace std;
void simSemop(int semid, int sem_num, int sem_op);
int main(){
int semid = semget(KEY, 0, 0);
int i = 0;
// 创建5个子进程
for(int i = 0; i < 5; i++){
// 创建子进程
pid_t child = fork();
// 如果是子进程就退出循环,防止子进程也创建子进程
if(child == 0)
break;
}
while(1){
// 获取纳秒级时间
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
// 以时间的纳秒部分作为随机种子
srand(ts.tv_nsec);
int slptime = rand() % 9 + 1;
// 生产时间
sleep(slptime);
// 等待缓冲池中有空缓冲区,wait(empty)
simSemop(semid, 1, -1);
// 获取共享内存资源,wait(mutex)
simSemop(semid, 0, -1);
// 获取共享内存
int shmid = shmget(SHMKEY, 0, 0);
int *shmadd = (int*) shmat(shmid, NULL, 0);
// 获取进程号
int pid = getpid();
// 写入数据(进程号)
*shmadd = pid;
shmdt(shmadd);
// 释放共享内存资源(signal(mutex))
simSemop(semid, 0, 1);
cout << "生产者进程" << getpid() << " 生产成功! " << i << endl;
// 增加满缓冲区数量(signal(full))
simSemop(semid, 2, 1);
i++;
}
}
void simSemop(int semid, int sem_num, int sem_op){
struct sembuf *sops = new sembuf;
sops->sem_num = sem_num;
sops->sem_op = sem_op;
sops->sem_flg = 0;
if(semop(semid, sops, 1) == -1){
perror("Semop fail");
exit(1);
}
}
(生产者代码不用更改)
上面代码将随机种子改为当前时间的纳秒部分,并且将睡眠的部分移到了最上面,这样更贴合实际(生产时间)。
运行测试:
可以看到现在正常了,一开始运行消费者进程,此时没有生产者进程在运行,所有消费者进程在等待队列中等待生产者生产产品。运行生产者进程后,生产者进程等待随机的时间(模拟生产时间)后生产出产品放入共享内存,只要6个生产者进程中有一个生产出产品,等待队列中最前面(这里是先进先出)的消费者会同步获取产品,其他消费者则继续等待新产品,生产者生产后需要等待随机的时间再次生产出产品,如此往复。
从上面的动图中我们也能看出最先创建的消费者进程最先获取产品,因为它最先等待,在等待队列的最前面,但父进程由于需要创建其他的进程,因此父进程最后一个进入等待队列或者最后一个创建的子进程最后进入等待队列,上图中就是前4个子进程 26200、26201、26202、26203 先等待,然后是父进程26199,最后是最后一个创建的子进程26204 。消费者进程获取完产品后会重新等待,由于一开始消费者进程和生产者进程不是同时运行的,在生产者运行时消费者进程已经全部创建好并且在等待了(就算是同时也一样,因为生产者最短也要1秒时间生产产品),因此第一个取产品的消费者进程获取完后会处于第二轮获取产品的第一位,其他消费者进程以此类推,因此每一轮消费者等待的顺序都是一样的,如此往复。
4. 改进
在上一节说过共享内存也可以有多个缓冲区,写入多个数据,现在我们使用有多个缓冲区的公用缓冲池解决生产者和消费者问题。
再这之前,我们先来简单了解一下内存地址,如果对指针很了解的话应该就很容易理解,其实指针也可以看成是地址,或者更应该说指针变量存放的是地址,指针是一个变量,它是有类型的,地址加一就是实际的加一,指针加一却是加一个类型的长度(比如整型4个字节,整型指针加一就是地址加4)。
如果使用shmat函数获取共享内存的指针时将其强制转化为整型,那么这个指针就是整型的指针,每次移动的长度就一个整型的长度,相当于将共享内存分为了多个区,每个区都可以存放一个整型数据。
假设一个共享内存大小是20字节,使用整型指针访问共享内存就相当于共享内存中有5个区,每个区4个字节(一般在linux中int类型长度是4个字节),指针每次加一,地址就会增加一个区的长度4bytes。如下图:
#include<iostream>
using namespace std;
int main(){
int a[5];
int* p = a;
for(int i = 0; i < 5; i++){
*(p + i) = i;
}
for(int i = 0; i < 5; i++){
cout << *(p + i) << " ";
}
cout << endl;
}
上面这个代码就是一个简单的示例,用整型数组来代替共享内存(整型数组其实就是一块连续的内存地址),使用一个整型指针访问内存中的每个区,为其赋值0到4 。 之后再使用指针访问每个区,输出该区的数据。
运行结果:
地址空间及地址存放数据如下:
要是不理解的话就把它当成是一个整型数组,指针p就相当于整型数组变量a,p+3就是a+3, *(p+3)(取p+3地址上的值)就是a[3] ,实际上数组变量a就是个指针。
指针是用来存放地址的,使用“*”操作符可以取指针所指向的地址中的值,为指针指定类型就能规定指针每次偏移时地址移动的长度以及取数据时最多长的数据。地址实际并没有改变,但使用不同类型的指针访问地址得到的数据会不一样(如果乱用指针的话可能会读取到乱码),使用指针访问地址,地址就相当于被分成了多个区,每个区存放一个指定类型(int、char等)的数据。在存放数据和取出数据时用的指针类型最好要一样,乱用指针的后果可是很严重的。
数组其实跟指针是一样的,数组就是申请了一块连续的内存,然后按照数组的类型“划分”内存,按照下标取对应区内的数据,数组变量a实际上就是一个指针,数组变量可以当成指针用*a访问元素,也可以使用下标a[0]。
既如此,我们就可以使用整型指针将共享内存 “划分“ 为多个区,每个区存放一个整型数据。我们需要两个变量来存储in值和out值,两个值是指针的偏移量,每个生产者写入数据的地址都是p+in,写入后将in的值加一,每个消费者取数据的地址都是p+out,读取后将out加一,in和out的值我们可以存放在共享内存的最后两个”整型区“,可用的区数量要减少2防止误访问到in和out。
创建一个大小为6个整型大小的共享内存,其中前五个整型地址空间为空缓冲区,最后两个存放in和out。
首先需要更改createSem程序,将共享内存的大小改为28,共可以存储7个int类型数据,前5个作为缓冲区,剩下最后两个分别存储in和out的值,in和out初始值都是0。
createSem.cpp
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#define KEY 2002
#define SHMKEY 2020
#define SIZE 28
using namespace std;
int main(){
// 创建信号量集
int semid = semget(KEY, 3, IPC_CREAT);
if (semid == -1) {
perror("Failed to create semaphore");
exit(1);
}
// 创建互斥信号量mutex实现各进程对共享内存的互斥使用
semctl(semid, 0, SETVAL, 1);
// 缓冲区数量,最后两个存放in和out不算入缓存区数量
int bufferNum = (SIZE / sizeof(int) - 2);
// 创建信号量empty表示空缓冲区数量(初始为缓冲区数量)
semctl(semid, 1, SETVAL, bufferNum);
// 创建信号量full表示满缓冲区的数量(初始为0)
semctl(semid, 2, SETVAL, 0);
// 创建共享内存
int shmid = shmget(SHMKEY, SIZE, IPC_CREAT);
if (shmid == -1) {
perror("Failed to create shared memory");
exit(1);
}
int* shmadd = (int*) shmat(shmid, 0, 0);
// 将缓冲区初始都设为空缓冲区,赋值-1
for(int i = 0; i < bufferNum; i++)
*(shmadd + i) = -1;
// 倒数第二位存放in的值,初始为0
*(shmadd + bufferNum) = 0;
// 最后一位存放out的值,初始为0
*(shmadd + bufferNum + 1) = 0;
}
生产者程序生产一个产品后,需要先等待共享内存中有空缓冲区,接着在等待申请共享内存资源,获得共享内存资源后首先获取共享内存首地址,然后访问共享内存倒数第二个缓冲区,获取指针偏移量in,根据in在对应的地址中写入数据,写完后再将倒数第二个缓冲区中的in值加一。操作完成后释放共享内存资源(mutex信号量加一),同时full信号量加一,告知消费者现在缓冲区中已经有满缓冲区了。
producer.cpp
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/shm.h>
#include<sys/ipc.h>
#include<ctime>
#include<unistd.h>
#define KEY 2002
#define SHMKEY 2020
#define SIZE 28
using namespace std;
void simSemop(int semid, int sem_num, int sem_op);
int main(){
int semid = semget(KEY, 0, 0);
int i = 0;
// 创建5个子进程
for(int i = 0; i < 5; i++){
// 创建子进程
pid_t child = fork();
// 如果是子进程就退出循环,防止子进程也创建子进程
if(child == 0)
break;
}
while(1){
// 获取纳秒级时间
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
// 以时间的纳秒部分作为随机种子
srand(ts.tv_nsec);
int slptime = rand() % 9 + 1;
// 生产时间
sleep(slptime);
// 等待缓冲池中有空缓冲区,wait(empty)
simSemop(semid, 1, -1);
// 获取共享内存资源,wait(mutex)
simSemop(semid, 0, -1);
// 获取共享内存
int shmid = shmget(SHMKEY, 0, 0);
int* shmadd = (int*) shmat(shmid, NULL, 0);
int bufferNum = SIZE / sizeof(int) - 2;
// 获取in的值,in所在的位置是倒数第二个整型大小的地址空间
int in = *(shmadd + bufferNum);
// 获取进程号
int pid = getpid();
// 写入数据(进程号)
*(shmadd + in) = pid;
// in值加一,需要mod区数量使其处于缓存池的最大区数量范围内
*(shmadd + bufferNum) = (in + 1) % (bufferNum);
shmdt(shmadd);
// 释放共享内存资源(signal(mutex))
simSemop(semid, 0, 1);
cout << "生产者进程" << getpid() << " 生产成功! " << i << endl;
// 增加满缓冲区数量(signal(full))
simSemop(semid, 2, 1);
i++;
}
}
void simSemop(int semid, int sem_num, int sem_op){
struct sembuf *sops = new sembuf;
sops->sem_num = sem_num;
sops->sem_op = sem_op;
sops->sem_flg = 0;
if(semop(semid, sops, 1) == -1){
perror("Semop fail");
exit(1);
}
}
消费者等待共享内存中有满缓冲区,共享内存中有数据后消费者在申请获取共享内存资源,获得共享内存资源后首先获取共享内存首地址,然后访问共享内存最后一个缓冲区,获取指针偏移量out,根据out在对应的地址中读取数据,读取完后再将最后一个缓冲区中的out值加一。最后释放共享内存资源(mutex信号量加一),同时empty信号量加一,告知生产者现在缓冲池中已经有空缓冲区可以写入数据了。
#include<iostream>
#include<sys/sem.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<unistd.h>
#include<ctime>
#define KEY 2002
#define SHMKEY 2020
#define SIZE 28
using namespace std;
void simSemop(int semid, int sem_num, int sem_op);
int main(){
int semid = semget(KEY, 0, 0);
struct sembuf *sops = new sembuf;
// 创建5个子进程
for(int i = 0; i < 5; i++){
// 创建子进程
pid_t child = fork();
// 如果是子进程就退出循环,防止子进程也创建子进程
if(child == 0)
break;
}
while(1){
// 等待缓冲池中有满缓冲区,wait(full)
simSemop(semid, 2, -1);
// 获取共享内存资源,wait(mutex)
simSemop(semid, 0, -1);
int shmid = shmget(SHMKEY, 0, 0);
int *shmadd = (int*) shmat(shmid, NULL, 0);
// 缓冲区数量
int bufferNum = SIZE / sizeof(int) - 2;
// 获取out值,in所在的位置是最后一个整型大小的地址空间
int out = *(shmadd + bufferNum + 1);
cout << "消费者进程" << getpid() << " 获取:" << *(shmadd + out) << endl;
// out值加一,%bufferNum 是为了保证值的范围在0到bufferNum - 1之间
*(shmadd + bufferNum + 1) = (out + 1) % bufferNum;
// 释放共享内存资源,signal(mutex)
simSemop(semid, 0, 1);
// 增加空缓冲区,signal(empty)
simSemop(semid, 1, 1);
}
}
void simSemop(int semid, int sem_num, int sem_op){
struct sembuf *sops = new sembuf;
sops->sem_num = sem_num;
sops->sem_op = sem_op;
sops->sem_flg = 0;
if(semop(semid, sops, 1) == -1){
perror("Semop fail");
exit(1);
}
}
运行结果:
如上图,这次一开始我们先运行生产者,观察一下生产者因缓冲池中没有空缓冲区而阻塞,可以看到在生产了5个产品后,6个生产者进程就被阻塞了,没有一个能继续生产产品,接着在运行消费者,在5个消费者进程取出5个产品后,缓冲池立即多出5个空缓冲区,立马有5个生产者生产出产品,消费者同步获取产品,接着就是生产者一个个生产产品,消费者同步获取产品,如此往复。
四、总结
生产者消费者问题主要要解决的就是进程之间同步的问题以防止进程之间无序争夺资源,造成系统混乱。
生产者和消费者共用一个缓冲池,生产者写入数据,消费者取出数据,为了防止数据错误,同一个时间只能有一个进程使用缓冲池,实现进程互斥访问缓冲池;消费者要在生产者生产完产品,缓冲池中有产品后再去申请获取缓冲池资源,生产者在缓冲池满了后要在消费者取出产品后再申请缓冲池资源将产品放入缓冲池。
使用信号量机制实现上面的要求就需要3个信号量,一个是缓冲池的资源数量,最大为1,实现互斥访问缓冲池;一个是缓冲池中空缓冲区数量信号量;最后一个是缓冲池满缓冲区数量信号,初始为0;生产者首先等待缓冲池空缓冲区信号量,再申请缓冲池资源,生产完后释放缓冲池资源,增加缓冲池中满缓冲区数量信号量,消费者先等待缓冲池中满缓冲区信号量,再申请缓冲池资源,生产完后释放缓冲池资源,增加缓冲池空缓冲区数量信号量。
生产者和消费者写入数据和取出数据都是按照顺序进行的,各个生产者进程按照in的值按顺序往下写入数据,各个消费者进程按照out的值按顺序从缓冲区中取出数据。这样不会出现生产者写入到满缓冲区的情况,也不会出现生产者从空缓冲区取出数据的情况,因为生产者一开始缓冲区全是空的,生产的时候按照顺序一个个写入数据就不会写入到满缓冲区中,生产者在前面一直往前写入数据时,消费者在生产者后面取数据,此时消费者访问的地址中都是放满数据的,消费者只要一直在生产者后面就不会出现访问到空缓冲区的情况,而信号量机制保证了这一点,只有生产者生产完后消费者才能同步获取数据,不会出现消费者在生产者之前访问共享内存。将out和in的值mod缓冲区数量就能实现在共享内存中循环写入和读取数据。
此次实验还是有点难度的,至少比上次实验难不少,这次实验没有例子,我从最基本的步骤开始,从简单的进程同步到只有一个缓冲区的生产者和消费者同步到最后完整的生产者和消费者同步,一开始还以为共享内存不好划分缓冲区,后面想了想,想起了c语言了指针,用它就能实现这个操作,但是如果是字符串的话也还是不好搞的。