1 . Linux线程概念
1.1 什么是线程
-
在一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列”
-
一切进程至少都有一个执行线程
-
线程在进程内部执行,本质是在进程地址空间内运行
-
Linux系统里轻量级进程就相当于线程。但是创建TCB时候通过复用PCB然后指向同一个地址空间共享资源。所以 在Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化。
-
透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了进程执行流。
-
线程是CPU调度的基本单位,进程是承担分配系统资源的基本实体。
1.2 线程的优点
- 创建一个新线程的代价要比创建一个新进程小得多
- 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
- 线程占用的资源要比进程少很多
- 能充分利用多处理器的可并行数量
- 在等待慢速I\O操作结束的同时,程序可执行其他的计算任务
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
继续看下面的代码
#include<iostream>
#include<pthread.h>
#include<unistd.h>
using namespace std;
int g_val = 0;
void *threadRun1(void* args)
{
while(true)
{
sleep(1);
cout<<"t1 thread..."<<getpid()<<" &g_val: "<<&g_val<<" g_val: "<<g_val<<endl;
}
}
void *threadRun2(void* args)
{
while(true)
{
sleep(1);
cout<<"t2 thread..."<<getpid()<<" &g_val: "<<&g_val<<" g_val: "<<g_val++<<endl;
}
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,threadRun1,nullptr);
pthread_create(&t1,nullptr,threadRun2,nullptr);
while(true)
{
sleep(1);
cout<<"main thread..."<<getpid()<<" &g_val: "<<&g_val<<" g_val: "<<g_val<<endl;
}
}
现象:全局变量,在多线程场景中,我们多个线程看到的是同一个变量。
1.3 线程的缺点
性能缺失
- 一个很少被外部事件阻塞的计算密集型线程往往无法与其他线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
健壮性降低
- 编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成的不良影响的可能性是很大的,换句话说线程之间是缺乏保护的
缺乏访问控制
- 进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响
编程难度提高
- 编写与调试一个多线程程序比单线程程序困难得多
1.4 线程异常
- 单个线程如果出现除零
#include<stdio.h>
#include<pthread.h>
void *thread1_rum(void *args)
{
while(1)
{
printf("我是线程1,我正在运行\n");
sleep(1);
}
}
void *thread2_rum(void *args)
{
while(1)
{
printf("我是线程2,我正在运行\n");
sleep(1);
}
}
void *thread3_rum(void *args)
{
while(1)
{
printf("我是线程3,我正在运行\n");
sleep(1);
}
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,NULL,thread1_rum,NULL);
pthread_create(&t2,NULL,thread2_rum,NULL);
pthread_create(&t3,NULL,thread3_rum,NULL);
while(1)
{
printf("我是主线程,我正在运行\n");
sleep(1);
}
}
- 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出。
#include<iostream>
#include<pthread.h>
#include<unistd.h>
using namespace std;
void *threadRun1(void* args)
{
while(true)
{
sleep(1);
cout<<"t1 thread..."<<getpid()<<endl;
}
}
void *threadRun2(void* args)
{
while(true)
{
sleep(1);
cout<<"t2 thread..."<<getpid()<<endl;
}
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,threadRun1,nullptr);
pthread_create(&t1,nullptr,threadRun2,nullptr);
while(true)
{
sleep(1);
cout<<"main thread..."<<getpid()<<endl;
}
}
下面我们人为的制造一点异常
#include<iostream>
#include<pthread.h>
#include<unistd.h>
using namespace std;
void *threadRun1(void* args)
{
while(true)
{
sleep(1);
cout<<"t1 thread..."<<getpid()<<endl;
}
}
void *threadRun2(void* args)
{
char *s = "hello world!";
while(true)
{
sleep(5);
cout<<"t2 thread..."<<getpid()<<endl;
*s='H';
}
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,threadRun1,nullptr);
pthread_create(&t1,nullptr,threadRun2,nullptr);
// pthread_create(&t1,nullptr,threadRun3,nullptr);
while(true)
{
sleep(1);
cout<<"main thread..."<<getpid()<<endl;
}
}
结论:在多线程程序中,任何一个线程奔溃了都会导致进程崩溃。
原因:
- 系统角度:线程是进程的执行分支,线程干了,就是进程干了。
- 信号角度:页表转换的时候MMU识别写入权限的时候,没有验证通过,MMU异常被OS识别,OS给进程发信号,(信号是以进程为主的)
因为执行流看到的资源是通过地址空间看到的,多个LWP看到的是同一个地址空间。所以,所有的进程可能会共享进程的大部分资源!
1.5 线程用途
- 合理的使用多线程,能提高CPU密集型程序的执行效率
- 合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一遍下载开发多线程运行的一种表现)
我们以32位机器为例,采用二级页表的方式来映射地址。
页表过大的问题也被解决:由上图可以看出页表最多有2 ^10 + 2 ^ 20行,约为1MB;而且实际情况是:在对物理地址进行检索或者是虚拟地址进行映射的时候,只会创建加载需要的页表,也就是实际页表会小于1MB行
我们实际在申请mallco内存的时候,OS你要给你在虚拟地址空间上申请就行了。当你真正在访问时,OS才会自动给你申请或者填充页表+申请具体的物理内存。也就是使用缺页中断的方式。
缺页中断(英语:Page fault,又名硬错误、硬中断、分页错误、寻页缺失、缺页中断、页故障等)指的是当软件试图访问已映射在虚拟地址空间中,但是目前并未被加载在物理内存中的一个分页时,由中央处理器的内存管理单元所发出的中断。
通常情况下,用于处理此中断的程序是操作系统的一部分。如果操作系统判断此次访问是有效的,那么操作系统会尝试将相关的分页从硬盘上的虚拟内存文件中调入内存。而如果访问是不被允许的,那么操作系统通常会结束相关的进程。
我们来看下面这段代码
const char *s = "hello world";
*s ='H';
通过对C语言的学习,我们很清楚的知道这段代码是会报错的,因为字符常量区是不允许被修改只允许被读取的。
s里面保存的事指向的字符的虚拟起始地址,*s寻址的时候必定会伴随虚拟到物理的转化,我们必定会使用MMU+查页表的方式,然后对你的写操作进行权限审查、发现没有写的权限。此时MMU会产生异常,OS识别到异常,异常转换为限号,发送给目标进程,再从内核转换为用户态的时候,进行信号处理,终止进程。
2. 关于进程线程的问题
2.1 Linux进程VS线程
2.1.1 进程和线程
1、进程是资源分配的基本单位
2、线程是调度的基本单位
3、线程共享进程数据,但也拥有自己的一部分数据
- 线程ID
- 一组寄存器(用于线程动态切换)
- 独立的栈结构
- errno
- 信号屏蔽字
- 调度优先级
进程的多个线程共享同一个地址空间,因此Text Segment、DataSegment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问的道,除此之外,各线程还共享下进程资源和环境:
- 文件描述符表
- 每种信号的处理方式(SIG_IGN,SIG_DFL或者自定义的信号处理函数)
- 当前工作目录
- 用户ID和组id
进程和线程的关系如下图:
pthread_join
exit
`#include<iostream>
#include<pthread.h>
#include<unistd.h>
using namespace std;
#define NUM 10
void *thread_run(void *args)
{
char* name = (char*)args;
while(true)
{
cout<<"new thread running...,my thread name is "<<name<<endl;
exit(10);
sleep(1);
break;
}
delete name;
return nullptr;
}
int main()
{
pthread_t tids[NUM];
for(int i = 0;i<NUM;i++)
{
//bug?
// char tname[64];
char *tname = new char[64];
snprintf(tname,64,"thread-%d",i+1);
pthread_create(tids+i,nullptr,thread_run,tname);
}
for(int i= 0;i<NUM;i++)
{
int n = pthread_join(tids[i],nullptr);
if(n!=0) cerr<<"pthread_join error"<<endl;
}
cout<<"all thread quit"<<endl;
return 0;
运行结果:
可以看到进程成功创建,exit向进程发送退出信号之后,所有线程都退出了。
2.1.2 创建线程
功能:创建一个新的线程
原型int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(start_routine)(void), void *arg);
参数
thread:返回线程ID
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0;失败返回错误码
错误检查
- 传统的一些函数是,成功返回0,失败返回-1,并且对全局变量errno赋值以指示错误
- pthreads函数出错时不会设置全局errno(而大部分其他POSIX函数会这样做),而是将错误代码通过返回值返回
- pthreads同样也提供了线程内的errno变量,以支持其他使用errno的代码。对于pthreads函数的错误,建议通过返回值判定,因为读取返回值要比线程内的errno变量的开销更小
2.1.3 线程终止
如果需要只终止某个线程而不终止整个进程,可以有三种方法:
- 从线程函数return。这种方法对主线程不适用,从main函数return、相当于调用exit
#include<iostream>
#include<pthread.h>
#include<unistd.h>
using namespace std;
#define NUM 10
void *thread_run(void *args)
{
char* name = (char*)args;
while(true)
{
cout<<"new thread running...,my thread name is "<<name<<endl;
sleep(1);
}
delete name;
return nullptr;
}
int main()
{
pthread_t tids[NUM];
for(int i = 0;i<NUM;i++)
{
//bug?
// char tname[64];
char *tname = new char[64];
snprintf(tname,64,"thread-%d",i+1);
pthread_create(tids+i,nullptr,thread_run,tname);
}
sleep(2);
return 0;
}
运行结果:
可以看到主线程退出之后,其他的线程也退出了。
2. 线程可以调用pthread_exit终止自己
pthread_exit
功能:线程终止
原型void pthread_exit(void *value_ptr);
参数
value_ptr:value_ptr不要指向一个局部变量。
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
#include<iostream>
#include<pthread.h>
#include<unistd.h>
using namespace std;
#define NUM 10
void *thread_run(void *args)
{
char* name = (char*)args;
while(true)
{
cout<<"new thread running...,my thread name is "<<name<<endl;
sleep(2);
break;
}
delete name;
pthread_exit(nullptr);
}
int main()
{
pthread_t tids[NUM];
for(int i = 0;i<NUM;i++)
{
//bug?
// char tname[64];
char *tname = new char[64];
snprintf(tname,64,"thread-%d",i+1);
pthread_create(tids+i,nullptr,thread_run,tname);
}
for(int i= 0;i<NUM;i++)
{
int n = pthread_join(tids[i],nullptr);
if(n!=0) cerr<<"pthread_join error"<<endl;
}
cout<<"all thread quit"<<endl;
return 0;
}
运行结果:
3. 一个线程可以调用pthread_cancel终止同一进程中的另一个进程
pthread_cancel
功能:取消一个执行中的线程
原型int pthread_cancel(pthread_t thread);
参数
thread:线程ID
返回值:成功返回0;失败返回错误码
运行结果:
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<time.h>
using namespace std;
void* threadRun(void* args)
{
const char *name = (const char*)args;
int cnt = 5;
while(cnt)
{
cout<<name<<" is running:"<<cnt--<<endl;
sleep(1);
}
pthread_exit((void*)11);
}
int main()
{
pthread_t tid;
pthread_create(&tid,nullptr,threadRun,(void*)"thread 1");
sleep(3);
pthread_cancel(tid);
void* ret = nullptr;
pthread_join(tid,&ret);
cout<<"new thread exit:"<<(int64_t)ret<<endl;
return 0;
}
2.1.4 线程等待
为什么需要线程等待?
- 已经退出的线程,其空间没有被释放,仍然在进程的地址空间内
- 创建新的线程不会复用刚才退出线程的地址空间
pthread_join
功能:等待线程结束
原型int pthread_join(pthread_t thread, void **value_ptr);
参数
thread:线程ID
value_ptr:它指向一个指针,后者指向线程的返回值返回值:成功返回0;失败返回错误码
调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的
终止状态是不同的,总结如下:
- 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
- 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED。
- 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
- 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
#include<iostream>
#include<pthread.h>
#include<unistd.h>
using namespace std;
#define NUM 10
//线程退出
//1、线程函数执行完毕 return void*
//2、pthread_exit(void*)
void *thread_run(void *args)
{
char* name = (char*)args;
while(true)
{
cout<<"new thread running...,my thread name is "<<name<<endl;
// exit(10);
sleep(2);
break;
}
delete name;
pthread_exit((void*)1);
// return nullptr;
}
int main()
{
pthread_t tids[NUM];
for(int i = 0;i<NUM;i++)
{
//bug?
// char tname[64];
char *tname = new char[64];
snprintf(tname,64,"thread-%d",i+1);
pthread_create(tids+i,nullptr,thread_run,tname);
}
void *ret = nullptr;
for(int i= 0;i<NUM;i++)
{
int n = pthread_join(tids[i],&ret);
if(n!=0) cerr<<"pthread_join error"<<endl;
cout<<"thread quit:"<<(uint64_t)ret<<endl;
}
cout<<"all thread quit"<<endl;
// sleep(2);
// while(true)
// {
// cout<<"main thread running ..."<<endl;
// sleep(1);
// }
return 0;
}
pthread系列函数的设计都是很巧妙的,尤其是void的使用,我们不仅仅可以传入int等还可以传入类对象,这样的使用更加贴近现实生活中多线程的使用管理
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<time.h>
using namespace std;
class ThreadData
{
public:
ThreadData(const string &name,int id,time_t createTime):_name(name),_id(id),_createTime((uint64_t)createTime){}
~ThreadData()
{
}
public:
string _name;
int _id;
uint64_t _createTime;
};
#define NUM 10
//线程退出
//1、线程函数执行完毕 return void*
//2、pthread_exit(void*)
void *thread_run(void *args)
{
// char* name = (char*)args;
ThreadData *td =static_cast<ThreadData*>(args);
while(true)
{
cout<<"thread is running,name: "<<td->_name<<" create time:"<<td->_createTime<<" index:"<<td->_id<<endl;
// exit(10);
sleep(2);
break;
}
delete td;
pthread_exit((void*)1);
// return nullptr;
}
int main()
{
pthread_t tids[NUM];
for(int i = 0;i<NUM;i++)
{
//bug?
// char tname[64];
// char *tname = new char[64];
char tname[64];
snprintf(tname,64,"thread-%d",i+1);
ThreadData *td = new ThreadData(tname,i+1,time(nullptr));
pthread_create(tids+i,nullptr,thread_run,td);
sleep(1);
}
void *ret = nullptr;
for(int i= 0;i<NUM;i++)
{
int n = pthread_join(tids[i],&ret);
if(n!=0) cerr<<"pthread_join error"<<endl;
cout<<"thread quit:"<<(uint64_t)ret<<endl;
}
cout<<"all thread quit"<<endl;
return 0;
}
运行结果:
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<time.h>
#define NUM 10
using namespace std;
enum{ OK=0,ERROR };
class ThreadData
{
public:
ThreadData(const string &name,int id,time_t createTime,int top):_name(name),_id(id),_createTime((uint64_t)createTime),_status(OK),_top(top),_result(0){}
~ThreadData()
{
}
public:
string _name;
int _id;
uint64_t _createTime;
//返回
int _status;
int _top;
int _result;
};
void *thread_run(void *args)
{
// char* name = (char*)args;
ThreadData *td =static_cast<ThreadData*>(args);
for(int i = 1;i<=td->_top;i++)
{
td->_result += i;
}
cout<<td->_name<<" count down"<<endl;
pthread_exit(td);
}
int main()
{
pthread_t tids[NUM];
for(int i = 0;i<NUM;i++)
{
//bug?
// char tname[64];
// char *tname = new char[64];
char tname[64];
snprintf(tname,64,"thread-%d",i+1);
ThreadData *td = new ThreadData(tname,i+1,time(nullptr),100+i*5);
pthread_create(tids+i,nullptr,thread_run,td);
sleep(1);
}
void *ret = nullptr;
for(int i= 0;i<NUM;i++)
{
int n = pthread_join(tids[i],&ret);
if(n!=0) cerr<<"pthread_join error"<<endl;
ThreadData *td = static_cast<ThreadData*>(ret);
if(td->_status==OK)
{
cout<<td->_name<<" 计算结果:"<<td->_result<<" 它要计算的是:1~"<<td->_top<<endl;
}
delete td;
}
cout<<"all thread quit"<<endl;
return 0;
}
运行结果:
2.1.5 线程分离
pthread_detach
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏
- 如果不关心线程的返回值,join是一种负担,这个时候我们可以告诉系统,当线程退出时,自动释放线程资源
#include<iostream>
#include<string.h>
#include<pthread.h>
#include<cstdio>
#include<unistd.h>
using namespace std;
void *threadRoutine(void* args)
{
string name=static_cast<const char*>(args);
int cnt = 5;
while(cnt)
{
cout<<name<<":"<<cnt--<<endl;
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid,nullptr,threadRoutine,(void*)"thread 1");
pthread_detach(tid);
int n = pthread_join(tid,nullptr);
//join失败
if(n!=0)
{
std::cerr<<"error:"<<n<<":"<<strerror(n)<<endl;
}
return 0;
// sleep(10);
}
运行结果:
#include<iostream>
#include<string.h>
#include<pthread.h>
#include<cstdio>
#include<unistd.h>
using namespace std;
void *threadRoutine(void* args)
{
pthread_detach(pthread_self());
string name=static_cast<const char*>(args);
int cnt = 5;
while(cnt)
{
cout<<name<<":"<<cnt--<<endl;
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid,nullptr,threadRoutine,(void*)"thread 1");
sleep(1);
int n = pthread_join(tid,nullptr);
//join失败
if(n!=0)
{
std::cerr<<"error:"<<n<<":"<<strerror(n)<<endl;
}
return 0;
// sleep(10);
}
运行结果:
一个线程在join的时候会检查是否joinable,如果已经分离了,那么就会报错。反之不成立。也就是说joinable和分离是冲突的,一个线程不能既是joinable又是分离的。
可以是线程组内其他线程对目标线程进行分离
pthread_detach(pthread_t thread)
也可以是线程自己分离。
pthread_detach(pthread_self());
2.1.6 Linux线程互斥
#include<iostream>
#include<string.h>
#include<pthread.h>
#include<cstdio>
#include<unistd.h>
#include<thread>
using namespace std;
int g_val = 0;
std::string hexAddr(pthread_t tid)
{
char buffer[64];
snprintf(buffer,sizeof(buffer),"0x%x",tid);
return buffer;
}
void *threadRoutine(void* args)
{
string name=static_cast<const char*>(args);
int cnt = 5;
while(cnt)
{
cout<<name<<" g_val:"<<g_val++<<" ,&g_val:"<<&g_val<<endl;
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,threadRoutine,(void*)"thread 1");
pthread_create(&t2,nullptr,threadRoutine,(void*)"thread 2");
pthread_create(&t3,nullptr,threadRoutine,(void*)"thread 3");
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
}
可以看到不同的线程看到了同一个全局变量
要实现不同线程看到线程自己私有的全局变量也很简单,只需要在定义全局变量的时候加上__thread
#include<iostream>
#include<string.h>
#include<pthread.h>
#include<cstdio>
#include<unistd.h>
#include<thread>
using namespace std;
__thread int g_val = 0;
std::string hexAddr(pthread_t tid)
{
char buffer[64];
snprintf(buffer,sizeof(buffer),"0x%x",tid);
return buffer;
}
void *threadRoutine(void* args)
{
string name=static_cast<const char*>(args);
int cnt = 5;
while(cnt)
{
// cout<<name<<":"<<cnt--<<":"<<hexAddr(pthread_self())<<" &cnt:"<<&cnt<<endl;
cout<<name<<" g_val:"<<g_val++<<" ,&g_val:"<<&g_val<<endl;
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,threadRoutine,(void*)"thread 1");
pthread_create(&t2,nullptr,threadRoutine,(void*)"thread 2");
pthread_create(&t3,nullptr,threadRoutine,(void*)"thread 3");
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
}
可以看到在全局变量g_val前面加了__thread之后,就相当于是把全局变量在栈区拷贝了,然后让每一个线程私有一份,各自改变各自的。
2.2 线程互斥
2.2.1 进程线程间的互斥相关背景概念
- 临界资源:多线程执行流共享的资源就叫做临界资源
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
2.2.2 互斥量mutex
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量
- 但有很多时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互
- 多个线程并发的操作共享变量,会带来一些问题
接下来我们来模拟一个简单抢票逻辑的程序
#include<iostream>
#include<string.h>
#include<pthread.h>
#include<cstdio>
#include<unistd.h>
#include<thread>
using namespace std;
int tickets = 10000;
void *threadRoutine(void* name)
{
string tname = static_cast<const char*>(name);
while(true)
{
if(tickets>0)
{
usleep(2000);//模拟抢票花费的时间
cout<<tname<<" get a ticket:"<<tickets--<<endl;
}
else
{
break;
}
}
return nullptr;
}
int main()
{
pthread_t t[4];
int n = sizeof(t)/sizeof(t[0]);
for(int i = 0;i<n;i++)
{
char *data = new char[64];
snprintf(data,64,"thread-%d",i+1);
pthread_create(t+i,nullptr,threadRoutine,data);
}
for(int i = 0;i<n;i++)
{
pthread_join(t[i],nullptr);
}
return 0;
}
运行结果:
为什么可能无法获得正确结果?
- if 语句判断条件为真以后,代码可以并发的切换到其他线程
- usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
- ticket 操作本身就不是一个原子操作
取出ticket–部分的汇编代码
objdump -d a.out > test.objdump
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax # 600b34
153 400651: 83 e8 01 sub $0x1,%eax
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) # 600b34
操作并不是原子操作,而是对应三条汇编指令:
- load :将共享变量ticket从内存加载到寄存器中
- update : 更新寄存器里面的值,执行-1操作
- store :将新值,从寄存器写回共享变量ticket的内存地址
要解决以上问题,需要做到三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。
所以我们需要加锁保证共享资源的安全
细节
- 凡是访问同一个临界资源的线程,都要进行加锁保护,并且必须加同一把锁
- 每一个线程1访问临界区之前,得加锁,加锁本质是给临界区加锁,加锁的粒度尽量要细一些
- 线程访问临界区的时候,需要先加锁->所有线程必须先看到同一把锁->锁本身就是公共资源->锁如何保证自己的安全?->加锁和解锁本身具有原子性!!!
- 临界区可以是一行代码,也可以是一堆代码。
在一个线程访问临界区的时候,线程被切换,也不会有影响。因为在我不在的期间,任何人都没有办法进入临界区,因为他无法成功地申请到锁。
这也是体现互斥带来的串行化的表现,站在其他线程角度,对其他线程有意义的状态就是:锁被我申请(持有锁),锁被我释放(不持有锁),原子性就体现在这儿。
互斥量的接口
初始化互斥量
- 静态分配
pthread_mutex_t mutex ==PTHREAD_MUTEX_INITIALIZER - 动态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict
attr);
参数:
mutex:要初始化的互斥量
attr:NULL
销毁互斥量
销毁互斥量需要注意:
- 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
调用 pthread_ lock 时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
改进上面的程序:
#include<iostream>
#include<string>
#include<unistd.h>
#include<pthread.h>
using namespace std;
int tickets = 1000;//全局变量,被多个线程共享
// pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//不需要用pthread_mutex_destroy来销毁
class TData
{
public:
TData(const string &name,pthread_mutex_t *mutex):_name(name),_pmutex(mutex){}
public:
string _name;
pthread_mutex_t *_pmutex;
};
void* threadRoutine(void *args)
{
TData *td= static_cast< TData*>(args);
while(true)
{
pthread_mutex_lock(td->_pmutex);
if(tickets>0)
{
usleep(2000);
cout<<td->_name<<"get a ticket: "<<tickets--<<endl;
pthread_mutex_unlock(td->_pmutex);
}
else
{
pthread_mutex_unlock(td->_pmutex);
break;
}
//抢完票之后的后续操作
usleep(100);
}
return nullptr;
}
int main()
{
pthread_mutex_t mutex;
pthread_mutex_init(&mutex,nullptr);
pthread_t tids[4];
int n = sizeof(tids)/sizeof(tids[0]);
for(int i = 0;i<n;i++)
{
char name[64];
snprintf(name,64,"thread-%d",i+1);
TData *td = new TData(name,&mutex);
pthread_create(tids+i,nullptr,threadRoutine,td);
}
for(int i = 0;i<n;i++)
{
pthread_join(tids[i],nullptr);
}
pthread_mutex_destroy(&mutex);
return 0;
}
运行结果:
2.2.3 互斥量实现原理探究
- 经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题
- 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单
元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一
个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪
代码改一下
但是我们加锁解锁的方法是不是不是那么优雅,能否像c++构造析构函数那样,我们在创建一个对象之后就自动加锁解锁呢?
thread.hpp
#pragma once
#include<iostream>
#include<pthread.h>
#include<cstdlib>
#include<string>
class Thread
{
public:
typedef enum{
NEW = 0,
RUNNING,
EXITED
}ThreadStatus;
typedef void (*func_t)(void *);
public:
Thread(int num,func_t func,void *args):_tid(0),_status(NEW),_func(func),_args(args)
{
char name[128];
snprintf(name,sizeof(name),"thread-%d",num);
_name = name;
}
int status(){return _status;}
std::string threadname(){return _name;}
pthread_t threadid()
{
if(_status == RUNNING) return _tid;
else
{
return 0;
}
}
//类的成员函数,具有默认参数this,需要static
static void *runHelper(void *args)
{
Thread *ts = (Thread*)args;
// _func(_args);
(*ts)();
return nullptr;
}
void operator()()//仿函数
{
_func(_args);
}
void run()
{
int n = pthread_create(&_tid,nullptr,runHelper,this);
if(n!=0) exit(1);
_status = RUNNING;
}
void join()
{
int n = pthread_join(_tid,nullptr);
if(n!=0)
{
std::cerr<<"main thread join thread"<<_name<<"error"<<std::endl;
return;
}
_status = EXITED;
}
~Thread(){}
private:
pthread_t _tid;
std::string _name;
func_t _func;//线程未来要执行的回调
void *_args;
ThreadStatus _status;
};
lockGuard.hpp
#pragma once
#include<iostream>
#include<pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *pmutex):_pmutex(pmutex)
{}
void lock()
{
pthread_mutex_lock(_pmutex);
}
void unlock()
{
pthread_mutex_unlock(_pmutex);
}
~Mutex()
{}
public:
pthread_mutex_t *_pmutex;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
_mutex.lock();
}
~LockGuard()
{
_mutex.unlock();
}
private:
Mutex _mutex;
};
pthread.cc
#include<iostream>
#include<string>
#include<unistd.h>
// #include<pthread.h>
#include"thread.hpp"
#include"lockGuard.hpp"
using namespace std;
int tickets = 1000;//全局变量,被多个线程共享
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//不需要用pthread_mutex_destroy来销毁
void threadRoutine(void *args)
{
std::string message= static_cast< const char*>(args);
while(true)
{
{
LockGuard lockGuard(&mutex);
// pthread_mutex_lock(&mutex);
if(tickets>0)
{
usleep(2000);
cout<<message<<"get a ticket: "<<tickets--<<endl;
// pthread_mutex_unlock(&mutex);
}
else
{
// pthread_mutex_unlock(&mutex);
break;
}
}
//抢完票之后的后续操作
// usleep(100);
}
}
int main()
{
Thread t1(1,threadRoutine,(void*)" hello world1");
Thread t2(2,threadRoutine,(void*)" hello world2");
Thread t3(3,threadRoutine,(void*)" hello world3");
Thread t4(4,threadRoutine,(void*)" hello world4");
t1.run();
t2.run();
t3.run();
t4.run();
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}
运行结果:
2.2.4 可重入VS线程安全
** 概念**
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
** 常见的线程不安全情况** - 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
** 常见的线程安全的情况** - 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
常见不可重入的情况
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
常见可重入的情况
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全联系
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
3. 常见锁概念
3.1 死锁
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所占用不会释放的资源而处于的一种永久等待状态
死锁四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的条件,在未使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
问题:线程能不对对其他线程解锁?
看下面代码
#include<iostream>
#include<pthread.h>
#include<unistd.h>
using namespace std;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void* threadRountine(void* args)
{
cout<<"I am a new thread "<<endl;
pthread_mutex_lock(&mutex);
cout<<"I got a mutex1"<<endl;
pthread_mutex_lock(&mutex);//申请锁的问题,他会停下来
cout<<"I alive again"<<endl;
return nullptr;
}
int main()
{
pthread_t tid;
pthread_create(&tid,nullptr,threadRountine,nullptr);
sleep(3);
cout<<"main thread run begin"<<endl;
pthread_mutex_unlock(&mutex);
cout<<"main thread unlock..."<<endl;
sleep(3);
return 0;
}
运行结果:
可以看到是可以的
4. Linux线程同步
4.1 条件变量
- 当一个线程互斥的访问某个变量时,他可能发现在其他线程改变状态之前,他什么也做不了。
- 例如一个线程访问队列时,发现队列为空,他只能等待,直到其他线程将一个结点添加到队列中。这种情况就需要用到条件变量。
4.1.1 同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
4.1.2 条件变量函数 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
参数:
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int ptread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量
唤醒等待
int pthread_cond_broadcat(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
5. 生产者消费者模型
5.1 生产消费的“321”原则
- “3”–三种关系
生产者和生产者 – 互斥关系
消费者和消费者 – 互斥关系
生产者和消费者 – 同步关系、互斥关系 - “2”–两种角色
生产者、消费者 - “1”–一个交易场所
通常是缓冲区
5.2为何要使用生产者消费者模型
生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
5.3 生产者消费者模型的优点
- 解耦
- 直接并发
- 支持忙闲不均
5.4 基于BlockingQueue的生产者消费者模型
BlockingQueue
在多线程编程中阻塞队列(BlockingQueue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于:
1、当队列为空时,从队列获取元素的操作会被阻塞,直到队列中被放入了元素;
2、当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出
(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
-------------------blockQueue.hpp--------------
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
const int gcap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(const int cap = gcap):_cap(cap)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_consumerCond,nullptr);
pthread_cond_init(&_producerCond,nullptr);
}
bool isFull(){ return _q.size() == _cap;}
bool isEmpty(){ return _q.empty();}
void push(const T &in)
{
pthread_mutex_lock(&_mutex);
while(isFull())//我们只能在临界区内部,判断临界资源是否就绪,注定了我们在当前一定是持有锁的
{
//要让线程进行休眠等待,不能持有锁等待,也就是说pthread_cond_wait要有锁释放的能力
pthread_cond_wait(&_producerCond,&_mutex);
//当线程醒来的时候,注定了继续从临界区内部继续运行,因为我是在临界区被切走的
//注定了当线程被唤醒的时候,继续在pthread_cond_wait函数处向后运行,又要重新申请锁,申请成功才会彻底唤醒
}
//没有满,生产
_q.push(in);
pthread_cond_signal(&_consumerCond);
pthread_mutex_unlock(&_mutex);
}
void pop(T *out)
{
pthread_mutex_lock(&_mutex);
if(isEmpty())
{
pthread_cond_wait(&_consumerCond,&_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_producerCond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_producerCond);
}
private:
std::queue<T> _q;
int _cap;
pthread_mutex_t _mutex;//只用一把锁,因为生产和消费访问的是同一个queue,queue被当作一个整体使用
pthread_cond_t _consumerCond;//消费者对应的条件变量,空,wait
pthread_cond_t _producerCond;//生产者对应的条件变量,满,wait
};
------------------task.hpp---------------
#pragma once
#include<iostream>
#include<string>
class Task
{
public:
Task()
{
}
Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0)
{
}
void operator()()
{
switch(_op)
{
case '+':
_result = _x+_y;
break;
case '-':
_result = _x-_y;
break;
case '*':
_result = _x*_y;
break;
case '/':
{
if(_y == 0) _exitCode = -1;
else _result = _x/_y;
}
case '%':
{
if(_y == 0) _exitCode = -2;
_result = _x%_y;
}
default:
break;
}
}
std::string formatArg()
{
return std::to_string(_x)+_op+std::to_string(_y) + '=' ;
}
std::string formatRes()
{
return std::to_string(_result)+"("+std::to_string(_exitCode)+")" ;
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
------------------main.cc--------------------------
#include"blockQueue.hpp"
#include<pthread.h>
#include<unistd.h>
#include"task.hpp"
#include<ctime>
void *consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
sleep(1);
// int data = 0;
// //1、将数据从blockqueue中获取
// bq->pop(&data);
// //2、结合某种业务逻辑,处理数据--完成消费过程
// std::cout<<"consume data:"<<data<<std::endl;
Task t;
//拿到数据
bq->pop(&t);
//处理数据
t();
std::cout<<pthread_self()<<" | consume task:"<<t.formatArg()<<t.formatRes()<<std::endl;
}
}
void *producer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
std::string opers = "+-*/%";
while(true)
{
sleep(1);
//1、先通过某种渠道获取数据
// int data = rand()%10+1;
//2、将数据推送到blockqueue--完成生产过程
// bq->push(data);
// std::cout<<"product data:"<<data<<std::endl;
int x = rand()%20+1;
int y = rand()%10+1;
char op = opers[rand()%opers.size()];
Task t(x,y,op);
bq->push(t);
std::cout<<pthread_self()<<" | product task:"<<t.formatArg()<<"?"<<std::endl;
}
}
int main()
{
srand((uint64_t)time(nullptr)^getpid());
// BlockQueue<int> *bq = new BlockQueue<int>();
BlockQueue<Task> *bq = new BlockQueue<Task>();
//单生产和单消费
pthread_t c[2],p[3];
pthread_create(&c[0],nullptr,consumer,bq);
pthread_create(&c[1],nullptr,consumer,bq);
pthread_create(&p[0],nullptr,producer,bq);
pthread_create(&p[1],nullptr,producer,bq);
pthread_create(&p[2],nullptr,producer,bq);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
pthread_join(p[2],nullptr);
return 0;
}
----------------Makefile---------------------------
cp:main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f cp
运行结果:
6. 信号量
- 每一个线程,在访问对应的资源的时候,先申请信号量,申请成功,表示该线程允许使用该资源,申请不成功,目前无法使用该资源
- 信号量的工作机制:信号量机制类似于我们看电影买票,是一种资源的预定机制
- 信号量已经是资源的计数器了,申请信号量成功,本身就表明资源可用,申请信号量失败本身就表明资源不可用–本质就是把判断转化成为信号量的行为
6.1 POSIX信号量
posix信号量和systemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步
初始化信号量
#include<semaphore.h>
int sem_init(sem_t *sem,int pshared,unsigned int value);
参数:
pshared:0表示线程间共享,非0表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem)t *sem);
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1
int sem_post(sem_t *sem);
6.2 基于环形队列的生产消费模型
- 环形队列采用数组模拟,用模运算来模拟环状特性
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者满,所以可以通过加数其器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
通过分析我们知道当生产者消费者不指向同一片区域的时候可以同步进行cp操作,那么当在生产者消费者访问同一个区域时,谁先执行呢?
单生产单消费
--------------------------ringQueue.hpp----------------
#pragma once
#include<iostream>
#include<vector>
#include<semaphore.h>
static const int N = 5;
template <class T>
class ringQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
public:
ringQueue(int num = N)
:_cap(num),_ring(num)
{
sem_init(&_data_sem,0,0);
sem_init(&_space_sem,0,num);
_c_step = _p_step = 0;
}
void push(const T &in)
{
//生产
P(_space_sem);
//一定有对应的空间资源
_ring[_p_step++] = in;
_p_step%=_cap;
V(_data_sem);//V()
}
void pop(T *out)
{
//消费
P(_data_sem);
//一定有对应的数据资源
*out = _ring[_c_step++];
_c_step%=_cap;
V(_space_sem);//V()
}
~ringQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
}
private:
std::vector<T> _ring;
int _cap;//环形队列的容器大小
sem_t _data_sem;//只有消费者关心
sem_t _space_sem;//只有生产者关心
int _c_step;//消费位置
int _p_step;//生产位置
};
-------------------------task.hpp------------------
#pragma once
#include<iostream>
#include<string>
#include<unistd.h>
class Task
{
public:
Task()
{
}
Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0)
{
}
void operator()()
{
switch(_op)
{
case '+':
_result = _x+_y;
break;
case '-':
_result = _x-_y;
break;
case '*':
_result = _x*_y;
break;
case '/':
{
if(_y == 0) _exitCode = -1;
else _result = _x/_y;
}
case '%':
{
if(_y == 0) _exitCode = -2;
_result = _x%_y;
}
default:
break;
}
usleep(100000);
}
std::string formatArg()
{
return std::to_string(_x)+_op+std::to_string(_y) + '=' ;
}
std::string formatRes()
{
return std::to_string(_result)+"("+std::to_string(_exitCode)+")" ;
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
------------------Makefile------------------
ringQueue:main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:
clean:
rm -f ringQueue
----------------------------main.cc-----------------
#include"ringQueue.hpp"
#include<pthread.h>
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include"task.hpp"
#include<memory>
#include<string.h>
using namespace std;
const char* ops = "=-*/%";
void* consumerRoutine(void* args)
{
ringQueue<Task> *rq = static_cast<ringQueue<Task>*>(args);
while(true)
{
Task t;
rq->pop(&t);
//处理数据
t();
cout<<"consume done:"<<t.formatArg()<<t.formatRes()<<endl;
}
}
void* producerRoutine(void* args)
{
ringQueue<Task> *rq = static_cast<ringQueue<Task>*>(args);
while(true)
{
int x = rand()%100;
int y = rand()%100;
char op = ops[(x+y)%strlen(ops)];
Task t = Task(x,y,op);
rq->push(t);
cout<<"product done:"<<t.formatArg()<<"=?"<<endl;
}
}
int main()
{
srand(time(nullptr)^getpid());
ringQueue<int> *rq = new ringQueue<int>();
//单生产单消费
pthread_t c,p;
pthread_create(&c,nullptr,consumerRoutine,rq);
pthread_create(&p,nullptr,producerRoutine,rq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete rq;
return 0;
}
多消费多生产者
----------------ringQueue.hpp--------------
#pragma once
#include<iostream>
#include<vector>
#include<pthread.h>
#include<semaphore.h>
static const int N = 5;
template <class T>
class ringQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t &m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t &m)
{
pthread_mutex_unlock(&m);
}
public:
ringQueue(int num = N)
:_cap(num),_ring(num)
{
sem_init(&_data_sem,0,0);
sem_init(&_space_sem,0,num);
_c_step = _p_step = 0;
pthread_mutex_init(&_c_mutex,nullptr);
pthread_mutex_init(&_p_mutex,nullptr);
}
void push(const T &in)
{
//生产
//一定有对应的空间资源
P(_space_sem);
Lock(_p_mutex);
_ring[_p_step++] = in;
_p_step%=_cap;
V(_data_sem);//V()
Unlock(_p_mutex);
}
void pop(T *out)
{
//消费
//一定有对应的数据资源
P(_data_sem);
Lock(_c_mutex);
*out = _ring[_c_step++];
_c_step%=_cap;
V(_space_sem);//V()
Unlock(_c_mutex);
}
~ringQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _ring;
int _cap;//环形队列的容器大小
sem_t _data_sem;//只有消费者关心
sem_t _space_sem;//只有生产者关心
int _c_step;//消费位置
int _p_step;//生产位置
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
----------------task.hpp--------------------
#pragma once
#include<iostream>
#include<string>
#include<unistd.h>
class Task
{
public:
Task()
{
}
Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0)
{
}
void operator()()
{
switch(_op)
{
case '+':
_result = _x+_y;
break;
case '-':
_result = _x-_y;
break;
case '*':
_result = _x*_y;
break;
case '/':
{
if(_y == 0) _exitCode = -1;
else _result = _x/_y;
}
case '%':
{
if(_y == 0) _exitCode = -2;
_result = _x%_y;
}
default:
break;
}
usleep(100000);
}
std::string formatArg()
{
return std::to_string(_x)+_op+std::to_string(_y) + '=' ;
}
std::string formatRes()
{
return std::to_string(_result)+"("+std::to_string(_exitCode)+")" ;
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
-----------------------main.cc---------------------------
#include"ringQueue.hpp"
#include<pthread.h>
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include"task.hpp"
#include<memory>
#include<string.h>
using namespace std;
const char* ops = "=-*/%";
void* consumerRoutine(void* args)
{
ringQueue<Task> *rq = static_cast<ringQueue<Task>*>(args);
while(true)
{
Task t;
rq->pop(&t);
//处理数据
t();
cout<<"consume done:"<<t.formatArg()<<t.formatRes()<<endl;
}
}
void* producerRoutine(void* args)
{
ringQueue<Task> *rq = static_cast<ringQueue<Task>*>(args);
while(true)
{
int x = rand()%100;
int y = rand()%100;
char op = ops[(x+y)%strlen(ops)];
Task t = Task(x,y,op);
rq->push(t);
cout<<"product done:"<<t.formatArg()<<"=?"<<endl;
}
}
int main()
{
srand(time(nullptr)^getpid());
ringQueue<int> *rq = new ringQueue<int>();
pthread_t c[3],p[2];
pthread_create(&c[0],nullptr,consumerRoutine,rq);
pthread_create(&c[1],nullptr,consumerRoutine,rq);
pthread_create(&c[2],nullptr,consumerRoutine,rq);
pthread_create(&p[0],nullptr,producerRoutine,rq);
pthread_create(&p[1],nullptr,producerRoutine,rq);
pthread_join(c[0],nullptr);
pthread_join(c[1],nullptr);
pthread_join(c[2],nullptr);
pthread_join(p[0],nullptr);
pthread_join(p[1],nullptr);
delete rq;
return 0;
}
在完成了blockQueue和ringQueue之后,对于信号量以及互斥锁我们有了一定的了解
互斥量和信号量的区别
- 互斥量用于线程的互斥,信号量用于线程的同步。
这是互斥量和信号量的根本区别,也就是互斥和同步之间的区别。
互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
同步:是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源
- 互斥量值只能为0/1,信号量值可以为非负整数。
也就是说,一个互斥量只能用于一个资源的互斥访问,它不能实现多个资源的多线程互斥问题。信号量可以实现多个同类资源的多线程互斥和同步。当信号量为单值信号量是,也可以完成一个资源的互斥访问。
- 互斥量的加锁和解锁必须由同一线程分别对应使用,信号量可以由一个线程释放,另一个线程得到。
7. 线程池
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。web服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个人无效,而任务数量巨大,你可以想象一个热门网站的点击次数。但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了,因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用,突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存达到极限,出现错误。
版本一
-----------------------threadPool_v1.hpp------------------------------------
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include "task.hpp"
const static int N = 5;
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = N) : _num(num), _threads(num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
void lockQueue()
{
pthread_mutex_lock(&_lock);
}
void unlockQueue()
{
pthread_mutex_unlock(&_lock);
}
void threadWait()
{
pthread_cond_wait(&_cond, &_lock);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _tasks.empty();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
while (true)
{
// 1. 检测有没有任务
// 2. 有:处理
// 3. 无:等待
// 细节:必定加锁
tp->lockQueue();
while (tp->isEmpty())
{
tp->threadWait();
}
Task t = tp->popTask(); // 从公共区域拿到私有区域
tp->unlockQueue();
// for test
t();
std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
// t.run(); // 处理任务,应不应该在临界区中处理?1,0
}
}
void init()
{
// TODO
}
void start()
{
for (int i = 0; i < _num; i++)
{
pthread_create(&_threads[i], nullptr, threadRoutine, this); // ?
}
}
void pushTask(const T &t)
{
lockQueue();
_tasks.push(t);
threadWakeup();
unlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
private:
std::vector<pthread_t> _threads;
int _num;
std::queue<T> _tasks; // 使用stl的自动扩容的特性
pthread_mutex_t _lock;
pthread_cond_t _cond;
};
--------------------task.hpp-------------------------------------------
#pragma once
#include<iostream>
#include<string>
#include<unistd.h>
class Task
{
public:
Task()
{}
Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0)
{
}
void operator()()
{
switch(_op)
{
case '+':
_result = _x+_y;
break;
case '-':
_result = _x-_y;
break;
case '*':
_result = _x*_y;
break;
case '/':
{
if(_y == 0) _exitCode = -1;
else _result = _x/_y;
}
case '%':
{
if(_y == 0) _exitCode = -2;
_result = _x%_y;
}
default:
break;
}
usleep(100000);
}
std::string formatArg()
{
return std::to_string(_x)+_op+std::to_string(_y) + '=' ;
}
std::string formatRes()
{
return std::to_string(_result)+"("+std::to_string(_exitCode)+")" ;
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
----------------main.cc--------------------
#include"threadPool_v1.hpp"
#include"task.hpp"
#include<memory>
int main()
{
std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
tp->init();
tp->start();
while(true)
{
int x,y;
char op;
std::cout<<"please Enter x>";
std::cin>>x;
std::cout<<"please Enter y>";
std::cin>>y;
std::cout<<"please Enter op>";
std::cin>>op;
Task t(x,y,op);
tp->pushTask(t);
sleep(1);
}
return 0;
}
运行结果:
版本二
将自己编写的task类加入多线程运行
-----------------------------threadPool_v2.hpp---------------------------------------
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include "thread.hpp"
#include "task.hpp"
const static int N = 10;
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = N) : _num(num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
void lockQueue()
{
pthread_mutex_lock(&_lock);
}
void unlockQueue()
{
pthread_mutex_unlock(&_lock);
}
void threadWait()
{
pthread_cond_wait(&_cond, &_lock);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _tasks.empty();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
static void threadRoutine(void *args)
{
// pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
while (true)
{
// 1. 检测有没有任务
// 2. 有:处理
// 3. 无:等待
// 细节:必定加锁
tp->lockQueue();
while (tp->isEmpty())
{
tp->threadWait();
}
T t = tp->popTask(); // 从公共区域拿到私有区域
tp->unlockQueue();
// for test
t();
std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
// t.run(); // 处理任务,应不应该在临界区中处理?1,0
}
}
void init()
{
for (int i = 0; i < _num; i++)
{
_threads.push_back(Thread(i, threadRoutine, this));
}
}
void start()
{
for (auto &t : _threads)
{
t.run();
}
}
void check()
{
for (auto &t : _threads)
{
std::cout << t.threadname() << " running..." << std::endl;
}
}
void pushTask(const T &t)
{
lockQueue();
_tasks.push(t);
threadWakeup();
unlockQueue();
}
~ThreadPool()
{
for (auto &t : _threads)
{
t.join();
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
private:
std::vector<Thread> _threads;
int _num;
std::queue<T> _tasks; // 使用stl的自动扩容的特性
pthread_mutex_t _lock;
pthread_cond_t _cond;
};
版本三
------------------------------threadPool_v3.hpp-----------------------------------------------
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include "thread.hpp"
#include "task.hpp"
#include"lockGuard.hpp"
const static int N = 10;
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = N) : _num(num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
pthread_mutex_t* getlock()
{
return &_lock;
}
void threadWait()
{
pthread_cond_wait(&_cond, &_lock);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _tasks.empty();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
static void threadRoutine(void *args)
{
// pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
while (true)
{
// 1. 检测有没有任务
// 2. 有:处理
// 3. 无:等待
// 细节:必定加锁
T t;
LockGuard lockguard(tp->getlock());
while (tp->isEmpty())
{
tp->threadWait();
}
t = tp->popTask(); // 从公共区域拿到私有区域
// for test
t();
std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
// t.run(); // 处理任务,应不应该在临界区中处理?1,0
}
}
void init()
{
for (int i = 0; i < _num; i++)
{
_threads.push_back(Thread(i, threadRoutine, this));
}
}
void start()
{
for (auto &t : _threads)
{
t.run();
}
}
void check()
{
for (auto &t : _threads)
{
std::cout << t.threadname() << " running..." << std::endl;
}
}
void pushTask(const T &t)
{
LockGuard lockguard(&_lock);
_tasks.push(t);
threadWakeup();
}
~ThreadPool()
{
for (auto &t : _threads)
{
t.join();
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
private:
std::vector<Thread> _threads;
int _num;
std::queue<T> _tasks; // 使用stl的自动扩容的特性
pthread_mutex_t _lock;
pthread_cond_t _cond;
};
8. 线程安全的单例模式
单例模式是一种“经典的,常用的,常考的设计模式
什么是设计模式
IT行业这么火,涌入的人很多,俗话说林子大了啥鸟都有。为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式
单例模式的特点
某些类,只应该具有一个对象(实例),就称为单例
-------------------------------threadPool_v4.hpp--------------------------------
#pragma once
#include<iostream>
#include<string>
#include<unistd.h>
class Task
{
public:
Task()
{}
Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0)
{
}
void operator()()
{
switch(_op)
{
case '+':
_result = _x+_y;
break;
case '-':
_result = _x-_y;
break;
case '*':
_result = _x*_y;
break;
case '/':
{
if(_y == 0) _exitCode = -1;
else _result = _x/_y;
}
case '%':
{
if(_y == 0) _exitCode = -2;
_result = _x%_y;
}
default:
break;
}
usleep(100000);
}
std::string formatArg()
{
return std::to_string(_x)+_op+std::to_string(_y) + '=' ;
}
std::string formatRes()
{
return std::to_string(_result)+"("+std::to_string(_exitCode)+")" ;
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
9.STL、智能指针和线程安全
STL中的容器是否是线程安全的?
不是
原因是,STL设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响,而且对于不同的容器,加锁方式的不同,性能可能也不同(例如hash表的锁表和锁桶)
因此STL默认不是线程安全,如果需要在多线程环境下使用,往往需要调用者自行保证线程安全
智能指针是否是线程安全的?
对于unique_ptr,由于只是在当前代码块范围内生效,因此不涉及线程安全问题,对于shared_ptr,多个对象需要公用一个引用计数变量,所以会存在线程安全问题,但是标准库实现的时候考虑到了这个问题,基于院子操作(CAS)的方式保证shared_ptr能够高效,原子的操作引用计数。
10.其他常见的各种锁
- 悲观锁:在每次读取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁、写锁、行锁等),当其他线程想要访问数据时,被阻塞挂起
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁,但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要才去两种方式:版本号机制和CAS操作
- CAS操作:当需要更新数据时,判断当前内存和之前取得的值是否相等。如果相等则用新值更新;若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试
- 自旋锁、公平锁、非公平锁