C++并发编程指南06

文章目录

      • 4.4 简化代码与同步工具
        • 同步工具作为构建块
      • 4.4.1 使用Future的函数化编程
        • 函数化编程简介
        • C++支持函数化编程
      • 快速排序 - FP模式
        • 快速排序串行版
        • 快速排序并行版
      • spawn_task函数
      • 结论
      • 快速排序 - 串行版
      • 快速排序 - 并行版
      • `spawn_task`函数
      • 使用 `spawn_task` 实现并行快速排序
      • 详细解释
      • 使用示例
      • 总结
      • 4.4.2 使用消息传递的同步操作
        • CSP(Communicating Sequential Processes)概念简介
        • 消息队列与C++线程
        • ATM机应用示例
        • ATM机逻辑的状态机模型
          • 状态机类的实现
          • `getting_pin` 状态函数实现
        • 状态机的工作流程
        • 总结
      • 4.4.3 扩展规范中的持续性并发
        • 新类型 `std::experimental::future` 和 `std::experimental::promise`
          • 示例代码
          • 使用 `std::experimental::promise` 实现与 `std::async` 等价的功能
      • 4.4.4 持续性连接
        • 同步方式
        • 异步方式
        • 持续性方式
        • 全异步操作
        • Future 展开(Future-unwrapping)
        • 多个持续性对象
      • 4.4.5 等待多个 `future`
        • 背景
          • 使用 `std::async` 收集结果
          • 使用 `std::experimental::when_all` 收集结果
        • `std::experimental::when_any`
      • 4.4.6 使用 `std::experimental::when_any` 等待第一个 `future`
        • 背景
          • 示例代码
        • 解释
        • 其他示例
      • 4.4.7 锁存器和栅栏
        • 锁存器(Latch)
        • 栅栏(Barrier)
      • 4.4.8 `std::experimental::latch`:基础的锁存器类型
          • 示例代码
      • 4.4.9 `std::experimental::barrier`:简单的栅栏
          • 示例代码
      • 4.4.10 `std::experimental::flex_barrier` — 更灵活的栅栏
          • 示例代码


4.4 简化代码与同步工具

同步工具作为构建块

在本章中,同步工具被称为构建块。关注同步操作而非具体机制。当程序需要并发时,提供更多的函数化方式简化代码。相比多个线程间共享数据,每个任务最好拥有自己的数据,并且其他线程可以使用future获取运行结果。

4.4.1 使用Future的函数化编程

函数化编程简介

函数化编程(Functional Programming, FP)是一种编程范式,其中函数的结果仅依赖于传入的参数,无副作用。C++标准库中的数学相关函数如sin, cos, 和 sqrt都具有这种特性。纯粹的函数不会改变任何外部状态,限制了函数的返回值。

C++支持函数化编程
  • Lambda表达式:C++11引入了Lambda表达式,使得匿名函数的定义更加简便。
  • 自动类型推断:通过auto关键字,编译器能够自动推断变量类型。
  • std::bind:用于绑定函数和参数,创建新的可调用对象。

快速排序 - FP模式

在这里插入图片描述

快速排序串行版
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
    if(input.empty())
    {
        return input;
    }
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();

    auto divide_point = std::partition(input.begin(), input.end(),
                                       [&](T const& t){ return t < pivot; });

    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

    auto new_lower = sequential_quick_sort(std::move(lower_part));
    auto new_higher = sequential_quick_sort(std::move(input));

    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower);
    return result;
}
  • 关键点
    • 使用splice()移动元素。
    • std::partition分割列表。
    • 递归调用自身进行排序。
快速排序并行版
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if(input.empty())
    {
        return input;
    }
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();

    auto divide_point = std::partition(input.begin(), input.end(),
                                       [&](T const& t){ return t < pivot; });

    std::list<T> lower_part;
    lower_point.splice(lower_part.end(), input, input.begin(), divide_point);

    std::future<std::list<T>> new_lower(
        std::async(&parallel_quick_sort<T>, std::move(lower_part)));
    auto new_higher = parallel_quick_sort(std::move(input));

    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());
    return result;
}
  • 改进点
    • 利用std::async并发执行排序任务。
    • 使用std::future等待异步任务完成。

spawn_task函数

一个简单的封装函数,用于创建和启动包含std::packaged_task的任务,并返回其结果的std::future对象。

template<typename F, typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f, A&& a)
{
    typedef std::result_of<F(A&&)>::type result_type;
    std::packaged_task<result_type(A&&)> task(std::move(f));
    std::future<result_type> res(task.get_future());
    std::thread t(std::move(task), std::move(a));
    t.detach();
    return res;
}

结论

  • FP在并发中的应用:提供了一种避免共享状态和竞争条件的方法,简化了并发编程。
  • 扩展性:讨论了如何进一步优化并行快速排序算法,如利用C++17中的并行算法。

好的,以下是带有详细注释的快速排序算法(串行版和并行版)以及spawn_task函数的完整代码。每个代码块上方都有详细的注释解释其功能和作用。

快速排序 - 串行版

// 模板函数:实现串行版本的快速排序
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
    // 如果输入列表为空,则直接返回
    if(input.empty())
    {
        return input;
    }
    
    std::list<T> result;  // 存储排序结果的列表
    // 将输入列表的第一个元素移动到结果列表中作为“中间”值
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();  // 获取“中间”值

    // 使用std::partition将列表分成两部分:小于pivot的部分和大于等于pivot的部分
    auto divide_point = std::partition(input.begin(), input.end(),
                                       [&](T const& t){ return t < pivot; });

    std::list<T> lower_part;  // 存储小于pivot的部分
    // 将小于pivot的部分从input列表移动到lower_part列表
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

    // 递归调用自身对lower_part进行排序
    auto new_lower = sequential_quick_sort(std::move(lower_part));
    // 递归调用自身对input列表(此时为大于等于pivot的部分)进行排序
    auto new_higher = sequential_quick_sort(std::move(input));

    // 将new_higher的结果拼接到result列表的末尾
    result.splice(result.end(), new_higher);
    // 将new_lower的结果拼接到result列表的开头
    result.splice(result.begin(), new_lower);
    return result;
}

快速排序 - 并行版

// 模板函数:实现并行版本的快速排序
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    // 如果输入列表为空,则直接返回
    if(input.empty())
    {
        return input;
    }
    
    std::list<T> result;  // 存储排序结果的列表
    // 将输入列表的第一个元素移动到结果列表中作为“中间”值
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();  // 获取“中间”值

    // 使用std::partition将列表分成两部分:小于pivot的部分和大于等于pivot的部分
    auto divide_point = std::partition(input.begin(), input.end(),
                                       [&](T const& t){ return t < pivot; });

    std::list<T> lower_part;  // 存储小于pivot的部分
    // 将小于pivot的部分从input列表移动到lower_part列表
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

    // 使用std::async异步调用parallel_quick_sort对lower_part进行排序,并获取future对象
    std::future<std::list<T>> new_lower(
        std::async(&parallel_quick_sort<T>, std::move(lower_part)));
    // 递归调用自身对input列表(此时为大于等于pivot的部分)进行排序
    auto new_higher = parallel_quick_sort(std::move(input));

    // 将new_higher的结果拼接到result列表的末尾
    result.splice(result.end(), new_higher);
    // 等待lower_part的异步任务完成,并将结果拼接到result列表的开头
    result.splice(result.begin(), new_lower.get());
    return result;
}

spawn_task函数

// 模板函数:封装std::packaged_task和std::thread,创建并发任务
template<typename F, typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f, A&& a)
{
    // 定义结果类型
    typedef std::result_of<F(A&&)>::type result_type;
    // 创建一个std::packaged_task对象,包装传入的函数f
    std::packaged_task<result_type(A&&)> task(std::move(f));
    // 获取future对象,用于获取任务的结果
    std::future<result_type> res(task.get_future());
    // 创建线程执行任务,并分离线程
    std::thread t(std::move(task), std::move(a));
    t.detach();
    return res;
}

这些注释详细解释了每一行代码的作用和逻辑,帮助你更好地理解快速排序算法的串行和并行实现,以及如何使用C++标准库中的并发工具来简化并发编程。希望这对你的学习有所帮助!

在你提供的代码示例中,spawn_task 函数并没有直接被使用。它是一个辅助函数,展示了如何通过封装 std::packaged_taskstd::thread 来创建和启动并发任务,并返回一个 std::future 对象以获取任务的结果。这个函数可以用于任何需要异步执行任务并获取结果的场景。

尽管 spawn_task 没有在快速排序的示例中直接使用,但我们可以展示如何将其集成到 parallel_quick_sort 函数中,以替代 std::async。下面是一个修改后的版本,展示了如何使用 spawn_task 函数来实现并行快速排序:

使用 spawn_task 实现并行快速排序

#include <future>
#include <list>
#include <thread>
#include <algorithm>

// spawn_task 函数:封装 std::packaged_task 和 std::thread,创建并发任务
template<typename F, typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f, A&& a)
{
    typedef std::result_of<F(A&&)>::type result_type;
    std::packaged_task<result_type(A&&)> task(std::move(f));
    std::future<result_type> res(task.get_future());
    std::thread t(std::move(task), std::move(a));
    t.detach();
    return res;
}

// 并行版快速排序
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if(input.empty())
    {
        return input;
    }
    
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const& pivot = *result.begin();

    auto divide_point = std::partition(input.begin(), input.end(),
                                       [&](T const& t){ return t < pivot; });

    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

    // 使用 spawn_task 替代 std::async
    auto new_lower_future = spawn_task(&parallel_quick_sort<T>, std::move(lower_part));

    auto new_higher = parallel_quick_sort(std::move(input));

    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower_future.get());

    return result;
}

详细解释

  1. spawn_task 函数

    • 这个函数接受一个可调用对象 f 和参数 a,并创建一个 std::packaged_task 对象。
    • 它返回一个 std::future 对象,该对象可以在未来获取任务的结果。
    • 线程 t 执行任务并分离(detach),这意味着主线程不会等待它完成。
  2. parallel_quick_sort 函数

    • 在分割列表后,我们使用 spawn_task 函数来异步调用 parallel_quick_sortlower_part 列表进行排序。
    • new_lower_future 是一个 std::future 对象,它将在稍后调用 .get() 方法时阻塞,直到异步任务完成并返回结果。
    • 主线程继续递归调用 parallel_quick_sortinput 列表(此时为大于等于 pivot 的部分)进行排序。
    • 最后,将两个排序好的子列表拼接在一起,并返回最终结果。

使用示例

以下是如何调用 parallel_quick_sort 函数的一个简单示例:

int main() {
    std::list<int> numbers = {3, 6, 8, 10, 1, 2, 1};
    std::list<int> sorted_numbers = parallel_quick_sort(numbers);
    
    for (const auto& num : sorted_numbers) {
        std::cout << num << " ";
    }
    std::cout << std::endl;

    return 0;
}

总结

虽然 spawn_task 函数没有直接出现在原始的快速排序示例中,但它提供了一种替代 std::async 的方法来实现异步任务。通过这种方式,你可以更灵活地控制线程的创建和管理,尤其是在需要自定义线程池或更复杂的任务调度机制时。希望这能帮助你更好地理解如何使用 spawn_task 函数及其应用场景。

以下是经过优化排版后的4.4.2节内容,详细解释了使用消息传递的同步操作,并展示了如何实现一个简单的ATM机逻辑。每个部分都有详细的注释和结构化展示。


4.4.2 使用消息传递的同步操作

CSP(Communicating Sequential Processes)概念简介

CSP的核心思想是:在没有共享数据的情况下,每个线程可以基于接收到的信息独立运行。每个线程都可以视为一个状态机:当接收到一条信息时,会以某种方式更新其状态,并可能向其他线程发送信息。这种模式依赖于明确的行为要求和专业的编程团队,但可以使编程变得更加简单和清晰。

消息队列与C++线程

尽管C++线程共享同一块地址空间,无法直接达到真正的消息传递要求,但我们可以通过一些约定来确保线程间不共享数据。例如,所有消息都通过消息队列传递,而消息队列本身需要共享,具体的细节应包含在库中。

ATM机应用示例

在这里插入图片描述

假设我们要为实现自动取款机(ATM)编写一个应用。这个应用需要处理以下任务:

  • 处理用户插入卡片并与银行交互。
  • 控制机械接受用户的卡片、显示适当的信息、处理按钮事件、吐出现金以及退还卡。

一种处理方法是将所有任务分配到三个独立的线程上:

  1. 物理机械线程:处理卡片插入、现金吐出等物理操作。
  2. ATM逻辑线程:处理ATM机的业务逻辑。
  3. 银行通讯线程:处理与银行的通信。

这些线程之间不共享任何数据,而是通过消息队列进行通信。例如,当用户插入卡片或按下按钮时,物理机械线程会发送一条消息给逻辑线程,逻辑线程再发送消息给物理机械线程,指示其执行相应操作。

ATM机逻辑的状态机模型

ATM机的逻辑可以建模为一个状态机,每个状态等待特定的消息并根据消息类型更新状态。图4.3展示了一个简化的状态机模型。

状态机类的实现
struct card_inserted {
    std::string account;
};

class atm {
    messaging::receiver incoming;  // 接收消息的队列
    messaging::sender bank;        // 发送消息给银行的队列
    messaging::sender interface_hardware;  // 发送消息给硬件接口的队列
    void (atm::*state)();  // 当前状态函数指针

    std::string account;  // 当前账户
    std::string pin;      // 当前PIN码

public:
    void waiting_for_card() {  // 等待卡片插入的状态
        interface_hardware.send(display_enter_card());  // 显示“请插入卡片”
        incoming.wait().handle<card_inserted>(
            [&](card_inserted const& msg) {  // 处理卡片插入消息
                account = msg.account;
                pin = "";
                interface_hardware.send(display_enter_pin());
                state = &atm::getting_pin;  // 更新状态为获取PIN码
            }
        );
    }

    void getting_pin();  // 获取PIN码的状态

    void run() {  // 主循环函数
        state = &atm::waiting_for_card;  // 初始化状态为等待卡片插入
        try {
            for (;;) {
                (this->*state)();  // 反复调用当前状态函数
            }
        } catch (messaging::close_queue const&) {
            // 处理队列关闭的情况
        }
    }
};
getting_pin 状态函数实现
void atm::getting_pin() {
    incoming.wait()
        .handle<digit_pressed>(  // 处理数字按键消息
            [&](digit_pressed const& msg) {
                unsigned const pin_length = 4;
                pin += msg.digit;
                if (pin.length() == pin_length) {
                    bank.send(verify_pin(account, pin, incoming));  // 验证PIN码
                    state = &atm::verifying_pin;  // 更新状态为验证PIN码
                }
            }
        )
        .handle<clear_last_pressed>(  // 处理清除最后一个数字按键消息
            [&](clear_last_pressed const& msg) {
                if (!pin.empty()) {
                    pin.resize(pin.length() - 1);
                }
            }
        )
        .handle<cancel_pressed>(  // 处理取消按键消息
            [&](cancel_pressed const& msg) {
                state = &atm::done_processing;  // 更新状态为处理完成
            }
        );
}
状态机的工作流程
  1. 初始化状态run() 函数初始化状态为 waiting_for_card
  2. 等待卡片插入
    • waiting_for_card() 函数发送一条消息让终端显示“请插入卡片”。
    • 等待接收到 card_inserted 类型的消息,处理后更新状态为 getting_pin
  3. 获取PIN码
    • getting_pin() 函数等待三种不同类型的消息:
      • digit_pressed:处理数字按键输入。
      • clear_last_pressed:处理清除最后一个数字按键。
      • cancel_pressed:处理取消交易按键。
    • 根据不同的消息类型,更新PIN码或状态。
总结

使用消息传递的方式进行同步操作,可以极大地简化并发系统的设计。每个线程都被独立对待,避免了共享数据带来的复杂性和潜在问题。这种方式被称为参与者模式(Actor model),其中各个参与者互相发送消息来执行任务,并且不会共享状态,除非通过消息直接传入。


这种排版方式使内容更加结构化,便于理解和参考。希望这对你有所帮助!

以下是经过优化排版后的4.4.3和4.4.4节内容,详细解释了扩展规范中的持续性并发以及如何使用持续性连接来处理异步任务。每个部分都有详细的注释和结构化展示。


4.4.3 扩展规范中的持续性并发

新类型 std::experimental::futurestd::experimental::promise

并发技术扩展规范在 std::experimental 命名空间中提供了新的类型 std::promisestd::packaged_task。这些类型与标准命名空间中的类型不同,其返回实例类型为 std::experimental::future,而不是 std::future。这使得用户可以体验到 std::experimental::future 的新特性——持续性(Continuation)

假设一个任务产生了一个结果,并且 future 持有这个结果。然后需要写一些代码来处理这个结果。使用 std::future 时,必须等待 future 的状态变为就绪态,或者使用全阻塞函数 wait() 或非阻塞的 wait_for()/wait_until() 成员函数进行等待。这种方式会让代码变得非常复杂。用一句话来说:“万事俱备,只等数据”,这就是持续性的意义。

为了给 future 添加持续性,只需要在其成员函数后添加 .then() 即可。例如,给定一个 future 对象 fut,添加持续性的调用即为 fut.then(continuation)

示例代码
std::experimental::future<int> find_the_answer;
auto fut = find_the_answer();
auto fut2 = fut.then(find_the_question);

assert(!fut.valid());  // 原始 future 不再有效
assert(fut2.valid());  // 新的 future 有效

当原始 future 变成就绪态时,find_the_question 持续性函数不会安排在指定的线程上运行,而是由实现者自由选择执行方式(如线程池或其他线程管理库)。这样做是为了让实现者能够基于丰富的经验选择更好的线程使用方式,并为用户提供合适的机制来控制线程。

使用 std::experimental::promise 实现与 std::async 等价的功能
template<typename Func>
std::experimental::future<decltype(std::declval<Func>()())>
spawn_async(Func&& func) {
    std::experimental::promise<
        decltype(std::declval<Func>()())> p;
    auto res = p.get_future();

    std::thread t(
        [p = std::move(p), f = std::decay_t<Func>(func)]() mutable {
            try {
                p.set_value_at_thread_exit(f());
            } catch (...) {
                p.set_exception_at_thread_exit(std::current_exception());
            }
        });
    t.detach();
    return res;
}

这段代码展示了如何使用 std::experimental::promise 获取 future 并生成新的线程运行 Lambda 表达式,该 Lambda 表达式为 promise 设置返回值或异常。

4.4.4 持续性连接

假设有一系列耗时任务要完成,并且希望使用异步多线程来减轻主线程的计算压力。例如,在用户登录应用时,需要将登录凭证发送给后台,在对身份信息进行验证后,从后台获取用户的账户信息,并使用获取到的信息更新显示。

同步方式
void process_login(std::string const& username, std::string const& password) {
    try {
        user_id const id = backend.authenticate_user(username, password);
        user_data const info_to_display = backend.request_current_info(id);
        update_display(info_to_display);
    } catch (std::exception& e) {
        display_error(e);
    }
}
异步方式
std::future<void> process_login(std::string const& username, std::string const& password) {
    return std::async(std::launch::async, [=]() {
        try {
            user_id const id = backend.authenticate_user(username, password);
            user_data const info_to_display = backend.request_current_info(id);
            update_display(info_to_display);
        } catch (std::exception& e) {
            display_error(e);
        }
    });
}

虽然使用 std::async 将任务放到后台线程上运行,但仍然会阻塞 UI 线程等待任务完成。为了进一步避免线程阻塞,可以使用持续性(Continuation)来连接每个任务。

持续性方式
std::experimental::future<void> process_login(
    std::string const& username, std::string const& password) {
    return spawn_async([=]() {
        return backend.authenticate_user(username, password);
    }).then([](std::experimental::future<user_id> id) {
        return backend.request_current_info(id.get());
    }).then([](std::experimental::future<user_data> info_to_display) {
        try {
            update_display(info_to_display.get());
        } catch (std::exception& e) {
            display_error(e);
        }
    });
}

每个持续性函数都以 std::experimental::future 作为独立参数,然后使用 .get() 来获取其持有的值。这意味着异常会沿着链条传播,如果有函数抛出异常,会在调用 info_to_display.get() 时抛出,并由捕获结构处理所有异常类型。

全异步操作

为了进一步优化,可以将每个任务拆分为完全异步的操作:

std::experimental::future<void> process_login(
    std::string const& username, std::string const& password) {
    return backend.async_authenticate_user(username, password).then(
        [](std::experimental::future<user_id> id) {
            return backend.async_request_current_info(id.get());
        }).then([](std::experimental::future<user_data> info_to_display) {
            try {
                update_display(info_to_display.get());
            } catch (std::exception& e) {
                display_error(e);
            }
        });
}

使用 backend.async_authenticate_user 返回 std::experimental::future<user_id> 而不是直接返回 user_id,这样可以在不阻塞线程的情况下处理异步操作。

Future 展开(Future-unwrapping)

持续性支持一种精妙的特性,叫做 future 展开(Future-unwrapping)。当向 .then() 传递一个持续性函数,并且返回一个 future<some_type> 类型的值时,相应的 .then() 返回值类型也是 future<some_type>。最终的代码可能如下所示:

return backend.async_authenticate_user(username, password).then(
    [](auto id) {
        return backend.async_request_current_info(id.get());
    });

如果编译器支持 C++14 泛型 Lambda 表达式,可以使用 auto 替换 Lambda 表达式的参数类型,简化代码。

多个持续性对象

std::experimental::shared_future 同样支持持续性。与 std::experimental::future 不同的是,std::experimental::shared_future 对象可以具有多个持续性对象,并且持续性参数是 std::experimental::shared_future,而不是 std::experimental::future

示例代码:

auto fut = spawn_async(some_function).share();
auto fut2 = fut.then([](std::experimental::shared_future<some_data> data) {
    do_stuff(data);
});
auto fut3 = fut.then([](std::experimental::shared_future<some_data> data) {
    return do_other_stuff(data);
});

由于调用了 .share()futstd::experimental::shared_future 实例,因为持续性函数必须将 std::experimental::shared_future 对象作为参数。不过,持续性返回的值为 std::experimental::future,目前这个值无法共享,所以 fut2fut3 的类型都是 std::experimental::future


以下是经过优化排版后的4.4.5和4.4.6节内容,详细解释了如何使用 std::experimental::when_allstd::experimental::when_any 来等待多个 future 对象。每个部分都有详细的注释和结构化展示。


4.4.5 等待多个 future

背景

假设有很多数据需要处理,每个数据都可以单独进行处理,这是利用硬件并发的好机会。可以使用异步任务组来处理数据项,每个任务通过 future 返回处理结果。然而,需要等待所有任务完成才能得到最终的结果。逐个 future 进行收集然后再整理结果,可能会导致线程资源的浪费和频繁的上下文切换。

使用 std::async 收集结果
std::future<FinalResult> process_data(std::vector<MyData>& vec) {
    size_t const chunk_size = whatever;
    std::vector<std::future<ChunkResult>> results;

    for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
        size_t const remaining_size = end - begin;
        size_t const this_chunk_size = std::min(remaining_size, chunk_size);
        results.push_back(
            std::async(process_chunk, begin, begin + this_chunk_size));
        begin += this_chunk_size;
    }

    return std::async([all_results = std::move(results)]() mutable {
        std::vector<ChunkResult> v;
        v.reserve(all_results.size());
        for (auto& f : all_results) {
            v.push_back(f.get());  // 1
        }
        return gather_results(v);
    });
}

这段代码生成异步任务来处理结果,并在所有结果就绪时对结果进行整合。每个任务都是独立的,因此调度程序会在 f.get() 处反复唤醒,当发现有非就绪态的结果时,将再次回到休眠状态。这种方式不仅会占用线程资源,还会增加上下文切换频率,从而增加额外开销。

使用 std::experimental::when_all 收集结果

为了减少线程资源的浪费和上下文切换的频率,可以使用 std::experimental::when_all 来等待所有 future 对象就绪。

std::experimental::future<FinalResult> process_data(std::vector<MyData>& vec) {
    size_t const chunk_size = whatever;
    std::vector<std::experimental::future<ChunkResult>> results;

    for (auto begin = vec.begin(), end = vec.end(); begin != end;) {
        size_t const remaining_size = end - begin;
        size_t const this_chunk_size = std::min(remaining_size, chunk_size);
        results.push_back(spawn_async(process_chunk, begin, begin + this_chunk_size));
        begin += this_chunk_size;
    }

    return std::experimental::when_all(results.begin(), results.end()).then(
        [](std::future<std::vector<std::experimental::future<ChunkResult>>> ready_results) {
            std::vector<std::experimental::future<ChunkResult>> all_results = ready_results.get();
            std::vector<ChunkResult> v;
            v.reserve(all_results.size());
            for (auto& f : all_results) {
                v.push_back(f.get());  // 2
            }
            return gather_results(v);
        });
}

在这个例子中,when_all 函数会等待所有 future 的状态变为就绪,然后调用 .then 来处理结果。虽然 Lambda 表达式看上去与之前类似,但这里将 resultsvector 作为参数(包装到 future 中),而不是放在捕获器中,并在之后对每个 future 使用 get(),从而无阻塞地获得所有处理结果。

std::experimental::when_any

除了 when_all,还有 when_any,它也会产生一个 future,当 future 组中任意一个为就绪态时,这个新的 future 就会被置为就绪态。这对于并发性任务是一个不错的选择。


4.4.6 使用 std::experimental::when_any 等待第一个 future

背景

假设要在一大堆数据中找到一个符合要求的值(符合条件的值可能有很多),找到任何一个即可。这种任务是可以并行的,可以多线程完成,每个任务去检查数据的一个子集。如果有线程找到了合适的值,这个线程就会设置一个标志,让其他线程停止搜索,并返回结果。

示例代码
std::experimental::future<FinalResult> find_and_process_value(std::vector<MyData>& data) {
    unsigned const concurrency = std::thread::hardware_concurrency();
    unsigned const num_tasks = (concurrency > 0) ? concurrency : 2;
    std::vector<std::experimental::future<MyData*>> results;
    auto const chunk_size = (data.size() + num_tasks - 1) / num_tasks;
    auto chunk_begin = data.begin();

    std::shared_ptr<std::atomic<bool>> done_flag = std::make_shared<std::atomic<bool>>(false);

    for (unsigned i = 0; i < num_tasks; ++i) {  // 1
        auto chunk_end = (i < (num_tasks - 1)) ? chunk_begin + chunk_size : data.end();
        results.push_back(spawn_async([=] {  // 2
            for (auto entry = chunk_begin; !*done_flag && (entry != chunk_end); ++entry) {
                if (matches_find_criteria(*entry)) {
                    *done_flag = true;
                    return &*entry;
                }
            }
            return (MyData*)nullptr;
        }));
        chunk_begin = chunk_end;
    }

    std::shared_ptr<std::experimental::promise<FinalResult>> final_result =
        std::make_shared<std::experimental::promise<FinalResult>>();

    struct DoneCheck {
        std::shared_ptr<std::experimental::promise<FinalResult>> final_result;

        DoneCheck(std::shared_ptr<std::experimental::promise<FinalResult>> final_result_)
            : final_result(std::move(final_result_)) {}

        void operator()(std::experimental::future<std::experimental::when_any_result<
                           std::vector<std::experimental::future<MyData*>>>> results_param) {
            auto results = results_param.get();
            MyData* const ready_result = results.futures[results.index].get();  // 5
            if (ready_result)
                final_result->set_value(process_found_value(*ready_result));  // 6
            else {
                results.futures.erase(results.futures.begin() + results.index);  // 7
                if (!results.futures.empty()) {
                    std::experimental::when_any(results.futures.begin(), results.futures.end())
                        .then(std::move(*this));  // 8
                } else {
                    final_result->set_exception(
                        std::make_exception_ptr(std::runtime_error("Not found")));  // 9
                }
            }
        }
    };

    std::experimental::when_any(results.begin(), results.end()).then(DoneCheck(final_result));  // 3
    return final_result->get_future();  // 10
}
解释
  1. 初始化循环:生成 num_tasks 个异步任务,每个任务执行 Lambda 表达式。
  2. Lambda 表达式:每个任务都有自己的 chunk_beginchunk_end,并拷贝共享指针 done_flag,避免生命周期问题。
  3. 调用 when_any:通过连接持续性完成任务的返回结果处理。
  4. 持续性操作符:当其中一个任务完成初始化,DoneCheck 的函数操作符会被调用。
  5. 获取就绪的 future:从就绪的 future 中获取值。
  6. 处理结果:当符合条件的值被找到,可以对结果进行处理,并对最终结果进行设置。
  7. 丢弃就绪的 future:从集合中删除就绪的 future
  8. 递归调用 when_any:如果还有很多 future 需要检查,会产生对 when_any 的再次调用。
  9. 异常处理:如果没有剩下任何 future,则说明该值没有找到,在 future 中存储一个异常。
  10. 返回最终结果:函数的返回值是一个 future,其包含最终的结果。
其他示例

有时,future 中存储的是元组(或 when_any_result 持有一个元组),而不是 vector

std::experimental::future<int> f1 = spawn_async(func1);
std::experimental::future<std::string> f2 = spawn_async(func2);
std::experimental::future<double> f3 = spawn_async(func3);

std::experimental::future<
    std::tuple<
        std::experimental::future<int>,
        std::experimental::future<std::string>,
        std::experimental::future<double>>> result =
    std::experimental::when_all(std::move(f1), std::move(f2), std::move(f3));

这个例子强调了 when_anywhen_all 的重要性——可以通过容器中的任意 std::experimental::future 实例进行移动,并且通过值获取参数,因此需要显式地将 future 传入,或是传递一个临时变量。


以下是经过优化排版后的4.4.7至4.4.10节内容,详细解释了锁存器(Latch)和栅栏(Barrier)的概念及其在C++中的使用方法。每个部分都有详细的注释和结构化展示。


4.4.7 锁存器和栅栏

锁存器(Latch)

锁存器是一种同步对象,当其计数器减为0时,锁存器就处于就绪态。锁存器一旦就绪就会保持该状态,直到被销毁。因此,锁存器是用于同步一系列事件的轻量级机制。

栅栏(Barrier)

栅栏是一种可复用的同步机制,用于一组线程间的内部同步。与锁存器不同的是,栅栏要求每个线程在一个周期内只能到达栅栏一次。当所有线程都到达栅栏时,阻塞会被解除。栅栏可以复用,线程可以再次到达栅栏等待下一个周期的所有线程。


4.4.8 std::experimental::latch:基础的锁存器类型

std::experimental::latch 声明在 <experimental/latch> 头文件中。构造 std::experimental::latch 时,将计数器的值作为构造函数的唯一参数。当等待的事件发生时,调用锁存器的 count_down 成员函数。当计数器为0时,锁存器状态变为就绪。可以调用 wait 成员函数对锁存器进行阻塞,直到锁存器处于就绪状态。如果需要检查锁存器是否就绪,可以调用 is_ready 成员函数。想要减少计数器并阻塞直至计数器为0,可以调用 count_down_and_wait 成员函数。

示例代码
#include <experimental/latch>
#include <future>
#include <vector>

void foo() {
    unsigned const thread_count = ...;
    std::experimental::latch done(thread_count); // 1 初始化锁存器
    MyData data[thread_count];
    std::vector<std::future<void>> threads;

    for (unsigned i = 0; i < thread_count; ++i)
        threads.push_back(std::async(std::launch::async, [&, i] { // 2 创建线程
            data[i] = make_data(i);
            done.count_down(); // 3 减少计数器
            do_more_stuff(); // 4 线程完成其他任务
        }));

    done.wait(); // 5 等待锁存器就绪
    process_data(data, thread_count); // 6 处理数据
} // 7 结束函数,析构future
  • 初始化锁存器:使用需要等待的事件数量对 done 的构造进行初始化。
  • 创建线程:使用 std::async 产生适量的线程。
  • 减少计数器:每个线程生成相应的数据块后,都会对锁存器的计数器进行递减。
  • 处理其他任务:线程在数据准备好之后,还可以执行其他任务。
  • 等待锁存器就绪:主线程只需要等待锁存器成为就绪态即可。
  • 处理数据:处理生成的数据。

需要注意的是,传递给 std::async 的 Lambda 表达式中,通过引用的方式对除了 i 之外的所有内容进行捕获,而 i 是通过值捕获的方式进行传递,以避免数据竞争和未定义行为。


4.4.9 std::experimental::barrier:简单的栅栏

并发技术扩展规范提供了两种栅栏机制,分别为 std::experimental::barrierstd::experimental::flex_barrier,前者更简单、开销更低,后者更灵活、开销较大。

假设有一组线程对某些数据进行处理。每个线程都在处理独立的任务,但在处理完当前任务之前,所有线程必须同步。这时可以使用 std::experimental::barrier 来实现这种同步。

示例代码
#include <experimental/barrier>
#include <thread>
#include <vector>

result_chunk process(data_chunk);
std::vector<data_chunk> divide_into_chunks(data_block data, unsigned num_threads);

void process_data(data_source &source, data_sink &sink) {
    unsigned const concurrency = std::thread::hardware_concurrency();
    unsigned const num_threads = (concurrency > 0) ? concurrency : 2;
    std::experimental::barrier sync(num_threads);
    std::vector<joining_thread> threads(num_threads);

    std::vector<data_chunk> chunks;
    result_block result;

    for (unsigned i = 0; i < num_threads; ++i) {
        threads[i] = joining_thread([&, i] {
            while (!source.done()) { // 6 检查数据源是否已完成
                if (!i) { // 1 只有主线程执行
                    data_block current_block = source.get_next_data_block();
                    chunks = divide_into_chunks(current_block, num_threads);
                }
                sync.arrive_and_wait(); // 2 第一个同步点
                result.set_chunk(i, num_threads, process(chunks[i])); // 3 处理数据块
                sync.arrive_and_wait(); // 4 第二个同步点
                if (!i) { // 5 主线程写入结果
                    sink.write_data(std::move(result));
                }
            }
        });
    }
} // 7 结束函数,析构线程
  • 初始化数据块:只有主线程(i=0)会执行这个步骤。
  • 第一个同步点:所有线程都在等待主线程完成数据划分。
  • 处理数据块:每个线程处理自己的数据块,并更新结果。
  • 第二个同步点:所有线程都在等待主线程完成结果写入。
  • 写入结果:只有主线程会在同步点后写入结果。

4.4.10 std::experimental::flex_barrier — 更灵活的栅栏

std::experimental::flex_barrier 提供了一个额外的构造函数,允许传入一个完整的函数和线程数量。当所有线程都到达栅栏处时,该函数由其中一个线程运行。这不仅指定了串行代码的运行方式,还提供了一种修改下一个周期到达栅栏处线程个数的方式。

示例代码
#include <experimental/barrier>
#include <thread>
#include <vector>

auto split_source = [&] { // 1 Lambda表达式用于拆分数据
    if (!source.done()) {
        data_block current_block = source.get_next_data_block();
        chunks = divide_into_chunks(current_block, num_threads);
    }
};

split_source(); // 2 初始调用Lambda表达式

result_block result;

std::experimental::flex_barrier sync(num_threads, [&] { // 3 构造flex_barrier
    sink.write_data(std::move(result));
    split_source(); // 4 再次调用Lambda表达式
    return -1; // 5 返回值表示线程数目不变
});

std::vector<joining_thread> threads(num_threads);

for (unsigned i = 0; i < num_threads; ++i) {
    threads[i] = joining_thread([&, i] {
        while (!source.done()) { // 6 检查数据源是否已完成
            result.set_chunk(i, num_threads, process(chunks[i]));
            sync.arrive_and_wait(); // 7 同步点
        }
    });
}
  • Lambda表达式:用于拆分数据块。
  • 初始调用:在迭代开始前调用一次。
  • 构造flex_barrier:传入一个完整的函数和线程数量。
  • 再次调用Lambda表达式:在所有线程到达栅栏处时,由主线程调用。
  • 返回值:表示线程数目不变或改变。

主循环简化为只包含并行部分的代码,只有一个同步点。


以上内容展示了如何使用锁存器和栅栏来管理多线程程序中的同步问题。希望这些示例和解释能帮助你更好地理解和应用这些同步机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/962031.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ios swift画中画技术尝试

继上篇&#xff1a;iOS swift 后台运行应用尝试失败-CSDN博客 为什么想到画中画&#xff0c;起初是看到后台模式里有一个picture in picture&#xff0c;去了解了后发现这个就是小窗口视频播放&#xff0c;方便用户执行多任务。看小窗口视频的同时&#xff0c;可以作其他的事情…

C++,STL 六大组件:容器、迭代器、算法、函数对象、适配器、分配器

文章目录 引言一、容器&#xff08;Containers&#xff09;主要分类 二、迭代器&#xff08;Iterators&#xff09;三、算法&#xff08;Algorithms&#xff09;四、函数对象&#xff08;Functors&#xff09;五、适配器&#xff08;Adapters&#xff09;六、分配器&#xff08…

STM32项目分享:智能鱼缸

目录 一、前言 二、项目简介 1.功能详解 2.主要器件 三、原理图设计 四、PCB硬件设计 PCB图 五、程序设计 六、实验效果 七、包含内容 项目分享 一、前言 项目成品图片&#xff1a; 哔哩哔哩视频链接&#xff1a; STM32智能鱼缸/水族箱 &#xff08;资料分享见文末…

基于MinIO的对象存储增删改查

MinIO是一个高性能的分布式对象存储服务。Python的minio库可操作MinIO&#xff0c;包括创建/列出存储桶、上传/下载/删除文件及列出文件。 查看帮助信息 minio.exe --help minio.exe server --help …

14-6-1C++STL的list

(一&#xff09;list容器的基本概念 list容器简介&#xff1a; 1.list是一个双向链表容器&#xff0c;可高效地进行插入删除元素 2.list不可以随机存取元素&#xff0c;所以不支持at.(pos)函数与[ ]操作符 &#xff08;二&#xff09;list容器头部和尾部的操作 list对象的默…

汽车网络信息安全-ISO/SAE 21434解析(中)

目录 第七章-分布式网络安全活动 1. 供应商能力评估 2. 报价 3. 网络安全职责界定 第八章-持续的网络安全活动 1. 网路安全监控 2. 网络安全事件评估 3. 漏洞分析 4. 漏洞管理 第九章-概念阶段 1. 对象定义 2. 网路安全目标 3. 网络安全概念 第十章 - 产品开发 第十…

C#分页思路:双列表数据组合返回设计思路

一、应用场景 需要分页查询&#xff08;并非全表查载入物理内存再筛选&#xff09;&#xff0c;返回列表1和列表2叠加的数据时 二、实现方式 列表1必查&#xff0c;列表2根据列表1的查询结果决定列表2的分页查询参数 三、示意图及其实现代码 1.示意图 黄色代表list1的数据&a…

YOLOv8源码修改(4)- 实现YOLOv8模型剪枝(任意YOLO模型的简单剪枝)

目录 前言 1. 需修改的源码文件 1.1添加C2f_v2模块 1.2 修改模型读取方式 1.3 增加 L1 正则约束化训练 1.4 在tensorboard上增加BN层权重和偏置参数分布的可视化 1.5 增加剪枝处理文件 2. 工程目录结构 3. 源码文件修改 3.1 添加C2f_v2模块和模型读取 3.2 添加L1正则…

【Block总结】DynamicFilter,动态滤波器降低计算复杂度,替换传统的MHSA|即插即用

论文信息 标题: FFT-based Dynamic Token Mixer for Vision 论文链接: https://arxiv.org/pdf/2303.03932 关键词: 深度学习、计算机视觉、对象检测、分割 GitHub链接: https://github.com/okojoalg/dfformer 创新点 本论文提出了一种新的标记混合器&#xff08;token mix…

设计模式Python版 原型模式

文章目录 前言一、原型模式二、原型模式示例三、原型管理器 前言 GOF设计模式分三大类&#xff1a; 创建型模式&#xff1a;关注对象的创建过程&#xff0c;包括单例模式、简单工厂模式、工厂方法模式、抽象工厂模式、原型模式和建造者模式。结构型模式&#xff1a;关注类和对…

一文讲解Java中的BIO、NIO、AIO之间的区别

BIO、NIO、AIO是Java中常见的三种IO模型 BIO&#xff1a;采用阻塞式I/O模型&#xff0c;线程在执行I/O操作时被阻塞&#xff0c;无法处理其他任务&#xff0c;适用于连接数比较少的场景&#xff1b;NIO&#xff1a;采用非阻塞 I/O 模型&#xff0c;线程在等待 I/O 时可执行其…

使用 postman 测试思源笔记接口

思源笔记 API 权鉴 官方文档-中文&#xff1a;https://github.com/siyuan-note/siyuan/blob/master/API_zh_CN.md 权鉴相关介绍截图&#xff1a; 对应的xxx&#xff0c;在软件中查看 如上图&#xff1a;在每次发送 API 请求时&#xff0c;需要在 Header 中添加 以下键值对&a…

AWTK 骨骼动画控件发布

Spine 是一款广泛使用的 2D 骨骼动画工具&#xff0c;专为游戏开发和动态图形设计设计。它通过基于骨骼的动画系统&#xff0c;帮助开发者创建流畅、高效的角色动画。本项目是基于 Spine 实现的 AWTK 骨骼动画控件。 代码&#xff1a;https://gitee.com/zlgopen/awtk-widget-s…

新年手搓--本地化部署DeepSeek-R1,全程实测

1.环境准备安装ollma ollma官网链接: Download Ollama on Linux ubuntu命令行安装: curl -fsSL https://ollama.com/install.sh | sh 选择运行模型,用7b模型试一下(模型也差不多5G): ollama run deepseek-r1:7b 运行qwen: ollama run qwen2.5:7b 2.为方便运行…

STM32使用VScode开发

文章目录 Makefile形式创建项目新建stm项目下载stm32cubemx新建项目IED makefile保存到本地arm gcc是编译的工具链G++配置编译Cmake +vscode +MSYS2方式bilibiliMSYS2 统一环境配置mingw32-make -> makewindows环境变量Cmake CmakeListnijia 编译输出elfCMAKE_GENERATOR查询…

【Numpy核心编程攻略:Python数据处理、分析详解与科学计算】1.21 索引宗师:布尔索引的七重境界

1.21 索引宗师&#xff1a;布尔索引的七重境界 目录 #mermaid-svg-Iojpgw5hl0Ptb9Ti {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-Iojpgw5hl0Ptb9Ti .error-icon{fill:#552222;}#mermaid-svg-Iojpgw5hl0Ptb9Ti .…

毕业设计--具有车流量检测功能的智能交通灯设计

摘要&#xff1a; 随着21世纪机动车保有量的持续增加&#xff0c;城市交通拥堵已成为一个日益严重的问题。传统的固定绿灯时长方案导致了大量的时间浪费和交通拥堵。为解决这一问题&#xff0c;本文设计了一款智能交通灯系统&#xff0c;利用车流量检测功能和先进的算法实现了…

课题推荐:基于matlab,适用于自适应粒子滤波的应用

自适应粒子滤波&#xff08;Adaptive Particle Filter, APF&#xff09;是一种用于状态估计的有效方法&#xff0c;特别适用于非线性和非高斯系统。 文章目录 应用场景MATLAB 代码示例代码说明结果扩展说明 以下是一个基于自适应粒子滤波的简单应用示例&#xff0c;模拟一个一维…

Redis(5,jedis和spring)

在前面的学习中&#xff0c;只是学习了各种redis的操作&#xff0c;都是在redis命令行客户端操作的&#xff0c;手动执行的&#xff0c;更多的时候就是使用redis的api&#xff08;&#xff09;&#xff0c;进一步操作redis程序。 在java中实现的redis客户端有很多&#xff0c;…

基于聚类与相关性分析对马来西亚房价数据进行分析

碎碎念&#xff1a;由于最近太忙了&#xff0c;更新的比较慢&#xff0c;提前祝大家新春快乐&#xff0c;万事如意&#xff01;本数据集的下载地址&#xff0c;读者可以自行下载。 1.项目背景 本项目旨在对马来西亚房地产市场进行初步的数据分析&#xff0c;探索各州的房产市…