目录
线程互斥
锁的初始化
加锁
解锁
锁的初始化
锁的原理
死锁
线程同步
方案一:条件变量
条件变量初始化
等待
唤醒
条件变量的代码示例
基于阻塞队列的生产消费模型
方案二:POSIX信号量
初始化信号量:
销毁信号量
等待信号量(P())
发布信号量(V())
基于环形队列的生产消费模型
线程池
下面是几个在前面的学习中,提及到的相关的概念:
临界资源:多线程执行流共享的资源就叫做临界资源
临界区:每个线程内部,访问临界资源的代码,就叫做临界区
互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
原子性:可能会被调度机制影响,但是该操作只有两态,要么完成,要么未完成
线程互斥
多个线程访问同一个全局变量,并对它进行数据计算,在并发访问的时候,可能会导致数据不一致的问题,例如下述的抢票的问题
有一个全局变量ticket,假定刚开始有5000张票,线程一执行抢票的操作,在代码中执行ticket--的时候,会有三步操作:第一步读取数据到CPU内的寄存器中;第二步在CPU内部进行计算;第三步将计算的结果写回内存
当线程一刚执行完第二步操作,将ticket--变为了4999,准备执行第三步操作时,这时线程一被切换走,于是将4999保存在线程一的上下文数据中;与此同时,线程二开始进行抢票操作,由于线程一的4999没有写回内存,所以线程二所取得的ticket依旧是5000,此时线程二也执行上述的三步操作,当线程二执行了一段时间,ticket变为了3000的时候,线程二被切换走,该线程一执行时,线程一存储的ticket却是4999,此时线程一执行操作三,ticket又变为了4999,这里就出现了数据不一致的问题
为了解决上述问题,就需要加锁保护,下面介绍如何加锁保护:
pthread_mutex_t是原生线程库提供的一个数据类型,pthread_mutex_t mtx可以定义一个锁
锁的初始化
对于锁需要初始化(也叫互斥量初始化)
第一种方法:可以直接使用pthread_mutex_init进行初始化
第二种方法:这个锁如果是全局的,或是静态定义的,就可以使用PTHREAD_MUTEX_INITIALIZER这样的宏进行初始化
下面先演示使用第二种方法的场景:
而在getticket中,这段区域能够访问临界资源,是临界区,需要加锁保护:
加锁
加锁可以使用pthread_mutex_lock:
直接将刚刚定义的锁mtx取地址,然后传入pthread_mutex_lock即可
如下所示,在每次循环进入临界区前加锁:
每一个线程执行抢票的语句时,都会执行这个代码,这个锁的特点是:任何一个时刻只允许一个线程成功的获得这把锁,然后向后运行执行下面的代码,而其他没有拿到锁的线程只能默认阻塞等待,直到拿到锁的线程把锁释放掉,其他线程才能进入
有加锁,自然也要有解锁,在临界区后需要解锁:
解锁
解锁需要用到pthread_mutex_unlock
代码中的位置如下图所示:
之所以在else中也要解锁,是因为如果加锁后,没有进入if语句,而是进入了else语句,这时break出去,全局的锁依旧处于加锁状态,其他线程就无法继续向后执行了,所以这里在else中也需要有解锁语句
需要注意:加锁的时候需要注意力度,越小越好,加锁的代码越多,效率越低,因为加锁后就变为了串行执行了
锁的初始化
pthread库所提供的pthread_mutex_init可以初始化一把锁,如下:
pthread_mutex_init需要包含头文件:pthread.h
上面说到初始化有两种方式,上面演示的是锁是全局的,下面演示锁不是全局的情况,将锁设置在main函数中,这种情况就需要用到pthread_mutex_init进行初始化了
函数参数:
第一个参数:是定义的锁的地址
第二个参数:锁的属性(一般设为nullptr即可)
返回值:
成功返回0,失败返回错误码
使用方式如下所示:
锁mtx不是全局的,所以需要用到pthread_mutex_init进行初始化,在末尾还需要pthread_mutex_destroy释放锁
在学习了上面的加锁保护后,思考以下几个问题:
①加了锁之后,线程在临界区中是否会被切换,被切换会产生问题吗?
当然会被切换,且不会产生任何问题。虽然被切换了,但是当前线程是持有锁被切换的,所以其他抢票的线程也必须先申请锁,才能够执行临界区的代码,而锁是被当前线程持有的,所以不会申请成功,所以也就不会让其他线程进入临界区中,这样就保证了临界区中数据的一致性
②原子性体现在哪?
在没有持有锁的线程2的角度看,只有两种情况对于线程2有意义,第一:线程1没有持有锁(什么都没做),第二:线程1释放锁(已经做完),此时线程2可以申请锁
所以上述情况可以反映出,线程1在持有锁期间,对其他线程来说,就是原子性的,要不没有做,要么就是已经做完
③加锁后就是串行执行了吗?
对的,对于临界区的代码来说一定是串行执行的
④每个线程必须先申请锁,再访问临界资源,所以每一个线程都必须看到同一把锁并且访问它,这就说明锁本身就是共享的资源,而锁保证全局数据的安全,谁又来保证锁的安全呢?
为了保证锁的安全,申请和释放锁必须是原子性的,所以原子性的申请锁,这样才能够做到一个线程申请锁,直到它完成整个动作释放锁后,其他线程才能申请,这样保证了锁的安全
在学习完下面的锁的原理后,我们就可以明白,是使用swap或exchange指令,通过一行汇编的方式来保证自己的原子性的
锁的原理
首先在执行流视角,是如何看待CPU上面的寄存器的?
CPU内部的寄存器,本质叫做当前执行流的上下文。寄存器的空间是被所有的执行流共享的,但是寄存器的内容,是被每一个执行流私有的(即上下文)
在汇编的角度,如果只有一条汇编语句,我们就认为该汇编语句的执行是原子的
而为了实现互斥锁的操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存的数据相交换,由于只有一条指令,所以保证了原子性
并且我们要知道,交换本质就是将共享变为私有,因为从内存中交换到了寄存器中,也就是变为了线程自己的上下文数据,所以变为私有的了
比如内存中锁的数据是1,线程初始的寄存器数据是0
线程A进行lock的操作,已经执行完寄存器数据和内存数据交换的代码后,线程A被切换出去了,但是在切换前,线程A将寄存器中的1记录下来,作为自己的上下文数据,这时切换为了线程B执行lock操作
此时线程B也执行到了将寄存器数据和内存数据交换的代码,但是此时内存中锁的数据是刚刚线程A交换后的数据,也是0,所以即使线程B也执行这个代码,线程B的寄存器中的数据仍然是0,依然无法完成lock的操作,只能等待线程A执行完lock操作,再执行unlock操作将锁的数据变为1后,线程B才能拿到内存中锁的原始数据1,进而才能执行lock操作
下面再复习几个概念
可重入概念:
可重入是针对于函数来谈的,一个函数被多个执行流重复进入的现象就叫做可重入,在上面的抢票例子中,getticket函数就是被重入了
如果在重入期间,如果没有出问题,这个被重入的函数就被叫做可重入函数
线程安全概念:
线程执行过程中,访问了某些全局的某些数据,或共享的某些资源,可能导致了其他线程出现数据不一致问题、崩溃问题等等,就称之为线程安全问题
函数是可重入的,那就一定是线程安全的
线程安全的时候,函数不一定是可重入的
死锁
死锁就是多线程场景当中,持有锁的线程在持有自身锁的同时,还向对方申请对方的锁,且不释放自己的锁,进而导致代码无法向下推进的情况,称之为死锁
产生死锁的四个必要条件:
即产生死锁时,这四个必要条件一定都被满足了
互斥条件:一个资源每次只能被一个执行流使用
请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁:
破坏死锁的四个必要条件
加锁顺序一致
避免锁未释放的场景
资源一次性分配
线程同步
有两个情况,没有错误,但是却不合理:
①单独的某个线程频繁的申请到资源(造成别人饥饿的问题)
②太过于浪费自己与对方的资源(做无用功)
引入线程同步,主要是为了解决访问临界资源合理性的问题
按照一定的顺序,进行临界资源的访问,就叫做线程同步的过程
线程同步也是有方案的:
方案一:条件变量
当我们申请临界资源前,先要做临界资源是否存在的检测,而做检测的本质也是访问临界资源的,所以对临界资源的检测,也一定是需要在加锁和解锁之间的
若是使用常规方式要检测条件就绪,则注定了我们必须频繁申请和释放锁
那么如何让我们的线程检测到资源不就绪的时候呢?
①不要让线程再频繁的自己检测,让线程进行等待
②当条件就绪的时候,通知对应的线程,让他来进行资源申请和访问
为了实现上面的效果,就需要引入条件变量了
条件变量初始化
条件变量的初始化和互斥锁那里的初始化一样,有两种方式:
如果定义的是全局的或是静态的条件变量,那就可以使用下面的PTHREAD_COND_INITIALIZER这个宏进行初始化
如果定义的是局部的条件变量,那就使用pthread_cond_t来定义它,用pthread_cond_init来初始化它,与互斥锁一样,如果是用pthread_cond_init来初始化的,那就需要用pthread_cond_destroy来销毁它
pthread_cond_init第一个参数是对应的条件变量,第二个参数线程属性,设置为nullptr即可
同样,pthread系列的函数,返回值都是成功返回0,失败返回错误码,下面的pthread函数也是一样的,就不在赘述了
等待
在临界资源中检测对应的临界资源不就绪,此时第一件事就是不用频繁的工作,而是进行等待,而这里的等待就需要用到pthread_cond_wait
pthread_cond_wait的第一个参数就是对应的条件变量,第二个参数是对应的互斥锁
唤醒
发通知需要用到pthread_cond_signal
其中pthread_cond_broadcast是把所有线程全部唤醒
pthread_cond_signal是唤醒指定的一个线程
条件变量的代码示例
下面使用代码实现,主线程随机唤醒四个线程的其中一个,且这4个线程执行的任务是不一样的,在其中一个线程执行时,其他线程在等待队列中等待,共执行8次,观察打印的顺序
定义局部的互斥锁与条件变量,而不定义全局的互斥锁与条件变量,是为了更好的理解:如何让每一个线程都得到同一个局部的互斥锁与条件变量,方法是设置一个Data类,将需要传的数据都创建在类中的成员变量,然后在线程创建时,在pthread_create函数的最后一个参数中,传入该类对象的指针,这样就可以让每一个函数都能够使用该指针,从而得到局部设置的互斥锁和条件变量
如果设置为全局的就不需要给每一个函数传参,比较简单
makefile:
mycond:mycond.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f mycond
mycond.cc代码:
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
using namespace std;
#define TNUM 4
volatile bool quit = false;
//typedef重命名函数为func_t
typedef void (*func_t)(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond);
class Data
{
public:
Data(const string& name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
:name_(name),func_(func),pmtx_(pmtx),pcond_(pcond)
{}
public:
string name_;
func_t func_;
pthread_mutex_t* pmtx_;
pthread_cond_t* pcond_;
};
void func1(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while(!quit)
{
//wait一定要在加锁和解锁之间进行
pthread_mutex_lock(pmtx);
//pthread_cond_wait代码被执行,当前线程会立即被挂起,等待被唤醒
pthread_cond_wait(pcond, pmtx);
cout << name << "正在读书" << endl;
pthread_mutex_unlock(pmtx);
}
}
void func2(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while(!quit)
{
pthread_mutex_lock(pmtx);
//这里需要检测临界资源是否就绪
pthread_cond_wait(pcond, pmtx);
cout << name << "正在吃饭" << endl;
pthread_mutex_unlock(pmtx);
}
}
void func3(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while(!quit)
{
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx);
cout << name << "正在睡觉" << endl;
pthread_mutex_unlock(pmtx);
}
}
void func4(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{
while(!quit)
{
pthread_mutex_lock(pmtx);
pthread_cond_wait(pcond, pmtx);
cout << name << "正在玩耍" << endl;
pthread_mutex_unlock(pmtx);
}
}
void* Entry(void* args)
{
Data* td = (Data*)args;
td->func_(td->name_,td->pmtx_,td->pcond_);
//每一个线程调用完成Entry函数后返回,再delete掉new出来的td
delete td;
return nullptr;
}
int main()
{
//设置互斥锁mtx和条件变量cond
pthread_mutex_t mtx;
pthread_cond_t cond;
//调用init函数初始化互斥锁和条件变量
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cond, nullptr);
func_t func[TNUM]={func1, func2, func3, func4};
pthread_t tid[TNUM];
for(int i = 0; i < TNUM; i++)
{
//为了传入每一个函数的name都与之对应
string name = "thread ";
name += to_string(i+1);
Data* td = new Data(name, func[i], &mtx, &cond);
pthread_create(tid +i, nullptr, Entry, (void*)td);
}
//执行8次就停止
int num = 8;
//特定的条件变量下去唤醒
while(num)
{
cout << "线程正在执行,倒计时: " << num-- << endl;
pthread_cond_signal(&cond);
sleep(1);
}
//走到这表示执行完5次了,quit设置为true
//此时func1234函数不再进入循环
cout << "执行结束" << endl;
quit = true;
//最后再唤醒一次,整个程序结束
//因为在func函数中,wait函数执行完才解锁
pthread_cond_broadcast(&cond);
//线程等待
for(int i = 0; i < TNUM; i++)
{
pthread_join(tid[i],nullptr);
cout << "thread " << tid[i] << " 已经退出" << endl;
}
//调用init就必须要调用destroy
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
线程是按照顺序被唤醒的,因为线程在条件变量不满足时,所有的线程都会在该条件变量下排队等待,所以主线程就会在排队等待的队列中一个一个唤醒线程,执行完后继续进入等待队列中wait,所以每次都是同一个顺序被唤醒
由于设置了四个线程分别执行func1234函数,所以我们在main运行8次观察打印结果:
可以发现,此次的随机顺序是按2134循环两次
最后执行结束后,再全部唤醒一次,然后主线程pthread_join等待成功,打印已经退出
基于阻塞队列的生产消费模型
关于生产者消费者模型,有以下的321原则
3种关系:生产者和生产者(竞争/互斥),消费者和消费者(竞争/互斥),生产者和消费者(互斥/同步)
2种角色:生产者和消费者
1个交易场所:超市
交易场所本质是一个商品的缓冲区,是为了提高效率,其实是解耦
上述的生产者和消费者,是由线程承担的(给线程角色化)
交易场所是某种数据结构表示的缓冲区
而商品则是数据
我们知道在条件满足的时候,必须需要唤醒指定的线程,而这里是怎么知道条件是否满足呢?
其实是生产者消费者自己清楚知道条件是否满足,例如超市里是否新增货物,肯定生产者最清楚,而超市中还剩余多少空间,供生产者生产,肯定消费者最清楚
所以当生产者生产了商品,就表示这里的数据可以被写,可以别读取了,所以生产者就可以立即通知消费者
同样消费者把数据一拿走,消费者知道空间又有了,就可以通知生产者继续生产了
因此我们就可以让生产者消费者线程互相同步,从而完成生产者消费者模型
阻塞队列的概念:
阻塞队列(Blocking Queue):是一种常用于实现生产者和消费者模型的数据结构。
与我们之前学的队列不一样的是:
当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;
当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出
以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞
下面通过代码具体演示阻塞队列用法:
其中的lockGuard.hpp是关于锁的封装,是RAII风格的加锁方式,Task.hpp是一个新的任务类,里面有自己的元素,用于创建阻塞队列的对象类型
之所以创建lockGuard.hpp进行封装锁,是为了美化代码,在代码中不再需要显示的加锁解锁,只需要创建一个lockGuard的对象,该对象自动调用构造函数加锁,生命周期结束也会自动调用析构函数解锁
makefile:
cp:ConProd.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f cp
BlockQueue.hpp
#pragma once
#include "lockGuard.hpp"
#include <iostream>
#include <pthread.h>
#include <mutex>
#include <queue>
using namespace std;
const int gDefaultCap = 5;
template<class T>
class BlockQueue
{
private:
//判空
bool isQueueEmpty()
{
return bq_.size() == 0;
}
//判满
bool isQueueFull()
{
return bq_.size() == capacity_;
}
public:
BlockQueue(int capacity = gDefaultCap) : capacity_(capacity)
{
//初始化mtx_、isEmpty_、isFull_
pthread_mutex_init(&mtx_, nullptr);
pthread_cond_init(&Empty_, nullptr);
pthread_cond_init(&Full_, nullptr);
}
void push(const T &in) // 生产者
{
// pthread_mutex_lock(&mtx_);
// //1. 先检测当前的临界资源是否能够满足访问条件
// // pthread_cond_wait: 是在临界区中,并且是持有锁的,如果我去等待了,锁该怎么办呢
// // pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放
// // 从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,还是在临界区被唤醒的
// // 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁
// // pthread_cond_wait: 但是只要是一个函数,就可能调用失败
// // pthread_cond_wait: 可能存在 伪唤醒 的情况
// while(isQueueFull()) pthread_cond_wait(&Full_, &mtx_);
// //2. 访问临界资源,100%确定,资源是就绪的!
// bq_.push(in);
// // if(bq_.size() >= capacity_/2) pthread_cond_signal(&Empty_);
// pthread_cond_signal(&Empty_);
// pthread_mutex_unlock(&mtx_);
//下面是使用我们自己封装的锁的使用,是在lockGuard.hpp中封装的
//和上面是等价的
lockGuard lockgrard(&mtx_); // 自动调用lockGuard构造函数
while (isQueueFull())
pthread_cond_wait(&Full_, &mtx_);
// 2. 访问临界资源,100%确定,资源是就绪的!
bq_.push(in);
pthread_cond_signal(&Empty_);
} // 自动调用lockgrard 析构函数
void pop(T *out)
{
//下面这行代码替代了加锁解锁的代码,因为会自动调用
lockGuard lockguard(&mtx_);
// pthread_mutex_lock(&mtx_);
while (isQueueEmpty())
pthread_cond_wait(&Empty_, &mtx_);
*out = bq_.front();
bq_.pop();
pthread_cond_signal(&Full_);
// pthread_mutex_unlock(&mtx_);
}
~BlockQueue()
{
//调用pthread系列的init,都需要调用destroy进行销毁
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&Empty_);
pthread_cond_destroy(&Full_);
}
private:
queue<T> bq_; //阻塞队列
int capacity_;//容量上限
pthread_mutex_t mtx_; //通过互斥锁保证队列的安全
pthread_cond_t Empty_;//用isEmpty来表示bq是否为空的条件
pthread_cond_t Full_; //用isFull来表示bq是否为满的条件
};
lockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std;
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
{}
void lock()
{
cout << "正在进行加锁" << endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
cout << "正在进行解锁" << endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
using namespace std;
//myAdd函数即是Task.hpp中的func_t函数
int myAdd(int x, int y)
{
return x + y;
}
void* consumer(void *args)
{
BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
while(true)
{
// 获取任务
Task t;
bqueue->pop(&t);
// 完成任务
cout << pthread_self() <<" consumer: "<< t.x_ << "+" << t.y_ << "=" << t() << endl;
}
return nullptr;
}
void* productor(void *args)
{
BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
// int
// int a = 1;
while(true)
{
// 制作任务
int x = rand()%10 + 1;
//防止x和y创建的间隔太近,而随机的数一样,usleep会
usleep(rand()%1000);
int y = rand()%5 + 1;
Task t(x, y, myAdd);
// 生产任务
bqueue->push(t);
// 输出消息
cout <<pthread_self() <<" productor: "<< t.x_ << "+" << t.y_ << "=?" << endl;
sleep(1);
}
return nullptr;
}
int main()
{
//获取随机数
srand((uint64_t)time(nullptr) ^ getpid() ^ 0x12345);
BlockQueue<Task> *bqueue = new BlockQueue<Task>();
//制造两个生产者,两个消费者
pthread_t c[2],p[2];
pthread_create(c, nullptr, consumer, bqueue);
pthread_create(c + 1, nullptr, consumer, bqueue);
pthread_create(p, nullptr, productor, bqueue);
pthread_create(p + 1, nullptr, productor, bqueue);
//等待
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
delete bqueue;
return 0;
}
Task.hpp
#pragma once
#include <iostream>
#include <functional>
//使用c++11中学习的function
//该函数的返回值是int,参数是int,int
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task(){}
Task(int x, int y, func_t func):x_(x), y_(y), func_(func)
{}
//运算符重载
//仿函数
int operator ()()
{
return func_(x_, y_);
}
public:
int x_;
int y_;
func_t func_;
};
运行结果为:
可以看到,代码中加锁解锁的动作
生产者productor加锁构建任务,消费者consumer解锁获取完成任务
方案二:POSIX信号量
信号量这个概念,在前面也提到过。POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 不同的是POSIX可以用于线程间同步。
信号量的本质其实是一个计数器
信号量初始化成多少,也就代表有多少资源,信号量是对于临界资源的一种预定机制,只要你申请成功,就一定会获得一个共享资源
访问临界资源的时候,必须先申请信号量资源(sem--,预订资源,P操作),使用完毕信号量资源(sem++,释放资源,V操作)
信号量的类型是sem_t,和互斥锁、条件变量一样,信号量也可以定义全局的或局部的,定义以后就可以对信号量进行初始化了
下面先了解信号量的相关接口
初始化信号量:
初始化信号量需要用到sem_init函数
需要包含头文件semaphore.h
函数参数:
第一个参数:信号量对象
第二个参数:若为0表示线程间共享,非0则表示进程间共享
第三个参数:信号量的初始值
销毁信号量
同样,使用init初始化,就需要使用destroy进行销毁
需要包含头文件semaphore.h
函数参数就是信号量对象
等待信号量(P())
同样需要包含头文件semaphore.h
函数参数就是信号量对象
等待信号量,是P操作,会将信号量的值-1
发布信号量(V())
同样需要包含头文件semaphore.h
函数参数就是信号量对象
发布信号量,是V操作,会将信号量的值+1,表示资源使用完毕,可以归还资源了
基于环形队列的生产消费模型
上面写的是基于阻塞队列的生产者消费者模型的代码,其空间可以动态分配,现在基于固定大小的环形队列重写这个生产者消费者模型的代码,该生产者消费者模型的代码中,就不再全部使用互斥锁控制了,而是使用信号量完成其要求
而之所以可以直接使用信号量而不需要互斥锁的原因如下:
在之前我们申请锁,然后判断与访问临界资源,最后释放锁,本质是因为我们并不清楚临界资源的情况,所以在申请到锁后,需要判断是否为空或为满
而信号量的本质是计数器,计数器就可以让我们不用进入临界区,就可以得知临界资源的情况,并且信号量是资源的预定机制,表示的就是空间的情况,只要能申请成功,就一定能够访问,所以信号量甚至可以减少临界区内部的判断是否为空或为满语句,所以可以在外部就知晓临界资源的情况,因此可以使用信号量而不使用锁
而环形结构的特点就是,当执行到最后一个下标的位置时,再++又会回到第一个下标的位置
今天我们使用普通的线性数组实现环形结构,只需要用模运算模拟即可
如0~n-1一共n个元素,当执行到n及后面的下标时时,只需%n即可,所以只需要做到[数组下标] %= n,就可以实现这里的环形结构
在环形结构中,有生产者线程和消费者线程,如果生产和消费指向了环形结构的同一个位置(就表示为空or为满),此时生产和消费要有互斥或者同步问题
而大部分情况下生产和消费都指向的是不同的位置,所以就有下面的想法:
当生产和消费指向同一个位置时,让他们具有互斥同步关系
而当生产和消费不指向同一个位置时,想让他们并发执行
所以在整个过程中,有以下期望:
生产者不能将消费者套圈,否则会出现数据覆盖的情况
消费者不能超过生产者,因为超过就没有资源进行消费了
为空时:一定要让生产者先运行
为满时:一定要让消费者先运行
若是存在其他情况,并发访问即可
生产者:最关注的是空间资源-> spaceSem-> 初始值为n
消费者:最关注的是数据资源-> dataSem -> 初始值为0
生产: P(spaceSem) -> spaceSem--,在特定位置生产,V(dataSem) -> dataSem++
消费: P(dataSem) -> dataSem--,消费特定的数据,V(spaceSem) -> spaceSem++
多生产多消费的意义:
将数据或者任务生产前和拿到之后处理,才是最耗费时间的,多生产多消费虽然同一时间只能有一个生产者或消费者进入临界区中,但是在任务生产前和拿到之后,却是可以并发执行的,这才是多生产多消费的意义
即那任务放任务串行,拿完了放完了就是并发的
下面通过代码具体演示阻塞队列的用法:
sem.hpp是实现了信号量的封装
makefile:
cp:testMain.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f cp
ringQueue.hpp:
#ifndef _RingQueue_HPP_ //防止重复定义
#define _RingQueue_HPP_
#include <iostream>
#include <vector>
#include <ctime>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include "sem.hpp"
const int g_default_num = 5;
using namespace std;
template<class T>
class RingQueue
{
public:
RingQueue(int default_num = g_default_num)
:rq_(default_num)//rq_的size也初始化为default_num
,num_(default_num)
,p_step_(0)
,c_step_(0)
,space_sem_(default_num)
,data_sem_(0)
{
//初始化锁
pthread_mutex_init(&pmtx, nullptr);
pthread_mutex_init(&cmtx, nullptr);
}
//生产者执行:生产者们的临界资源是下标c_step_
void push(const T& in)
{
space_sem_.p();
//先申请信号量,再申请锁
//只有一个生产者线程进入
pthread_mutex_lock(&pmtx);
//将rq_的c_step_下标对应的值改为in
rq_[c_step_++] = in;
//模运算实现环形队列的性质
c_step_ %= num_;
pthread_mutex_unlock(&pmtx);
data_sem_.v();
}
//消费者执行:消费者们的临界资源是下标p_step_
void pop(T* out)
{
data_sem_.p();
//先申请信号量,再申请锁
//只有一个消费者线程进入
pthread_mutex_lock(&cmtx);
*out = rq_[p_step_++];
//模运算实现环形队列的性质
p_step_ %= num_;
pthread_mutex_unlock(&cmtx);
space_sem_.v();
}
~RingQueue()
{
//销毁锁
pthread_mutex_destroy(&pmtx);
pthread_mutex_destroy(&cmtx);
}
private:
vector<T> rq_;
int num_;
int p_step_;//消费者下标
int c_step_;//生产者下标
Sem space_sem_;//空间信号量
Sem data_sem_; //资源信号量
pthread_mutex_t pmtx;//生产者和生产者之间的锁
pthread_mutex_t cmtx;//消费者和消费者之间的锁
};
#endif
testMain.cc:
#include "ringQueue.hpp"
//生产者
void* productor(void* args)
{
RingQueue<int>* rq = (RingQueue<int>*)args;
while(true)
{
sleep(1);
//构建数据或任务对象
//x是随机生成的1~100之间的整数
int x = rand()%100 + 1;
//push推送到环形队列中
rq->push(x);
cout << "线程[" << pthread_self() << "] 生产: " << x << endl;
}
}
//消费者
void* consumer(void* args)
{
RingQueue<int>* rq = (RingQueue<int>*)args;
while(true)
{
sleep(1);
int x;
//从环形队列中获取任务或数据
rq->pop(&x);
//进行一定的处理
cout << "线程[" << pthread_self() << "] 消费: " << x << endl;
}
}
int main()
{
//生成随机数种子
srand((uint64_t)time(nullptr) ^ getpid());
pthread_t c[3],p[3];
RingQueue<int>* rq = new RingQueue<int>();
//创建线程
for(int i = 0; i < 3; i++)
pthread_create(c + i, nullptr, productor, rq);
for(int i = 0; i < 3; i++)
pthread_create(p + i, nullptr, consumer, rq);
//等待线程
for(int i = 0; i < 3; i++)
pthread_join(c[i], nullptr);
for(int i = 0; i < 3; i++)
pthread_join(p[i], nullptr);
return 0;
}
sem.hpp:
#ifndef _SEM_HPP_ //防止重复定义
#define _SEM_HPP_
#include <semaphore.h>
class Sem
{
public:
//信号量初始化需要给一个初始值value
Sem(int value)
{
//初始化信号量
sem_init(&sem_, 0, value);
}
//P操作
void p()
{
//sem_wait等待信号量即为P操作
sem_wait(&sem_);
}
//V操作
void v()
{
//sem_post等待信号量即为V操作
sem_post(&sem_);
}
~Sem()
{
//销毁信号量
sem_destroy(&sem_);
}
private:
sem_t sem_;//信号量
};
#endif
在main函数中创建了多生产者多消费者,运行结果为:
线程池
线程池是一种线程使用模式,可以对空间预先申请,而空间的预先申请可以减少系统调用的次数,提高使用内存的效率,它的本质就是用空间换时间的策略
下面是代码简易的实现一个线程池:
makefile:
mythreadpool:testMain.cc
g++ -o $@ $^ -std=c++11 -lpthread -DDEBUG_SHOW -D选项可以在命令行中定义宏
.PHONY:clean
clean:
rm -f mythreadpool
thread.hpp:
#pragma once
#include <iostream>
#include <unistd.h>
#include <vector>
#include <pthread.h>
#include <queue>
#include <string>
#include <cstdio>
typedef void*(*func_t)(void*);
//设置ThreadData是为了未来创建线程的时候,把名字也传进去
class ThreadData
{
public:
void* args_;
std::string name_;
};
class Thread
{
public:
Thread(int num, func_t callback, void* args):func_(callback)
{
//构造函数中初始化name_
char namebuffer[64];
snprintf(namebuffer, sizeof namebuffer, "thread->%d", num);
name_ = namebuffer;
tdata_.args_ = args;
tdata_.name_ = name_;
}
//创建线程
void start()
{
pthread_create(&tid_, nullptr, func_, (void*)&tdata_);
}
//线程等待
void join()
{
pthread_join(tid_, nullptr);
}
std::string name()
{
return name_;
}
~Thread()
{}
private:
std::string name_;
func_t func_;
ThreadData tdata_;
pthread_t tid_;
};
threadPool.hpp:
#pragma once
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
const int g_thread_num_ = 3;
template <class T>
class ThreadPool
{
public:
// 下面几个函数用于静态函数routine和外部线程取ThreadPool内的成员变量的接口
pthread_mutex_t *getMutex()
{
return &lock;
}
bool isEmpty()
{
return task_queue_.empty();
}
// 在条件变量下等
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
T getTask()
{
T t = task_queue_.front();
task_queue_.pop();
return t;
}
public:
ThreadPool(int g_num = g_thread_num_) : num_(g_num)
{
// 初始化互斥锁和条件变量
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
for (int i = 1; i <= num_; i++)
{
threads_.push_back(new Thread(i, routine, this));
}
}
// 使用static是因为在上面typedef了func_t函数类型只有一个参数void*
// 如果不加static就会在类内定义,类内的函数会多一个参数,即this指针,就会出现类型不匹配的错误了
// 并且routine是消费者需要执行的,但是消费者需要执行的任务都在task_queue_中,静态函数只能访问静态方法
// 所以处理方法就是在ThreadPool的构造函数中,Thread的第三个参数传入this指针
// 这样ThreadPool的内容就会赋值给ThreadData中的args_
// 就可以使用routine函数的args强转的指针类型参数访问ThreadPool里的成员函数
static void *routine(void *args)
{
ThreadData *td = (ThreadData *)args;
ThreadPool<T> *tp = (ThreadPool<T> *)td->args_;
while (true)
{
T task;
// 定义一个代码块{},在这个代码块里lockguard自动加锁,出代码块自动解锁,即为安全的代码块
// 在这个{}中,即为加锁的区间,临界区代码
{
lockGuard lockguard(tp->getMutex());
while (tp->isEmpty())
tp->waitCond();
// 走到这里说明不为空,可以拿任务了
task = tp->getTask();
}
// 处理任务,在Task中有()的运算符重载(仿函数),所以直接task()处理即可
task(td->name_);
}
}
void run()
{
for (auto &it : threads_)
{
it->start();
// std::cout << it->name() << "启动成功" << std::endl;
logMessage(NORMAL, "%s%s", it->name().c_str(), "启动成功");
}
}
// push任务需要加锁解锁,生产
void pushTask(const T &task)
{
// lockguard自动调用构造函数加锁,释放自动调用析构函数解锁
lockGuard lockguard(&lock);
task_queue_.push(task);
pthread_cond_signal(&cond);
}
~ThreadPool()
{
for (auto &it : threads_)
{
it->join();
delete it;
}
// 销毁互斥锁和条件变量
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
std::vector<Thread *> threads_;
int num_;
std::queue<T> task_queue_;
pthread_mutex_t lock;
pthread_cond_t cond;
};
Task.hpp:任务进行封装
#pragma once
#include "log.hpp"
#include <iostream>
#include <functional>
#include <string>
using namespace std;
typedef function<int(int,int)> fun_t;
class Task
{
public:
Task()
{}
Task(int x,int y,fun_t func):x_(x),y_(y),func_(func)
{}
//仿函数
void operator()(const string& name)
{
// cout << "线程" << name << "处理完成,结果为:" << x_ << "+" << y_ << "=" << func_(x_, y_) << endl;
//__FILE__, __LINE__是预处理符,可以看到哪个文件哪一行在打印
logMessage(WARNING, "%s处理完成%d+%d=%d | %s | %d",
name.c_str(), x_, y_, func_(x_,y_), __FILE__, __LINE__);
}
public:
int x_;
int y_;
fun_t func_;
};
lockGuard.hpp:锁的封装
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std;
class Mutex
{
public:
Mutex(pthread_mutex_t *pmtx) : pmtx_(pmtx)
{}
void lock()
{
pthread_mutex_lock(pmtx_);
}
void unlock()
{
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx):mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
log.hpp:日志的打印,如果想打印到文件中也可以改变下面的代码,使用例如vfprintf的函数
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <string>
#include <ctime>
//日志等级
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
//日志等级的映射表
const char* gLevelMap[]={
"DEBUG",
"NORMAL",
"WARNING",
"ERROR",
"FATAL"
};
//日志功能至少:日志等级 时间;支持用户自定义(日志内容等等)
//format是输出格式例如%s之类的
void logMessage(int level, const char* format, ...)
{
//条件编译,如果定义了DEBUG_SHOW这个宏,在打印时就正常打印
//如果没有定义这个DEBUG_SHOW宏,所以level是DEBUG的语句就不再打印了
//我们可以在makefile命令行中定义宏,加上-D选项即可
#ifndef DEBUG_SHOW
if(level == DEBUG) return;
#endif
// vprintf/vfprintf/vsprintf/ vsnprintf
//是把传入的参数按照可变的方式分别进行显示到 显示器 / 文件 / 字符串 /指定长度的字符串
//stdBuffer是日志的标准部分,如日志等级 时间
//logBuffer是日志的自定义部分,如日志内容等
char stdBuffer[1024];
char logBuffer[1024];
//这里的时间采用较为简单的时间戳表示
time_t tm = time(nullptr);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%ld]", gLevelMap[level], tm);
va_list args;
va_start(args, format);
vsnprintf(logBuffer,sizeof logBuffer, format, args);
va_end(args);
//拼接两个字符串的内容,一块打印出来
printf("%s %s\n", stdBuffer, logBuffer);
}
testMain.cc:
#include "threadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
#include <unistd.h>
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
ThreadPool<Task>* tp = new ThreadPool<Task>();
tp->run();
while(true)
{
//制作任务
int x = rand()%100 + 1;
usleep(1000);
int y = rand()%50 + 1;
//lambda表达式的使用
Task t(x,y,[](int x, int y)->int{
return x + y;
});
// cout << "任务制作完成" << x << "+" << y << "=?" << endl;
logMessage(DEBUG, "%s:%d+%d=?", "任务制作完成", x, y);
//推送任务到线程池
tp->pushTask(t);
sleep(1);
}
return 0;
}
运行结果如下所示:
如上所示的代码,就可以很好地完成线程池的功能了