1.线程同步的概念
下面我们来谈一个故事理解线程同步的概念:假设学校里面有一个环境非常好的超级vip自习室是公共的,但是有个前提,一次只能进去一个学生,因为只有一把钥匙挂在超级vip自习室门外的墙上的。假设你是一个学习成绩比较好的小学霸,起的比较早,所以一大早就打算去超级vip自习室学习,所以一到自习室门口就把钥匙拿了下来,然后打开门进入自习室把门关上进行学习,自习了一段时间后外面的学生多了起来,其他的学生进不来,因为墙上没钥匙了,其他学生进不去呢就知道等一等才有机会使用自习室。
所以这个自习室就是临界资源,而钥匙就是学生要进入临界区访问临界资源需要申请的锁,而学生就是一个个的线程。当然如果某个时刻你自习到某个时间点想去上厕所,那么如果你把钥匙带走然后把门关了,那么其他学生还是进不来,也就是说当你抱起锁挂起的时候,其他人不能对你的学习资料有所影响(避免了数据被修改的影响),如果过了两三个小时饿了,想去吃饭了,这个时候你就想出自习室,然后把钥匙挂墙上,你突然发现外面等的人这么多,我好不容易能用到自习室,到时候过来了又要等别人用完自己才能用,这样太耗时间了,所以你就又把钥匙拿下来打开自习室的门又进去了,然后坐了没几分钟你肚子太饿了又想去吃饭,然后你又出来把钥匙挂墙上,就这样反反复复好几次。就这种现象,你每次都只在自习室待几分钟,不长待,我们把这种情况叫做你在不停的申请锁,(无效)访问资源,释放锁。因为任何时刻只允许一个人进自习室(访问临界资源)。由于你长时间的占用临界资源导致其他的学生(线程)无法享用自习室(临界资源)而导致了其他线程的饥饿问题。
所以了肯定就会有学生投诉吐槽这种现象,那么学校不能容忍这种现象,所以就采取对应的措施了:从自习室归还钥匙之后,短时间内不能在立即申请钥匙(锁),应该在外面排队,外面的人申请失败的也要继续排队。所以如果你自习了一段时间之后归还钥匙之后下次使用自习室就得去队尾进行排队了,所以着也就解决了造成其他线程的饥饿问题。互斥可以保证资源的安全,同步能够充分高效的使用资源。我们把这种在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
2.生产者消费者模型
现实生活中的超市就是我们典型的生产者消费者模型。供应商把商品交给超市,然后消费者在超市里面拿商品,其实这就好比我们的一个线程把数据交给另一个线程。
生产消费者模型的本质也就是来进行执行流间数据传递(通信)的。
生成消费的过程是安全的!!
我们需要理清楚生产者和消费者之间的关系。这里的超市其实就是一段内存空间——基于特定空间的数据结构或者容器。
生产者之间是什么关系呢?竞争——互斥
理解:超市本来就这么大,放商品的位置就这么多,假如一个供应商放的商品多了占用的商品空间也就多了,那么其他供应商放商品的空间就少了,所以供应商(生产者)之间是竞争的关系,同时某个展架的某个位置一次只有一个,只能放置一个商品,只能放一个供应商的一个商品,占用了之后其他供应商的商品不能再放了,所以这就是互斥关系,所以两者就是竞争——互斥的关系。
消费者和消费者之间是什么关系呢?竞争——互斥
理解:消费者与消费者之间可能会因为想买同一件商品,比如说同一个展架某个位置的火腿肠,只有一个消费者可以拿这一件商品,该消费者拿了之后其他消费者不能再拿,所以这是互斥的 关系,所以这也需要这些消费者对这一件商品进行竞争,谁先拿到是谁的,所以也是竞争关系。
生产者和消费者是什么关系呢?互斥&&同步
理解:比如说今天一个消费者要去超市拿一个火腿肠,但是那个展架那个位置生产者供应商到底供应了火腿肠没有呢?不确定,那么消费者什么时候去拿?也不确定。可能会有这种生产者正在供应火腿肠,展架上还没放好,那么消费者就还不能拿,而放好了之后消费者正在拿的时候,还没有完全拿走的时候,展架那里也暂时不能放,所以这要保证原子性,必须展架上放好了火腿肠消费者才能拿,展架上的火腿肠拿走了,才能继续放。所以这就是需要保持两者的互斥关系,如果呢展架上没有火腿肠了,而消费者跑到去问售货员说有没有火腿肠了,售货员说没有火腿肠了,消费者然后走了,后面第二天又来问售货员展架上有没有火腿肠发现又没有,然后去了一个月发现30天都没有火腿肠,这就导致了消费者一直在访问临界资源但是什么都没做,也就造成了无效访问,超市里面的售货员每天都在接受这些消费者的询问有没有火腿肠,导致供应商找售货员询问需不需要火腿肠了,售货员都没时间搭理它,
所以这也就造成了生产者的饥饿问题,我们知道要先生产,再消费,消费完再生成按照这样的顺序来访问临界资源效率才是最高的,所以还需要同步的关系。
3:三种关系本质就是用锁&&条件变量来进行维护的。
2种角色:生产者(1 or n) ,消费者(1 or n) --- 线程或进程
1:一个交易场所:内存空间
为了便于记忆,我们将其称之为:“321原则”
3.如何理解CP(消费者和生产者)问题
我们之前写的一些单执行流的代码我们都知道,我们可以在main函数种通过函数调用进入函数内部,暂时main函数后面的代码可以先不执行,但是这样会让调用函数的速度影响整体的执行效率
假设add函数执行的很慢,那么就会拖慢整体的执行效率,因为main函数需要得到返回值才能继续向后执行,我们把这种称之为串行的。
而现在我们是多线程的情况下,我们使用生产者消费者模型:
一个线程执行的比较快就可以先把数据放入内存中然后继续向后执行,而不需要等待线程拿了数据之后再向后执行,同时另一个执行比较慢的线程也只需要拿数据慢慢执行就行了,没有拖慢较快进行的效率,这就做到了多执行流之间实现了执行解耦,支持忙闲不均。这样就提高了处理数据的效率。
4.条件变量
假设一个家庭里面有爸爸妈妈,哥哥,姐姐,弟弟,妹妹六口人,然后要玩一个游戏,四个孩子把眼睛蒙上,爸爸往盘子里面放苹果,但是四个孩子都不知道爸爸什么时候放,如果放的时候刚好他们有一个人去拿,那就会造成数据不一致问题,但是如果比如说哥哥一直去拿,但是盘子里没有苹果,那么就会无效访问临界资源,爸爸会造成饥饿问题。
此时如果我们加入一个铃铛,那游戏规则就变成了,爸爸每在盘子里面放一个苹果,都敲一下铃铛,而四个孩子排着队,第一个人听到铃铛响的声音就可以相当于唤醒了,可以申请锁去盘子上拿苹果,拿了苹果之后吃完了如果还想拿苹果那就需要再向后面排队,到后面的姐姐拿苹果了。这里的铃铛就相当于条件变量。这样就可以使得即将访问临界资源的线程:1.不做无效访问的锁申请
2.执行具有顺序
条件变量我们目前可以理解为:
struct cond
{
//条件是否满足
int flag;
//维护一个线程队列
tcb_queue;
}
条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
4.1条件变量接口认识
下面我们可以通过man手册快速认识一下有关条件变量的接口:
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);
参数:
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond);
这里的条件变量的接口跟我们之前申请锁的接口非常的相似
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释
唤醒等待:
int pthread_cond_broadcast(pthread_cond_t *cond);//条件成立后唤醒所有线程
int pthread_cond_signal(pthread_cond_t *cond);//条件成立后唤醒一个线程
4.2条件变量接口测试
快速编写一段简单的代码:
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<string>
//pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void * threadRoutine(void * args)
{
std::string name = static_cast<const char *>(args);
while(true)
{
pthread_mutex_lock(&mutex);
//pthread_cond_wait(&cond,&mutex);
std::cout<<"I am a thread: "<<name<<std::endl;
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,threadRoutine,(void*)"thread-1");
pthread_create(&t2,nullptr,threadRoutine,(void*)"thread-2");
pthread_create(&t3,nullptr,threadRoutine,(void*)"thread-3");
// sleep(5);
// while(true)
// {
// pthread_cond_signal(&cond);
// sleep(1);
// }
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
return 0;
}
Makefile
testCond:testCond.cc
g++ -o $@ $^ -lpthread -std=c++11
.PHONY:clean
clean:
rm -rf testCond
运行结果:
每次执行这三个线程都没有特定的先后顺序,而如果我们把条件变量加进来,也就是把上述注释部分的代码打开:
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<string>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void * threadRoutine(void * args)
{
std::string name = static_cast<const char *>(args);
while(true)
{
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond,&mutex);
std::cout<<"I am a thread: "<<name<<std::endl;
pthread_mutex_unlock(&mutex);
//sleep(1);
}
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,threadRoutine,(void*)"thread-1");
pthread_create(&t2,nullptr,threadRoutine,(void*)"thread-2");
pthread_create(&t3,nullptr,threadRoutine,(void*)"thread-3");
sleep(5);
while(true)
{
pthread_cond_signal(&cond);
sleep(1);
}
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
return 0;
}
运行结果:
前5秒是没有打印任何消息的
5秒过后:
我们发现这些线程一直都是保持着特定的顺序执行着,因为这些线程在pthread_cond_wait()那个地方排着队等着条件变量成立然后被唤醒。
下面我们测试把pthread_cond_signal接口换成pthread_cond_broadcast,也就是让它一次性不是唤醒一个线程,而是唤醒所有线程
运行结果:
前5秒:
5秒过后:
这里我们将所有线程同时唤醒那就又变成了没有顺序的执行了。
因为加锁和解锁之间我们往往要访问临界资源,可是临界资源不一定是满足条件的,所以需要判断。
下面我们加入一个临界资源,模拟抢票,修改一下代码:
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<string>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int ticket = 1000;
void * threadRoutine(void * args)
{
std::string name = static_cast<const char *>(args);
while(true)
{
pthread_mutex_lock(&mutex);
if(ticket>0)
{
std::cout<<name<<", get a ticket :"<<ticket--<<std::endl;
usleep(1000);
}
else
{
std::cout<<"没有票了,"<<name<<std::endl;
pthread_cond_wait(&cond,&mutex);
}
pthread_mutex_unlock(&mutex);
//sleep(1);
}
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,threadRoutine,(void*)"thread-1");
pthread_create(&t2,nullptr,threadRoutine,(void*)"thread-2");
pthread_create(&t3,nullptr,threadRoutine,(void*)"thread-3");
while(true)
{
sleep(6);
//pthread_cond_signal(&cond);
pthread_mutex_lock(&mutex);
ticket+=1000;
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex);
}
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
return 0;
}
这段代码的功能是一开始有1000张票,然后创建3个新线程让他们去抢票,抢完之后会进行等待条件成立再继续向后执行,而主线程是每6秒钟放入1000张票,然后将所有的线程同时唤醒,唤醒之后这些线程通过判断之后又可以继续抢票,运行结果:
我们发现这些主线程放进去的票都是新线程按随意顺序抢的,而如果要控制抢票顺序的话,可以将唤醒接口pthread_cond_broadcast换成pthread_cond_signal接口这样就可以按照特定顺序实现抢票了。
运行结果:
首先是3然后是2,然后是1,我们发现后面一直都是按照这个顺序执行下去的。
其实这段代码的重要接口就是pthread_cond_wait()接口,它的作用如下:
1.让线程在进行等待的时候会自动释放锁
2.线程被唤醒的时候是在临界区内被唤醒的,当线程被唤醒,线程在pthread_cond_wait返回的时候要重新申请并持有锁。
3.当线程被唤醒的时候,重新申请并持有锁本质也是要参与到锁的竞争当中的
5.基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
BlockQueue.hpp
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
const int Defaultcapacity = 5;
template<class T>
class BlockQueue
{
public:
BlockQueue(int capacity=Defaultcapacity,int consumer_water_line=Defaultcapacity/5*3
,int productor_water_line=Defaultcapacity/5*3)
:_capacity(capacity)
,_consumer_water_line(consumer_water_line)
,_productor_water_line(productor_water_line)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_p_cond,nullptr);
pthread_cond_init(&_c_cond,nullptr);
}
void Push(const T& in)
{
pthread_mutex_lock(&_mutex);
if(isFull())
{
//阻塞等待
pthread_cond_wait(&_p_cond,&_mutex);
}
_q.push(in);
// if(_q.size()>_productor_water_line)pthread_cond_signal(&_c_cond);
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
void Pop(T * out)
{
pthread_mutex_lock(&_mutex);
if(isEmpty())
{
//阻塞等待
pthread_cond_wait(&_c_cond,&_mutex);
}
*out = _q.front();
_q.pop();
// if(_q.size()<_consumer_water_line)pthread_cond_signal(&_p_cond);
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
}
bool isFull(){return _q.size()==_capacity;}
bool isEmpty(){return _q.size()==0;}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;//阻塞队列
int _capacity;//阻塞队列容量
pthread_mutex_t _mutex;//锁
pthread_cond_t _p_cond;//生产者条件变量
pthread_cond_t _c_cond;//消费者条件变量
int _consumer_water_line;//消费者水位线
int _productor_water_line;//生产者水位线
};
上述代码是一个阻塞队列类,里面封装了一个stl容器中的一个队列,还有队列容量,以及锁,生产者条件变量,消费者条件变量,消费者水位线,生产者水位线这些属性
Main.cc
#include"BlockQueue.hpp"
#include<pthread.h>
#include<ctime>
#include<unistd.h>
void * consumer(void * args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
while(true)
{
//1.拿数据 bq->pop(&data)
int data = 0;
bq->Pop(&data);
//2.处理数据
std::cout<<"consumer data: "<<data<<std::endl;
//消费者没有sleep
}
return nullptr;
}
void * productor(void * args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
while(true)
{
//1.有数据
int data = rand()%10+1;
//2.生产数据
bq->Push(data);
std::cout<<"productor data: "<<data<<std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());//只是为了生成更随机的数据
BlockQueue<int>* bq = new BlockQueue<int>();
pthread_t c,p;
pthread_create(&p,nullptr,productor,bq);
pthread_create(&c,nullptr,consumer,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
运行结果:
如果不通过达到消费者水位线再唤醒消费者线程来拿数据的话现象如下:
达到的效果是生产一个消费一个,生产一个,消费一个。
如果通过使用达到消费者水位线再唤醒消费者线程来拿数据的话现象如下:
也就是生产了4个数据的时候才唤醒消费者线程进行拿数据,拿完之后再唤醒生产者线程生产数据,所以就有了以上结果。
5.1关于pthread_cond_wait的进一步理解
我们经过前面的分析已经知道了,当加锁之后的线程执行到pthread_cond_wait这个接口的时候会释放锁,等到被唤醒的时候会进行重新申请锁,虽然我们上述的代码不存在这样一个问题,就是我们这段代码是只唤醒一个线程,但是如果当生产者生产了一个数据放到阻塞队列中时,假设有3个消费者线程被唤醒了然后去申请锁,那么这3个消费者线程就存在一种竞争关系,那么这3个消费者线程只有一个线程可以申请锁成功,也就是只有一个线程申请锁之后条件满足再继续向后执行,但是数据只有一个,只有一个消费者线程可以满足要求,这样就会有其他的2个线程被阻塞在申请锁那个阻塞队列里了,而不是当前这段条件不满足的阻塞队列里,所以这个接口就会引起一个伪唤醒的状态:对应的条件并不满足,但是线程却被唤醒了。
如果要解决这个伪唤醒状态的问题,我们只需要将上述Pop和Push两个成员函数的if判断改为while就行了:
5.2将之前封装的lockguard引入到基于BlockingQueue的生产者消费者模型中
LockGuard.hpp
#pragma once
#include<pthread.h>
//不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
Mutex(pthread_mutex_t * lock):_lock(lock)
{}
void Lock()
{
pthread_mutex_lock(_lock);
}
void Unlock()
{
pthread_mutex_unlock(_lock);
}
~Mutex(){}
private:
pthread_mutex_t * _lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t * lock):_mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex _mutex;
};
修改之后:
BlockQueue.hpp
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
#include"LockGuard.hpp"
const int Defaultcapacity = 5;
template<class T>
class BlockQueue
{
public:
BlockQueue(int capacity=Defaultcapacity,int consumer_water_line=Defaultcapacity/5*3
,int productor_water_line=Defaultcapacity/5*3)
:_capacity(capacity)
,_consumer_water_line(consumer_water_line)
,_productor_water_line(productor_water_line)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_p_cond,nullptr);
pthread_cond_init(&_c_cond,nullptr);
}
void Push(const T& in)
{
LockGuard lockguard(&_mutex);
//pthread_mutex_lock(&_mutex);
while(isFull())
{
//阻塞等待
pthread_cond_wait(&_p_cond,&_mutex);
}
_q.push(in);
// if(_q.size()>_productor_water_line)pthread_cond_signal(&_c_cond);
pthread_cond_signal(&_c_cond);
//pthread_mutex_unlock(&_mutex);
}
void Pop(T * out)
{
LockGuard lockguard(&_mutex);
//pthread_mutex_lock(&_mutex);
while(isEmpty())
{
//阻塞等待
pthread_cond_wait(&_c_cond,&_mutex);
}
*out = _q.front();
_q.pop();
// if(_q.size()<_consumer_water_line)pthread_cond_signal(&_p_cond);
pthread_cond_signal(&_p_cond);
//pthread_mutex_unlock(&_mutex);
}
bool isFull(){return _q.size()==_capacity;}
bool isEmpty(){return _q.size()==0;}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond;//生产者条件变量
pthread_cond_t _c_cond;//消费者条件变量
int _consumer_water_line;//消费者水位线
int _productor_water_line;//生产者水位线
};
修改后的运行结果:
5.3重新理解生产者消费者模型(代码+理论)
Main.cc
#include"BlockQueue.hpp"
#include<pthread.h>
#include<ctime>
#include<unistd.h>
#include<string>
#include"task.hpp"
void * consumer(void * args)
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
sleep(1);
//1.拿数据 bq->pop(&data)
Task t;
bq->Pop(&t);
//2.处理数据
t.Run();
std::cout<<"consumer Result: "<<t.PrintResult()<<std::endl;
//消费者没有sleep
}
return nullptr;
}
void * productor(void * args)
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
//1.有数据
int data1 = rand()%10;
usleep(rand()%123);
int data2 = rand()%10;
usleep(rand()%123);
char oper = opers[rand() % opers.size()];
//2.生产数据
Task t(data1,data2,oper);
bq->Push(t);
std::string task_string = t.PrintTask();
std::cout<<"productor task: "<<task_string<<std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());//只是为了生成更随机的数据
BlockQueue<Task>* bq = new BlockQueue<Task>();
pthread_t c,p;
pthread_create(&p,nullptr,productor,bq);
pthread_create(&c,nullptr,consumer,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
task.hpp
#pragma once
const int defaultvalue = 0;
enum
{
ok = 0,
div_zero,
mod_zero,
unknow
};
const std::string opers = "+-*/%&()";
class Task
{
public:
Task(){}
Task(int x, int y, char op, int result=defaultvalue, int code=ok)
: _data_x(x), _data_y(y), _oper(op)
{
}
std::string PrintTask()
{
std::string s;
s= std::to_string(_data_x);
s+=_oper;
s+=std::to_string(_data_y);
s+="=?";
return s;
}
std::string PrintResult()
{
std::string s;
s= std::to_string(_data_x);
s+=_oper;
s+=std::to_string(_data_y);
s+="=";
s+=std::to_string(_result);
s+=" [";
s+=std::to_string(_code);
s+="]";
return s;
}
void Run()
{
switch (_oper)
{
case '+':
_result = _data_x + _data_y;
break;
case '-':
_result = _data_x - _data_y;
break;
case '*':
_result = _data_x * _data_y;
break;
case '/':
{
if (_data_y == 0)
{
_code = div_zero;
}
else _result = _data_x / _data_y;
}
break;
case '%':
{
if (_data_y == 0)
{
_code = mod_zero;
}
else _result = _data_x % _data_y;
}
break;
default:
_code = unknow;
break;
}
}
~Task() {}
private:
int _data_x; // 操作数
int _data_y; // 操作数
char _oper; // 运算符
int _result; // 结果
int _code; // 结果码 0:可信 !0:不可信
};
在上述代码的基础上将Main.cc修改成如上代码,同时编写task.hpp代码,运行结果:
重新理解生产者消费者模型:生产者消费者模型里面交换的基本数据也可以是类对象。
5.4代码整体改成多生产多消费
只需在以上代码的基础上改动Main.cc的代码即可:
Main.cc
#include"BlockQueue.hpp"
#include<pthread.h>
#include<ctime>
#include<unistd.h>
#include<string>
#include"task.hpp"
class ThreadData
{
public:
ThreadData(const std::string & name,BlockQueue<Task>* bq)
: threadname(name)
,_bq(bq)
{}
BlockQueue<Task> * getBQ()
{
return this->_bq;
}
void setBQ(BlockQueue<Task>* bq)
{
this->_bq = bq;
}
std::string getName()
{
return this->threadname;
}
void setName(std::string name)
{
this->threadname = name;
}
private:
std::string threadname;
BlockQueue<Task>* _bq;
};
void * consumer(void * args)
{
ThreadData* td = static_cast<ThreadData*>(args);
while(true)
{
sleep(1);
//1.拿数据 bq->pop(&data)
Task t;
td->getBQ()->Pop(&t);
//2.处理数据
t.Run();
std::cout<<"consumer Result: "<<t.PrintResult()<<" threadname: "<<td->getName()<<std::endl;
//消费者没有sleep
}
return nullptr;
}
void * productor(void * args)
{
ThreadData* td = static_cast<ThreadData*>(args);
while(true)
{
//1.有数据
int data1 = rand()%10;
usleep(rand()%123);
int data2 = rand()%10;
usleep(rand()%123);
char oper = opers[rand() % opers.size()];
//2.生产数据
Task t(data1,data2,oper);
td->getBQ()->Push(t);
std::string task_string = t.PrintTask();
std::cout<<"productor task: "<<task_string<<" threadname: "<<td->getName()<<std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self());//只是为了生成更随机的数据
BlockQueue<Task>* bq = new BlockQueue<Task>();
pthread_t c[3],p[2];
ThreadData td1("consumer-1",bq),td2("consumer-2",bq),td3("consumer-3",bq),td4("productor-1",bq),td5("productor-2",bq);
pthread_create(&p[0],nullptr,productor,&td4);
pthread_create(&p[1],nullptr,productor,&td4);
pthread_create(&c[0],nullptr,consumer,&td1);
pthread_create(&c[1],nullptr,consumer,&td2);
pthread_create(&c[2],nullptr,consumer,&td3);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(c[2],nullptr);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
return 0;
}
运行结果:
这样就是多生产多消费了。