【C++】线程池实现

目录

  • 一、线程池简介
    • 线程池的核心组件
    • 实现步骤
  • 二、C++11实现线程池
    • 源码
  • 三、线程池源码解析
    • 1. 成员变量
    • 2. 构造函数
      • 2.1 线程初始化
      • 2.2 工作线程逻辑
    • 3. 任务提交(enqueue方法)
      • 3.1 方法签名
      • 3.2 任务封装
      • 3.3 任务入队
    • 4. 析构函数
      • 4.1 停机控制
    • 5. 关键技术点解析
      • 5.1 完美转发实现
      • 5.2 异常传播机制
      • 5.3 内存管理模型
  • 四、 性能特征分析
  • 五、 扩展优化方向
  • 六、 典型问题排查指南
  • 七、 测试用例
    • 如果这篇文章对你有所帮助,渴望获得你的一个点赞!

一、线程池简介

线程池是一种并发编程技术,通过预先创建一组线程并复用它们来执行多个任务,避免了频繁创建和销毁线程的开销。它特别适合处理大量短生命周期任务的场景(如服务器请求、并行计算)。

线程池的核心组件

1. 任务队列(Task Queue)
存储待执行的任务(通常是函数对象或可调用对象)。

2. 工作线程(Worker Threads)
一组预先创建的线程,不断从队列中取出任务并执行。

3. 同步机制
互斥锁(Mutex):保护任务队列的线程安全访问。
条件变量(Condition Variable):通知线程任务到达或线程池终止。

实现步骤

1. 初始化线程池
创建固定数量的线程,每个线程循环等待任务。

2. 提交任务
将任务包装成函数对象,加入任务队列。

3. 任务执行
工作线程从队列中取出任务并执行。

4. 终止线程池
发送停止信号,等待所有线程完成当前任务后退出。

二、C++11实现线程池

源码

#include <vector>
#include <queue>
#include <future>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>

class ThreadPool 
{
public:
    //构造函数:根据输入的线程数(默认硬件并发数)创建工作线程。
    //每个工作线程执行一个循环,不断从任务队列中取出并执行任务。
    //explicit关键字防止隐式类型转换
    explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
        : stop(false) 
    {
        if (threads == 0) 
        {
            threads = 1;
        }

        for (size_t i = 0; i < threads; ++i) 
        {
            workers.emplace_back([this] 
            {
                for (;;) 
                {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        
                        //等待条件:线程通过条件变量等待任务到来或停止信号。(CPU使用率:休眠时接近0%,仅在任务到来时唤醒)
                        //lambda表达式作为谓词,当条件(停止信号为true 或 任务队列非空)为真时,才会解除阻塞。
                        this->condition.wait(lock, [this] {
                            return (this->stop || !this->tasks.empty());
                            });

                        /* 传统忙等待:while (!(stop || !tasks.empty())) {} // 空循环消耗CPU */

                        if (this->stop && this->tasks.empty())
                        {
                            //如果线程池需要终止且任务队列为空则直接return
                            return;
                        }

                        //任务提取:从队列中取出任务并执行,使用std::move避免拷贝开销。
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    //执行任务
                    task();
                }
            });
        }
    }

    //任务提交(enqueue方法)
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> 
    {
        using return_type = typename std::result_of<F(Args...)>::type;

        //任务封装:使用std::packaged_task包装用户任务,支持异步返回结果。
        //智能指针管理:shared_ptr确保任务对象的生命周期延续至执行完毕。
        //完美转发:通过std::forward保持参数的左值/右值特性。
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
            );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            if (stop)
            {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }  

            tasks.emplace([task]() { (*task)(); });
            /* push传入的对象需要事先构造好,再复制过去插入容器中;
               而emplace则可以自己使用构造函数所需的参数构造出对象,并直接插入容器中。
               emplace相比于push省去了复制的步骤,则使用emplace会更加节省内存。*/
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() 
    {
        //设置stop标志,唤醒所有线程,等待任务队列清空。
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers)
        {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers;        //存储工作线程对象
    std::queue<std::function<void()>> tasks; //任务队列,存储待执行的任务

    std::mutex queue_mutex;                  //保护任务队列的互斥锁
    std::condition_variable condition;       //线程间同步的条件变量
    bool stop;                               //线程池是否停止标志
};

三、线程池源码解析

1. 成员变量

std::vector<std::thread> workers;        // 工作线程容器
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex;                  // 队列互斥锁
std::condition_variable condition;       // 条件变量
bool stop;                               // 停机标志

设计要点:

  • 采用生产者-消费者模式,任务队列作为共享资源

  • 组合使用mutex+condition_variable实现线程同步

  • vector存储线程对象便于统一管理生命周期


2. 构造函数

2.1 线程初始化

explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
    : stop(false)
{
    if (threads == 0) 
    {
        threads = 1;
    }
        
    for (size_t i = 0; i < threads; ++i) 
    {
        workers.emplace_back([this] { /* 工作线程逻辑 */ });
    }
}

设计要点:

  • explicit防止隐式类型转换(如ThreadPool pool = 4;

  • 默认使用硬件并发线程数(通过hardware_concurrency()

  • 最少创建1个线程避免空池

  • 使用emplace_back直接构造线程对象


2.2 工作线程逻辑

for (;;)
{
    std::function<void()> task;
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        condition.wait(lock, [this] {
            return stop || !tasks.empty();
        });

        if (stop && tasks.empty()) 
        {
           return; 
        }

        task = std::move(tasks.front());
        tasks.pop();
    }
    task();
}

核心机制:

  • unique_lock配合条件变量实现自动锁管理

  • 双重状态检查(停机标志+队列非空)

  • 任务提取使用移动语义避免拷贝

  • 任务执行在锁作用域外进行


3. 任务提交(enqueue方法)

3.1 方法签名

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>

类型推导:

  • 使用尾置返回类型声明
  • std::result_of推导可调用对象的返回类型
  • 完美转发参数(F&&+Args&&...

3.2 任务封装

auto task = std::make_shared<std::packaged_task<return_type()>>
    (std::bind(std::forward<F>(f), std::forward<Args>(args)...));

封装策略:

  • packaged_task包装任务用于异步获取结果
  • shared_ptr管理任务对象生命周期
  • std::bind绑定参数(注意C++11的参数转发限制)

3.3 任务入队

tasks.emplace([task]() { (*task)(); });

优化点:

  • 使用emplace直接构造队列元素
  • Lambda捕获shared_ptr保持任务有效性
  • 显式解引用执行packaged_task

4. 析构函数

4.1 停机控制

~ThreadPool() 
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for (auto& worker : workers)
    {
        worker.join();
    }  
}

停机协议:

  1. 设置停机标志原子操作
  2. 广播唤醒所有等待线程
  3. 等待所有工作线程退出

5. 关键技术点解析

5.1 完美转发实现

std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  • 保持参数的左右值特性
  • 支持移动语义参数的传递
  • C++11的限制:无法完美转发所有参数类型

5.2 异常传播机制

  • 任务异常通过future对象传播
  • packaged_task自动捕获异常
  • 用户通过future.get()获取异常

5.3 内存管理模型

         [任务提交者]
             |
             v
       [packaged_task] <---- shared_ptr ---- [任务队列]
             |
             v
         [future]
  • 三重生命周期保障:
    1. 提交者持有future
    2. 队列持有任务包装器
    3. 工作线程执行任务

四、 性能特征分析

1. 时间复杂度

操作时间复杂度
任务提交(enqueue)O(1)(加锁开销)
任务提取O(1)
线程唤醒取决于系统调度

2. 空间复杂度

组件空间占用
线程栈每线程MB级
任务队列与任务数成正比
同步原语固定大小

五、 扩展优化方向

1. 任务窃取(Work Stealing)

  • 实现多个任务队列
  • 空闲线程从其他队列窃取任务

2. 动态线程池

void adjust_workers(size_t new_size) 
{
    if (new_size > workers.size()) 
    {
     	// 扩容逻辑
    } 
    else 
    {
        // 缩容逻辑
    }
}

3. 优先级队列

using Task = std::pair<int, std::function<void()>>; // 优先级+任务
   std::priority_queue<Task> tasks;

4. 无锁队列

moodycamel::ConcurrentQueue<std::function<void()>> tasks;

六、 典型问题排查指南

现象可能原因解决方案
任务未执行线程池提前析构延长线程池生命周期
future.get()永久阻塞任务未提交/异常未处理检查任务提交路径
CPU利用率100%忙等待或锁竞争优化任务粒度/使用无锁结构
内存持续增长任务对象未正确释放检查智能指针使用

该实现完整展现了现代C++线程池的核心设计范式,开发者可根据具体需求在此基础进行功能扩展和性能优化。理解这个代码结构是掌握更高级并发模式的基础。

七、 测试用例

使用实例(C++11兼容):

#include <iostream>

int main() 
{
    ThreadPool pool(4);
    
    // 提交普通函数
    auto future1 = pool.enqueue([](int a, int b) {
        return a + b;
    }, 2, 3);
    
    // 提交成员函数
    struct Calculator 
    {
        int multiply(int a, int b) 
        { 
            return a * b; 
        }
    } calc;
    auto future2 = pool.enqueue(std::bind(&Calculator::multiply, &calc, 
                                        std::placeholders::_1, 
                                        std::placeholders::_2), 4, 5);
    
    // 异常处理示例
    auto future3 = pool.enqueue([]() -> int {
        throw std::runtime_error("example error");
        return 1;
    });
    
    std::cout << "2+3=" << future1.get() << std::endl;
    std::cout << "4*5=" << future2.get() << std::endl;
    
    try 
    {
        future3.get();
    } 
    catch(const std::exception& e)
    {
        std::cout << "Caught exception: " << e.what() << std::endl;
    }
    
    return 0;
}

如果这篇文章对你有所帮助,渴望获得你的一个点赞!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/964208.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[EAI-023] FAST,机器人动作专用的Tokenizer,提高VLA模型的能力和训练效率

Paper Card 论文标题&#xff1a;FAST: Efficient Action Tokenization for Vision-Language-Action Models 论文作者&#xff1a;Karl Pertsch, Kyle Stachowicz, Brian Ichter, Danny Driess, Suraj Nair, Quan Vuong, Oier Mees, Chelsea Finn, Sergey Levine 论文链接&…

介绍一下Mybatis的底层原理(包括一二级缓存)

表面上我们的就是Sql语句和我们的java对象进行映射&#xff0c;然后Mapper代理然后调用方法来操作数据库 底层的话我们就涉及到Sqlsession和Configuration 首先说一下SqlSession&#xff0c; 它可以被视为与数据库交互的一个会话&#xff0c;用于执行 SQL 语句&#xff08;Ex…

wx050基于django+vue+uniapp的傣族节日及民间故事推广小程序

开发语言&#xff1a;Python框架&#xff1a;djangouniappPython版本&#xff1a;python3.7.7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;PyCharm 系统展示 后台登录界面 管理员主界面 用户管理 …

hot100(6)

51.22.括号生成 字符串回溯的典型问题 char[] path;List<String> res;int n;public List<String> generateParenthesis(int n) {this.n n;path new char[2*n];res new ArrayList<>();dfs(0,0,0);return res;}public void dfs(int index,int left, int r…

【游戏设计原理】98 - 时间膨胀

从上文中&#xff0c;我们可以得到以下几个启示&#xff1a; 游戏设计的核心目标是让玩家感到“时间飞逝” 游戏的成功与否&#xff0c;往往取决于玩家的沉浸感。如果玩家能够完全投入游戏并感受到时间飞逝&#xff0c;说明游戏设计在玩法、挑战、叙事等方面达到了吸引人的平衡…

RocketMQ面试题:进阶部分

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…

Deepseek-R1 和 OpenAI o1 这样的推理模型普遍存在“思考不足”的问题

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

结构性多余到结构性消失的现象和案例

在碎片化的现象和案例中提取关联性的信息。 也就是废墟之上如何重生的问题。 碎片化无处不在&#xff0c;普通人无人可以幸免。 当AI能力越来越强大&#xff0c;如下这些都在变为现实。 生产力 98%的人是过剩劳动力 人在大规模地被废弃 当人是生产力主体的时候&#xff0c;如…

(脚本学习)BUU18 [CISCN2019 华北赛区 Day2 Web1]Hack World1

自用 题目 考虑是不是布尔盲注&#xff0c;如何测试&#xff1a;用"1^1^11 1^0^10&#xff0c;就像是真真真等于真&#xff0c;真假真等于假"这个测试 SQL布尔盲注脚本1 import requestsurl "http://8e4a9bf2-c055-4680-91fd-5b969ebc209e.node5.buuoj.cn…

【C++】P1957 口算练习题

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目描述输入格式&#xff1a;输出格式&#xff1a; &#x1f4af;我的做法代码实现&#xff1a; &#x1f4af;老师的做法代码实现&#xff1a; &#x1f4af;对比分析&am…

【Linux系统】信号:再谈OS与内核区、信号捕捉、重入函数与 volatile

再谈操作系统与内核区 1、浅谈虚拟机和操作系统映射于地址空间的作用 我们调用任何函数&#xff08;无论是库函数还是系统调用&#xff09;&#xff0c;都是在各自进程的地址空间中执行的。无论操作系统如何切换进程&#xff0c;它都能确保访问同一个操作系统实例。换句话说&am…

LabVIEW双光子成像系统:自主创新,精准成像,赋能科研

双光子成像系统&#xff1a;自主创新&#xff0c;精准成像&#xff0c;赋能科研 第一部分&#xff1a;概述 双光子成像利用两个低能量光子同时激发荧光分子&#xff0c;具有深层穿透、高分辨率、低光损伤等优势。它能实现活体深层组织的成像&#xff0c;支持实时动态观察&…

「全网最细 + 实战源码案例」设计模式——策略模式

核心思想 享元模式&#xff08;Flyweight Pattern&#xff09;是一种行为型设计模式&#xff0c;用于定义一系列算法或策略&#xff0c;将它们封装成独立的类&#xff0c;并使它们可以相互替换&#xff0c;而不影响客户端的代码&#xff0c;提高代码的可维护性和扩展性。 结构…

安全策略实验

安全策略实验 1.拓扑图 2.需求分析 需求&#xff1a; 1.VLAN 2属于办公区&#xff0c;VLAN 3属于生产区 2.办公区PC在工作日时间&#xff08;周一至周五&#xff0c;早8到晚6&#xff09;可以正常访问OA server其他时间不允许 3.办公区PC可以在任意时刻访问Web Server 4.生产…

一文了解边缘计算

什么是边缘计算&#xff1f; 我们可以通过一个最简单的例子来理解它&#xff0c;它就像一个司令员&#xff0c;身在离炮火最近的前线&#xff0c;汇集现场所有的实时信息&#xff0c;经过分析并做出决策&#xff0c;及时果断而不拖延。 1.什么是边缘计算&#xff1f; 边缘计算…

对象的实例化、内存布局与访问定位

一、创建对象的方式 二、创建对象的步骤: 一、判断对象对应的类是否加载、链接、初始化: 虚拟机遇到一条new指令&#xff0c;首先去检查这个指令的参数能否在Metaspace的常量池中定位到一个类的符号引用&#xff0c;并且检查这个符号引用代表的类是否已经被加载、解析和初始化…

Altium Designer绘制原理图时画斜线的方法

第一步&#xff1a;检查设置是否正确 打开preferences->PCB Editor ->Interactive Routing->Interactive Routing Options->Restrict TO 90/45去掉勾选项&#xff0c;点击OK即可。如下图所示&#xff1a; 然后在划线时&#xff0c;按下shift空格就能够切换划线…

【R语言】环境空间

一、环境空间的特点 环境空间是一种特殊类型的变量&#xff0c;它可以像其它变量一样被分配和操作&#xff0c;还可以以参数的形式传递给函数。 R语言中环境空间具有如下3个特点&#xff1a; 1、对象名称唯一性 此特点指的是在不同的环境空间中可以有同名的变量出现&#x…

NeuralCF 模型:神经网络协同过滤模型

实验和完整代码 完整代码实现和jupyter运行&#xff1a;https://github.com/Myolive-Lin/RecSys--deep-learning-recommendation-system/tree/main 引言 NeuralCF 模型由新加坡国立大学研究人员于 2017 年提出&#xff0c;其核心思想在于将传统协同过滤方法与深度学习技术相结…

【ChatGPT:开启人工智能新纪元】

一、ChatGPT 是什么 最近,ChatGPT 可是火得一塌糊涂,不管是在科技圈、媒体界,还是咱们普通人的日常聊天里,都能听到它的大名。好多人都在讨论,这 ChatGPT 到底是个啥 “神器”,能让大家这么着迷?今天咱就好好唠唠。 ChatGPT,全称是 Chat Generative Pre-trained Trans…