开玩笑的
本篇详细讲述了多线程的各种细节及操作方法
对锁的各种操作,以及原子性的阐述
原谅我嚣张的标题
Begin:本篇文章尽可能详细的讲述了线程的概念、使用、安全问题,以及消费者生产者模型的设计理念和实现代码。对于单例模式的两种实现代码以及STL设计的线程安全方面问题都有详细讲述。
目录
Linux线程概念
什么是线程:
线程的优点
线程的缺点
线程异常:
线程用途:
Linux进程 VS 线程
进程和线程
线程控制:
POSIX线程库:
创建线程:
线程ID及进程地址空间布局:
线程终止:
线程等待:
线程分离:
Linux线程控制:
进程间的互斥相关背景:
互斥量mutex:
互斥量的接口:
互斥量的初始化:
销毁互斥量:
互斥量的加锁解锁:
原子性:
可重入 VS 线程安全
概念:
常见的线程不安全情况:
常见的线程安全的情况:
常见的不可重入的情况:
常见的可重入的情况:
可重入与线程安全:
可重入与线程安全区别:
常见锁概念
死锁:
死锁的四个必要条件:
避免死锁:
Linux线程同步
条件变量:
同步概念与竞态条件:
条件变量的初始化:
条件变量的销毁:
等待条件变量满足:
唤醒等待:
条件变量的理解:
为什么pthread_cond_wait()需要配合互斥量使用
生产者消费者模型
为什么要使用生产者消费者模型:
基于BlockingQueue的生产者消费者模型
BlockingQueue :
POSIX信号量:
基于环形队列的生产消费模型:
线程池
线程池应用场景:
线程安全的单例模式
什么是单例模式:
什么是设计模式:
单例模式的特点:
饿汉和懒汉的实现方式:
饿汉方式实现单例模式:
懒汉方式实现单例模式:
STL,智能指针和线程安全
STL:
智能指针:
其他的各种锁...
总结:
Linux线程概念
什么是线程:
- 线程是一个“进程内部的控制序列”
- 一切进程至少有一个执行线程
- 线程在进程内部运行,本质实在进程地址空间内运行
- 在Linux系统中,cpu看到的pcb都要比传统进程更轻量化、
- 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流
线程的优点
- 创建一个新线程的代价要比创建一个新进程的小的多
- 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少的多
- 线程占用的资源要比进程小很多
- 充分利用多处理器的可并行数量
- 在等待慢速I/O的操作结束的同时,可以执行其他的计算任务
- 计算密集型应用,为了能在多处理系统上运行,将计算分解到多线程中执行
- I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作
创建新线程的代价比进程小的原因:
1.资源共享:线程是进程内的执行流,他们共享进程的地址空间和大部分资源,包括文件描述符、信号处理器、打开的文件等。因此创建线程时不需要复制整个进程的资源,只需要为线程分配栈空间和一些必要的数据结构就好。 而创建新进程需要复制整个进程的资源,包括内存映像、打开的文件等,开销更大
2.上下文切换:线程的上下文切换比进程的上下文切换快。上下文切换指的是从一个执行流切换到另一个执行流的过程,需要保存和恢复执行环境。由于线程共享进程的地址空间,上下文切换时只需要切换线程的栈指针和程序计数器等少量信息,比起进程切换时需要切换整个地址空间的内容代价更小
3.调度开销:线程的调度开销更小。线程是进程内的执行单元,对于调度器来说,切换进程的开销要比切换进程的开销来的小的很多,线程的调度更轻量级,可以通过用户态的线程库来管理,避免了内核态与用户态之间的切换
4:复用进程环境:创建线程时,可以重用已有的进程环境。已有进程中的资源和状态可以在新线程中共享和利用,避免了重复创建和初始化的开销
线程的缺点
1.共享资源问题:线程之间共享进程的资源,如内存、文件描述符等。这页意味着多个线程可以同时访问和修改相同的资源,但是也容易引发数据竞争和冲突。
2.安全性问题:线程并发执行带来了一些安全风险,因为多个线程访问相同的内存区域,如果没有适当的同步措施,可能会导致数据不一致、死锁和活锁等问题。
3.调试困难:多线程的调试更难。由于多个线程同时运行,调试器可能无法提供准确的调试信息,因为线程的执行顺序和交错是不确定的(看调度器心情),线程之间的相互影响、竞争条件和死锁等问题都可能增加程序的调式难度。
4.内存开销:每个线程都需要一定的栈空间,用于保存局部变量和函数调用。这就意味着如果线程的数量增加,内存开销也会对应增加,当线程数量过大或栈空间过大,就将导致内存资源紧张,反倒影响了性能
线程异常:
- 单个线程如果出现异常,类似除零错误,野指针问题,导致线程崩溃,进程也会跟着崩溃
- 线程是进程的执行分支,线程出现异常,就类似进程出现异常,进而触发信号机制终止该进程即进程下的所有线程。
线程用途:
- 合理使用多线程,能提高CPU密集型程序的执行效率
- 合理使用多线程,能提高IO密集型程序的用户体验
CPU密集型程序:主要就是利用了计算机的处理能力。
- 数值计算和科学模拟:大数值计算、天气预报、分子动力学等
- 加密和解密算法:AES(高级加密标准)、DES(数据加密标准)、RSA等
- 图像各视频处理:图像和视频处理任务,如图像滤波,图像识别
- 数据分析和机器学习:对于大规模数据进行处理、分析和机器学习的任务如大规模数据挖掘、深度学习训练等
IO密集型:程序运行时候主要依赖输入输出。
- 文件系统操作:涉及大量文件的读写,如文件备份,文件同步
- 网络服务器:包括Web服务器、数据库服务器等
- 数据库操作:涉及频繁的数据库操作,如检索、插入、更新、删除等
- 图片音视频处理:涉及大文件的读取和处理,如图像编辑软件,音视频转码工具等
Linux进程 VS 线程
ps:对上文做了归纳
进程和线程
- 进程是资源分配的基本单位
- 线程是调度基本单位
- 线程共享进程数据,但也拥有自己的一部分数据
- 线程ID
- 一组寄存器
- 栈
- errno
- 信号屏蔽字
- 调度优先级
- 共享的进程资源
- 文件描述符表
- 每种信号的处理方式(SIG_IGN...或自定义的信号处理函数)
- 当前工作目录
- 用户id和组id
进程和线程的关系图:
线程控制:
POSIX线程库:
- 与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是以“pthread_”打头的
- 要使用这些函数库,要通过引入头文<pthread.h>
- 链接这些线程函数库时要使用编译器命令的“-lpthread”选项
创建线程:
函数:int pthread_create(phtread_t *thread,const pthread_attr_t *attr,void *(*start_routine)(void*),void *arg);
参数:
thread:返回线程的id
attr:设置线程的属性,attr为NULL表示使用默认属性
start_routine:是个函数地址,线程启动后要执行的函数
arg:传给线程启动函数的参数
返回值:成功返回0,失败返回错误码
错误检查:
- 传统函数成功则返回0,失败返回-1,并且对全局变量errno赋值来指示错误
- pthreads函数出错不会设置全局变量errno(而大部分其他POSIX函数会这么做)。而是将错误代码通过返回值返回
- pthreads同样也提供了线程内的errno变量,以支持其他使用errno的代码。对于pthreads函数的错误则建议通过返回值判定,因为读取返回值要比读取线程内的errno变量的开销更小
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
void *rout(void *arg) {
int i;
for( ; ; ) {
printf("I'am thread 1\n");
sleep(1);
}
}
int main( void )
{
pthread_t tid;
int ret;
if ( (ret=pthread_create(&tid, NULL, rout, NULL)) != 0 ) {
fprintf(stderr, "pthread_create : %s\n", strerror(ret));
exit(EXIT_FAILURE);
}
int i;
for(; ; ) {
printf("I'am main thread\n");
sleep(1);
}
return 0;
}
//控制台显示效果为:
//I’am main thread
//I’am thread 1
//I’am main thread
//I’am thread 1
//I’am main thread
//I’am thread 1
//I’am main thread
//I’am thread 1
Q:为什么是交替打印的?
A:主线程和子线程同时运行,因此你可以看到它们交替打印消息。这是因为在多线程程序中,线程的执行是由系统调度的,不同线程的执行顺序是不确定的。
线程ID及进程地址空间布局:
- pthread_ create函数会产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面说的线程ID不是一回事
- 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程
- pthread_ create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,属于NPTL线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的。
- 线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID:
pthread_t pthread_self(void);
A:所以pthread_t到底是一个什么类型呢?
Q:其实对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址。
线程终止:
如果需要终止某个线程而不是终止整个进程,可以有三个方法:
- 从线程函数return,这种方法对主线程不适用,从main函数return相当于调用exit
- 线程可以调用pthread_exit终止自己
- 一个线程可以调用pthread_cancel终止同一进程中的另一线程
函数:void pthread_exit(void *value_ptr);
功能:线程终止
参数:
value_ptr:value_ptr不要指向一个局部变量
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者
注意:pthread_exit或者return返回的指针所指向的内存单元必须是全局的或者malloc分配的,不能再函数栈上分配,因为当其他线程得到这个返回指针的时候函数就退出了.
/
/
函数:int phtread_cacel(pthread_t thread);
功能:取消一个执行中的线程
参数:
thread:线程ID
返回值:成功返回0,失败返回错误码
线程等待:
Q:为什么需要线程等待?
A:已经退出的线程其空间没有被释放,仍然在进程的地址空间内,创建新的线程不会复用刚才退出线程的地址空间
函数:int pthread_join(pthread_t thread, void **value_ptr);
参数:
thread:线程ID
value_ptr:它指向一个指针,后者指向线程的返回值
返回值:成功返回0,失败返回错误码
调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:
- 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
- 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED。
- 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
- 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
void *thread1(void *arg)
{
printf("thread 1 returning ... \n");
int *p = (int *)malloc(sizeof(int));
*p = 1;
return (void *)p;
}
void *thread2(void *arg)
{
printf("thread 2 exiting ...\n");
int *p = (int *)malloc(sizeof(int));
*p = 2;
pthread_exit((void *)p);
}
void *thread3(void *arg)
{
while (1)
{ //
printf("thread 3 is running ...\n");
sleep(1);
}
return NULL;
}
int main(void)
{
pthread_t tid;
void *ret;
// thread 1 return
pthread_create(&tid, NULL, thread1, NULL);
pthread_join(tid, &ret);
printf("thread return, thread id %X, return code:%d\n", tid, *(int *)ret);
free(ret);
// thread 2 exit
pthread_create(&tid, NULL, thread2, NULL);
pthread_join(tid, &ret);
printf("thread return, thread id %X, return code:%d\n", tid, *(int *)ret);
free(ret);
// thread 3 cancel by other
pthread_create(&tid, NULL, thread3, NULL);
sleep(3);
pthread_cancel(tid);
pthread_join(tid, &ret);
if (ret == PTHREAD_CANCELED)
printf("thread return, thread id %X, return code:PTHREAD_CANCELED\n", tid);
else
printf("thread return, thread id %X, return code:NULL\n", tid);
return 0;
}
线程分离:
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
- 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
int pthread_detach(pthread_t thread);
可以是线程组内其他线程对目标线程进行分离,也可以是线程自己分离:
pthread_detach(pthread_self());
joinable和分离是冲突的,一个线程不能既是joinable又是分离的。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run(void *arg)
{
pthread_detach(pthread_self());
printf("%s\n", (char *)arg);
return NULL;
}
int main()
{
pthread_t tid;
if (pthread_create(&tid, NULL, thread_run, (void*)("thread1 run...")))
{
printf("create thread error\n");
return 1;
}
int ret = 0;
sleep(1); // 很重要,要让线程先分离,再等待
if (pthread_join(tid, NULL) == 0)
{
printf("pthread wait success\n");
ret = 0;
}
else
{
printf("pthread wait failed\n");
ret = 1;
}
return ret;
}
如果在这时候不sleep上1s就可能会让pthread_join先执行 ,从而分离失败。
Linux线程控制:
进程间的互斥相关背景:
临界资源:多线程执行流共享的资源就叫做临界资源
临界区:每个线程内部,访问临界资源的代码,就叫做临界区
互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
互斥量mutex:
大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
多个线程并发的操作共享变量,会带来一些问题。
// 操作共享变量会有问题的售票系统代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
void *route(void *arg)
{
char *id = (char *)arg;
while (1)
{
if (ticket > 0)
{
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
}
else
{
break;
}
}
}
int main(void)
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void*)"thread 1");
pthread_create(&t2, NULL, route, (void*)"thread 2");
pthread_create(&t3, NULL, route, (void*)"thread 3");
pthread_create(&t4, NULL, route, (void*)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
}
上面是模拟四个线程抢票行为 ,运行结果为:
这样100张ticket硬生生被抢成-2张,从结果上看出来是错误的,下列是产生错误的几个原因 :
- if 语句判断条件为真以后,代码可以并发的切换到其他线程
- usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
- --ticket 操作本身就不是一个原子操作
--操作不是原子操作
- load :将共享变量ticket从内存加载到寄存器中
- update : 更新寄存器里面的值,执行-1操作
- store :将新值,从寄存器写回共享变量ticket的内存地址
要解决上面问题,需要做到下面三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到上面三点,本质上就是需要一把锁。Linux提供的这把锁叫互斥量!
互斥量的接口:
互斥量的初始化:
1.互斥量的静态分配 :pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
2.动态分配:int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrictattr);
参数:
mutex:要初始化的互斥量
attr:NULL
销毁互斥量:
-
使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
-
不要销毁一个已经加锁的互斥量
-
已经销毁的互斥量,要确保后面不会有线程再尝试加锁
互斥量的加锁解锁:
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
在调用 pthread_lock可能会遇到下列情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
下面是改进上面的收票系统:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
int ticket = 100;
pthread_mutex_t mutex;
void *route(void *arg)
{
char *id = (char *)arg;
while (1)
{
pthread_mutex_lock(&mutex);
if (ticket > 0)
{
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
pthread_mutex_unlock(&mutex);
}
else
{
pthread_mutex_unlock(&mutex);
break;
}
}
}
int main(void)
{
pthread_t t1, t2, t3, t4;
pthread_mutex_init(&mutex, NULL);
pthread_create(&t1, NULL, route, (void*)"thread 1");
pthread_create(&t2, NULL, route, (void*)"thread 2");
pthread_create(&t3, NULL, route, (void*)"thread 3");
pthread_create(&t4, NULL, route, (void*)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
pthread_mutex_destroy(&mutex);
}
- 经过上面的例子,我们就可以知道++和-- 操作都不是原子的,可能会有数据一致性的问题
- 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令作用是把寄存器和内存单元的数据相交换,由于只有一条指令,就保证了原子性。
原子性:
1.当一个线程调用pthread_mutex_lock()函数时,如果当前锁可以获取(没被其他线程持有),线程就会直接获得锁,函数调用会立即返回。如果锁被其他进程所持有,当前线程就会被添加到等待队列中,并进入睡眠状态。
2.在另一个线程释放锁时,他会通知内核更新等待队列,并将等待队列中的一个或多个线程唤醒。被唤醒的线程会重新尝试获取锁,这个过程是由内核控制的,即加锁的核心就一行汇编代码,保证了锁的获取和释放是原子性的。
可重入 VS 线程安全
概念:
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数
常见的线程不安全情况:
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量的指针函数
- 调用线程不安全的函数
常见的线程安全的情况:
-
每个线程对全局变量过着静态变量只有读取的权限,没有写入的权限,一般来说都是安全的
-
类或者接口对线程来说都是原子操作
-
多个线程之间的切换不会导致该接口的执行结果存在二义性
常见的不可重入的情况:
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
常见的可重入的情况:
-
不使用全局变量或静态变量
-
不使用用malloc或者new开辟出的空间
-
不调用不可重入函数
-
不返回静态或全局数据,所有数据都有函数的调用者提供
-
使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全:
-
函数是可重入的,那就是线程安全的
-
函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
-
如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的
可重入与线程安全区别:
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的
常见锁概念
死锁:
- 死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
死锁的四个必要条件:
-
互斥条件:一个资源每次只能被一个执行流使用
-
请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
-
不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
-
循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
PS:就好像A手上拿着可乐,B手上拿着雪碧,A要B的雪碧,B要A的可乐,但是他们只有一只手,怎么样都不能交换双方的资源,谁都不愿意先松口。
避免死锁:
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
避免死锁算法:
- 死锁检测算法
死锁状态检测算法(Deadlock Detection Algorithm):死锁状态检测算法通过周期性地检测系统中的资源分配和请求情况,来判断是否存在死锁。该算法通过对资源分配情况和进程请求情况的监测,构建系统的资源分配状态,并进行资源的回收和重新分配,以查找死锁的存在。
- 银行家算法
银行家算法(Banker’s Algorithm):银行家算法是一种用于避免死锁的资源分配和请求算法。该算法通过模拟资源的分配和释放过程,来判断在分配资源的情况下,是否可能发生死锁。银行家算法需要对系统中的资源和进程进行一定的初始化,然后根据进程的资源请求情况判断是否满足安全序列条件,从而进行资源分配。
Linux线程同步
条件变量:
-
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了
-
例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
同步概念与竞态条件:
- 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
线程的饥饿问题:
- 资源饥饿:当一个线程无法获取到其所需的资源时,就会发生资源饥饿。这些资源可以是临界区资源、锁、内存等。如果某个线程长时间无法获取到所需的资源,它可能无法继续执行或被迫等待,进而影响整个程序的性能和响应时间(就像抢票只让线程A抢其他几个线程一点机会都没有)
CPU饥饿:当某个线程无法获得足够的CPU时间来执行任务时,就会发生CPU饥饿。这可能是因为系统负载过高、优先级调度问题、线程调度策略不合理等原因。如果某个线程一直得不到执行,那么其他线程可能会占据大部分的CPU时间,导致该线程无法得到执行的机会。
调度饥饿:当一个线程在高频率的被其他线程抢占时,无法得到执行的机会,就会发生调度饥饿。这可能是因为线程优先级设置不恰当、调度算法不公平等原因导致。线程的饥饿会影响程序的公平性和性能。
解决的方法:
公平的资源分配:确保资源的分配是公平的,避免某个线程一直占用资源。
合理的调度策略:选择合适的线程调度策略,防止某个线程一直被其他线程抢占。
优先级调度:根据线程的重要性和优先级,合理设置线程的优先级,避免低优先级线程长时间被高优先级线程抢占。
合理的资源管理:对临界区资源、锁等进行合理的管理和分配,避免资源的过度占用和不公平分配。
条件变量的初始化:
函数:int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrictattr);
参数:
cond:要初始化的条件变量
attr:NULL
条件变量的销毁:
函数:int pthread_cond_destroy(pthread_cond_t *cond)
等待条件变量满足:
函数:int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量
唤醒等待:
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
来个例子 :条件变量的创建并使用
条件变量的理解:
条件变量(Condition Variable)是一种线程同步机制,用于线程之间的通信和同步。它允许一个或多个线程等待某个条件的发生,而不需要忙等待或占用CPU资源 。条件变量的使用一般与互斥锁(Mutex)结合使用,以确保线程同步和数据的一致性。
使用六步曲:
1.定义条件变量和互斥锁:
pthread_cond_t cond;
pthread_mutex_t mutex;
2.初始化条件变量和互斥锁
pthread_cond_init(&cond, NULL);
pthread_mutex_init(&mutex, NULL);
3.在等待条件的线程中,使用pthread_cont_wait()函数来等待条件的满足:
pthread_mutex_lock(&mutex);
while (!condition) { // 检查条件是否满足,如果不满足则等待
pthread_cond_wait(&cond, &mutex);
}
// 条件满足,继续执行代码
pthread_mutex_unlock(&mutex);
4.在满足条件的线程中使用pthread_cond_signal或pthread_cond_broadcast()函数唤醒等待的线程
- pthread_cond_signal(&cond):唤醒至少一个等待的线程。
- pthread_cond_broadcast(&cond):唤醒所有等待的线程。
5.修改和更新条件时,需要加锁保护:
pthread_mutex_lock(&mutex);
// 修改和更新条件
pthread_cond_signal(&cond); // 发送信号通知等待的线程条件已经改变
pthread_mutex_unlock(&mutex);
6.在程序结束前,释放条件变量和互斥锁:
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
ps:通过使用条件变量可以在需要等待某个条件满足的线程中,避免不必要的盲等待,并提高系统的效率和性能。
注意:
-
线程在调用pthread_cont_wait()之前,必须获取互斥锁并在条件不满足时等待。pthread_cont_wait()将在解锁时使线程进入阻塞等待状态,并在被唤醒后再次获取互斥锁,并继续执行。
-
当条件发生变化时,需要使用互斥锁保护条件的修改,以避免竞态条件(Race Condition)。
-
使用pthread_cond_signal()通知等待线程时,只有一个线程会被唤醒,而使用pthread_cond_broadcast()会唤醒所有等待的线程。
-
条件变量在某种程度上依赖于互斥锁。因此,在使用条件变量时,需要先获取互斥锁,然后检查和等待条件,以及操作共享数据,最后释放互斥锁。
PS:综上所述,条件变量是一种用于线程通信和同步的机制,可以有效地等待和唤醒线程。通过与互斥锁结合使用,可以确保线程同步和数据的一致性 .
为什么pthread_cond_wait()需要配合互斥量使用
-
确保线程安全:pthread_cont_wait()函数调用时,会将线程放入等待队列,并在解锁互斥量时将线程阻塞。当其他线程发送信号(调用pthread_cond_signal()或pthread_cond_broadcast())时,等待的线程被唤醒并尝试重新获取互斥量。如果没有互斥量的保护,当线程被唤醒时,其他线程可能已经修改了条件变量和共享数据,导致竞态条件和不确定的结果。通过互斥量的加锁和解锁,在等待期间,可以确保只有一个线程能够访问共享资源,从而实现线程安全。
-
防止虚假唤醒(Spurious Wakeup):在某些情况下,即使没有调用pthread_cond_signal()或pthread_cond_broadcast()函数,等待的线程也可能被唤醒。这种情况被称为虚假唤醒。为了防止虚假唤醒,pthread_cont_wait()函数应始终在一个循环中使用,每次唤醒后都要检查条件是否满足。互斥量的使用可以确保线程在重新获取互斥锁之前不会检查和访问条件和共享数据,从而避免了虚假唤醒。
下面就是一个的生产者和消费者的模型来更好的理解上文所述...
生产者消费者模型
为什么要使用生产者消费者模型:
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
(就像超市给我们消费提供了一个购物的地方,而那些生产商品的厂家就是生产者,超市就是一个阻塞队列)
基于BlockingQueue的生产者消费者模型
BlockingQueue :
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
下面是C++queue模拟阻塞队列的生产者模型
#include <iostream>
#include <queue>
#include <stdlib.h>
#include <pthread.h>
#define NUM 8
class BlockQueue
{
private:
std::queue<int> q;
int cap;
pthread_mutex_t lock;
pthread_cond_t full;
pthread_cond_t empty;
private:
void LockQueue()
{
pthread_mutex_lock(&lock);
}
void UnLockQueue()
{
pthread_mutex_unlock(&lock);
}
void ProductWait()
{
pthread_cond_wait(&full, &lock);
}
void ConsumeWait()
{
pthread_cond_wait(&empty, &lock);
}
void NotifyProduct()
{
pthread_cond_signal(&full);
}
void NotifyConsume()
{
pthread_cond_signal(&empty);
}
bool IsEmpty()
{
return (q.size() == 0 ? true : false);
}
bool IsFull()
{
return (q.size() == cap ? true : false);
}
public:
BlockQueue(int _cap = NUM) : cap(_cap)
{
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&full, NULL);
pthread_cond_init(&empty, NULL);
}
void PushData(const int &data)
{
LockQueue();
while (IsFull())
{
NotifyConsume();
std::cout << "queue full, notify consume data, product stop." << std::endl;
ProductWait();
}
q.push(data);
// NotifyConsume();
UnLockQueue();
}
void PopData(int &data)
{
LockQueue();
while (IsEmpty())
{
NotifyProduct();
std::cout << "queue empty, notify product data, consume stop." << std::endl;
ConsumeWait();
}
data = q.front();
q.pop();
// NotifyProduct();
UnLockQueue();
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
}
};
void *consumer(void *arg)
{
BlockQueue *bqp = (BlockQueue *)arg;
int data;
for (;;)
{
bqp->PopData(data);
std::cout << "Consume data done : " << data << std::endl;
}
}
// more faster
void *producter(void *arg)
{
BlockQueue *bqp = (BlockQueue *)arg;
srand((unsigned long)time(NULL));
for (;;)
{
int data = rand() % 1024;
bqp->PushData(data);
std::cout << "Prodoct data done: " << data << std::endl;
// sleep(1);
}
}
int main()
{
BlockQueue bq;
pthread_t c, p;
pthread_create(&c, NULL, consumer, (void *)&bq);
pthread_create(&p, NULL, producter, (void *)&bq);
pthread_join(c, NULL);
pthread_join(p, NULL);
return 0;
}
POSIX信号量:
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
初始化信号量:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量:
int sem_destroy(sem_t *sem);
等待信号量:
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem);
发布信号量:
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//
上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量)
基于环形队列的生产消费模型:
- 环形队列采用数组模拟,用模运算来模拟环状特性
- 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
#include <unistd.h>
#include <iostream>
#include <vector>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#define NUM 16
class RingQueue
{
private:
std::vector<int> q;
int cap;
sem_t data_sem;
sem_t space_sem;
int consume_step;
int product_step;
public:
RingQueue(int _cap = NUM) : q(_cap), cap(_cap)
{
sem_init(&data_sem, 0, 0);
sem_init(&space_sem, 0, cap);
consume_step = 0;
product_step = 0;
}
void PutData(const int &data)
{
sem_wait(&space_sem);
q[consume_step] = data;
consume_step++;
consume_step %= cap;
sem_post(&data_sem);
}
void GetData(int &data)
{
sem_wait(&data_sem);
data = q[product_step];
product_step++;
product_step %= cap;
sem_post(&space_sem);
}
~RingQueue()
{
sem_destroy(&data_sem);
sem_destroy(&space_sem);
}
};
void *consumer(void *arg)
{
RingQueue *rqp = (RingQueue *)arg;
int data;
for (;;)
{
rqp->GetData(data);
std::cout << "Consume data done : " << data << std::endl;
sleep(1);
}
}
// more faster
void *producter(void *arg)
{
RingQueue *rqp = (RingQueue *)arg;
srand((unsigned long)time(NULL));
for (;;)
{
int data = rand() % 1024;
rqp->PutData(data);
std::cout << "Prodoct data done: " << data << std::endl;
}
}
int main()
{
RingQueue rq;
pthread_t c, p;
pthread_create(&c, NULL, consumer, (void *)&rq);
pthread_create(&p, NULL, producter, (void *)&rq);
pthread_join(c, NULL);
pthread_join(p, NULL);
}
线程池
对于线程池的理解:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误
#ifndef __M_TP_H__
#define __M_TP_H__
#include <iostream>
#include <queue>
#include <pthread.h>
#define MAX_THREAD 5
typedef bool (*handler_t)(int);
class ThreadTask
{
private:
int _data;
handler_t _handler;
public:
ThreadTask() : _data(-1), _handler(NULL) {}
ThreadTask(int data, handler_t handler)
{
_data = data;
_handler = handler;
}
void SetTask(int data, handler_t handler)
{
_data = data;
_handler = handler;
}
void Run()
{
_handler(_data);
}
};
class ThreadPool
{
private:
int _thread_max;
int _thread_cur;
bool _tp_quit;
std::queue<ThreadTask *> _task_queue;
pthread_mutex_t _lock;
pthread_cond_t _cond;
private:
void LockQueue()
{
pthread_mutex_lock(&_lock);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_lock);
}
void WakeUpOne()
{
pthread_cond_signal(&_cond);
}
void WakeUpAll()
{
pthread_cond_broadcast(&_cond);
}
void ThreadQuit()
{
_thread_cur--;
UnLockQueue();
pthread_exit(NULL);
}
void ThreadWait()
{
if (_tp_quit)
{
ThreadQuit();
}
pthread_cond_wait(&_cond, &_lock);
}
bool IsEmpty()
{
return _task_queue.empty();
}
static void *thr_start(void *arg)
{
ThreadPool *tp = (ThreadPool *)arg;
while (1)
{
tp->LockQueue();
while (tp->IsEmpty())
{
tp->ThreadWait();
}
ThreadTask *tt;
tp->PopTask(&tt);
tp->UnLockQueue();
tt->Run();
delete tt;
}
return NULL;
}
public:
ThreadPool(int max = MAX_THREAD) : _thread_max(max), _thread_cur(max),_tp_quit(false)
{
pthread_mutex_init(&_lock, NULL);
pthread_cond_init(&_cond, NULL);
}
~ThreadPool()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
bool PoolInit()
{
pthread_t tid;
for (int i = 0; i < _thread_max; i++)
{
int ret = pthread_create(&tid, NULL, thr_start, this);
if (ret != 0)
{
std::cout << "create pool thread error\n";
return false;
}
}
return true;
}
bool PushTask(ThreadTask *tt)
{
LockQueue();
if (_tp_quit)
{
UnLockQueue();
return false;
}
_task_queue.push(tt);
WakeUpOne();
UnLockQueue();
return true;
}
bool PopTask(ThreadTask **tt)
{
*tt = _task_queue.front();
_task_queue.pop();
return true;
}
bool PoolQuit()
{
LockQueue();
_tp_quit = true;
UnLockQueue();
while (_thread_cur > 0)
{
WakeUpAll();
usleep(1000);
}
return true;
}
};
#endif
/*main.cpp*/
bool handler(int data)
{
srand(time(NULL));
int n = rand() % 5;
printf("Thread: %p Run Tast: %d--sleep %d sec\n", pthread_self(), data, n);
sleep(n);
return true;
}
int main()
{
int i;
ThreadPool pool;
pool.PoolInit();
for (i = 0; i < 10; i++)
{
ThreadTask *tt = new ThreadTask(i, handler);
pool.PushTask(tt);
}
pool.PoolQuit();
return 0;
}
线程安全的单例模式
什么是单例模式:
单例模式是一种 "经典的, 常用的, 常考的" 设计模式.
什么是设计模式:
IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是 设计模式.
单例模式的特点:
某些类, 只应该具有一个对象(实例), 就称之为单例.
例如一个男人只能有一个媳妇.,一把钥匙对一个锁孔
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据。
饿汉和懒汉的实现方式:
饿汉方式实现单例模式:
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
- 优点 :
- 现简单,线程安全:在类加载时就创建了单例对象,不需要处理线程安全的问题。
- 线程访问速度快:因为单例对象在类加载时就创建,并且在程序运行期间一直存在,所以访问单例对象的速度很快。
- 缺点:
- 占用内存:即使在程序运行过程中没有使用该单例对象,也会提前占用内存。
- 不支持延迟加载:单例对象在类加载时就创建,无法实现按需延迟加载。
懒汉方式实现单例模式:
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};
- 优点:
- 延迟加载:单例对象在第一次使用时才会被创建,实现了按需延迟加载的效果,节省了内存。
- 支持多线程环境:可以通过加锁等方式,保证多线程环境下的线程安全。
- 缺点:
- 实现复杂,存在线程安全问题:使用懒汉模式需要考虑线程安全,特别是在多线程环境下可能会出现竞态条件,需要通过加锁等手段解决。
- 线程访问速度相对较慢:在多线程环境下,因为需要考虑线程安全,可能会引入额外的同步开销,使得访问单例对象的速度较慢。
懒汉方式的线程安全版本:
// 懒汉模式, 线程安全
template <typename T>
class Singleton
{
volatile static T *inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T *GetInstance()
{
if (inst == NULL)
{ // 双重判定空指针, 降低锁冲突的概率, 提高性能.
lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
if (inst == NULL)
{
inst = new T();
}
lock.unlock();
}
return inst;
}
};
注意事项:加锁解锁的位置,双重 if 判定, 避免不必要的锁竞争,volatile关键字防止过度优化
总结:
饿汉模式适用于希望在程序启动时就创建单例对象,并且对性能要求较高的场景。而懒汉模式适用于延迟加载和对性能要求相对较低的场景,但需要考虑线程安全问题。
STL,智能指针和线程安全
STL:
STL并不是线程安全的,STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响,而且对于不同的容器加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶)。
智能指针:
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数
其他的各种锁...
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS(原子)操作
- CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试
- 自旋锁,公平锁,非公平锁?
这里就不详细展开了。
总结:
本篇文章尽可能详细的讲述了线程的概念、使用、安全问题,以及消费者生产者模型的设计理念和实现代码。对于单例模式的两种实现代码以及STL设计的线程安全方面问题都有详细讲述。