文章目录
- 一、什么是生产者与消费者模型?
- 二、示例模型
- 示例模型介绍
- 交易场所(blockQueue)
- 消费者与生产者
- 运行结果
- 总结
一、什么是生产者与消费者模型?
参照日常生活中,购买商品的人群可以被称之为消费者,生产商品的工厂可以被称之为生产者,而在两者之间还存在超市被称之为交易场所。 它们还存在三种关系,生产者与生产者之间是互斥关系,消费者与消费者之间也是互斥关系,消费者与生产者存在互斥/同步关系。这三种关系在多线程有着重要体现,通过我们之前所学习的互斥锁和条件变量可以反映这三种关系。
今天我们就需要用我们的多线程知识来构建一个简单的生产者与消费者模型。
二、示例模型
示例模型介绍
该模型由生产者线程随机生成(生产)两个随机数,消费者线程来进行对两个随机数进行计算(消费), 而我们的交易场所就由队列(queue)来充当。
生产者不断往队列放数据,消费者不断从队列拿数据,但是队列总有满/空的时候,所以我们就要严格控制他们的互斥/同步!
交易场所(blockQueue)
blockQueue.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <pthread.h>
#include <queue>
#include "Task.hpp"
#include "lockGuard.hpp"
const int df_cap = 5;
template <class Data>
class blockQueue
{
public:
blockQueue(int capacity = df_cap)
: _capacity(capacity)
{
//初始化
pthread_cond_init(&_empty, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_mutex_init(&_mutex, nullptr);
}
bool isEmpty()
{
return _queue.size() == 0;
}
bool isFull()
{
return _queue.size() == _capacity;
}
void push(const Data &data)
{
pthread_mutex_lock(&_mutex);
while (isFull()) //由于pthread_cond_wait函数存在申请锁失败的情况,所以我们要使用while循环检查队列是否为满
pthread_cond_wait(&_full, &_mutex);
_queue.push(data);
pthread_cond_signal(&_empty);
pthread_mutex_unlock(&_mutex);
}
void pop(Data *output)
{
pthread_mutex_lock(&_mutex);
while (isEmpty())//由于pthread_cond_wait函数存在申请锁失败的情况,所以我们要使用while循环检查队列是否为空
pthread_cond_wait(&_empty, &_mutex);
*output = _queue.front();
_queue.pop();
pthread_cond_signal(&_full);
pthread_mutex_unlock(&_mutex);
}
~blockQueue()
{
pthread_cond_destroy(&_empty);
pthread_cond_destroy(&_full);
pthread_mutex_destroy(&_mutex);
}
private:
std::queue<Data> _queue;
int _capacity;
pthread_cond_t _empty;
pthread_cond_t _full;
pthread_mutex_t _mutex;
};
该blockQueue主要是对队列进行一个封装,因为STL提供的容器一般都不是线程安全的,所以需要自己进行封装使它成为一个可重入的类型。
Task.hpp
#pragma once
#include <iostream>
#include <functional>
class Task
{
public:
typedef std::function<int(int, int)> func_t;
Task() {}
Task(int x, int y, func_t func)
: _func(func), _x(x), _y(y)
{
}
int operator()()
{
return _func(_x, _y);
}
int getelement1()
{
return _x;
}
int getelement2()
{
return _y;
}
private:
int _x;
int _y;
func_t _func;
};
消费者与生产者
#include "blockQueue.hpp"
#include <ctime>
#define P_TNUM 1
#define C_TNUM 5
void *consumer(void *args)
{
blockQueue<Task> *queue = (blockQueue<Task> *)args;
while (1)
{
// sleep(1);
Task tmp;
queue->pop(&tmp);
std::cout << pthread_self() << "获取结果: " << tmp.getelement1() << " + " << tmp.getelement2() << " = " << tmp() << std::endl;
}
return nullptr;
}
void *productor(void *args)
{
blockQueue<Task> *queue = (blockQueue<Task> *)args;
while (1)
{
int x = rand() % 100 + 1;
usleep(1000);
int y = rand() % 10 + 1;
Task task(x, y, [](const int x, const int y) -> int
{ return x + y; }); //lambda表达式
queue->push(task);
std::cout << pthread_self() << "生产数据: " << task.getelement1() << " + " << task.getelement2() << " = ? " << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((unsigned int)time(nullptr) ^ 0xfffffff ^ 0x12345678);
pthread_t parr[P_TNUM], carr[C_TNUM];
blockQueue<Task> *queue = new blockQueue<Task>;
for (int i = 0; i < P_TNUM; i++)
{
pthread_create(parr + i, nullptr, productor, (void *)queue);
}
sleep(1);
for (int i = 0; i < C_TNUM; i++)
{
pthread_create(carr + i, nullptr, consumer, (void *)queue);
}
for (int i = 0; i < P_TNUM; i++)
{
pthread_join(parr[i], nullptr);
}
for (int i = 0; i < C_TNUM; i++)
{
pthread_join(carr[i], nullptr);
}
delete queue;
return 0;
}
运行结果
总结
本章主要检验自身多线程学习的成果,灵活运用了线程互斥和条件变量的接口函数,大家下来可以自己尝试写一遍。