并发
资源管理
资源
- 程序中符合先获取后释放(显式或隐式)规律的东西,比如内存、锁、套接字、线程句柄和文件句柄等。
- RAII: (Resource Acquisition Is Initialization),也称为“资源获取就是初始化”,是C++语言的一种管理资源、避免泄漏的惯用法。
unique_ptr & shared_ptr
- unique_ptr 是一个独立对象或者是数组的句柄,unique_ptr 通过移动操作使得简单高效
- shared_ptr 很多方面和unique_ptr 相似, 唯一区别是shared_ptr 通过拷贝操作而非移动操作。多个shared_ptr 共享该对象的所有权,只有当最后一个 shared_ptr 被销毁时,对象才被销毁。
- shared_ptr 提供的垃圾回收机制需要慎重使用。 shared_ptr 使得对象的生命周期变得不那么容易掌控了,除非在一定程度上一定要使用共享所有权,否则别轻易使用shared_ptr
- shared_ptr 没有指定哪个拥有者有权读写对象
- 当我们使用函数返回集合的时候,不必使用指针,因为如果定义了移动语义, 在返回的时候会默认使用移动操作
并发
任务 和thread
- 线程(thread)是任务在程序中的系统级表示。
- thread 在 中定义
thread 的使用方法
#include <iostream>
#include <thread>
#include <vector>
#include "config.h"
// thead 的使用
// 函数的参数这里必须是const, 否则会报thread:120:44: error: static assertion failed: std::thread arguments must be invocable after conversion to rvalues
// 在多线程中使用io 操作,需要考虑线程安全,打印出来的字符服务保证输出内容的顺序
void f(const std::vector<double>& v) {
std::cout<< "function f" << std::endl;
}
// 通过res 指针返回thread 的执行结果,不美观的方法
// 不推荐使用此方法,因为无法掌控res 什么时候写入了,一般用消息队列,或者使用condition 、promise等方式来实现返回结果
void f2(const std::vector<double>& v, double* res) {
}
struct F {
std::vector<double> & v;
F(std::vector<double>& vv):v{vv} {
}
/* 如果不使用调用运算符重载, 会编译报错: hread:120:44: error: static assertion failed: std::thread arguments must be invocable after conversion to rvalues
*/
void operator()() { // 调用运算符重载
std::cout << "struct F operator" << std::endl;
}
};
class Foo
{
public:
void bar()
{
for (int i = 0; i < 5; ++i)
{
std::cout << "正在执行线程3\n";
++n;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
int n = 0;
};
int main(int argc, char **argv) {
std::vector<double> some_vec {1,2,3,4,5,6};
std::vector<double> vec2{10,11,12,13};
std::thread t1{f, some_vec}; // 在t1 的线程里面执行函数f, 参数是some_vec
std::thread t2{F{vec2}}; // 在t2 的县城里面执行函数 F 的调用运算符, 参数是vec2
double res;
std::thread t3(f2, some_vec, &res);
Foo f;
std::thread t4(&Foo::bar, &f); // t5 在对象 f 上运行 foo::bar()
t1.join();
t2.join();
return 0;
}
- thread 更多用法清参照cppreference
thread 共享数据
- mutex // 访问数据排他处理
- condition_variable 允许一个thread 等待另外一个thread
- 代码示例
class Message {
private:
int msgId;
public:
Message(int id) : msgId(id){
}
int getId() { return msgId;}
};
std::queue<Message> mMsgQueue;
std::condition_variable mCond;
std::mutex mMutex;
void consumer() {
while(true) {
//std::cout << "wait mutex " << std::endl;
std::unique_lock<std::mutex> lck{mMutex};
//std::cout << "wait condition " << std::endl;
//while(mCond.wait(lck))/*do nothing*/;
mCond.wait(lck);
//std::cout << "after wait condition " << std::endl;
auto m = mMsgQueue.front();
mMsgQueue.pop();
std::cout << "get msg Id: " << m.getId() << std::endl;
// lck.unlock();
}
}
void producer() {
int index = 10;
while(index > 0) {
Message msg(index);
std::unique_lock<std::mutex> lck{mMutex};
mMsgQueue.push(msg);
std::cout << "push msg Id: " << msg.getId() << std::endl;
lck.unlock(); //(1)
mCond.notify_all();
//std::cout << "after notify all: " << msg.getId() << std::endl;
//std::this_thread::sleep_for(std::chrono::milliseconds(20)); //(2)
index--;
}
}
int main(int argc, char **argv) {
std::thread t2(consumer);
std::thread t1(producer);
t1.join();
t2.join();
}
-
对于生产者-消费者模式,理想状态下,生产者生产一个,消费者就消费一个,但是实际并非如此
- (1) 如果不主动调用lck.unlock(), lck 需要等到while(){} 进入下一个循环才会释放,所以无论是否加(), cosumer 线程得不到执行的契机(可能和平台有关)
- (2) 对于加了(1) 而没有(2) 的场合while 很快执行到下一个循环,又立即执行到lck, 也会导致在producer 执行过程中,consumer 得不到执行
执行效果大致如下:
-
而如果加了(1) 和(2)后:
-
对于consume 如上图的处理方式,获取一次锁只pop 一次数据,会因为producer notify 和consumer 的wait 不是时间上不是匹配出现而导致数据没有被全部读取出来,可以修改为如下方式:
-
在获取到一次锁后,把消息队列中的消息全部处理掉
void consumer() {
while(true) {
//std::cout << "wait mutex " << std::endl;
std::unique_lock<std::mutex> lck{mMutex};
//std::cout << "wait condition " << std::endl;
//while(mCond.wait(lck))/*do nothing*/;
mCond.wait(lck);
//std::cout << "after wait condition " << std::endl;
while(!mMsgQueue.empty()) {
auto m = mMsgQueue.front();
mMsgQueue.pop();
std::cout << "get msg Id: " << m.getId() << std::endl;
}
// lck.unlock();
}
}
执行效果大致如下, 这样只是消费者不会立马消费掉生产者生产的数据,但是即使没有(1)(2)也不会导致消息得不到处理:
任务通信
- c++ 中主要提供了以下三种,都定义在 头文件中。
- future and promise 用来从一个独立线程上创建出的任务返回结果
- packaged_task 是帮助启动义务以及链接返回结果的机制
- async 以非常类似调用函数的方式启动一个任务
future and promis
- 允许两个任务间传输直,而无须显示使用锁–“系统”高效地实现了这种传输。
- 当一个任务需要向另一个任务传输这个值时,把它放入promise。
- c++ 实现会出现在对应的future 中。
- 使用future 的get()函数,如果值还未准备好,线程会阻塞直至值准备好。如果get( )无法计算出来,get()会抛一个异常(系统或者从get() 得到数据的任务)
- promise 的作用主要为future的get() 提供放置 操作(set_value()) 和set_exception().
#include <iostream>
#include <thread>
#include <future>
#include <functional>
void work (std::promise<int>& px) {
try {
px.set_value(11);
} catch (...) {
px.set_exception(std::current_exception());
}
}
void result (std::future<int>& res) {
try {
int result = res.get();
std::cout<< " recieve the result: " << result << std::endl;
} catch(...) {
//std::cout<<"exception"<< std::current_exception() << std::endl;
}
}
int main(int argc, char **argv) {
std::promise<int> pro;
std::future<int> fut = pro.get_future(); //binding the future and the promise
std::thread t1(work, std::ref(pro));
std::thread t2(result, std::ref(fut));
t1.join();
t2.join();
return 0;
}
-
可以看到使用步骤如下:
- 声明promise
- 定义future,并且绑定promise 和future, 二者的模板类型必须一致(很好理解,provise 设定的就是传给future 的)
- 使用promise 的set_value()或者set_exception() 传递值
- 使用future 的get() 函数接收值(注意get()是阻塞的)
-
这里使用了std::ref std::ref的使用场景:
- 算法传递可调用对象,而这些对象可能需要引用传递。
- 在多线程之间共享数据,但有希望通过引用传递数据而不是复制。
- std::ref可以使函数接受任何类型的引用而不仅仅是特定类型
packaged_task
- packaged_task 简化了任务连接future & promise.
- (TODO)