生产者消费者模型
文章目录
- 生产者消费者模型
- 概念
- 原则
- 优点
- 基于BlockingQueue的生产者消费者模型
- BlockingQueue
- 模拟实现单生产者消费者模型
- 基于计算任务和存储任务的生产者消费者模型
概念
- 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题
- 生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取
- 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的
原则
实际上生产者消费者模型本质上是维护321原则
生产者消费者模型是多线程互斥与同步的经典场景,通常有以下原则:
- 三种关系:生产者与生产者(互斥关系)、消费者与消费者(互斥关系)、生产者与消费者(互斥和同步关系)
- 两种角色:生产者和消费者(通常由线程或进程充当)
- 一个交易场所:通常指的是一个特定的缓冲区,临时保存数据的场所
- 在生产者之间或消费者之间,阻塞队列作为公共资源要被多个线程访问,那么就要被互斥量保护起来即作为临界资源。而互斥锁需要被多个线程竞争式申请,对于生产者而言,一次只允许一个生产者访问临界资源即生产者之间具有互斥关系;对于消费者而言,一次只允许一个消费者访问临界资源即消费者之间具有互斥关系;
- 若生产者一直生产,直到缓冲区满了则生产失败,而消费者一直消费,直到缓冲区为空则消费失败。一边一直占用锁导致另一边的饥饿问题,这是非常低效的。因此在生产者和消费者之间,阻塞队列也需要被互斥量保护起来即是临界资源,生产者和消费者不能同时访问即二者具有互斥关系,而生产者和消费者需要具有一定的顺序访问即具有同步关系
优点
- 解耦
- 支持并发
- 支持忙先不均
- 我们在主函数调用目标函数实际上是强耦合,主函数将参数传参给目标函数,主函数传递了数据作为生产者,形成变量即变量暂时保存了数据,目标函数将该变量进行操作并返回,即目标函数消费了数据作为消费者,而主函数需要等待接收目标函数的返回值才能往下执行,因此主函数和目标函数是强耦合关系。
- 而生产者消费者模型中,生产者向缓冲区中生产数据,若缓冲区没满则可以一直生产;消费者可以一直从缓冲区里取数据,若缓冲区不为空则可以一直消费;那么生产者和消费者就可以并发执行,即生产者消费者模型是对强耦合关系的解耦。
基于BlockingQueue的生产者消费者模型
BlockingQueue
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
- 其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
模拟实现单生产者消费者模型
为了方便理解,下面以单生产者消费者为例子
main.cc
#include<iostream>
#include<pthread.h>
#include<assert.h>
#include<queue>
#include<stdlib.h>
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include"blockqueue.hpp"
using namespace std;
void* producter(void* args)
{
blockqueue<int> * _pbq=static_cast<blockqueue<int>*>(args);
int num=0;
while(true)
{
num=rand()%200+1;//设置随机数据
_pbq->push(num);//生产者放入数据
cout<<"producter push num: "<<num<<" in bq"<<endl;
sleep(1);
}
return nullptr;
}
void* consumer(void* args)
{
blockqueue<int> * _cbq=static_cast<blockqueue<int>*>(args);
int ret=0;
while(true)
{ _cbq->pop(&ret);//消费者取出数据
cout<<"consumer take num: "<<ret<<" from bq"<<endl;
// sleep(1);
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
blockqueue<int>* _bq= new blockqueue<int>();
pthread_t p,c;//
int n=pthread_create(&p,nullptr,producter,(void*)_bq);//生产者线程
assert(n==0);
int m=pthread_create(&p,nullptr,consumer,(void*)_bq);//消费者线程
assert(m==0);
pthread_join(p,nullptr);//回收生产者线程
pthread_join(c,nullptr);//回收消费者线程
delete _bq;//删除队列
return 0;
}
blockqueue.hpp(以.hpp开头的文件可以将定义和实现放一起,调用者只需要调用该文件即可)
#include<iostream>
#include<pthread.h>
#include<assert.h>
#include<queue>
using namespace std;
#define GMAXCAP 5//宏定义阻塞队列的最大容量
template<class T>
class blockqueue
{
public:
blockqueue()//构造
:_maxcap(GMAXCAP)
{
pthread_mutex_init(&_mut,nullptr);//初始化互斥锁
pthread_cond_init(&_pcond,nullptr);//初始化生产者的条件变量
pthread_cond_init(&_ccond,nullptr);//初始化消费者的条件变量
}
void push(T&in)//输入型参数用&
{
pthread_mutex_lock(&_mut);//加锁
while(is_full())
{
pthread_cond_wait(&_pcond,&_mut);//若队列为满,生产者需要阻塞等待,直到队列不为满才能放数据
}
//走到这就放数据
_q.push(in);//往队列中放数据
pthread_mutex_unlock(&_mut);//解锁
pthread_cond_signal(&_ccond);//唤醒消费者线程
}
void pop(T* out)//输出型参数用*。输入输出型参数用&
{
pthread_mutex_lock(&_mut);//加锁
while(is_empty())//队列为空消费者就需要阻塞等待,直到队列中至少存在一个数据
{
pthread_cond_wait(&_ccond,&_mut);
}
//走到这可以取数据
*out=_q.front();
_q.pop();
pthread_mutex_unlock(&_mut);//解锁
pthread_cond_signal(&_pcond);//唤醒生产者线程
}
~blockqueue()
{
pthread_mutex_destroy(&_mut);//释放互斥锁
pthread_cond_destroy(&_pcond);//释放生产者条件变量
pthread_cond_destroy(&_ccond);//释放消费者生产变量
}
bool is_full()
{
return _q.size()==_maxcap;//判断队列是否为满
}
bool is_empty()
{
return _q.size()==0;//判断队列是否为空
}
private:
queue<T> _q;//队列
int _maxcap;//队列中的最大容量
pthread_mutex_t _mut;//互斥锁
pthread_cond_t _pcond;//生产者的条件变量
pthread_cond_t _ccond;//消费者的条件变量
};
- 阻塞队列要给生产者往里push数据,让消费者从中pop数据,那么该阻塞队列需要被这两个线程所看到。因此在创建生产者线程和消费者线程时,需要将阻塞队列作为参数传参给者两个线程
- 生产者负责生产随机数并往阻塞队列中push,push成功并打印日志;消费者负责从阻塞队列中拿取数据,拿取成功并打印日志
- 我们实现的是单生产者单消费者模型,需要维护生产者和消费者之间的互斥和同步关系
- blockqueue队列存储数据的上限为5,当队列中存储了5组数据后生产者将会阻塞不能生产
- 生产者线程在判断is_full的时候用的是while而if,理由如下:(消费者线程相同)
- 无论是生产者线程还是消费者线程都是先加锁再进行判断是否满足条件。以生产者线程来说,若判断is_full是用的if,那么有可能函数
pthread_wait
调用失败,那么继续往后走就会出问题;其次是在多生产者的情况下,是可能唤醒生产者线程的函数是pthread_cond_broadcast
,那么情况是唤醒全部的生产者线程,其实待唤醒的线程就一个,就会导致伪唤醒而造成多个线程进入临界区的情况。为了避免以上情况,我们又需要先加锁再判断满足条件,就需要用到while来判断
- 当生产者push一个数据后就意味着阻塞队列中至少有一个数据,若此时消费者线程在is_empty里面阻塞等待的话,生产者线程就会唤醒消费者线程醒来;相同的当消费者线程pop一个数据后意味着阻塞队列中至少有一个空间,消费者线程就会唤醒生产者线程醒来。
- 生产者生产地慢,消费者消费地块
- 生产者每生产一个数据,往队列中push,消费者就从队列里拿一个数据并pop,呈现出生产者线程和消费者线程交错执行
- 生产者生产地块,消费者消费地慢
- 可以看到先是生产者生产了五个数据并push进阻塞队列,此时阻塞队列满了生产者生产失败需要消费者去pop数据,然后消费者消费一个数据,队列中有了空位,生产者生产一个数据并push,然后呈现出这两个线程交替执行
基于计算任务和存储任务的生产者消费者模型
- 根据图可以看到,我们维护的还是单线程阻塞队列,比之前的模型多了一个阻塞队列和线程
blockqueue.hpp
#pragma once
#include<iostream>
#include<pthread.h>
#include<assert.h>
#include<queue>
using namespace std;
#define GMAXCAP 5//宏定义阻塞队列的最大容量
template<class T>
class blockqueue
{
public:
blockqueue()//构造
:_maxcap(GMAXCAP)
{
pthread_mutex_init(&_mut,nullptr);//初始化互斥锁
pthread_cond_init(&_pcond,nullptr);//初始化生产者的条件变量
pthread_cond_init(&_ccond,nullptr);//初始化消费者的条件变量
}
void push(T&in)//输入型参数用&
{
pthread_mutex_lock(&_mut);//加锁
while(is_full())
{
pthread_cond_wait(&_pcond,&_mut);//若队列为满,生产者需要阻塞等待,直到队列不为满才能放数据
}
//走到这就放数据
_q.push(in);//往队列中放数据
pthread_mutex_unlock(&_mut);//解锁
pthread_cond_signal(&_ccond);//唤醒消费者线程
}
void pop(T* out)//输出型参数用*。输入输出型参数用&
{
pthread_mutex_lock(&_mut);//加锁
while(is_empty())//队列为空消费者就需要阻塞等待,直到队列中至少存在一个数据
{
pthread_cond_wait(&_ccond,&_mut);
}
//走到这可以取数据
*out=_q.front();
_q.pop();
pthread_mutex_unlock(&_mut);//解锁
pthread_cond_signal(&_pcond);//唤醒生产者线程
}
~blockqueue()
{
pthread_mutex_destroy(&_mut);//释放互斥锁
pthread_cond_destroy(&_pcond);//释放生产者条件变量
pthread_cond_destroy(&_ccond);//释放消费者生产变量
}
bool is_full()
{
return _q.size()==_maxcap;//判断队列是否为满
}
bool is_empty()
{
return _q.size()==0;//判断队列是否为空
}
private:
queue<T> _q;//队列
int _maxcap;//队列中的最大容量
pthread_mutex_t _mut;//互斥锁
pthread_cond_t _pcond;//生产者的条件变量
pthread_cond_t _ccond;//消费者的条件变量
};
- 可以看到这里的阻塞队列的.hpp文件和之前的单生产者单消费者模型的文件是一样的
main.cc
#include<iostream>
#include<pthread.h>
#include<assert.h>
#include<queue>
#include<stdlib.h>
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include"blockqueue.hpp"
#include"task.hpp"
using namespace std;
int mymath(int x,int y,char op)//计算任务需要调用的函数即计算四则运算并返回结果
{
int ret=0;
switch(op)
{
case '+':
ret=x+y;
break;
case '-':
ret=x-y;
break;
case '*':
ret=x*y;
break;
case '/':
if(y==0)
{
cerr<<"div zero erro!"<<endl;
ret=-1;
}else
{
ret=x/y;
}
break;
case '%':
if(y==0)
{
cerr<<"div zero erro!"<<endl;
ret=-1;
}else
{
ret=x%y;
}
break;
default:
//do nothing
break;
}
return ret;
}
template<class C,class S>//C-calculate,S-save
class blockqueues
{
public:
blockqueue<C>* c_bq;//计算队列
blockqueue<S>* s_bq;//存储队列
};
const string Goper="+-*/%";//运算符集合
void* producter(void* args)//生产者
{
blockqueue<Caltask> * _cbq=(static_cast<blockqueues<Caltask,SaveTask>*>(args))->c_bq;
while(true)
{
int x=rand()%200+1;//设置随机数据
int y=rand()%100;//设置随机数据
int num=rand()%Goper.size();//随机运算符
Caltask cal(x,y,Goper[num],mymath);//创建计算任务对象
_cbq->push(cal);//生产者放入数据
cout<<"producter push num: "<<cal.taskstringforP()<<" in bq"<<endl;
sleep(1);
}
return nullptr;
}
void* consumer(void* args)//消费者
{
blockqueue<Caltask> * _cbq=(static_cast<blockqueues<Caltask,SaveTask>*>(args))->c_bq;//取出传过来的参数中的c_bq队列
blockqueue<SaveTask>* _sbq=(static_cast<blockqueues<Caltask,SaveTask>*>(args))->s_bq;//取出传过来的参数中的s_bq队列
while(true)
{
Caltask ret;
_cbq->pop(&ret);//消费者取出数据
cout<<"consumer take savetask: "<<ret()<<" from bq"<<endl;
string result=ret();
SaveTask sat(result,tosave);
_sbq->push(sat);
cout<<"consumer push savetask to bq"<<endl;
// sleep(1);
}
return nullptr;
}
void* saver(void* args)
{
blockqueue<SaveTask>* _sbq=(static_cast<blockqueues<Caltask,SaveTask>*>(args))->s_bq;
while(true)
{
SaveTask ret;
_sbq->pop(&ret);//saver取出数据
ret();//调用仿函数
cout<<"saver finish task"<<endl;
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
blockqueues<Caltask,SaveTask> _bqs;
_bqs.c_bq=new blockqueue<Caltask>();//创建计算队列
_bqs.s_bq=new blockqueue<SaveTask>();//创建存储队列
pthread_t p,c,s;
int n=pthread_create(&p,nullptr,producter,&_bqs);//生产者线程
assert(n==0);
int m=pthread_create(&c,nullptr,consumer,&_bqs);//消费者线程
assert(m==0);
int k=pthread_create(&s,nullptr,saver,&_bqs);//存储线程
assert(k==0);
pthread_join(p,nullptr);//回收生产者线程
pthread_join(c,nullptr);//回收消费者线程
pthread_join(s,nullptr);//回收消费者线程
delete _bqs.c_bq;//删除队列
delete _bqs.s_bq;//删除队列
return 0;
}
- 创建一个blockqueues对象,里面有两个成员分别是计算任务对应的队列c_bq和存储任务对应的队列s_bq
- delete对象时,不能直接删除blockqueues对象,这样会造成内存泄漏,需要删除里面的成员即队列
- 生产者负责生产随机参数一、随机参数二、随机运算符将这些参数传递给Caltask对象,然后将Caltask放入到阻塞队列c_bq中
- 消费者负责从队列中取出Caltask对象并调用对象的仿函数进行运算并返回运算式字符串加上打印运算式,然后将运算式字符串放入s_bq中,并且打印日志
- saver负责将运算式字符串从s_bq中取出,然后将运算式字符串存储到当前路径的log.txt文件中,并且打印日志
- 还需注意的是,这个模型的节奏是按照生产者来的,生产者隔一秒生产一个计算任务,然后消费者消费一次,saver存储一次,即生产者生产的慢,消费者消费的快从而saver存储的快
task.hpp
#pragma once
#include<stdlib.h>
#include<functional>
using namespace std;
class Caltask
{
typedef function<int(int,int,char)> fun_c;
public:
Caltask(){}//无参构造
Caltask(int x,int y,char op,fun_c func)
:_x(x)
,_y(y)
,_op(op)
,_caltask(func)
{}
string operator()()//()运算符重载
{
int ret=_caltask(_x,_y,_op);//调用外部传进来的计算任务
char buffer[128];
snprintf(buffer,sizeof buffer,"%d %c %d =%d",_x,_op,_y,ret);
return buffer;
}
string taskstringforP()//供外部调用打印运算式字符串
{
char buffer[128];
snprintf(buffer,sizeof buffer,"%d %c %d =?",_x,_op,_y);
return buffer;
}
private:
int _x;//参数一
int _y;//参数二
char _op;//运算符号
fun_c _caltask;//需调用的外部计算函数
};
class SaveTask
{
typedef function<void(string)> fun_c;
public:
SaveTask(){}//默认构造
SaveTask( string & s,fun_c func)
:_str(s)
,_func(func)
{}
void operator()()
{
_func(_str);
}
private:
string _str;
fun_c _func;
};
void tosave(const string&s)
{
string target="./log.txt";//文件的路径
FILE*fp=fopen(target.c_str(),"a+");// 以追加的方式打开文件
if(!fp)//文件打开失败
{
cout<<"fopen error"<<endl;
}
fputs(s.c_str(),fp);//往文件里面写数据
fputs("\n",fp);// 往文件里面写换行符
fclose(fp);//关闭文件
}