目录
一:Atomic:
二:Thread
1. 创建线程
2. 小心移动(std::move)线程
3. 如何创建带参数的线程
4. 线程参数是引用类型时,要小心谨慎。
5. 获取线程ID
6. jthread
7. 如何在线程中使用中断 stop_token
三:如何解决数据竞争
1.有问题的代码
2.使用互斥
3.预防死锁
4. 自动释放锁
5. 延迟锁
6. 共享锁
7. 线程安全的初始化
四:线程局部存储
五:线程通信
1.条件变量
2. 防止虚假唤醒
3. 防止唤醒丢失
4.信号量
5. std::latch
6. std::barrier
六:任务
1. std::promise, std::future
2. 用std::promise, std::future进行线程同步
3. std::async
4. std::packaged_task
一:Atomic:
#include <atomic>
#include <thread>
#include <iostream>
using namespace std;
std::atomic_int x, y;
int r1, r2;
// Thread body A: sets x to 1, then copies the current value of y into r1.
// Both accesses use the default (sequentially consistent) memory ordering.
void writeX() {
    x = 1;     // atomic assignment, equivalent to x.store(1)
    r1 = y;    // implicit atomic read, equivalent to y.load()
}
// Thread body B: sets y to 1, then copies the current value of x into r2.
// Both accesses use the default (sequentially consistent) memory ordering.
void writeY() {
    y = 1;     // atomic assignment, equivalent to y.store(1)
    r2 = x;    // implicit atomic read, equivalent to x.load()
}
int main() {
for (int i = 0; i < 100; i++)
{
x = 0;
y = 0;
std::thread a(writeX);
std::thread b(writeY);
a.join();
b.join();
std::cout << r1 << r2 << std::endl;
}
return 0;
}
//可能的输出有三种情况:01, 10, 11(store/load 默认使用顺序一致性,因此不会出现 00)
//01:先执行线程a, 再执行线程b
//10:先执行线程b,再执行线程a
//11:执行线程a一半后调度到线程b,然后再回来
二:Thread
1. 创建线程
#include <atomic>
#include <thread>
#include <iostream>
using namespace std;
// Free function used as a thread entry point: writes "function" plus a newline and flushes.
void helloFunction() {
    std::cout << "function";
    std::cout << std::endl;
}
// Callable type used as a thread entry point: invoking an instance
// writes "function object" plus a newline and flushes.
struct HelloFunctionObject {
    void operator()() const {
        std::cout << "function object";
        std::cout << std::endl;
    }
};
int main()
{
thread t1(helloFunction); // function
HelloFunctionObject helloFunctionObject;
thread t2(helloFunctionObject); // function object
thread t3([] { cout << "lambda function" << std::endl; }); // lambda function
t1.join(); //需要用join,否则可能会出现主线程退出时,t1线程还没有执行完的情况,引起异常
t2.join();
t3.join();
return 0;
}
2. 小心移动(std::move)线程
#include <atomic>
#include <thread>
#include <iostream>
using namespace std;
// Demonstrates the pitfall of move-assigning to a std::thread that already owns a thread.
// NOTE: this program is expected to abort through std::terminate at the marked line.
int main()
{
    std::thread t([] { std::cout << "lambda function"; });
    std::thread t2;
    t2 = std::move(t);   // fine: t2 was empty and now owns the thread started by t
    std::thread t3([] { std::cout << "lambda function"; });
    // Problem: t2 already owns a thread and is joinable; move-assigning another thread
    // to a joinable std::thread calls std::terminate. Calling t2.join() (or t2.detach())
    // first would make the assignment safe.
    t2 = std::move(t3); // std::terminate
}
3. 如何创建带参数的线程
#include <atomic>
#include <functional>
#include <iostream>
#include <string>
#include <thread>
using namespace std;
//如何在线程中传递参数
// Writes its by-value string argument to std::cout (no trailing newline).
void printStringCopy(std::string s) {
    std::cout << s;
}
// Writes the string it receives by const reference to std::cout (no trailing newline).
void printStringRef(const std::string& s) {
    std::cout << s;
}
int main()
{
string s{ "C++" };
thread tPerCopy([=] { cout << s; }); // C++
thread tPerCopy2(printStringCopy, s); // C++
tPerCopy.join();
tPerCopy2.join();
thread tPerReference([&] { cout << s; }); // C++
thread tPerReference2(printStringRef, s); // C++
tPerReference.join();
tPerReference2.join();
}
4. 线程参数是引用类型时,要小心谨慎。
#include <iostream>
using namespace std;
using std::this_thread::sleep_for;
using std::this_thread::get_id;
// Functor holding a reference to a caller-owned int. Each call adds k to that int six
// times, sleeping 100 ms before every addition, then prints the executing thread's id.
// The referenced int must stay alive for as long as operator() runs.
struct Sleeper {
    Sleeper(int& i_) : i{ i_ } {}
    void operator()(int k) {
        unsigned int step = 0;
        while (step <= 5) {
            sleep_for(std::chrono::milliseconds(100));
            i = i + k;
            ++step;
        }
        std::cout << get_id(); // undefined behaviour
    }
private:
    int& i;
};
// Demonstrates why handing a reference to a detached thread is dangerous.
int main()
{
int valSleeper = 1000;
// valSleeper is handed to the thread by reference. If the main thread exits first, the thread's
// use of valSleeper is undefined behaviour; in addition, the main thread and thread t share
// valSleeper without synchronization, which is a data race.
std::thread t(Sleeper(valSleeper), 5);
t.detach();
std::cout << valSleeper; // undefined behaviour
}
5. 获取线程ID
using namespace std;
using std::this_thread::get_id;
// Prints the hardware concurrency hint, the ids of two threads as seen from inside and
// outside, and shows that swap() exchanges which thread each std::thread object owns.
// The concrete id values differ from run to run.
int main()
{
    std::cout << std::thread::hardware_concurrency() << std::endl;  // e.g. 4
    auto printOwnId = [] { std::cout << get_id() << std::endl; };
    std::thread t1(printOwnId);                                     // e.g. 139783038650112
    std::thread t2(printOwnId);                                     // e.g. 139783030257408
    std::cout << t1.get_id() << std::endl;                          // id of the first thread
    std::cout << t2.get_id() << std::endl;                          // id of the second thread
    t2.swap(t1);
    std::cout << t1.get_id() << std::endl;                          // now the second thread's id
    std::cout << t2.get_id() << std::endl;                          // now the first thread's id
    std::cout << get_id() << std::endl;                             // id of the main thread
    t1.join();
    t2.join();
}
6. jthread
#include <atomic>
#include <thread>
#include <iostream>
using namespace std;
using std::this_thread::get_id;
//jthread 自动join()的线程
// Starts a std::jthread, which joins automatically in its destructor, and reports
// whether it is joinable.
int main()
{
    auto greet = [] { std::cout << "std::jthread" << "\n"; };
    std::jthread thr{ greet };                                   // prints "std::jthread"
    // Without std::boolalpha a true value is printed as 1.
    std::cout << "thr.joinable(): " << thr.joinable() << "\n";
}
7. 如何在线程中使用中断 stop_token
#include <atomic>
#include <thread>
#include <iostream>
using namespace std;
using std::this_thread::get_id;
using namespace::std::literals;//字面量,比如0.2s, C++20能识别这种写法
// (1) Thread that never looks at a stop token: it always runs all ten iterations,
// printing the counter every 0.2 s.
std::jthread nonInterruptable([] {
    for (int counter = 0; counter < 10; ++counter) {
        std::this_thread::sleep_for(0.2s);
        std::cerr << "nonInterruptable: " << counter << std::endl;
    }
});
// (2) Thread that cooperates with stop requests: std::jthread passes its stop_token
// as the first argument of the callable.
std::jthread interruptable([](std::stop_token stoken) {
    for (int counter = 0; counter < 10; ++counter) {
        std::this_thread::sleep_for(0.2s);
        // (3) Leave as soon as a stop has been requested.
        if (stoken.stop_requested()) {
            return;
        }
        std::cerr << "interruptable: " << counter << std::endl;
    }
});
// Lets both global jthreads run for one second, then requests a stop on each.
int main()
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cerr << "Main thread interrupts both jthreads" << std::endl;
    // (4) The request is ignored by the thread that never checks its stop token ...
    nonInterruptable.request_stop();
    // ... and honoured by the thread that polls stop_requested().
    interruptable.request_stop();
}
三:如何解决数据竞争
1.有问题的代码
#include <atomic>
#include <thread>
#include <iostream>
using namespace std;
// Callable worker that reports three work steps, 200 ms apart, without any locking.
struct Worker {
    Worker(std::string n) : name(n) {}
    void operator()() {
        int i = 1;
        while (i <= 3) {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            // std::cout is shared by all threads. Each single << is safe, but this statement
            // is a chain of several insertions, so output of different threads can interleave.
            std::cout << name << ": " << "Work " << i << std::endl;
            ++i;
        }
    }
private:
    std::string name;
};
int main()
{
thread herb = thread(Worker("Herb"));
thread andrei = thread(Worker(" Andrei"));
thread scott = thread(Worker(" Scott"));
thread bjarne = thread(Worker(" Bjarne"));
herb.join();
andrei.join();
scott.join();
bjarne.join();
}
2.使用互斥
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
using namespace std;
std::mutex mutexCout;
// Callable worker that reports three work steps, 200 ms apart. Every report is printed
// while holding mutexCout, so a whole line is written without interleaving.
struct Worker {
    Worker(std::string n) : name(n) {}
    void operator()() {
        int i = 1;
        while (i <= 3) {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            // Manual lock/unlock around the complete output statement.
            mutexCout.lock();
            std::cout << name << ": " << "Work " << i << std::endl;
            mutexCout.unlock();
            ++i;
        }
    }
private:
    std::string name;
};
int main()
{
thread herb = thread(Worker("Herb"));
thread andrei = thread(Worker("Andrei"));
thread scott = thread(Worker("Scott"));
thread bjarne = thread(Worker("Bjarne"));
herb.join();
andrei.join();
scott.join();
bjarne.join();
}
3.预防死锁
m.lock();
// If getVar() throws, m.unlock() below is never reached: the mutex stays locked, no other
// thread can acquire it, and the program may deadlock.
sharedVar = getVar();
m.unlock();
#include <iostream>
#include <mutex>
using namespace std;
// Holder for a single std::mutex; presumably stands in for mutex-protected data in this demo.
struct CriticalData {
std::mutex mut;
};
// Locks a.mut, sleeps 1 ms, then locks b.mut, printing a message after each acquisition.
// When two threads call this with the arguments swapped, each can end up holding one
// mutex while waiting forever for the other one (deadlock).
void deadLock(CriticalData& a, CriticalData& b) {
a.mut.lock();
std::cout << "get the first mutex\n";
// The pause gives the other thread time to take its own first mutex.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
b.mut.lock();
std::cout << "get the second mutex\n";
// Comma operator: unlocks a.mut first, then b.mut.
a.mut.unlock(), b.mut.unlock();
}
// Starts two threads that lock the same two mutexes in opposite order.
int main()
{
CriticalData c1;
CriticalData c2;
// After taking their first lock, t1 and t2 each wait for the other to release its lock.
std::thread t1([&] { deadLock(c1, c2); });
std::thread t2([&] { deadLock(c2, c1); });
t1.join();
t2.join();
}
4. 自动释放锁
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
using namespace std;
std::mutex mutexCout;
struct Worker {
Worker(std::string n) :name(n) {};
void operator() () {
for (int i = 1; i <= 3; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::lock_guard<std::mutex> myLock(mutexCout);//自动释放锁
std::cout << name << ": " << "Work " << i << std::endl;
}
}
private:
std::string name;
};
int main()
{
thread herb = thread(Worker("Herb"));
thread andrei = thread(Worker("Andrei"));
thread scott = thread(Worker("Scott"));
thread bjarne = thread(Worker("Bjarne"));
herb.join();
andrei.join();
scott.join();
bjarne.join();
}
5. 延迟锁
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
using namespace std;
using namespace std;
// Holder for a single mutex; presumably stands in for mutex-protected data in this demo.
struct CriticalData {
mutex mut;
};
// Acquires a.mut and b.mut without risking a deadlock: both unique_locks are created
// with defer_lock (associated but not yet locked) and are then locked together by std::lock.
// Both mutexes are released when the guards go out of scope.
void deadLockResolved(CriticalData& a, CriticalData& b) {
    std::unique_lock<std::mutex> guard1{ a.mut, std::defer_lock };
    std::cout << std::this_thread::get_id() << ": get the first lock" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    std::unique_lock<std::mutex> guard2{ b.mut, std::defer_lock };
    std::cout << std::this_thread::get_id() << ": get the second lock" << std::endl;
    std::cout << std::this_thread::get_id() << ": atomic locking" << std::endl;
    // Locks both guards as one operation using a deadlock-avoidance algorithm.
    std::lock(guard1, guard2);
}
int main()
{
CriticalData c1;
CriticalData c2;
thread t1([&] { deadLockResolved(c1, c2); });
thread t2([&] { deadLockResolved(c2, c1); });
t1.join();
t2.join();
}
6. 共享锁
#include <mutex>
#include <shared_mutex>  // std::shared_timed_mutex and std::shared_lock are declared here
// ...
std::shared_timed_mutex sharedMutex;
// Writer: exclusive ownership, excludes every other writer and all readers.
std::unique_lock<std::shared_timed_mutex> writerLock(sharedMutex);
// Readers: shared ownership, several readers may hold the mutex at the same time.
// NOTE: these declarations only illustrate the lock types. In a real program each lock
// belongs to a different thread, because a thread must not lock a mutex it already owns.
std::shared_lock<std::shared_timed_mutex> readerLock(sharedMutex);
std::shared_lock<std::shared_timed_mutex> readerLock2(sharedMutex);
7. 线程安全的初始化
//常量表达式是线程安全的
// Literal type wrapping a double; objects can be created and queried at compile time.
struct MyDouble {
    // Stores v as the wrapped value.
    constexpr MyDouble(double v) : val(v) {}
    // Returns the wrapped value. Declared const so it can be called on constexpr
    // (and therefore const) objects: since C++14 a constexpr member function is
    // no longer implicitly const.
    constexpr double getValue() const { return val; }
private:
    double val;
};
// constexpr object: it is initialized by a constant expression rather than at run time.
constexpr MyDouble myDouble(10.5);
std::cout << myDouble.getValue(); // 10.5
//块内静态变量
// A block-scope static is initialized exactly once, the first time control passes
// through its declaration; since C++11 this initialization is thread-safe.
void blockScope() {
    static int MySharedDataInt{ 2011 };
}
//once_flag, call_once
#include <mutex>
...
using namespace std;
// Flag shared by all callers of do_once; it records whether the action has already run.
std::once_flag onceFlag;
// Prints "Only once." the first time it is called; later calls (from any thread) do nothing.
void do_once() {
    auto announce = [] { std::cout << "Only once." << std::endl; };
    std::call_once(onceFlag, announce);
}
// Both threads call do_once, but call_once runs the lambda for only one of them.
thread t1(do_once);
thread t2(do_once);
四:线程局部存储
// Serializes access to std::cout.
std::mutex coutMutex;
// Every thread gets its own copy of s, initialized to "hello from ".
thread_local std::string s("hello from ");
// Appends s2 to the calling thread's copy of s, then prints that string, its address
// and an empty line while holding coutMutex.
void addThreadLocal(std::string const& s2) {
    s.append(s2);
    std::lock_guard<std::mutex> guard{ coutMutex };
    std::cout << s << std::endl
              << "&s: " << &s << std::endl
              << std::endl;
}
// Four threads; each one appends its tag to its own thread_local copy of s.
std::thread t1(addThreadLocal, "t1");
std::thread t2(addThreadLocal, "t2");
std::thread t3(addThreadLocal, "t3");
std::thread t4(addThreadLocal, "t4");
五:线程通信
1.条件变量
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
using namespace std;
std::mutex mutex_;
std::condition_variable condVar;
bool dataReady = false;
// Placeholder for the real work: prints a status line and flushes.
void doTheWork() {
    std::cout << "Processing shared data.";
    std::cout << std::endl;
}
void waitingForWork() {
std::cout << "Worker: Waiting for work." << std::endl;
std::unique_lock<std::mutex> lck(mutex_);
condVar.wait(lck, [] { return dataReady; });
doTheWork();
std::cout << "Work done." << std::endl;
}
void setDataReady() {
std::lock_guard<std::mutex> lck(mutex_);
dataReady = true;
std::cout << "Sender: Data is ready." << std::endl;
condVar.notify_one();
}
int main()
{
std::thread t1(waitingForWork);
std::thread t2(setDataReady);
t1.join();
t2.join();
}
2. 防止虚假唤醒
//为了防止虚假唤醒,接收方被唤醒后必须重新检查条件(即wait的谓词参数),且发送方应在发送通知前将条件置为true。
//dataReady = true; //发送方设置条件满足
//[] { return dataReady; } //接收方进行条件检查
3. 防止唤醒丢失
//如果发送方在接收方等待之前,就发送了唤醒,可能会导致唤醒丢失,因此要做两件事:
//1: 理想情况下要先等待,后发送唤醒(但线程的调度顺序无法保证,所以还需要第2点)
//2: 在接收方的等待函数中要检查是否满足条件 [] { return dataReady; };带谓词的wait在阻塞之前会先检查条件,即使通知已经先发出也不会一直等待
4.信号量
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <semaphore>
#include <vector>
using namespace std;
std::vector<int> myVec;
std::counting_semaphore<1> prepareSignal(0); // (1)
// Sender: appends 0, 1, 0, 3 to myVec, reports it, then releases the semaphore (2)
// so the waiting thread may continue.
void prepareWork() {
    for (int value : { 0, 1, 0, 3 }) {
        myVec.push_back(value);
    }
    std::cout << "Sender: Data prepared." << '\n';
    prepareSignal.release(); // (2)
}
// Waiter: blocks on the semaphore (3) until the data has been prepared, sets the third
// element to 2 and prints the whole vector on one line.
void completeWork() {
    std::cout << "Waiter: Waiting for data." << '\n';
    prepareSignal.acquire(); // (3)
    myVec[2] = 2;
    std::cout << "Waiter: Complete the work." << '\n';
    for (auto it = myVec.begin(); it != myVec.end(); ++it) {
        std::cout << *it << " ";
    }
    std::cout << '\n';
}
int main()
{
std::thread t1(prepareWork);
std::thread t2(completeWork);
t1.join();
t2.join();
}
5. std::latch
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <semaphore>
#include <vector>
#include <latch>
using namespace std;
std::mutex coutMutex;
std::latch workDone(2);
std::latch goHome(1); // (5)
// Writes s to std::cout while holding coutMutex, so a message from one thread is never
// interleaved with output from another thread that also uses this function.
// The string is taken by const reference to avoid copying it on every call.
void synchronizedOut(const std::string& s) {
    std::lock_guard<std::mutex> lo(coutMutex);
    std::cout << s;
}
// Callable worker: reports that its work is done, counts down the workDone latch,
// waits on the goHome latch, then says good bye.
struct Worker {
    Worker(std::string n) : name(n) {}
    void operator()() {
        // Tell the boss that this worker has finished.
        const std::string doneMessage = name + ": " + "Work done!\n";
        synchronizedOut(doneMessage);
        workDone.count_down(); // (3) work finished
        // Stay until the boss allows everyone to leave.
        goHome.wait();
        const std::string byeMessage = name + ": " + "Good bye!\n";
        synchronizedOut(byeMessage);
    }
private:
    std::string name;
};
int main()
{
std::cout << "BOSS: START WORKING! " << '\n';
Worker herb(" Herb"); // (1) 工人1
std::thread herbWork(herb); //工人1必须完成自己的工作
Worker scott(" Scott"); // (2) 工人2
std::thread scottWork(scott);//工人2必须完成自己的工作
workDone.wait(); // (4) 完成工作后等待
std::cout << '\n';
goHome.count_down();//老板发命令回家
std::cout << "BOSS: GO HOME!" << '\n';
herbWork.join();
scottWork.join();
}
6. std::barrier
#include <barrier>
#include <iostream>
#include <string>
#include <syncstream>
#include <thread>
#include <vector>
int main()
{
const auto workers = { "Anil", "Busara", "Carl" };
auto on_completion = []() noexcept
{
// locking not needed here
static auto phase =
"... done\n"
"Cleaning up...\n";
std::cout << phase;
phase = "... done\n";
};
std::barrier sync_point(std::ssize(workers), on_completion);
auto work = [&](std::string name)
{
std::string product = " " + name + " worked\n";
std::osyncstream(std::cout) << product; // ok, op<< call is atomic
sync_point.arrive_and_wait();
product = " " + name + " cleaned\n";
std::osyncstream(std::cout) << product;
sync_point.arrive_and_wait();
};
std::cout << "Starting...\n";
std::vector<std::jthread> threads;
threads.reserve(std::size(workers));
for (auto const& worker : workers)
threads.emplace_back(work, worker);
}
六:任务
1. std::promise, std::future
#include <future>
#include <iostream>
// Computes a * b and publishes the result through the given promise.
void product(std::promise<int>&& intPromise, int a, int b) {
    const int result = a * b;
    intPromise.set_value(result);
}
// Computes 20 * 10 in a separate thread and fetches the result through a future.
int main()
{
    int a{ 20 };
    int b{ 10 };
    std::promise<int> prodPromise;
    auto prodResult = prodPromise.get_future();
    // The promise is moved into the thread; get() below blocks until the value is set.
    std::jthread prodThread{ product, std::move(prodPromise), a, b };
    std::cout << "20*10= " << prodResult.get(); // 20*10= 200
}
2. 用std::promise, std::future进行线程同步
#include <future>
#include <iostream>
// Placeholder for the real work: prints a status line and flushes.
void doTheWork() {
    std::cout << "Processing shared data.";
    std::cout << std::endl;
}
// Receiver: blocks until the future becomes ready, then does the work.
void waitingForWork(std::future<void>&& fut) {
    std::cout << "Worker: Waiting for work." << std::endl;
    fut.wait();   // returns once the matching promise has been fulfilled
    doTheWork();
    std::cout << "Work done." << std::endl;
}
// Sender: announces that the data is ready and fulfils the promise, which makes the
// associated future ready.
void setDataReady(std::promise<void>&& prom) {
    std::cout << "Sender: Data is ready.";
    std::cout << std::endl;
    prom.set_value();
}
// Synchronizes two threads through a promise/future pair: t1 waits on the future,
// t2 fulfils the promise. Both jthreads join automatically at the end of main.
int main()
{
    std::promise<void> sendReady;
    std::future<void> fut = sendReady.get_future();
    std::jthread t1{ waitingForWork, std::move(fut) };
    std::jthread t2{ setDataReady, std::move(sendReady) };
}
3. std::async
#include <future>
#include <iostream>
using std::chrono::duration;
using std::chrono::system_clock;
using std::launch;
// Compares the two launch policies: the deferred task only runs when get() is called,
// the async task is started right away.
int main()
{
    const auto begin = system_clock::now();
    auto asyncLazy = std::async(launch::deferred, [] { return system_clock::now(); });
    auto asyncEager = std::async(launch::async, [] { return system_clock::now(); });
    std::this_thread::sleep_for(std::chrono::seconds(1));
    // The deferred task executes here, inside get(), i.e. after the one-second sleep.
    const auto lazyStart = asyncLazy.get() - begin;
    const auto eagerStart = asyncEager.get() - begin;
    const double lazyDuration = duration<double>(lazyStart).count();
    const double eagerDuration = duration<double>(eagerStart).count();
    // Sample output: roughly 1.00018 sec for lazy and 0.00015489 sec for eager.
    std::cout << lazyDuration << " sec" << eagerDuration << " sec";
}
#include <future>
#include <iostream>
#include <thread>
using std::chrono::duration;
using std::chrono::system_clock;
using std::launch;
// Computes 2000 + 11 twice: once with an explicit thread writing to a shared variable,
// once with std::async returning the value through a future.
int main()
{
    int res;
    std::thread t{ [&res] { res = 2000 + 11; } };
    t.join();                                  // res is only read after the thread finished
    std::cout << res << std::endl;             // 2011
    std::future<int> fut = std::async([] { return 2000 + 11; }); // asynchronous call
    std::cout << fut.get() << std::endl;       // 2011
}
4. std::packaged_task
#include <future>
#include <iostream>
#include <queue>
#include <thread>
using namespace std;
using std::chrono::duration;
using std::chrono::system_clock;
using std::launch;
// Function object that adds every integer in the half-open range [beg, end) to a running
// total and returns that total. The total persists across calls on the same object and
// starts at 0. An empty range (beg >= end) leaves the total unchanged.
struct SumUp {
    int operator()(int beg, int end) {
        for (int i = beg; i < end; ++i) sum += i;
        return sum;
    }
private:
    // The former `beg` and `end` data members were removed: they were never initialized
    // or read, and were shadowed by the parameters of operator().
    int sum{ 0 };
};
int main()
{
SumUp sumUp1, sumUp2;
packaged_task<int(int, int)> sumTask1(sumUp1);//任务1
packaged_task<int(int, int)> sumTask2(sumUp2);//任务2
future<int> sum1 = sumTask1.get_future(); //任务1的结果
future<int> sum2 = sumTask2.get_future(); //任务2的结果
deque< packaged_task<int(int, int)>> allTasks; //存储所有的任务
allTasks.push_back(move(sumTask1));//将任务1加入队列
allTasks.push_back(move(sumTask2));//将任务2加入队列
int begin{ 1 };
int increment{ 5000 };
int end = begin + increment;
while (not allTasks.empty()) {
packaged_task<int(int, int)> myTask = move(allTasks.front());//取出1个任务
allTasks.pop_front();
thread sumThread(move(myTask), begin, end);//执行这个任务
begin = end;
end += increment;
sumThread.detach();
}
auto sum = sum1.get() + sum2.get();//查询任务的结果
cout << sum;
}