⭐前言:本文会先后讲解生产消费者模型、条件变量和基于阻塞队列的生产消费者模型。
1.生产消费者模型
什么是生产消费者模型?
认识生产消费者模型
使用学生(消费者),超市,供货商(生产者)来举一个例子。
学生是典型的消费者,供货商是典型的生产者。假设学生有泡面、火腿肠、玩具等等的需求,而供货商会生产尽可能覆盖学生需求的商品,但是一般并不会直接卖给学生,而是供货给超市,从而在超市里做买卖。
如果是春节期间,或者是什么特殊时期,供货商暂停生产商品,但由于超市已经进了很大一批货,生产者停止生产压根不会影响到消费者消费。这里,就是生产与消费过程的互不干扰,称之为解耦。而超市作为大量进货的一方,一般不会进很少货,而是进很多,达到满足消费需求,因此可以看作是临时保存商品的地方,我们对应的是计算机中的缓冲区!
接下来举一个反例,即强耦合关系的生产消费者来加深理解->在我们平时写的代码中,比如实现一个加法函数Add和调用Add函数的main函数。在调用Add函数的时候,main函数是在等待阶段的,并且Add函数是会直接影响到main函数的。
因此,从上面例子来正确认识一下生产消费者模型:
超市(缓冲区)作为消费者消费的地方,生产者放资源的地方,它必须要让消费者和生产者都看到,即它是一份共享资源!这部分共享资源会涉及到多线程访问,那就必须被保护起来。
生产者和消费者是线程,顾名思义,作为生产者的线程用于向缓冲区存入数据,消费者线程向缓冲区拿数据。
因此,生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者消费者关系
生产者和消费者,都是对应着一个或多个线程。
生产者和生产者之间的关系是互斥关系,就好像金华火腿和双汇火腿不能同时摆放在一个货架上,否则会很乱,消费者就不好消费了,一定是先摆放好一种火腿,再去摆放好另外一种火腿。
消费者和消费者之间的关系也是互斥关系。很简单的一个例子:黄牛抢票。
消费者和生产者的关系也是有互斥关系。比如超市里面有一个展架,上面刚放了生产者提供的火腿,但是还没贴价格标签,但是消费者已经来想要消费了,很明显价格还没贴是不能消费先的,因此需要互斥关系,并且这能保证资源是安全的。
消费者和生产者的关系还有同步关系。超市会根据火腿的售卖量来决定入货的数量,即向消费者卖了多少火腿,就向生产者要多少火腿,然后火腿有货的时候,就会通知消费者来消费,缺火腿的时候,就会通知生产者进货,使其有顺序性。从而达到了消费者和生产者的同步与互斥关系。
总结:“321”原则
3种关系:①生产者和生产者的互斥关系。②消费者和消费者的互斥关系。③生产者和消费者的互斥(互斥是为了保证共享资源的安全性)、同步关系。
2种角色:生产者线程和消费者线程。
1个交易场所:一段特定结构的缓冲区。
生产消费者模型的优点
解耦 支持并发 支持忙闲不均
如何维护生产消费者模型中生产者和消费者的同步关系?
现在我们有了消费者线程,也有了生产者线程,并且它们都是线程安全的,加了互斥锁。
生产者线程在生产数据加入到缓冲区中,是对消费者线程互斥的,也就是说生产者线程在生产的时候,会对缓冲区加锁,只有将生产后的数据加入到缓冲区中,并且解锁之后,理论上消费者才可以去消费。但是生产者的优先级高,即使缓冲区中已经满了,但是生产者依旧不断地加锁----到缓冲区中判断是否满了,如果没满,则放入数据,如果满了则解锁离开。离开之后,又有生产者线程来了......这样就导致消费者线程永远进不来。这就只保证了生产者和消费者之间的互斥,保证了共享资源的安全性,但是没有维护好两者的同步关系!
因此我们需要引入条件变量来维护同步关系!
2.条件变量
什么是条件变量
条件变量就是一种变量,它包含了条件变量的状态和队列的指针,它可以链接不满足条件,需要等待的线程。待线程满足条件,就可以将其唤醒拿出来,放到CPU上等待队列。
这里我们可以看到,所有的线程都必须要看到这个条件变量,因此条件变量需要定义为全局的。
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,不能让它申请锁了,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
用条件变量的逻辑为:
加锁----->判断生产和消费条件是否满足----->解锁。其中判断条件也是访问临界资源的一种,需要保护起来。当条件满足,则做相应的事,如果不满足,则解锁之后,不再申请锁,并且将自己阻塞挂起等待。
条件变量函数
条件变量的类型:
pthread_cond_t
①初始化
int pthread_cond_init(pthread_cond_t* restrict cond, const pthread_condattr_t* restrict attr);
参数:
cond:要初始化的条件变量
attr:NULL
返回值:成功返回0,失败返回错误码。
②销毁
int pthread_cond_destroy(pthread_cond_t *cond)
③等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释
④等待唤醒
//唤醒一批线程
int pthread_cond_broadcast(pthread_cond_t *cond);
//唤醒一个线程
int pthread_cond_signal(pthread_cond_t *cond);
下面通过条件变量的简单案例,来看看条件变量是如何工作的。
通过创建两个线程,让两个线程交替执行去抢票,通过条件变量和互斥锁,让两个线程有顺序地执行。两个线程执行的时候,先会在条件变量中排队,等待主线程的唤醒,依次执行。
#include<iostream>
#include<string>
#include<pthread.h>
#include<unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//条件变量
int tickets = 1000;
void* start_routine(void *args)
{
std::string name = static_cast<const char*>(args);
while(true)
{
pthread_mutex_lock(&mutex);//加锁
//每一个线程进来后先等等
pthread_cond_wait(&cond,&mutex);
//if(tickets)
std::cout<<name<<"-> "<<tickets<<std::endl;
tickets--;
pthread_mutex_unlock(&mutex);//解锁
}
}
int main()
{
//通过条件变量控制线程的执行
//创建两个线程,让两个线程交替执行
pthread_t t1,t2;
pthread_create(&t1,nullptr,start_routine,(void*)"thread 1");
pthread_create(&t2,nullptr,start_routine,(void*)"thread 2");
while(true)
{
//主线程,控制节奏,在其它线程在条件变量的等待的时候,一次唤醒一个线程进行运行
sleep(2);
//每隔2秒唤醒一下
pthread_cond_signal(&cond);
//预期每一个线程在执行的时候,一定会有非常强的顺序性
std::cout<<"main thread wakeup one thread..."<<std::endl;
}
//线程等待
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
return 0;
}
可以看到结果,顺序性很强,都是1->2->1->2这样的执行顺序。
我们再创建多个线程,跟上面一样,让多个线程在执行的时候,先在条件变量中排队,等待主线程的唤醒,然后依次执行。
这里的唤醒是使用pthread_cond_signal,每次唤醒一个线程。
#include<iostream>
#include<string>
#include<pthread.h>
#include<unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//条件变量
int tickets = 1000;
void* start_routine(void *args)
{
std::string name = static_cast<const char*>(args);
while(true)
{
pthread_mutex_lock(&mutex);//加锁
//每一个线程进来后先等等
pthread_cond_wait(&cond,&mutex);
std::cout<<name<<"-> "<<tickets<<std::endl;
tickets--;
pthread_mutex_unlock(&mutex);//解锁
}
}
int main()
{
//通过条件变量控制线程的执行
//创建多个线程,让多个线程交替执行
pthread_t t[5];
for(int i = 0;i<5;i++)
{
char *name = new char[64];
snprintf(name,64,"thread %d",i+1);
pthread_create(t+i,nullptr,start_routine,name);
}
while(true)
{
//主线程,控制节奏,在其它线程在条件变量的等待的时候,一次唤醒一个线程进行运行
sleep(1);
//每隔1秒唤醒一下
pthread_cond_signal(&cond);
//预期每一个线程在执行的时候,一定会有非常强的顺序性
std::cout<<"main thread wakeup one thread..."<<std::endl;
}
for(int i = 0;i < 5;++i)
{
pthread_join(t[i],nullptr);
}
return 0;
}
使用pthread_cond_broadcast,一次唤醒一批线程。
//pthread_cond_signal(&cond);
pthread_cond_broadcast(&cond);
3.基于阻塞队列的生产消费者模型
终于,我们知道了什么叫做生产消费者模型,什么是条件变量,并且简单了解了条件变量如何去使用了。
现在,我们左手一个生产消费者模型,右手一个条件变量和互斥量,是时候徒手搓氢弹....呃....基于阻塞队列的生产消费者模型了!
什么是基于阻塞队列的生产消费者模型?
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。
C++queue模拟阻塞队列的生产消费者模型
以单生产者,单消费者,来进行学习。
头文件BlockQueue.hpp:
#pragma once
#include <iostream>
#include<queue>
#include<pthread.h>
using namespace std;
const int gmaxcap = 5;//设置默认缺省值为5,方便进行测试
template<class T>
class BlockQueue
{
public:
BlockQueue(const int &_maxcap = gmaxcap)
:_maxcap(gmaxcap)
{
pthread_mutex_init(&_mutex,nullptr);//初始化锁
pthread_cond_init(&_pcond,nullptr);//初始化条件变量
pthread_cond_init(&_ccond,nullptr);
}
//放数据
void push(const T &in)//输入型参数,一般用const 和引用&修饰
{
//线程进来先加锁,生产者
pthread_mutex_lock(&_mutex);
//在放入数据的前,需要判断阻塞队列是否满了
if(is_full())
{
//满了不能生产,那就去对应的条件变量中等待
pthread_cond_wait(&_pcond,&_mutex);
}
//没满,就放入数据
_q.push(in);
//此时我们可以绝对保证,阻塞队列里面一定有数据,至少有一个
//此时可以让消费者来消费,即唤醒消费者
pthread_cond_signal(&_ccond);//这里可以加一定的策略,待队列的元素超过三分之一再唤醒消费者
//最后解锁
pthread_mutex_unlock(&_mutex);
}
//拿数据
void pop(T *out)//输出型参数,一般用*
{
pthread_mutex_lock(&_mutex);//消费者和生产者用的是同一把锁,因为必须让生产者和消费者互斥
//判断队列是否空的,如果空,那就等待
if(is_empty())
{
pthread_cond_wait(&_ccond,&_mutex);
}
//走到这里,我们能保证一定不为空
//拿数据
*out =_q.front();
_q.pop();
//绝对能保证,阻塞队列里面至少有一个空的位置
//此时可以将生产者唤醒,去生产
pthread_cond_signal(&_pcond);//这里也可以有一定的策略
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
private:
//判断阻队列是否为空
bool is_empty()
{
return _q.empty();
}
//判断阻队列是否满了
bool is_full()
{
return _q.size()==_maxcap;
}
private:
std::queue<T> _q;
int _maxcap;//阻塞队列中最多能有多少个元素
//_q这个队列本身不是线程安全的,因此我们保护起来
pthread_mutex_t _mutex;
//当队列满了之后,生产者进入休眠,队列空,消费者进入休眠,因此定义条件变量
pthread_cond_t _pcond;//生产者对应的条件变量
pthread_cond_t _ccond;//消费者对应的条件变量
};
①生产速度比消费速度慢,那么会出现生产一个,消费一个这样的情况,因为会阻塞嘛。
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
using namespace std;
#include "BlockQueue.hpp"
void* consumer(void *_bq)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(_bq);
while(true)
{
//消费活动
int data;
bq->pop(&data);
std::cout<<"消费数据: "<<data<<std::endl;
}
return nullptr;
}
void* productor(void *_bq)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(_bq);
while(true)
{
//生产活动
int data = rand()%10 +1;//1到10的数据
bq->push(data);
std::cout<<"生产数据: "<<data<<std::endl;
sleep(1);//让生产速度比消费速度慢一点
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());//随机生成数据据
BlockQueue<int>* bq = new BlockQueue<int>();//阻塞队列
pthread_t c,p;//消费者和生产者线程
pthread_create(&c,nullptr,consumer,bq);//消费者
pthread_create(&p,nullptr,productor,bq);//生产者
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
②生产速度比消费速度快,那么一开始会生产出好几个,然后会生产一下,消费一下。接着我们可以通过结果看出,消费的都是先生产出来的。
通过上面的代码和结果,我们很明显地感受到了生产者和消费者之间的协同,也就是同步了!
接下来我们完善这一份代码和一些细节的说明:
⭐细节1:我们在放入数据或拿数据的时候,是添加了互斥锁的!也就是说,线程在拿到锁后,进入等待的时候,是拿着锁一起等待的!那么这就意味着,锁被拿走了,谁也无法进入这个临界区了!
可事实上并不是这样,因为pthread_cond_wait这个函数的第二个参数,必须是我们正在使用的互斥锁,该函数会以原子性的方式将锁释放,并且将自己(线程)挂起,在被唤醒返回的时候,会自动重新获取我们传入的锁。
⭐细节2:如果生产者有10个,此时的队列是满的,只有一个消费者。那么消费者在消费后,阻塞队列就有一个位置空了出来,如果我们让一批生产者唤,而空间只有1个,那么在放入数据的时候,就有很大的可能会放入很多个数据!因此,我们将放入数据的操作中的判空操作,从if语句改为while循环,生产者醒来之后,再判断一下队列是否为空或者为满!同样的道理,消费者拿数据也一样,将if改为while循环。
⭐细节3:pthread_cond_signal这个函数,可以放在临界区内部,也可以放在外部。
⭐细节4:上面的代码中,只有打印1,2,3,4这样的数据,实在有点捞,我们可以改一下代码,让生产者给消费者派发任务!
派发任务Task.hpp:
#pragma once
#include <iostream>
#include <functional>
using namespace std;
class Task
{
using func_t = std::function<int(int,int)>;
// typedef std::function<int(int,int)> func_t;
public:
Task()
{}
Task(int x, int y, func_t func):_x(x), _y(y), _callback(func)
{}
int operator()()
{
int result = _callback(_x, _y);
return result;
}
private:
int _x;
int _y;
func_t _callback;
};
修改后的代码:
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
using namespace std;
#include "BlockQueue.hpp"
#include "Task.hpp"
int myadd(int x,int y)
{
return x+y;
}
void* consumer(void *_bq)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(_bq);
while(true)
{
//消费活动
Task t;
bq->pop(&t);
std::cout<<"消费数据: "<<t()<<std::endl;
sleep(1);
}
return nullptr;
}
void* productor(void *_bq)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(_bq);
while(true)
{
//生产活动
int x = rand()%10 +1;//1到10的数据
int y = rand()%5 +1;//1到5
Task t(x,y,myadd);
bq->push(t);
//std::cout<<"生产数据: "<<data<<std::endl;
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());//随机生成数据据
BlockQueue<Task>* bq = new BlockQueue<Task>();//阻塞队列
pthread_t c,p;//消费者和生产者线程
pthread_create(&c,nullptr,consumer,bq);//消费者
pthread_create(&p,nullptr,productor,bq);//生产者
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}