线程同步
1. 通过条件变量抛出线程同步概念
在上一篇线程互斥博客就说过,在抢票逻辑中,刚释放完锁的线程由于竞争能力比较强,导致其他线程无法申请到锁,那么长时间其他线程都无法申请到锁,只能阻塞等待着,这样的线程处于饥饿状态!
我们可以举一个例子来理解条件变量是如何实现线程同步的:
假设现在学校开了一间学霸vip自习室,学校规定这间自习室一次只能进去一个人上自习,自习室门口挂着一把钥匙,谁来的早先拿到这把钥匙,就可以打开门进入自习室学习,并且进入自习室之后,把门一反锁,其他人谁都不能进来。然后你第二天准备去学习了,卷的不行,直接凌晨三点就跑过来,拿着钥匙进入自习室上自习了,然后卷了3小时之后,你想出来上个厕所,一打开门发现外面站的一堆人,都在叽叽喳喳的讨论谁先来的,怎么来的这么早?这么卷?然后你怕自己等会儿把钥匙放到墙上之后,上完厕所回来之后有人拿着钥匙进入了自习室,你就又卷不了了,所以你把钥匙揣兜里,拿着钥匙去上厕所了,其他人当然进入不了自习室,因为你拿着钥匙去上厕所了。等你回来的时候,你又打开门,又来里面上了3小时自习,你感觉自己饿的不行了,在不吃饭就饿死在里面了,所以你打开门,准备出去吃饭了,然后突然你自己感觉负罪感直接拉满,我凌晨3点好不容易抢到自习室,现在离开是不太亏了,所以你又打开自习室回去上自习去了,别人当然竞争不过你呀!因为钥匙一直都在你兜里,你出来之后把钥匙放到墙上,你发现有点负罪感,你又拿起来钥匙回去上自习,因为你离钥匙最近,所以你的竞争能力最强。结果你来自习室上了1分钟自习又出来了,然后又负罪的不行,又回去了,周而复始的这么干,结果别人连自习室长啥样都没见到。
像这样由于长时间无法得到锁的线程,没办法进入临界区访问临界资源,我们称这样的线程处于饥饿状态!
所以学校推出了新政策,所有刚刚从自习室出来的人,都必须回到队列的尾部重新排队等待进入自习室,这样的话,其他人也就可以拿到钥匙进入自习室了。
所以,在保证数据安全的前提下,让线程能够按照某种特定的顺序来访问临界资源,从而有效避免其他线程的饥饿问题,这就叫做线程同步!
2. 生产消费模型概念
实际生活中,我们作为消费者,一般都会去超市这样的地方去购买产品,而不是去生产者那里购买产品,因为供货商一般不零售产品,他们都会统一将大量的商品供货到超市,然后我们消费者从超市这样的交易场所中购买产品。
而当我们在购买产品的时候,生产者在做什么呢?生产者可能正在生产商品呢,或者正在放假呢,也可能正在干着别的事情,所以生产和消费的过程互相并不怎么影响,这就实现了生产者和消费者之间的解耦。
而超市充当着一个什么样的角色呢?比如当放假期间,消费爆棚的季节中,来超市购买东西的人就会非常的多,所以就容易出现供不应求的情况,但超市一般也会有对策,因为超市的仓库中都会预先屯一批货,所以在消费爆棚的时间段内,超市也不用担心没有货卖的情况。而当工作期间,大家由于忙着通过劳动来换取报酬,可能来消费的人就会比较少,商品流量也会比较低,那此时供货商如果还是给超市供大量的货呢?虽然超市可能最近确实卖不出去东西,但是超市还是可以把供货商的商品先存储到仓库中,以备在消费爆棚的季节时,能够应对大量消费的场景。所以超市其实就是充当一个缓冲区的角色,在计算机中充当的就是数据缓冲区的角色。
而计算机中哪些场景是强耦合的呢?其实函数调用就是强耦合的一个场景,例如当main调用func的时候,func在执行代码的时候,main在做什么呢?main什么都做不了,他只能等待func调用完毕返回之后,main才能继续向后执行代码,所以我们称main和func之间就是一种强耦合的关系,而上面所说的生产者和消费者并不是一种强耦合的关系。
剖析生产消费模型,超市其实就是典型的共享资源,因为生产者和消费者都要访问超市,所以对于超市这个共享资源,他在被访问的时候,也是需要被保护起来的,而保护其实就是通过加锁来实现互斥式的访问共享资源,从而保证安全性。
在只有一份超市共享资源的情况下,生产和生产,消费和消费,以及生产和消费都需要进行串行的访问共享资源。但为了提高效率我们搞出了同步这样的关系,因为有可能消费者一直霸占着锁,一直在那里消费,但实际超市已经没有物资了,此时消费者由于竞争能力过强,也会造成不合理的问题,因为消费者消费过多之后,应该轮到生产者来生产了,所以对于生产者和消费者之间仅仅只有互斥关系是不够的,还需要有同步关系。
再解释一下为什么需要同步:1. 当超市的货物出现售空的情况,消费者线程不知道共享资源已经没有,但是它还是一直去执行申请锁的过程,所以就需要生产者通过条件变量告诉它,等到消费者收到这个通知之后再去消费 2. 不乏会出现某一个消费者线程竞争太强,其他消费者一直处于排队过程,所以为了防止饥饿线程出现,所以就需要实现线程间的同步
从生产消费模型中可以提取出来一个321原则。即为3种关系,两个角色,1个交易场所。对应的其实是消费线程和消费线程的关系,消费线程和生产线程的关系,生产线程和生产线程的关系,交易场所就是阻塞队列blockqueue。而实现线程同步就需要一个条件变量,比如生产者生产完之后,超市给消费者打个电话,让消费者过来消费,消费完之后,超市在给生产者打个电话,让生产者来生产,这样就不会存在由于某一个线程竞争能力过强,一直生产或一直消费的情况产生,从而导致其他线程饥饿的问题。正是由于生产和消费的这种互斥 同步关系,达到了两者解耦合的目的,提升了生产消费模型的效率。
3. 条件变量实现线程同步的原理(条件变量内部维护了线程的等待队列)
创建条件变量:
pthread_cond_t cond;
条件变量初始化和销毁
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr); //初始化一个局部的条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //初始化一个全局的条件变量
pthread_cond_init 参数解释
放入条件变量的等待队列:
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
第一个参数:创建的条件变量地址。
第二个参数:互斥锁的地址,这个必须有,后面本喵再讲解为什么必须传锁。
返回值:放入条件变量的等待队列成功返回0,失败返回错误码。
唤醒条件变量等待队列中的一个线程:
int pthread_cond_signal(pthread_cond_t *cond);
唤醒条件变量等待队列中的所有线程:
int pthread_cond_broadcast(pthread_cond_t *cond);
4. 条件变量基本代码
基于blockqueue的生产消费模型
由于要实现两个分别存放不同任务的阻塞队列,那我们直接就写出来一个阻塞队列的类模板,这样就可以存放任意类型的对象,所以下面我们先来完善BlockQueue.hpp文件的代码,也就是阻塞队列的类模板代码。
我们需要一把锁来保证阻塞队列这个共享资源访问的安全性,并且生产线程不满足生产条件时,比如阻塞队列已经满了,则生产线程此时就不应该继续生产,而是要去cond的队列中进行wait,直到消费线程唤醒生产线程,所以生产线程要有自己的produce cond,简称pcond。反过来对于消费者来说同样如此,所以消费者在不满足消费条件的时候,也要去自己的cond队列中进行wait,那么消费者也应该要有自己的consume cond,简称ccond。所以类BlockQueue的私有成员应该包括_mutex互斥锁,_ccond,_pcond两个条件变量,我们还需要一个变量来描述阻塞队列的容量大小,然后再加一个STL容器queue< T > _q;然后希望定义出来的所有阻塞队列的最大容量都是同一个的。
阻塞队列需要实现的接口主要为四部分,构造函数内需要初始化好互斥锁以及两个条件变量,因为阻塞队列所使用的锁和条件变量是局部的(对象本身就在函数栈帧中)条件变量和锁,那么就需要在构造函数内进行初始化,在析构函数内完成销毁。
除此之外,还需要实现push和pop两个接口,为了保证向队列中push元素的安全性,所以接口中要进行加锁和解锁,然后就是判断是否满足push的条件,如果队列已经满了,那就不要继续push,也就是不要继续生产了,而是去pcond的队列中进行wait,一旦wait执行流就会阻塞停下来,等待被唤醒,如果满足条件,那直接用STLqueue的push接口push元素即可,非常简单。push元素之后,我们就该唤醒消费线程了,因为现在队列中至少有一个元素,是可以供消费者消费的,所以直接调用pthread_cond_signal唤醒ccond的队列中的线程即可。最后就是释放锁的步骤。
对于pop来说,由于STLqueue的pop接口不会返回pop出来的元素,所以我们需要通过输出型参数的方式拿到pop出来的元素值。与push的实现逻辑一样,pop满足的条件是队列中元素必须不为空,如果为空,则需要去ccond的队列中进行等待,直到被生产线程唤醒。pop数据之后,队列中一定至少有一个空的位置,所以此时应该唤醒生产线程,让生产线程进行元素的push,最后还是不要忘记释放锁。
对于接口的实现,大致逻辑说的差不多了。但在代码中还有几个细节需要特别说明一下。我们知道pthread_cond_wait接口是放在临界区内部的,所以在执行wait代码之前线程是持有锁的,为了在线程等待期间,其他线程也能申请到锁并进入临界区,所以pthread_cond_wait被调用的时候,它会自动的以原子性的方式将锁释放,并将自己阻塞挂起到pcond的队列中。那么当队列中的某一个线程被唤醒的时候,他还是要从pthread_cond_wait开始向后执行,所以此时他还是在临界区内部,所以在pthread_cond_wait返回的时候,会自动重新申请锁,然后继续在临界区中向后执行代码。另外判断逻辑的语句必须是while,不能是if,因为在多生产多消费的情景下,可能出现伪唤醒的情况,比如broadcast唤醒所有生产线程,但实际空位置只有一个,所以此时在唤醒之后,某一个线程竞争到锁,放入元素之后,队列已经满了,然后他释放了锁,其他某一个线程在竞争到锁之后,如果是if逻辑,那就不会重新判断是否满足,而是直接push元素,那就会发生段错误越界访问,所以要用while循环来判断,保证唤醒的线程一定是在条件满足的情况下进行的push元素。至于唤醒对方和释放锁的顺序怎么样都可以,因为唤醒对方,对方没锁的话,还是需要阻塞等待锁被释放,而如果先释放锁的话,由于对方没有被唤醒,那照样还是拿不到锁,所以这两个接口的调用顺序并不影响接口的功能,所以先写谁都可以。
BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
template<class T>
class BlockQueue
{
private:
std::queue<T> _q; //队列
int _capacity; // 队列容量
pthread_mutex_t _mutex; //这个阻塞队列是一个SharedResources 所以需要一个锁进行保护
pthread_cond_t _p_cond; //生产者 队列空的时候,让他去_c_cond下面等
pthread_cond_t _c_cond; //消费者 队列满的时候,让他去_p_cond下面等
public:
BlockQueue(int cap = 5)
:_capacity(cap)
{
pthread_mutex_init(&_mutex, nullptr); //局部变量初始化接口,锁是成员变量
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
bool isFull()
{
return _capacity == _q.size();
}
bool isEmpty()
{
return _q.size() == 0;
}
void Push(const T& data) //生产者
{
//进入阻塞队列中,先保证共性资源是多个线程互斥访问的,所以先加锁
pthread_mutex_lock(&_mutex);
// if(isFull())
// {
// //如果,队列现在是满的,所以就需要将当前的生产者进行阻塞等待,在生产者条件变量下等待
// pthread_cond_wait(&_p_cond, &_mutex);
// }
while(isFull())
{
//如果,队列现在是满的,所以就需要将当前的生产者进行阻塞等待,在生产者条件变量下等待
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(data);
pthread_cond_signal(&_c_cond); //我们这里设置的是,生产者生产一个就通知消费者,唤醒等待
pthread_mutex_unlock(&_mutex);
}
void Pop(T* data) //消费者
{
//生产和消费本身就是互斥的,所以需要使用同一把锁保证SharedResoures的安全性
pthread_mutex_lock(&_mutex);
//这里使用if判断是有问题的:如果当前是多线程代码,当生产者生产一个数据的时候,生产者将所有的消费线程唤醒,此时这些消费线程就不在是在条件变量等待接口处等待了,就是在锁的申请地方等待
//此时一个消费者线程将队列中的一个数据拔走之后,其他消费线程也就是获得锁,此时就会出现front()报错
// if(isEmpty()) //如果队列为空,所以就需要让消费者在他的条件变量下去等待
// {
// pthread_cond_wait(&_c_cond, &_mutex);
// }
while(isEmpty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
*data = _q.front();
_q.pop();
pthread_cond_signal(&_p_cond); //我们这里设置的是,消费者消费一个就通知生产者,唤醒等待
pthread_mutex_unlock(&_mutex);
}
};
Main.cc
#include "BlockQueue.hpp"
#include <ctime>
#include<unistd.h>
#include "Task.hpp"
//生产者线程函数
void* Producer_Routine(void* args)
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*> (args);
while(true)
{
//有数据-》进行生产
int data1 = rand() % 10; //[0, 9]
usleep(rand() % 123);
int data2 = rand() % 10; //[0, 9]
char oper = opers[rand() % opers.size()];
Task task1(data1, data2, oper);
bq->Push(task1);
std::cout << "Producer data: " << task1.show() << std::endl;
//sleep(1);
}
return nullptr;
}
//消费者线程函数
void* Consumer_Routine(void* args)
{
Task t;
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*> (args);
while(true)
{
sleep(1);
//消费数据
bq->Pop(&t);
t.Run();
std::cout << "Consumer access data: " << t.show() << " " << t.show_result() << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand(time(nullptr));
//创建一个阻塞队列
BlockQueue<Task>* Bq = new BlockQueue<Task>();
//我们需要一个生产线程 消费线程
pthread_t Producer, Consumer;
pthread_create(&Producer, nullptr, Producer_Routine, Bq);
pthread_create(&Consumer, nullptr, Consumer_Routine, Bq);
//主线程等待,回收资源
pthread_join(Producer, nullptr); // 不关心线程的返回值
pthread_join(Consumer, nullptr);
return 0;
}
因为我们的BlockQueue设置的是模板,所以我们传递任务的时候,可以不仅仅是内置类型,还可以是自定义类型
Task.hpp
#pragma once
#include <string>
#include <iostream>
enum
{
ok = 0,
div_zero,
mod_zero,
unknow
};
const std::string opers = "+-*/%()&#";
class Task
{
private:
char _oper;
int _x;
int _y;
int _result = 0; // 结果
int _code; // 结果码 0:表示可信 非0反之
public:
Task() {}
Task(int x, int y, char oper)
: _x(x), _y(y), _oper(oper)
{
}
std::string show()
{
std::string s;
s += std::to_string(_x);
s += _oper;
s += std::to_string(_y);
return s;
}
int show_result()
{
if(_code == ok)
return _result;
else
{
std::cout << "符号异常" << std::endl;
return _code;
}
}
void Run()
{
switch (_oper)
{
case '+':
{
_result = _x + _y;
_code = ok;
break;
}
case '-':
{
_result = _x - _y;
_code = ok;
break;
}
case '*':
{
_result = _x * _y;
_code = ok;
break;
}
case '/':
{
if (_y == 0)
{
_code = div_zero;
}
else
{
_result = _x / _y;
_code = ok;
}
break;
}
case '%':
{
if (_y == 0)
{
_code = mod_zero;
}
else
{
_result = _x % _y;
_code = ok;
}
break;
}
default:
_code = unknow;
break;
}
}
};
生产消费模型高效在哪里?(不影响其他多线程并发或并行的获取任务和执行任务)
上面代码写完了,我们要来回答一个非常重要的问题,就是为什么生产消费模型是高效的?我并没有见到他高效在哪里啊!访问阻塞队列这个共享资源时,不还是得互斥式的访问么?你凭什么说生产消费模型高效呢?
确实!你说的没有问题,很正确!但实际生产消费模型根本不是高效在向阻塞队列中放元素和从阻塞队列中拿元素。而是高效在某一个线程在向阻塞队列中放任务的时候,不会影响其他线程获取任务,某一个线程在从阻塞队列中拿任务的时候,不会影响其他线程在执行任务。
我们今天所写的阻塞队列中不过是存储了一些微不足道的计算任务或保存任务,执行和获取起来根本不费力,但未来线程在真正获取某些大型任务比如从数据库,网络,外设拿来的用户数据需要处理呢?那在获取任务和执行任务的时候,会很费时间的。
而生产消费模型高效就高效在,你某一个线程互斥式的从阻塞队列中拿任务或取任务时,根本就不会影响我其他多个线程在获取任务或执行任务,并且其他多个线程是在并发或并行的执行任务,效率是很高的!
所以总结起来就一句话,生产消费模型并不高效在放任务到阻塞队列和从阻塞队列拿任务,而是真正高效在,某一个线程拿或放任务到blockqueue的时候,并不会影响其他线程并发或并行的获取任务和执行任务。
POSIX信号量
这是之前写的基于阻塞队列的生产者消费者模型中向阻塞队列中push任务的代码。
上面代码中有什么不足之处吗?
一个线程在向阻塞队列中push任务的时候,必须满足临界资源不满的条件,否则就会被放入到条件变量的等待队列中去。
但是临界资源是否为满是不能直接得到答案的,需要先申请锁,然后进入临界区访问临界资源去判断它是否为满。
在判断临界资源是否满足条件的过程中,必须先加锁,再检测,再操作,最后再解锁。
检测临界资源的本质也是在访问临界资源。
只要对临界资源整体加锁,就默认现场会对这个临界资源整体使用,但是实际情况可能存在:一份临界资源,划分为多个不同的区域,而且运行多个线程同时访问不同的区域。
之前代码的不足之处:
在访问临界资源之前,无法得知临界资源的情况。
多个线程不能同时访问临界资源的不同区域。
##1. 信号量的概念
信号量本质是一把计数器,用来衡量临界资源中资源数量多少。
申请信号量的本质:对临界资源中特定的小块资源的预定机制。
上图将一块临界资源划分为9块,要想让多线程访问者9个小的临界资源就需要:
1.创建一个信号量,设置的初始值为9
2. 每一个访问临界资源的线程都需要申请信号量(预定一小块临界资源)
3. 当计数值减到0的时候,说明已经没有临界资源可以使用,需要让线程阻塞等待
4. 申请到信号量的现场就可以进入临界区去访问临界资源,当访问完毕以后,再将信号量加一
信号量的基本操作
创建信号量
#include <semaphore.h>//信号量必须包含的头文件
sem_t sem;//创建信号量
初始化信号量
int sem_init(sem_t* sem, int pshared, unsigned int value);
sem:信号量指针
shared:0表示线程间共享,非0表示进程间共享。我们一般情况下写0就行。
value:信号量初始值,也就是计数器的值。
返回值:成功返回0,失败返回-1,并且设置errno。
销毁信号量
int sem_destroy(sem_t* sem);
等待信号量
int sem_wait(sem_t* sem);//P操作 --
发布信号量
int sem_post(sem_t* sem);//V操作 ++
环形队列实现消费生产模型
这里使用数组来模拟队列,使用取模运算实现环形存储
当环形队列为空时,头和尾都指向同一个位置。
当环形队列为满时,头和尾也都指向同一个位置。
其他任何时候,生产者和消费者访问的都是不同的区域。
消费者不能超过生产者。
生产者不能把消费者套一圈以上。
对于生产者而言,它在意的是环形队列中空闲的空间。
对于消费者而言,它在意的是环形队列中数据的个数。
所以生产者每次在访问临界资源之前,需要先申请空间资源的信号量,申请成功就可以进行生产,否则就阻塞等待。
消费者在访问临界资源之前,需要申请数据资源的信号量,申请成功就可以消费数据,否则就阻塞等待。
空间资源信号量的申请由生产者进行,归还(V操作)由消费者进行,表示生产者可以生产数据。
数据资源信号量的申请有消费者进行,归还(V操作)由生产者进行,表示消费者可以进行消费。
在信号量的初始化时,空间资源的信号量为环形队列的大小,因为没有生产任何数据。数据资源的信号量为0,因为没有任何数据可以消费。
通过信号量的方式同样维护了环形队列的核心操作,消费者消费速度快时,会将数据资源信号量全部申请完,但是此时生产者没有生产数据,也就没有归还数据资源的信号量,所以消费者会阻塞等待,不会超生产者。
生产者生产速度快时,会将空间资源信号量全部申请完,但是此时消费者没有消费数据,也就没有归还空间资源的信号量,所以生产者会阻塞等待,不会超过套消费者一个圈。
代码实现
//CircularQueue.hpp
#pragma once
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<vector>
#include <semaphore.h>
template<class T>
class CircularQueue
{
private:
std::vector<T> _v;
int _size; //环形队列的容量
//需要加锁:当出现 多生产+多消费 场景的时候,我们这里的index只有一个,如果多个线程访问这个index就会出现数据覆盖或者数据重复
int _p_index = 0; //生产者下标
int _c_index = 0; //消费者下标
//使用信号量实现线程间的同步
sem_t _space_sem; //空间信号量 生产者
sem_t _data_sem; //数据信号量 消费者
//需要定义两把锁:生产和生产之间需要互斥关系 消费和消费之间需要互斥关系
pthread_mutex_t _p_mutex;
pthread_mutex_t _c_mutex;
public:
void P(sem_t& sem) // --
{
sem_wait(&sem); //获取一个信号量
}
void V(sem_t& sem) // ++
{
sem_post(&sem); //释放一个信号量
}
CircularQueue(int size = 5):_v(size), _size(size)
{
sem_init(&_space_sem, 0, _size);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_p_mutex, nullptr);
pthread_mutex_init(&_c_mutex, nullptr);
}
~CircularQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
void Push(const T& data)
{
P(_space_sem); //--一个空间资源
pthread_mutex_lock(&_p_mutex);
_v[_p_index] = data;
_p_index++;
_p_index %= _size;
V(_data_sem); //++一个数据资源
pthread_mutex_unlock(&_p_mutex);
}
void Pop(T* data)
{
P(_data_sem);
pthread_mutex_lock(&_c_mutex);
*data = _v[_c_index];
_c_index++;
_c_index %= _size;
V(_space_sem);
pthread_mutex_unlock(&_c_mutex);
}
};
//Main.cc
#include "CircularQueue.hpp"
#include<iostream>
void* producer(void* args)
{
CircularQueue<int>* rq = static_cast<CircularQueue<int>*>(args);
int cnt = 100;
while(true)
{
//消费数据
rq->Push(cnt);
std::cout << "producer a data: " << cnt-- << std::endl;
}
}
void* consumer(void* args)
{
CircularQueue<int>* rq = static_cast<CircularQueue<int>*>(args);
int data = 0;
while(true)
{
sleep(1);
rq->Pop(&data);
std::cout << "-------consumer a data: " << data << std::endl;
}
}
int main()
{
pthread_t c, p;
//创建环形队列
CircularQueue<int>* rq = new CircularQueue<int>();
//线程创建
pthread_create(&c, nullptr, consumer, rq);
pthread_create(&p, nullptr, producer, rq);
//线程等待
pthread_join(c, nullptr);
pthread_join(p, nullptr);
}
我们先让消费者等待1s,让生产者生产满队列,然后消费者消费一个,生产者生产一个