为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的
生产者消费者模型的特点(321原则)
生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:
- 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
- 两种角色: 生产者和消费者。(通常由进程或线程承担)
- 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)
生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
这里我用STL中的queue实现阻塞队列
#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>
#define MAX_NUM 5
template<class T>
class BlockQueue
{
public:
bool isfull()
{
if(_q.size()==MAX_NUM)
return true;
}
bool isempty()
{
return _q.empty();
}
public:
BlockQueue(int _cap = MAX_NUM)
:cap(_cap)
{
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&full,NULL);
pthread_cond_init(&empty,NULL);
}
void push(const T& data)//从阻塞队列里插入数据(生产者调用)
{
pthread_mutex_lock(&mutex);
while(isfull())
{
pthread_cond_wait(&full,&mutex);
}
_q.push(data);
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&empty);//唤醒在empty条件变量下等待的消费者线程
}
void pop(T& data)
{
pthread_mutex_lock(&mutex);
while(isempty())
{
pthread_cond_wait(&empty,&mutex);
}
data = _q.front();
_q.pop();
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&full);//唤醒在full条件变量下等待的生产者线程
}
~BlockQueue(){
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
}
private:
std::queue<T> _q;//阻塞队列
int cap;//阻塞队列最大的容器数据个数
pthread_mutex_t mutex;
pthread_cond_t full;
pthread_cond_t empty;
};
- 生产者和消费者共用同一个缓冲区
- 生产者在生产之前,都要判断一下,缓冲区也就是阻塞队列是否为满,如果满了,就不再生产,进入阻塞等待的状态等待消费者的唤醒信号
- 消费者在消费之前,都要判断一下,缓冲区是否为空,如果为空,就停止对缓冲区的读取,进入阻塞状态,等待生产者生产后唤醒消费者
- 阻塞队列是会被生产者和消费者同时访问的临界资源,因此我们需要用一把互斥锁将其保护起来。
- 这里的数据,我用一个类封装起来,如果可以,类里面可以有很多的数据,方便不同的生产者生产不同的数据信息,不同的消费者也可以选择自己需要的信息(个人想法)
#include"producer.hpp"
class num
{
public:
num(){
}
num(int _number)
:number(_number)
{}
int getnum()
{
return number;
}
~num(){}
private:
int number;
};
void* producer(void* args)
{
BlockQueue<num>* q =(BlockQueue<num>*) args;
while(1)
{
sleep(1);
num a(rand() % 100 + 1);
q->push(a); //生产数据
std::cout << "Producer: " <<a.getnum()<< std::endl;
}
}
void* consumer(void* args)
{
BlockQueue<num>* q =(BlockQueue<num>*) args;
while(1)
{
sleep(1);
num a;
q->pop(a);
std::cout << "Consumer: " << a.getnum() << std::endl; //消费数据
}
}
int main()
{
pthread_t pro, con;
BlockQueue<int>* bq = new BlockQueue<int>;
//创建生产者线程和消费者线程
pthread_create(&pro, nullptr, producer, (void*)bq);
pthread_create(&con, nullptr, consumer, (void*)bq);
//join生产者线程和消费者线程
pthread_join(pro, nullptr);
pthread_join(con, nullptr);
delete bq;
return 0;
}
生产者消费者步调一致
生产者的速度高于消费者的消费速度
这里可以观察到,由于生产者的速度很快,缓冲区很快就被生产者的数据填满了,但是到后面又变成同步生产消费了,这是因为生产者想要再进行生产就只能在full条件变量下进行等待,直到消费者消费完一个数据后,生产者才会被唤醒进而继续进行生产,生产者生产完一个数据后又会进行等待。最后的同步速度最终会取决于消费者的消费速度
生产者的速度小于消费者的速度
开始阻塞队列中是没有数据的,因此消费者只能在empty条件变量下进行等待,直到生产者生产完一个数据后,消费者才会被唤醒进而进行消费,消费者消费完这一个数据后又会进行等待,所以生产者和消费者的步调就是一致的。