目录
一、线程的创建
1、介绍thread类
2、创建线程
二、线程的2种工作方式
其一:关联主线程
其二:拆离主线程
两种工作方式的使用-代码示例
detach
join
三、线程安全问题
1、什么是线程安全
2、怎么使程序线程安全
保护对共享数据的操作-加互斥锁
1)互斥锁 mutex
互斥锁介绍
互斥锁的使用
a. 使用lock_guard
b.使用unique_lock
c.使用mutex的成员函数lock和unlock
2)读写锁
读写锁使用
采用具有“自保能力”的共享变量-使用原子类型
什么是原子操作
原子类型
原子类型使用
四、线程间的通讯和同步-条件变量
条件变量condition_variable
条件变量(condition_variable类)的使用
介绍关键成员函数(等待和唤醒)
wait(等待)
notify_one 和 notify_all(唤醒)
条件变量的使用
五、异步执行
C++中的异步执行
相关方法/类介绍
std::async
std::future
异步执行实现
本篇文章主要介绍在C++中如何实现基本的并发编程。介绍了C++11、14、17的相关C++特性。面向对并发和多线程有些基本了解,但是不知如何使用C++新特性去实现并发编程的小伙伴们。本篇文章对这类疑问起到一个入门的参考作用,相关函数和类可能介绍的不是十分全面,但是基本用法都有说明,如果想要了解更多,可以参考C++标准库官方文档等资料。
一、线程的创建
创建线程,需要使用thread类
1、介绍thread类
首先介绍下thread类
用途:用于创建和管理线程
一些函数介绍:
get_id: 获取当前线程的唯一标识
sleep_for: 休眠线程一段时间(输入参数为时间段,比如“1s”)
this_thread::sleep_for(chrono::seconds(1));
sleep_until: 休眠线程,直到指定时间(输入参数为时间点,比如“今天22:00”)
2、创建线程
要创建一个线程,肯定要在线程中执行一个任务,该任务需要使用函数来封装。所以线程在创建时,必须要传入一个函数,此函数可以是具名函数,也可以是匿名函数。此外,如果函数需要传参,我们还要处理参数传入问题。
当执行有参函数时,只需要在函数指针后依次传入参数即可。
【注】不理解匿名函数的同学可以参考博主另一篇文章 C++ lambda表达式_c++ lamda捕获范围-CSDN博客
代码示例
#include <thread>
using namespace std;
int main(){
//1、创建新线程并执行无参函数
//执行具名函数
thread th1(func_th);
//执行匿名函数
thread th11([]{cout << "lambda func" << endl;});
//2、创建新线程并执行有参函数
//执行具名函数
thread th2(func_th_param, 8);
//执行匿名函数
thread th22([](int n){cout << "lambda func " << n << endl;}
, 9);
if (th1.joinable()) th1.join(); //关联主函数,后面会介绍
if (th2.joinable()) th2.join();
if (th11.joinable()) th11.join();
if (th22.joinable()) th22.join();
cout << "main th end!" << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
需要注意的是,当需要执行的目标函数需要传入引用类型的参数时,需要在线程创建时使用ref(引用)或 cref(const 引用)显式获取变量的引用传入,否则编译报错。
void func_th1(int& n)
{
cout << "new th " << endl;
}
void func_th2(const int& n)
{
cout << "new th " << endl;
}
int main(){
int n = 0;
//thread th(func_th1, n); //会报错,因为类型不匹配
thread th1(func_th1, ref(n)); //正确,获取n的引用传入
thread th2(func_th2, cref(n)); //正确,获取n的常量引用传入
if (th1.joinable()) th1.join();
if (th2.joinable()) th2.join();
cout << "main th end!" << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
二、线程的2种工作方式
此节还是介绍thread类,需要用到的成员函数:join、detach、joinable。
其一:关联主线程
调用join方法,则该线程和主线程进行关联,主线程会等待子线程结束后再继续向下执行。在调用join前,一般使用joinable判断是否可以关联。当发生:该线程已经被join或detach过;该线程已经被销毁等情况时,直接关联会产生错误。
例如下列代码,将会报错
void func_th(int n)
{
while(n--)
{
//cout << "* ";
}
cout << endl;
}
int main(){
thread th(func_th, 10);
th.join();
th.join();
return 0;
}
其二:拆离主线程
调用detach方法,则该线程从主线程剥离,主线程不会等待该线程的执行,主线程继续执行。该线程由操作系统负责释放。可以理解为一种后台工作的模式。
两种工作方式的使用-代码示例
detach
#include <thread>
void func_th()
{
cout << "th start..." << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
cout << "th end..." << endl;
}
int main(){
cout << "main th start!" << endl;
thread th(func_th);
th.detach();
cout << "main th end!" << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
输出:
由此可见,使用detach分离线程,主线程不会等待该线程执行。
join
main函数修改如下
int main(){
cout << "main th start!" << endl;
thread th(func_th);
if (th.joinable()) th.join();
cout << "main th end!" << endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
输出:
使用join关联主线程,则主线程在执行到join()被调用处,将等待子线程执行完成后再继续向下执行。
三、线程安全问题
在多线程的系统中,线程安全问题是一定要考虑的。
1、什么是线程安全
线程安全,是指在多线程环境中,对共享数据的访问和操作不会导致数据的不一致或不正确,程序功能正常执行的一种状态。当一个程序或库可以在并发执行时,不对其进行额外的同步或保护的条件下,依然能够保持其预期的行为,则这个程序或库是线程安全的。
如下列代码
函数func_shared被多个线程调用,同一时刻可能存在多个线程对共享变量data进行修改,data变量容易引发无法预见的错误。这段程序不是线程安全的。
vector<int> data;
void func_shared(char n)
{
data.pop_back();
data.push_back(n);
}
int main(){
thread th1(func_shared, 8);
thread th2(func_shared, 9);
if (th1.joinable()) th1.join();
if (th2.joinable()) th2.join();
return 0;
}
在多线程编程中,如果忽视线程安全。则会引发一些不可预见的错误。比较典型的,比如发生竞争条件,甚至引起数据损坏、应用崩溃等。其他错误都好理解,下面介绍下竞争条件问题。
竞争条件:
在多个线程同时读取变量、进行计算、并写回结果的情况下,如果该共享变量不是原子类型(后面会介绍原子类型),也没有同步机制的保护,则会因为写回操作的执行顺序问题,导致结果不符合预期。
更详细的解释,例如下列代码,启动100个线程,均对共享变量dat进行自增操作。但是可能在某一时刻,线程A,读取到dat等于700,并进行+1,当A没有写回dat时,线程B随即读到dat也等于700,则这两次计算其实只将dat加1等于701。所以最终结果,很有可能不是我们预期的 10000*100。
这就是发生了竞争条件问题。
#include <iostream>
#include <algorithm>
#include <vector>
#include <atomic>
#include <thread>
using namespace std;
int dat(0);
void func(int n)
{
for(int i = 0; i < n; ++i)
{
dat++;
}
}
int main(){
vector<thread> ths;
for(int i = 0; i < 100; ++i)
{
ths.emplace_back(func, 10000);
}
for(auto& th : ths)
{
th.join();
}
cout << dat << endl;
return 0;
}
2次运行结果:
2、怎么使程序线程安全
从上述例子可见,由于1)共享数据本身不存在保护自己的能力;2)对共享数据的操作没有提前保护。导致这段程序在并发环境下的线程不安全。那么,我们可以从这2方面入手使整段程序线程安全。互斥锁具有如下特性:
保护对共享数据的操作-加互斥锁
1)互斥锁 mutex
互斥锁介绍
互斥锁是最常用的同步机制之一,在访问共享资源前添加互斥锁,能够保证同时只有一个线程能够访问该资源,其他线程必须等待当前持有锁的线程释放锁后才能进行访问。
- 独占性: 同一时间只允许一个线程持有锁。
- 阻塞等待: 如果某个线程尝试获取锁而发现锁已经被其他线程占用,则该线程会被阻塞,直到锁被释放。
互斥锁的使用
a. 使用lock_guard
std::lock_guard是一个 RAII(资源获取即初始化)风格的模板类,用于自动管理mutex的加锁和解锁。在需要加锁的作用域内使用lock_guard加锁,离开作用域范围后自动解锁。使用方法如下
#include <thread>
#include <mutex>
vector<int> data;
std::mutex mtx;
void func_shared(char n)
{
{
std::lock_guard<std::mutex> lock(mtx); //加锁
data.pop_back();
data.push_back(n);
}
//离开{}范围,即离开了作用域自动解锁,如果不添加{}缩小作用域,则整个函数都将处于锁定范围
}
b.使用unique_lock
unique_lock是另一种RAll风格的互斥锁管理模板类,和lock_guard不同的是,unique_lock除了会在离开作用域后自动解锁,也可以在作用域内手动解锁,并且可以在需要锁定的时候再次加锁,所以比lock_guard更加灵活。
vector<int> data;
std::mutex mtx;
void func_shared(char n)
{
{
std::unique_lock<std::mutex> lock(mtx); //加锁
data.pop_back();
lock.unlock(); //手动解锁
cout << data.size() << endl;
lock.lock(); //二次锁定
data.push_back(n);
}
//离开了作用域自动解锁
}
c.使用mutex的成员函数lock和unlock
mutex类提供了负责加锁的成员函数 lock() 以及解锁的成员函数 unlock(),可以直接调用。这样使用锁一定要注意,每次加锁后要有对应的解锁,否则容易引发死锁。
void func_shared(char n)
{
mtx.lock(); //加锁
data.pop_back();
data.push_back(n);
mtx.unlock(); //解锁
}
2)读写锁
共享互斥量shared_mutex介绍
读写锁是使用shared_mutex实现的。shared_mutex提供了读写分离的功能,允许多个线程同时读取共享资源,但是在写入时会互斥锁定共享资源。
读写锁的特点:
- 并发读取:多个线程可以同时获取读锁,并同时读取共享资源,读锁之前不互斥
- 独占写入:当发生写入操作,一个线程获取写锁时,其他线程无法获取读锁或写锁,即无法读写该共享资源,直到该线程的写入操作完成。
读写锁使用
通过使用shared_lock对shared_mutex进行管理,实现读锁;
使用unique_lock对shared_mutex进行管理,实现写锁。
#include <shared_mutex>
using namespace std;
std::shared_mutex rw_mtx;
int dat = 0;
void read_func()
{
std::shared_lock<std::shared_mutex> r_lock(rw_mtx); //读锁锁定
cout << dat << endl;
//锁将在函数执行完释放
}
void write_func(int d)
{
std::unique_lock<std::shared_mutex> w_lock(rw_mtx); //写锁锁定
dat = d;
//锁将在函数执行完释放
}
采用具有“自保能力”的共享变量-使用原子类型
加锁虽然可以在多线程中对共享资源进行保护,但是它有个缺点,就是太慢了。每次加锁、解锁都会造成资源消耗。而定义原子类型就不会产生这样的问题。
什么是原子操作
原子操作是指不会被其它线程中断的操作。这种操作要么还未开始,要么全部完成,如在执行过程中则不会被线程调度的机制打断。
原子类型
在C++11标准中引入了原子操作库<atomic>,提供了一系列原子类型。对这些类型的操作是原子操作。
每个 atomic_integral
类型都具有与 atomic<T>
的相应实例化相同的成员函数集,并且可以传递至任何非成员原子函数。
需要注意的是,原子操作通常是针对固定大小、基本数据类型的变量设计的,因为它们可以直接映射到硬件级别的原子指令或者操作系统提供的原子操作接口。所以类似string等数据结构,C++标准库不具有对应的原子类型。
原子类型使用
我们使用上面介绍竞争条件的代码,仅仅只是将共享变量改为原子类型,
#include <atomic>
#include <thread>
using namespace std;
atomic<int> dat(0); //原子类型变量
void func(int n)
{
for(int i = 0; i < n; ++i)
{
dat++;
}
}
int main(){
vector<thread> ths;
for(int i = 0; i < 100; ++i)
{
ths.emplace_back(func, 10000);
}
for(auto& th : ths)
{
th.join();
}
cout << dat << endl;
return 0;
}
运行结果符合预期:
四、线程间的通讯和同步-条件变量
个人理解线程同步就是线程间协调资源、协同工作的一种工作模式。在多线程模式下,有的时候需要线程间合作起来,按照某种顺序协调资源,才能实现预定目标。
比如在生产者-消费者模式中,线程A需要生产一定量的数据后,线程B再将这些数据进行消费计算,如果数据资源为空,线程B去调用计算没有任何意义,甚至可能产生错误。而类似这种工作需要线程间进行通信,同步状态才能执行。
其实互斥锁、原子操作等都是实现线程同步的重要组成部分,但锁的应用上一节已经介绍,这节重点介绍条件变量。
条件变量condition_variable
条件变量是多线程编程中线程间通信和同步的重要工具之一。它可以使一个线程在等待某个特定条件发生时候阻塞,并在条件满足后被唤醒。
常用场景:
- 在生产者-消费者模型中,当生产者线程产生了数据,可以通过条件变量唤醒消费者线程进行消费;
- 线程间的同步:当一个线程需要等待另一个线程完成某项工作后才能执行时,可以使用条件变量进行线程间的同步;
- 事件通知:当某个事件发生时,需要通知一个或多个线程执行相应的操作时,可以使用条件变量实现。
条件变量(condition_variable类)的使用
介绍关键成员函数(等待和唤醒)
wait(等待)
wait用于阻塞线程,以等待条件满足。它有两种重载形式:
- void wait(unique_lock<mutex>& Lck);
- void wait(unique_lock<mutex>& Lck, Predicate Pred);
首先,必须要传入的第一个参数是互斥锁对象,不可以直接传入mutex对象,而是需要unique_lock模板管理的锁对象。
第二个可选参数是一个谓词,可以是任何返回值为bool类型的表达式,比如变量、比较运算表达式、返回值为bool类型的函数(包括匿名函数)等。如果Pred最终值为true,则wait释放阻塞(不阻塞),继续向下执行;如果Pred最终值为false,则wait阻塞线程。
具体使用方法见下面代码。
notify_one 和 notify_all(唤醒)
用于对condition_variable对象发出唤醒信号。
其中notify_one可以对一个线程发出唤醒信号,如果有多个线程等待唤醒,那么会由操作系统调度决定唤醒哪个;
notify_all可以唤醒所有此condition_variable对象阻塞的线程。
条件变量的使用
条件变量需要和互斥量配合起来使用。
为什么条件变量需要和互斥锁配合使用?
1、条件变量本身并不是原子操作,不具备互斥保护的能力。也就是在获取条件变量状态、操作条件变量的过程需要互斥锁来保护;
2、当条件变量控制多个线程的消费状态时,如果多个线程被同时唤醒,多个线程会同时继续执行访问共享资源,所以需要使用互斥锁避免发生竞态条件。
所以在操作条件变量前,线程需要先获取一个互斥锁,避免发生不可预见的错误。
条件变量的使用一般有两个操作--等待和唤醒。在操作条件变量前,先获取互斥锁,那么在条件变量被调用等待函数(当前线程调用wait)时,该方法在等待期间会释放互斥锁,并阻塞当前线程;当条件变量被唤醒(其他线程调用notify_one或notify_all)时,wait会解除阻塞,该线程会重新获取互斥锁并继续执行。
例如实现代码:使用一个线程生产10个数字(0.5s生产一个),当队列满时, 使用2个线程输出这10个数字。
#include <iostream>
#include <algorithm>
#include <thread>
#include <condition_variable>
#include <queue>
using namespace std;
queue<int> dat;
condition_variable cv;
mutex mtx;
bool data_ready = false;
void producer()
{
int n = 10;
while(n--)
{
{
unique_lock<mutex> lock(mtx); //添加数据前加锁
dat.push(n);
}
this_thread::sleep_for(chrono::milliseconds(500));//模拟生产环境-间隔500ms消费一个数据
}
cout << "produce finished. notify consumer!!!" << endl;
data_ready = true;
cv.notify_all();
}
void consumer1()
{
while(1)
{
unique_lock<mutex> lock(mtx); //在使用共享数据前加锁
cv.wait(lock,
[]{return data_ready && !dat.empty();}); //如果数据ready并且队列不是空,则继续执行,否则阻塞
cout << "thread1 get data: " << dat.front() << endl;
this_thread::sleep_for(chrono::milliseconds(100));//模拟消费环境-间隔100ms消费一个数据
dat.pop();
}
}
void consumer2()
{
while(1)
{
unique_lock<mutex> lock(mtx); //在使用共享数据前加锁
cv.wait(lock,
[]{return data_ready && !dat.empty();}); //如果数据ready并且队列不是空,则继续执行,否则阻塞
cout << "thread2 get data: " << dat.front() << endl;
this_thread::sleep_for(chrono::milliseconds(100));//模拟消费环境-间隔100ms消费一个数据
dat.pop();
}
}
int main(){
thread th_pro(producer);
thread th_con1(consumer1);
thread th_con2(consumer2);
if(th_pro.joinable()) th_pro.join();
if(th_con1.joinable()) th_con1.join();
if(th_con2.joinable()) th_con2.join();
cout << "finished!!!" << endl;
return 0;
}
运行结果:
从结果可见,达到预期。生产全部10个数据后,由2和消费线程处理了这些数据,并且没有产生错误。
条件变量基础使用介绍到这里,如果想深入学习其他方法(例如wait_for、wait_until等),可以参考官方文档。
五、异步执行
C++中的异步执行
异步执行指的是一个任务可以在后台或者并发执行,不阻塞当前线程的执行方式。
在C++11标准中,引入std::async用于异步执行一个函数或其他可调用对象,并且可以使用std::future对象获取任务的返回值或者等待任务完成。程序员只负责将任务传递给std::async,由C++标准库进行开辟线程、管理线程等工作。
使用std::async的实现一般称为基于任务的异步设计。我们知道使用std::thread也可以实现异步执行,我们一般称为基于线程的异步设计。
如果我们想要实现异步编程,要优先选用基于任务而非基于线程的程序设计。原因如下:
- 基于任务的异步编程是更高阶的抽象设计,std::async的使用,把程序员在线程管理的工作中解放出来,不用关心线程管理的细节;
- 相比于基于std::thread的设计,基于std::async的程序能够更方便地获取任务返回结果;
- 如果需要执行的任务发生异常,基于std::thread的实现会直接产生中断程序,而基于std::async的实现可以捕获异常,保证程序正常执行。
相关方法/类介绍
std::async
std::async是声明在 future 头文件中的函数模板。它有两种重载形式
template <class Fn, class... ArgTypes>
future<typename result_of<Fn(ArgTypes...)>::type>
async(Fn&& fn, ArgTypes&&... args);
template <class Fn, class... ArgTypes>
future<typename result_of<Fn(ArgTypes...)>::type>
async(launch policy, Fn&& fn, ArgTypes&&... args);
- 参数fn:任务对象。需要输入可调用对象,比如函数对象,lambda表达式等,定义了需要执行的任务;
- 参数args:参数列表。如果输入的fn需要传入参数,则在fn之后依次传入;
- 参数policy:启动策略。是一个可选参数,用于指定异步任务的启动方式。如果不指定此参数,标准库会根据情况选择合适的启动策略。这个参数有2个常用枚举值:
- std::launch::async:表示任务应该再一个新线程中异步执行,使用这个参数std::async会创建一个新线程去执行传入的任务;
- std::launch::deferred:表示任务的执行被推迟到调用std::future对象的get()或wait()方法时。这种情况下,任务其实还是在当前线程中同步执行的,只不过执行时间被推迟。
- 返回值:返回一个std::future的对象,该对象的使用方式见下面介绍。
std::future
std::future是一个模板类,用于描述异步操作的结果,包括任务的返回值、任务的状态和处理异常等。
std::future的基本特性和常用方法:
- 获取结果:使用get()方法来获取任务的结果。如果任务执行成功,get()是返回值;如果任务抛出了异常,get()会重新抛出该异常,可以在主线程中重新捕获该异常;
- 检查状态:使用vaild()来检查该future对象是否有效,即是否关联了一个有效的异步任务。
- 阻塞和同步:get()方法会阻塞当前线程,即如果在调用get()处,异步任务还没有执行完成,则调用get()的线程会被阻塞,等待异步任务执行完成才会继续向下执行;wait()方法用于等待异步操作完成,也具有阻塞的功能,但是它不能获取返回值,只会确保异步任务完成。
vaild()函数用于检测是否是有效的std::future对象,是有关联了一个有效的异步任务。那么什么是有效的?
1、首先,std::future对象已经被初始化,并且关联着一个有效的异步任务(例如,通过std::async创建的任务,或者通过
std::promise
和std::future
关联的任务。后面两种关联任务的方式不再介绍,有兴趣可以查看相关资料学习)2、std::future对象没有被移动或销毁。
异步执行实现
例1:异步启动一个任务,打印任务执行结果(返回值)
#include <iostream>
#include <algorithm>
#include <future>
using namespace std;
int func_asyn()
{
cout << "func_asyn run..." << endl;
this_thread::sleep_for(chrono::seconds(1));
cout << "func_asyn finished..." << endl;
return 666;
}
int main(){
auto fu = std::async(std::launch::async, func_asyn);
cout << "main running!!!" << endl;
cout << fu.get() << endl;
cout << "main finished!!!" << endl;
return 0;
}
输出:
例2:异步执行一个任务,捕获任务中的异常,并输出异常描述
#include <iostream>
#include <algorithm>
#include <future>
using namespace std;
int func_asyn(int n)
{
if(n < 0)
throw invalid_argument("invalid arg!!!");
return n;
}
int main(){
try {
auto fu1 = std::async(std::launch::async, func_asyn, 1);
auto fu2 = std::async(std::launch::async, func_asyn, -1);
cout << "get fu1..." << endl;
cout << fu1.get() << endl;
cout << "get fu2..." << endl;
cout << fu2.get() << endl;
cout << "finished!!!" << endl;
} catch (const std::exception& e) {
cout << e.what() << endl;
}
return 0;
}
输出
【参考资料】
thread 类 | Microsoft Learn
<atomic> | Microsoft Learn
<mutex> | Microsoft Learn
<shared_mutex> | Microsoft Learn
condition_variable 类 | Microsoft Learn
<future> | Microsoft Learn