文章目录
- 4.4 简化代码与同步工具
- 同步工具作为构建块
- 4.4.1 使用Future的函数化编程
- 函数化编程简介
- C++支持函数化编程
- 快速排序 - FP模式
- 快速排序串行版
- 快速排序并行版
- spawn_task函数
- 结论
- 快速排序 - 串行版
- 快速排序 - 并行版
- `spawn_task`函数
- 使用 `spawn_task` 实现并行快速排序
- 详细解释
- 使用示例
- 总结
- 4.4.2 使用消息传递的同步操作
- CSP(Communicating Sequential Processes)概念简介
- 消息队列与C++线程
- ATM机应用示例
- ATM机逻辑的状态机模型
- 状态机类的实现
- `getting_pin` 状态函数实现
- 状态机的工作流程
- 总结
- 4.4.3 扩展规范中的持续性并发
- 新类型 `std::experimental::future` 和 `std::experimental::promise`
- 示例代码
- 使用 `std::experimental::promise` 实现与 `std::async` 等价的功能
- 4.4.4 持续性连接
- 同步方式
- 异步方式
- 持续性方式
- 全异步操作
- Future 展开(Future-unwrapping)
- 多个持续性对象
- 4.4.5 等待多个 `future`
- 背景
- 使用 `std::async` 收集结果
- 使用 `std::experimental::when_all` 收集结果
- `std::experimental::when_any`
- 4.4.6 使用 `std::experimental::when_any` 等待第一个 `future`
- 背景
- 示例代码
- 解释
- 其他示例
- 4.4.7 锁存器和栅栏
- 锁存器(Latch)
- 栅栏(Barrier)
- 4.4.8 `std::experimental::latch`:基础的锁存器类型
- 示例代码
- 4.4.9 `std::experimental::barrier`:简单的栅栏
- 示例代码
- 4.4.10 `std::experimental::flex_barrier` — 更灵活的栅栏
- 示例代码
4.4 简化代码与同步工具
同步工具作为构建块
在本章中,同步工具被称为构建块。关注同步操作而非具体机制。当程序需要并发时,提供更多的函数化方式简化代码。相比多个线程间共享数据,每个任务最好拥有自己的数据,并且其他线程可以使用future
获取运行结果。
4.4.1 使用Future的函数化编程
函数化编程简介
函数化编程(Functional Programming, FP)是一种编程范式,其中函数的结果仅依赖于传入的参数,无副作用。C++标准库中的数学相关函数如sin
, cos
, 和 sqrt
都具有这种特性。纯粹的函数不会改变任何外部状态,限制了函数的返回值。
C++支持函数化编程
- Lambda表达式:C++11引入了Lambda表达式,使得匿名函数的定义更加简便。
- 自动类型推断:通过
auto
关键字,编译器能够自动推断变量类型。 - std::bind:用于绑定函数和参数,创建新的可调用对象。
快速排序 - FP模式
快速排序串行版
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t){ return t < pivot; });
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
auto new_lower = sequential_quick_sort(std::move(lower_part));
auto new_higher = sequential_quick_sort(std::move(input));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower);
return result;
}
- 关键点:
- 使用
splice()
移动元素。 std::partition
分割列表。- 递归调用自身进行排序。
- 使用
快速排序并行版
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t){ return t < pivot; });
std::list<T> lower_part;
lower_point.splice(lower_part.end(), input, input.begin(), divide_point);
std::future<std::list<T>> new_lower(
std::async(¶llel_quick_sort<T>, std::move(lower_part)));
auto new_higher = parallel_quick_sort(std::move(input));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}
- 改进点:
- 利用
std::async
并发执行排序任务。 - 使用
std::future
等待异步任务完成。
- 利用
spawn_task函数
一个简单的封装函数,用于创建和启动包含std::packaged_task
的任务,并返回其结果的std::future
对象。
template<typename F, typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f, A&& a)
{
typedef std::result_of<F(A&&)>::type result_type;
std::packaged_task<result_type(A&&)> task(std::move(f));
std::future<result_type> res(task.get_future());
std::thread t(std::move(task), std::move(a));
t.detach();
return res;
}
结论
- FP在并发中的应用:提供了一种避免共享状态和竞争条件的方法,简化了并发编程。
- 扩展性:讨论了如何进一步优化并行快速排序算法,如利用C++17中的并行算法。
好的,以下是带有详细注释的快速排序算法(串行版和并行版)以及spawn_task
函数的完整代码。每个代码块上方都有详细的注释解释其功能和作用。
快速排序 - 串行版
// 模板函数:实现串行版本的快速排序
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
// 如果输入列表为空,则直接返回
if(input.empty())
{
return input;
}
std::list<T> result; // 存储排序结果的列表
// 将输入列表的第一个元素移动到结果列表中作为“中间”值
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin(); // 获取“中间”值
// 使用std::partition将列表分成两部分:小于pivot的部分和大于等于pivot的部分
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t){ return t < pivot; });
std::list<T> lower_part; // 存储小于pivot的部分
// 将小于pivot的部分从input列表移动到lower_part列表
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
// 递归调用自身对lower_part进行排序
auto new_lower = sequential_quick_sort(std::move(lower_part));
// 递归调用自身对input列表(此时为大于等于pivot的部分)进行排序
auto new_higher = sequential_quick_sort(std::move(input));
// 将new_higher的结果拼接到result列表的末尾
result.splice(result.end(), new_higher);
// 将new_lower的结果拼接到result列表的开头
result.splice(result.begin(), new_lower);
return result;
}
快速排序 - 并行版
// 模板函数:实现并行版本的快速排序
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
// 如果输入列表为空,则直接返回
if(input.empty())
{
return input;
}
std::list<T> result; // 存储排序结果的列表
// 将输入列表的第一个元素移动到结果列表中作为“中间”值
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin(); // 获取“中间”值
// 使用std::partition将列表分成两部分:小于pivot的部分和大于等于pivot的部分
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t){ return t < pivot; });
std::list<T> lower_part; // 存储小于pivot的部分
// 将小于pivot的部分从input列表移动到lower_part列表
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
// 使用std::async异步调用parallel_quick_sort对lower_part进行排序,并获取future对象
std::future<std::list<T>> new_lower(
std::async(¶llel_quick_sort<T>, std::move(lower_part)));
// 递归调用自身对input列表(此时为大于等于pivot的部分)进行排序
auto new_higher = parallel_quick_sort(std::move(input));
// 将new_higher的结果拼接到result列表的末尾
result.splice(result.end(), new_higher);
// 等待lower_part的异步任务完成,并将结果拼接到result列表的开头
result.splice(result.begin(), new_lower.get());
return result;
}
spawn_task
函数
// 模板函数:封装std::packaged_task和std::thread,创建并发任务
template<typename F, typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f, A&& a)
{
// 定义结果类型
typedef std::result_of<F(A&&)>::type result_type;
// 创建一个std::packaged_task对象,包装传入的函数f
std::packaged_task<result_type(A&&)> task(std::move(f));
// 获取future对象,用于获取任务的结果
std::future<result_type> res(task.get_future());
// 创建线程执行任务,并分离线程
std::thread t(std::move(task), std::move(a));
t.detach();
return res;
}
这些注释详细解释了每一行代码的作用和逻辑,帮助你更好地理解快速排序算法的串行和并行实现,以及如何使用C++标准库中的并发工具来简化并发编程。希望这对你的学习有所帮助!
在你提供的代码示例中,spawn_task
函数并没有直接被使用。它是一个辅助函数,展示了如何通过封装 std::packaged_task
和 std::thread
来创建和启动并发任务,并返回一个 std::future
对象以获取任务的结果。这个函数可以用于任何需要异步执行任务并获取结果的场景。
尽管 spawn_task
没有在快速排序的示例中直接使用,但我们可以展示如何将其集成到 parallel_quick_sort
函数中,以替代 std::async
。下面是一个修改后的版本,展示了如何使用 spawn_task
函数来实现并行快速排序:
使用 spawn_task
实现并行快速排序
#include <future>
#include <list>
#include <thread>
#include <algorithm>
// spawn_task 函数:封装 std::packaged_task 和 std::thread,创建并发任务
template<typename F, typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f, A&& a)
{
typedef std::result_of<F(A&&)>::type result_type;
std::packaged_task<result_type(A&&)> task(std::move(f));
std::future<result_type> res(task.get_future());
std::thread t(std::move(task), std::move(a));
t.detach();
return res;
}
// 并行版快速排序
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t){ return t < pivot; });
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
// 使用 spawn_task 替代 std::async
auto new_lower_future = spawn_task(¶llel_quick_sort<T>, std::move(lower_part));
auto new_higher = parallel_quick_sort(std::move(input));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower_future.get());
return result;
}
详细解释
-
spawn_task
函数:- 这个函数接受一个可调用对象
f
和参数a
,并创建一个std::packaged_task
对象。 - 它返回一个
std::future
对象,该对象可以在未来获取任务的结果。 - 线程
t
执行任务并分离(detach
),这意味着主线程不会等待它完成。
- 这个函数接受一个可调用对象
-
parallel_quick_sort
函数:- 在分割列表后,我们使用
spawn_task
函数来异步调用parallel_quick_sort
对lower_part
列表进行排序。 new_lower_future
是一个std::future
对象,它将在稍后调用.get()
方法时阻塞,直到异步任务完成并返回结果。- 主线程继续递归调用
parallel_quick_sort
对input
列表(此时为大于等于pivot
的部分)进行排序。 - 最后,将两个排序好的子列表拼接在一起,并返回最终结果。
- 在分割列表后,我们使用
使用示例
以下是如何调用 parallel_quick_sort
函数的一个简单示例:
int main() {
std::list<int> numbers = {3, 6, 8, 10, 1, 2, 1};
std::list<int> sorted_numbers = parallel_quick_sort(numbers);
for (const auto& num : sorted_numbers) {
std::cout << num << " ";
}
std::cout << std::endl;
return 0;
}
总结
虽然 spawn_task
函数没有直接出现在原始的快速排序示例中,但它提供了一种替代 std::async
的方法来实现异步任务。通过这种方式,你可以更灵活地控制线程的创建和管理,尤其是在需要自定义线程池或更复杂的任务调度机制时。希望这能帮助你更好地理解如何使用 spawn_task
函数及其应用场景。
以下是经过优化排版后的4.4.2节内容,详细解释了使用消息传递的同步操作,并展示了如何实现一个简单的ATM机逻辑。每个部分都有详细的注释和结构化展示。
4.4.2 使用消息传递的同步操作
CSP(Communicating Sequential Processes)概念简介
CSP的核心思想是:在没有共享数据的情况下,每个线程可以基于接收到的信息独立运行。每个线程都可以视为一个状态机:当接收到一条信息时,会以某种方式更新其状态,并可能向其他线程发送信息。这种模式依赖于明确的行为要求和专业的编程团队,但可以使编程变得更加简单和清晰。
消息队列与C++线程
尽管C++线程共享同一块地址空间,无法直接达到真正的消息传递要求,但我们可以通过一些约定来确保线程间不共享数据。例如,所有消息都通过消息队列传递,而消息队列本身需要共享,具体的细节应包含在库中。
ATM机应用示例
假设我们要为实现自动取款机(ATM)编写一个应用。这个应用需要处理以下任务:
- 处理用户插入卡片并与银行交互。
- 控制机械接受用户的卡片、显示适当的信息、处理按钮事件、吐出现金以及退还卡。
一种处理方法是将所有任务分配到三个独立的线程上:
- 物理机械线程:处理卡片插入、现金吐出等物理操作。
- ATM逻辑线程:处理ATM机的业务逻辑。
- 银行通讯线程:处理与银行的通信。
这些线程之间不共享任何数据,而是通过消息队列进行通信。例如,当用户插入卡片或按下按钮时,物理机械线程会发送一条消息给逻辑线程,逻辑线程再发送消息给物理机械线程,指示其执行相应操作。
ATM机逻辑的状态机模型
ATM机的逻辑可以建模为一个状态机,每个状态等待特定的消息并根据消息类型更新状态。图4.3展示了一个简化的状态机模型。
状态机类的实现
struct card_inserted {
std::string account;
};
class atm {
messaging::receiver incoming; // 接收消息的队列
messaging::sender bank; // 发送消息给银行的队列
messaging::sender interface_hardware; // 发送消息给硬件接口的队列
void (atm::*state)(); // 当前状态函数指针
std::string account; // 当前账户
std::string pin; // 当前PIN码
public:
void waiting_for_card() { // 等待卡片插入的状态
interface_hardware.send(display_enter_card()); // 显示“请插入卡片”
incoming.wait().handle<card_inserted>(
[&](card_inserted const& msg) { // 处理卡片插入消息
account = msg.account;
pin = "";
interface_hardware.send(display_enter_pin());
state = &atm::getting_pin; // 更新状态为获取PIN码
}
);
}
void getting_pin(); // 获取PIN码的状态
void run() { // 主循环函数
state = &atm::waiting_for_card; // 初始化状态为等待卡片插入
try {
for (;;) {
(this->*state)(); // 反复调用当前状态函数
}
} catch (messaging::close_queue const&) {
// 处理队列关闭的情况
}
}
};
getting_pin
状态函数实现
void atm::getting_pin() {
incoming.wait()
.handle<digit_pressed>( // 处理数字按键消息
[&](digit_pressed const& msg) {
unsigned const pin_length = 4;
pin += msg.digit;
if (pin.length() == pin_length) {
bank.send(verify_pin(account, pin, incoming)); // 验证PIN码
state = &atm::verifying_pin; // 更新状态为验证PIN码
}
}
)
.handle<clear_last_pressed>( // 处理清除最后一个数字按键消息
[&](clear_last_pressed const& msg) {
if (!pin.empty()) {
pin.resize(pin.length() - 1);
}
}
)
.handle<cancel_pressed>( // 处理取消按键消息
[&](cancel_pressed const& msg) {
state = &atm::done_processing; // 更新状态为处理完成
}
);
}
状态机的工作流程
- 初始化状态:
run()
函数初始化状态为waiting_for_card
。 - 等待卡片插入:
waiting_for_card()
函数发送一条消息让终端显示“请插入卡片”。- 等待接收到
card_inserted
类型的消息,处理后更新状态为getting_pin
。
- 获取PIN码:
getting_pin()
函数等待三种不同类型的消息:digit_pressed
:处理数字按键输入。clear_last_pressed
:处理清除最后一个数字按键。cancel_pressed
:处理取消交易按键。
- 根据不同的消息类型,更新PIN码或状态。
总结
使用消息传递的方式进行同步操作,可以极大地简化并发系统的设计。每个线程都被独立对待,避免了共享数据带来的复杂性和潜在问题。这种方式被称为参与者模式(Actor model),其中各个参与者互相发送消息来执行任务,并且不会共享状态,除非通过消息直接传入。
这种排版方式使内容更加结构化,便于理解和参考。希望这对你有所帮助!
以下是经过优化排版后的4.4.3和4.4.4节内容,详细解释了扩展规范中的持续性并发以及如何使用持续性连接来处理异步任务。每个部分都有详细的注释和结构化展示。
4.4.3 扩展规范中的持续性并发
新类型 std::experimental::future
和 std::experimental::promise
并发技术扩展规范在 std::experimental
命名空间中提供了新的类型 std::promise
和 std::packaged_task
。这些类型与标准命名空间中的类型不同,其返回实例类型为 std::experimental::future
,而不是 std::future
。这使得用户可以体验到 std::experimental::future
的新特性——持续性(Continuation)。
假设一个任务产生了一个结果,并且 future
持有这个结果。然后需要写一些代码来处理这个结果。使用 std::future
时,必须等待 future
的状态变为就绪态,或者使用全阻塞函数 wait()
或非阻塞的 wait_for()
/wait_until()
成员函数进行等待。这种方式会让代码变得非常复杂。用一句话来说:“万事俱备,只等数据”,这就是持续性的意义。
为了给 future
添加持续性,只需要在其成员函数后添加 .then()
即可。例如,给定一个 future
对象 fut
,添加持续性的调用即为 fut.then(continuation)
。
示例代码
std::experimental::future<int> find_the_answer;
auto fut = find_the_answer();
auto fut2 = fut.then(find_the_question);
assert(!fut.valid()); // 原始 future 不再有效
assert(fut2.valid()); // 新的 future 有效
当原始 future
变成就绪态时,find_the_question
持续性函数不会安排在指定的线程上运行,而是由实现者自由选择执行方式(如线程池或其他线程管理库)。这样做是为了让实现者能够基于丰富的经验选择更好的线程使用方式,并为用户提供合适的机制来控制线程。
使用 std::experimental::promise
实现与 std::async
等价的功能
template<typename Func>
std::experimental::future<decltype(std::declval<Func>()())>
spawn_async(Func&& func) {
std::experimental::promise<
decltype(std::declval<Func>()())> p;
auto res = p.get_future();
std::thread t(
[p = std::move(p), f = std::decay_t<Func>(func)]() mutable {
try {
p.set_value_at_thread_exit(f());
} catch (...) {
p.set_exception_at_thread_exit(std::current_exception());
}
});
t.detach();
return res;
}
这段代码展示了如何使用 std::experimental::promise
获取 future
并生成新的线程运行 Lambda 表达式,该 Lambda 表达式为 promise
设置返回值或异常。
4.4.4 持续性连接
假设有一系列耗时任务要完成,并且希望使用异步多线程来减轻主线程的计算压力。例如,在用户登录应用时,需要将登录凭证发送给后台,在对身份信息进行验证后,从后台获取用户的账户信息,并使用获取到的信息更新显示。
同步方式
void process_login(std::string const& username, std::string const& password) {
try {
user_id const id = backend.authenticate_user(username, password);
user_data const info_to_display = backend.request_current_info(id);
update_display(info_to_display);
} catch (std::exception& e) {
display_error(e);
}
}
异步方式
std::future<void> process_login(std::string const& username, std::string const& password) {
return std::async(std::launch::async, [=]() {
try {
user_id const id = backend.authenticate_user(username, password);
user_data const info_to_display = backend.request_current_info(id);
update_display(info_to_display);
} catch (std::exception& e) {
display_error(e);
}
});
}
虽然使用 std::async
将任务放到后台线程上运行,但仍然会阻塞 UI 线程等待任务完成。为了进一步避免线程阻塞,可以使用持续性(Continuation)来连接每个任务。
持续性方式
std::experimental::future<void> process_login(
std::string const& username, std::string const& password) {
return spawn_async([=]() {
return backend.authenticate_user(username, password);
}).then([](std::experimental::future<user_id> id) {
return backend.request_current_info(id.get());
}).then([](std::experimental::future<user_data> info_to_display) {
try {
update_display(info_to_display.get());
} catch (std::exception& e) {
display_error(e);
}
});
}
每个持续性函数都以 std::experimental::future
作为独立参数,然后使用 .get()
来获取其持有的值。这意味着异常会沿着链条传播,如果有函数抛出异常,会在调用 info_to_display.get()
时抛出,并由捕获结构处理所有异常类型。
全异步操作
为了进一步优化,可以将每个任务拆分为完全异步的操作:
std::experimental::future<void> process_login(
std::string const& username, std::string const& password) {
return backend.async_authenticate_user(username, password).then(
[](std::experimental::future<user_id> id) {
return backend.async_request_current_info(id.get());
}).then([](std::experimental::future<user_data> info_to_display) {
try {
update_display(info_to_display.get());
} catch (std::exception& e) {
display_error(e);
}
});
}
使用 backend.async_authenticate_user
返回 std::experimental::future<user_id>
而不是直接返回 user_id
,这样可以在不阻塞线程的情况下处理异步操作。
Future 展开(Future-unwrapping)
持续性支持一种精妙的特性,叫做 future 展开(Future-unwrapping)。当向 .then()
传递一个持续性函数,并且返回一个 future<some_type>
类型的值时,相应的 .then()
返回值类型也是 future<some_type>
。最终的代码可能如下所示:
return backend.async_authenticate_user(username, password).then(
[](auto id) {
return backend.async_request_current_info(id.get());
});
如果编译器支持 C++14 泛型 Lambda 表达式,可以使用 auto
替换 Lambda 表达式的参数类型,简化代码。
多个持续性对象
std::experimental::shared_future
同样支持持续性。与 std::experimental::future
不同的是,std::experimental::shared_future
对象可以具有多个持续性对象,并且持续性参数是 std::experimental::shared_future
,而不是 std::experimental::future
。
示例代码:
auto fut = spawn_async(some_function).share();
auto fut2 = fut.then([](std::experimental::shared_future<some_data> data) {
do_stuff(data);
});
auto fut3 = fut.then([](std::experimental::shared_future<some_data> data) {
return do_other_stuff(data);
});
由于调用了 .share()
,fut
是 std::experimental::shared_future
实例,因为持续性函数必须将 std::experimental::shared_future
对象作为参数。不过,持续性返回的值为 std::experimental::future
,目前这个值无法共享,所以 fut2
和 fut3
的类型都是 std::experimental::future
。
以下是经过优化排版后的4.4.5和4.4.6节内容,详细解释了如何使用 std::experimental::when_all
和 std::experimental::when_any
来等待多个 future
对象。每个部分都有详细的注释和结构化展示。
4.4.5 等待多个 future
背景
假设有很多数据需要处理,每个数据都可以单独进行处理,这是利用硬件并发的好机会。可以使用异步任务组来处理数据项,每个任务通过 future
返回处理结果。然而,需要等待所有任务完成才能得到最终的结果。逐个 future
进行收集然后再整理结果,可能会导致线程资源的浪费和频繁的上下文切换。
使用 std::async
收集结果
std::future<FinalResult> process_data(std::vector<MyData>& vec) {
size_t const chunk_size = whatever;
std::vector<std::future<ChunkResult>> results;
for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
size_t const remaining_size = end - begin;
size_t const this_chunk_size = std::min(remaining_size, chunk_size);
results.push_back(
std::async(process_chunk, begin, begin + this_chunk_size));
begin += this_chunk_size;
}
return std::async([all_results = std::move(results)]() mutable {
std::vector<ChunkResult> v;
v.reserve(all_results.size());
for (auto& f : all_results) {
v.push_back(f.get()); // 1
}
return gather_results(v);
});
}
这段代码生成异步任务来处理结果,并在所有结果就绪时对结果进行整合。每个任务都是独立的,因此调度程序会在 f.get()
处反复唤醒,当发现有非就绪态的结果时,将再次回到休眠状态。这种方式不仅会占用线程资源,还会增加上下文切换频率,从而增加额外开销。
使用 std::experimental::when_all
收集结果
为了减少线程资源的浪费和上下文切换的频率,可以使用 std::experimental::when_all
来等待所有 future
对象就绪。
std::experimental::future<FinalResult> process_data(std::vector<MyData>& vec) {
size_t const chunk_size = whatever;
std::vector<std::experimental::future<ChunkResult>> results;
for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
size_t const remaining_size = end - begin;
size_t const this_chunk_size = std::min(remaining_size, chunk_size);
results.push_back(spawn_async(process_chunk, begin, begin + this_chunk_size));
begin += this_chunk_size;
}
return std::experimental::when_all(results.begin(), results.end()).then(
[](std::future<std::vector<std::experimental::future<ChunkResult>>> ready_results) {
std::vector<std::experimental::future<ChunkResult>> all_results = ready_results.get();
std::vector<ChunkResult> v;
v.reserve(all_results.size());
for (auto& f : all_results) {
v.push_back(f.get()); // 2
}
return gather_results(v);
});
}
在这个例子中,when_all
函数会等待所有 future
的状态变为就绪,然后调用 .then
来处理结果。虽然 Lambda 表达式看上去与之前类似,但这里将 results
的 vector
作为参数(包装到 future
中),而不是放在捕获器中,并在之后对每个 future
使用 get()
,从而无阻塞地获得所有处理结果。
std::experimental::when_any
除了 when_all
,还有 when_any
,它也会产生一个 future
,当 future
组中任意一个为就绪态时,这个新的 future
就会被置为就绪态。这对于并发性任务是一个不错的选择。
4.4.6 使用 std::experimental::when_any
等待第一个 future
背景
假设要在一大堆数据中找到一个符合要求的值(符合条件的值可能有很多),找到任何一个即可。这种任务是可以并行的,可以多线程完成,每个任务去检查数据的一个子集。如果有线程找到了合适的值,这个线程就会设置一个标志,让其他线程停止搜索,并返回结果。
示例代码
std::experimental::future<FinalResult> find_and_process_value(std::vector<MyData>& data) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_tasks = (concurrency > 0) ? concurrency : 2;
std::vector<std::experimental::future<MyData*>> results;
auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks;
auto chunk_begin = data.begin();
std::shared_ptr<std::atomic<bool>> done_flag = std::make_shared<std::atomic<bool>>(false);
for (unsigned i = 0; i < num_tasks; ++i) { // 1
auto chunk_end = (i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end();
results.push_back(spawn_async([=] { // 2
for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end); ++entry) {
if (matches_find_criteria(*entry)) {
*done_flag = true;
return &*entry;
}
}
return (MyData*)nullptr;
}));
chunk_begin = chunk_end;
}
std::shared_ptr<std::experimental::promise<FinalResult>> final_result =
std::make_shared<std::experimental::promise<FinalResult>>();
struct DoneCheck {
std::shared_ptr<std::experimental::promise<FinalResult>> final_result;
DoneCheck(std::shared_ptr<std::experimental::promise<FinalResult>> final_result_)
: final_result(std::move(final_result_)) {}
void operator()(std::experimental::future<std::experimental::when_any_result<
std::vector<std::experimental::future<MyData*>>>> results_param) {
auto results = results_param.get();
MyData* const ready_result = results.futures[results.index].get(); // 5
if (ready_result)
final_result->set_value(process_found_value(*ready_result)); // 6
else {
results.futures.erase(results.futures.begin() + results.index); // 7
if (!results.futures.empty()) {
std::experimental::when_any(results.futures.begin(), results.futures.end())
.then(std::move(*this)); // 8
} else {
final_result->set_exception(
std::make_exception_ptr(std::runtime_error("Not found"))); // 9
}
}
}
};
std::experimental::when_any(results.begin(), results.end()).then(DoneCheck(final_result)); // 3
return final_result->get_future(); // 10
}
解释
- 初始化循环:生成
num_tasks
个异步任务,每个任务执行 Lambda 表达式。 - Lambda 表达式:每个任务都有自己的
chunk_begin
和chunk_end
,并拷贝共享指针done_flag
,避免生命周期问题。 - 调用
when_any
:通过连接持续性完成任务的返回结果处理。 - 持续性操作符:当其中一个任务完成初始化,
DoneCheck
的函数操作符会被调用。 - 获取就绪的
future
:从就绪的future
中获取值。 - 处理结果:当符合条件的值被找到,可以对结果进行处理,并对最终结果进行设置。
- 丢弃就绪的
future
:从集合中删除就绪的future
。 - 递归调用
when_any
:如果还有很多future
需要检查,会产生对when_any
的再次调用。 - 异常处理:如果没有剩下任何
future
,则说明该值没有找到,在future
中存储一个异常。 - 返回最终结果:函数的返回值是一个
future
,其包含最终的结果。
其他示例
有时,future
中存储的是元组(或 when_any_result
持有一个元组),而不是 vector
:
std::experimental::future<int> f1 = spawn_async(func1);
std::experimental::future<std::string> f2 = spawn_async(func2);
std::experimental::future<double> f3 = spawn_async(func3);
std::experimental::future<
std::tuple<
std::experimental::future<int>,
std::experimental::future<std::string>,
std::experimental::future<double>>> result =
std::experimental::when_all(std::move(f1), std::move(f2), std::move(f3));
这个例子强调了 when_any
和 when_all
的重要性——可以通过容器中的任意 std::experimental::future
实例进行移动,并且通过值获取参数,因此需要显式地将 future
传入,或是传递一个临时变量。
以下是经过优化排版后的4.4.7至4.4.10节内容,详细解释了锁存器(Latch)和栅栏(Barrier)的概念及其在C++中的使用方法。每个部分都有详细的注释和结构化展示。
4.4.7 锁存器和栅栏
锁存器(Latch)
锁存器是一种同步对象,当其计数器减为0时,锁存器就处于就绪态。锁存器一旦就绪就会保持该状态,直到被销毁。因此,锁存器是用于同步一系列事件的轻量级机制。
栅栏(Barrier)
栅栏是一种可复用的同步机制,用于一组线程间的内部同步。与锁存器不同的是,栅栏要求每个线程在一个周期内只能到达栅栏一次。当所有线程都到达栅栏时,阻塞会被解除。栅栏可以复用,线程可以再次到达栅栏等待下一个周期的所有线程。
4.4.8 std::experimental::latch
:基础的锁存器类型
std::experimental::latch
声明在 <experimental/latch>
头文件中。构造 std::experimental::latch
时,将计数器的值作为构造函数的唯一参数。当等待的事件发生时,调用锁存器的 count_down
成员函数。当计数器为0时,锁存器状态变为就绪。可以调用 wait
成员函数对锁存器进行阻塞,直到锁存器处于就绪状态。如果需要检查锁存器是否就绪,可以调用 is_ready
成员函数。想要减少计数器并阻塞直至计数器为0,可以调用 count_down_and_wait
成员函数。
示例代码
#include <experimental/latch>
#include <future>
#include <vector>
void foo() {
unsigned const thread_count = ...;
std::experimental::latch done(thread_count); // 1 初始化锁存器
MyData data[thread_count];
std::vector<std::future<void>> threads;
for (unsigned i = 0; i < thread_count; ++i)
threads.push_back(std::async(std::launch::async, [&, i] { // 2 创建线程
data[i] = make_data(i);
done.count_down(); // 3 减少计数器
do_more_stuff(); // 4 线程完成其他任务
}));
done.wait(); // 5 等待锁存器就绪
process_data(data, thread_count); // 6 处理数据
} // 7 结束函数,析构future
- 初始化锁存器:使用需要等待的事件数量对
done
的构造进行初始化。 - 创建线程:使用
std::async
产生适量的线程。 - 减少计数器:每个线程生成相应的数据块后,都会对锁存器的计数器进行递减。
- 处理其他任务:线程在数据准备好之后,还可以执行其他任务。
- 等待锁存器就绪:主线程只需要等待锁存器成为就绪态即可。
- 处理数据:处理生成的数据。
需要注意的是,传递给 std::async
的 Lambda 表达式中,通过引用的方式对除了 i
之外的所有内容进行捕获,而 i
是通过值捕获的方式进行传递,以避免数据竞争和未定义行为。
4.4.9 std::experimental::barrier
:简单的栅栏
并发技术扩展规范提供了两种栅栏机制,分别为 std::experimental::barrier
和 std::experimental::flex_barrier
,前者更简单、开销更低,后者更灵活、开销较大。
假设有一组线程对某些数据进行处理。每个线程都在处理独立的任务,但在处理完当前任务之前,所有线程必须同步。这时可以使用 std::experimental::barrier
来实现这种同步。
示例代码
#include <experimental/barrier>
#include <thread>
#include <vector>
result_chunk process(data_chunk);
std::vector<data_chunk> divide_into_chunks(data_block data, unsigned num_threads);
void process_data(data_source &source, data_sink &sink) {
unsigned const concurrency = std::thread::hardware_concurrency();
unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
std::experimental::barrier sync(num_threads);
std::vector<joining_thread> threads(num_threads);
std::vector<data_chunk> chunks;
result_block result;
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = joining_thread([&, i] {
while (!source.done()) { // 6 检查数据源是否已完成
if (!i) { // 1 只有主线程执行
data_block current_block = source.get_next_data_block();
chunks = divide_into_chunks(current_block, num_threads);
}
sync.arrive_and_wait(); // 2 第一个同步点
result.set_chunk(i, num_threads, process(chunks[i])); // 3 处理数据块
sync.arrive_and_wait(); // 4 第二个同步点
if (!i) { // 5 主线程写入结果
sink.write_data(std::move(result));
}
}
});
}
} // 7 结束函数,析构线程
- 初始化数据块:只有主线程(
i=0
)会执行这个步骤。 - 第一个同步点:所有线程都在等待主线程完成数据划分。
- 处理数据块:每个线程处理自己的数据块,并更新结果。
- 第二个同步点:所有线程都在等待主线程完成结果写入。
- 写入结果:只有主线程会在同步点后写入结果。
4.4.10 std::experimental::flex_barrier
— 更灵活的栅栏
std::experimental::flex_barrier
提供了一个额外的构造函数,允许传入一个完整的函数和线程数量。当所有线程都到达栅栏处时,该函数由其中一个线程运行。这不仅指定了串行代码的运行方式,还提供了一种修改下一个周期到达栅栏处线程个数的方式。
示例代码
#include <experimental/barrier>
#include <thread>
#include <vector>
auto split_source = [&] { // 1 Lambda表达式用于拆分数据
if (!source.done()) {
data_block current_block = source.get_next_data_block();
chunks = divide_into_chunks(current_block, num_threads);
}
};
split_source(); // 2 初始调用Lambda表达式
result_block result;
std::experimental::flex_barrier sync(num_threads, [&] { // 3 构造flex_barrier
sink.write_data(std::move(result));
split_source(); // 4 再次调用Lambda表达式
return -1; // 5 返回值表示线程数目不变
});
std::vector<joining_thread> threads(num_threads);
for (unsigned i = 0; i < num_threads; ++i) {
threads[i] = joining_thread([&, i] {
while (!source.done()) { // 6 检查数据源是否已完成
result.set_chunk(i, num_threads, process(chunks[i]));
sync.arrive_and_wait(); // 7 同步点
}
});
}
- Lambda表达式:用于拆分数据块。
- 初始调用:在迭代开始前调用一次。
- 构造flex_barrier:传入一个完整的函数和线程数量。
- 再次调用Lambda表达式:在所有线程到达栅栏处时,由主线程调用。
- 返回值:表示线程数目不变或改变。
主循环简化为只包含并行部分的代码,只有一个同步点。
以上内容展示了如何使用锁存器和栅栏来管理多线程程序中的同步问题。希望这些示例和解释能帮助你更好地理解和应用这些同步机制。