目录
生产者消费者模型
什么是生产者消费者模型
为什么要使用生产者消费者模型
生产者消费者模型的优点
为什么生产者和生产者要互斥?
为什么消费者和消费者要互斥?
为什么生产者和消费者既是互斥又是同步?
基于BlockingQueue的生产者消费者模型
BlockingQueue
阻塞队列实现
使用阻塞队列实现单生产单消费模型
POSIX信号量
信号量的PV操作
信号量的操作
初始化信号量
销毁信号量
等待信号量(P操作)
发布信号量(V操作)
基于环形队列的生产者消费者模型
环形队列实现
使用环形队列实现单生产单消费模型
生产者消费者模型
什么是生产者消费者模型
生产者消费者模型是一种用于解决多线程或多进程间协作的经典问题。在这个模型中,有两种角色:生产者和消费者。生产者负责生成数据,并将其放入共享的缓冲区中,而消费者则负责从缓冲区中取出数据进行处理。这种模型的目标是保持生产者和消费者之间的同步,以避免生产者试图向已满的缓冲区添加数据,或消费者试图从空缓冲区中获取数据。
为什么要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型的优点
- 解耦
- 支持并发
- 支持忙闲不均
总结:
- 三种关系
- 生产者和生产者(互斥)
- 消费者和消费者(互斥)
- 消费者和生产者 (互斥&&同步)
- 两种角色
- 生产者
- 消费者
- 一个交易场所——内存空间
为什么生产者和生产者要互斥?
我们可以这样来理解:当含有很多个生产者的时候,第一个生产者在第一个内存中放一个数据,第二个生产者也在第一个内存中放一个数据,覆盖第一个生产者在内存中的数据,两个生产者会产生竞争关系;那么会造成第一个内存中数据的不一致性;所以要互斥。
为什么消费者和消费者要互斥?
当只含有一个数据时候,来了两个消费者,这两个消费者也是竞争关系;因此要互斥;
为什么生产者和消费者既是互斥又是同步?
当一个生产者在一个内存中放一个数据,消费者瞬间就会拿取这两个数据,那么生产者到底放没放数据?两者竞争关系;当内存中没数据的时候,消费者是拿不到数据的,因此要生产者放入数据消费者才可以拿取数据,具有一定的顺序性,因此又是同步的。
基于BlockingQueue的生产者消费者模型
BlockingQueue
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
阻塞队列实现
#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
const int defauletcap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(int cap = defauletcap) : _capacity(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
bool IsFull()
{
return _q.size() == _capacity;
}
void Push(const T &in)
{
pthread_mutex_lock(&_mutex); // 每个关系都是互斥的
while (IsFull())
{
// 满了
// 阻塞等待
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in);
// 有数据了通知消费者取数据
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
bool IsEmpty()
{
return _q.size() == 0;
}
void Pop(T *out)
{
pthread_mutex_lock(&_mutex);
while (IsEmpty())
{
// 空了
// 阻塞等待
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _q.front();
_q.pop();
// 取了一个数据通知生产者生产数据
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex; // 每个关系都是互斥的,同一时刻只能有一个线程访问数据
pthread_cond_t _p_cond; // 生产者的环境变量
pthread_cond_t _c_cond; // 消费者的环境变量
};
使用阻塞队列实现单生产单消费模型
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include"BlockQueue.hpp"
void* consumer(void* args)
{
BlockQueue<int> *bp = static_cast<BlockQueue<int>*>(args);
while(true)
{
//消费者先休息,生产者瞬间打满队列
sleep(1);
int data=0;
bp->Pop(&data);
std::cout<<"consumer data:"<<data<<std::endl;
}
return nullptr;
}
void* productor(void* args)
{
BlockQueue<int> *bp = static_cast<BlockQueue<int>*>(args);
while(true)
{
int data = rand()%10+1;
bp->Push(data);
std::cout<<"productor data:"<<data<<std::endl;
}
return nullptr;
}
int main()
{
srand(uint16_t(time(nullptr)^getpid()*pthread_self()));
BlockQueue<int> *bp = new BlockQueue<int>();
pthread_t c,p;
pthread_create(&c,nullptr,consumer,bp);
pthread_create(&p,nullptr,productor,bp);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
注:
- 生产者和消费者谁先调度我们不清楚,是由调度算法和CPU来决定;因为消费者线程先运行由于队列中没有数据会被阻塞。但是可以确定的是一定是生产者进入阻塞队列填充数据;有了数据以后消费者才可以从阻塞队列中取数据。
- 生产者消费者模型的高效并不体现在交易场所,因为交易产所不论是生产者还是消费者只能有一个线程进入,高效率而是体现在生产数据并发和获得处理数据并发;
上面的代码只需要创建一个生产者数组和消费者数组即可实现多生产者多消费者模型
POSIX信号量
当我们在上面使用阻塞队列实现生产者消费者模型的时候,虽然是多生产者但是在内存中放数据的时候只能有一个线程必须先申请锁最后在释放锁,因此每个线程放数据是串行的。这样会降低效率,我们可以使用信号量解决这个问题。
信号量本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理
- 每个执行流在进入临界区之前都应先申请信号量,申请成功就有了操作临界资源的权限,当操作完毕后就应该释放信号量
信号量的PV操作
信号量的P操作:将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一
信号量的V操作:释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一
注:
- 信号量的PV操作时原子的
- 信号量申请失败会被挂起等待
- 信号量并不仅仅是一个计数器,还包括一个等待队列
信号量的操作
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量(P操作)
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量(V操作)
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
基于环形队列的生产者消费者模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
总结:
- 当消费者阻塞的时候,生产者疯狂生产时,生产者不能把消费者套圈。
- 当生产者阻塞的时候,消费者疯狂消费时,消费者不能超过生产者,必须在生产者的后面。
- 生产者和消费者,只有两种情况会指向同一个位置,其他情况根本就不会指向同一个位置。
- 为空(只能让生产者跑)
- 为满(只能让消费者跑)
- 空间资源不够时,生产者不在生产;数据资源不够时,消费者不在消费。
上面的是单生产和单消费,不能直接改为多生产和多消费时,因为读写位置的坐标只有一个。为了保证这一点,我们可以加锁。
环形队列实现
#pragma noce
#include <iostream>
#include <vector>
#include <semaphore.h>
using namespace std;
const int defaultsize = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int size = defaultsize)
: _ringqueue(size), _size(size), _p_step(0), _c_step(0)
{
sem_init(&_space_sem, 0, size);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_p_lock,nullptr);
pthread_mutex_init(&_c_lock,nullptr);
}
void Push(const T &in)
{
// 生产
// P操作
// 申请空间资源
P(_space_sem);
//先申请信号量,再申请锁
pthread_mutex_lock(&_p_lock);
{
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size; // 防止越界
}
pthread_mutex_unlock(&_p_lock);
V(_data_sem); // 多生产一个数据
}
void Pop(T *out)
{
// 消费
// P操作
// 申请数据资源
P(_data_sem);
pthread_mutex_lock(&_c_lock);
{
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size; // 防止越界
}
pthread_mutex_unlock(&_c_lock);
V(_space_sem); // 多释放了一个空间
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_lock);
pthread_mutex_destroy(&_c_lock);
}
private:
std::vector<T> _ringqueue;
int _size;
int _p_step; // 生产者的位置
int _c_step; // 消费者的位置
sem_t _space_sem; // 空间信号量
sem_t _data_sem; // 数据信号量
pthread_mutex_t _p_lock;
pthread_mutex_t _c_lock;
};
使用环形队列实现单生产单消费模型
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include "RingQueue.hpp"
using namespace std;
void *Producter(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
int cnt = 100;
while (true)
{
rq->Push(cnt);
cout << "product done, data is :" << cnt << endl;
cnt--;
}
}
void *Consumer(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
while (true)
{
sleep(1);
int data = 0;
rq->Pop(&data);
cout << "consumer done , data is :" << data << endl;
}
}
int main()
{
pthread_t c, p;
RingQueue<int> *rq = new RingQueue<int>();
pthread_create(&p, nullptr, Producter, rq);
pthread_create(&c, nullptr, Consumer, rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
今天对生产者消费者模型的分享到这就结束了,希望大家读完后有很大的收获,也可以在评论区点评文章中的内容和分享自己的看法;个人主页还有很多精彩的内容。您三连的支持就是我前进的动力,感谢大家的支持!!!