Linux多线程(三) 线程池C++实现

一、线程池原理

我们使用线程的时候就去创建一个线程,这样实现起来非常简便。但如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

线程池就是为了让线程可以复用,不在线程的创建和销毁上浪费时间。

线程池主要分为三个部分

  1. 任务队列:存储需要处理的任务
  2. 工作线程:任务队列的消费者,也可以说是处理任务的线程
    • 线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
    • 如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
    • 如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
  3. 管理线程:任务队列的生产者,也可以说是创造任务的线程
    • 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
    • 当任务过多的时候, 可以适当的创建一些新的工作线程
    • 当任务过少的时候, 可以适当的销毁一些工作的线程

二、任务队列

2.1、任务

任务就是函数与参数组成的结构体。

// 定义任务结构体
using callback = void (*)(void *);
struct Task
{
    callback function;
    void *arg;
    Task() : function(nullptr), arg(nullptr){};
    Task(callback f, void *arg_f) : function(f), arg(arg_f){};
};

2.2、任务队列

任务队列是对 queue 的进一步封装

// 任务队列
class TaskQueue
{
public:
    TaskQueue()
    {
        pthread_mutex_init(&m_mutex, NULL);
    };
    ~TaskQueue()
    {
        pthread_mutex_destroy(&m_mutex);
    };

    // 添加任务
    inline void addTask(Task &task)
    {
        pthread_mutex_lock(&m_mutex);
        m_queue.push(task);
        pthread_mutex_unlock(&m_mutex);
    }

    // 添加任务
    inline void addTask(callback func, void *arg)
    {
        pthread_mutex_lock(&m_mutex);
        Task task(func, arg);
        m_queue.push(task);
        pthread_mutex_unlock(&m_mutex);
    };

    // 取出一个任务
    inline Task takeTask()
    {
        Task t;
        pthread_mutex_lock(&m_mutex);
        if (!m_queue.empty())
        {
            t = m_queue.front();
            m_queue.pop();
        }
        pthread_mutex_unlock(&m_mutex);
        return t;
    };

    // 获取当前队列中任务个数
    inline int taskNumber()
    {
        return m_queue.size();
    }

    inline bool empty() {
        return m_queue.empty();
    }

private:
    pthread_mutex_t m_mutex;  // 互斥锁
    std::queue<Task> m_queue; // 任务队列
};

三、线程池定义

class ThreadPool
{
public:
    ThreadPool(int min, int max);
    ThreadPool() : ThreadPool(5, 20) {}
    ~ThreadPool();

    // 添加任务
    void addTask(Task task);
    // 添加任务
    void addTask(callback func, void *arg);
    // 获取忙线程的个数
    int getBusyNumber();
    // 获取活着的线程个数
    int getAliveNumber();

private:
    // 工作的线程的任务函数
    static void *worker(void *arg);
    // 管理者线程的任务函数
    static void *manager(void *arg);
    void threadExit();

private:
    pthread_mutex_t m_lock;
    pthread_cond_t m_notEmpty;
    pthread_t *m_threadIDs;
    pthread_t m_managerID;
    TaskQueue *m_taskQ;
    int m_minNum;
    int m_maxNum;
    int m_busyNum;
    int m_aliveNum;
    int m_exitNum;
    bool m_shutdown;
};

四、线程池的实现

4.1、构造函数

ThreadPool::ThreadPool(int min, int max) : m_minNum(min), m_maxNum(max), m_busyNum(0), m_aliveNum(min), m_exitNum(0), m_shutdown(false)
{
    // 实例化任务队列
    m_taskQ = new TaskQueue;
    // 给线程数组分配内存
    m_threadIDs = new pthread_t[m_maxNum];
    memset(m_threadIDs, 0, sizeof(pthread_t) * m_maxNum);
    // 初始化锁和条件变量
    pthread_mutex_init(&m_lock, NULL);
    pthread_cond_init(&m_notEmpty,NULL);
    // 创建管理者线程
    pthread_create(&m_managerID, NULL, manager, this);
    // 创建工作者线程
    for (int i=0; i<min; i++) {
        pthread_create(&m_threadIDs[i], NULL, worker, this);
    }
}

4.2、析构函数

ThreadPool::~ThreadPool() {
    this->m_shutdown = true;
    // 销毁管理者线程
    pthread_join(m_managerID, NULL);
    // 唤醒所有的消费者线程
    for (int i = 0; i < m_aliveNum; ++i) {
        pthread_cond_signal(&m_notEmpty);
    }
    // 销毁任务队列
    if (m_taskQ) delete m_taskQ;
    // 销毁保存消费者ID的数组
    if (m_threadIDs) delete[]m_threadIDs;
    // 销毁锁
    pthread_mutex_destroy(&m_lock);
    // 销毁条件变量
    pthread_cond_destroy(&m_notEmpty);
}

4.3、添加任务

void ThreadPool::addTask(Task task)
{
    if (m_shutdown) return;
    // 添加任务
    m_taskQ->addTask(task);
    // 唤醒一个工作处理线程
    pthread_cond_signal(&m_notEmpty);
}

void ThreadPool::addTask(callback func, void *arg)
{
    if (m_shutdown) return;
    // 添加任务
    m_taskQ->addTask(func,arg);
    // 唤醒一个工作处理线程
    pthread_cond_signal(&m_notEmpty);
}

4.4、工作线程函数

// 工作线程任务函数
void* ThreadPool::worker(void* arg) {
    // 将传入的参数强制转换为ThreadPool*指针类型
    ThreadPool* pool = static_cast<ThreadPool*>(arg);
    while (true) {
        // 访问任务队列先要加锁
        pthread_mutex_lock(&pool->m_lock);
        // 任务为空则线程阻塞
        while (pool->m_taskQ->empty() && !pool->m_shutdown) {
            // 阻塞线程在信号量m_notEmpty上
            pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);
            // 解除阻塞之后判断是否要销毁线程
            if (pool->m_exitNum > 0) {
                pool->m_exitNum--;
                if (pool->m_aliveNum > pool->m_minNum) {
                    pool->m_aliveNum--;
                    pthread_mutex_unlock(&pool->m_lock);
                    pool->threadExit();
                }
            }
        }
        // 如果线程池要结束了
        if (pool->m_shutdown) {
            // 解锁
            pthread_mutex_unlock(&pool->m_lock);
            // 销毁线程
            pool->threadExit();
        }

        // 从任务队列中取出一个任务
        Task task = pool->m_taskQ->takeTask();
        // 工作的线程加1
        pool->m_busyNum++;
        // 解锁,下面要开始执行了
        pthread_mutex_unlock(&pool->m_lock);
        
        // 执行任务
        task.function(task.arg);
        // 销毁参数指针
        free(task.arg);
        task.arg = nullptr;
        
        // 工作的线程减1
        pthread_mutex_lock(&pool->m_lock);
        pool->m_busyNum--;
        pthread_mutex_unlock(&pool->m_lock);
    }
    return nullptr;
}

4.5、管理线程函数

// 管理者线程任务函数
void* ThreadPool::manager(void* arg)
{
    ThreadPool* pool = static_cast<ThreadPool*>(arg);

    // 如果线程池没有关闭就一直检测
    while (!pool->m_shutdown) {
        // 每5s监控一次线程池状态
        sleep(5);
        // 取出任务数量和线程数量
        pthread_mutex_lock(&pool->m_lock);
        int queuesize = pool->m_taskQ->taskNumber();
        int liveNum = pool->m_aliveNum;
        int busyNum = pool->m_busyNum;
        pthread_mutex_unlock(&pool->m_lock);

        // 创建线程
        const int NUMBER = 2;
        // 当前任务太多了,需要增加线程处理
        if (queuesize > liveNum && liveNum < pool->m_maxNum) {
            // 线程池加锁
            pthread_mutex_lock(&pool->m_lock);
            int num = 0;
            for (int i = 0; i < pool->m_maxNum && num < NUMBER && pool->m_aliveNum < pool->m_maxNum; ++i)
            {
                if (pool->m_threadIDs[i] == 0)
                {
                    pthread_create(&pool->m_threadIDs[i], NULL, worker, pool);
                    num++;
                    pool->m_aliveNum++;
                }
            }
            // 线程池解锁
            pthread_mutex_unlock(&pool->m_lock);
        }
        // 当前任务太少了,需要减少线程,减轻系统负担
        if (busyNum * 2 < liveNum && liveNum > pool->m_minNum + NUMBER) {
            // 线程池加锁
            pthread_mutex_lock(&pool->m_lock);
            pool->m_exitNum = NUMBER;
            // 线程池解锁
            pthread_mutex_unlock(&pool->m_lock);
            
            // 唤醒线程,自动删除自己
            for (int i = 0; i < NUMBER; ++i) {
                pthread_cond_signal(&pool->m_notEmpty);
            }
        }
    }
    return nullptr;
}

4.6、线程自我销毁

void ThreadPool::threadExit()
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < m_maxNum; ++i)
    {
        if (m_threadIDs[i] == tid) {
            m_threadIDs[i] = 0;
            break;
        }
    }
    pthread_exit(NULL);
}

五、测试代码

void taskFunc(void* arg)
{
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n", pthread_self(), num);
    sleep(1);
}

int main()
{
    // 创建线程池
    ThreadPool* pool = new ThreadPool(2, 5);
    for (int i = 0; i < 100; ++i)
    {
        int* num = (int*)malloc(sizeof(int));
        *num = i;
        pool->addTask( taskFunc, num);
    }

    sleep(30);

    delete pool;
    return 0;
}

运行结果:

image-20240423223821791

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/579261.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

华为MRS服务使用记录

背景&#xff1a;公司的业务需求是使用华为的这一套成品来进行开发&#xff0c;使用中发现&#xff0c;这个产品跟原生的Hadoop的那一套的使用&#xff0c;还是有很大的区别的&#xff0c;现记录一下&#xff0c;避免以后忘了 一、原始代码的下载 下载地址&#xff1a;MRS样例…

STM32HAL库++ESP8266+cJSON连接阿里云物联网平台

实验使用资源&#xff1a;正点原子F1 USART1&#xff1a;PA9P、A10&#xff08;串口打印调试&#xff09; USART3&#xff1a;PB10、PB11&#xff08;WiFi模块&#xff09; DHT11&#xff1a;PG11&#xff08;采集数据、上报&#xff09; LED0、1&#xff1a;PB5、PE5&#xff…

input框添加验证(如只允许输入数字)中文输入导致显示问题的解决方案

文章目录 input框添加验证(如只允许输入数字)中文输入导致显示问题的解决方案问题描述解决办法 onCompositionStart与onCompositionEnd input框添加验证(如只允许输入数字)中文输入导致显示问题的解决方案 问题描述 测试环境&#xff1a;react antd input (react的事件与原生…

浅谈在Java代码中创建线程的多种方式

文章目录 一、Thread 类1.1 跨平台性 二、Thread 类里的常用方法三、创建线程的方法1、自定义一个类&#xff0c;继承Thread类&#xff0c;重写run方法1.1、调用 start() 方法与调用 run() 方法来创建线程&#xff0c;有什么区别&#xff1f;1.2、sleep()方法 2、自定义一个类&…

嵌入式常见存储器

阅读引言&#xff1a; 在看一款芯片的数据手册的时候&#xff0c; 无意间翻到了它的启动模式(Boot Mode), 发现这种这么多种ROM&#xff0c;所以就写下了这篇文章。 目录 一、存储器汇总 二、易失性存储器(RAM) 1. SRAM 1.1 单口SRAM 1.2 双口SRAM 2. DRAM 2.1 SDRAM 2…

Fast-DetectGPT 无需训练的快速文本检测

本文提出了一种新的文本检测方法 ——Fast-DetectGPT&#xff0c;无需训练&#xff0c;直接使用开源小语言模型检测各种大语言模型&#xff0c;如GPT等生成的文本内容。 Fast-DetectGPT 将检测速度提高了 340 倍&#xff0c;将检测准确率相对提升了 75%&#xff0c;超过商用系…

有哪些好用电脑端时间定时软件?桌面日程安排软件推荐 桌面备忘录

随着现代生活节奏的加快&#xff0c;人们对于时间管理和任务提醒的需求越来越大。为了满足这一需求&#xff0c;市场上涌现出了众多桌面便签备忘录软件&#xff0c;它们不仅可以帮助我们记录待办事项&#xff0c;还能定时提醒我们完成任务。在这篇文章中&#xff0c;我将为大家…

计算机研究生如何在顶级会议了解行业方向

以为例子论文可视化 |WACV 2022 年 (thecvf.com)https://wacv2022.thecvf.com/papers-visualizations?filterprimary_subject_area&search3DComputerVision 这些图表适用于IEEE/CVF 计算机视觉冬季会议 (WACV) 2022。顶部图表是根据彼此相似性分布的会议主要会议论文的可…

微电子领域材料生长方法(六)液相外延(LPE)

微电子领域材料生长方法&#xff08;六&#xff09;液相外延&#xff08;LPE&#xff09; 液相外延&#xff08;Liquid Phase Epitaxy, LPE&#xff09;是一种用于生长单晶薄膜的技术&#xff0c;特别是在半导体材料的制备中。LPE技术允许在较低的温度下从熔体中生长出高质量的…

Visual 下载 NuGet包速度变慢

Visual 下载 NuGet包速度变慢 最近遇到一个问题&#xff0c;即我在使用 Visual Studio 下载 Nuget 包的时候会发现变得特别慢&#xff0c;那么该如何解决该问题呢 Visual Studio → 工具 → NuGet 包管理项 → 程序包管理设置 → 程序包源 从上面我们可以看到我使用的包源地址…

2024 最新免费听全网音乐神器

之前分享过几个的音乐软件挂了2024最新神器app&#xff0c;全网音乐免费听 &#xff0c;这里再整理分享下&#xff0c;下载地址 https://pan.quark.cn/s/b52ada313fbd 玩转互联网达人 苏生不惑备用号&#xff0c;分享各种黑科技软件资源和技巧&#xff0c;带你玩转互联网。 …

12.JAVAEE之网络原理2

1.网络层 网络层要做的事情,主要是两方面, 1)地址管理,制定一系列的规则,通过地址,描述出网络上一个设备的位置. 2)路由选择.网络环境比较复杂的,从一个节点到另一个节点之间,存在很多条不同的路径,就需要通过这种方式,筛选/规划出更合适的路径进行数据传输 IP协议 8位协议&…

HackMyVM-Convert

目录 信息收集 arp nmap WEB web信息收集 gobuster RCE漏洞 反弹shell 提权 get user.txt 提权 信息收集 arp ┌──(root㉿0x00)-[~/HackMyVM] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 08:00:27:77:ed:84, IPv4: 192.168.9.126 Starting…

律师口才训练技巧课程介绍?

律师口才训练技巧课程介绍 一、课程背景与目标 律师口才作为法律职业的核心能力之一&#xff0c;对于律师在**辩论、法律咨询、谈判协商等场合的表现具有至关重要的作用。然而&#xff0c;许多律师在口才方面存在不足&#xff0c;难以充分发挥自己的专业能力。因此&#xff0c;…

CTF之eval

首先我们先了解一下eval&#xff08;&#xff09;函数 什么是eval()? eval() 函数把字符串按照 PHP 代码来计算。 该字符串必须是合法的 PHP 代码&#xff0c;且必须以分号结尾。 如果没有在代码字符串中调用 return 语句&#xff0c;则返回 NULL。如果代码中存在解析错误…

数据结构——二叉树的顺序存储(堆)(C++实现)

数据结构——二叉树的顺序存储&#xff08;堆&#xff09;&#xff08;C实现&#xff09; 二叉树可以顺序存储的前提堆的定义堆的分类大根堆小根堆 整体结构把握两种调整算法向上调整算法递归版本 非递归版本向下调整算法非递归版本 向上调整算法和向下调整算法的比较 我们接着…

【Linux系统化学习】生产者消费者模型(阻塞队列和环形队列)

目录 生产者消费者模型 什么是生产者消费者模型 为什么要使用生产者消费者模型 生产者消费者模型的优点 为什么生产者和生产者要互斥&#xff1f; 为什么消费者和消费者要互斥&#xff1f; 为什么生产者和消费者既是互斥又是同步&#xff1f; 基于BlockingQueue的生产者…

将数组中最大的数放在最后一位,最小的数放在第一位

#include <stdio.h> int main() {void input(int number[]);void output(int number[]);void swapmaxmin(int number[]);int number[10];input(number);//swapmaxmin(number);output(number);return 0; }//往一个数组里输入 void input(int number[]) {int i;for(i0;i<…

Bert类模型也具备指令遵循能力吗?

深度学习自然语言处理 原创作者&#xff1a;Winnie BERT模型&#xff0c;依托Transformer架构及其大规模预训练&#xff0c;为自然语言处理领域带来了深远的影响。BERT模型架构包含多层双向Transformer编码器&#xff0c;通过这种结构&#xff0c;BERT及其家族成员&#xff0c;…

tensorflow_decision_forests\tensorflow\ops\inference\inference.so not found

恰好有一个帖子提到了py3.10里面的解决方案 pip install --user tensorflow2.11.0My tensorflow version is 2.11.0 and my tensorflow_decision_forests version is 1.2.0 so those should be compatible. I also am using Python version 3.10.11原文链接&#xff1a; http…