C++之深入解析如何实现一个线程池

一、基础概念

  • 当进行并行的任务作业操作时,线程的建立与销毁的开销是,阻碍性能进步的关键,因此线程池,由此产生。使用多个线程,无限制循环等待队列,进行计算和操作,帮助快速降低和减少性能损耗。
  • 线程池的组成:
    • 线程池管理器:初始化和创建线程,启动和停止线程,调配任务,管理线程池;
    • 工作线程:线程池中等待并执行分配的任务;
    • 任务接口:添加任务的接口,以提供工作线程调度任务的执行;
    • 任务队列:用于存放没有处理的任务,提供一种缓冲机制,同时具有调度功能,高优先级的任务放在队列前面。

二、线程池工作情况

① 没有任务要执行,缓冲队列为空

在这里插入图片描述

② 队列中任务数量,小于等于线程池中线程任务数量

在这里插入图片描述

③ 任务数量大于线程池数量,缓冲队列未满

在这里插入图片描述

④ 任务数量大于线程池数量,缓冲队列已满

在这里插入图片描述

三、线程池的 C++ 实现

  • 线程池的主要组成有如下三个部分:
    • 任务队列(Task Quene);
    • 线程池(Thread Pool);
    • 完成队列(Completed Tasks)。

在这里插入图片描述

① 队列

  • 可以使用队列来存储工作,因为它是更合理的数据结构,我们希望以与发送它相同的顺序启动工作。但是,这个队列有点特殊,线程是连续的查询队列要求工作,当有可用的工作时,线程从队列中获取工作并执行它。如果两个线程试图同时执行相同的工作会发生什么?不难知道,程序会崩溃,为了避免这种问题,在标准 C ++ Queue 上实现了一个包装器,它使用 mutex 来限制并发访问。
  • 来看一下 SafeQueue 类的一小部分示例:
void enqueue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_queue.push(t);
}
  • 要排队做的第一件事就是锁定互斥锁以确保没有其它们人正在访问该资源,然后将元素推送到队列中,当锁超出范围时,它会自动释放,这样,使 Queue 线程安全,因此不必担心许多线程在相同的“时间”访问和/或修改它。

② 提交函数

  • 线程池最重要的方法是负责向队列添加工作的方法,不难理解它是如何工作的:
    • 接受任何参数的任何函数;
    • 立即返回“东西”以避免阻塞主线程,此返回的对象最终应包含操作的结果。
  • 完整的提交函数如下所示:
// Submit a function to be executed asynchronously by the pool template<typename F, typename...Args>

auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
    // Create a function with bounded parameters ready to execute

    std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
    // Encapsulate it into a shared ptr in order to be able to copy construct // assign 

    auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
    // Wrap packaged task into void function

    std::function<void()> wrapper_func = [task_ptr]() {
      (*task_ptr)(); 
    };

    // Enqueue generic wrapper function

    m_queue.enqueue(wrapperfunc);
    // Wake up one thread if its waiting

    m_conditional_lock.notify_one();
    // Return future from promise

    return task_ptr->get_future();
}
  • 队列完整源代码:
// SafeQueue.h

#pragma once

#include <mutex>

#include <queue>

// Thread safe implementation of a Queue using a std::queue

template <typename T>
class SafeQueue {
private:
  std::queue<T> m_queue; //利用模板函数构造队列

  std::mutex m_mutex;//访问互斥信号量

public:
  SafeQueue() { //空构造函数


  }

  SafeQueue(SafeQueue& other) {//拷贝构造函数

    //TODO:
  }

  ~SafeQueue() { //析构函数

  }


  bool empty() {  //队列是否为空

    std::unique_lock<std::mutex> lock(m_mutex); //互斥信号变量加锁,防止m_queue被改变

    return m_queue.empty();
  }

  int size() {
    std::unique_lock<std::mutex> lock(m_mutex); //互斥信号变量加锁,防止m_queue被改变

    return m_queue.size();
  }
//队列添加元素

  void enqueue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_queue.push(t);
  }
//队列取出元素

  bool dequeue(T& t) {
    std::unique_lock<std::mutex> lock(m_mutex); //队列加锁

    if (m_queue.empty()) {
      return false;
    }
    t = std::move(m_queue.front()); //取出队首元素,返回队首元素值,并进行右值引用

    m_queue.pop(); //弹出入队的第一个元素

    return true;
  }
};
  • 线程池完整代码:
// ThreadPool.h

#pragma once
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
#include "SafeQueue.h"

class ThreadPool {
private:
  class ThreadWorker {//内置线程工作类

  private:
    int m_id; //工作id

    ThreadPool * m_pool;//所属线程池

  public:
    //构造函数

    ThreadWorker(ThreadPool * pool, const int id) 
      : m_pool(pool), m_id(id) {
    }
    //重载`()`操作

    void operator()() {
      std::function<void()> func; //定义基础函数类func

      bool dequeued; //是否正在取出队列中元素

      //判断线程池是否关闭,没有关闭,循环提取

      while (!m_pool->m_shutdown) {
        {
          //为线程环境锁加锁,互访问工作线程的休眠和唤醒

          std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
          //如果任务队列为空,阻塞当前线程

          if (m_pool->m_queue.empty()) {
            m_pool->m_conditional_lock.wait(lock); //等待条件变量通知,开启线程

          }
          //取出任务队列中的元素

          dequeued = m_pool->m_queue.dequeue(func);
        }
        //如果成功取出,执行工作函数

        if (dequeued) {
          func();
        }
      }
    }
  };

  bool m_shutdown; //线程池是否关闭

  SafeQueue<std::function<void()>> m_queue;//执行函数安全队列,即任务队列

  std::vector<std::thread> m_threads; //工作线程队列

  std::mutex m_conditional_mutex;//线程休眠锁互斥变量

  std::condition_variable m_conditional_lock; //线程环境锁,让线程可以处于休眠或者唤醒状态

public:
    //线程池构造函数

  ThreadPool(const int n_threads)
    : m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false) {
  }

  ThreadPool(const ThreadPool &) = delete; //拷贝构造函数,并且取消默认父类构造函数

  ThreadPool(ThreadPool &&) = delete; // 拷贝构造函数,允许右值引用

  ThreadPool & operator=(const ThreadPool &) = delete; // 赋值操作

  ThreadPool & operator=(ThreadPool &&) = delete; //赋值操作

  // Inits thread pool

  void init() {
    for (int i = 0; i < m_threads.size(); ++i) {
      m_threads[i] = std::thread(ThreadWorker(this, i));//分配工作线程

    }
  }

  // Waits until threads finish their current task and shutdowns the pool

  void shutdown() {
    m_shutdown = true;
    m_conditional_lock.notify_all(); //通知 唤醒所有工作线程

    for (int i = 0; i < m_threads.size(); ++i) {
      if(m_threads[i].joinable()) { //判断线程是否正在等待

        m_threads[i].join();  //将线程加入等待队列

      }
    }
  }

  // Submit a function to be executed asynchronously by the pool
  //线程的主要工作函数,使用了后置返回类型,自动判断函数返回值

  template<typename F, typename...Args>
  auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
    // Create a function with bounded parameters ready to execute
    // 

    std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);//连接函数和参数定义,特殊函数类型,避免左、右值错误

    // Encapsulate it into a shared ptr in order to be able to copy construct // assign 
    //封装获取任务对象,方便另外一个线程查看结果

    auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

    // Wrap packaged task into void function
    //利用正则表达式,返回一个函数对象

    std::function<void()> wrapper_func = [task_ptr]() {
      (*task_ptr)(); 
    };

    // 队列通用安全封包函数,并压入安全队列

    m_queue.enqueue(wrapper_func);

    // 唤醒一个等待中的线程

    m_conditional_lock.notify_one();

    // 返回先前注册的任务指针

    return task_ptr->get_future();
  }
};
  • 使用样例代码:
#include <iostream>

#include <random>

#include "../include/ThreadPool.h"

std::random_device rd; //真实随机数产生器

std::mt19937 mt(rd()); //生成计算随机数mt;

std::uniform_int_distribution<int> dist(-1000, 1000);//生成-1000到1000之间的离散均匀分部数

auto rnd = std::bind(dist, mt);

//设置线程睡眠时间

void simulate_hard_computation() {
  std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}

// 添加两个数字的简单函数并打印结果

void multiply(const int a, const int b) {
  simulate_hard_computation();
  const int res = a * b;
  std::cout << a << " * " << b << " = " << res << std::endl;
}

//添加并输出结果

void multiply_output(int & out, const int a, const int b) {
  simulate_hard_computation();
  out = a * b;
  std::cout << a << " * " << b << " = " << out << std::endl;
}

// 结果返回

int multiply_return(const int a, const int b) {
  simulate_hard_computation();
  const int res = a * b;
  std::cout << a << " * " << b << " = " << res << std::endl;
  return res;
}


void example() {
  // 创建3个线程的线程池

  ThreadPool pool(3);

  // 初始化线程池

  pool.init();

  // 提交乘法操作,总共30个

  for (int i = 1; i < 3; ++i) {
    for (int j = 1; j < 10; ++j) {
      pool.submit(multiply, i, j);
    }
  }

  // 使用ref传递的输出参数提交函数

  int output_ref;
  auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);

  // 等待乘法输出完成

  future1.get();
  std::cout << "Last operation result is equals to " << output_ref << std::endl;

  // 使用return参数提交函数

  auto future2 = pool.submit(multiply_return, 5, 3);

  // 等待乘法输出完成

  int res = future2.get();
  std::cout << "Last operation result is equals to " << res << std::endl;

  //关闭线程池
  pool.shutdown();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/16329.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Linux安装Mysql操作步骤详解

目录 1. 检测当前系统中是否安装了MySql数据库 2. 使用FinalShell自带的上传工具将jdk的二进制发布包上传到Linux 3. 解压并解包到/usr/local/mysql&#xff08;便于区分&#xff09; 第一步&#xff1a;将包先移动到该目录下 第二步&#xff1a;解压解包 第三步&#xff1a…

springboot的创建和使用

目录 1.springboot的优点 2.springboot项目创建 2.1使用idea创建 2. 2 ⽹⻚版创建 3.项⽬⽬录介绍和运⾏ 3.1运行项目 3.2输出hello world 4.注意事项 1.路径 2.约定大于配置 spring的诞生为了简化java程序,springboot的诞生为了简化spring程序开发 1.springboot的优点…

了解标量、向量和点积

数据科学基础数学&#xff1a;线性代数简介 了解标量、向量和点积 机器只能按着算法理解和处理数据结构存储的数字. 例如创建垃圾邮件检测器&#xff0c;则首先必须将文本数据转换为数字(通过单词嵌入)。 两个句子之间的余弦相似性 两个句子之间的余弦相似性可以通过它们的向量…

Python小姿势 - Python使用Jupyter Notebook

Python使用Jupyter Notebook Jupyter Notebook是一个开源的Web应用程序&#xff0c;可以用来创建和共享包含 live code&#xff0c;公式&#xff0c;可视化和解释性文本的文档。 安装Jupyter Notebook 首先&#xff0c;确保你安装了正确的Python版本和包管理器&#xff08;pip&…

java中的\t说明

阅读前请看一下&#xff1a;我是一个热衷于记录的人&#xff0c;每次写博客会反复研读&#xff0c;尽量不断提升博客质量。文章设置为仅粉丝可见&#xff0c;是因为写博客确实花了不少精力。希望互相进步谢谢&#xff01;&#xff01; 文章目录 阅读前请看一下&#xff1a;我是…

加载自己的图像数据集

文章目录 1 加载图像数据集2 图像预处理3 再次加载数据集4 这里还有一个问题&#xff0c;我们没有验证集5 构建DataLoader6 检查是否正确导入数据集 原文链接&#xff1a;《加载自己的图像数据集》 ​ 数据集下载链接 1 加载图像数据集 目录结构&#xff1a; 针对这种非常典型…

while语句和until语句顺便带点小实验

while语句和until语句 一、while用法二、Until循环语句三、趣味小实验猜价格的游戏&#xff08;价格是随机数&#xff09;写一个计算器脚本闲来无事去购物 一、while用法 for循环语句非常适用于列表对象无规律&#xff0c;且列表来源以固定&#xff08;如某个列表文件&#xf…

nginx配置sh脚本远程执行一键安装

背景 本地多机重复操作某些shell指令&#xff0c;分步执行&#xff0c;很耗费时间&#xff0c; 需要远程一键部署&#xff0c;傻瓜化运维&#xff0c;更为通用安装。 即参考docker通用安装 sudo curl https://get.docker.com | sh - # sudo python3 -m pip install docker-co…

Design_transformer

磁性元件设计 思路 滤波电感设计 磁芯不要饱和&#xff08;开气隙&#xff09; 考虑铜损大于铁损 谐振电感设计 磁芯不要饱和&#xff08;开气隙&#xff09; 考虑铁损大于铜损 变压器设计 磁芯不要饱和&#xff08;开气隙&#xff09; 励磁电流产生磁场 开气隙 增加了…

FreeRTOS系统学习-内核篇.01-数据结构---列表与列表项定义详解-链表节点插入实验

# 内核篇.01 列表与列表项 为什么要学列表&#xff1f;链表单向链表双向链表 FreeRTOS 中链表的实现节点节点初始化尾节点根节点链表根节点初始化将节点插入到链表的尾部将节点按照升序排列插入到链表将节点从链表删除节点带参宏小函数 链表节点插入实验实验现象 为什么要学列表…

内存优化-比glibc更快的tcmalloc

TCMalloc 是 Google 开发的内存分配器&#xff0c;在不少项目中都有使用&#xff0c;例如在 Golang 中就使用了类似的算法进行内存分配。它具有现代化内存分配器的基本特征&#xff1a;对抗内存碎片、在多核处理器能够 scale。据称&#xff0c;它的内存分配速度是 glibc2.3 中实…

vue3表单输入绑定

初识表单输入绑定 vue3可以帮助我们将vue定义的变量绑定到html表单元素上&#xff0c;并且监听到html表单元素修改值时&#xff0c;会将对应的vue定义的变量修改。 <!-- 将vue3定义的text绑定给inut元素, 当input元素发生input输入事件时, 将修改vue3定义的text --> <…

WeakMap 与 WeakSet

WeakSet WeakSet 结构与 Set 类似&#xff0c;也是不重复的值的集合。 成员都是数组和类似数组的对象&#xff0c;WeakSet 的成员只能是对象&#xff0c;而不能是其他类型的值。 若调用 add() 方法时传入了非数组和类似数组的对象的参数&#xff0c;就会抛出错误。 const b …

SpringBoot + Druid DataSource 实现监控 MySQL 性能

1 添加依赖 <properties><java.version>1.8</java.version><alibabaDruidStarter.version>1.2.11</alibabaDruidStarter.version> </properties><dependency><groupId>com.alibaba</groupId><artifactId>druid-s…

MYSQL进阶02

MYSQL进阶02 数据类型char与varchartext与blob浮点数与定点数日期类型的选择 数据类型 char与varchar char和varchar类型类似&#xff0c;都用来存储字符串&#xff0c;但是他们保存和检索的方式不同。char属于固定长度的字符类型&#xff0c;而varchar属于可变长度的字符类型…

【Java校招面试】基础知识(四)——JVM

目录 前言一、基础概念二、反射三、类加载器ClassLoader四、JVM内存模型后记 前言 本篇主要介绍Java虚拟机——JVM的相关内容。 “基础知识”是本专栏的第一个部分&#xff0c;本篇博文是第四篇博文&#xff0c;如有需要&#xff0c;可&#xff1a; 点击这里&#xff0c;返回…

营收、利润增速第一!海尔智家为何领跑?

“企业只有保持领先的能力&#xff0c;才有可能取得经济成果。” 管理学大师德鲁克曾如此强调。所谓“领先”&#xff0c;就是独一无二的、有价值的东西。利润&#xff0c;是企业在某个领域取得领先优势后&#xff0c;必然获得的回报。 这种“领先优势”&#xff0c;在各行业…

Linux基础IO【重定向及缓冲区理解】

✨个人主页&#xff1a; 北 海 &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 文章目录 &#x1f307;前言&#x1f3d9;️正文1、文件描述符1.1、先描述&#xff0c;再组织1.2、files_struct1.3、分配规则…

跨平台Office文档预览原生插件,非腾讯X5,支持离线,稳定高可用

引言 2023年4月13日零时起&#xff0c;腾讯浏览服务内核文档能力正式下线&#xff0c;要实现真正离线文档预览&#xff0c;于是有了这边文章。 前面写了多篇关于<跨平台文件在线预览解决方案>&#xff0c;不管使用pdf.js、LibreOffice&#xff0c;还是永中DCS&#xff…

单列文本数据快速导入表格

文本数据导入Excel似乎是个老生常谈&#xff0c;方法也有很多&#xff0c;例如 使用文本编辑器打开文本文件&#xff0c;拷贝粘贴到Excel然后分类Power Query中的【从文本/CSV】如下图所示。 但是这个需求略有不同&#xff0c;文本数据为单列&#xff0c;每7行数据为一组&am…