Gitee仓库:阻塞队列、环形队列
文章目录
- 1. 死锁
- 1. 1 死锁概念
- 1.2 死锁的四个必要条件
- 1.3 解决死锁
- 2. 线程同步
- 3. 条件变量
- 3.1 举个例子
- 3.2 接口
- 4. 生产消费者模型
- 4.1 什么是生产消费者模型
- 4.2 基于阻塞队列的生产消费模型
- 4.3 环形队列(信号量)
1. 死锁
1. 1 死锁概念
多线程里因为锁的问题导致代码都不往后执行了,这就是死锁。
一组线程互相竞争资源,导致“僵持”
1.2 死锁的四个必要条件
- 互斥条件:一个资源只能被一个执行流使用(有锁,前提条件)
- 请求与保持条件:一个执行流因请求资源而阻塞时,对自己持有的资源还保持不放(原则性)
- 不剥夺条件:一个执行流已获得的资源,未使用完之前,不能强行剥夺(原则性)
- 循环等待条件:若干执行流之间形成头尾相接的循环等待资源的关系
1.3 解决死锁
上面这4个条件,不同时满足:
- 请求与不保持,当申请不到对方资源的时候,释放自己的资源,
pthread_mutex_trylock()
- 直接剥夺对方资源(将对方的锁释放)
- 避免形成环路,我们可以采用顺序申请的方式
- 加锁顺序一致
- 用完锁及时释放
- 资源一次性分配
互斥条件一般都不会破坏,因为多线程大部分还是需要加锁的
2. 线程同步
一个线程频繁申请锁、释放锁,只有它一个执行流能进入,这样就会导致其他线程产生饥饿问题,这就是互斥。
然后我们可以规定,一个线程在使用完这个资源之后,不允许立马再次申请使用,必须要重新排队,这样就能在保证数据安全的情况下,让线程按照某种特定顺序访问,避免饥饿问题,这就叫同步。
既然所有线程都去排队了,那为什么还需要锁呢?
这是因为当线程进来之后,并不会立刻去排队,而是去直接申请资源!
3. 条件变量
3.1 举个例子
上图,假设我们的资源在一个框框里面,框边有一把锁,当一个执行流访问完这个资源之后,敲一下铃铛,以表示自己访问完,释放锁,然后等待队列就会唤醒下一个排队的线程来拿锁。
这里的铃铛和队列,就是条件变量
也就是说,条件变量要存在一个通知机制,去唤醒排队的线程;然后还要有一个队列,让线程去等待队列里面
不止有一个,会有多个,告诉线程去哪个队列排队,响的是哪个铃铛等,所以先描述,再组织
这里凡是去条件变量里面排队的,都是先去申请过资源的,申请失败了,才会来条件变量里面排队,所以条件变量一定是要配合互斥锁使用的。
3.2 接口
初始化:
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
这里的接口和互斥锁的接口差不多
-
pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr)
:cond
要初始化的条件变量attr
设置为nullptr
进入等待:
#include <pthread.h>
int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
线程申请锁失败时,是知道自己申请失败的,然后将自己投入到等待队列当中
-
pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex)
:cond
:在这个条件变量上等待mutex
:互斥量
pthread_cond_wait
让线程等待的时候,会自动释放锁
唤醒等待:
#include <pthread.h>
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
“铃铛”响之后,要唤醒等待的线程
pthread_cond_broadcast(pthread_cond_t *cond)
:唤醒所有等待的线程pthread_cond_signal(pthread_cond_t *cond)
:唤醒单个线程
class threadData
{
public:
threadData(int num, pthread_mutex_t *lock, pthread_cond_t *cond)
{
_name = "thread " + to_string(num);
_lock = lock;
_cond = cond;
}
public:
string _name;
pthread_mutex_t *_lock;
pthread_cond_t *_cond;
};
int cnt = 0;
void *Count(void *args)
{
threadData *td = static_cast<threadData*>(args);
while(true)
{
pthread_mutex_lock(td->_lock);
//线程休眠,临界资源不就绪,临界资源也有状态
//需要判断是否就绪-> 判断 -> 访问临界资源,必须在加锁之后
pthread_cond_wait(td->_cond, td->_lock); //让线程等待的时候,会自动释放锁
cout << td->_name << " cnt " << cnt++ << endl;
//sleep(1);
pthread_mutex_unlock(td->_lock);
usleep(10);
}
}
int main()
{
vector<pthread_t> tids;
vector<threadData*> thread_datas;
pthread_mutex_t lock;
pthread_mutex_init(&lock, nullptr);
pthread_cond_t cond;
pthread_cond_init(&cond, nullptr);
for(int i = 0; i < 5; i++)
{
pthread_t tid;
threadData *td = new threadData(i, &lock, &cond);
thread_datas.push_back(td);
pthread_create(&tid, nullptr, Count, thread_datas[i]);
tids.push_back(tid);
}
cout << "main thread call" << endl;
while(true)
{
sleep(1);
//pthread_cond_signal(&cond); //唤醒等待的一个线程,默认都是第一个
//cout << "signal..." << endl;
pthread_cond_broadcast(&cond); //唤醒所有线程
cout << "broadcast..." << endl;
}
for(int i = 0; i < 5; i++)
{
pthread_join(tids[i], nullptr);
}
for(auto e : tids) pthread_join(e, nullptr);
for(auto e : thread_datas) delete e;
pthread_mutex_destroy(&lock);
return 0;
}
4. 生产消费者模型
4.1 什么是生产消费者模型
在现实生活中,厂家生产货品,运到超市,我们就可以在超市去买这些商品,这就是最简单的生产消费者例子。
厂家就代表着生产者,我们顾客就代表着消费者,这个超市就是一个大号的缓存。
生产者可以将生产的货物,直接放到超市,然后消费者有空的时候就可以去逛超市买物品,这就解决了忙闲不均的问题,将生产和消费行为进行一定程度的解耦。
这个过程中会产生并发问题:321
三种关系
- 生产者 vs 生产者:互斥
消费者 vs 消费者:互斥
生产者 vs 消费者:互斥、同步
两种角色
- 生产者、消费者
一个交易场所
- 特定内存空间
4.2 基于阻塞队列的生产消费模型
- 队列为空,生产者可以生产数据,而消费者不可消费数据(阻塞)
- 队列为满,生产者不可生产数据(阻塞),消费者可以消费数据
- 队列不空不满,可同时生产消费
生产者生产数据,从哪来?
生产者的数据一般都是从用户或者网络进行获取,所以生产者获取的数据,也是需要花时间的,也就是数据准备工作。
而消费者,不仅仅是拿起数据,还可能会对数据进行加工。
如何看待生产消费的高效性?
当消费者在拿数据的时候,生产者虽然可能无法生产数据,但是它可以去获取数据,也就是准备数据;
而生产者在生产数据时,消费者可能也需要处理数据。
所以生产者和消费者的高效性并不是体现在它们共同访问临界区,而是在它们的非临界区(准备数据、加工数据)是可以并发的。
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
template<class T>
class BlockingQueue
{
static const int defaultNum = 10;
static string toHex(pthread_t tid)
{
char hex[64];
snprintf(hex, sizeof(hex), "%p", tid);
return hex;
}
public:
BlockingQueue(int maxCapacity = defaultNum)
:_maxCapacity(maxCapacity)
{
//初始化锁与条件变量
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_c_cond, nullptr);
pthread_cond_init(&_p_cond, nullptr);
_low_water = _maxCapacity / 3;
_high_water = (_maxCapacity * 2) / 3;
}
T pop() //消费
{
pthread_mutex_lock(&_mutex);
//while防止伪唤醒
while(_q.size() == 0) //没有数据
{
pthread_cond_wait(&_c_cond, &_mutex);
}
T out = _q.front();
_q.pop();
out();
cout << "Consumer handle task: " << out.GetTask() << " ---> " << out.GetResult() << " thread id: " << toHex(pthread_self()) << endl;
//消费数据之后必然有空间可以生产,唤醒生产者
if(_q.size() < _low_water) pthread_cond_signal(&_p_cond);
//pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
return out;
}
void push(const T &in) //生产
{
pthread_mutex_lock(&_mutex);
//防止伪唤醒
while(_q.size() == _maxCapacity)
{
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in);
cout << "Productor a task: " << in.GetTask() << " thread id: " << toHex(pthread_self()) << endl;
//生产之后必然可以消费,唤醒消费者
if(_q.size() > _high_water) pthread_cond_signal(&_c_cond);
//pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
~BlockingQueue()
{
//销毁
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_c_cond);
pthread_cond_destroy(&_p_cond);
}
public:
pthread_mutex_t _mutex;
private:
queue<T> _q; //共享资源
int _maxCapacity; //队列中极值,最大容量
pthread_cond_t _c_cond;
pthread_cond_t _p_cond;
int _low_water; //"低水位线"
int _high_water; //"高水位线"
};
如果采用
pthread_cond_broadcast
,将队列的线程全部唤醒,这样有序性就乱了,虽然只有一个线程会拿到锁,但是这个线程执行完毕之后还是会释放锁,但这些其他的线程,此时并不在条件变量下等待了,拿到锁之后,那么就会出现“伪唤醒”的情况。简单理解这里的防止伪唤醒:
pthread_cond_wait()
是一个函数,函数可能回调用失败,如果失败了,那它就会直接去生产,当队列为满时,还是会继续生产,所以这里采用while
防止伪唤醒的情况
4.3 环形队列(信号量)
信号量接口:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value); //初始化
int sem_destroy(sem_t *); //销毁
int sem_wait(sem_t *); //申请信号量 P操作
int sem_post(sem_t *); //释放信号量 V操作
#pragma once
#include<iostream>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
using namespace std;
const static int defaultcap = 4;
template<class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap),
capacity_(cap),
c_step_(0),
p_step_(0)
{
sem_init(&c_data_sem_, 0, 0); //第二个参数0:表示共享,第三个0:没有初始数据
sem_init(&p_space_sem_, 0, cap); //初始空间为充足
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
void Push(const T &in) //生产
{
//1. 申请信号量资源 p操作(本身就是原子的,不需要保护)
P(p_space_sem_);
//下面三行代码需要原子操作
Lock(p_mutex_); //先申请信号量,再申请锁,线程可以并行操作,提供并发度(看电影先买票,预定位置,快开始时凭门票进入)
//2. 生产
ringqueue_[p_step_] = in;
//位置后移,保持环形特征
p_step_++;
p_step_ %= capacity_;
Unlock(p_mutex_);
//3. 数据资源+1 v操作(本身就是原子的)
V(c_data_sem_);
}
//输出型参数 *out
void Pop(T *out) //消费
{
//和生产思路一样
P(c_data_sem_);
Lock(c_mutex_);
* out = ringqueue_[c_step_];
c_step_++;
c_step_ %= capacity_;
Unlock(c_mutex_);
V(p_space_sem_);
}
~RingQueue()
{
sem_destroy(&c_data_sem_);
sem_destroy(&p_space_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
private:
vector<T> ringqueue_;
int capacity_;
int c_step_; //消费者下标
int p_step_; //生产者下标
//信号量解决生产消费之间的互斥关系
sem_t c_data_sem_; //消费者数据资源
sem_t p_space_sem_; //生产者空间资源
//锁解决生产者与生产者 消费者与消费者之间的互斥关系
//多生产 多消费情况
pthread_mutex_t c_mutex_;
pthread_mutex_t p_mutex_;
};
当下标相同时(队列为空或者满),只有有一个去执行(生产者去生产 or 消费者去消费),这就表现出了互斥性;当下标不同的时候,生产消费同时进行,这就表现出了并发性。