文章目录
- 一、模式简介
- 二、头文件、全局变量
- 2.1 仓库类的设计
- 2.1.1 关于仓库类的分析
- 2.1.2 仓库类的设计代码
- 2.2 工厂类的设计
- 2.2.1 关于工厂类的分析
- 2.2.2 工厂类的设计代码
- a 将产品item放到仓库repo
- b 将产品item从仓库repo取出
- c 生产者操作
- d 消费者操作
- 2.2.3 主函数代码
- 三、运行效果和说明
一、模式简介
假设你有一个工厂Factory,配有一个仓库Repository,仓库大小为20,要生产200个产品,你有两个工人producer,三个受众群体consumer,应当如何描述这200个产品从生产到消费的全过程?
这一种生活中的问题在多线程程序设计上称为生产者消费者模式,形象的表示了多线程中数据获取、数据存储、数据处理的过程,关键点在于用互斥锁、条件变量等解决数据存取、同时存、同时取之间的冲突。
二、头文件、全局变量
引入头文件,按照问题描述,可以得到计划产品个数和仓库大小两个全局变量。
#include<iostream>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<queue>
using namespace std;
const int kProduceAimSize = 200; //计划产品个数
const int kRepositorySize = 20; //仓库大小
2.1 仓库类的设计
2.1.1 关于仓库类的分析
按照上上面提出的问题,我们大致能分析得到以下线索:
工厂描述了问题总体,是一个用于解决问题的类,核心在于用于数据交互的仓库类。
对于仓库的数据管理,应当设计三个互斥量,依次为:存取冲突互斥量、同时存的互斥量、同时取的互斥量。两个条件变量,依次表示为:仓库状态为空、仓库状态为满。
2.1.2 仓库类的设计代码
template <typename _T>
class Repository { //仓库
public:
deque<_T> items_buff;
mutex mmutex; //生产者和消费者互斥量,解决从仓库中存取的冲突
mutex mmutex_produce; //生产者之间的互斥量,解决同时生产同一个产品的冲突
mutex mmutex_consume; //消费者之间的互斥量,解决同时消费同一个产品的冲突
mutex mmutex_print; //保证由cout打印的提示信息不会中断
condition_variable repo_full; //描述仓库为满的条件变量
condition_variable repo_empty; //描述仓库为空的条件变量
size_t cnt_produce; //生产者目前产生的产品总数
size_t cnt_consume; //消费者目前消费的产品总数
size_t current_size; //仓库中产品个数
Repository() { //初始化
cnt_produce=0;
cnt_consume=0;
current_size = 0;
}
};
2.2 工厂类的设计
2.2.1 关于工厂类的分析
按照问题描述,工厂由工人和消费者组成,流程上需要完成四个基本任务:
1、生产产品
2、将产品放入仓库
3、从仓库中取出产品
4、消费产品
同时注意工厂类包含一个仓库类实例。
因此工厂类的大致架构如下:
2.2.2 工厂类的设计代码
a 将产品item放到仓库repo
void PutInto(Repository<_T>& repo, _T item) {
unique_lock< mutex> lk(repo.mmutex);
//仓库是一个存取数据的地方,因而对仓库进行操作需要上锁。(解决从仓库中存取的冲突)
while (repo.current_size == kRepositorySize) { //如果仓库为满,无法向仓库中继续放
unique_lock<mutex> pt(repo.mmutex_print);
cout << "仓库已满,无法放入更多产品,需先消费。" << endl;
pt.unlock();
repo.repo_full.wait(lk); //将取互斥量抛出,等待其他线程通知
}
repo.items_buff.push_back(item);
repo.current_size++;
repo.repo_empty.notify_all(); //有产品了,唤醒仓库为空情况下的线程
}
b 将产品item从仓库repo取出
_T GetFrom(Repository<_T>& repo) {
unique_lock<mutex> lk(repo.mmutex);
//仓库是一个存取数据的地方,因而对仓库进行操作需要上锁。(解决从仓库中存取的冲突)
while (repo.current_size == 0) { //如果仓库为空,等待
unique_lock<mutex> pt(repo.mmutex_print);
cout << "无货源,等待..." << endl;
pt.unlock();
repo.repo_empty.wait(lk); //将存取互斥量抛出,等待其他线程通知
}
_T data = repo.items_buff.front();
repo.items_buff.pop_front();
repo.current_size--;
repo.repo_full.notify_all(); //从仓库取出了产品,唤醒仓库为满情况下的线程
return data;
}
c 生产者操作
void ProduceTask() {
bool ready_to_exit = false; //线程结束条件
while (true) {
unique_lock<mutex> lk(repo.mmutex_produce); //不能生产同一个产品
if (repo.cnt_produce < kProduceAimSize) {//需要生产(没达到目标)
repo.cnt_produce++;
//生产产品假设0.05s
this_thread::sleep_for(0.05s);
_T item = repo.cnt_produce; //生产产品具体过程的化简
unique_lock<mutex> pt(repo.mmutex_print);
cout << "生产者id:" << this_thread::get_id() << " as[" << item<<"]" << endl;
pt.unlock();
PutInto(repo, item);
}
else {
ready_to_exit = true;
}
if (ready_to_exit == true)break;
}
}
d 消费者操作
void ConsumeTask() {
bool ready_to_exit = false; //线程结束条件
while (true){
unique_lock<mutex> lk(repo.mmutex_consume); //不能同时消费一个产品
if (repo.cnt_consume < kProduceAimSize) {//需要消费(没达到目标)
_T item = GetFrom(repo); //获取需要消费的产品
repo.cnt_consume++;
//消费产品代码,假设消费0.06s
this_thread::sleep_for(0.06s); //消费过程的化简
unique_lock<mutex> pt(repo.mmutex_print);
cout << "消费者id:" << this_thread::get_id() << " as[" << item<<"]" << endl;
pt.unlock();
}
else {
ready_to_exit = true;
}
if (ready_to_exit == true)break;
}
}
2.2.3 主函数代码
int main() {
cout << "主线程id:" << this_thread::get_id() << endl;
//一个工厂类
Factory<int> MyFactory;
//两个生产者
thread producer1(&Factory<int>::ProduceTask, ref(MyFactory));
thread producer2(&Factory<int>::ProduceTask, ref(MyFactory));
//三个消费者
thread consumer1(&Factory<int>::ConsumeTask, ref(MyFactory));
thread consumer2(&Factory<int>::ConsumeTask, ref(MyFactory));
thread consumer3(&Factory<int>::ConsumeTask, ref(MyFactory));
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();
consumer3.join();
return 0;
}
三、运行效果和说明
有概率出现无货源、仓库满两种情况,均能进行正常处理,对于同一次运行,生产者id有两种,消费者id有三种,下面两张截图来自于不同运行。
如果你觉得文章不错,对你有帮助,不妨点个关注,欢迎批评指正,谢谢!