线程池(Linux +C/C++)

参考  手写线程池 - C语言版 | 爱编程的大丙 (subingwen.cn)

1.为什么需要线程池?

1)线程问题:

(1)如果只使用线程创建函数,在不断有新的任务进来的时候,需要不断的创建任务;任务在结束之后,为了避免占用资源,需要销毁线程任务。导致频繁操作,占用程序运行时间。

(2)在线程创建之后,如果任务已经运行完毕,线程不去销毁,则会白白浪费资源。

2)如何解决线程问题(线程池的优势):

(1)使用线程池之后,将任务和线程分离,建立一定数量的线程,使线程重复利用(不销毁),不断将任务添加到线程中。解决线程频繁创建、销毁问题。提供系统执行效率。

(2)根据任务数量增减,自动添加或者减少线程,使得线程维持在最优数量,节约系统资源。

2.线程池是什么?

1)线程的基本组成:

(1)任务队列(负责保存要处理的任务,并将任务交给工作线程去处理)

(a)通过线程池提供的api函数,将待处理的任务添加到任务队列,或者从任务队列删除;

(b)已经处理的任务会被从任务队列删除;

(c)线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程

(2)工作者线程(任务队列的消费者、执行人员,动态创建N个)

(a)线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理;

(b)工作的线程相当于是任务队列的消费者角色;

(c)如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞);

(d)如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作;

(3)管理者线程(管理整个线程池,1个)

(a)周期性地 对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测;

(b)根据任务数量的多少,增减线程数量;

3.线程池怎么实现?(linux/C语言版本)

1)单个任务元素结构体

// 任务结构体
typedef struct Task
{
    void (*function)(void* arg);
    void* arg;
}Task;

2)线程池结构体(任务队列+管理线程+工作线程)

// 线程池结构体
 struct ThreadPool
{
    Task* taskQ;        // 任务队列  后面利用堆新建数组
    int queueCapacity;  // 容量
    int queueSize;      // 当前任务个数
    int queueFront;     // 队头 -> 取数据
    int queueRear;      // 队尾 -> 放数据

    pthread_t managerID;    // 管理者线程ID
    pthread_t *threadIDs;   // 工作的线程ID   后面利用堆新建数组
    int minNum;             // 最小线程数量
    int maxNum;             // 最大线程数量
    int busyNum;            // 忙的线程的个数
    int liveNum;            // 存活的线程的个数
    int exitNum;            // 要销毁的线程个数
    pthread_mutex_t mutexPool;  // 锁整个的线程池
    pthread_mutex_t mutexBusy;  // 锁busyNum变量
    pthread_cond_t notFull;     // 任务队列是不是满了
    pthread_cond_t notEmpty;    // 任务队列是不是空了

    int shutdown;           // 是不是要销毁线程池, 销毁为1, 不销毁为0
};

3)API声明

typedef struct ThreadPool ThreadPool;

// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueSize);

// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);

// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);

// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);

//
// 工作的线程(消费者线程)任务函数
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);

4)创建线程池

(1)主要操作:

(a)创建线程池对象:ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));

(b)创建工作者线程队列(pthread_t):只是为了便于管理线程,并没有创建线程。

        pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);

(c)创建任务队列(task类型):pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);

(d)创建管理者线程:pthread_create(&pool->managerID, NULL, manager, pool);

(e)创建工作者线程:按照最小数量minNum进行for循环;
           pthread_create(&pool->threadIDs[i], NULL, worker, pool);

(f)初始化互斥量/条件变量:mutexPool、mutexBusy、notEmpty、notFull

(g)其余工作:初始化一些控制变量

(h)鲁棒性工作:检查指针,及时释放内存

(2)说明:

(a)为什么使用do while ?可以使用break,最后统一销毁资源;

(b)memset,后续根据指针是不是0 判断是不是有线程闲置

ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
    //实例化线程池对象
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    do 
    {
        //判断pool有没有指向有效内存
        if (pool == NULL)
        {
            printf("malloc threadpool fail...\n");
            break;
        }
        //创建工作者线程队列,按照最多线程数量 创建
        pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
        //判断threadIDs有没有指向有效内存
        if (pool->threadIDs == NULL)
        {
            printf("malloc threadIDs fail...\n");
            break;
        }
        
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
        //初始化相关参数
        pool->minNum = min;
        pool->maxNum = max;
        pool->busyNum = 0;
        pool->liveNum = min;    // 和最小个数相等
        pool->exitNum = 0;
        //创建互斥量、条件变量
        if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
            pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
            pthread_cond_init(&pool->notFull, NULL) != 0)
        {
            printf("mutex or condition init fail...\n");
            break;
        }
        创建任务队列
        pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueRear = 0;
        
        pool->shutdown = 0;

        //创建管理者线程
        pthread_create(&pool->managerID, NULL, manager, pool);
        //创建工作者线程
        for (int i = 0; i < min; ++i)
        {
            pthread_create(&pool->threadIDs[i], NULL, worker, pool);
        }
        return pool;
    } while (0);

    // 释放资源
    if (pool && pool->threadIDs) free(pool->threadIDs);
    if (pool && pool->taskQ) free(pool->taskQ);
    if (pool) free(pool);

    return NULL;
}

5)工作线程执行函数worker

(1)主要操作:

(a)判断任务队列是否为空。假如为空,阻塞线程。并进一步判断,是否要销毁线程池。

(b)如果任务队列不为空,取出任务,执行任务回调函数。

(2)说明:

(a)线程池属于公共资源,在访问线程池的时候要加锁;访问结束之后解锁。

(b)while (pool->queueSize == 0 && !pool->shutdown)  。在pthread_cond_wait这个地方,为什么采用while而不是if。关于该点,进行解释。

        首先,我们需要了解pthread_cond_wait的执行流程:

        -》pthread_cond_wait()函数会将当前线程挂起,使它处于休眠状态。
        -》在当前线程被挂起之前,函数会自动调用pthread_mutex_lock()函数将关联的互斥锁上锁,保证条件变量的独占访问。
        -》函数会将当前线程加入条件变量的等待队列中,并释放关联的互斥锁。
        -》当前线程进入休眠状态,等待其他线程在该条件变量上发出信号或广播。
        -》当另一个线程在该条件变量上发出信号或广播时,pthread_cond_wait()函数会唤醒一个等待在该条件变量上的线程。
        -》唤醒的线程重新获得关联的互斥锁的所有权,并继续执行。
        -》pthread_cond_wait()函数返回。

        其次,我们需要了解pthread_cond_signal的作用:至少唤醒一个线程。

        综合上面两点,设想以下场景。

初始临界变量x = 0;

线程1:如果x< 1,阻塞。否则,继续执行,并设置x = 0;

线程2:如果x< 1,阻塞。否则,继续执行,并设置x = 0;

线程3:设置x = 1,调用pthread_cond_signal唤醒线程。

        情况1,采用if语句:线程1收到了线程唤醒,线程2收到了线程唤醒,线程1抢到了互斥锁,他正常进行,并把x设置为0;然后线程2因为收到了线程唤醒,他也开始执行。问题出现了,线程1不应该执行因为他的判断条件是 x<1的情况下,应该进行阻塞。可是他却开始执行了。

        情况2,采用while语句:线程1收到了线程唤醒,线程2收到了线程唤醒,线程1抢到了互斥锁,因为之前被阻塞了,while语句不满足,他又检查了一次while条件,发现不用阻塞了,他正常进行,并把x设置为0。然后线程2因为收到了线程唤醒,他也开始执行。因为之前也被阻塞了,while语句不满足,他有检查一次条件,发现x被修改了,x=0,需要被阻塞了。所以他没法往下执行了。所以接着被阻塞了。这种情况,就可以保证,条件变量只能唤醒一个线程。防止“伪唤醒”。

        回到本例子中,情况一样。线程池会唤醒多个工作线程,让他们去执行任务。但是有可能任务数<线程数。如果使用while循环,就能保证每个任务对应一个线程。如果使用if判断,有可能唤醒多余的线程,并引发程序崩溃。

void* worker(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;

    while (1)
    {
        pthread_mutex_lock(&pool->mutexPool);
        // 当前任务队列是否为空
        while (pool->queueSize == 0 && !pool->shutdown)
        {
            // 阻塞工作线程
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

            // 判断是不是要销毁线程
            if (pool->exitNum > 0)
            {
                pool->exitNum--;
                if (pool->liveNum > pool->minNum)
                {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool);
                    //线程退出函数。
                    threadExit(pool);
                }
            }
        }

        // 判断线程池是否被关闭了
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&pool->mutexPool);
            threadExit(pool);
        }

        // 从任务队列中取出一个任务
        Task task;
        task.function = pool->taskQ[pool->queueFront].function;
        task.arg = pool->taskQ[pool->queueFront].arg;
        // 移动头结点 数组变成了循环队列
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
        pool->queueSize--;
        //告诉生产者,可以添加任务了
        pthread_cond_signal(&pool->notFull);
        //解锁
        pthread_mutex_unlock(&pool->mutexPool);
        //工作线程数量+1
        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);
        //函数任务调用
        task.function(task.arg);
        //释放函数堆内存
        free(task.arg);
        task.arg = NULL;
        //工作线程数量-1
        printf("thread %ld end working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

6)管理者任务

(1)主要操作:

(a)添加任务线程;

(b)销毁任务线程;

(2)说明:

销毁任务线程的思路:发出条件变量信号,唤醒所有的工作线程;并将shutdown参数,告诉工作线程,现在要关闭。让所有的线程自杀。

void* manager(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;
    while (!pool->shutdown)
    {
        // 每隔3s检测一次
        sleep(3);

        // 取出线程池中任务的数量和当前线程的数量
        pthread_mutex_lock(&pool->mutexPool);
        int queueSize = pool->queueSize;
        int liveNum = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexPool);

        // 取出忙的线程的数量
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);

        // 添加线程
        // 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
        if (queueSize > liveNum && liveNum < pool->maxNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            int counter = 0;
            for (int i = 0; i < pool->maxNum && counter < NUMBER
                && pool->liveNum < pool->maxNum; ++i)
            {
                if (pool->threadIDs[i] == 0)
                {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    counter++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }
        // 销毁线程
        // 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
        if (busyNum * 2 < liveNum && liveNum > pool->minNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            // 让工作的线程自杀
            for (int i = 0; i < NUMBER; ++i)
            {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
    return NULL;
}

7)单线程退出函数

(1)主要操作:

前面判断线程是不是在工作,主要是判断线程ID是不是空。如果线程ID是空,表明为空线程;如果线程不为空,则为忙线程。因此,在退出删除线程的时候,需要修改线程队列中对应的线程ID,将其改为ID=0;便于后面的继续使用。

(2)说明

void threadExit(ThreadPool* pool)
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < pool->maxNum; ++i)
    {
        if (pool->threadIDs[i] == tid)
        {
            pool->threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

8)线程池添加函数

(1)主要操作:

(a)如果线程池中的工作线程全部被占用,阻塞添加任务,即阻塞生产者;
(b)如果工作线程释放了notFull条件变量,则说明工作线程有空余,接触生产者

(2)说明:

void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
    pthread_mutex_lock(&pool->mutexPool);
    while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
    {
        // 阻塞生产者线程
        pthread_cond_wait(&pool->notFull, &pool->mutexPool);
    }
    if (pool->shutdown)
    {
        pthread_mutex_unlock(&pool->mutexPool);
        return;
    }
    // 添加任务
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    pthread_cond_signal(&pool->notEmpty);
    pthread_mutex_unlock(&pool->mutexPool);
}

4.线程池怎么用?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/217561.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Temu数据接口:为开发者提供的强大工具

在如今数字化的时代&#xff0c;数据成为了商业运营中不可或缺的一部分。为了满足开发者对数据获取和分析的需求&#xff0c;Temu平台推出了强大的数据接口&#xff0c;为开发者提供了丰富的API服务。通过Temu数据接口&#xff0c;开发者可以方便地获取商品信息、订单数据、用户…

【Avue】select的远程搜索 [模糊搜索]

一、需求 【模糊搜索】 二、实现avue的远程搜索 1、search为搜索 2、remote远程搜索 3、dictValue{{key}}为输入的值

@Scheduled,Quartz,XXL-JOB三种定时任务总结

Scheduled&#xff0c;Quartz&#xff0c;XXL-JOB三种定时任务总结 一、Scheduled 简介 Scheduled 是 Spring 框架中用于声明定时任务的注解。通过使用 Scheduled 注解&#xff0c;你可以指定一个方法应该在何时执行&#xff0c;无需依赖外部的调度器。 这个注解通常与Enab…

python的安装

python官网地址&#xff1a;https://www.python.org/ 以在windows下安装3.12.0版本为例。 下载安装包&#xff1a; 下载下来的安装包是python-3.12.0-amd64.exe&#xff0c;双击安装&#xff0c;按照提示&#xff0c;一步一步往下走&#xff1a; 到cmd下&#xff0c;输入py…

postgresql pg_hba.conf 配置详解

配置文件之pg_hba.conf介绍 该文件用于控制访问安全性&#xff0c;管理客户端对于PostgreSQL服务器的访问权限&#xff0c;内容包括&#xff1a;允许哪些用户连接到哪个数据库&#xff0c;允许哪些IP或者哪个网段的IP连接到本服务器&#xff0c;以及指定连接时使用的身份验证模…

单片机的扩展结构

目录 三种总线的构造方式 地址空间分配和外部地址锁存器 1.存储器地址空间分配 (1)74LS138 (2)74LS139 2.外部地址锁存器 (1)锁存器74LS373 静态数据存储器RAM的并行扩展 (1)常用的静态RAM (SRAM)芯片 (2)外扩数据存储器的读写操作时序 1.读片外RAM操作时序 2.写片…

JDK8新特性——Stream流

文章目录 一、Stream流体验二、Stream流的创建三、Stream流中间方法四、Stream流终究方法 Stream流&#xff08;也叫Stream API&#xff09;。它是从JDK8以后才有的一个新特性&#xff0c;是专业用于对集合或者数组进行便捷操作的 一、Stream流体验 需求&#xff1a;有一个Lis…

【学习笔记】混淆矩阵

混淆矩阵&#xff08;Confusion Matrix&#xff09;&#xff0c;又称为错误矩阵&#xff0c;是一种特别适用于监督学习中分类问题评估模型性能的工具。在机器学习领域&#xff0c;混淆矩阵能够清晰地显示算法模型的分类结果和实际情况之间的差异&#xff0c;常用于二分类和多分…

vscode里面使用vue的一些插件,方便开发

1、vue 2 Snippets &#xff08;vue语法提示&#xff09; vue提示这个也可以 1.1 Vue VSCode Snippets 2、vetur Vetur支持.vue文件的语法高亮显示&#xff0c;除了支持template模板以外 3、Element UI Snippets(饿了么的提示) 4、indent-rainbow&#xff08;缩进高亮提示) 5…

《开箱元宇宙》:Madballs 解锁炫酷新境界,人物化身系列大卖

你是否曾想过&#xff0c;元宇宙是如何融入世界上最具代表性的品牌和名人的战略中的&#xff1f;在本期的《开箱元宇宙》 系列中&#xff0c;我们与 Madballs 的战略顾问 Derek Roberto 一起聊聊 Madballs 如何在 90 分钟内售罄 2,000 个人物化身系列&#xff0c;以及是什么原…

【开源】基于Vue.js的新能源电池回收系统

文末获取源码&#xff0c;项目编号&#xff1a; S 075 。 \color{red}{文末获取源码&#xff0c;项目编号&#xff1a;S075。} 文末获取源码&#xff0c;项目编号&#xff1a;S075。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 用户档案模块2.2 电池品类模块2.3 回…

Nginx(缓存机制)

对于性能优化而言&#xff0c;缓存是一种能够大幅度提升性能的方案&#xff0c;因此几乎可以在各处都能看见缓存&#xff0c;如客户端缓存、代理缓存、服务器缓存等等&#xff0c;Nginx的缓存则属于代理缓存的一种。对于整个系统而言&#xff0c;加入缓存带来的优势额外明显&am…

创新零售巨头:揭开山姆与Costco蓬勃发展背后的秘密

会员制商店这个冷门的业态突然之间硝烟弥漫&#xff0c;更多的资本开始涌向付费会员商店这一业态&#xff0c;本文即将探讨的是付费会员制的成功秘诀和零售企业可行的发展路径。Costco的发展经验对国内超市巨头的崛起具有显著的借鉴意义&#xff0c;以优质低价商品服务为中心&a…

CVE-2016-2510CVE-2017-5586 BeanShell漏洞

前言&#xff1a; 首先我们需要了解BeanShell具体是做什么&#xff1a; BeanShell 是一种轻量级的可嵌入式脚本语言&#xff0c;用于在 Java 环境中执行脚本代码。它提供了一种简单、灵活的方式来扩展和定制 Java 应用程序的行为&#xff0c;允许开发人员动态地执行和评估脚本…

CSS实现瀑布流

多列布局介绍 多列布局 指的是 CSS3 可以将文本内容设计成像报纸一样的多列布局&#xff0c;例&#xff1a; CSS3 的多列布局属性: column-count&#xff1a;指定了需要分割的列数&#xff1b;column-gap&#xff1a;指定了列与列间的间隙&#xff1b;column-rule-style&#…

Mybatis如何执行批量操作

文章目录 Mybatis如何执行批量操作使用foreach标签 使用ExecutorType.BATCH如何获取生成的主键 Mybatis如何执行批量操作 使用foreach标签 foreach的主要用在构建in条件中&#xff0c;它可以在SQL语句中进行迭代一个集合。foreach标签的属性主要有item&#xff0c;index&…

MySQL系统函数

select version();查看mysql版本。 select user();可以查看数据库用户名。 select database();可以查看数据库名。 select system_use();可以查看系统用户名。 show variables like %basedir%;可以展示数据库读取路径。 show variables like %sets_dir%;可以看一下安…

★136. 只出现一次的数字(位运算)

136. 只出现一次的数字 这个题主要考察的知识点是位运算&#xff08;这里是异或&#xff09; 如果不要求空间复杂度为O&#xff08;1&#xff09;&#xff0c;那有很多方法。但是这里有这样的要求。 可以通过位运算 的方法来实现。 异或运算 ⊕有以下三个性质&#xff1a; 任…

文字转语音、语音转文字! AI视频生成神器!

分享一波文字转语音、语音转文字&#xff01;AI视频生成神器&#xff01;让外国人说中文&#xff0c;口型自然&#xff0c;不限语言&#xff0c;感兴趣的同学可以试试~ 可以用Al生成视频&#xff0c;Whisper语音转文字 Whisper 开源项目&#xff1a; https://github.com/Const…

【源码解析】聊聊线程池 实现原理与源码深度解析(二)

AbstractExecutorService 上一篇文章中&#xff0c;主要介绍了AbstractExecutorService的线程执行的核心流程&#xff0c;execute() 这个方法显然是没有返回执行任务的结果&#xff0c;如果我们需要获取任务执行的结果&#xff0c;怎么办&#xff1f; Callable 就是一个可以获…