目录
生产者-消费者模型介绍
生产者-消费者模型优点
生产者-消费者之间的关系
基于阻塞队列实现生产者-消费者模型
基于环形队列实现生产者-消费者模型
生产者-消费者模型介绍
● 计算机中的生产者和消费者本质都是线程/进程
● 生产者和消费者不直接通讯,而是通过一段内存缓冲区进行通讯
● 生产者生产完数据直接放在内存缓冲区,不必等待消费者消费;消费者需要消费数据直接从内存缓冲区中取,不必等待生产者生产
● 生产者和消费者只有1个,是单生产-单消费模型;生产者和消费者有多个,是多生产-多消费模型
● 缓冲区满了,生产者会阻塞等待,不会再生产数据了,直到消费者消费了数据;缓冲区空了,消费者会阻塞等待,不会再消费数据了,直到生产者生产了数据
生产者-消费者模型优点
● 平衡消费者和生产者处理数据的能力,一方面起到缓存的作用,另一方面达到解耦合的效果
消费者和生产者数据数据的能力可能差别比较大,比如消费者消费速度很快,而生产者生产比较慢,那么消费者快速消费完数据后就会经常等待生产者生产,而引入一段内存缓冲区之后,可以缓存一定的数据,生产者/消费者直接使用这段内存缓冲区即可,一定程度上平衡消费者和生产者处理数据的能力,生产同时可以消费,消费同时也可以生产,一定程度上将生产者和消费者解耦,支持并发运行和忙闲不均
生产者-消费者之间的关系
● 生产者-生产者:互斥
● 消费者-消费者:互斥
● 生产者-消费者:互斥+同步
1. 多个生产者可能会同时生产数据,多个消费者也可能会同时消费数据,生产者和消费者也可以同时生产和消费数据,总结一下,同一时刻会有多个执行流访问内存缓冲区,因此内存缓冲区是一份公共资源,因此需要使用互斥锁保护起来,因此生产者-生产者,消费者-消费者,生产者-消费者都会竞争锁,属于互斥关系
2. 缓冲区满了,生产者会阻塞等待,不会再生产数据了,直到消费者消费了数据;缓冲区空了,消费者会阻塞等待,不会再消费数据了,直到生产者生产了数据,因此生产者和消费者呈现出一定的同步关系,需要让生产者线程和消费者线程协同起来
基于阻塞队列实现生产者-消费者模型
BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "LockGuard.hpp"
using namespace std;
const int defaultcap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(int cap = defaultcap)
: _capacity(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.size() == 0;
}
void Push(const T &in) // 生产者的
{
LockGuard lockguard(&_mutex);
while (IsFull())
{
// 队列满了, 生产线程阻塞等待
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in); // 将任务放到队列
pthread_cond_signal(&_c_cond); // 只要生产了,就立即通知消费者可以消费了!
}
void Pop(T *out) // 消费者的
{
LockGuard lockguard(&_mutex);
while (IsEmpty())
{
// 队列空了, 消费线程阻塞等待
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _q.front(); // 从队列中拿数据
_q.pop();
pthread_cond_signal(&_p_cond); // 只要消费了,就立即通知生产者可以生产了!
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
queue<T> _q; // 阻塞队列
int _capacity; // 容量 _q.size() == _capacity, 满了,不能再生产; _q.size() == 0, 空, 不能消费了
pthread_mutex_t _mutex; // 互斥
pthread_cond_t _p_cond; // 给生产者的, 生产条件不满足,就排队
pthread_cond_t _c_cond; // 给消费者的, 消费条件不满足,就排队
};
● 使用STL中的队列模拟阻塞队列,只是额外限制了队列的容量上限
● 为了维护生产者和消费者之间的互斥+同步关系,引入锁和条件变量
● 队列为空,消费者不能再消费,需要在特定条件变量下等待,直到生产者生产了数据,通知消费者可以消费了(实际也可以制定其他策略,比如生产者生产的数据超过队列容量的1/3)
● 队列满了,生产者不能再生产,需要在特定条件变量下等待,直到消费者消费了数据,通知生产者可以生产了(实际也可以制定其他策略,比如消费者生产的数据超过队列容量的2/3)
● 判断生产条件是否满足以及消费条件是否满足时,我们使用的是while而不是if,是因为使用if可能会导致 pthread_cond_wait 出现伪唤醒状态
因为生产者可能是多个线程,如果队列满了,多个生产线程都在条件变量下等待,消费线程消费了一个数据后,一次性唤醒所有生产者线程,那么多个生产者线程就会重新竞争锁,只有1个线程竞争到并持有锁了,继续向下执行,又生产了一个数据,队列满了,然后释放了锁,此时消费者还没来得及消费,刚才阻塞在锁处的其他线程有1个竞争到了锁,就会继续往下执行,继续往满了的队列中生产数据,就会出错
同理消费者可能是多个线程,如果队列空了,多个消费线程都在条件变量下等待,生产线程生产了一个数据后,一次性唤醒所有消费者线程,那么多个消费者线程就会重新竞争锁,只有1个线程竞争到并持有锁了,继续向下执行,又消费了一个数据,队列空了,然后释放了锁,此时生产者还没来得及生产,刚才阻塞在锁处的其他线程有1个竞争到了锁,就会继续往下执行,继续从空了的队列中取走数据,就会出错
main.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
using namespace std;
void *consumer(void *args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int> *>(args);
while(true)
{
//1.消费数据
int data = 0;
bq->Pop(&data);
//2.对数据进行处理
cout << "consume data: " << data << endl;
sleep(1);
}
return nullptr;
}
void *productor(void *args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int> *>(args);
while(true)
{
//1.有数据
int data = rand() % 10 + 1; //[1, 10]
//2.进行生产
bq->Push(data);
cout << "produce data: " << data << endl;
}
return nullptr;
}
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self()); //形成更加随机的数
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, productor, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
● 生产者要往阻塞队列里push数据,前提是得有数据,我们采用生成随机数的方式构造数据
● 消费者从阻塞队列里读走数据之后,需要处理数据,我们目前先进行简单打印方便观察现象
● 生产线程和消费线程谁先被调度,这是不确定的,取决于调度器的调度算法
● 开始时由于阻塞队列为空,不满足消费条件,因此即便消费线程先被调度,也会在条件变量下阻塞等待,直到生产线程生产了数据,这是线程同步的体现
● 当生产者一次生产出一堆数据之后,队列满了,不满足生产条件,就得等待消费者,直到消费者取走了数据,这是线程同步的体现
● 消费者每间1s消费并处理数据,而生产者没有休眠一直生产,但生产者生产速度也会跟随消费者变慢,和消费者协同起来,这是线程同步的体现
● 显示器也是公共资源,消费线程和生产线程谁竞争能力强,谁就会往显示器上打印,因此打印的顺序是不确定的,也可能会出现消息混合打印的情况
● 上述代码只是为了简单测试生产者-消费者模型,因此消费者只是简单的将生产者生产的数据打印,而我们在设计阻塞队列时,使用到了模版,因此可以设计一个任务类,生产者往阻塞队列中放任务对象,消费者从阻塞队列中取走任务并处理任务打印结果
Task.hpp
#pragma once
#include <iostream>
#include <string>
using namespace std;
const int defaultvalue = 0;
enum
{
ok,
div_zero,
mode_zero,
unknown
};
const string opers = "+-/*/%)(&";
class Task
{
public:
Task(){}
Task(int x, int y, char op)
: data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok)
{}
void Run()
{
switch (oper)
{
case '+':
result = data_x + data_y;
break;
case '-':
result = data_x - data_y;
break;
case '*':
result = data_x * data_y;
break;
case '/':
{
if (data_y == 0)
code = div_zero;
else
result = data_x / data_y;
}
break;
case '%':
{
if (data_y == 0)
code = mode_zero;
else
result = data_x % data_y;
}
break;
default:
code = unknown;
break;
}
}
string PrintTask()
{
string s;
s = to_string(data_x);
s += oper;
s += to_string(data_y);
s += "=?";
return s;
}
string PrintResult()
{
string s;
s = to_string(data_x);
s += oper;
s += to_string(data_y);
s += "=";
s += to_string(result);
s += " [";
s += to_string(code);
s += "]";
return s;
}
~Task(){}
private:
int data_x; //操作数
int data_y; //操作数
char oper; //操作符
int result; //结果
int code; // 结果码, 为0表示结果可信, 否则表示结果不可信
};
问题: 生产者生产数据和消费者消费的过程,本身就是互斥的啊,也就是串行的,那生产者-消费者模型的高效如何体现?
● 生产数据前,得有数据,数据从哪里来? 是取决于具体场景的,比如从网络中获取数据,这是需要花一定时间的, 而有了内存缓冲区,生产者获取数据的过程中消费者可以同时消费和处理数据
● 取走数据后,还没有结束,还需要处理数据,处理数据也是要花一定时间的,而有了内存缓冲区,消费者处理数据的过程中生产者可以同时获取并生产数据
● 而如果是多生产和多消费模型,多个生产线程可以同时从网络中获取数据,多个消费线程也可以同时处理数据
● 因此生产者-消费者模型的高效性并不体现在同步和互斥,而是在于: 处理数据的时候可以并发!!!
如何将上面的代码改为多生产多消费模型呢??
由于生产数据(push)和消费数据(pop)时,我们都加了锁,因此上述代码本身就实现了多个生产者之间的互斥,多个消费者之间的互斥,因此BlockQueue模块不需要改动,我们只需要创建多个生产者和多个消费者即可!
多生产者-多消费者模型
main.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include "Task.hpp"
class ThreadData
{
public:
BlockQueue<Task> *bq;
string name;
};
void *consumer(void *args)
{
ThreadData *td = (ThreadData *)args;
while (true)
{
// 1.消费数据
Task t;
td->bq->Pop(&t);
// 2.处理数据
t(); // 仿函数
cout << "I am " << td->name << ", consumer data: " << t.PrintResult() << endl;
}
return nullptr;
}
void *productor(void *args)
{
ThreadData *td = (ThreadData *)args;
while (true)
{
// 1.有数据, 从具体场景中来, 从网络中拿数据
int data1 = rand() % 10; //[0, 9]
int data2 = rand() % 10; //[0, 9]
char oper = opers[rand() % opers.size()];
Task t(data1, data2, oper); // 构建任务
// 2.进行生产
td->bq->Push(t);
// for debug
cout << "I am " << td->name << ", productor data : " << t.PrintTask() << endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self()); // 形成更加随机的数
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c[2], p[3];
ThreadData *td1 = new ThreadData();
td1->bq = bq;
td1->name = "Thread-1";
pthread_create(&c[0], nullptr, consumer, td1);
ThreadData *td2 = new ThreadData();
td2->bq = bq;
td2->name = "Thread-2";
pthread_create(&c[0], nullptr, consumer, td2);
ThreadData *td3 = new ThreadData();
td3->bq = bq;
td3->name = "Thread-3";
pthread_create(&p[0], nullptr, productor, td3);
ThreadData *td4 = new ThreadData();
td4->bq = bq;
td4->name = "Thread-4";
pthread_create(&p[0], nullptr, productor, td4);
ThreadData *td5 = new ThreadData();
td5->bq = bq;
td5->name = "Thread-4";
pthread_create(&p[0], nullptr, productor, td5);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
return 0;
}
基于环形队列实现生产者-消费者模型
预备知识: 信号量
● 信号量本质是一把计数器,表示可用资源的数目
● 申请信号量的本质是预定资源,只要预定成功,就一定有自己的一份资源
● 申请信号量是p操作,释放信号量是v操作,p操作和v操作都是原子的
● 基于阻塞队列实现的生产者消费者模型中的公共资源我们当成一个整体来使用,因此需要加锁来解决多执行流并发访问临界资源导致数据不一致的问题
● 如果将公共资源划分成多部分使用,此时多线程就可以并发访问被细分的不同资源了!
● 信号量初始化成资源总数,有线程访问公共资源,先申请信号量,只要信号量不为0,信号量--,申请信号量就成功了,就一定能够访问到资源
● 信号量的接口
初始化信号量(pshared设为0,表示线程间共享, value表示信号量的初始值)
int sem_init(sem_t *sem, int pshared, unsigned int value);
销毁信号量
int sem_destroy(sem_t *sem);
申请信号量
int sem_wait(sem_t *sem);
释放信号量
int sem_post(sem_t *sem);
基于环形队列实现生产者-消费者模型
● 环形队列逻辑上是一个环,存储结构本质是一个线性数组,当下标越界时,取模数组长度就可以回到最开始,因此实现了环形效果
● 环形队列中的"公共资源"划分成多部分使用,不当成一个资源使用
● 开始时,消费者和生产者都指向环形队列的同一个位置
● 生产者不能把消费者套一个圈,否则生产者会覆盖掉生产过的数据,从而导致出错
● 消费者不能超过生产者,否则没有数据消费了,还消费就会出错
● 当环形队列为空为满,消费者和生产者才会指向同一个位置,访问同一份临界资源
● 当环形队列为空时,生产者先跑;为满时,消费者先跑, 这就体现了互斥和同步
● 除了为空未满,其余情况,不会指向同一个位置,那么多线程可以并发进入临界区访问资源
● 消费者认为,环形队列中的数据是资源;消费者认为,环形队列中的空间是资源!
● 开始时,没有数据,数据信号量初始化为0;开始时空间大小为N,空间信号量初始化为N
● 生产数据前,申请空间资源(空间信号量--),生产完数据后,多了一个数据(数据信号量++)
● 消费数据前,申请数据资源(数据信号量--),消费完数据后,多了一个空间(空间信号量++)
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
using namespace std;
const int defaultsize = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem); //原子的
}
void V(sem_t &sem)
{
sem_post(&sem); //原子的
}
public:
RingQueue(int size = defaultsize)
: _ringqueue(size), _size(size), _p_step(0), _c_step(0)
{
sem_init(&_space_sem, 0, _size); // 0表示线程间共享
sem_init(&_data_sem, 0, 0);
}
void Push(const T &in)
{
P(_space_sem);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size;
V(_data_sem);
}
void Pop(T *out)
{
P(_data_sem);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size;
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
}
private:
vector<T> _ringqueue; //环形队列
int _size; //环形队列大小
int _p_step; // 生产者生产位置
int _c_step; // 消费者消费位置
sem_t _space_sem; // 生产者信号量
sem_t _data_sem; // 消费者信号量
};
main.cc
#include "RingQueue.hpp"
#include <pthread.h>
#include <unistd.h>
void *Productor(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);
int cnt = 100;
while(true)
{
rq->Push(cnt);
cout << "produce done, data is : " << cnt << endl;
cnt--;
}
}
void *Consumer(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);
while(true)
{
sleep(1);
int data = 0;
rq->Pop(&data);
cout << "consume done, data is : " << data << endl;
}
}
int main()
{
pthread_t c, p;
RingQueue<int> *rq = new RingQueue<int>();
pthread_create(&c, nullptr, Productor, rq);
pthread_create(&p, nullptr, Consumer, rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
环形队列中不是只可以放整形数据哦,同样可以让生产者指派任务,放到环形队列中,消费者从环形队列中取任务,然后执行任务,输出结果
main.cc
#include "RingQueue.hpp"
#include <pthread.h>
#include <unistd.h>
#include <unistd.h>
#include <ctime>
#include "Task.hpp"
void *Productor(void *args)
{
// 1.有数据
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
int data1 = rand() % 10;
int data2 = rand() % 10;
char op = opers[rand() % opers.size()];
// 2.生产
Task t(data1, data2, op);
rq->Push(t);
cout << "productor data : " << t.PrintTask() << endl;
sleep(1);
}
}
void *Consumer(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
// 2.处理任务
while (true)
{
// 1.消费数据
Task t;
rq->Pop(&t);
//2.对数据做处理
t.Run();
cout << "consumer data: " << t.PrintResult() << endl;
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>();
pthread_t c, p;
pthread_create(&c, nullptr, Productor, rq);
pthread_create(&p, nullptr, Consumer, rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
基于环形队列的多生产者多消费者模型
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include "LockGuard.hpp"
using namespace std;
const int defaultsize = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem); // 原子的
}
void V(sem_t &sem)
{
sem_post(&sem); // 原子的
}
public:
RingQueue(int size = defaultsize)
: _ringqueue(size), _size(size), _p_step(0), _c_step(0)
{
sem_init(&_space_sem, 0, _size); // 0表示线程间共享
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_p_mutex, nullptr);
pthread_mutex_init(&_c_mutex, nullptr);
}
void Push(const T &in)
{
P(_space_sem);
{
LockGuard lockguard(&_p_mutex);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size;
}
V(_data_sem);
}
void Pop(T *out)
{
P(_data_sem);
{
LockGuard lockguard(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size;
}
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
private:
vector<T> _ringqueue; // 环形队列
int _size; // 环形队列大小
int _p_step; // 生产者生产位置
int _c_step; // 消费者消费位置
sem_t _space_sem; // 生产者信号量
sem_t _data_sem; // 消费者信号量
pthread_mutex_t _p_mutex; // 生产者的锁
pthread_mutex_t _c_mutex; // 消费者的锁
};
● 基于环形队列的单生产者单消费者模型没有用到锁,只用到了信号量,因为我们把环形队列的资源划分为多部分资源使用,除了为空和为满,多线程访问的是不同位置的资源,因此不需要加锁
● 基于环形队列的多生产者多消费者模型是需要用到锁的,因为多个生产线程可能会同时往同一个位置上生产,会出现数据不一致问题,因此多个生产者需要一把锁;同理,多个消费线程可能会同时从同一个位置拿数据,也会出现数据不一致的问题,因此多个消费者也需要一把锁
● 理论上,申请锁和申请信号量的顺序是无所谓的,但建议先申请信号量, 只要资源足够,多个线程申请信号量就会成功,那么多个线程就会竞争锁,只要有1个线程竞争到了锁,其他线程就会卡在申请锁的地方,当该线程释放了锁,其他某个线程竞争到了锁就立马就可以进入临界区执行代码!而如果先申请锁,只有1个线程竞争到了锁,其余线程卡在锁的地方, 当该线程释放了锁,其他线程竞争到锁之后还要再申请信号量,这个过程是比较慢的
main.cc
#include "RingQueue.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "Task.hpp"
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
class ThreadData
{
public:
RingQueue<Task> *rq;
string name;
};
void *Productor(void *args)
{
// 1.有数据
ThreadData *td = static_cast<ThreadData *>(args);
while (true)
{
int data1 = rand() % 10; //[0, 9]
int data2 = rand() % 10; //[0, 9]
char op = opers[rand() % opers.size()];
// 2.生产
Task t(data1, data2, op);
td->rq->Push(t);
pthread_mutex_lock(&mutex);
cout << "Thread name: " << td->name << ", productor data : " << t.PrintTask() << endl;
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
void *Consumer(void *args)
{
ThreadData *td = static_cast<ThreadData *>(args);
while (true)
{
// 1.消费数据
Task t;
td->rq->Pop(&t);
// 2.对数据做处理
t.Run();
pthread_mutex_lock(&mutex);
cout << "Thread name: " << td->name << ", consumer data: " << t.PrintResult() << endl;
pthread_mutex_unlock(&mutex);
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid());
RingQueue<Task> *rq = new RingQueue<Task>();
pthread_t c[2], p[3];
ThreadData *td1 = new ThreadData();
td1->rq = rq;
td1->name = "thread-1";
pthread_create(&p[0], nullptr, Productor, td1);
ThreadData *td2 = new ThreadData();
td2->rq = rq;
td2->name = "thread-2";
pthread_create(&p[1], nullptr, Productor, td2);
ThreadData *td3 = new ThreadData();
td3->rq = rq;
td3->name = "thread-3";
pthread_create(&p[2], nullptr, Productor, td3);
ThreadData *td4 = new ThreadData();
td4->rq = rq;
td4->name = "thread-4";
pthread_create(&c[0], nullptr, Consumer, td4);
ThreadData *td5 = new ThreadData();
td5->rq = rq;
td5->name = "thread-5";
pthread_create(&c[1], nullptr, Consumer, td5);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
return 0;
}