目录
前言
Linux线程基础
线程概念
底层示意图
线程vs进程
Linux线程控制
创建线程
线程ID
线程终止
线程等待
线程分离
Linux线程互斥
背景概念
互斥量mutex
1.相关接口
2.实现原理
可重入vs线程安全
死锁
Linux线程同步
条件变量
生产者消费者模型
基于阻塞队列的生产者消费者模型
1.实现参考代码与讲解
2.运行测试
POSIX信号量
基于环形队列的生产者消费者模型
1.实现参考代码与讲解
2.运行测试
线程池
1.实现参考代码与讲解
2.运行测试
后记
前言
在学习了信号一大章节过后,我们来到了操作系统又一大重点部分——线程,之前学习过进程,为什么又来个线程,进程线程又有什么关系,线程该如何用?想必这是大家共同的疑惑,在本篇文章中,会为大家一一解答,包括线程的本质和底层实现,线程控制、线程互斥与同步相关接口等,最后也会手把手讲解一个线程池小项目来整合所有知识点。别看知识点一大堆,其实都是纸老虎,别怕有我在,下面我们来一一介绍!
Linux线程基础
-
线程概念
线程是操作系统能够进行运算调度的最小单位,它被包含在进程中,是进程中的实际执行单位。更准确的说,线程是一个进程内部的控制序列。
- 一个进程可以拥有多个线程,这些线程可以并发执行,共享进程的资源
- 线程在进程内部运行,本质是在进程地址空间内运行
- 线程比进程更轻量级,因为它们共享进程的资源,不需要自己拥有独立的内存空间
- 由于线程共享进程的资源,所以需要进行同步和互斥操作,以避免数据竞争和冲突
-
底层示意图
如下可以看出线程的本质,其实,这里的每个task_struct都可称为线程,通过一定的技术将当前进程的资源以一定方式划分给不同的tesk_struct,这是Linux特有的实现线程的方案——Linux没有真正意义上的线程结构,是用进程的pcb模拟的线程,而windows下有真正的线程结构,因此Linux下这种实现方案创建的进程被称为轻量级进程(LWP),其以库(pthread线程库)的方式提供给用户使用。
下图展示了更细节更具体的过程,cpu按照调度机制派发任务给某一个task_struct,映射到共享的地址空间中执行对应代码(执行任务),其他细节与进程完成一个任务的过程一模一样。
-
线程vs进程
- 进程是资源分配的基本单位,线程是调度的基本单位
- 线程共享进程数据,但也拥有自己的一部分数据,包括线程ID、一组寄存器、栈(可以共享,但认为是私有)、errno、 信号屏蔽字等
- 进程的多个线程共享同一地址空间,比如定义一个函数或全局变量,则在各线程中都可以调用或使用,除此之外,各线程还共享其他进程资源和环境,包括文件描述符表、每种信号的处理方式、代码区、堆区、共享区等
总的来说,线程相比进程具有更小的开销、更高的并发度和更方便的通信与同步方式,但同时也更容易出现线程安全问题。在实际应用中,应根据具体需求来选择使用进程还是线程。进程与线程的关系如下图:
Linux线程控制
线程控制主要讲解pthread线程库相关接口的使用,在此之前需要注意以下几点:
1.使用库函数之前,需要引入头文件<pthread.h>
2.大多数接口名字都是pthread_开头,记忆方便
3.链接pthread线程库要使用编译器命令【-lpthread】,其中 l 表示引入第三方库,如
-
创建线程
功能:创建一个线程
参数:
thread:pthread_t类型指针,pthread_t是线程ID的类型,其实这是一个输入型参数,函数外定义一个pthread_t变量,取地址传入,函数结束会将生成的线程id通过此变量传出,作为此线程的标识
attr:设置线程属性,填入nullpyr表示使用默认属性
start_routine:见类型是一个函数指针变量,传入线程启动后要执行的线程函数
arg:线程函数需要的void*类型的参数
返回值:成功返回0,失败返回错误码
eg:
void* threadTask(void* arg)
{
while(1)
{
cout<<(char*)arg<<"运行中"<<endl;
sleep(1);
}
}
int main()
{
pthread_t tid;
char* arg = "线程1";
pthread_create(&tid,nullptr,threadTask,(void*)arg);
while(1)
{
cout<<"main thread"<<"运行中"<<endl;
}
return 0;
}
运行:
-
线程ID
线程id分为两种:系统级线程id和用户级线程id。
系统级线程id:就是前面所说的pthread_t类型的值,对于os来说是线程唯一的标识,这个值我们不需要知道具体是多少,只要知道在线程创建后,拿到的这个线程id可以用于告诉os你执行相关线程控制接口想要控制的是哪个线程。同时线程库提供了获得线程自身id的接口:
用户级线程id:该值对应用户层面的进程id,是pid_t类型的值,进程id使用系统调用getpid函数获得,而线程id使用gettid函数获得。同时我们也可以使用ps指令查看所有线程及相关信息,其中LWP就是其线程id,而PID表示此线程所在进程的进程id:
现在考虑一个问题,系统级线程id的本质到底是什么?其实它就是进程地址空间的一个地址。我们看如下图是进程地址空间与pthread库链接的示意图,主线程栈在栈区,在栈区之下,堆区之上的叫做共享区,共享区有一块空间叫做mmap区域,这是一种常用实现共享内存的机制,链接着pthread动态库。在动态库中,每创建一个线程,都会有此线程专属的空间,其实这一块空间的起始地址就是系统级线程id的值。
还值得提一嘴的是,所有线程函数代码会编译到代码区,pthread线程库动态链接到了共享区中,当代码区需要库函数时就会跳转到共享区调用即可。再看到每一块线程专属空间中,有一个线程栈区域,也是属于共享区,那如何保证此栈区被每个线程独占呢?os维护轻量级进程的调度管理工作,而关于线程的相关属性信息会被单拎出来又pthread库来维护管理——“先描述,再组织”,“描述”中包括线程栈的首尾地址,这一块就是每个线程独占的栈结构,位于共享区。
-
线程终止
自己终止线程的情况并不多见,一般都是让线程正常跑完,但是这里也提供三个方法来应对需要只终止某个线程而不终止整个进程的情形:
- 从线程函数中return或exit
- 在线程函数章调用pthread_exit函数来终止自己
- 在一个线程中调用pthread_cancel函数来终止同一进程下的另一线程
功能:终止线程
参数:retval可以指向任何类型的数据,除了该线程函数中的局部变量,它指向的数据将作为线程退出时的返回值。如果线程不需要返回任何数据,将 retval 参数置为 NULL 即可
功能:取消同一进程中的某线程
参数:thread填入想要取消的线程id
返回值:成功返回0,失败返回错误码
eg:
void* threadTask(void* arg)
{
while(1)
{
cout<<(char*)arg<<"运行中"<<endl;
sleep(1);
}
cout<<"新线程退出"<<endl;
return (void*)11;
}
int main()
{
pthread_t tid;
int* ret=nullptr;
pthread_create(&tid,nullptr,threadTask,(void*)"thread 1");
int n=5;
while(n--)
{
cout<<"main thread"<<"运行中"<<endl;
sleep(1);
}
pthread_cancel(tid); //主线程控制新线程的退出
pthread_join(tid,(void**)&ret); //默认阻塞等待新线程退出
cout<<"ret:"<<(long long)ret<<endl; //pthread_cancel掉新线程后,join的退出码是-1
sleep(5);
cout<<"主线程退出"<<endl;
return 0;
}
运行:
-
线程等待
由于已经退出的线程,其空间没有被释放,会仍然在进程的地址空间中,因此需要线程等待,调用线程等待接口的线程将会挂起等待,直到被等待的线程终止,释放其空间。线程等待很重要,每创建一个线程都必须被等待,就像每打开一个文件指针最后就必须被关闭一样。
功能:等待指定线程结束
参数:
thread:指定线程id
retval:输出型参数,指向一个指针,拿到指定线程return的东西
返回值:成功返回0,失败返回错误码
eg:
void* threadTask(void* arg)
{
int n=5;
int* data=new int[3]{1,2,3};
while(n--)
{
cout<<(char*)arg<<"运行中 n:"<<n<<endl;
sleep(1);
}
cout<<"新线程退出"<<endl;
//exit(10); //exit会终止整个进程
//pthread_exit((void*)data); //终止线程,与线程中的return作用一样
//return (void*)10;
return (void*)data;
}
int main()
{
pthread_t tid;
int* ret=nullptr;
pthread_create(&tid,nullptr,threadTask,(void*)"thread 1");
pthread_join(tid,(void**)&ret); //默认阻塞等待新线程退出
while(1)
{
//cout<<"main thread"<<"运行中 ret:"<<(long long)ret<<endl; //因为linux下指针大小是8字节,一定要强转成long long,而不是int
cout<<"main thread"<<"运行中 ret:";
for(int i=0;i<3;i++)
{
cout<<ret[i]<<" ";
}
cout<<endl;
sleep(1);
}
return 0;
}
运行:
-
线程分离
默认情况下,新创建的线程退出后,需要对其进行pthread_join操作,否则无法释放资 源,从而造成系统泄漏。但是如果不关心线程的返回值,join是一种负担,这个时候,我们可以调用线程分离函数告诉系统,当线程退出时,自动释放线程资源。
功能:分离指定线程
参数:thread为指定线程id
返回值:成功返回0,失败返回错误码
值得注意的是,此线程分离后,如果发生了异常(如除0),也会影响到整个进程。并且,join和detach是冲突的,一个线程不能既被join又分离,也就是说detach后无需join,也不能join,否则join函数会返回错误码。
eg:
int g_val=0;
void* threadTask(void* arg)
{
pthread_detach(pthread_self());
g_val=50;
while(1)
{
cout<<(char*)arg<<"运行中 "<<"g_val:"<<g_val<<endl;
g_val++;
sleep(1);
break;
}
cout<<"新线程退出"<<endl;
return (void*)11;
}
int main()
{
pthread_t tid;
int* ret=nullptr;
pthread_create(&tid,nullptr,threadTask,(void*)"thread 1");
pthread_detach(tid);
g_val=100;
while(1)
{
cout<<"main thread"<<"运行中 g_val:"<<g_val<<endl;
g_val++;
sleep(1);
break;
}
int tmp=pthread_join(tid,(void**)&ret);
if(tmp==0)
{
cout<<"等待成功"<<endl;
}
else
cout<<"等待失败:"<<strerror(tmp)<<endl;
cout<<"主线程退出"<<endl;
return 0;
}
运行:
Linux线程互斥
-
背景概念
临界资源:多线程执行流共享的资源
临界区:每个线程内部,访问临界资源的代码
互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
大部分情况下,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,所以此变量归属单个线程,其他线程无法获得这种变量。但有时有些变量需要在线程间共享,称为共享变量,可以通过数据的共享,完成线程之间的交互。但是,多个线程并发的操作共享变量,会带来一些问题,比如说多线程购票系统,票的个数就是共享变量,在不进行其他操作的情况下,我们看一下会出现什么问题:
购票系统:
int tickets = 1000;
void *getTickets(void *args)
{
while (1)
{
if (tickets > 0)
{
usleep(1000);//usleep参数单位是微秒,这里是模拟实际抢票花费的时间
tickets--;
printf("%s",(char*)args);
cout<<"抢票成功,剩余票数:"<<tickets<<endl;
}
else
{
break;
}
usleep(1000);//模拟抢完票后续动作所花费时间,加上后可以让不同的线程抢票
}
return nullptr;
}
int main()
{
pthread_mutex_t mutex;
pthread_t t1, t2, t3;
pthread_create(&t1, nullptr, getTickets, (void *)"thread 1");
pthread_create(&t2, nullptr, getTickets, (void *)"thread 2");
pthread_create(&t3, nullptr, getTickets, (void *)"thread 3");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
return 0;
}
运行:
按照正常情况下,当最后一张票被购买以后,应当到剩余票数为0时就停止,但是此时却来到了-1,这就产生了很大的问题。问题根本原因所在,就是ticket这个全局变量,进行ticket--时,此操作并不是一个原子操作,导致在这个操作过程中,其他线程也进入到这个操作里影响到这个完整行为。看一下【--】操作的汇编就知道,可以对应三条汇编指令:
- load:将共享变量ticket从内存加载到寄存器中
- update: 更新寄存器里面的值,执行-1操作
- store:将新值,从寄存器写回共享变量ticket的内存地址
如下图是ticket--操作的过程,比如存在两个线程t1、t2,cpu先调度t1,执行完①②后停止调度t1,开始调度t2,t2可能执行了一遍ticket--,也有可能很多遍(这里举例一遍),ticket此时变成了99,后cpu停止调度t2,又调度t1,而t1该继续步骤③了,将t1的99也放进了ticket,正常情况下应该是98,但这里是99,导致数据不一致问题。
要解决上述问题,必须做到以下三点:
- 互斥行为:当一个线程进入临界区执行时,不允许其他线程进入该临界区
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区
- 如果一个线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
因此,Linux提供了一把锁,叫做互斥量(mutex),也叫互斥锁。
-
互斥量mutex
1.相关接口
功能:初始化互斥量
参数:
mutex:要初始化的互斥锁地址
attr:默认设置为nullptr
注意:其实初始化的方法有两种,init函数是一种,还有一种就是赋值PTHREAD_MUTEX_INITIALIZER,后者常用于全局或静态互斥锁变量,并且无需销毁,而前者需要手动销毁。
返回值:成功返回0,失败返回错误码(后面函数大部分返回情况都是如此,特殊再加以介绍)
功能:销毁互斥量
参数:
mutex:需要销毁的互斥量地址
注意:不能销毁一个已经加锁的互斥量,使用init函数初始化的互斥量必须使用此函数销毁,就像打开的文件必须关闭一样,也就是必须搭配出现。
功能:lock和trylock是加锁,unlock是解锁
参数:
mutex:需要操作的互斥量地址
注意:
若互斥量已经加锁,则后面调用lock函数的线程将会被阻塞挂起,但调用的trylock函数的线程将不会被阻塞,而是直接返回错误码,除此之外,两个函数没有任何区别。
eg(给购票系统加上互斥量):
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int tickets = 1000;
void *getTickets(void *args)
{
while (1)
{
pthread_mutex_lock(&mutex); //加锁时一定要保证粒度越小越好,比如说这里把锁加在while外面就不太好
if (tickets > 0)
{
usleep(1000);//usleep参数单位是微秒,这里是模拟实际抢票花费的时间
tickets--;
printf("%s",(char*)args);
cout<<"抢票成功,剩余票数:"<<tickets<<endl;
pthread_mutex_unlock(&mutex);
}
else
{
pthread_mutex_unlock(&mutex);
break;
}
usleep(1000);//模拟抢完票后续动作所花费时间,加上后可以让不同的线程抢票
}
return nullptr;
}
int main()
{
pthread_mutex_t mutex;
pthread_t t1, t2, t3;
pthread_create(&t1, nullptr, getTickets, (void *)"thread 1");
pthread_create(&t2, nullptr, getTickets, (void *)"thread 2");
pthread_create(&t3, nullptr, getTickets, (void *)"thread 3");
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
return 0;
}
运行:
注意加锁的时机,我们把将会访问全局变量ticket的代码块加上锁,解锁的地方也要注意,特别是存在if语句的地方,会导致一些情况解锁,另一些情况不能解锁,对此我们可以使用RAII的思想去控制加锁解锁操作。
2.实现原理
在讲其实现原理之前,我们考虑这样几个问题。
加锁后,此线程在临界区是否会被cpu切换?会!但不会有任何影响,二者不矛盾,因为该线程是持有锁被切换的(本质就是锁标志被放进了该线程上下文),即使切换到其他抢票线程要执行临界区代码,也要先申请锁,但无法申请成功,并且会被挂起阻塞,等待锁定线程解锁,因此保证了其他线程不会进入临界区,保证了数据一致性。
要访问临界资源,每个线程都必需先申请锁,这就得让这些线程看到同一把锁且可以访问它,所以锁本身就是一种共享资源,那么谁来保证锁的安全呢?其实是锁本身的实现保证,并且申请锁和释放锁都必须是原子的,如何保证呢,这就要看锁的实现了。
前提:
线程视角是如何看待cpu的寄存器的?cpu上的寄存器本质叫做当前执行流的上下文,也就是寄存器是在cpu上且只有一份,但其中的内容可以多份,调度到此执行流,就将上下文数据放入寄存器,停止调度时就从寄存器中拿回上下文中。
锁实现原理的大致理解:
锁只要一份,存在于内存中(共享),一个线程申请了并将其占为己有,即放入自己的上下文中,也就保证了锁可以一直跟随此线程,即使线程们被cpu调度来调度去,因为只有一份,也需要锁的其他线程拿不到就会阻塞等待有锁的线程解锁,也就是持有锁的线程将锁放回内存中,其他申请的线程按需申请。
锁的实现原理:
如下图,%al是cpu中的一个寄存器,mtx是互斥量,这里可看作成整数(比如1)(锁标志),lock是lock函数的指令伪代码。比如说,有两个线程A、B,tA进入lock,将0放入%al中,此时cpu停止调度tA,将tA的东西(%al中的内容)放入tA的上下文,开始调度tB,tB进入lock,将0放入%al中,再交换(mtx的1与%al的0交换),此时%al为1判断大于0,则return 0,tB申请锁成功,tB出了lock后继续执行,某个时刻cpu将tA又切换回来了,同时将%al的内容(1)放进了tB的上下文中,tA的上下文放回%al中(此时为0),继续执行,交换(此时mtx为0),再判断发现等于0则挂起等待(即阻塞了其他线程),直到tB解锁后,tA再重新执行lock申请锁。
此时,如下图的unlock函数就很好理解了,将mtx变量再赋值为1即可,也就是将锁放回内存中,同时唤醒等待的线程。
-
可重入vs线程安全
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,称为不可重入函数。
常见不可重入的情况:
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
常见可重入的情况:
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
线程安全:多个线程并发执行同一段代码时,不会出现不同的结果。比如说,对全局变量或者静态变量进行操作,并且没有锁保护的情况下,就会出现线程安全问题。
常见线程不安全的情况:
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
常见线程安全的情况;
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
值得注意的是,线程安全概念针对于线程,重入概念针对于函数,可不可重入没有对错,但不能出现线程安全问题,也就是一定要保证线程安全。
可重入和线程安全的联系:
- 函数是可重入的,那就是线程安全的,反之不对
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的
可重入和线程安全的区别:
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生 死锁,因此是不可重入的。
-
死锁
死锁是指在并发系统中,两个或多个进程因为争夺资源而陷入无限等待的状态,无法继续向前推进。这种情况下,每个进程都在等待其他进程释放资源,导致系统资源无法有效利用。比如说,ta占用锁1,要申请锁2,同时tb占用着锁2,要申请锁1,互相等待对方释放锁而停滞不前。
产生死锁的四个必要条件:
- 互斥条件:一个资源每次只能被一个执行流使用,就是加锁
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
针对四个必要条件的解释:
- 若一个资源能够被多个执行流使用,那也不会因这个资源被其他执行流占用而产生死锁,也就是不加锁又怎么会产生死锁
- 若一个执行流只是请求某个资源并没有占用其他执行流的资源,那就只是普通的申请资源操作,并不会产生死锁。保持条件也是如此。
- 若可以剥夺的话,一个执行流可直接剥夺其他执行流占用的资源,这样也不会产生死锁了
- 比如说,ta需要的资源被tb占用,tb需要的资源被tc占用,若tc需要的资源不被ta占用,那也不会产生死锁。
避免死锁:
- 破坏死锁的四个必要条件。对于互斥条件,不加锁即可破坏死锁;对于请求和保持条件,可以反复调用trylock函数发现申请不了,则可以先释放所占用的资源再次尝试;对于不剥夺条件,可设置优先级,优先级高的线程可以抢占优先级低的线程作占用的资源;对于循环等待条件,设置加锁顺序一致机制。
- 加锁顺序一致。每个线程申请锁必须按照一个顺序,比如有锁1、2、3,某线程必须先申请锁1,申请不成功再申请锁2,申请成功了,则其他线程必须从锁2开始申请。
- 避免锁未释放的场景。可利用RAII思想管理锁的申请和释放或尽量缩短临界区代码
- 资源一次性分配。若需要多个资源,尽量加一次锁申请所有资源,不要加多次锁来申请。
Linux线程同步
线程同步是指多个线程在访问共享资源时,需要通过一定的机制进行协调和同步,以保证数据的正确性和一致性。主要目的是防止多个线程在同一时间对共享资源进行读写操作,导致数据不一致的情况发生。在多线程环境下,由于线程的执行顺序是不确定的,多个线程同时访问共享资源时,可能会产生竞争条件(race condition)。
常见的线程同步机制包括互斥锁(mutex)、信号量(semaphore)、条件变量(condition variable)、读写锁(read-write lock)等。这些机制可以通过限制和协调线程对共享资源的访问,保证线程的正确执行顺序和数据的一致性。
-
条件变量
条件变量是一种特殊的线程同步机制,用于线程之间的通信和协调。在条件变量的基础上,结合互斥锁得使用就可以实现线程的等待和唤醒操作,从而实现线程的同步。
功能:初始化条件变量
参数:
cond:条件变量地址
addr:默认设置为nullptr
注意:其实可以发现,条件变量的初始化和下面要介绍的销毁和互斥量一模一样
功能:销毁条件变量
功能:等待条件满足
参数:
cond:条件变量地址
mutex:互斥量地址,后面解释为什么需要一把锁
功能:唤醒等待
注意:signal函数一次只能唤醒一个线程,而broadcast函数是一次唤醒所有睡眠的线程
-
生产者消费者模型
这里我们通过生产者消费者模型将条件变量的使用讲清楚,在此之前,先来介绍一下什么是生产者消费者模型。
如下图,生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过一个仓库来进行通讯,所以生产者生产完数据之后不用等待消费者处理, 直接扔给仓库,消费者不找生产者要数据,而是直接从仓库里取,仓库就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个仓库就是用来给生产者和消费者解耦的,可以是一个队列,也可以是一个栈,根据需要选择。
其实,进程间通信就是一种生产者消费者模型。还值得注意的是,消费者拿到数据以后处理的同时,生产者向仓库放数据,反之也一样,这是并发过程提高效率的地方,不能狭隘的认为只有生产->仓库->消费者的过程,认为都必须加锁然后串行地执行,其实只有访问仓库地过程是串行地进行。
-
基于阻塞队列的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。
如下代码块就是基于阻塞队列的生产者消费者模型的参考代码,
1.实现参考代码与讲解
根据上面对基于阻塞队列的生产者消费者模型的介绍,我们必须先定义一个阻塞队列,成员包括queue、容量、锁、两个条件变量(一个标记满一个标记空)。对于成员的初始化和释放的构造函数和析构函数不同说,主要是针对于此队列的push、pop函数。对于push函数,我们要将数据push到队列中,而队列是共享资源,因此整个过程需要加锁,循环检测队列是否为满,是的话我们就调用wait函数去等待其消费者消费数据,直到消费者拿走数据,队列不为满,循环跳出,才将数据push到队列中。在此之后呢,也要调用signal或者broadcast函数唤醒使用标记空的条件变量的线程,表示“我生产者这边生产了一个数据,队列现在肯定不为空了,消费者那边的线程可以继续了”。注意,解锁操作和signal函数的顺序无所谓。
对于pop函数,函数体的定义与push函数体的定义一一对应,也是先加锁,再循环检测队列是否为空,空就阻塞等待,等待push函数插入数据,具体参考下方代码。
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include "lockguard.hpp"
using namespace std;
#define DEFAULT_CAP 5
template<class T>
class BlockQueue
{
public:
BlockQueue(int capacity=DEFAULT_CAP)
:_capacity(capacity)
{
pthread_mutex_init(&_mtx,nullptr);
pthread_cond_init(&_emptycond,nullptr);
pthread_cond_init(&_fullcond,nullptr);
}
void push(const T& t)
{
pthread_mutex_lock(&_mtx);
//先检测当前临界资源是否满足访问条件,不满足就得阻塞等待条件满足,但检测临界资源就是在访问临界资源,
//得加锁,但这万一不满足访问条件又等待了,也就是拿着锁去等待了,别的线程怎么去访问临界资源让你满足访问条件呢?
//因此wait函数得传一个锁,意思是阻塞等待时这个锁会被自动释放,当被唤醒时又自动帮助获取这个锁(哪里阻塞就从哪里唤醒)
//这里使用while循环检测而不是if只检测一遍的原因:因为wait是个函数是有可能调用失败的,
//调用失败就会继续向下执行访问临界区,正常向下执行的情况是被唤醒了,因此这就存在“伪唤醒”的情况
//使用while循环检测是因为无论是真正被唤醒还是伪唤醒,我们都再检测一次条件,真正意义上达到条件满足了才向下执行
while(_q.size()==_capacity)
pthread_cond_wait(&_fullcond,&_mtx);
_q.push(t);
//if(_q.size()>=_capacity/2) //策略
// pthread_cond_signal(&_emptycond);
pthread_mutex_unlock(&_mtx); //unlock函数与signal函数位置前后无所谓
pthread_cond_signal(&_emptycond); //当前这样设置是,生产一个消费一个,也可以设置其他策略,
//eg如上策略,pop到size为0阻塞时,push函数这边push到
//size到容量一半时才唤醒进行pop,pop函数中也可以设置这样的策略
}
void pop(T* t)
{
pthread_mutex_lock(&_mtx);
while(_q.size()==0)
pthread_cond_wait(&_emptycond,&_mtx);
*t = _q.front();
_q.pop();
pthread_mutex_unlock(&_mtx);
pthread_cond_signal(&_fullcond);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_emptycond);
pthread_cond_destroy(&_fullcond);
}
private:
queue<T> _q; //std容器
int _capacity; //队列容量,可以设置策略比如说数据量达到容量2/3时为满,这里我们设置为达到容量1时为满
pthread_mutex_t _mtx; //互斥量
pthread_cond_t _emptycond; //触发队列为空时的条件变量
pthread_cond_t _fullcond; //触发队列满时的条件变量
};
2.运行测试
对于测试的整体框架,创建一批生产者线程生产数据和一批消费者线程处理数据,做出一个多生产者多消费者模型;对于数据,也就是一个任务,这里举例两个数字相加的任务,也就是生产者生成两个数字,消费者拿到将其相加即为处理数据。参考代码如下。
#include "blockqueue.hpp"
#include <time.h>
typedef function<int (int,int)> func_t;
int myAdd(int x,int y)
{
return x+y;
}
class Task
{
public:
Task(){}
Task(int x,int y,func_t op=myAdd) //可以实现多个功能函数,传入Task类中操作两个数,这里举例只写了加法运算
:_x(x)
,_y(y)
,_op(op)
{}
int operator()()
{
return _op(_x,_y);
}
public:
int _x;
int _y;
func_t _op;
};
//生产(消费)一个task类型,也就是执行一个任务 | 多生产者多消费者
void* prodthread(void* args)
{
BlockQueue<Task>* bq=(BlockQueue<Task>*)args;
while(1)
{
int x=rand()%10; //这里也可以从键盘拿x,y,也就是用户输入
int y=rand()%10;
bq->push(Task(x,y));
cout<<pthread_self()<<" 生产了一个任务:"<<x<<"+"<<y<<"=?"<<endl;
sleep(1);
}
}
void* conthread(void* args)
{
BlockQueue<Task>* bq=(BlockQueue<Task>*)args;
while(1)
{
Task t;
bq->pop(&t);
cout<<pthread_self()<<" 消费了一个任务:"<<t._x<<"+"<<t._y<<"="<<t()<<endl;
//sleep(1);
}
}
int main()
{
srand((unsigned int)time(nullptr));
BlockQueue<Task>* bq=new BlockQueue<Task>();
pthread_t productor[2],consumer[3];
pthread_create(productor,nullptr,prodthread,(void*)bq);
pthread_create(productor+1,nullptr,prodthread,(void*)bq);
pthread_create(consumer,nullptr,conthread,(void*)bq);
pthread_create(consumer+1,nullptr,conthread,(void*)bq);
pthread_create(consumer+2,nullptr,conthread,(void*)bq);
pthread_join(productor[0],nullptr);
pthread_join(productor[1],nullptr);
pthread_join(consumer[0],nullptr);
pthread_join(consumer[1],nullptr);
pthread_join(consumer[2],nullptr);
delete bq;
return 0;
}
运行:
-
POSIX信号量
在操作系统中,信号量是一种用于线程同步的机制。POSIX (Portable Operating System Interface) 是一种标准的操作系统接口,而 POSIX 信号量则是符合 POSIX 标准的信号量。
其实,POSIX 信号量是一个计数器,用于实现线程之间的互斥和协作。它可以有一个或多个资源,并且可以通过两个基本操作来操作这些资源:P(等待)和V(释放)。P 操作会将信号量计数器减一,如果计数器变为负数,则线程会进入等待状态。V 操作会将信号量计数器加一,如果计数器大于等于 0,则唤醒一个等待的线程。
功能:初始化信号量
参数:
sem:信号量地址
pshared:0表示线程间共享,非0表示进程间共享
value:信号量初始值
功能:销毁信号量
功能:等待信号量,就是P操作,会将信号量的值减1
功能:发布信号量,就是V操作,会将信号量的值加1
-
基于环形队列的生产者消费者模型
环形队列(Circular Queue)是一种特殊的队列数据结构,具有固定大小的缓冲区和两个指针,分别指向队头和队尾。在环形队列中,当队尾指针指向缓冲区末尾时,如果还有新元素要入队列,则指针会回到缓冲区的起始位置,形成一个环形。因此可以发现,环形队列的起始状态和结束状态是一样的,不好判断为空或者为满,所以可以通过加计数器来判断满或者空,而计数器就可以用到信号量。
如下代码块就是基于环形队列的生产者消费者模型的参考代码,
1.实现参考代码与讲解
首先,我们先将信号量的系统接口封装成一个类,方便调用,包括构造函数、析构函数、p操作(wait函数)和v操作(post函数)。
其次,基于此类去封装我们的环形队列的类,成员包括vector(模拟环形队列)、头尾指针、记录数据多少的信号量、记录空间多少的信号量以及两把锁。除了构造函数去初始化这些成员和析构函数释放这些成员之外,就是两个主要的接口——push(插入数据到环形队列中)、pop函数(从队列取数据)。对于push函数,因为要插入数据,所以先空间信号量减一,之后加锁访问共享资源——vector,去插入数据,插入成功之后更新数据信号量,即加一;对于pop函数则相反,一开始因为要取数据,所以数据信号量减一,在加锁将数据取出以后,把空间信号量加一。
#include <iostream>
#include <semaphore.h>
#include <pthread.h>
#include <vector>
using namespace std;
const int default_size=5;
class Sem{
public:
Sem(int value)
{
sem_init(&_sem,0,value);
}
~Sem()
{
sem_destroy(&_sem);
}
void p()
{
sem_wait(&_sem);
}
void v()
{
sem_post(&_sem);
}
private:
sem_t _sem;
};
template<class T>
class CircularQueue
{
public:
CircularQueue(int size=default_size)
:_vec(size)
,_size(size)
,_front(0)
,_rear(0)
,_spacesem(_size)
,_datasem(0)
{
pthread_mutex_init(&_pmtx,nullptr);
pthread_mutex_init(&_cmtx,nullptr);
}
~CircularQueue()
{
pthread_mutex_destroy(&_pmtx);
pthread_mutex_destroy(&_cmtx);
}
//在队列为空或满时,首尾指针指向同一个元素,因此生产者和消费者在此时必须存在互斥或者同步关系。
//当队列为空,只能进入一方(无论先调度生产者还是消费者线程都无所谓,因为即使是调度消费者线程,也会因为队列为空
//而堵塞挂起在spacesem的p操作中,等待生产者线程生产完数据),也就是生产者,由此实现互斥的关系
//队列为满时也是如此,只有消费者可以进入访问队列资源
//而在队列非空也非满时,生产者和消费者线程是处于并发的状态的,也就是一块执行,个执行各的。
// void push(const T& t)
// {
// _spacesem.p();
// _vec[_front]=t;
// _front=(_front+1)%_size;
// _datasem.v();
// }
// void pop(T* t)
// {
// _datasem.p();
// *t=_vec[_rear];
// _rear=(_rear+1)%_size;
// _spacesem.v();
// }
//以上是单生产者单消费者的情形,下面是多生产多消费情形的push、pop操作
//比较于单生产单消费,需要多维护生产者与生产者、消费者与消费者两个关系,比如看生产者与生产者
//因为生产者们的临界资源是下标(front),无论是队列为空还是非空状态,都只能让一个生产者进入到push操作中,所以需要加锁控制
//一般先申请信号量,再加锁。原因:
//因为若先加锁再申请信号量,那么进来push的生产者们就只能一个一个等待别的生产者解锁之后再申请信号量然后才能访问临界资源
//那要是先申请信号量,再加锁的话,就可以在一个生产者访问临界资源时,其他生产者已经申请好信号量,当该生产者解锁之后,
//其他生产者就可以按照信号量的申请,拿着锁进而访问临界资源了,效率更高。就比如多人在电影院门口等待买票观影,有这样两种情况:
//一是随机一个一个进,进来之后买票,买好票再刷票观影;二是先大家该买好票的买好,然后一个个排队进入观影,明显后者效率更高
void push(const T& t)
{
_spacesem.p();
pthread_mutex_lock(&_pmtx);
_vec[_front]=t;
_front=(_front+1)%_size;
pthread_mutex_unlock(&_pmtx);
_datasem.v();
}
void pop(T* t)
{
_datasem.p();
pthread_mutex_lock(&_cmtx);
*t=_vec[_rear];
_rear=(_rear+1)%_size;
pthread_mutex_unlock(&_cmtx);
_spacesem.v();
}
private:
vector<T> _vec;
int _size;
int _front;
int _rear;
Sem _spacesem;
Sem _datasem;
pthread_mutex_t _pmtx;
pthread_mutex_t _cmtx;
};
2.运行测试
对于测试基于环形队列的生产者消费者模型,我们一样是采用多生产者多消费者去竞争的情形。对此,分别定义生产者和消费者线程函数,生产者线程函数则是循环生产数据(这里也可以用上一个模型的task,这里我们就是随机生成一个数),调用push函数插入数据到环形队列,而消费者线程函数则是调用pop函数从环形队列取数据。参考代码如下。
#include "circularqueue.hpp"
#include <unistd.h>
void* prodthread(void* args)
{
CircularQueue<int>* cp=(CircularQueue<int>*)args;
while(1)
{
//sleep(1);
int data=rand()%100;
cp->push(data);
cout<<"["<<pthread_self()<<"]生产:"<<data<<endl;
}
return nullptr;
}
void* conthread(void* args)
{
CircularQueue<int>* cp=(CircularQueue<int>*)args;
while(1)
{
sleep(1);
int data;
cp->pop(&data);
cout<<"["<<pthread_self()<<"]消费:"<<data<<endl;
}
return nullptr;
}
//多生产多消费
int main()
{
srand((unsigned int)time(nullptr));
CircularQueue<int>* cq=new CircularQueue<int>();
pthread_t prod[3],con[2];
pthread_create(prod,nullptr,prodthread,(void*)cq);
pthread_create(prod+1,nullptr,prodthread,(void*)cq);
pthread_create(prod+2,nullptr,prodthread,(void*)cq);
pthread_create(con,nullptr,conthread,(void*)cq);
pthread_create(con+1,nullptr,conthread,(void*)cq);
pthread_join(prod[0],nullptr);
pthread_join(prod[1],nullptr);
pthread_join(prod[2],nullptr);
pthread_join(con[0],nullptr);
pthread_join(con[1],nullptr);
return 0;
}
//单生产单消费
// int main()
// {
// srand((unsigned int)time(nullptr));
// CircularQueue<int>* cq=new CircularQueue<int>();
// pthread_t prod,con;
// pthread_create(&prod,nullptr,prodthread,(void*)cq);
// pthread_create(&con,nullptr,conthread,(void*)cq);
// pthread_join(prod,nullptr);
// pthread_join(con,nullptr);
// return 0;
// }
运行:
线程池
线程池是一种用于管理和复用线程的技术。它是一组预先创建的线程,这些线程可以在需要时被重用来执行任务。线程池的主要目的是减少线程创建和销毁的开销,并且可以限制同时执行的线程数量,以防止系统资源被耗尽。
使用场景:
- 需要大量线程来完成任务,并且要求完成时间比较短
- 对性能要求苛刻
- 需要接收大量突发性请求
优势:
降低资源消耗:通过复用线程,减少了线程创建和销毁的开销。
提高响应速度:线程池能够快速分配任务给可用的线程,从而减少任务执行的延迟时间。
控制并发线程数:通过设置线程池的大小,可以控制同时执行的线程数量,以避免资源耗尽。
提供任务排队和调度机制:线程池通过任务队列来管理待执行的任务,可以按照一定的规则进行调度和排序。
总之,线程池是一种提高程序性能和资源利用率的重要技术,它在多线程编程中被广泛应用。
1.实现参考代码与讲解
首先,为了方便起见,我们先将线程的基本操作接口封装成了一个类Thread,成员哈桑农户包括create、join函数,成员变量包括线程id、线程名、线程函数、线程数据,同时也封装了线程数据的类,包括所在线程的线程名以及线程所在的线程池对象指针(为什么需要后面说)。
其次,最重要的就是线程池类的实现。其实,线程池外生产数据放入线程池,其中的所有线程竞争去处理数据,这本质就是一个生产者消费者模型,因此我们也需要一个交易场所,这里采用基于阻塞队列的生产者消费者模型的框架,所以线程池类的成员变量包括队列(仓库)、锁、条件变量和最基本的存放多个线程的vector和线程个数。
对于成员函数,除了构造函数和析构函数去初始化和释放成员变量,定义run函数去启动所有线程、pushtask函数是线程池外调用此函数去插入数据到线程池的仓库中,以及一些获取当前线程池对象的成员的接口以便静态成员函数routine使用。
着重强调一下作为线程池中所有线程的线程函数routine。这里用static的原因:在类内写线程函数routine,函数会自带一个参数——this指针,传参时会存在类型不匹配问题。当然也可以将routine函数写在类外,就不用加static关键字,但是这里如果这里加了static,那么routine函数就成了静态函数,是访问不了非静态成员tasks(函数内必须要访问)的,一种方法是将成员tasks也设置成static的,但是这就导致定义多个线程池对象时,只能使用这一个队列,还有一种方法就是将this指针通过放入ThreadData中传入到routine函数中,这里才采取第二种,因此线程数据类型的实现中我们加入了所在线程池对象的指针。
其实,对于线程函数routine函数的实现并不复杂,完全可以参考基于阻塞队列的生产者消费者模型的实现——循环检测线程池仓库有无数据,无数据就阻塞,一旦外界向仓库中插入了数据,这里循环出来让线程去处理数据,当然这个过程还是要加锁的,这里我们采用lockguard这种RAII思想的加锁形式。
#include "thread.hpp"
#include "lockguard.hpp"
#include <vector>
#include <queue>
#include "task.hpp"
#include "log.hpp"
using namespace std;
const int default_size =3;
typedef void*(*func_t)(void*);
class ThreadData
{
public:
string _tname;
void* _tp; //由于routine函数那里需要this指针,因此也放进线程数据里传入
};
//将线程接口封装成类,方便调用
class Thread
{
public:
Thread(){}
Thread(int num,func_t func,void* args)
:_func(func)
{
stringstream oss;
oss<<"thread"<<num;
_tname=oss.str();
_tdata._tname=_tname;
_tdata._tp=args;
}
void create()
{
pthread_create(&_tid,nullptr,_func,(void*)&_tdata);
}
void join()
{
pthread_join(_tid,nullptr);
}
private:
pthread_t _tid;
string _tname;
func_t _func;
ThreadData _tdata;
};
//线程池文件
template<class T>
class ThreadPool
{
public:
ThreadPool(int size=default_size)
:_size(size)
{
pthread_mutex_init(&_lock,nullptr);
pthread_cond_init(&_cond,nullptr);
_threads.resize(_size);
for(int i=0;i<_size;i++)
{
_threads[i]=Thread(i+1,routine,this); //构造函数的花括号内是可以使用this指针的,因为在构造函数头(初始化列表)结束后
//对象就已经构造出来了,花括号是一个赋值的过程
}
}
void run()
{
for(int i=0;i<_size;i++)
{
_threads[i].create();
}
}
static void* routine(void* args) //消费者
{
ThreadData* td=(ThreadData*)args;
ThreadPool<T>* tp=(ThreadPool<T>*)td->_tp;
while(1)
{
T task;
{
LockGuard lg(&tp->getlock()); //这里应该是创建一个临时对象lg,而不是定义一个匿名对象
while(tp->isEmpty())
pthread_cond_wait(&tp->getcond(),&tp->getlock());
task=tp->getTask();
} //增加花括号:使得执行任务时不用占有锁
//cout<<td->_tname<<"解决任务:"<<task._x<<"+"<<task._y<<"="<<task()<<endl; //执行任务(提供仿函数执行此任务)
logMessage(NORMAL,"%s解决问题:%d+%d=%d",td->_tname.c_str(),task._x,task._y,task());
// cout<<task<<endl;
// sleep(1);
}
}
//线程池本质就是一个生产消费模型,通过pushtask函数向队列里面放任务,线程们去竞争着解决任务
void pushtask(const T& task) //生产者
{
LockGuard lg(&_lock);
_tasks.push(task);
pthread_cond_broadcast(&_cond);
}
~ThreadPool()
{
for(int i=0;i<_size;i++)
{
_threads[i].join();
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
public:
pthread_mutex_t& getlock()
{
return _lock;
}
bool isEmpty()
{
return _tasks.size()==0;
}
pthread_cond_t& getcond()
{
return _cond;
}
T getTask()
{
T t=_tasks.front();
_tasks.pop();
return t;
}
private:
vector<Thread> _threads;
int _size;
queue<T> _tasks;
pthread_mutex_t _lock;
pthread_cond_t _cond;
};
2.运行测试
对于线程池的测试,我们依旧使用给定两个数字将其相加的数据形式task,而主函数也是很简单,生成两个数字之后调用线程池提供的pushtask接口向队列中插入数据,那么之后线程池的线程就会竞争去取数据然后处理数据了,以此完成两者的交互。参考代码如下。
using namespace std;
#include "threadpool.hpp"
#include "task.hpp"
#include <ctime>
#include <unistd.h>
typedef function<int (int,int)> Func;
int myAdd(int x,int y)
{
return x+y;
}
class Task
{
public:
Task(){}
Task(int x,int y,Func op=myAdd) //可以实现多个功能函数,传入Task类中操作两个数,这里举例只写了加法运算
:_x(x)
,_y(y)
,_op(op)
{}
int operator()()
{
return _op(_x,_y);
}
public:
int _x;
int _y;
Func _op;
};
int main()
{
//logMessage(NORMAL,"%s %d %c %f\n","test",12,'c',3.14);
srand((unsigned long)time(nullptr) ^ getpid());
ThreadPool<Task> tp;
tp.run();
int a,b;
while(1)
{
int x = rand()%100 + 1;
usleep(7721);
int y = rand()%30 + 1;
Task task(x,y);
//cout<<"任务:"<<task._x<<"+"<<task._y<<"=?"<<endl;
logMessage(NORMAL,"任务:%d+%d=?",task._x,task._y);
tp.pushtask(task);
sleep(1);
}
return 0;
}
运行:
后记
看了以上的讲解,大家应该对线程概念、本质、底层逻辑及相关接口的使用有了一定的了解,在学完所有知识点之后,大家可以自己尝试写一些线程池的实现代码,在这个过程中检验一下自己对这些知识点的掌握程度,不熟悉的可以回头再看一下。总的来说,线程部分的知识点并没有太难,就是很多很繁杂,但是这不正是Linux操作系统学习的共同点嘛。在这一章节介绍过后,我们就会进入网络部分,网络部分的大部分操作都会基于线程的学习,所以要把这一章节学透了再进入到网络部分,会容易过渡一些。不说了加油吧,不会的举手哦!