在学习完线程相关的概念之后,本节来认识一下Linux多线程相关的一个重要模型----“ 生产者消费者模型”
本文参考:
Linux多线程生产者与消费者_红娃子的博客-CSDN博客
Linux多线程——生产者消费者模型_linux多线程生产者与消费者_两片空白的博客-CSDN博客
数据结构“入门”—队列(C语言实现)_队列c语言_Fan~Fan的博客-CSDN博客
生产者消费者模式保姆级教程 (阻塞队列解除耦合性) 一文帮你从C语言版本到C++ 版本, 从理论到实现 (一文足以)_阻塞队列实现生产者消费者模式_小杰312的博客-CSDN博客
生产者与消费者的概念
这个模型的答题逻辑可以使用信号量(POSIX信号量)或 互斥量+条件变量 实现,这里介绍使用互斥量+条件变量的方法。
一个进程中的线程有两种角色,一种是生产者,一种是消费者。生产者为消费者提供任务,消费者拿到任务,解决任务。
在生成者和消费者之间还有一个"交易场所",是一个内存块。生成者线程将任务放到内存块中,消费者线程在内存块中拿任务。当内存块数据达到一高水位线时,生产者会进行等待,唤醒消费者拿任务,当内存块数据达到一低水位线时,消费者会等待,并且唤醒生产者生产任务。(条件变量)通过这个模型,可以解除生产者和消费者的强耦合问题。
生成者,消费者存在着3种关系。生产者和生产者之间是互斥的关系,消费者和消费者之间是互斥的关系,生产者和消费者之间是互斥和同步的关系。
对于生产者:
对于消费者:
关键的问题,在于生产者和消费者什么时候睡眠,又什么时候被唤醒,从哪里读取和写入,这就是生产者和消费者模型的关键。
什么时候睡眠和唤醒在上图已经演示,从哪里读取和写入的答案应该是“队列”。
所以生产者和消费者不直接相互通信,而是通过队列,队列就是这个模型可以解耦的关键。
C语言的队列
既然要学习队列,就要先学习C语言的队列相关知识:
队列的概念
只允许在一端进行插入数据操作,在另一端进行删除数据操作的特殊线性表,队列具有先进先出(FIFO)的属性。入队是从队尾添加数据,出队是从队头读取数据。
队列的实现
队列的实现可以使用数组或者是链表结构,相对而言链表的结构更优一些。
使用阻塞队列来实现生产者消费者的模型
在多线程编程中,阻塞队列是一种常用于实现生产者和消费者模型的数据结构。其普通队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入元素,当队列满的时候,往队列中存放元素的操作也会被阻塞,直到有元素从队列中取出。
队列的形式并不重要,这里采用环形队列:
实现思路
生产者和生产者之间互斥,消费者和消费者之间互斥
- 在生产和消费的时候需要定义两个互斥量,一个是生产者之间的,一个是消费者之间的。
生产者和消费者之间互斥且同步
- 定义一个互斥量,取数据的时候,不能放,放数据的时候,不能取
- 有两个条件,满和空,定义两个条件变量
代码展示(认真看注释!!)
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <signal.h>
//定义阻塞队列
typedef struct BlockQueue { //使用了typedef给结构体起了一个别名“BlockQueue”
size_t cap;//容量
size_t size;//当前产品个数
int front;//队头游标
int tail;//队尾游标
int* data; //数据域,创建了一个数据类型为int的数组,data指向数组头的地址
pthread_mutex_t lock;
pthread_cond_t full;
pthread_cond_t free;
} BlockQueue;
BlockQueue* bqp;//定义全局方便信号处理时候销毁
//如果没有使用typedef,此处就应该为“struct BlockQueue *bqp;”
//此处定义指针的原因是,接下来有很多函数会修改结构体的参数,如果直接传结构体作为形参,那么修改的就是局部变量,只有传入指向结构体的指针才能方便的直接在函数内修改结构体参数
void DestroyBlockQueue(BlockQueue* bqp) { //销毁队列
free(bqp->data); //释放分配给指针的空间,防止资源无效占用
pthread_mutex_destroy(&bqp->lock);
pthread_cond_destroy(&bqp->full);
pthread_cond_destroy(&bqp->free);
}
void handler(int signo) { //自定义的信号处理函数,详见main的signal函数
printf("ByeBye\n");
DestroyBlockQueue(bqp);
exit(EXIT_SUCCESS);
}
BlockQueue* InitQueue(int n) { //初始化阻塞队列
BlockQueue* bqp = (BlockQueue*)malloc(sizeof(BlockQueue)); //使用malloc分配空间
bqp->cap = n; //容量为n
bqp->size = 0; //当前产品个数为0
bqp->front = bqp->tail = 0; //由于现在没有产品,所以队头和队尾的游标都是0
bqp->data = (int*)malloc(sizeof(int) * n); //给指向数据的指针分配空间,大小是“容量” 乘以 “int型变量大小”
pthread_mutex_init(&bqp->lock, NULL); //初始化互斥量
pthread_cond_init(&bqp->free, NULL); //初始化代表“队列为空”的条件变量
pthread_cond_init(&bqp->full, NULL);//初始化代表“队列已满”的条件变量
return bqp; //返回指向结构体的指针
}
int IsEmpty(BlockQueue* bqp) {//判断阻塞队列是否为空的函数
return bqp->size == 0; //返回值如果是1则代表容量是0,队列空;反之代表队列容量不为0,队列非空
}
int IsFull(BlockQueue* bqp) {//判断阻塞队列是否已满的函数
return bqp->size == bqp->cap; //返回值如果是1则代表当前产品个数=容量,队列满;反之代表队列未满
}
void WaitConsume(BlockQueue* bqp) {//消费被阻塞, 此时队列为空,等待队列有产品
pthread_cond_wait(&bqp->full, &bqp->lock);
}
void WaitProduct(BlockQueue* bqp) {//生产被阻塞, 此时队列已满,等待队列有空位
pthread_cond_wait(&bqp->free, &bqp->lock);
}
void NotifyConsume(BlockQueue* bqp) {//通知消费, 队列中有产品了
pthread_cond_signal(&bqp->full);
}
void NotifyProduct(BlockQueue* bqp) {//通知生产, 队列中有空位了
pthread_cond_signal(&bqp->free);
}
void Lock(BlockQueue* bqp) { //上锁
pthread_mutex_lock(&bqp->lock);
}
void Unlock(BlockQueue* bqp) { //解锁
pthread_mutex_unlock(&bqp->lock);
}
void Push(BlockQueue* bqp, int val) { //向队列中增加数据的函数,即生产的函数
Lock(bqp);//上锁
while (IsFull(bqp)) { //当队列已满的时候,不断执行以下代码,直到队列有空位出现
WaitProduct(bqp);//生产被阻塞, 此时队列已满,等待队列有空位
NotifyConsume(bqp);//不断催促消费,这样才可以使得队列有空位从而跳出循环
}
bqp->data[bqp->tail++] = val;//在data数组的尾部增加一个元素,并把队尾游标加一
bqp->tail %= bqp->cap;//bqp->tail = bqp->tail % bqp->cap,如果队尾的游标大小没到容量大小就保持不变,超出则取余
//目的就是让队尾游标数值在超出容量数值的时候归0重新覆盖写
//Unlock(bqp);//解锁
bqp->size += 1;//当前产品数量加一
NotifyConsume(bqp);//有产品了通知消费
Unlock(bqp);//解锁
}
void Pop(BlockQueue* bqp, int* popval) { //从队列中取出数据的函数,即消费的函数
Lock(bqp);//上锁
while (IsEmpty(bqp)) { //当队列为空的时候,不断执行以下代码,直到队列不为空
WaitConsume(bqp);//消费被阻塞, 此时队列为空,等待队列有产品
NotifyProduct(bqp);//不断催促生产,这样才可以使得队列有产品(非空)从而跳出循环
}
*popval = bqp->data[bqp->front++];//从data数组的头部读取一个消息,并把队头游标加一
bqp->front %= bqp->cap; //bqp->front = bqp->front % bqp->cap,如果队头的游标大小没到容量大小就保持不变,超出则取余
//目的就是让队头游标数值在超出容量数值的时候归0从头重新读
//Unlock(bqp);//解锁
bqp->size -= 1;//当前产品数量减一
NotifyProduct(bqp);//有空位了通知生产
Unlock(bqp);//解锁
}
void* ConsumeRoutine(void* args) {//消费者线程执行函数,所有消费者共用这个函数
BlockQueue* bqp = (BlockQueue*)args; //此时的线程参数是一个包装好的结构体,在代码头已定义
int popval = 0;
for ( ;; ) { //相当于一个while(1)
Pop(bqp, &popval);//消费的函数,消费一个队头的数据
printf("PopVal is %d, and has %ld Products\n", popval, bqp->size); //bqp结构体中的size成员的类型是size_t,在系统中对size_t的定义是无符号长整形,要用%ld表示
sleep(rand() % 3);//rand() % 3代表随机取一个0~2的整数,即随机睡眠0~2秒,,随机数种子在main中定义
}
return (void*)0;
}
void* ProductRoutine(void* args) {//生产者线程执行函数,所有生产者共用这个函数
BlockQueue* bqp = (BlockQueue*)args;
int pushval = 0;
for ( ;; ) { //相当于一个while(1)
pushval = rand() % 1024;//准备放入队列的数据(产品), 是一个0~1023的随机整数,随机数种子在main中定义
Push(bqp, pushval);//生产的函数,将一个产品塞入队尾(生产一个产品)
printf("PushVal is %d, and has %ld Products\n", pushval, bqp->size); //bqp结构体中的size成员的类型是size_t,在系统中对size_t的定义是无符号长整形,要用%ld表示
sleep(rand() % 3); //rand() % 3代表随机取一个0~2的整数,即随机睡眠0~2秒,,随机数种子在main中定义
}
return (void*)0;
}
int main() {
signal(SIGINT, handler);//当键盘输入“CTRL+C”时,触发SIGINT信号,跳转到自定义的handler函数(信号相关概念)
srand((unsigned int)time(NULL)); //使用“(unsigned int)time(NULL)”作为生成随机数的种子
bqp = InitQueue(30); //初始化并赋值给结构体bqp,设定容量为30
pthread_t consume1, consume2, product1, product2; //定义并创建4个线程
pthread_create(&product1, NULL, ProductRoutine, (void*)bqp);//2个生产者使用生产者共用函数作为启动函数
pthread_create(&product2, NULL, ProductRoutine, (void*)bqp);
pthread_create(&consume1, NULL, ConsumeRoutine, (void*)bqp);//2个消费者使用消费者共用函数作为启动函数
pthread_create(&consume2, NULL, ConsumeRoutine, (void*)bqp);
pthread_join(product1, NULL);//4个线程等待退出
pthread_join(product2, NULL);
pthread_join(consume1, NULL);
pthread_join(consume2, NULL);
return 0;
}