线程池的实现

线程池是一种池式组件,通过创建和维护一定数量的线程,实现这些线程的重复使用避免了频繁创建和销毁线程的开销,从而提升了性能

在这里插入图片描述

线程池的作用:

1.复用线程资源;

2.减少线程创建和销毁的开销;

3.可异步处理生产者线程的任务;

4.减少了多个任务(不是一个任务)的执行时间;

代码实现

一、结构体定义

1.任务结构体:

typedef struct task_s { 
    void *next;  // 指向下一个task
    handler_pt func; // 该task执行的函数
    void *arg;  // 函数的参数
}task_t;

2.队列结构体:

typedef struct task_queue_s { // task队列
    void *head; // 一级指针,指向队列的第一个task结构体
    void **tail; // 二级指针,指向队列的最后一个task结构体的第一个成员void *next指针
    int block; // 阻塞标志
    spinlock_t lock; // 自旋锁变量
    pthread_mutex_t mutex; // 互斥锁变量
    pthread_cond_t cond; // 条件变量
} task_queue_t;

3.线程池结构体:

struct threadpool_s {
    task_queue_t *task_queue; // task队列指针
    atomic_int quit; // 原子变量
    int thread_count; // 线程池中的线程数量
    pthread_t *threads; // 线程句柄数组
};
二、资源创建和销毁

1.创建任务队列:

static task_queue_t *__taskqueue_create() { // 创建一个任务队列

    int ret;
    task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
    // 回滚式编程,创建资源
    if (queue) {
        ret = pthread_mutex_init(&queue->mutex, NULL);
        if (ret == 0) {
            ret = pthread_cond_init(&queue->cond, NULL);
            if (ret == 0) { // 全部资源创建成功
                spinlock_init(&queue->lock);
                queue->head = NULL; // 队列为空
                queue->tail = &queue->head; // tail是二级指针,大小是一个指针八字节,队列为空时指向head指针首地址
                queue->block = 1; // 设置为阻塞
                return queue;
            }
            pthread_mutex_destroy(&queue->mutex); // queue和init成功,但此{}内其它资源至少有一个创建失败
        }
        free(queue); // queue创建成功,但此{}内其它资源没有全部创建成功,至少有一个失败
    }
    return NULL; // queue创建失败
}

2.销毁任务队列:

static void __taskqueue_destroy(task_queue_t *queue) { // 销毁一个任务队列
    task_t *task;
    // 先将任务队列中的任务全部销毁
    while ((task = __pop_task(queue))) {
        free(task);
    }
    // 销毁任务队列的成员
    spinlock_destroy(&queue->lock);
    pthread_cond_destroy(&queue->cond);
    pthread_mutex_destroy(&queue->mutex);
    free(queue); // 释放队列结构体空间
}

3.创建线程池:

static int __threads_create(threadpool_t *pool, size_t thread_count) { // 为线程池创建若干线程

    pthread_attr_t attr; // 用于设置线程属性的变量,作为pthread_create的第二个参数
    int ret;

    ret = pthread_attr_init(&attr); // 初始化线程属性变量
    // 资源创建-->回滚式编程
    if (ret == 0) {
        // 为线程句柄数组分配空间
        pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
        if (pool->threads) {
            // 创建若干线程开始执行__threadpool_worker函数
            int i = 0;
            for (; i < thread_count; i++) {
                if (pthread_create(&pool->threads[i], &attr, __threadpool_worker, pool) != 0) {
                    break; // 出现创建失败的情况
                }
            }
            pool->thread_count = i; // 更新线程数量,因为可能中途创建失败,数量小于预期值
            pthread_attr_destroy(&attr); // 销毁线程属性变量
            if (i == thread_count)
                return 0; // 资源全部创建成功
            __threads_terminate(pool);
            free(pool->threads);
        }
        ret = -1; // 为线程句柄数组分配空间失败
    }
    return ret; // 初始化线程属性变量失败
}


threadpool_t *threadpool_create(int thread_count) { // 创建一个线程池,参数是线程数量
    threadpool_t *pool;

    pool = (threadpool_t *)malloc(sizeof(*pool)); // 为线程池分配内存
    if (pool) {
        task_queue_t *queue = __taskqueue_create(); // 创建线程池的工作队列
        if (queue) {
            pool->task_queue = queue;
            atomic_init(&pool->quit, 0);
            if (__threads_create(pool, thread_count) == 0) // 为线程池创建若干线程
                return pool;
            __taskqueue_destroy(queue); // 线程创建失败则逐层释放已经创建的资源
        }
        free(pool);
    }
    return NULL;
}

4.回收线程池:

static void __threads_terminate(threadpool_t *pool) { // 回收线程池的所有线程

    atomic_store(&pool->quit, 1); // 设置quit状态,使得不再有新的线程开始工作
    __nonblock(pool->task_queue); // 设置队列为非阻塞,使得get_task的线程不会进入条件等待,直接退出
    int i;
    for (i = 0; i < pool->thread_count; i++) {
        pthread_join(pool->threads[i], NULL); // 等待所有线程结束
    }
}

三、任务的添加、删除和执行

1.向任务队列中添加一个任务:

static inline void __add_task(task_queue_t *queue, void *task) { // 向任务队列中添加一个任务 

    // void *,不限定任务类型,只要该任务的结构体起始内存是一个用于链接下一个节点的指针
    void **link = (void **)task; //转换task指向的类型从结构体变为指针:所指范围是8字节,也就是task_t结构体对象的前8个字节,此时task相当于指向其结构体的第一个成员next,与tail用法相同
    *link = NULL; // (*link等价于next指针)将该task指向的next指针成员指向NULL
    
    // 修改共享变量queue时加锁
    spinlock_lock(&queue->lock);
    *queue->tail = link; // 等价于 queue->tail->next = link, 因为二级指针tail实际上指向一个task_t结构体的第一个成员:next指针,link指向这个task的第一个成员next指针
    queue->tail = link; // 实际上第一行的next指针和第二行的tail指针都保存着link也就是task的首地址
    spinlock_unlock(&queue->lock);
    pthread_cond_signal(&queue->cond); // 添加一个任务后,广播唤醒处于等待状态的任务
}

int threadpool_post(threadpool_t *pool, handler_pt func, void *arg) { // 向线程池添加一个任务
    if (atomic_load(&pool->quit) == 1) // 如果线程池的状态是quit:已终止,则退出
        return -1;
    task_t *task = (task_t *)malloc(sizeof(task_t));
    if (!task) return -1;
    task->func = func;
    task->arg = arg;
    __add_task(pool->task_queue, task);
    return 0;
}

2.从任务队列中取出一个任务:

static inline void *__pop_task(task_queue_t *queue) { // 从队列中取出一个task
    // 上锁
    spinlock_lock(&queue->lock);
    if (queue->head == NULL) { // 队列为空
        spinlock_unlock(&queue->lock);
        return NULL;
    }
    task_t *task;
    task = queue->head; // 取出第一个

    void **link = (void **)task; // link指向task即head的下一个task
    queue->head = *link; // *link即link指向的task指针

    if (queue->head == NULL) { // 判断取出一个节点后队列是否为空,很必要!
        queue->tail = &queue->head;
    }
    spinlock_unlock(&queue->lock);
    return task;
}

static inline void *__get_task(task_queue_t *queue) { // 从队列中获取一个task,调用了pop_task
    task_t *task;
    // 使用while, 一定不能用if,如果被虚假唤醒,队列仍为空,返回的task == NULL
    while ((task = __pop_task(queue)) == NULL) { // 队列为空

    // 上锁 ,一定要在while内此处上锁,如果在while外,则其它没有抢到锁的线程会直接获得一个未初始化的野指针task
        pthread_mutex_lock(&queue->mutex);
        if (queue->block == 0) { // 队列为非阻塞状态
            pthread_mutex_unlock(&queue->mutex);
            return NULL; // 队列为空就直接返回
        }
        // 队列为阻塞状态
        pthread_cond_wait(&queue->cond, &queue->mutex);
        // pthread_cond_wait函数执行的内容:
        // 1.先进行unlock(&queue->mutex)
        // 2.再使当前线程休眠
        // --- 接收到broadcast广播时唤醒线程
        // 3.唤醒当前线程
        // 4.加上lock(&queue->mutex)
        // 5.函数返回
        // 6.pthread_cond_wait的执行并不是原子性的,所以需要使用锁
        pthread_mutex_unlock(&queue->mutex);
    }
    return task;
}

3.执行任务(每一个消费者线程所执行的函数)

static void *__threadpool_worker(void *arg) { // 线程池中的全部线程所执行的函数

    threadpool_t *pool = (threadpool_t *)arg; // 传入一个线程池的指针
    task_t *task;  
    void *ctx; 

    while (atomic_load(&pool->quit) == 0) { // 线程池没有停止运转(被销毁前停止、被终止前停止)
        task = (task_t *)__get_task(pool->task_queue); // 从线程池的任务队列中获取一个任务
        if (!task) break; // 线程池被标记终止"quit",get_task返回NULL
        handler_pt func = task->func; // 获取该任务所执行的函数
        ctx = task->arg; // 任务执行函数所需的参数
        free(task); // 任务被执行后销毁
        func(ctx); // 执行任务函数
    }

    return NULL;
}

推荐学习 https://xxetb.xetslk.com/s/p5Ibb

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/657771.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LBank研究院: DePIN赛道解析|加密精神与Jevons悖论的第三世界

作者&#xff1a;Eva&#xff0c;LBank研究员 *本人谨代表作者观点&#xff0c;不构成任何交易建议。 *本文内容为原创&#xff0c;版权为LBank所有&#xff0c;如需转载请注明作者和出处&#xff0c;否则将追究法律责任。 TLDR: DePIN是对传统老牌硬件的洗牌挑战&#xff…

excel 点击单元格的内容 跳转到其他sheet设置

如图点击1处跳转到2 按照如下图步骤操作即可

js setTimeout、setInterval、promise、async await执行顺序梳理

基础知识 async: 关键字用于标记一个函数为异步函数&#xff0c;该函数中有一个或多个promise对象&#xff0c;需要等待执行完成后才会继续执行。 await:关键字&#xff0c;用于等待一个promise对象执行完&#xff0c;并返回其中的值&#xff0c;只能在async函数内部使用。可…

【PB案例学习笔记】-11动画显示窗口

写在前面 这是PB案例学习笔记系列文章的第11篇&#xff0c;该系列文章适合具有一定PB基础的读者。 通过一个个由浅入深的编程实战案例学习&#xff0c;提高编程技巧&#xff0c;以保证小伙伴们能应付公司的各种开发需求。 文章中设计到的源码&#xff0c;小凡都上传到了gite…

调用萨姆索诺夫函数:深入探索函数的参数与返回值

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、萨姆索诺夫函数的引入与调用 二、如何获取函数的返回值 三、无参数与无返回值的函数调…

基于魔搭开源推理引擎 DashInfer实现CPU服务器大模型推理--理论篇

前言 在人工智能技术飞速发展的今天&#xff0c;如何高效地在CPU上运行大规模的预训练语言模型&#xff08;LLM&#xff09;成为了加速生成式AI应用广泛落地的核心问题。阿里巴巴达摩院模型开源社区ModelScope近期推出了一款名为DashInfer的推理引擎&#xff0c;旨在解决这一挑…

语音控制系统的安全挑战与防御策略(上)

语音控制系统&#xff08;VCS&#xff09;提供了便捷的用户界面&#xff0c;涉及智能家居、自动驾驶汽车、智能客服等众多应用场景&#xff0c;已成为现代智能设备不可或缺的一部分。其市场规模预计到2023年达到70亿美元&#xff0c;这种扩张带来了重大的安全挑战&#xff0c;如…

STM32简易音乐播放器(HAL库)

一、设计描述 本设计以STM32MP157A单片机为核心控制器&#xff0c;加上其他的模块一起组成基于单片机的音乐盒的整个系统&#xff0c;通过不同频率的PWM使蜂鸣器播放音乐&#xff0c;通过按键中断实现歌曲切换&#xff0c;音量调节&#xff0c;定时器中断实现播放速度调节&…

如何为 kNN 搜索选择最佳 k 和 num_candidates

作者&#xff1a;Madhusudhan Konda 如何选择最好的 k 和 num_candidates&#xff1f; 向量搜索在当前的生成式人工智能/机器学习领域中已经成为一个改变游戏规则的技术。它允许我们基于语义含义而不仅仅是精确的关键词匹配来找到相似的项目。 Elasticsearch的 k-近邻&#x…

使用 Flask 实现异步请求处理

文章目录 为什么需要异步请求处理&#xff1f;在 Flask 中实现异步请求处理使用 Flask-Cors 扩展 总结 在开发 Web 应用程序时&#xff0c;异步请求处理是提高性能和并发能力的重要方法之一。Flask 是一个轻量级的 Web 框架&#xff0c;它提供了易于使用的工具来实现异步请求处…

STM32高级控制定时器(STM32F103):检测输入PWM周期和占空比

目录 概述 1 PWM 输入模式 1.1 原理介绍 1.2 应用实例 1.3 示例时序图 2 使用STM32Cube配置工程 2.1 软件环境 2.2 配置参数 2.3 生成项目文件 3 功能实现 3.1 PWM占空比函数 3.2 输入捕捉回调函数 4 功能测试 4.1 测试软件框架结构 4.2 实验实现 4.2.1 测试实…

邀请媒体参会,媒体邀约的正确打开方式

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 邀请媒体参会是一项重要的公关活动&#xff0c;需要细致的规划和执行。以下是一些步骤和建议&#xff0c;可以帮助你更有效地进行媒体邀约&#xff1a; 1. 拟定邀约媒体名单&#xff1a;…

python数据分析——分组操作1

参考资料&#xff1a;活用pandas库 1、简介 借助“分割-应用-组合”&#xff08;split-apply-combine&#xff09;模式&#xff0c;分组操作可以有效地聚合、转换和过滤数据。 分割&#xff1a;基于键&#xff0c;把要处理的数据分割为小片段。 应用&#xff1a;分别处理每个数…

【CUDA】Nsight profile驱动的CUDA优化

前置准备 安装NVIDIA Nsight Compute。 安装好后选择使用管理员权限启动下载官方 Demo 代码官方博客Shuffle warp 1. 任务介绍及CPU版本 1.1 任务介绍 任务理解&#xff1a; 有一个 L x M 的矩阵 M 1 M_1 M1​ 对其每行取平均值 得到 V 1 ∈ R L 1 V_1 \in \mathbb{R}^{…

Java | Leetcode Java题解之第117题填充每个节点的下一个右侧节点指针II

题目&#xff1a; 题解&#xff1a; class Solution {Node last null, nextStart null;public Node connect(Node root) {if (root null) {return null;}Node start root;while (start ! null) {last null;nextStart null;for (Node p start; p ! null; p p.next) {if…

学习笔记——数据通信基础——数据通信网络(拓扑结构)

网络拓扑 网络拓扑(Network Topology)是指用传输介质(例如双绞线、光纤等)互连各种设备(例如计算机终端、路由器、交换机等)所呈现的结构化布局。 1、网络拓扑形态 星型网络∶所有节点通过一个中心节点连接在一起。 优点∶容易在网络中增加新的节点。通信数据必须经过中心节点…

【2】:向量与矩阵

向量 既有大小又有方向的量叫做向量 向量的模 向量的长度 单位向量 (只表示方向不表示长度) 向量的加减运算 向量求和 行向量与列向量的置换 图形学中竖着写 向量的长度计算 点乘&#xff08;计算向量间夹角&#xff09; 点乘满足的运算规律 交换律、结合律、分配…

新型高性能数据记录仪ETHOS 2

| 具有强大CPU性能的数据记录仪 IPETRONIK推出了一款新型高性能数据记录仪——ETHOS 2&#xff0c;作为ETHOS的第二代&#xff0c;它借助新型英特尔i7-9850HE处理器&#xff0c;实现了11,572的性能指数&#xff0c;从而能够快速有效应对CAN FD、LIN和以太网总线测量方面的日益…

【校园网网络维修】当前用户使用的IP与设备重定向地址中IP不一致,请重新认证

出现的网络问题&#xff1a;当前用户使用的IP与设备重定向地址中IP不一致&#xff0c;请重新认证 可能的原因&#xff1a; 把之前登录的网页收藏到浏览器&#xff0c;然后直接通过这个链接进行登录认证。可能是收藏网址导致的ip地址请求参数不一致。 解决方法&#xff1a; 方法…

循环buffer“一写多读“

1.往期回顾 一个简单实用的循环buffer&#xff0c;用于缓冲数据&#xff01;测试500M数据&#xff0c;耗时1.3秒。 C语言版本的循环buffer比C版本的速度更快&#xff01;测试500M数据0.5秒&#xff0c;达9.25Gbps左右&#xff01; C 语言免拷贝版本循环 buffer 比拷贝版本快了…