文章目录
- 1 生产者消费者模型
- 2 阻塞队列
- 2.1 成员变量
- 2.2 消费者操作
- 2.3 生产者生产
- 3 总结
1 生产者消费者模型
在多线程环境中,生产者消费者模型是一种经典的线程同步模型,用于处理生产者线程与消费者线程之间的工作调度和资源共享问题。在这个模型中,生产者和消费者共享一个缓冲区,生产者往缓冲区中放入商品(或者数据),而消费者则从缓冲区中取出商品(或者数据)。为了确保线程安全,避免资源竞争,通常需要使用同步机制如互斥锁(mutex) 和 条件变量(condition variable)。
2 阻塞队列
阻塞队列在生产者消费者模型中是非常常见的一种设计,通过互斥锁和条件变量来确保线程同步,避免数据竞争。生产者和消费者分别在合适的时机阻塞和唤醒彼此,使得生产者和消费者能平稳地进行数据的生产和消费。
2.1 成员变量
class BlockQueue
{
static const int defaultnum = 20;
public:
BlockQueue(int maxcap = defaultnum)
:_maxcap(maxcap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_c_cond, nullptr);
pthread_cond_init(&_p_cond, nullptr);
}
private:
std::queue<T> _q; // 队列,存储生产者生产的数据
int _maxcap; // 队列的最大容量
pthread_mutex_t _mutex; // 互斥锁,用于保护队列的访问
pthread_cond_t _c_cond; // 消费者条件变量,用于阻塞消费 者
pthread_cond_t _p_cond; // 生产者条件变量,用于阻塞生产者
};
_q
是用于存储数据的队列。_maxcap
是队列的最大容量。_mutex
是互斥锁,用来保证生产者和消费者在访问队列时的互斥性。_c_cond
是消费者的条件变量,当队列为空时,消费者会被阻塞,直到队列有数据。_p_cond
是生产者的条件变量,当队列满时,生产者会被阻塞,直到队列有空间。
2.2 消费者操作
T pop()
{
//1.上锁 --> 消费的时候,需要给消费者上锁
pthread_mutex_lock(&_mutex);
while(_q.size() == 0)
{
//当商品为空的时候,就阻塞消费者
pthread_cond_wait(&_c_cond, &_mutex);
}
//3.走到这里,两种情况 : 1.队列满了 2.被唤醒
T out = _q.front();
_q.pop();
//4.当pop之后,队列就一定没满,因此可以唤醒生产者去生产了
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
return out;
}
pthread_mutex_lock(&_mutex);
1. 先上锁,保证数据的安全
while(_q.size() == 0)
{
//当商品为空的时候,就阻塞消费者
pthread_cond_wait(&_c_cond, &_mutex);
}
T out = _q.front();
_q.pop();
2. 上锁之后,可以进行消费,有两种情况:
case 1 : 队列为空,没有数据,则阻塞消费者
case 2: 队列不为空,进行消费
注意:这里的pthread_cond_wait(&_c_cond, &_mutex);
在阻塞消费者的同时会释放mutex互斥锁,避免死锁的产生
//当pop之后,队列就一定没满,因此可以唤醒生产者去生产了
pthread_cond_signal(&_p_cond);
3. 消费之后,队列一定不满(至少都有一个空位,因为刚刚消费了)。所以可以唤醒生产者进行生产
pthread_mutex_unlock(&_mutex);
4. 所有操作结束之后,释放锁,避免死锁
2.3 生产者生产
void push(const T& in)
{
//1.上锁 --> 生产的时候,需要给生产者上锁
pthread_mutex_lock(&_mutex);
//2.当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态
//伪唤醒的情况
while(_q.size() == _maxcap)
{
//自动唤醒 --> 释放_p_cond持有的锁,进入阻塞状态
pthread_cond_wait(&_p_cond, &_mutex);
}
//3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒
_q.push(in);
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
pthread_mutex_lock(&_mutex);
1. 先上锁,保证数据的安全
//当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态
//伪唤醒的情况
while(_q.size() == _maxcap)
{
//自动唤醒 --> 释放_p_cond持有的锁,进入阻塞状态
pthread_cond_wait(&_p_cond, &_mutex);
}
//3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒
_q.push(in);
2. 上锁之后,可以进行生产,有两种情况:
case 1 : 队列为满,不能继续生产,则阻塞生产者
case 2: 队列不为满,继续生产
为什么要使用while?
防止伪唤醒。
注意:这里的pthread_cond_wait(&_p_cond, &_mutex);
在阻塞生产者的同时会释放mutex互斥锁,避免死锁的产生
pthread_cond_signal(&_c_cond);
3. 生产之后,队列一定不为空(至少有一个商品,因此可以继续消费)所以可以唤醒消费者进行消费
pthread_mutex_unlock(&_mutex);
4. 所有操作结束之后,释放锁,避免死锁
3 总结
main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>
void* Consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
Task t = bq->pop();
std::cout << "消费了一个任务 : " << t.GetTask() << " 运算结果是 : "
<< t.GetResult() << "thread id : " << pthread_self() << std::endl;
sleep(1);
}
}
void* Productor(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
int x = 10, y = 20;
while(true)
{
int data1 = rand() % 10 + 1; //控制data1为[1,10]之间
usleep(10);
int data2 = rand() % 10 + 1; //控制data2为[1,10]之间
char op = opers[rand() % opers.size()]; //随机选取一个运算符
//构建任务
Task t(data1, data2, op);
bq->push(t);
std::cout << "生产了一个任务 : " << t.GetTask() << "thread id : " << pthread_self() << std::endl;
sleep(1);
}
}
int main()
{
srand(time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c[3], p[5];
for (int i = 0; i < 3; ++ i)
{
pthread_create(c + i, nullptr, Consumer, bq);
}
for (int i = 0; i < 5; ++ i)
{
pthread_create(p + i, nullptr, Productor, bq);
}
for (int i = 0; i < 3; ++ i)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 5; ++ i)
{
pthread_join(p[i], nullptr);
}
delete bq;
return 0;
}
BlockQueue.hpp
#include <iostream>
#include <queue>
#include <pthread.h>
template<class T>
class BlockQueue
{
static const int defaultnum = 20;
public:
BlockQueue(int maxcap = defaultnum)
:_maxcap(maxcap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_c_cond, nullptr);
pthread_cond_init(&_p_cond, nullptr);
}
T pop()
{
//1.上锁 --> 消费的时候,需要给消费者上锁
pthread_mutex_lock(&_mutex);
while(_q.size() == 0)
{
//当商品为空的时候,就阻塞消费者
pthread_cond_wait(&_c_cond, &_mutex);
}
//3.走到这里,两种情况 : 1.队列满了 2.被唤醒
T out = _q.front();
_q.pop();
//4.当pop之后,队列就一定没满,因此可以唤醒生产者去生产了
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
return out;
}
void push(const T& in)
{
//1.上锁 --> 生产的时候,需要给生产者上锁
pthread_mutex_lock(&_mutex);
//2.当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态
//伪唤醒的情况
while(_q.size() == _maxcap)
{
//自动唤醒 --> 释放_p_cond持有的锁,进入阻塞状态
pthread_cond_wait(&_p_cond, &_mutex);
}
//3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒
_q.push(in);
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_c_cond);
pthread_cond_destroy(&_p_cond);
}
private:
std::queue<T> _q; //共享资源,只有一个,但是可以被当成很多个
int _maxcap; //最大值
pthread_mutex_t _mutex; //锁
pthread_cond_t _c_cond; //consumer cond 消费者条件变量
pthread_cond_t _p_cond; //productor cond 生产者条件变量
};
Task.hpp
#include <iostream>
#include <string>
std::string opers = "+-*%";
enum
{
DivZero = 1,
ModZero,
Unkown
};
class Task
{
public:
Task(int x1, int x2, char oper)
:_data1(x1)
,_data2(x2)
,_oper(oper)
,_result(0)
,_exitcode(0)
{}
void run()
{
switch(_oper)
{
case '+':
_result = _data1 + _data2;
break;
case '-':
_result = _data1 - _data2;
break;
case '*':
_result = _data1 * _data2;
break;
case '/':
if (_data2 == 0)
_exitcode = DivZero;
else
_result = _data1 / _data2;
break;
case '%':
if (_data2 == 0)
_exitcode = ModZero;
else
_result = _data1 % _data2;
break;
default:
_exitcode = Unkown;
break;
}
}
//重载operator()
void operator()()
{
run();
}
std::string GetTask()
{
std::string r = std::to_string(_data1);
r += _oper;
r += std::to_string(_data2);
r += "= ?";
return r;
}
std::string GetResult()
{
std::string r = std::to_string(_data1);
r += _oper;
r += std::to_string(_data2);
r += "= ";
r += std::to_string(_result);
r += "[code: ";
r += std::to_string(_exitcode);
r += "]";
return r;
}
~Task()
{}
private:
int _data1;
int _data2;
char _oper;
int _result;
int _exitcode;
};