文章目录
- 一、初始线程
- 1.概念
- 2.执行
- 3.调度
- 4.切换
- 二、线程控制
- 1.创建
- 2.等待
- 3.分离
- 4.退出
- 5.取消
- 三、线程安全
- 1.互斥
- 1.1初始
- 1.2理解
- 1.3锁
- 1.3.1概念
- 1.3.2原理
- 1.3.4死锁
- 2.同步
- 2.1概念
- 2.2原理
- 3.生产消费者模型
- 总结
- 尾序
一、初始线程
1.概念
- 简单的概念:
线程就是一个进程的一份子。
下面我们从生活的角度我们来简单的理解一下这个概念:
- 鲁迅曾经说过,人类的悲欢并不相通,我只觉得世界吵闹。
- 在这个世界里,每个人都有一份属于自己的剧本,或悲伤或欢喜,但都在演好属于自己的剧本。
- 假设你是父母双全,从小幸福的生活在一个完整的家庭里面。老一辈人在出门放松娱乐,父母一辈在外出忙碌挣钱,年轻一辈在奋力拼搏。整个家庭,每个成员的生活方式虽然各不相同,但都在追求属于更加美好的幸福生活。
- 假设你父母双亡,在这个世界上你孤身一人,虽然无亲无故,但也不甘命运,用自己的努力书写自己命运。
- 两种情况,只有亲身经历,才能感同身受。因此人与人的悲欢各不相同。
我们回顾到线程与进程:
- 第一个家庭的每个成员,就是线程,一个家庭就是进程。
- 第二个家庭的一个成员,是线程。 这个家庭也是进程。也就意味着进程里只有一个线程。
- 人与人的悲欢离合各不相同,即代表着进程之间相互独立,互不影响。
- 因此:进程由若干个线程组成。且从概念上讲,线程 != 进程
下面我们单纯从理论的角度理解上面的概念:
- 先来谈谈进程。
- 进程之间互相独立,也就是第一个家庭与第二个家庭没有任何关系。
- 进程有自己独立的进程地址空间,页表,文件描述符表,appending,block,hanlder表,上下文结构。每个进程运行时互相独立互不干扰。
- 说明:这些知识不熟悉的可见Linux专栏。
- 再来引入与理解线程。
- 直接给出图解:
- 注意:此结构仅代表Linux操作系统是这样实现的,别的不保真哦。
- 观察
- 一个进程拥有多个task_struct数据结构对象。
- 多个task_struct数据结构对象指向同一份进程地址空间。
- 一个进程只拥有一个进程地址空间,一张页表,一份代码和数据。
- 解释:
- 这里的多个task_struct即可理解为一个进程拥有多个线程。
- 只有一个task_struct,可以理解为一个进程只有一个线程的特殊情况。即一个家庭只有一名成员,这个成员一般称之为孤儿寡女。
- 每一个进程(家庭)都拥有自己的task_struct(成员),每个成员共享家庭的资源,但同时也有着自己的任务。在Linux中我们将进程的每个成员正在执行的任务称之为执行流。这个任务的执行需要占据一定的资源。
- 一个进程拥有独属于自己的资源,即进程地址空间等。因此我们说进程是分配资源的基本实体。
- 总结:
- 一个进程可拥有多个线程,即多个执行流。一个线程是一个执行流。
- 进程是分配资源的基本实体。线程之间共享进程资源,即浅拷贝。
- 注意:
Linux中
一个task_struct不再代表着一个进程,而是多个task_struct共同代表着一个进程。但一个页表,一个进程地址空间,一个进程只有一个,因此这些资源可以代表一个进程。
既然进程至少有一个线程,我们于是将:
- main里的执行流,叫做主线程。
- 其余创建的线程,我们称之为子线程或者副线程。
我们再来谈谈Linux为什么要这样设计:
- 从历史的发展角度来看,肯定是进程最先被设置出来。
- 线程与进程之间有着极大的相似程度。
- 重写线程的接口,意味着更高的成本(开发,维护,测试),不稳定性。而复用,即吃现成的,有利于节省人力,物力,财力。如果是我,我也这么干,难道吃现成的不香吗?
- 既然复用了进程的接口,那也就意味着,线程与进程之间的耦合度提高了。但是为了让用户用的明白,因此还要封装(成本较
重写接口
很低)了一层给用户使用的线程库。
- 补充:在Windows设计的时候,是重新设计出了线程的一系列接口与使用,因此Windows的线程,是真正意义,即有与进程明显区分的概念明确的线程。
- 拓展:操作系统这一门学科,是计算机界的哲学,只有理论,而我们今天所讲的是具体操作系统的实现方案,即实践。在学习过程中只学理论是很抽象的,还要结合实际的例子才能深入理解,并学会运用。
那也就意味着Linux没有真正意义的线程,而是将线程与进程融合之后的轻量极进程
的概念。
2.执行
- 既然一个进程中可能有多个线程,那么这多个线程是如何执行的呢?
- 线程本质上就是执行流,执行流的本质就是函数栈帧。
- 每个线程都维护着自己的栈结构,执行流在栈上运行。
- 图解:
- 如果学过函数栈帧的运行,可以理解,执行流的执行就是栈帧不断创建与销毁的过程。
- 如果没学过,可以尝试看一看这篇文章——函数栈帧,简单看汇编代码的执行就等于修炼程序员的内功。
- 多个线程在跑,就是多个执行流在跑。假设APP运行之后只有一个进程,你可以在上面听音乐,也可以看文章。两者可以同时进行,之间不会影响。听音乐和看文章就是两个执行流。能一块跑就是多执行流的功能。
3.调度
- 那一个进程有多个task_struct,这些task_struct如何在队列中排队运行呢?
- 先来回顾一下大O(1)调度算法:
- 其次我们在进一步分析进程:
- 我们之前讲的进程就是task_struct,现在讲的是轻量级进程。
- 一个进程如果有多个task_struct的话,那这岂不是乱套了吗?
- 解释:
- 先来区分一下进程与线程,一个进程是只有一个
pid
的,也就意味属于一个进程的多个task_struct的pid是相同的。- 线程与线程之间,也可以通过
tid
进行区分,也就是一个进程的多个线程tid是不同的。- 既然如此,那这多个pid相同的task_struct,派出一个在队列中排队不就行了,相同pid的task_struct用双向链表管理起来。
- 当调度到task_struct时,这个task_struct代表着进程,但task_struct也是轻量级进程,在双向链表中。
- 因此只需派一个task_struct代表排队即可。
- 大致图解:
- 看着是不是有点像摩天轮呢?
4.切换
- 进程的切换 vs 线程的切换?
- 先来简单谈谈进程的切换。
- 进程是在CPU上进行切换的。
- 进程有属于自己的上下文。具体指的是寄存器信息,地址映射,文件的信息,一些资源,比如内存。
- 为什么要有进程上下文呢?
- 一个进程在CPU上的运行的时间是有一定的限度的。
- 进程正在运行的时间到了,但是你当前任务正在执行,此时操作系统会强行将你从CPU上剥离下来,可不管你是否执行完毕了。
- 因此为了保证你下一次能继续执行,因此需要保存上下文,以便于后面再进行加载时的恢复。
- 举个例子,假如你在自习室学习,可是到了晚上10点就要停电,也就意为着10点之后不能继续学习了,因此如果你学到了9点50是不是就该收拾东西走人了,那收拾这个动作的目的就是为了明天能够继续学习。上下文与之同理。
- 进程的切换需要切换,进程地址空间,页表,文件,文件描述符表等资源。
- 进程的切换的流程分为三步:
- 保存当前进程的上下文。
- 加载下一个进程的上下文。
- 加载下一个进程的资源。之后开始由加载到CPU上的进程执行。
- 再来谈谈线程的切换。
- 首先了解了进程的切换,线程的切换就了解了一半。
- 线程的切换就是执行流的切换。
- 执行流的切换,需要将栈的信息进行保存进行切换 。
- 因此线程的上下文包括,与栈帧相关的寄存器的信息等。
- 线程的切换的流程分为三步:
- 保存当前线程的上下文。
- 加载下一个线程的上下文。
- 加载下一个线程的资源。切换完成,执行已加载好的线程。
最后我们总结一下:
- 进程切换VS线程切换
- 常见的进程切换的资源:
- 进程地址空间与页表
- 打开的文件(IO)
- 可执行程序(数据)与当前工作目录
- 文件描述符表
- 信号的三张表(a,b,h)
- 进程pid
- 常用的线程切换资源:
- 一些与栈帧相关寄存器的信息,例eax,ebx,ecx(通用),pc指针。
- 当前所处的权限(用户态或者内核态)。
- 线程的局部存储(线程自己的数据)。
- 线程tid
- 总结:
- 线程共享进程资源,但也有属于自己的资源。
- 进程切换的资源比线程切换的资源要多,就拿进程地址空间来说,进程要切换进程地址空间,而线程就只切换自己执行相关的资源即可,且共享同一进程地址空间,因此地址空间不切换。
- 拓展: 同一进程的线程的资源是在同一块进程地址空间里面的,这也就意味着线程之间的独立性并不是很强,因此如果想要访问彼此间的资源也是可以的,但是这也说明了线程之间具有天然的通信功能。
- 拓展:重谈地址空间之页表
. * 之前我们都是这样画页表的,看起来比较形象直观,那真实的页表是这样吗?
- 先进行简单的计算(32位):
- 先假设一个进程要占用所有物理内存。
- 虚拟地址4字节,物理地址4字节。
- 物理内存4GB即232 字节
- 再乘8(一个页表的元素存放虚拟和物理地址共8字节),即可得到表示所有地址的页表大小,即32GB。
- 因此我们可以明显的看到即使把内存用完还远远不够,更何况还有代码和实际数据。更加不够,因此很明显页表的原理绝对不是这样的。
-
那真实的页表原理是如何的呢?
-
可以粗略计算 存满最多消耗 210 * 4 * 210 * 4 byte = 16MB左右
拓展:
- CR2寄存器存放转换失败的地址。
- CR3寄存器存放转换成功的地址(物理)。
- CR0和CR4,用于控制处理器的相关模式与功能。
- CR1保留未被使用。
二、线程控制
- 上文中提及过在Linux下,只有轻量级进程的概念,如果要提供给用户使用,必然会封装相应的接口,然后打包以库的形式呈现给用户。
下面我们来了解与之对应的线程接口吧!
1.创建
- 接口
/*
头文件
*/
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
/*
函数声明:
一、参数
1.thread,输入型参数,是一个pthread_t 类型变量的地址。
2.attr,用于设置线程的属性,这里我们不关心设为NULL即可。
3.stat_routine,一个返回类型为void* 参数为 void*的函数指针,即线程
执行流的窗口。
4.arg,是给start_routine传的参数,不关心,可设置为NULL。
二、返回值
1.成功,返回0。
2.失败返回错误码,注意并不是设置errno全局变量,是直接以返回值的形式返
回错误码。
说明: Compile and link with -pthread.
使用第三方库在编译与链接时需在后面加上 -pthread选项。
*/
- 简单使用
#include<pthread.h>
#include<iostream>
#include<cstring>
#include<unistd.h>
using namespace std;
void* RunRoutine(void*)
{
cout << "I am new_pthread..." << endl;
return nullptr;
}
int main()
{
pthread_t tid;
int pret = pthread_create(&tid,nullptr,RunRoutine,nullptr);
/*RunRoutine == &RunRoutine*/
sleep(1);//先让子线程跑完。
if(!pret)
cout << "creat success!" << endl;
else
cout << "creat fail , the reason is " << strerror(pret)
<< endl;
cout << "I am main_pthread..." << endl;
//休眠一秒防止,进程过早退出。
sleep(1);
return 0;
}
- Makefile
pthread:pthread.cpp
g++ -o $@ $^ -std=c++11 -pthread
# 这里后面的 -pthread必须添加,因为线程库是第三方库。
# 后面的例子默认已带。
.PHONY:clean
clean:
rm -f pthread;
-
说明:
-
运行结果:
- 可见,结果是符合预期的。
我们再补充几点细节:
- 第一个sleep(1),因为主线程和子线程两个执行流不知道哪一个先执行,所以主线程休眠一秒,以便于控制程序的执行先后顺序,其次这里的显示器是共享资源,如果不区分哪一个先执行,则并发访问可能会导致显示器打印数据错乱的现象。
- 第二个sleep(1),因为如果return 0,代表着进程退出,进程退出不管线程是否还在执行都要退出。
- 错误信息以返回值的形式,而不以错误码(全局变量),原因与第一个sleep(1)相同,多执行流会导致一份资源被多人使用,从而导致数据与预期不符的问题。
- main函数为主程序的入口(最开始学习C语言就知道),而创建线程我们也要给其它线程一个入口函数,因此第三个参数是以函数指针的形式进行传入的。而函数指针和函数变量的内容是相同的,因此传哪一个都可以。
- 除此之外,tid 我们下面再进行深入讨论。
2.等待
/*
头文件
*/
#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);
/*
一、函数声明
1.thread,线程tid,类型为pthread_t。
2.ret(urn)val(ue),输出型参数,即线程的返回值(运行情况)。
也传空指针表示不关心。
二、返回值
1.成功返回0.
2.失败返回错误码。
*/
- 简单使用
#include<pthread.h>
#include<iostream>
#include<cstring>
#include<unistd.h>
using namespace std;
void* RunRoutine(void* args)
{
cout << "I am new_pthread..." << endl;
return nullptr;
}
void Creat_Check(int ret)
{
if(!ret)
cout << "creat success!" << endl;
else
cout << "creat fail , the reason is " << strerror(ret)
<< endl;
}
void Wait_Check(int ret)
{
if(!ret)
cout << "wait success!" << endl;
else
cout << "wait fail , the reason is " << strerror(ret)
<< endl;
}
int main()
{
cout << "I am main_pthread..." << endl;
pthread_t tid;
int pret = pthread_create(&tid,nullptr,RunRoutine,nullptr);
sleep(1);//先让子线程跑完。
Creat_Check(pret);
pret = pthread_join(tid,nullptr);//阻塞等
Wait_Check(pret);
return 0;
}
- 运行结果:
- 显而易见,这里的打印结果是符合预期的。
- 需要提及的是这里的pthread_join为
阻塞式等待
。等待的目的无非就是关系子线程的任务的执行情况。
为了进一步理解pthread_join的第二个参数,且更进一步理解线程,下面我们让一个线程执行1到100之间的和,并将结果返回给我们的主线程。
- 代码:
struct cal_task
{
cal_task(int start,int end)
:_start(start),_end(end)
{}
int process()
{
int sum = 0;
for(int i = _start; i <= _end; i++)
{
sum += i;
}
return sum;
}
int _start;
int _end;
};
struct Result
{
Result(int res,int check)
:_res(res),_check(check)
{}
int _res;
int _check;//检查是否执行正确。
};
void* RunRoutine(void* args)
{
cal_task* task = static_cast<cal_task*>(args);
return new Result(task->process(),0);
}
void Creat_Check(int ret)
{
if(!ret)
cout << "creat success!" << endl;
else
cout << "creat fail , the reason is " << strerror(ret)
<< endl;
}
void Wait_Check(int ret)
{
if(!ret)
{
cout << "wait success!" << endl;
}
else
cout << "wait fail , the reason is " << strerror(ret)
<< endl;
}
int main()
{
cout << "I am main_pthread..." << endl;
pthread_t tid;
int pret = pthread_create(&tid,nullptr,
RunRoutine,new cal_task(1,100));
sleep(1);//先让子线程跑完。
Creat_Check(pret);
Result* ret;
pret = pthread_join(tid,(void**)&ret);//阻塞等
Wait_Check(pret);
cout << "result is: " << ret->_res << " check is: "
<< ret->_check << endl;
return 0;
}
- 运行结果:
- 显然完成了我们所给的任务。
此处我们稍稍解释一下:
- 我们派发给子线程了一个指针,指针指向了一个任务对象(堆上开辟的)。
- 子线程完成之后,给我们返回了一个存放任务执行结果的对象(堆上开辟的)。
- 此处计算的是1到100的值。返回的对象中包含了计算的结果和计算的执行是否正确的信息。
- 因此我们传给创建子线程传入的最后一个参数从更为现实的角度来说,意味着派发了一个任务交给它执行。而子线程的返回值意味着任务执行的结果。在从更为深入的情况进行讨论,这里的void的返回值和void的参数使这个子线程的参数更为灵活。
- 补充:我们之前学习的例如 malloc的返回值,memset的参数,qsort其中都有void* 的变量,其目的就是为了让函数的使用更加的灵活,其实就等同于C++的模版。
前两种都是阻塞式等待,有没有非阻塞等待呢?
- 答案是没有,不过有个类似的,是与主线程断开连接,即分离子线程。
3.分离
- 接口
/*
头文件
*/
#include <pthread.h>
pthread_t pthread_self(void);
/*获取线程的tid*/
int pthread_detach(pthread_t thread);
/*分离线程*/
/*
函数参数:线程的tid
返回值:成功返回0,失败返回错误码。
*/
- 简单使用
#include<pthread.h>
#include<iostream>
#include<vector>
#include<cstring>
#include<unistd.h>
using namespace std;
void Creat_Check(int ret)
{
if(!ret)
cout << "[main pthread]:creat success!" << endl;
else
cout << "creat fail , the reason is " << strerror(ret)
<< endl;
}
void Wait_Check(int ret)
{
if(!ret)
{
cout << " wait success!" << endl;
}
else
cout << "wait fail , the reason is " << strerror(ret)
<< "the retval is " << ret << endl;
}
void detach_check(int ret)
{
if(!ret)
{
cout << "detach success!" << endl;
}
else
{
cout << "detach fail,the reason is " << strerror(ret)
<< endl;
}
}
void* RunRoutine(void*)
{
//当然子线程自己也可以分离
/*
pthread_deatch(pthread_self());
*/
cout << "[new_pthread]:my tid is " << (int *)pthread_self()
<< endl;
return nullptr;
}
const int pthread_num = 2;
int main()
{
vector<pthread_t> tids;
for(int i = 0; i < pthread_num; i++)
{
pthread_t tid;
int pret = pthread_create(&tid,nullptr,RunRoutine,nullptr);
sleep(1);//先让子进程执行完毕。
Creat_Check(pret);
tids.push_back(tid);
}
for(int i = 0; i < pthread_num; i++)
{
int dret = pthread_detach(tids[i]);//将线程分离.
detach_check(dret);
int wret = pthread_join(tids[i],nullptr);
Wait_Check(wret);
}
return 0;
}
- 运行结果:
- 下面我们分析一下上面的代码:
- 这里我们创建了两个子线程,可以根据运行结果看到这里的tid,其实是个地址,如果再仔细分析,这里的tid其实是共享区的地址,这里先简要的点出,下面我们会进行讨论。
- 我们是在子线程运行结束之后,再将子线程与主线程之间的链接进行取消的。这就好比,你的亲人去世了,你要跟他断绝关系一样,只不过你的亲人没办法从墓里面爬出来跟你闹而已,比较轻松。
- 我们在子线程运行时,进行分离,就是你的亲人在世时跟他断绝关系,这时你可能跟你的亲人就有一段不可描述的经历了。分离的过程可能比较费事。
- 这里的分离只断关系,不会影响子线程之后的执行。
4.退出
- 如果我们想让这个线程单独退出呢?
- 我们在执行的时候,并不是从上往下,就一条执行流,还穿插着一些函数调用。
int add(int x,int y)
{
//exit(0);
return x + y;
}
int main()
{
//...
add(1,2);
//...
return 0;
}
- 就拿这个例子来说,main函数在执行过程中,会调用add函数,进入add的函数栈帧,执行完毕之后可以用返回值的形式,也可以直接用exit(0), 提醒这里的exit(0)是退出进程。
- 同理如果说,线程函数在执行过程中,调用了某个函数,发生了错误,但不想让整个进程退出,于是可以只退出执行错误的线程,同时将处理的错误返回给主线程进行处理。
- 因此我们也需要有线程的退出函数。
- 接口
/*
头文件:
*/
#include <pthread.h>
void pthread_exit(void *retval);
/*
参数:就是返回值。
*/
- 接口其实很简单,不过下面我们要用这个接口做几个实验。
- 检查子线程退出后的状态。
- 主线程先退出后子线程的情况。
- 实验一
#include<pthread.h>
#include<iostream>
#include<vector>
#include<cstring>
#include<unistd.h>
using namespace std;
void Creat_Check(int ret)
{
if(!ret)
cout << "[main pthread]:creat success!" << endl;
else
cout << "creat fail , the reason is " << strerror(ret)
<< endl;
}
void Wait_Check(int ret)
{
if(!ret)
{
cout << " wait success!" << endl;
}
else
cout << "wait fail , the reason is " << strerror(ret)
<< ",the retval is " << ret << endl;
}
void* RunRoutine(void*)
{
cout << "[new_pthread]:my tid is " << (int *)pthread_self()
<< endl;
sleep(1);
return nullptr;
}
const int Pthread_num = 4;
int main()
{
vector<pthread_t> tids;
for(int i = 0; i < Pthread_num; i++)
{
pthread_t tid;
int pret = pthread_create(&tid,nullptr,RunRoutine,nullptr);
Creat_Check(pret);
tids.push_back(tid);
}
/*
此处加上 sleep(5)观察现象会更加直观。
*/
for(int i = 0; i < Pthread_num; i++)
{
int wret = pthread_join(tids[i],nullptr);
Wait_Check(wret);
}
sleep(2);
return 0;
}
- 运行结果:
说明一下:
while :; do ps -aL; sleep 1 ; done;
bash脚本,方便我们查看这里的线程。- LWP,light weight process,即轻量级进程的编号(操作系统层面识别线程的编号),不是tid(用户层面,访问线程地址)!
- 我们可以从运行结果中可以看出线程的编号是按照一定顺序进行排列的。
- 实验现象:这里的子线程退出之后就真的退出了,并不会陷入僵尸状态。
- 补充一下:LWP是操作系统的概念,tid是封装之后呈现给用户的概念,两者具有本质的区别。
- 实验二
#include<pthread.h>
#include<iostream>
#include<vector>
#include<cstring>
#include<unistd.h>
using namespace std;
void Creat_Check(int ret)
{
if(!ret)
cout << "[main pthread]:creat success!" << endl;
else
cout << "creat fail , the reason is " << strerror(ret)
<< endl;
}
void Wait_Check(int ret)
{
if(!ret)
{
cout << " wait success!" << endl;
}
else
cout << "wait fail , the reason is " << strerror(ret)
<< ",the retval is " << ret << endl;
}
void* RunRoutine(void*)
{
cout << "[new_pthread]:my tid is " << (int *)pthread_self()
<< endl;
sleep(10);
return nullptr;
}
const int Pthread_num = 4;
int main()
{
for(int i = 0; i < Pthread_num; i++)
{
pthread_t tid;
pthread_create(&tid,nullptr,RunRoutine,nullptr);
}
pthread_exit(0);
return 0;
}
- 运行结果:
- 显而易见,主线程用pthread_exit退出之后,主线程会变成僵尸,但是子线程还是会运行的。
- 这就好比小时候,爸妈不在家的经典场景,在家里任由我们折腾。不过在现实场景中,主线程是要对子线程进行负责的,不能对子线程不管不顾,如果不负责,则可能会导致一些内存泄漏的问题。
- 因此从中我们也可以得出,最好让
主线程最后退出
的结论。
5.取消
- 取消一个线程与退出一个线程的作用差不多相同。
- 但是如果涉及到控制,则取消更为灵活,可以让别的线程来控制此线程的生死。
- 接口
/*
头文件
*/
#include<pthread.h>
int pthread_cancel(pthread_t thread);
/*
1. 函数参数,thread为线程的tid
2. 返回值,成功返回0,失败返回错误码。
*/
- 简单使用
#include<pthread.h>
#include<iostream>
#include<vector>
#include<cstring>
#include<unistd.h>
using namespace std;
void* RunRoutine(void*)
{
sleep(3);
cout << "[new_pthread]:my tid is " << (int *)pthread_self()
<< endl;
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid,nullptr,RunRoutine,nullptr);
pthread_cancel(tid);//取消子线程。
void *ret = nullptr;
pthread_join(tid,&ret);
cout << (long long)ret << endl;
return 0;
}
- 运行结果:
- 显而易见,这里的子线程被取消了,后面的打印动作没有执行。
- 这里取消子线程时会让子线程直接返回 (void*) -1。即我们看到打印结果。
说明:这是Linux的对线程取消的返回值的宏。
三、线程安全
1.互斥
在正式讲互斥之前我们需要铺垫一下,即先理解互斥这个概念。
1.1初始
- 是什么?
- 互斥其实很简单,以日常的视角看就是能保证一个坑位只能有一个人来拉屎(虽然有味道,但是很形象)。或者说能保证看电影的时候一张电影票只能对应一个人。
- 为什么?
- 再谈谈为什么会出现互斥,本篇的主题是线程,即一个进程至少有一个线程.
- 如果出现了两个线程,在进程地址空间中,资源是共享的,即使看起来不是共享的,那也只是一层窗户纸,真要想捅破只需略施手段。
- 既然这样如果两个线程同时要访问一份资源,那么就会出现数据不一致的问题,且无法保证数据是我们想要的结果。
- 怎么办?
- 问题的关键是:多个线程访问同一份资源。
- 解决问题的关键就是: 一个线程访问这份资源的同时,其它线程不能访问这份资源。
- 在Linux中采用了互斥量(锁)的概念,多个线程申请一把锁,即一个线程拿到锁之后,就意味着这份资源暂时属于这个线程,当使用完之后,要将使用权,即锁还回去。方便其它线程进行使用。
- 那锁既然要被多个线程同时申请,那就又回到 2(为什么?)上了,因此操作系统必然能保证当一个线程申请锁成功后,别的线程无法再申请这个锁,即保证原子性。
这里只是对锁进行粗糙的提及一下,后面我们会细讲。
1.2理解
接下来我们先以两个例子具体开头为互斥铺垫一下:
- 代码(线程局部变量)
#include<iostream>
#include<vector>
using namespace std;
#include<pthread.h>
#include<unistd.h>
void* run_routinue(void* arg)
{
long long cnt = reinterpret_cast<long long>(arg);
//这里的reinterpret_cast是用于不安全类型之间的转化。
cout << "[pthread_" << cnt << "] " << &cnt << " "
<< (void*)pthread_self() << endl;
//查看线程的变量的地址。并将此线程的tid以地址的形式进行打印
return nullptr;
}
const int pthread_num = 2;
vector<pthread_t> tids;
void creat_pthread()
{
for(uint64_t i = 0; i < pthread_num; i++)
{
pthread_t tid;
pthread_create(&tid,nullptr,run_routinue,(void*)i);
sleep(1);//防止出现打印信息错乱的情况。
tids.push_back(tid);
}
}
void wait_pthread()
{
for(int i = 0; i < pthread_num; i++)
{
pthread_join(tids[i],nullptr);
}
}
int main()
{
int i = 0;
cout << "[main pthread]: "<<&i << endl;
//打印查看一下栈区的大致地址范围。
creat_pthread();
wait_pthread();
return 0;
}
- 运行结果:
其实光看现象是不够的,下面我们透过现象分析一下底层逻辑:
- 先来分析代码:
- sleep(1)的目的,是为了防止子线程打印信息出现错乱的情况,因为显示器也是共享资源,因此为了保证互斥,加了sleep(1),作用是先让创建的子线程执行完,从而避免显示器被多个线程共同访问的情况。因此勉强来说这里sleep的作用跟锁的作用差不多。
- 在创建线程时,我们是传入了 i 变量的拷贝,并没有传入i变量的地址,因为如果传入i变量的地址,线程内部通过地址访问的i是符合要求的因为i会随着每次循环而进行改变,甚至更严重的是当for循环进行结束之后,i变量会进行销毁,此时通过地址访问i,就变成了野指针。
- 这里使用long long 而不是 int 的原因是在类型检查时,在Linux下(64位)指针大小为8个字节,如果要进行表示int,会发生截断,截断会导致数据丢失因此是不安全的,所以我们这里统一使用 long long int(uint_64_t)。
- 再来分析运行结果——以进程地址空间这张图进行分析:
- 可以看出,主线程的栈变量的地址比子线程的栈的地址大,可以验证主线程的栈就是图中的栈,而子线程的栈大致是在堆栈之间,即共享区的。
- 子线程中,栈区的变量地址和tid以指针方式打印出来的值高度相近。因此我们可以判断tid是子线程的起始地址。
- 如果我们再细心一点,大致可以看出tid的值每创建一个线程,其值在不断的减小。也就是说线程在创建时,是从高地址向低地址的方向进行创建的。(具体平台,要做相应的实验与测试才更加准确)。为了验证我们再多创建几个线程——看下面的图的tid变化趋势,更为明显。
- 因此线程是在共享区里面创建的,此时我们再回头想一下谁也在共享区呢?共享内存,动态库的链接都在共享区,此时如果敏锐一点,就会将线程与线程库链接起来,想到这一点,就会更深一步的理解,其实线程是在线程库里面进行维护的。下面我们给出一张图进行理解:Linux里面就是这样实现的。
- 此时我们更进一步的分析,动态库是所有进程都要链接的,那岂不是所有进程的线程都在这个动态库里面么。答案是肯定的,如果我们再来分析一下,一个栈(满)大概2MB。如果将所有进程的线程都放在这个库里面,那真是
线程之多!库之大!一库装的下!
- 实验二(线程的局部存储,看上面的图)
#include<iostream>
#include<vector>
using namespace std;
#include<pthread.h>
#include<unistd.h>
int g_val = 0;
__thread int cnt = 0;//线程的局部存储
void* run_routinue(void* arg)
{
long long cnt = reinterpret_cast<long long>(arg);;
cout << "[pthread_" << cnt << "] " << &cnt << " "
<< (void*)pthread_self() << endl;
//查看线程的变量的地址。并将此线程的tid以地址的形式进行打印
return nullptr;
}
const int pthread_num = 6;
vector<pthread_t> tids;
void creat_pthread()
{
for(uint64_t i = 0; i < pthread_num; i++)
{
pthread_t tid;
pthread_create(&tid,nullptr,run_routinue,(void*)i);
sleep(1);//防止出现打印信息错乱的情况。
tids.push_back(tid);
}
}
void wait_pthread()
{
for(int i = 0; i < pthread_num; i++)
{
pthread_join(tids[i],nullptr);
}
}
int main()
{
cout << "[main pthread]: "<<&g_val << endl;
//打印查看一下栈区的大致地址范围。
creat_pthread();
wait_pthread();
return 0;
}
- 运行结果:
- 在
内置类型
前加__thread
,意为将此内置类型的变量,转为线程的局部存储。- 通过地址我们可以看出,全局变量的地址与线程局部存储变量cnt的地址,相差甚远。且距离tid较近,因此线程局部存储的位置还是在共享区。
- 以上两种实验,线程内部的变量和局部存储的变量,大多数情况是线程自己才能够访问的,除非你想将这层窗户纸给捅破。因此保证了互斥的作用,即数据在自己访问时没有人干扰。
下面我们用多执行流访问同一个全局变量。
- 实验(模拟抢票流程)
#include<iostream>
#include<vector>
using namespace std;
#include<pthread.h>
#include<unistd.h>
int tickets = 100;
void* Runroutinue(void* arg)
{
long long num = reinterpret_cast<long long>(arg);
while(true)
{
if(tickets > 0)
{
usleep(1000);
printf("[pthread_%d]: get ticket_%d\n",num,tickets);
tickets--;
}
else
break;
}
return nullptr;
}
const int pthread_num = 5;
vector<pthread_t> tids;
void create_pthread()
{
for(uint64_t i = 0; i < pthread_num; i++)
{
pthread_t tid;
pthread_create(&tid,nullptr,Runroutinue,(void*)i);
tids.push_back(tid);
}
}
void wait_pthread()
{
for(int i = 0; i < pthread_num; i++)
{
pthread_join(tids[i],nullptr);
}
}
int main()
{
create_pthread();
wait_pthread();
return 0;
}
- 运行结果:
- 说明:
- 抢票过程中出现了,多人同时抢到一张票的情况。不过这里再仔细分析,可以看到98,97张票没有打印,很显然是printf的问题。
- 最后出现了tickets小于0的情况, 这个现象直观上看是很奇怪的。
- 这里usleep(1000),即休眠1000 ms是tickets小于0的关键。
- 分析:
- 首先printf中存在stdin 这个全局变量,printf为不可重入函数,但是可能里面做了特殊的处理,因此我们看到的打印信息没有错乱的情况,且多个执行流一次只能有一个进行访问,从汇编的角度看,传参的本质就是压栈,调用函数本质上是执行call指令,因此参数可以拷贝先传进去,再开始使用printf函数时可能做了特殊处理,比如加锁之类的操作,因此我们看到票数有一样的很正常。
- 其次因为有usleep(1000)存在,一个线程再抢过一次票,且是
判断之后再休眠
的,这就可能会导致在休眠之后,可能有其它线程将票抢到等于0的情况,但是我们已经判断了,不可能再回去进行判断,只能将错就错。而且错的可能还不止这一个线程,即可能有多个线程同时去抢0张票,此时这张票会被减多次,因此出现-1,-2,-3的情况。- 如果没有这里的usleep(1000),则执行流执行过快,一个线程就会一次就会抢几十张票,如果每个线程都一次去抢几十张票,这个出现抢0张票的概率就会极大的降低。因此为了观察到小于0的现象,这里我们最好加上usleep(1000)。
为了下面较为轻松的进行讨论,此处我们由上述代码引出几个概念:
- 临界资源,在上述代码中就是tickets。
- 临界区,图解如下。
- 原子性:要么不做,要么就做完,只有两种情况,不存在执行中的概念。即保证了操作的确定性。
1.3锁
1.3.1概念
-
如何解决上述现象呢?—— 很简单,加锁。
-
常见接口:
- 初始化与销毁
/*
头文件:
*/
#include <pthread.h>
/*
1.全局方式创建与初始化锁:
*/
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/*
说明:PTHREAD_MUTEX_INITIALIZER初始化的锁无需进行释放。
*/
/*
2.局部方式初始化与创建锁
*/
/*
初始化锁
*/
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
/*
参数1:初始化的锁。
参数2:锁的属性,一般我们不关心,设置为空即可。
*/
/*
释放锁
*/
int pthread_mutex_destroy(pthread_mutex_t *mutex);
/*
返回值:成功返回0,失败返回错误码。
*/
/*
头文件:
*/
#include <pthread.h>
/*
加锁
*/
int pthread_mutex_lock(pthread_mutex_t *mutex);
/*
解锁
*/
int pthread_mutex_unlock(pthread_mutex_t *mutex);
/*
释放锁
*/
int pthread_mutex_destory(pthread_mutex_t *mutext);
/*
返回值:成功返回0,失败返回错误码。
*/
- 简单使用
#include<iostream>
#include<vector>
using namespace std;
#include<pthread.h>
#include<unistd.h>
int tickets = 100;
//设置一个静态的锁
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void* Runroutinue(void* arg)
{
long long num = reinterpret_cast<long long>(arg);
while(true)
{
pthread_mutex_lock(&lock);
if(tickets > 0)
{
printf("[pthread_%d]: get ticket_%d\n",num,tickets);
tickets--;
pthread_mutex_unlock(&lock);
}
else
{
pthread_mutex_unlock(&lock);
break;
}
usleep(1000);
}
return nullptr;
}
const int pthread_num = 5;
vector<pthread_t> tids;
void create_pthread()
{
for(uint64_t i = 0; i < pthread_num; i++)
{
pthread_t tid;
pthread_create(&tid,nullptr,Runroutinue,(void*)i);
tids.push_back(tid);
}
}
void wait_pthread()
{
for(int i = 0; i < pthread_num; i++)
{
pthread_join(tids[i],nullptr);
}
}
int main()
{
create_pthread();
wait_pthread();
return 0;
}
- 运行结果:
- 通过结果可以看出,加锁之后,这里的票没有出现相同和负数的情况。
除此之外,我们再来补充一些细节:
- 加锁的本质是对临界区进行加锁,因为这一块是临界资源执行的逻辑代码。
- 一个线程占有锁之后,其它线程会陷入阻塞状态。解锁之后,因为当前其它线程需要从阻塞到唤醒需要有一定的时间,而一个线程解锁后再进行申请锁中间不需要什么过程,因此这个线程再次申请占用锁资源的速度更快。这种现象我们叫做离锁更近。这也是我们在解锁之后让其usleep(1000); 的原因。
- 这里我们不仅要在tickets大于0时解锁 ,也要再tickets小于等于0时进行解锁,因为在访问临界区有两个出口,即大于0和小于等于0,如果其中一个出口不解锁,其它线程可能会因为锁资源不就绪而一直陷入阻塞状态,这种现象我们称之为死锁。
- 临界区同时只能允许一个线程进行执行,将多线程的并发访问,变成了串行访问,虽然线程安全了,但是时间成本增加了。
除此之外,我们还可利用RAII的思想写一个出临界资源自动析构的锁。
- 代码
- 说明:这里只给出函数和RAII锁的风格的实现。剩余的代码与上面的例子的代码还是一样的。
struct mutex
{
mutex(pthread_mutex_t *lock)
:_mutex(lock)
{
pthread_mutex_lock(_mutex);
}
~mutex()
{
pthread_mutex_unlock(_mutex);
usleep(1000);//防止此线程一直占用着锁,不给其它线程机会。
}
pthread_mutex_t* _mutex;
};
void* Runroutinue(void* arg)
{
long long num = reinterpret_cast<long long>(arg);
while(true)
{
mutex mu(&lock);
if(tickets > 0)
{
printf("[pthread_%d]: get ticket_%d\n",num,tickets);
tickets--;
}
else
break;
//还有这种写法,{}内部即为临界区。方便进行区分。
// {
// if(tickets > 0)
// {
// printf("[pthread_%d]: get ticket_%d\n",num,tickets);
// tickets--;
// }
// else
// break;
// }
}
return nullptr;
}
1.3.2原理
- 本质上就是保证申请锁时的原子性。
先根据上面的tickets --;
的这个动作看反汇编进行深入分析:
- 我们只看这一条语句可能以为这条语句是原子的,但是看了汇编之后,就会看到,一条简单的tickets–,也是由三条汇编指令执行的。
- 操作系统规定一条汇编语句的执行是原子的。那么三条汇编和起来组成的tickets–就不是原子的。
- 理解了这一点之后,我们再来看保证申请锁的原子性,如何实现呢?其实只需保证锁的申请仅需一条汇编代码即可。
- 在计算机中为了实现锁,把交换指令(swap/exchange)变成一条汇编,即保证了原子性, 具体我们以下面的这张图进行分析:
对应图解:
分析:
- 这里多个线程申请锁,第一步置为0更像是对原来锁的清空处理。
- 第二步一旦有一个线程交换锁成功,那么其余线程都将陷入等待,直到将锁还回去,且因为到了该解锁时,线程是已经出临界区的,没有出就是程序员的锅了。
- 既然出了临界区,原来寄存器的值改不改并不重要,只需要将锁还回去,即将mutex赋值为1即可。且下次申请锁时会自动将al里面的值赋值为0。(首尾呼应)
- 补充:ticket减减的过程既然分为三步,那可能会导致一个线程拿到 100 时,时间片到了直接切换,将最开始的tickets(100)放进自己的上下文,切换之后,其它线程将ticket减到了10,此时这个线程再切回来,拿到的100如果减减再写回内存,这个tickets不就又变成99了么,因此通过上文抢票的现象,我们很明显的可以进推断在切回来的时候还会对数据做进一步的检查,就是为了防止数据不一致。
1.3.4死锁
- 下面我们从三个角度来理解问题:
- 是什么?
- 从生活的角度中,假如说你欠我钱不还,我还欠着你的钱我也不还,其实就算死锁。
- 回归到理论,是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种
永久等待
状态。
- 为什么?
- 程序员手残(犯贱了),比如已经申请锁之后,再一次申请锁,锁已经被申请走了,你还要申请,但是没有锁,那只能陷入永久等待了。
- 多线程在不同的时间结点,分别占用资源和申请资源,假如有个线程A和线程B, A使用着 C资源,B使用着D资源,到了某一个时间,线程A需要使用D资源完成任务,线程B需要使用C资源完成任务,这时就尴尬了,线程A正用着C资源呢,线程B正用着D资源呢。一来二去,不就死锁了么。
- 怎么办?
- 第一步,分析问题,归根结底都是锁惹的祸,那不用锁不就成了?这是一种办法,但是锁也是用来解决并发访问的问题的,因此不到万不得已,不可走这一步险棋。
- 第二步,既然你要申请我的资源,那我给你不就成了。这一步需要有一个线程做出牺牲,即暂时放弃已经申请的资源。等到另一个线程用完,再还回来不就成了么。
- 第三步,既然我要不到资源,我还要死锁,那我干脆不如把所有资源全释放就完了,直接躺平开始摆烂。
- 第四步,你还要我的资源,我还要你的资源,那我不要你的资源或者你也别要我的资源,即双方不申请对方的资源,不就不会死锁了么。
- 回归现实:
- 在成年人的世界里,想要改变他人变相的就是申请他的资源,但是人家大概率是不会做出改变的,如果硬要改变,不好意思大概率会陷入死锁。因此成年人的世界里一般只做筛选(合适的 || 喜欢的),但(大概率)不做改变(成本太高)(不合适 && 喜欢的)。
- 总结一下:
- 死锁的前提是得有锁。
- 要么一方被动的做出牺牲。
- 要么一方主动的做出牺牲。
- 要么从此你走你的阳关道我走我的独木桥,互不干扰。
- 最后我们再贴出显而易见的理论:
- 死锁四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
- 说明:这四个条件,对应上述的四步。
- 避免死锁
- 破坏死锁的四个必要条件
- 加锁顺序一致(同步,下面专门细讲)
- 避免锁未释放的场景
- 资源一次性分配(要么不用,要么就用完)
2.同步
2.1概念
-
谈完了互斥,就该进一步了解同步了,因为同步可以更好的完善互斥。
-
举个例子,方便大家更能理解同步。
- 假设12点,你正去往你经常去的餐厅窗口打饭,此时你发现所有的人都没有排队,而是一窝蜂的围在一个窗口,嚷嚷着 “阿姨先给我打饭”。
- 这样做的有好有坏。
- 好处在于离窗口更近,身体更强壮的人,更容易让阿姨给你打饭。
- 坏处在于不知道什么时候才能抢到饭,只能干等着或者挤进去抢饭,而且身体弱小的人,是最后抢到饭的。
- 既然这样为了一定的公平性,我们才采取排队的策略,不管身体强壮,还是谁最先离窗口近,按照先到先得的顺序依次的排好队,更具有公平性。
- 一窝蜂导致的有人吃不上饭(或者很晚才吃上饭)的问题,我们称之为饥饿问题。
- 排好队打饭,即按照一定的顺序打饭,我们称之为同步。
- 同步的目的是为了让线程按照
一定的顺序
访问资源,即排好队再去访问资源。而单纯的互斥,可能会导致上饥饿问题,因此同步可以更好的实现/完善互斥。
2.2原理
- 原理其实并不难理解,其实线程申请不到资源时,放在队列中排队即可。
说明:Linux是用条件变量
来实现同步的,具体原理如上。
下面我们来了解一下对应的接口:
/*
头文件
*/
#include<pthread.h>
/*
1.静态变量初始化,条件变量
pthread_cond_t con = PTHREAD_COND_INITIALIZER;
说明:与互斥量相同,不用销毁与初始化
*/
/*
2.局部变量
*/
/*
初始化
*/
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
/*
参数1:局部条件变量的地址。
参数2:条件变量的属性,一般我们设置为空。
*/
/*
销毁
*/
int pthread_cond_destroy(pthread_cond_t *cond);
/*
参数:局部条件变量的地址。
*/
/*
等待
*/
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
/*
等待时,需要将锁变量传进去,目的是不能让线程带着锁进入队列(进了就尴尬了)。
*/
/*
唤醒
*/
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
/*
说明:第一个是一次唤醒的是全部线程,第二个一次唤醒的是一个线程。
*/
- 简单使用:
#include<iostream>
#include<vector>
using namespace std;
#include<pthread.h>
#include<unistd.h>
int tickets = 100;
//设置一个静态的锁
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
//设置一个条件变量
pthread_cond_t con = PTHREAD_COND_INITIALIZER;
struct mutex
{
mutex(pthread_mutex_t *lock)
:_mutex(lock)
{
pthread_mutex_lock(_mutex);
}
~mutex()
{
pthread_mutex_unlock(_mutex);
usleep(1000);//防止此线程一直占用着锁,不给其它线程机会。
}
pthread_mutex_t* _mutex;
};
void* Runroutinue(void* arg)
{
long long num = reinterpret_cast<long long>(arg);
while(true)
{
mutex mu(&lock);//加锁
//这一步的作用葫芦娃救爷爷,一个一个来,都进来先去排队。
pthread_cond_wait(&con,&lock);
if(tickets > 0)
{
printf("[pthread_%d]: get ticket_%d\n",num,tickets);
tickets--;
}
else
break;
}
return nullptr;
}
const int pthread_num = 5;
vector<pthread_t> tids;
void create_pthread()
{
for(uint64_t i = 0; i < pthread_num; i++)
{
pthread_t tid;
pthread_create(&tid,nullptr,Runroutinue,(void*)i);
tids.push_back(tid);
}
sleep(2);//先让所有的线程都进队列里面。
}
void wait_pthread()
{
for(int i = 0; i < pthread_num; i++)
{
pthread_join(tids[i],nullptr);
}
}
void ctrl_pthread()
{
//一次唤醒一个线程
while(true)
{
pthread_cond_signal(&con);
usleep(100000);
}
//一次唤醒所有线程。
// while(true)
// {
// pthread_cond_broadcast(&con);
// usleep(100000);
// }
}
int main()
{
create_pthread();
ctrl_pthread();
wait_pthread();
return 0;
}
- 运行结果
- 可以看出,这里的是按照一定的顺序进行唤醒的。 因此体现了同步的效果。
- 此时我们将ctrl_pthread的函数里面,上面的代码注释,下面的代码再放开,再次查看运行的结果。
此处多解释几句,因为是一次性唤醒的,那再次进入循环时,多线程由于谁先申请锁不知道,所以还是先申请锁的先队列,不过再进入队列后的顺序是一致的,因此也是体现了同步的效果,此处需要多思考一下。
3.生产消费者模型
- 先来了解一下生产消费者模型:
- 有生产者和消费者,2个角色,有1个超市(仓库)。
- 这里的关键就在于分析这个超市的作用:
- 生产者生产一些产品放在超市里面。
- 消费者从超市里面消费产品。
- 生产者与消费者之间没有必然联系,而是通过超市这个中间媒介进行联系。
- 超市里面没有数据,消费者不能从中拿产品。需要生产者进行放产品。
- 超市里面放满数据,生产者不能放入产品,需要消费者从中拿产品。
- 这两种角色之间还存在着3种关系:
- 生产者,即不同的厂商,都在超市往里面放数据,但是这个超市是临界资源,因为两个厂商同时放数据可能超市这个容器无法准确做好统计(具体用抢票的例子进行理解),因此生产者之间同时只能有一个人去超市放数据,因此生产者之间是互斥的关系。
- 消费者,即不同的消费者之间,都往超市里面拿数据,具体原因与生产者同理,假如只有产品只剩下了一个,那么就可能导致数据不准确(可用抢票时出现负数的情况进行理解),因此消费者之间同时只能有一个人去超市拿数据,因此消费者之间是互斥关系。
- 生产者与消费者之间,互斥很好理解,超市是临界资源,同时只能有一个人去访问,其次还存在着同步的关系,因为生产与消费之间有个先后顺序,生产者先进行生产,生产完了,消费者才能进行消费。不可能没有产品了,消费还在消费。
- 最后再来谈谈超市的作用:
- 超市是一个大型容器,生产者可以生产一批的产品之后,将这一批产品放在超市里面。
- 消费者从中拿产品拿一段时间之后,消耗一定数据的产品之后。再由超市通知生产者进行再生产。
- 由此可见消费者与生产者可通过超市达成降低耦合度,因为通过超市消费者消费产品需要一定的时间,其次生产者生产产品也需要一定的时间,两者的时间是不同时进行的,因此两者没有很强的时效性,即一边生产,一边消费。
- 由前三点,超市可以起着宏观调控的作用,可以协调生产与消费,就是让供求之间平衡。
- 总结一下:3种关系 ,2种角色,1种容器,可以用"321",可以进行速记。
- 说明一下:在这里我们只是讲述生产消费者模型的大致联系,具体的实现还得看实际的情况,关键是要学会灵活变通。
下面我们举一个例子进行验证:
在实现生产消费者模型时,一种阻塞队列的数据结构经常被使用,下面我们基于这个数据结构,举一个样例。
- 阻塞的原因在于:
- 消费者从队列中把产品消费到一定程度了,如果再进行消费可能会出现供求不平衡的情况,此时消费者线程就该阻塞,让生产者线程生产一段时间之后,再让消费者进行消费,从而实现供求平衡的情况。
- 生产者往队列中存放数据,放了一定的数据之后,再生产就会出现供过于求,从而供需又不平衡了,从而生产者线程陷入阻塞,让消费者消费一段时间之后,再让生产者进行生产。
- 从中我们看出阻塞的目的就在于达成供求动态平衡。
- 实现代码:
#include<iostream>
#include<vector>
#include<queue>
using namespace std;
#include<pthread.h>
#include<unistd.h>
struct mutex
{
mutex(pthread_mutex_t *lock)
:_mutex(lock)
{
pthread_mutex_lock(_mutex);
}
~mutex()
{
pthread_mutex_unlock(_mutex);
usleep(1000);//防止此线程一直占用着锁,不给其它线程机会。
}
pthread_mutex_t* _mutex;
};
struct Product
{
Product(int a,int b)
:_a(a),_b(b)
{}
int cost()
{
return _a + _b;
}
int _a;
int _b;
};
template<class T>
struct Blockqueue
{
public:
static const int default_number = 20;
Blockqueue(int min = default_number / 3,
int max = default_number * 2 / 3,int size = default_number)
:_min(min),_max(max),_size(size)
{
//初始化条件变量
pthread_cond_init(&_consumer,nullptr);
pthread_cond_init(&_product,nullptr);
//初始化互斥量
pthread_mutex_init(&_con_mutex,nullptr);
pthread_mutex_init(&_pro_mutex,nullptr);
}
void push(T product)
{
//多线程访问时要先进行加锁
mutex mu(&_pro_mutex);
if(_que.size() >= _max)
{
//说明生产者生产完数据了。
//需要给所有消费者线程发信息,让其来消费数据。
pthread_cond_broadcast(&_consumer);
/*
而生产者只需要在此默默等待(让员工放假),等到消费者把数据消费到一
定程度(员工放完假了)再进行生产即可。
*/
pthread_cond_wait(&_product,&_pro_mutex);
}
//需要一直生产数据。
_que.push(product);
usleep(100000);
}
T pop()
{
//多线程访问时需要先加锁
mutex mu(&_con_mutex);
if(_que.size() <= _min)
{
//消费者消费到一定程度了。
//需要给所有生产者发消息,让其进行生产数据。
pthread_cond_broadcast(&_product);
//消费者陷入阻塞进行等待即可。
pthread_cond_wait(&_consumer,&_con_mutex);
}
T pro = _que.front();
_que.pop();
usleep(100000);//防止数据打印过快。
return pro;
}
private:
queue<T> _que;
//为了能让队列中的数据动态平衡,我们设置两个条件变量,两个互斥量
pthread_cond_t _consumer;
pthread_cond_t _product;
pthread_mutex_t _con_mutex;
pthread_mutex_t _pro_mutex;
int _min;
//消费者最低能消费的数据
int _max;
//生产者最多能生成的数据
int _size;
};
Blockqueue<Product> bk_que;
void* pro_pthreads(void* args)
{
while(true)
{
Product pro(rand() % 100,rand() % 100);
bk_que.push(pro);
cout << "product a data:" << pro._a << "+" << pro._b
<< endl;
usleep(10000);
}
}
void* con_pthreads(void* args)
{
while(true)
{
Product res = bk_que.pop();
//将计算结果进行返回。
cout << "consumer a product:" << res._a << "+" << res._b
<< "="<< res.cost() << endl;
usleep(10000);
}
}
vector<pthread_t> tids;
void create_pthreads()
{
pthread_t tid;
pthread_create(&tid,nullptr,pro_pthreads,nullptr);
tids.push_back(tid);
pthread_create(&tid,nullptr,con_pthreads,nullptr);
tids.push_back(tid);
}
void wait_pthreads()
{
for(int i = 0; i < tids.size(); i++)
{
pthread_join(tids[i],nullptr);
}
}
int main()
{
//生产的产品为0 - 100以内两位数的加法
//消费者的消费产品是计算 0-100以内两位数的加法。
srand((unsigned int)time(nullptr));//设立随机数起点。
create_pthreads();
wait_pthreads();
return 0;
}
- 运行结果:
- 观察现象:消费者在不断的消费之前的旧数据,生产者在不断生产新的数据。
- 这里为了实现方便,我们使用了上文提及的RAII风格的锁,以便减少代码量。
- 这里消费线程和阻塞线程分别是两种角色,需要对应两把锁,以及两个等待队列,因此用了两个互斥量和条件变量。
- 其次这里的产品是与具体场景有关的,这里方便演示,我用两个数之间的加法作为产品进行演示。
- 这里我们规定了两个水位线,即最多生产的产品数量,和最少生产的产品数量,并且让生产与消费不同时进行,从而降低了耦合度。
- 此处两个子线程是一个while死循环,目的为了演示,具体要看实际场景对循环进行添加判断条件。
总结
- 我们从线程的概念,执行,调度,切换初步认识了线程。
- 我们从线程的创建,等待,分离,退出,取消,使用相关接口理解了线程。
- 我们从同步与互斥看到了多线程带来的问题,并了解了对应的解决方法。
- 我们从模型的角度,更加现实的看待多线程,并理解同步与互斥的实际用处。
- 最后,博主的这篇文章,从开始到结束
历时5天
左右,字数超过3万字
,用心带给各位C友理解线程,如果觉得文章不错,点赞 + 收藏 就是对本博主最大的支持了。
尾序
我是舜华,期待与你的下一次相遇!