一文搞懂系列——Linux C线程池技术

背景

最近在走读诊断项目代码时,发现其用到了线程池技术,感觉耳目一新。以前基本只是听过线程池,但是并没有实际应用。对它有一丝的好奇,于是趁这个机会深入了解一下线程池的实现原理。

线程池的优点

线程池出现的背景,其实对应CPU性能优化——“瑞士军刀“文章中提到的短时应用。

即短时间内通过创建线程处理大量请求,但是请求业务的执行时间过短,会造成一些缺陷。

  • 浪费系统资源。比如我们创建一个线程,再销毁一个线程耗时10ms,但是业务的执行时间只有10ms。这就导致系统有效利用率较低。
  • 系统不稳定。如果短时间内,来了大量的请求,每一个请求都通过创建线程的方式执行。可能存在瞬时负载很高,请求响应降低,从而导致系统不稳定。

于是我们可以通过线程池技术,减少线程创建和消耗的耗时,提高系统的资源利用;控制线程并行数量,确保系统的稳定性;

线程池实现

线程池的核心包括以下内容:

  • 线程池任务节点结构。
  • 线程池控制器。
  • 线程池的控制流程。

线程池任务节点结构

线程池任务结点用来保存用户投递过来的的任务,并放入线程池中的线程来执行,任务结构如下:

struct worker_t {
    void * (* process)(void * arg); /*回调函数*/
    int    paratype;                /*函数类型(预留)*/
    void * arg;                     /*回调函数参数*/
    struct worker_t * next;         /*链接下一个任务节点*/
};

线程池控制器

线程池控制器用来对线程池进行控制管理,描述当前线程池的最基本信息,包括任务的投递,线程池状态的更新与查询,线程池的销毁等,其结构如下:

/*线程控制器*/
struct CThread_pool_t {
    pthread_mutex_t queue_lock;     /*互斥锁*/
    pthread_cond_t  queue_ready;    /*条件变量*/
    
    worker_t * queue_head;          /*任务节点链表 保存所有投递的任务*/
    int shutdown;                   /*线程池销毁标志 1-销毁*/
    pthread_t * threadid;           /*线程ID*/
    
    int max_thread_num;             /*线程池可容纳最大线程数*/
    int current_pthread_num;        /*当前线程池存放的线程*/
    int current_pthread_task_num;   /*当前已经执行任务和已分配任务的线程数目和*/
    int current_wait_queue_num;     /*当前等待队列的的任务数目*/
    int free_pthread_num;           /*线程池允许最大的空闲线程数/*/
    
    /**
     *  function:       ThreadPoolAddWorkUnlimit
     *  description:    向线程池投递任务
     *  input param:    pthis   线程池指针
     *                  process 回调函数
     *                  arg     回调函数参数
     *  return Valr:    0       成功
     *                  -1      失败
     */     
    int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg);
    
    /**
     *  function:       ThreadPoolAddWorkLimit
     *  description:    向线程池投递任务,无空闲线程则阻塞
     *  input param:    pthis   线程池指针
     *                  process 回调函数
     *                  arg     回调函数参数
     *  return Val:     0       成功
     *                  -1      失败
     */     
    int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg);
    
    /**
     *  function:       ThreadPoolGetThreadMaxNum
     *  description:    获取线程池可容纳的最大线程数
     *  input param:    pthis   线程池指针
     */     
    int (* GetThreadMaxNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentThreadNum
     *  description:    获取线程池存放的线程数
     *  input param:    pthis   线程池指针
     *  return Val:     线程池存放的线程数
     */     
    int (* GetCurrentThreadNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentTaskThreadNum
     *  description:    获取当前正在执行任务和已经分配任务的线程数目和
     *  input param:    pthis   线程池指针
     *  return Val:     当前正在执行任务和已经分配任务的线程数目和
     */     
    int (* GetCurrentTaskThreadNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolGetCurrentWaitTaskNum
     *  description:    获取线程池等待队列任务数
     *  input param:    pthis   线程池指针
     *  return Val:     等待队列任务数
     */     
    int (* GetCurrentWaitTaskNum)(void * pthis);
    
    /**
     *  function:       ThreadPoolDestroy
     *  description:    销毁线程池
     *  input param:    pthis   线程池指针
     *  return Val:     0       成功
     *                  -1      失败
     */     
    int (* Destroy)(void * pthis);    
};

线程池的控制流程

线程池的控制流程可以分为三个步骤:

  1. 线程池创建。即创建max_num个线程ThreadPoolRoutine,即空闲线程:
/**
 *  function:       ThreadPoolConstruct
 *  description:    构建线程池
 *  input param:    max_num   线程池可容纳的最大线程数
 *                  free_num  线程池允许存在的最大空闲线程,超过则将线程释放回操作系统
 *  return Val:     线程池指针                 
 */     
CThread_pool_t * 
ThreadPoolConstruct(int max_num, int free_num)
{
    int i = 0;
    
    CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t));
    if(NULL == pool)
        return NULL;
    
    memset(pool, 0, sizeof(CThread_pool_t));
    
    /*初始化互斥锁*/
    pthread_mutex_init(&(pool->queue_lock), NULL);
    /*初始化条件变量*/
    pthread_cond_init(&(pool->queue_ready), NULL);
    
    pool->queue_head                = NULL;
    pool->max_thread_num            = max_num; // 线程池可容纳的最大线程数
    pool->current_wait_queue_num    = 0;
    pool->current_pthread_task_num  = 0;
    pool->shutdown                  = 0;
    pool->current_pthread_num       = 0;
    pool->free_pthread_num          = free_num; // 线程池允许存在最大空闲线程
    pool->threadid                  = NULL;
    pool->threadid                  = (pthread_t *)malloc(max_num*sizeof(pthread_t));
    /*该函数指针赋值*/
    pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;
    pool->AddWorkLimit              = ThreadPoolAddWorkLimit;
    pool->Destroy                   = ThreadPoolDestroy;
    pool->GetThreadMaxNum           = ThreadPoolGetThreadMaxNum;
    pool->GetCurrentThreadNum       = ThreadPoolGetCurrentThreadNum;
    pool->GetCurrentTaskThreadNum   = ThreadPoolGetCurrentTaskThreadNum;
    pool->GetCurrentWaitTaskNum     = ThreadPoolGetCurrentWaitTaskNum;
    
    for(i=0; i<max_num; i++) {
        pool->current_pthread_num++;    // 当前池中的线程数
        /*创建线程*/
        pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool);
        usleep(1000);        
    }
    
    return pool;
}
  1. 投递任务。即将任务生产者,将任务节点投入线程池中。实现如下:
/**
 *  function:       ThreadPoolAddWorkLimit
 *  description:    向线程池投递任务,无空闲线程则阻塞
 *  input param:    pthis   线程池指针
 *                  process 回调函数
 *                  arg     回调函数参数
 *  return Val:     0       成功
 *                  -1      失败
 */     
int
ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg)
{ 
    // int FreeThreadNum = 0;
    // int CurrentPthreadNum = 0;
    
    CThread_pool_t * pool = (CThread_pool_t *)pthis;
    
    /*为添加的任务队列节点分配内存*/
    worker_t * newworker  = (worker_t *)malloc(sizeof(worker_t)); 
    if(NULL == newworker) 
        return -1;
    
    newworker->process  = process;  // 回调函数,在线程ThreadPoolRoutine()中执行
    newworker->arg      = arg;      // 回调函数参数
    newworker->next     = NULL;      
    
    pthread_mutex_lock(&(pool->queue_lock));
    
    /*插入新任务队列节点*/
    worker_t * member = pool->queue_head;   // 指向任务队列链表整体
    if(member != NULL) {
        while(member->next != NULL) // 队列中有节点
            member = member->next;  // member指针往后移动
            
        member->next = newworker;   // 插入到队列链表尾部
    } else 
        pool->queue_head = newworker; // 插入到队列链表头
    
    assert(pool->queue_head != NULL);
    pool->current_wait_queue_num++; // 等待队列加1
    
    /*空闲的线程= 当前线程池存放的线程 - 当前已经执行任务和已分配任务的线程数目和*/
    int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
    /*如果没有空闲线程且池中当前线程数不超过可容纳最大线程*/
    if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) {  //-> 条件为真进行新线程创建
        int CurrentPthreadNum = pool->current_pthread_num;
        
        /*新增线程*/
        pool->threadid = (pthread_t *)realloc(pool->threadid, 
                                        (CurrentPthreadNum+1) * sizeof(pthread_t));
                                        
        pthread_create(&(pool->threadid[CurrentPthreadNum]),
                                              NULL, ThreadPoolRoutine, (void *)pool);
        /*当前线程池中线程总数加1*/                                   
        pool->current_pthread_num++;
        
        /*分配任务线程数加1*/
        pool->current_pthread_task_num++;
        pthread_mutex_unlock(&(pool->queue_lock));
        
        /*发送信号给一个处与条件阻塞等待状态的线程*/
        pthread_cond_signal(&(pool->queue_ready));
        return 0;
    }
    
    pool->current_pthread_task_num++;
    pthread_mutex_unlock(&(pool->queue_lock));
    
    /*发送信号给一个处与条件阻塞等待状态的线程*/
    pthread_cond_signal(&(pool->queue_ready));
//  usleep(10);  //看情况  
    return 0;
}
  1. 线程执行。即每一个线程的执行逻辑。实现如下:
/**
 *  function:       ThreadPoolRoutine
 *  description:    线程池中执行的线程
 *  input param:    arg  线程池指针
 */     
void * 
ThreadPoolRoutine(void * arg)
{
    CThread_pool_t * pool = (CThread_pool_t *)arg;
    
    while(1) {
        /*上锁,pthread_cond_wait()调用会解锁*/
        pthread_mutex_lock(&(pool->queue_lock));
        
        /*队列没有等待任务*/
        while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) {
            /*条件锁阻塞等待条件信号*/
            pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
        }
        
        if(pool->shutdown) {
            pthread_mutex_unlock(&(pool->queue_lock));
            pthread_exit(NULL);         // 释放线程
        }
        
        assert(pool->current_wait_queue_num != 0);
        assert(pool->queue_head != NULL);
        
        pool->current_wait_queue_num--; // 等待任务减1,准备执行任务
        worker_t * worker = pool->queue_head;   // 去等待队列任务节点头
        pool->queue_head = worker->next;        // 链表后移     
        pthread_mutex_unlock(&(pool->queue_lock));
        
        (* (worker->process))(worker->arg);      // 执行回调函数
        
        pthread_mutex_lock(&(pool->queue_lock));
        pool->current_pthread_task_num--;       // 函数执行结束
        free(worker);   // 释放任务结点
        worker = NULL;
        
        if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) {
            pthread_mutex_unlock(&(pool->queue_lock));
            break;  // 当线程池中空闲线程超过 free_pthread_num 则将线程释放回操作系统
        }
        pthread_mutex_unlock(&(pool->queue_lock));    
    }
    
    pool->current_pthread_num--;    // 当前线程数减1
    pthread_exit(NULL);             // 释放线程
    
    return (void *)NULL;
}

这个就是用来执行任务的线程,在初始化创建线程时所有线程都全部阻塞在pthread_cond_wait()处,此时的线程就为空闲线程,也就是线程被挂起,当收到信号并取得互斥锁时,表明任务投递过来
则获取等待队列里的任务结点并执行回调函数;函数执行结束后回去判断当前等待队列是否还有任务,有则接下去执行,否则重新阻塞回到空闲线程状态。

若我的内容对您有所帮助,还请关注我的公众号。不定期分享干活,剖析案例,也可以一起讨论分享。
我的宗旨:
踩完您工作中的所有坑并分享给您,让你的工作无bug,人生尽是坦途

在这里插入图片描述

总结

实际上,我觉得在诊断项目中,线程池技术是非必要的。因此它不会涉及到大量的请求,以及每一个请求处理,一般都会比较耗时。

参考:https://www.cnblogs.com/zhaoosheLBJ/p/9337291.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/324054.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

腾讯云服务器购买指南,2024更新购买步骤

腾讯云服务器购买流程很简单&#xff0c;有两种购买方式&#xff0c;直接在官方活动上购买比较划算&#xff0c;在云服务器CVM或轻量应用服务器页面自定义购买价格比较贵&#xff0c;但是自定义购买云服务器CPU内存带宽配置选择范围广&#xff0c;活动上购买只能选择固定的活动…

2024年学鸿蒙开发就业前景怎么样?

随着科技的不断进步&#xff0c;鸿蒙系统作为华为自主研发的操作系统&#xff0c;逐渐引起了人们的关注。 2024年&#xff0c;鸿蒙开发就业前景如何&#xff1f; 对于那些对鸿蒙开发感兴趣并希望在这一领域寻找职业发展的人来说&#xff0c;这是一个非常重要的问题。 首先&a…

buuctf-Misc 题目解答分解118-120

118.[INSHack2017]sanity 打开压缩包就是一个md 文件 typora 打开 发现flag INSA{Youre_sane_Good_for_you} 119.粽子的来历 解压压缩包 &#xff0c;得到文件夹如下 用010 editor 打开 我是A.doc 这个有些可以 都改成FF 保存 然后再次打开 docx 文件就发现了屈原的诗 其他b…

PattPatel-“Introduction to Computing Systems“(4)期末样卷题目解析:C语言递归

C语言的递归我觉得最主要的还是要把Patt&Patel的部分好好理解下&#xff08;因为有和硬件结合的部分&#xff09;&#xff0c;但因为今天就考试&#xff08;来不及做这样的事情&#xff09;&#xff0c;先把之前模拟卷的题目给尝试弄明白&#xff0c;然后考完试之后继续学习…

《新课程教学》(电子版)是正规期刊吗?能评职称吗?

《新课程教学》&#xff08;电子版&#xff09;主要出版内容为学科教学理论、学科教学实践经验和成果&#xff0c;主要读者对象为中小学教师&#xff0c;期刊设卷首语、名家讲堂、课程与教学、教学实践、考试评价、教育信息化、教学琐谈、教育管理、教师心语、一线课堂、重温经…

StarRocks Awards 2023 年度贡献人物

2023 年行将结束。这一年&#xff0c;StarRocks 继续全方位大步向前迈进&#xff0c;在 300 贡献者的辛勤建设下&#xff0c;社区先后发布了 50 版本&#xff0c;并完成了从全场景 OLAP 到云原生湖仓的进化。 贡献者们的每一行代码、每一场布道&#xff0c;推动着 StarRocks 社…

c语言学习总结———编译和链接

再次来做一下学习总结&#xff0c;今天我们总结一下关于编译和链接的学习吧&#xff01; 1. 翻译环境和运⾏环境 在ANSI C的任何⼀种实现中&#xff0c;存在两个不同的环境。 第1种是翻译环境&#xff0c;在这个环境中源代码被转换为可执⾏的机器指令。 第2种是执⾏环境&…

Matter - 体验,灯泡(1)

一、前言 Matter&#xff08;当时称为 Project Connected Home over IP 或 Project CHIP&#xff09;于2019年12月11日首次宣布。当时&#xff0c;它是由苹果、谷歌、亚马逊和联发科技等公司共同发起的一个项目&#xff0c;目的是创建一个开放标准&#xff0c;提高智能家居设备…

模拟日光AR汽车HUD的光学特性太阳光模拟器

AR HUD 的光学特性 几何光学可描述物体、透镜和成像之间的关系。将物体放在透镜及其焦点之间将会形成放大且离实际物体有一定距离的虚像[4]。这便是 HUD 生成虚像的方法。源物体&#xff08;在这里是散射屏或 TFT 面板&#xff09;在 HUD 反光镜光学系统的焦距内。这使相应虚像…

快速排序【hoare版本】【挖坑法】【双指针法】(数据结构)

快速排序是Hoare于1962年提出的一种二叉树结构的交换排序方法&#xff0c;其基本思想为&#xff1a;任取待排序元素序列中 的某元素作为基准值&#xff0c;按照该排序码将待排序集合分割成两子序列&#xff0c;左子序列中所有元素均小于基准值&#xff0c;右子序列中所有元素均…

第五站:C++的内存解析

目录 C内存分布 变量的四种存储方式 函数返回值使用指针(指针函数) 动态分配内存空间 不能使用外部函数的普通局部变量的地址 通过指针函数返回静态局部变量的地址 动态内存 根据需要分配内存,不浪费(根据用户的需求设置内存的容量) 被调用函数之外需要使用被调用函数内…

C# Cad2016二次开发选择文本信息导出(六)

//选文本信息导出 [CommandMethod("getdata")] public void getdata() {// 获取当前文档和数据库Document doc Autodesk.AutoCAD.ApplicationServices.Application.DocumentManager.MdiActiveDocument;Database db doc.Database;Editor ed doc.Editor;// 获取当前…

亲手打造一个本地LLM语音助手来管理智能家居

经历过 Siri 和 Google 助手之后&#xff0c;我发现尽管它们能够控制各种设备&#xff0c;但却无法进行个性化定制&#xff0c;并且不可避免地依赖于云服务。出于对新知识的渴望以及想在生活中使用一些酷炫的东西&#xff0c;我下定决心&#xff0c;要追求更高的目标。我的要求…

我成为开源贡献者的原因竟然是做MySql-CDC数据同步

今年下半年机缘巧合下公司决定搭建自己的数据中台&#xff0c;中台的建设势必少不了数据集成。首先面临的就是数据集成技术选型的问题&#xff0c;按照社区活跃度、数据源适配性、同步效率等要求对市面上几个成熟度较高的开源引擎进行了深度调研。 最终经过内部讨论决定用Apac…

解决虚拟机字体太小的问题

在win11中&#xff0c;安装VMWare软件后&#xff0c;创建好虚拟机&#xff0c;打开终端后&#xff0c;发现终端里显示的字体太小&#xff0c;不方便使用&#xff0c;因此需要修改。 1、打开终端 2、输入"gsettings set org.gnome.desktop.interface text-scaling-factor…

运筹说 第98期|无约束极值问题

上一期我们一起学习了关于非线性规划问题的一维搜索方法的相关内容&#xff0c;本期小编将带大家学习非线性规划的无约束极值问题。 下面&#xff0c;让我们从实际问题出发&#xff0c;学习无约束极值问题吧&#xff01; 一、问题描述及求解原理 1 无约束极值问题的定义 无约…

日志采集传输框架之 Flume,将监听端口数据发送至Kafka

1、简介 Flume 是 Cloudera 提供的一个高可用的&#xff0c;高可靠的&#xff0c;分布式的海量日志采集、聚合和传 输的系统。Flume 基于流式架构&#xff0c;主要有以下几个部分组成。 主要组件介绍&#xff1a; 1&#xff09;、Flume Agent 是一个 JVM 进程&#xf…

SpringBoot异常处理(Whitelabel Error Page和自定义全局异常处理页面)和整合ajax异常处理

SpringBoot异常处理&#xff08;Whitelabel Error Page和自定义全局异常处理页面&#xff09;和整合ajax异常处理 1、springboot自带的异常处理页面Whitelabel Error Page SpringBoot默认的处理异常的机制&#xff1a;SpringBoot 默认的已经提供了一套处理异常的机制。一旦程…

继钱江之后,赛科龙也出自动挡?RA401自动挡曝光

QJ在发动赛921的当天&#xff0c;还有一台闪300搭载了自动挡&#xff0c;当天的热度高的离谱&#xff0c;并且后续也经常有人问&#xff0c;这自动挡啥时候上市等等&#xff0c;相信有很多人都想要一台排量大一点的自动挡摩托车&#xff0c;而最新的消息赛科龙也在开发一台&…

文本分类的一些记录

背景 过去工作中最常遇到的问题就是文本分类和实体抽取的任务。其中文本分类是自然语言处理中最基础的任务&#xff0c;指的是将文本打上特定的类别标签&#xff0c;以做区分和筛选。文本分类主要流程一般是&#xff1a;先预处理文本&#xff0c;再提取特征&#xff0c;最后通…