目录
前言:
1 线程同步引入
2 条件变量
2.1 线程饥饿
2.2 条件变量接口
2.3 添加条件变量
3 生产者和消费者模型
前言:
本篇主要讲解了关于线程同步的相关知识,还有生产者和消费者模型的认识和使用。
1 线程同步引入
在讲解线程同步之前,我们先来看一下当一个程序之中只有线程互斥时会有什么样的问题,这份代码是我上一篇买票程序的改写。
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<string>
using namespace std;
int Ticket = 1000;
#define NUM 5
//初始化锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//买票线程执行
void* buyTicket(void* args)
{
string name = static_cast<const char*>(args);
while(1)
{
pthread_mutex_lock(&mutex);
if(Ticket > 0)
{
usleep(1000);
--Ticket;
cout << name << "购买了一张票,还剩下:" << Ticket<< endl;
pthread_mutex_unlock(&mutex);
}
else
{
pthread_mutex_unlock(&mutex);
}
usleep(10);
}
return nullptr;
}
//放票线程执行
void* putTicket(void* args)
{
string name = static_cast<const char*>(args);
while(1)
{
//每十秒钟放1000张票
sleep(10);
pthread_mutex_lock(&mutex);
Ticket+=1000;
pthread_mutex_unlock(&mutex);
}
return 0;
}
int main()
{
//创建买票线程
pthread_t tids[NUM];
for(int i = 0; i < NUM; ++i)
{
char* name = new char[64];
snprintf(name,64,"%s-%d","thread",i+1);
int n = pthread_create(tids+i,nullptr,buyTicket,name);
if(n != 0)
{
cout << "create thread fail" << endl;
}
}
//创建放票线程
pthread_t creT;
char* name = new char[64];
snprintf(name,64,"%s", "放票线程");
int n = pthread_create(&creT,nullptr,putTicket,name);
if(n != 0)
{
cout << "create thread fail" << endl;
}
//等待
for(int i = 0; i < NUM; ++i)
{
n = pthread_join(tids[i],nullptr);
if(n != 0)
{
cout << "join thread fail" << endl;
}
}
n = pthread_join(creT,nullptr);
if(n != 0)
{
cout << "join thread fail" << endl;
}
return 0;
}
上面的程序当中,我创建了5个线程用于执行买票这个动作,当票买完之后并不退出继续判断,然后每隔十秒钟,我们的放票线程会放出1000张票,然后买票线程继续买票,整个程序的临界资源通过锁来保护。这是上面一段代码的运行逻辑。
输出为以下结果:
从结果上来看,我们的程序是正确运行了,但是大家有没有想过一个问题,既然我们的票都已经没有了,我们的线程还一直在哪里进行无意义的死循环,不断地加锁,解锁,访问资源,占用CPU的运行是不是不太好啊?难道就没有一种方式能够让我们的程序只有在有票的时候才去拿,没票的时候就直接等待着吗?
答案是,当然有,不然你以为我们的线程同步是干嘛的,就是为了防止这种无价值的执行逻辑占用我们的CPU资源,所以也就引出了我们的条件变量这一概念。
2 条件变量
2.1 线程饥饿
在讲条件变量之前,我得给大伙补充一个知识,那就是线程饥饿问题。线程饥饿从概念上理解就是一个线程处于长时间等待资源,但是申请不到的状态。可以简单理解为另类的“死锁”,但是不是死锁哈。
什么意思呢?注意到我们的买票不管成功与否,是不是我们都进行了usleep(10)这一句代码呢?我添加他的意义是什么呢?
假设我们的临界区是一个自习室,而正在执行的线程就是我,这个自习室只能有一个人在里面学习,钥匙也只有一把,不能强夺,这是前提条件。
今天,我凌晨4点就跑到了自习室里面去了,由于规则的限制,后来的人只能在外面等我出去,然后把钥匙拿出来。这符合锁的逻辑。然后呢,我学着学着饿了,就学不进去,想去干饭,刚一出去,把锁放下,其他人正等着拿这把钥匙,我转念一想“不行,我出去了岂不是说我的自习室没了?”,因为我距离这把钥匙最近,所以我又把他那回到了手里,从回自习室,其它的人只能再继续等待。回去1分钟之后,我又遭不住了,又出去,如此循环操作,整个上午,我任何事情也没有做,只是在这里把钥匙拿起来,放回去,非常的欠揍啊。
上面我的这个行为肯定是有问题的,但是我做错了什么吗?是不是你说的自习室只能有一个人在里面?是不是你说的想要进入必须拿到教室?是不是你说的不能抢钥匙?我只是啥也没干,就玩,你能说我做错了什么吗?不能,这个例子不就是我们买票程序不加usleep(10)的运行结果吗?
所以基于这样一个问题,我们能咋做呢?只能添加规则,出自习室之后,放回钥匙,想要重新进入这个自习室必须到最后去排队,之前在自习室外等待的人也必须排队。这样就能方式饥饿问题的产生。那么如何做到的呢?由我们的条件变量来控制。
2.2 条件变量接口
初始化方法:和锁差不多
等待条件成立:需要配合锁使用,单独用不起作用
根据线程执行这一段代码的顺序,会根据这个顺序对线程排序等待,并且这个函数能够自动释放锁,并且等待唤醒函数。
唤醒线程:signal唤醒一个,broadcast唤醒所有
这一点博主会在后续为大家展示出区别。
2.3 添加条件变量
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<string>
using namespace std;
int Ticket = 1000;
#define NUM 5
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void* buyTicket(void* args)
{
string name = static_cast<const char*>(args);
while(1)
{
pthread_mutex_lock(&mutex);
while(Ticket <= 0)
{
pthread_cond_wait(&cond, &mutex);
}
if(Ticket > 0)
{
usleep(1000);
--Ticket;
cout << name << "购买了一张票,还剩下:" << Ticket<< endl;
pthread_mutex_unlock(&mutex);
}
else
{
pthread_mutex_unlock(&mutex);
}
usleep(10);
}
return nullptr;
}
void* putTicket(void* args)
{
string name = static_cast<const char*>(args);
while(1)
{
//每十秒钟放1000张票
sleep(10);
pthread_mutex_lock(&mutex);
Ticket+=1000;
cout << name << "放出了1000张票" << endl;
//pthread_cond_broadcast(&cond);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
return 0;
}
int main()
{
//创建买票线程
pthread_t tids[NUM];
for(int i = 0; i < NUM; ++i)
{
char* name = new char[64];
snprintf(name,64,"%s-%d","thread",i+1);
int n = pthread_create(tids+i,nullptr,buyTicket,name);
if(n != 0)
{
cout << "create thread fail" << endl;
}
}
//创建放票线程
pthread_t creT;
char* name = new char[64];
snprintf(name,64,"%s", "放票线程");
int n = pthread_create(&creT,nullptr,putTicket,name);
if(n != 0)
{
cout << "create thread fail" << endl;
}
//等待
for(int i = 0; i < NUM; ++i)
{
n = pthread_join(tids[i],nullptr);
if(n != 0)
{
cout << "join thread fail" << endl;
}
}
n = pthread_join(creT,nullptr);
if(n != 0)
{
cout << "join thread fail" << endl;
}
return 0;
}
pthread_mutex_lock(&mutex);
while(Ticket <= 0)
{
pthread_cond_wait(&cond, &mutex);
}
……运行代码
注意我是如何添加条件变量等待的?我通过直接放在了锁的下面,然后放在了运行代码的上面,这是为了干什么?因为条件变量就像是一个先决条件一样,他成功了才能允许代码向后执行,而不是代码已经执行完了再来判断这个行为是否需要被处理。
broadcast:
pthread_cond_broadcast(&cond);
运行结果:
signal:
pthread_cond_signal(&cond);
运行结果:
出现上述两种结果上的差异的原因是由于signal每一次只唤醒一个线程,而我们等待的线程并不只是一个,所以才会出现原来有很多的线程,但是放票之后只有一个线程在跑的情况。
而broadcast则可以将所有的线程按照循序全部唤醒,注意这个过程一定不是同时的,因为要保证锁的唯一性,也只能是等待释放才会继续唤醒下一个。
3 生产者和消费者模型
对于生产者和消费者模型其实原理上是非常容易理解的,我们将其转换成为一个现实中的具象物,我们就是消费者,商品厂家就是生产者。也就是厂家生产我们消费,很简单。
但是大家有没有想到,我们现实当中买一个东西是直接跑到人家厂家哪里去的吗?并不是欸,我们的方式是跑到超市去买,而厂家会直接卖一根火腿肠给我们吗?也不是,他会将大量的货给超市。所以说生产者和消费者模型当中,必然涉及到了一个场所的存在。
第二个问题,某一天我想要去买一根火腿肠,但是超市里面已经没有了,怎么办呢?没办法,没有就是没有,只能回去,但是我又想吃,然后我又跑到超市去了,超市还是没有,这个行为我连续做了1000次,还是没有。这个时候超市的店员就有问题了,这个时候他应该给我一个联系方式,等到有火腿肠的时候再给我打电话让我来买,而不是我一直跑过来问;其次他还应该给厂家通知需要进货了,而不是让场所一直没有商品,也就是需要保证消费者和生产者的同步关系。
第三个问题,超市里面有货了,但是只有一根火腿肠,我和张三都想要这跟火腿肠,但是只有一根,我们打了一架,不分高低,最后对超市造成了不好的影响。这个时候超市就无语了,只好定下一个新的规定,谁先进店谁能优先购买,快一点点也算。这表示了消费者之间需要保持互斥关系。
第四个问题,此时商家打了电话给商家上货之后,同时来了两家商家上货,它们谁也不让谁,非要放到同一个货架里面,放了自己的还把别人的丢了,因此两家争吵不止,于是超市也定下了,谁先到超市谁有优先权上货。这表示了生产者和生产者之间需要保持互斥关系。
第五个问题,一个商家正在上货,我正好要的就是这个产品,然后我就去拿了一些下来,商家不乐意了,我还没结账呢,你这拿了算谁的,然后它反手就给我抢了回来,我服它吗?我不服,所以我又抢回来了,又开始争起来了,没办法,商家只能规定,在上货的时候不能买,在买货的时候不能上货。也就是消费者和生产者要保持一个互斥的关系。
总结起来就是,生产者和消费者模型一共需要一个场所(交易场所),两个角色(生产者和消费者),三种关系(生产者和生产者的互斥关系,消费者和消费者的互斥关系,生产者和消费者的同步与互斥关系)。
我还听说过生产者和消费者模型有高效性哇,这是在哪里体现出来的?维护生产者和消费者各自的关系不是会让多线程并发执行变为单执行流执行吗?这个不是与高效性正好相悖吗?
我的回答是,提出这个问题的伙伴眼界放窄了,我们要知道并不是说每时每刻都是所有线程在消费,每时每刻都是所有线程在生产,它们有的是正在处理拿到的数据,有的正在拿,有的正在放,有的正在获取数据来源。也就是保证我们的整个进程的所有执行流分别做着自己的事,因为有交易场所的存在,所以说它们会有条不紊的执行自己的事情。这才是高效的真正体现。
#include <iostream>
#include <pthread.h>
#include "mythread.hpp"
#include<unistd.h>
using namespace std;
// 实现一个生产者和消费者模型
// 消费者执行动作
void *comsumer(void *args)
{
// 类型转换
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
sleep(1);
int data = 0;
bq->pop(&data);
cout << "消费者拿到一个数据:"<<pthread_self() << " : " << data << endl;
}
}
// 生产者执行动作
void *productor(void *args)
{
// 类型转换
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while(true)
{
int data = rand() % 20;
bq->push(data);
cout << "生产者放入一个数据:" << data << endl;
}
}
int main()
{
pthread_t c[2], p[3];
BlockQueue<int> *bq = new BlockQueue<int>();
pthread_create(&c[0], nullptr, comsumer, bq);
pthread_create(&c[1], nullptr, comsumer, bq);
pthread_create(&p[0], nullptr, productor, bq);
pthread_create(&p[1], nullptr, productor, bq);
pthread_create(&p[2], nullptr, productor, bq);
// 保持生产者和消费者的执行
while (true)
{
;
}
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
return 0;
}
#pragma once
#include<iostream>
#include<pthread.h>
#include<queue>
using namespace std;
#define CAP 5
//无法判断我们需要的类型是什么,添加模板参数
template<class T>
class BlockQueue
{
public:
//初始化容量和锁以及条件变量
BlockQueue() :_cap(CAP)
{
//初始化锁和条件变量
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_comsumerCond,nullptr);
pthread_cond_init(&_productorCond,nullptr);
}
bool isFull()
{
return _cap == _q.size();
}
bool isEmpty()
{
return _q.empty();
}
//添加
void push(const T& in)
{
//添加数据的时候需要互斥访问
pthread_mutex_lock(&_mutex);
//因为有容量的限制,那么如果在数据已经满了的情况下,不能再添加数据了,只能等待
while(isFull())
{
//等待的是自己的条件变量
pthread_cond_wait(&_productorCond,&_mutex);
//等待完成之后这个锁会重新回归这个线程,这部分工作由wait这个接口自己完成
//为了防止多线程同时被唤醒的操作,所以上面的条件判断需要通过循环来二次规避
}
_q.push(in);
//添加完成数据之后,可以去唤醒消费者线程消费了,因为生产者一定会放一个数据进入
pthread_cond_signal(&_comsumerCond);
//使用完成之后需要释放锁
pthread_mutex_unlock(&_mutex);
}
//删除
void pop(T* out)
{
//同理,对于拿数据也需要对临界资源做出相应的保护措施
pthread_mutex_lock(&_mutex);
//如果数据已经为空了,消费者应该处于一个等待状态
while(isEmpty())
{
pthread_cond_wait(&_comsumerCond,&_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_productorCond);
pthread_mutex_unlock(&_mutex);
}
//释放条件变量和锁
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_comsumerCond);
pthread_cond_destroy(&_productorCond);
}
private:
//共享队列,容量,锁,各自的条件变量
queue<T> _q;
const int _cap;
pthread_mutex_t _mutex;
pthread_cond_t _comsumerCond;
pthread_cond_t _productorCond;
};
以上就是我对线程同步和生产者消费者模型的全部理解了,希望能够帮助到大家。