文章目录
- 13. 线程
- 13.1 什么是线程
- 13.2 Linux下的线程
- 13.2.1 pthread_create
- 13.2.2 线程为什么高效?
- 13.2.3 线程的优缺点
- 13.2.4 线程异常
- 13.2.5 线程用途
- 13.4 虚拟地址空间
- 13.5 Linux线程控制
- 13.5.1 POSIX线程库
- 13.5.2 创建线程
- 13.5.3 线程ID及进程地址空间布局
- 13.5.4 线程终止
- 13.5.5 线程等待
- 13.5.7 __thread
- 13.5.8 线程的封装
- 13.6 Linux线程互斥
- 13.6.1 背景概念
- 13.6.2 互斥量mutex
- 13.6.3 互斥量的接口
- 13.6.4 互斥量实现原理探究
- 13.6.5 互斥锁的封装
- 13.8 常见的锁概念
- 13.9 Linux线程同步
- 13.9.2 同步概念与竞态条件
- 13.9.3 条件变量函数
- 13.9.4 条件变量使用规范
- 13.10 生产消费者模型
- 13.10.1 基于BlockingQueue的生产者消费者模型
- 13.10.2 POSIX信号量
- 13.10.3 基于环形队列的生产消费模型
- 13.11 线程池
- 13.12 日志系统
- 13.13 线程安全的单例模式
- 13.14 STL,智能指针和线程安全
- 13.15 其他常见的各种锁
- 13.16 读者写者问题
- 13.17 自旋锁
13. 线程
13.1 什么是线程
- 线程是程序内的执行路线:线程是程序内部的一条执行路径,它是程序的一部分,可以被操作系统调度执行。一个程序可以包含多个线程,每个线程都可以独立执行不同的任务。
- 线程是进程内部的控制序列:线程是在进程内部运行的,它们共享同一个进程的资源,如内存空间、文件描述符等。因此,线程之间可以方便地共享数据和通信。
- 一个进程至少包含一个执行线程:每个进程至少包含一个执行线程,即主线程。主线程通常是程序的入口点,负责启动程序,并在程序退出时进行清理工作。
- 线程在进程地址空间内运行:线程与进程共享同一个地址空间,即它们可以访问同一个进程的内存空间。这使得线程之间的数据共享和通信变得更加高效和方便。
- CPU眼中的PCB更加轻量化:PCB(进程控制块)是操作系统用来管理进程和线程的数据结构。在 Linux 系统中,线程的 PCB 比传统的进程的 PCB 更加轻量化,这意味着线程的创建和切换开销更小,可以更快地进行线程的调度和执行。
- 通过进程虚拟地址空间,可以看到进程的大部分资源:线程与进程共享进程的资源,包括虚拟地址空间、打开的文件、信号处理器等。因此,线程可以直接访问进程的资源,而不需要进行额外的复制或传递。
- 将进程资源合理分配给每个执行流,就形成了线程执行流:线程执行流是指线程在进程中执行的控制路径。操作系统将进程的资源合理分配给每个线程,使得每个线程都能够独立执行任务,并且能够有效地利用系统资源。
13.2 Linux下的线程
在Linux系统中,线程和进程之间的关系确实有些不同于其他操作系统。Linux内核中没有直接的线程概念,而是通过进程来模拟线程。这种模拟线程的方式是通过创建多个轻量级进程(Lightweight Process,LWP)来实现的。
在Linux下,每个进程都有一个独立的地址空间、堆栈和文件描述符等资源。传统上,每个进程只有一个执行流,也就是说每个进程只有一个线程。但是,为了实现多线程并发,Linux引入了轻量级进程的概念。轻量级进程是在单个进程内部创建的,它们共享相同的地址空间和其他资源,但拥有独立的执行流。
这意味着在Linux系统中,一个进程可以拥有多个执行流,每个执行流都被称为一个轻量级进程(LWP)。这些轻量级进程共享相同的进程上下文和资源,但可以独立地执行不同的任务,实现了多线程并发的效果。
因此,虽然Linux内核中没有直接的线程概念,但通过创建多个轻量级进程,可以实现多线程编程的效果。这种方式相比传统的线程实现有一些优势,例如更灵活地管理线程的资源和更好地利用多核处理器。
示例:
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
int gcnt = 100;
// 新线程
void *ThreadRoutine(void *arg)
{
const char *threadname = (const char *)arg;
while (true)
{
std::cout << "I am a new thread: " << threadname << ", pid: " << getpid() << "gcnt: " << gcnt << " &gcnt: " << &gcnt << std::endl;
gcnt--;
sleep(1);
}
}
int main()
{
// 已经有进程了
pthread_t tid;
pthread_create(&tid, nullptr, ThreadRoutine, (void *)"thread 1");
sleep(3);
pthread_t tid1;
pthread_create(&tid1, nullptr, ThreadRoutine, (void *)"thread 2");
sleep(3);
pthread_t tid2;
pthread_create(&tid2, nullptr, ThreadRoutine, (void *)"thread 3");
sleep(3);
pthread_t tid3;
pthread_create(&tid3, nullptr, ThreadRoutine, (void *)"thread 4");
sleep(3);
// 主线程
while (true)
{
std::cout << "I am main thread"
<< ", pid: " << getpid() << "gcnt: " << gcnt << " &gcnt: " << &gcnt << std::endl;
sleep(1);
}
return 0;
}
ps -aL
ps -aL
命令的作用是列出系统中所有进程的详细信息,包括每个进程的线程信息。这些信息可能包括进程的PID(进程ID)、PPID(父进程ID)、状态、CPU使用情况、内存占用等,以及每个进程所拥有的线程的信息,比如线程ID、优先级、状态等。
LWP就是轻量级进程。
在Linux系统中,CPU调度是以轻量级进程(LWP)为单位进行的。线程是进程内部的执行流,本质是线程在进程内部的进程空间内执行。
13.2.1 pthread_create
pthread_create()
是一个 POSIX 标准的函数,用于创建一个新的线程。它的原型如下:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine)(void*), void *arg);
让我们来解释每个参数的含义:
thread
: 一个指向pthread_t
类型的指针,用于存储新创建线程的标识符。这个标识符可以用来对线程进行操作,比如等待线程结束或者发送信号给线程等。attr
: 一个指向pthread_attr_t
类型的指针,用于设置新线程的属性,通常可以传入 NULL,表示使用默认属性。start_routine
: 这是一个函数指针,指向新线程所要执行的函数。该函数必须具有特定的签名void* start_routine(void*)
,即接受一个void*
类型的参数,并返回void*
类型的指针。新线程将从这个函数开始执行。arg
: 一个void*
类型的参数,用于传递给start_routine
函数的参数。可以是任意类型的指针。
pthread_create()
函数成功执行时会返回 0,否则返回一个非零的错误码,表示创建线程失败的原因。
# makefile
mythread:pthread.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f mythread
// pthread.cc
#include <iostream>
#include <string>
#include <functional>
#include <vector>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
// typedef std::function<void()> func_t;
using func_t = std::function<void()>;
const int threadnum = 5;
class ThreadData
{
public:
ThreadData(const std::string &name, const uint64_t &ctime, func_t f)
: threadname(name), createtime(ctime), func(f)
{
}
public:
std::string threadname;
uint64_t createtime;
func_t func;
};
void Print()
{
std::cout << "我是线程执行的大任务的一部分" << std::endl;
}
// 新线程
void *ThreadRountine(void *args)
{
int a = 10;
ThreadData *td = static_cast<ThreadData *>(args);
while (true)
{
std::cout << "new thread"
<< " thread name: " << td->threadname << " create time: " << td->createtime << std::endl;
td->func();
if(td->threadname == "thread-4")
{
//std::cout << td->threadname << " 触发了异常!!!!!" << std::endl;
// a /= 0; // 故意制作异常
}
sleep(1);
}
}
// 获取返回值
// 主线程
int main()
{
std::vector<pthread_t> pthreads;
for (size_t i = 0; i < threadnum; i++)
{
char threadname[64];
snprintf(threadname, sizeof(threadname), "%s-%lu", "thread", i);
pthread_t tid;
ThreadData *td = new ThreadData(threadname, (uint64_t)time(nullptr), Print);
pthread_create(&tid, nullptr, ThreadRountine, td);
pthreads.push_back(tid);
sleep(1);
}
std::cout << "thread id: ";
for(const auto &tid: pthreads)
{
std::cout << tid << ",";
}
std::cout << std::endl;
while (true)
{
std::cout << "main thread" << std::endl;
sleep(3);
}
}
注意:在 POSIX 系统上,比如大多数的 Linux 和 Unix 系统,使用线程相关的函数和操作时,需要链接 pthread
库。在编译时通过 -lpthread
选项告诉编译器链接 pthread
库,这样程序才能正确地找到线程相关的函数实现。
13.2.2 线程为什么高效?
- 共享资源:线程可以共享同一进程的资源,包括内存空间、文件句柄等。这意味着线程之间的通信成本比不同进程之间的通信要低,因为它们无需通过进程间通信(IPC)机制进行数据交换,而是可以直接访问共享的内存区域。这可以减少上下文切换的开销和内核态与用户态之间的切换。
- 轻量级:线程通常比进程更轻量级,创建、销毁和切换线程的开销要比进程小得多。这是因为线程在创建时无需分配新的地址空间,而是可以共享所属进程的地址空间。此外,线程的状态切换通常只涉及到寄存器的保存和恢复,而不需要更新内存中的页面表等操作,因此线程切换的开销更小。
13.2.3 线程的优缺点
优点:
- 创建一个新线程的代价要比创建一个新进程小得多
- 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
- 线程占用的资源要比进程少很多
- 能充分利用多处理器的可并行数量
- 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
- 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
- I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。
缺点:
- 性能损失
- 一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
- 健壮性降低
- 编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了 不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
- 缺乏访问控制
- 进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。
- 编程难度提高
- 编写与调试一个多线程程序比单线程程序困难得多
13.2.4 线程异常
- 单个线程如果出现除零,野指针问题导致线程崩溃,进程也会随着崩溃
- 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该进程内的所有线程也就随即退出
13.2.5 线程用途
- 合理的使用多线程,能提高CPU密集型程序的执行效率
- 合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现)
##13.3 Linux进程VS线程
进程是资源分配的基本单位
线程是调度的基本单位
线程共享进程数据,但也拥有自己的一部分数据:
- 线程ID
- 一组寄存器(每个线程都有着自己独立的硬件上下文)
- 栈
- errno
- 信号屏蔽字
- 调度优先级
进程的多个线程共享 同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:
- 文件描述符表
- 每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数)
- 当前工作目录
- 用户id和组id
进程和线程的区别:
- 定义:
- 进程(Process):进程是程序在执行过程中的一个实例。它包含了程序执行时所需的代码、数据以及程序执行的上下文信息。每个进程都拥有独立的地址空间,即使是同一程序的多个实例也是如此。
- 线程(Thread):线程是进程中的一个实体,是进程的执行单元。一个进程可以拥有多个线程,这些线程共享进程的地址空间和资源,但拥有各自的栈空间。
- 资源占用:
- 进程:进程是系统分配资源的基本单位,每个进程都有自己的内存空间、文件描述符、打开的文件等。进程之间的资源是独立的,需要通过进程间通信来共享数据。
- 线程:线程是在进程内部创建的,它们共享进程的内存空间和资源。线程之间可以直接访问共享内存,因此线程间通信更加简单高效。
- 硬件上下文和栈:
- 进程:每个进程都有自己的地址空间、寄存器集、打开的文件等。进程的切换涉及到保存和恢复整个进程的状态,包括寄存器、内存等,因此进程切换开销相对较大。
- 线程:每个线程都有自己的栈空间,用于存储局部变量和函数调用信息。线程共享进程的地址空间和其他资源,但每个线程都有自己的硬件上下文和栈,因此线程切换的开销相对较小。
13.4 虚拟地址空间
- 虚拟地址转换为物理地址:
- 在计算机系统中,程序运行时使用的是虚拟地址,而实际物理内存是由操作系统管理的。
- 当 CPU 访问内存时,虚拟地址首先通过内存管理单元(MMU)转换为物理地址,这个过程称为地址转换。
- 文件系统 I/O 的基本单位大小:
- 在文件系统中,I/O 操作的基本单位是页面大小(通常为 4KB),也就是说每次从文件系统读取或写入的数据大小是页面大小的整数倍。
- 虚拟地址结构:
- 假设系统的虚拟地址空间大小为 32 位,即虚拟地址由 32 个比特位组成。
- 虚拟地址的前 10 位用于页目录(Page Directory)的索引,中间的 10 位用于页表(Page Table)的索引,最低的 12 位用于页内偏移。
- 页表转换:
- 通过页目录和页表,虚拟地址被转换为对应的物理地址。首先,根据虚拟地址的前 10 位找到对应的页目录条目,然后根据中间的 10 位找到对应的页表条目,最后加上最低的 12 位页内偏移,得到物理地址。
- 划分页表的本质:
- 划分页表的本质是将虚拟地址空间划分为多个页面,每个页面都对应着物理内存中的一页框。
- 这种划分使得操作系统能够更加灵活地管理内存,将物理内存的管理抽象为页面的管理,提高了内存的利用率。
- 进程视角下的地址空间:
- 在进程的视角下,虚拟地址空间是进程的一个重要资源。每个进程都有自己的虚拟地址空间,包括代码段、数据段、堆、栈等。
- 进程的虚拟地址空间由操作系统管理,包括分配和回收内存、管理页面等。
- 进程的组成:
- 单纯的 PCB(Process Control Block)是轻量级进程,它记录了进程的基本信息,如进程 ID、状态等。
- 但一个完整的进程不仅包括 PCB,还包括地址空间页表和多个执行流(线程),这些共同组成了一个完整的执行环境。这样的设计使得进程能够独立地运行和管理资源,实现了进程间的隔离和并发执行。
在一个32位系统中,虚拟地址空间大小通常是32位,这意味着地址空间中可以表示的地址数量为 232232 个,即4GB。
对于给定的虚拟地址空间,如果采用二级页表结构来管理内存,其中每个页表条目的大小为4字节(32位),每个页表中有 2^10 个条目,那么页表的大小计算如下:
- 计算页表的级数: 由于采用了二级页表结构,因此有两级页表:一级页表和二级页表。
- 计算每级页表的条目数:
- 对于一级页表,由于需要覆盖整个虚拟地址空间,每个页表有 2^10 个条目。
- 对于二级页表,每个页表条目的大小为4字节,需要的条目数为 2^10 个。
- 计算页表的大小:
- 对于一级页表,每个页表条目占用4字节,因此一级页表的大小为 2^10×4 字节。
- 对于二级页表,每个页表条目占用4字节,因此二级页表的大小为 2^10× 2^10×4 字节。
将这两级页表的大小相加,即可得到总的页表大小。
具体计算如下:
一级页表大小: 2^10×4=4KB
二级页表大小: 2^10× 2^10×4=4MB
因此,总的页表大小为 4KB+4MB=4.004MB。
13.5 Linux线程控制
13.5.1 POSIX线程库
- 与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是以“pthread_”打头的
- 要使用这些函数库,要通过引入头文
- 链接这些线程函数库时要使用编译器命令的“-lpthread”选项
13.5.2 创建线程
前面提及的pthread_create
13.5.3 线程ID及进程地址空间布局
- pthread_ create函数会产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面说的线程ID不是一回事。
- 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程。
- pthread_ create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,属于NPTL线程库的范畴。
- 线程库的后续操作,就是根据该线程ID来操作线程的。 线程库NPTL提供了pthread_ self函数,可以获得线程自身的ID:
pthread_t pthread_self(void);
pthread_t 到底是什么类型呢?取决于实现。
它实际上是一个不透明的结构体类型,用于存储线程的标识符。在大多数实现中,pthread_t
被定义为一个整数类型或者结构体指针类型,但具体的定义会根据不同的操作系统和编译器而有所不同。
(对于Linux目前实现的NPTL实现而言,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址。)
thread_id 本质是一个地址
在程序中,你可以使用 pthread_t
类型的变量来存储线程的标识符,并通过这个标识符来操作线程,比如等待线程结束、发送信号给线程等。通常情况下,你不需要直接操作 pthread_t
类型的变量,而是使用线程相关的函数,比如 pthread_create()
、pthread_join()
等来创建和管理线程。
13.5.4 线程终止
如果需要只终止某个线程而不终止整个进程,可以有三种方法:
- 从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit。
- 线程可以调用pthread_ exit终止自己。
- 一个线程可以调用pthread_ cancel终止同一进程中的另一个线程。
pthread_exit:
-
pthread_exit
函数用于终止当前线程的执行,并将一个指定的退出状态传递给线程的创建者(通常是主线程)。 -
当调用
pthread_exit
时,当前线程会立即停止执行,并返回到创建它的线程中。 -
pthread_exit
的原型如下:
void pthread_exit(void *value_ptr);
-
value_ptr
参数是一个指向要传递给创建者的退出状态的指针。这个退出状态可以是任何类型的指针,通常用于传递线程的执行结果或其他信息。
pthread_cancel:
-
pthread_cancel
函数用于请求取消指定的线程。 -
当调用
pthread_cancel
时,它向指定的线程发送一个取消请求。被请求取消的线程在接收到请求后,会根据线程的取消状态和设置来执行相应的操作。 -
pthread_cancel
的原型如下:
int pthread_cancel(pthread_t thread);
-
thread
参数是要取消的线程的标识符。 -
如果取消请求成功发送,
pthread_cancel
函数会返回0;否则,返回一个非零值表示错误。
13.5.5 线程等待
- 已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
- 创建新的线程不会复用刚才退出线程的地址空间。
pthread_join
是 POSIX 线程库中的一个函数,用于等待指定线程的结束,并获取该线程的退出状态。
当一个线程通过 pthread_create
创建后,通常会在主线程中使用 pthread_join
函数来等待它的结束。这样做可以保证主线程在子线程结束后再继续执行,从而避免了竞态条件的发生。
pthread_join
的原型如下:
int pthread_join(pthread_t thread, void **retval);
thread
参数是要等待的线程的标识符。retval
参数是一个指向指针的指针,用于获取线程的退出状态。如果不关心线程的退出状态,可以将此参数设置为NULL
。
pthread_join
函数的行为如下:
- 当调用
pthread_join
时,主线程会一直阻塞,直到指定的线程结束为止。 - 如果指定的线程已经结束,
pthread_join
会立即返回,并将线程的退出状态存储在retval
指向的位置。 - 如果指定的线程还未结束,主线程会一直阻塞,直到线程结束为止。
- 如果成功等待到线程结束并获取到退出状态,
pthread_join
返回0;否则,返回一个非零值表示错误。
需要注意的是,pthread_join
只能等待已经创建的线程,而且每个线程只能被等待一次。如果尝试对同一个线程多次调用 pthread_join
,或者尝试等待一个未创建的线程,都会导致未定义的行为。
调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:
- 如果thread线程通过return返回,value_ ptr所指向的单元里存放的是thread线程函数的返回值。
- 如果thread线程被别的线程调用pthread_ cancel异常终掉,value_ ptr所指向的单元里存放的是常数PTHREAD_ CANCELED。
- 如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。
- 如果对thread线程的终止状态不感兴趣,可以传NULL给value_ ptr参数。
###13.5.6 分离线程
- 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
- 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
pthread_detach
是 POSIX 线程库中的一个函数,用于将指定的线程标记为"分离状态",从而使得线程在结束后能够自动释放资源,而无需主线程调用 pthread_join
来等待线程结束。
pthread_detach
函数的原型如下:
int pthread_detach(pthread_t thread);
thread
参数是要标记为分离状态的线程的标识符。
pthread_detach
函数的行为如下:
- 当调用
pthread_detach
函数时,将指定的线程标记为分离状态。 - 一个线程被标记为分离状态后,线程结束时会自动释放资源,而不需要其他线程调用
pthread_join
来等待它的结束。 - 如果线程已经是分离状态,或者线程已经结束,那么调用
pthread_detach
函数会返回失败。
需要注意的是,一旦线程被标记为分离状态,就无法再将其恢复为可连接状态。因此,必须在线程创建后、线程开始执行前调用 pthread_detach
函数。
pthread_detach
函数常用于创建后台线程,这样可以确保在主线程结束时,所有的后台线程也能够自动结束,而无需手动调用 pthread_join
函数等待它们的结束。
可以是线程组内其他线程对目标线程进行分离,也可以是线程自己分离:
pthread_detach(pthread_self());
joinable和分离是冲突的,一个线程不能既是joinable又是分离的。
void *thread2( void *arg )
{
printf("thread 2 exiting ...\n");
int *p = (int*)malloc(sizeof(int));
*p = 2;
pthread_exit((void*)p);
}
void *thread3( void *arg )
{
while ( 1 ){ //
printf("thread 3 is running ...\n");
sleep(1);
}
return NULL;
}
int main( void )
{
pthread_t tid;
void *ret;
// thread 1 return
pthread_create(&tid, NULL, thread1, NULL);
pthread_join(tid, &ret);
printf("thread return, thread id %X, return code:%d\n", tid, *(int*)ret);
free(ret);
// thread 2 exit
pthread_create(&tid, NULL, thread2, NULL);
pthread_join(tid, &ret);
printf("thread return, thread id %X, return code:%d\n", tid, *(int*)ret);
free(ret);
// thread 3 cancel by other
pthread_create(&tid, NULL, thread3, NULL);
sleep(3);
pthread_cancel(tid);
pthread_join(tid, &ret);
if ( ret == PTHREAD_CANCELED )
printf("thread return, thread id %X, return code:PTHREAD_CANCELED\n", tid);
else
printf("thread return, thread id %X, return code:NULL\n", tid);
}
运行结果:
[root@localhost linux]# ./a.out
thread 1 returning ...
thread return, thread id 5AA79700, return code:1
thread 2 exiting ...
thread return, thread id 5AA79700, return code:2
thread 3 is running ...
thread 3 is running ...
thread 3 is running ...
thread return, thread id 5AA79700, return code:PTHREAD_CANCELED
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
void *thread_run( void * arg )
{
pthread_detach(pthread_self());
printf("%s\n", (char*)arg);
return NULL;
}
int main( void )
{
pthread_t tid;
if ( pthread_create(&tid, NULL, thread_run, "thread1 run...") != 0 ) {
printf("create thread error\n");
return 1;
}
int ret = 0;
sleep(1);//很重要,要让线程先分离,再等待
if ( pthread_join(tid, NULL ) == 0 ) {
printf("pthread wait success\n");
ret = 0;
} else {
printf("pthread wait failed\n");
ret = 1;
}
return ret;
}
13.5.7 __thread
__thread
是 C 和 C++ 中用于定义线程局部存储(Thread Local Storage,TLS)的关键字。它的作用是声明一个变量为线程局部变量,使得每个线程都拥有自己独立的变量副本,互不影响。
使用 __thread
关键字声明的变量,每个线程都会有一个独立的副本,每个线程中的变量值互不干扰。这种特性使得 __thread
变量非常适合在多线程环境中使用,比如在线程函数中保存线程特有的状态信息。
__thread
变量的定义语法如下:
__thread int var;
或者在 C++ 中也可以使用:
thread_local int var;
需要注意的是,__thread
变量只能定义在全局或静态存储区域,不能在函数内部或动态分配的内存中定义。这是因为 __thread
变量的生命周期与线程的生命周期密切相关,它必须在编译时就确定其存储位置。
在多线程程序中,__thread
变量通常用于存储线程私有的状态信息,比如线程 ID、线程名称、线程计数器等。它们可以避免线程间的竞态条件,提高程序的并发性能。
需要注意的是,__thread
是 C11 和 C++11 引入的标准,但在一些老的编译器中可能不支持,或者可能需要特定的编译选项来启用。
13.5.8 线程的封装
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
// 设计方的视角
//typedef std::function<void()> func_t;
template<class T>
using func_t = std::function<void(T)>;
template<class T>
class Thread
{
public:
Thread(const std::string &threadname, func_t<T> func, T data)
:_tid(0), _threadname(threadname), _isrunning(false), _func(func), _data(data)
{}
static void *ThreadRoutine(void *args) // 类内方法,
{
// (void)args; // 仅仅是为了防止编译器有告警
Thread *ts = static_cast<Thread *>(args);
ts->_func(ts->_data);
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, ThreadRoutine, this/*?*/);
if(n == 0)
{
_isrunning = true;
return true;
}
else return false;
}
bool Join()
{
if(!_isrunning) return true;
int n = pthread_join(_tid, nullptr);
if(n == 0)
{
_isrunning = false;
return true;
}
return false;
}
std::string ThreadName()
{
return _threadname;
}
bool IsRunning()
{
return _isrunning;
}
~Thread()
{}
private:
pthread_t _tid;
std::string _threadname;
bool _isrunning;
func_t<T> _func;
T _data;
};
13.6 Linux线程互斥
13.6.1 背景概念
- 临界资源:多线程执行流共享的资源就叫做临界资源
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完 成
13.6.2 互斥量mutex
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
- 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
- 多个线程并发的操作共享变量,会带来一些问题。
多线程抢票逻辑
//Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
// 设计方的视角
//typedef std::function<void()> func_t;
template<class T>
using func_t = std::function<void(T)>;
template<class T>
class Thread
{
public:
Thread(const std::string &threadname, func_t<T> func, T data)
:_tid(0), _threadname(threadname), _isrunning(false), _func(func), _data(data)
{}
static void *ThreadRoutine(void *args) // 类内方法,
{
// (void)args; // 仅仅是为了防止编译器有告警
Thread *ts = static_cast<Thread *>(args);
ts->_func(ts->_data);
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, ThreadRoutine, this/*?*/);
if(n == 0)
{
_isrunning = true;
return true;
}
else return false;
}
bool Join()
{
if(!_isrunning) return true;
int n = pthread_join(_tid, nullptr);
if(n == 0)
{
_isrunning = false;
return true;
}
return false;
}
std::string ThreadName()
{
return _threadname;
}
bool IsRunning()
{
return _isrunning;
}
~Thread()
{}
private:
pthread_t _tid;
std::string _threadname;
bool _isrunning;
func_t<T> _func;
T _data;
};
//main.cc
#include <iostream>
#include <unistd.h>
#include <vector>
#include <cstdio>
#include "Thread.hpp"
// 应用方的视角
std::string GetThreadName()
{
static int number = 1;
char name[64];
snprintf(name, sizeof(name), "Thread-%d", number++);
return name;
}
void Print(int num)
{
while (num)
{
std::cout << "hello world: " << num-- << std::endl;
sleep(1);
}
}
int ticket = 10000; // 全局的共享资源
void GetTicket(pthread_mutex_t *mutex)
{
while (true)
{
// 临界区
if (ticket > 0)
{
// 充当抢票花费的时间
usleep(1000);
printf("get a ticket: %d\n", ticket);
ticket--;
}
else
{
break;
}
}
}
int main()
{
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, nullptr);
std::string name1 = GetThreadName();
Thread<pthread_mutex_t *> t1(name1, GetTicket, &mutex);
std::string name2 = GetThreadName();
Thread<pthread_mutex_t *> t2(name2, GetTicket, &mutex);
std::string name3 = GetThreadName();
Thread<pthread_mutex_t *> t3(name3, GetTicket, &mutex);
std::string name4 = GetThreadName();
Thread<pthread_mutex_t *> t4(name4, GetTicket, &mutex);
t1.Start();
t2.Start();
t3.Start();
t4.Start();
t1.Join();
t2.Join();
t3.Join();
t4.Join();
pthread_mutex_destroy(&mutex);
return 0;
}
一次执行结果:
thread 4 sells ticket:10000
…
thread 4 sells ticket:1
thread 2 sells ticket:0
thread 1 sells ticket:-1
thread 3 sells ticket:-2
为什么可能无法获得正确结果?
- if 语句判断条件为真以后,代码可以并发的切换到其他线程
- usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
- –ticket 操作本身就不是一个原子操作
–ticket的汇编代码
mov eax, dword ptr
sub eax, 1
mov dword ptr, eax
一条汇编语句是原子的,但多条不是原子的。–ticket在执行过程中可能会被中断或分割。
if (ticket > 0)
也不是原子的,所以有可能进入if之后中断, 导致了上面这些问题。
要解决以上问题,需要做到三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量(mutex)。
13.6.3 互斥量的接口
初始化互斥量有两种方法:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
使用了静态初始化的方式,通过定义一个互斥锁变量并使用PTHREAD_MUTEX_INITIALIZER
进行初始化。这种方式会在编译时为互斥锁变量分配内存并初始化为默认值。int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr)
是动态初始化的方式,通过调用pthread_mutex_init
函数来初始化互斥锁。这种方式会在运行时为互斥锁分配内存并进行初始化。
初始化参数:
- 在静态初始化方式中,使用
PTHREAD_MUTEX_INITIALIZER
初始化的互斥锁会使用默认的属性,无法指定其他属性。 - 在动态初始化方式中,可以通过传递第二个参数
pthread_mutexattr_t *attr
来指定互斥锁的属性,比如设置互斥锁的类型(普通锁、递归锁等)或者其他属性。
灵活性:
- 静态初始化方式简单方便,适用于简单的场景,不需要额外的参数和配置。
- 动态初始化方式更加灵活,可以根据需要设置不同的属性和参数,适用于更复杂的场景。
销毁互斥量:
销毁互斥量需要注意:
- 使用 PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁
- 不要销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号
调用 pthread_ lock
时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量, 那么 pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
增加互斥锁的代码
//main.cc
#include <iostream>
#include <unistd.h>
#include <vector>
#include <cstdio>
#include "Thread.hpp"
// 应用方的视角
std::string GetThreadName()
{
static int number = 1;
char name[64];
snprintf(name, sizeof(name), "Thread-%d", number++);
return name;
}
void Print(int num)
{
while (num)
{
std::cout << "hello world: " << num-- << std::endl;
sleep(1);
}
}
int ticket = 10000; // 全局的共享资源
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //全局的锁
void GetTicket(pthread_mutex_t *mutex)
{
while (true)
{
pthread_mutex_lock(mutex); //申请锁是安全的,原子的
// 临界区
if (ticket > 0)
{
// 充当抢票花费的时间
usleep(1000);
printf("get a ticket: %d\n", ticket);
ticket--;
pthread_mutex_unlock(mutex);
}
else
{
pthread_mutex_unlock(mutex);
break;
}
}
}
int main()
{
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, nullptr);
std::string name1 = GetThreadName();
Thread<pthread_mutex_t *> t1(name1, GetTicket, &mutex);
std::string name2 = GetThreadName();
Thread<pthread_mutex_t *> t2(name2, GetTicket, &mutex);
std::string name3 = GetThreadName();
Thread<pthread_mutex_t *> t3(name3, GetTicket, &mutex);
std::string name4 = GetThreadName();
Thread<pthread_mutex_t *> t4(name4, GetTicket, &mutex);
t1.Start();
t2.Start();
t3.Start();
t4.Start();
t1.Join();
t2.Join();
t3.Join();
t4.Join();
pthread_mutex_destroy(&mutex);
return 0;
}
加锁由程序员自己保证
13.6.4 互斥量实现原理探究
lock:
mov $0, %al
xchgb %al, mutex
if (al寄存器的内容 > 0){
return 0;
}
else{
挂起等待;
}
goto lock;
unlock:
movb $1, mutex
唤醒等待mutex的线程
return 0;
这段伪代码描述了一个简单的互斥量的实现
大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相互交换。
由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
寄存器硬件在cpu中只有一套,但是寄存器内容每个线程都有一份,属于自己的上下文。
xchgb的作用:将一个共享的mutex资源,交换到自己的上下文中,属于线程自己。
13.6.5 互斥锁的封装
//LockGuard.hpp
#pragma once
#include <pthread.h>
// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
Mutex(pthread_mutex_t *lock):_lock(lock)
{}
void Lock()
{
pthread_mutex_lock(_lock);
}
void Unlock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock): _mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex _mutex;
};
pthread_mutex_lock(td->pmutex);
//临界区
{
LockGuard lockguard(td->pmutex);
//.....
}
利用构造就可以自动加锁解锁
###13.7 可重入VS线程安全
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们 称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重 入函数,否则,是不可重入函数。
常见的线程不安全的情况
-
不保护共享变量的函数
-
函数状态随着被调用,状态发生变化的函数
-
返回指向静态变量指针的函数
-
调用线程不安全函数的函数
常见的线程安全的情况
-
每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
-
类或者接口对于线程来说都是原子操作
-
多个线程之间的切换不会导致该接口的执行结果存在二义性
常见不可重入的情况
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
常见可重入的情况
- 不使用全局变量或静态变量
- 不使用用malloc或者new开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
可重入与线程安全联系
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
13.8 常见的锁概念
死锁
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。
死锁四个必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁
- 死锁检测算法
- 银行家算法
13.9 Linux线程同步
###13.9.1 条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情 况就需要用到条件变量。
条件变量提供了一种有效的机制,使得线程可以在等待某些条件满足时进行阻塞,而不是浪费 CPU 时间在轮询上,从而提高了线程的效率和系统的整体性能。
13.9.2 同步概念与竞态条件
同步是指在多线程环境下保证数据安全的同时,通过某种机制使得线程按照特定的顺序访问临界资源,以避免出现饥饿(Starvation)等问题。
饥饿是指某个线程由于种种原因无法获得所需的资源而无法继续执行的情况,通常是由于其他线程占用了资源或者优先级较高而导致。
通过同步机制,可以在保证数据安全的前提下,协调多个线程的执行顺序,从而避免饥饿问题的发生。
竞态条件则是指由于线程执行顺序的不确定性,导致程序的行为出现异常。在并发编程中,由于线程执行顺序的不可预测性,可能会导致多个线程同时访问共享资源,从而产生意料之外的结果。竞态条件通常发生在多个线程同时修改某个共享资源的情况下,由于执行时序不确定而导致的问题。例如,两个线程同时检查某个变量的值,然后根据值的不同执行不同的操作,但由于执行时序的不确定性,导致最终结果出现异常。
综上所述,同步机制是为了解决竞态条件而设计的,通过同步机制可以确保在多线程环境中安全地访问临界资源,并且按照一定的顺序执行,从而避免了竞态条件可能引发的问题。
13.9.3 条件变量函数
pthread_cond_init
:- 原型:
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
- 功能:初始化条件变量。通过该函数初始化的条件变量是动态分配的,并且可以在多个线程之间共享。
cond
参数是指向要初始化的条件变量的指针,attr
参数是条件变量的属性,通常可以设置为NULL
,表示使用默认属性。
- 原型:
pthread_cond_destroy
:- 原型:
int pthread_cond_destroy(pthread_cond_t *cond);
- 功能:销毁条件变量。该函数释放由
pthread_cond_init
初始化的条件变量所占用的资源,释放后的条件变量不能再被使用。
- 原型:
pthread_cond_wait
:- 原型:
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
- 功能:等待条件变量。该函数使当前线程等待条件变量的信号,同时释放
mutex
参数指向的互斥锁,并阻塞在条件变量上。当另一个线程调用pthread_cond_signal
或pthread_cond_broadcast
发送信号时,该函数会使等待的线程重新获得互斥锁,并继续执行。
- 原型:
pthread_cond_broadcast
:- 原型:
int pthread_cond_broadcast(pthread_cond_t *cond);
- 功能:广播信号。该函数用于向等待条件变量的所有线程发送信号,唤醒所有等待的线程,让它们可以继续执行。
- 原型:
pthread_cond_signal
:- 原型:
int pthread_cond_signal(pthread_cond_t *cond);
- 功能:发送信号。该函数用于向等待条件变量的某一个线程发送信号,唤醒其中一个等待的线程,让它可以继续执行。
- 原型:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
可以静态分配全局初始化变量,这种方式的不用销毁。
代码示例:
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int tickets = 1000;
void *threadRoutine(void *args)
{
std::string name = static_cast<const char*>(args);
while(true)
{
// sleep(1);
pthread_mutex_lock(&mutex); // 所以需要判断:加锁和解锁之间,我们往往要访问临界资源,可是,临界资源不一定是满足条件的
if(tickets > 0)
{
std::cout << name<< ", get a ticket: " << tickets-- << std::endl; // 模拟抢票
usleep(1000);
}
else
{
std::cout << "没有票了," << name << std::endl; // 就是每一个线程在大量的申请锁和释放锁
// 1. 让线程在进行等待的时候,会自动释放锁
// 2. 线程被唤醒的时候,是在临界区内唤醒的,当线程被唤醒, 线程在pthread_cond_wait返回的时候,要重新申请并持有锁
// 3. 当线程被唤醒的时候,重新申请并持有锁本质是也要参与锁的竞争的!!
pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);
}
}
// 主线程
int main()
{
pthread_t t1, t2, t3;
pthread_create(&t2, nullptr, threadRoutine, (void*)"thread-2");
pthread_create(&t1, nullptr, threadRoutine, (void*)"thread-1");
pthread_create(&t3, nullptr, threadRoutine, (void*)"thread-3");
sleep(5); // for test, 5s开始进行让cond成立,唤醒一个线程
while(true)
{
// pthread_cond_signal(&cond);
// pthread_cond_broadcast(&cond);
// 临时
sleep(6);
pthread_mutex_lock(&mutex);
tickets += 1000;
pthread_mutex_unlock(&mutex);
// pthread_cond_broadcast(&cond);
pthread_cond_signal(&cond);
}
pthread_join(t1, nullptr);
pthread_join(t2, nullptr);
pthread_join(t3, nullptr);
return 0;
}
13.9.4 条件变量使用规范
- 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
- 给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
13.10 生产消费者模型
生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:
- 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
- 两种角色: 生产者和消费者。(通常由进程或线程承担)
- 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)
为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型优点
- 解耦
- 支持并发
- 支持忙闲不均
13.10.1 基于BlockingQueue的生产者消费者模型
BlockingQueue
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
C++ queue模拟阻塞队列的生产消费模型
BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "LockGuard.hpp"
const int defaultcap = 5; // for test
template<class T>
class BlockQueue
{
public:
BlockQueue(int cap = defaultcap):_capacity(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.size() == 0;
}
void Push(const T &in) // 生产者的
{
LockGuard lockguard(&_mutex);
// pthread_mutex_lock(&_mutex); // 2. lockguard 3. 重新理解生产消费模型(代码+理论) 4. 代码整体改成多生产,多消费
while(IsFull()) // 写出来的代码,具有较强的鲁棒、健壮性
{
// 阻塞等待
pthread_cond_wait(&_p_cond, &_mutex); // 1. 关于pthread_cond_wait在进一步理解!
}
_q.push(in);
// if(_q.size() > _productor_water_line) pthread_cond_signal(&_c_cond);
pthread_cond_signal(&_c_cond);
// pthread_mutex_unlock(&_mutex);
}
void Pop(T *out) // 消费者的
{
LockGuard lockguard(&_mutex);
// pthread_mutex_lock(&_mutex);
while(IsEmpty())
{
// 阻塞等待
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _q.front();
_q.pop();
// if(_q.size() < _consumer_water_line) pthread_cond_signal(&_p_cond);
pthread_cond_signal(&_p_cond);
// pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity; // _q.size() == _capacity, 满了,不能在生产,_q.size() == 0, 空,不能消费了
pthread_mutex_t _mutex;
pthread_cond_t _p_cond; // 给生产者的
pthread_cond_t _c_cond; // 给消费者的
// int _consumer_water_line; // _consumer_water_line = _capacity / 3 * 2
// int _productor_water_line; // _productor_water_line = _capacity / 3
};
LockGuard.hpp
#pragma once
#include <pthread.h>
// 不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
Mutex(pthread_mutex_t *lock):_lock(lock)
{}
void Lock()
{
pthread_mutex_lock(_lock);
}
void Unlock()
{
pthread_mutex_unlock(_lock);
}
~Mutex()
{}
private:
pthread_mutex_t *_lock;
};
class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock): _mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex _mutex;
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
const int defaultvalue = 0;
enum
{
ok = 0,
div_zero,
mod_zero,
unknow
};
const std::string opers = "+-*/%)(&";
class Task
{
public:
Task()
{
}
Task(int x, int y, char op)
: data_x(x), data_y(y), oper(op), result(defaultvalue), code(ok)
{
}
void Run()
{
switch (oper)
{
case '+':
result = data_x + data_y;
break;
case '-':
result = data_x - data_y;
break;
case '*':
result = data_x * data_y;
break;
case '/':
{
if (data_y == 0)
code = div_zero;
else
result = data_x / data_y;
}
break;
case '%':
{
if (data_y == 0)
code = mod_zero;
else
result = data_x % data_y;
}
break;
default:
code = unknow;
break;
}
}
void operator()()
{
Run();
sleep(2);
}
std::string PrintTask()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=?";
return s;
}
std::string PrintResult()
{
std::string s;
s = std::to_string(data_x);
s += oper;
s += std::to_string(data_y);
s += "=";
s += std::to_string(result);
s += " [";
s += std::to_string(code);
s += "]";
return s;
}
~Task()
{
}
private:
int data_x;
int data_y;
char oper; // + - * / %
int result;
int code; // 结果码,0: 结果可信 !0: 结果不可信,1,2,3,4
};
Main.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
class ThreadData
{
public:
BlockQueue<Task> *bq;
std::string name;
};
void *consumer(void *args)
{
ThreadData *td = (ThreadData*)args;
// BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// sleep(1);
Task t;
// 1. 消费数据 bq->pop(&data);
td->bq->Pop(&t);
// 2. 进行处理
// t.Run();
t(); // 处理任务的时候,会不会消耗时间呢???
std::cout << "consumer data: " << t.PrintResult() << ", " << td->name << std::endl;
// 注意:消费者没有sleep!!!
}
return nullptr;
}
void *productor(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
// BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
// BlockQueue *bq = static_cast<BlockQueue *>(args); // 生产者和消费者同时看到了一份公共资源
while (true)
{
// 1. 有数据,从具体场景中来,从网络中拿数据
// 生产前,你的任务从哪里来的呢???
int data1 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODO
usleep(rand()%123);
int data2 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODO
usleep(rand()%123);
char oper = opers[rand() % (opers.size())];
Task t(data1, data2, oper);
std::cout << "productor task: " << t.PrintTask() << std::endl;
// 2. 进行生产
// bq->Push(data);
bq->Push(t);
sleep(1);
// for debug
}
return nullptr;
}
int main()
{
srand((uint16_t)time(nullptr) ^ getpid() ^ pthread_self()); // 只是为了形成更随机的数据
// 可不可以让生产者给消费者分派任务呢??
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c[3], p[2]; // 消费者和生产者
ThreadData *td = new ThreadData();
td->bq = bq;
td->name = "thread-1";
pthread_create(&c[0], nullptr, consumer, td);
ThreadData *td1 = new ThreadData();
td1->bq = bq;
td1->name = "thread-2";
pthread_create(&c[0], nullptr, consumer, td1);
ThreadData *td2 = new ThreadData();
td2->bq = bq;
td2->name = "thread-3";
pthread_create(&c[0], nullptr, consumer, td2);
pthread_create(&p[0], nullptr, productor, bq);
pthread_create(&p[1], nullptr, productor, bq);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(c[2], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
return 0;
}
testblockqueue:Main.cc
g++ -o $@ $^ -lpthread
.PHONY:clean
clean:
rm testblockqueue
13.10.2 POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
信号量概念
信号量是一种用于控制对共享资源的访问的同步机制。它本质上是一个计数器,用于表示可用资源的数量。通过对信号量进行操作,可以实现对临界资源的更细粒度的管理。
信号量的PV操作
信号量的PV操作是信号量的基本操作,用于修改信号量的值。PV操作通常由两个函数组成:P操作(也称为wait操作)和V操作(也称为signal操作)。
- P操作(Wait操作):
- P操作用于请求一个资源,即减小信号量的值。
- 如果信号量的值大于0,则将其减1,并继续执行。
- 如果信号量的值为0,则阻塞当前线程或进程,直到信号量的值变为大于0为止。
- V操作(Signal操作):
- V操作用于释放一个资源,即增加信号量的值。
- 当有线程或进程释放了一个资源后,可以调用V操作来增加信号量的值。
- 如果有其他线程或进程在等待信号量的值变为大于0,则V操作会唤醒其中一个等待的线程或进程。
信号量函数
-
sem_init:
-
功能:
sem_init
用于初始化一个新的信号量。 -
参数
:
sem
: 指向要初始化的信号量的指针。pshared
: 用于指定信号量是在进程间共享还是在线程间共享的标志。如果设置为0,则信号量在进程内部共享。如果设置为非零值,则信号量在进程之间共享。value
: 指定信号量的初始值。
-
返回值: 如果成功,则返回0;否则返回-1,并设置
errno
来指示错误原因。
-
-
sem_destroy:
- 功能:
sem_destroy
用于销毁一个已经初始化的信号量。 - 参数:
sem
指向要销毁的信号量的指针。 - 返回值: 如果成功,则返回0;否则返回-1,并设置
errno
来指示错误原因。
- 功能:
-
sem_wait:
- 功能:
sem_wait
用于等待信号量的值减小到大于0。 - 参数:
sem
指向要等待的信号量的指针。 - 返回值: 如果成功,则返回0;否则返回-1,并设置
errno
来指示错误原因。
- 功能:
-
sem_post:
- 功能:
sem_post
用于将信号量的值增加1。 - 参数:
sem
指向要增加值的信号量的指针。 - 返回值: 如果成功,则返回0;否则返回-1,并设置
errno
来指示错误原因。
- 功能:
13.10.3 基于环形队列的生产消费模型
原理:
- 环形队列采用数组模拟,用模运算来模拟环状特性
环形队列
- 生产者不能把消费者套一个圈
- 消费者不能超过生产者
生产者和消费者,只有两种场景会指向同一个位置:
a. 为空 只能让生产者跑
b. 为满 只能让消费者跑
其他情况下,我们根本就不会指向同一位置(多线程并发进入临界区)。
对资源的认识:
生产者(producer)->空间
消费者(customer)->数据
p->sem_space:N
c->sem_data:0
伪代码:
生产者:
1. P(sem_space) // 获取可用空间资源
2. // 生产行为(向环形队列中添加数据)
3. V(sem_data) // 释放数据资源,通知消费者可以消费
消费者:
1. P(sem_data) // 获取可用数据资源
2. // 消费行为(从环形队列中取出数据)
3. V(sem_space) // 释放空间资源,通知生产者可以生产
在这个模型中,sem_space
控制着可用空间的数量,sem_data
控制着可用数据的数量。初始时,sem_space
的初始值为环形队列的大小,表示有足够的空间可以供生产者使用,而 sem_data
的初始值为0,表示没有可用的数据供消费者使用。
在生产者和消费者之间的互斥和同步控制实现了环形队列的生产和消费。生产者在向队列添加数据之前必须先获取空间资源,如果队列已满则会被阻塞,直到有空间可用。消费者在取出数据之前必须先获取数据资源,如果队列为空则会被阻塞,直到有数据可用。
这样,生产者和消费者之间就能够良好地协同工作,避免了竞态条件和数据不一致性问题。
示例代码:
//main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include "Log.hpp"
#include <unistd.h>
#include <pthread.h>
#include <ctime>
void *Productor(void *args)
{
// sleep(5);
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
// 数据怎么来的?
// 1. 有数据,从具体场景中来,从网络中拿数据
// 生产前,你的任务从哪里来的呢???
int data1 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODO
usleep(rand() % 123);
int data2 = rand() % 10; // [1, 10] // 将来深刻理解生产消费,就要从这里入手,TODO
usleep(rand() % 123);
char oper = opers[rand() % (opers.size())];
Task t(data1, data2, oper);
std::cout << "productor task: " << t.PrintTask() << std::endl;
// rq->push();
rq->Push(t);
// sleep(1);
}
}
void *Consumer(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
// sleep(1);
Task t;
rq->Pop(&t);
t();
std::cout << "consumer done, data is : " << t.PrintResult() << std::endl;
}
}
int main()
{
Log log;
log.LogMessage(Debug, "hello %d, %s, %f", 10, "bite", 3.14);
log.LogMessage(Warning, "hello %d, %s, %f", 10, "bite", 3.14);
log.LogMessage(Error, "hello %d, %s, %f", 10, "bite", 3.14);
log.LogMessage(Info, "hello %d, %s, %f", 10, "bite", 3.14);
// // 单生产,单消费:321
// // 如果是多个生产者,多个消费者呢???
// srand((uint64_t)time(nullptr) ^ pthread_self());
// pthread_t c[3], p[2];
// // 唤醒队列中只能放置整形???
// // RingQueue<int> *rq = new RingQueue<int>();
// RingQueue<Task> *rq = new RingQueue<Task>();
// pthread_create(&p[0], nullptr, Productor, rq);
// pthread_create(&p[1], nullptr, Productor, rq);
// pthread_create(&c[0], nullptr, Consumer, rq);
// pthread_create(&c[1], nullptr, Consumer, rq);
// pthread_create(&c[2], nullptr, Consumer, rq);
// pthread_join(p[0], nullptr);
// pthread_join(p[1], nullptr);
// pthread_join(c[0], nullptr);
// pthread_join(c[1], nullptr);
// pthread_join(c[2], nullptr);
return 0;
}
//RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include "LockGuard.hpp"
const int defaultsize = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
public:
RingQueue(int size = defaultsize)
: _ringqueue(size), _size(size), _p_step(0), _c_step(0)
{
sem_init(&_space_sem, 0, size);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_p_mutex, nullptr);
pthread_mutex_init(&_c_mutex, nullptr);
}
void Push(const T &in)
{
// 生产
// 先加锁,还是先申请信号量?
P(_space_sem);
{
LockGuard lockGuard(&_p_mutex);
_ringqueue[_p_step] = in;
_p_step++;
_p_step %= _size;
}
V(_data_sem);
}
void Pop(T *out)
{
// 消费
P(_data_sem);
{
LockGuard lockGuard(&_c_mutex);
*out = _ringqueue[_c_step];
_c_step++;
_c_step %= _size;
}
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_p_mutex);
pthread_mutex_destroy(&_c_mutex);
}
private:
std::vector<T> _ringqueue;
int _size;
int _p_step; // 生产者的生产位置
int _c_step; // 消费位置
sem_t _space_sem; // 生产者
sem_t _data_sem; // 消费者
pthread_mutex_t _p_mutex;
pthread_mutex_t _c_mutex;
};
先加锁还是先申请信号量?
- 如果先加锁,那么当多个线程同时竞争锁时,只有一个线程能够获取到锁,而其他线程需要等待。这会引入竞争,降低了程序的并发性能。
- 先申请信号量,在加锁进程运行的期间,其他进程已经分配好信号量了,只要等待解锁,这大大减少了竞争。
13.11 线程池
#pragma once
#include <iostream>
#include <queue>
#include <vector>
#include <pthread.h>
#include <functional>
#include "Log.hpp"
#include "Thread.hpp"
#include "LockGuard.hpp"
static const int defaultnum = 5;
class ThreadData
{
public:
ThreadData(const std::string &name) : threadname(name)
{
}
~ThreadData()
{
}
public:
std::string threadname;
};
template <class T>
class ThreadPool
{
private:
ThreadPool(int thread_num = defaultnum) : _thread_num(thread_num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
// 构建指定个数的线程
for (int i = 0; i < _thread_num; i++)
{
// 待优化
std::string threadname = "thread-";
threadname += std::to_string(i + 1);
ThreadData td(threadname);
// Thread<ThreadData> t(threadname,
// std::bind(&ThreadPool<T>::ThreadRun, this, std::placeholders::_1), td);
// _threads.push_back(t);
_threads.emplace_back(threadname,
std::bind(&ThreadPool<T>::ThreadRun, this,
std::placeholders::_1),
td);
lg.LogMessage(Info, "%s is created...\n", threadname.c_str());
}
}
ThreadPool(const ThreadPool<T> &tp) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T>) = delete;
public:
// 有线程安全问题的
static ThreadPool<T> *GetInstance()
{
if (instance == nullptr)
{
LockGuard lockguard(&sig_lock);
if (instance == nullptr)
{
lg.LogMessage(Info, "创建单例成功...\n");
instance = new ThreadPool<T>();
}
}
return instance;
}
bool Start()
{
// 启动
for (auto &thread : _threads)
{
thread.Start();
lg.LogMessage(Info, "%s is running ...\n", thread.ThreadName().c_str());
}
return true;
}
void ThreadWait(const ThreadData &td)
{
lg.LogMessage(Debug, "no task, %s is sleeping...\n", td.threadname.c_str());
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void checkSelf()
{
// 1. _task_num > _task_num_high_water && _thread_num < _thread_num_high_water
// 创建更多的线程,并且更新_thread_num
// 2. _task_num == _task_num_low_water && _thread_num >= _thread_num_high_water
// 把自己退出了,并且更新_thread_num
}
void ThreadRun(ThreadData &td)
{
while (true)
{
// checkSelf()
checkSelf();
// 取任务
T t;
{
LockGuard lockguard(&_mutex);
while (_q.empty())
{
ThreadWait(td);
lg.LogMessage(Debug, "thread %s is wakeup\n", td.threadname.c_str());
}
t = _q.front();
_q.pop();
}
// 处理任务
t();
lg.LogMessage(Debug, "%s handler task %s done, result is : %s\n",
td.threadname, t.PrintTask().c_str(), t.PrintResult().c_str());
}
}
void Push(T &in)
{
lg.LogMessage(Debug, "other thread push a task, task is : %s\n", in.PrintTask().c_str());
LockGuard lockguard(&_mutex);
_q.push(in);
ThreadWakeup();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
// for debug
void Wait()
{
for (auto &thread : _threads)
{
thread.Join();
}
}
private:
std::queue<T> _q;
std::vector<Thread<ThreadData>> _threads;
int _thread_num;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
static ThreadPool<T> *instance;
static pthread_mutex_t sig_lock;
// 扩展1:
// int _thread_num;
// int _task_num;
// int _thread_num_low_water; // 3
// int _thread_num_high_water; // 10
// int _task_num_low_water; // 0
// int _task_num_high_water; // 30
// 扩展2: 多进程+多线程
// int number{1};
};
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::sig_lock = PTHREAD_MUTEX_INITIALIZER;
// threadpool.conf
thread_num_low_water:3
thread_num_high_water:10
task_num_low_water:0
task_num_high_water:30
13.12 日志系统
//Log.hpp
#pragma once
#include <iostream>
#include <fstream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
enum
{
Debug = 0,
Info,
Warning,
Error,
Fatal
};
enum
{
Screen = 10,
OneFile,
ClassFile
};
std::string LevelToString(int level)
{
switch (level)
{
case Debug:
return "Debug";
case Info:
return "Info";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "Unknown";
}
}
const int defaultstyle = Screen;
const std::string default_filename = "log.";
const std::string logdir = "log";
class Log
{
public:
Log() : style(defaultstyle), filename(default_filename)
{
mkdir(logdir.c_str(), 0775);
}
void Enable(int sty) //
{
style = sty;
}
std::string TimeStampExLocalTime()
{
time_t currtime = time(nullptr);
struct tm *curr = localtime(&currtime);
char time_buffer[128];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
curr->tm_year + 1900, curr->tm_mon + 1, curr->tm_mday,
curr->tm_hour, curr->tm_min, curr->tm_sec);
return time_buffer;
}
void WriteLogToOneFile(const std::string &logname, const std::string &message)
{
umask(0);
int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);
if(fd < 0) return;
write(fd, message.c_str(), message.size());
close(fd);
// std::ofstream out(logname);
// if (!out.is_open())
// return;
// out.write(message.c_str(), message.size());
// out.close();
}
void WriteLogToClassFile(const std::string &levelstr, const std::string &message)
{
std::string logname = logdir;
logname += "/";
logname += filename;
logname += levelstr;
WriteLogToOneFile(logname, message);
}
void WriteLog(const std::string &levelstr, const std::string &message)
{
switch (style)
{
case Screen:
std::cout << message;
break;
case OneFile:
WriteLogToClassFile("all", message);
break;
case ClassFile:
WriteLogToClassFile(levelstr, message);
break;
default:
break;
}
}
void LogMessage(int level, const char *format, ...) // 类C的一个日志接口
{
char leftbuffer[1024];
std::string levelstr = LevelToString(level);
std::string currtime = TimeStampExLocalTime();
std::string idstr = std::to_string(getpid());
char rightbuffer[1024];
va_list args; // char *, void *
va_start(args, format);
// args 指向了可变参数部分
vsnprintf(rightbuffer, sizeof(rightbuffer), format, args);
va_end(args); // args = nullptr;
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s] ",
levelstr.c_str(), currtime.c_str(), idstr.c_str());
std::string loginfo = leftbuffer;
loginfo += rightbuffer;
WriteLog(levelstr, loginfo);
}
// void operator()(int level, const char *format, ...)
// {
// LogMessage(int level, const char *format, ...)
// }
~Log() {}
private:
int style;
std::string filename;
};
Log lg;
class Conf
{
public:
Conf()
{
lg.Enable(Screen)
}
~Conf()
{}
};
Conf conf;
//Main.cc
#include <iostream>
#include "Log.hpp"
int main()
{
Log lg;
lg.Enable(OneFile);
lg.LogMessage(Debug, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Info, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Warning, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Error, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Fatal, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Debug, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Info, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Warning, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Error, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Fatal, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Debug, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Info, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Warning, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Error, "this is a log message: %d, %lf\n", 123, 3.14);
lg.LogMessage(Fatal, "this is a log message: %d, %lf\n", 123, 3.14);
return 0;
}
13.13 线程安全的单例模式
单例模式的特点
在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这 些数据.
饿汉实现方式和懒汉实现方式
例子:
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式.
懒汉方式最核心的思想是 “延时加载”. 从而能够优化服务器的启动速度.
饿汉方式实现单例模式
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
懒汉方式实现单例模式
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};
存在一个严重的问题, 线程不安全.
第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
但是后续再次调用, 就没有问题了.
懒汉方式实现单例模式(线程安全版本)
// 懒汉模式, 线程安全
template <typename T>
class Singleton {
volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T* GetInstance() {
if (inst == NULL) {
lock.lock();
if (inst == NULL) {
inst = new T();
}
lock.unlock();
}
return inst;
}
};
注意事项:
- 加锁解锁的位置
- 双重 if 判定, 避免不必要的锁竞争
- volatile关键字防止过度优化
13.14 STL,智能指针和线程安全
STL中的容器是否是线程安全的?
不是.
原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响. 而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶). 因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全.
智能指针是否是线程安全的?
对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这 个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数.
13.15 其他常见的各种锁
- 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
- 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
- CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
- 自旋锁,公平锁,非公平锁?
13.16 读者写者问题
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
当前锁状态 | 读锁要求 | 写锁要求 |
---|---|---|
无锁 | 可以 | 可以 |
读锁 | 可以 | 阻塞 |
写锁 | 阻塞 | 阻塞 |
三种关系:
读读:没关系,并发
写写:互斥
读写:互斥&&同步
读写锁接口
#include <pthread.h>
// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
// 销毁读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
// 获取读锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
// 获取写锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
// 尝试获取读锁
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
// 尝试获取写锁
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
// 解锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
#include <stdio.h>
#include <pthread.h>
pthread_rwlock_t rwlock;
void* reader(void* arg) {
pthread_rwlock_rdlock(&rwlock);
printf("Reader acquired the lock.\n");
// 读取数据...
pthread_rwlock_unlock(&rwlock);
printf("Reader released the lock.\n");
return NULL;
}
void* writer(void* arg) {
pthread_rwlock_wrlock(&rwlock);
printf("Writer acquired the lock.\n");
// 写入数据...
pthread_rwlock_unlock(&rwlock);
printf("Writer released the lock.\n");
return NULL;
}
int main() {
pthread_rwlock_init(&rwlock, NULL);
pthread_t readerThread, writerThread;
pthread_create(&readerThread, NULL, reader, NULL);
pthread_create(&writerThread, NULL, writer, NULL);
pthread_join(readerThread, NULL);
pthread_join(writerThread, NULL);
pthread_rwlock_destroy(&rwlock);
return 0;
}
读写锁原理
sem_t sem_w(1)
sem_t sem_r(1)
int reader_count = 0
reader:
P(sem_r)
if(reader_count==0)
P(sem_w);
reader_count++;
V(sem_r);
z
writer:
P(sem_w);
read操作:
P(sem_r);
reader_count--;
if(reader_count==0)
V(sem_w);
V(sem_r);
write操作:
V(sem_w);
13.17 自旋锁
自旋锁是一种用于保护共享资源的同步机制,它允许线程反复检查锁是否可用,而不是立即被阻塞。当自旋锁已经被另一个线程获取时,线程会一直处于忙等待状态,直到锁被释放。自旋锁通常用于临界区很短小且共享资源竞争激烈的情况下。
基本接口:
#include <pthread.h>
// 初始化自旋锁
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
// 销毁自旋锁
int pthread_spin_destroy(pthread_spinlock_t *lock);
// 获取自旋锁
int pthread_spin_lock(pthread_spinlock_t *lock);
// 尝试获取自旋锁
int pthread_spin_trylock(pthread_spinlock_t *lock);
// 释放自旋锁
int pthread_spin_unlock(pthread_spinlock_t *lock);