线程池-手写线程池Linux C简单版本(生产者-消费者模型)

目录

  • 简介
  • 手写线程池
    • 线程池结构体分析
      • task_t
      • task_queue_t
      • thread_pool_t
    • 线程池函数分析
      • thread_pool_create
      • thread_pool_post
      • thread_worker
      • thread_pool_destroy
      • wait_all_done
      • thread_pool_free
    • 主函数调用
  • 运行结果

简介

本线程池采用C语言实现

线程池的场景:

当某些任务特别耗时(例如大量的IO读写操作),严重影响线程其他的任务的执行,可以使用线程池

线程池的一般特点:

线程池通常是一个生产者-消费者模型
生产者线程用于发布任务,任务通常保存在任务队列中
线程池作为消费者,用于取出任务,执行任务

线程池中线程数量的选择:

有一个经验公式: 线程数量 =(io等待时间+cpu运算时间)*核心数/cpu运算时间

因此可以根据经验公式得出下面两种场景的线程数量:

  • cpu密集任务:线程数量=核心数(即上面的公式假设cpu运算时间>>io等待时间)
  • io密集任务:线程数量=2*n+2

手写线程池

线程池代码结构:

  • thread_pool_create:创建线程池所需要的资源,包含不限于任务队列,子线程的创建。
  • thread_pool_post:用于任务的发布,将执行任务存在任务队列中。
  • thread_pool_destroy:用于线程池的退出,以及资源的销毁。
  • wait_all_done:join线程池所有子线程,等待回收子线程。
  • thread_worker:用于任务执行。

主要的核心点集中在thread_pool_post和thread_worker两个函数中,这两个函数也构成了生产者-消费者模型。本文采用队列+互斥锁+条件变量实现。

线程池结构体分析

由于C语言不像C++可以用类封装函数,因此线程池会使用结构体来封装一些变量或者函数指针。

task_t

封装任务的入口指针以及参数。

typedef struct task_t {
    handler_pt func;
    void * arg;
} task_t;

task_queue_t

封装任务队列,为了不频繁移动队列中数据,此处采用头尾索引来标记任务。

typedef struct task_queue_t {
    uint32_t head;
    uint32_t tail;
    uint32_t count;
    task_t *queue;
} task_queue_t;

thread_pool_t

包含互斥锁,条件变量,任务队列等信息

struct thread_pool_t {
    pthread_mutex_t mutex;
    pthread_cond_t condition; //条件变量
    pthread_t *threads; //线程
    task_queue_t task_queue; //任务队列

    int closed; //是否关闭线程池执行的标志,为1表示关闭
    int started; // 当前正在运行的线程数

    int thrd_count; //线程数
    int queue_size; //任务队列大小
};

其中closed:表示是否关闭线程池执行的标志,为1表示关闭。在线程的运行函数中,用来判断是否继续循环等待执行任务队列中的任务。
started:表示当前正在运行的线程数。在thread_pool_destroy函数中销毁线程池时,需要等待所有线程停止才行,即started == 0

线程池函数分析

thread_pool_create

创建线程池,初始化一些线程池属性
通过循环pthread_create函数创建子线程。

thread_pool_t *thread_pool_create(int thrd_count, int queue_size) {
    thread_pool_t *pool;

    if (thrd_count <= 0 || queue_size <= 0) {
        return NULL;
    }
    pool = (thread_pool_t*) malloc(sizeof(*pool));
    if (pool == NULL) {
        return NULL;
    }
    pool->thrd_count = 0;
    pool->queue_size = queue_size;
    pool->task_queue.head = 0;
    pool->task_queue.tail = 0;
    pool->task_queue.count = 0;

    pool->started = pool->closed = 0;

    pool->task_queue.queue = (task_t*)malloc(sizeof(task_t)*queue_size);
    if (pool->task_queue.queue == NULL) {
        // TODO: free pool
        return NULL;
    }

    pool->threads = (pthread_t*) malloc(sizeof(pthread_t) * thrd_count);
    if (pool->threads == NULL) {
        // TODO: free pool
        return NULL;
    }
    int i = 0;
    for (; i < thrd_count; i++) {
        if (pthread_create(&(pool ->threads[i]), NULL, thread_worker, (void*)pool) != 0) {
            // TODO: free pool
            return NULL;
        }
        pool->thrd_count++;
        pool->started++;
    }
    return pool;
}

thread_pool_post

作为生产者,往任务队列里面添加任务
通过pthread_cond_signal通知子唤醒子线程的pthread_cond_wait

int thread_pool_post(thread_pool_t *pool, handler_pt func, void *arg) {
    if (pool == NULL || func == NULL) {
        return -1;
    }
    task_queue_t *task_queue = &(pool->task_queue);
//此处用自旋锁会更节省消耗,因为锁里面的逻辑比较简单
    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }

    if (pool->closed) {
        pthread_mutex_unlock(&(pool->mutex));
        return -3;
    }

    if (task_queue->count == pool->queue_size) {
        pthread_mutex_unlock(&(pool->mutex));
        return -4;
    }
//避免queue数据的变化,采用头尾索引来标识
    task_queue->queue[task_queue->tail].func = func;
    task_queue->queue[task_queue->tail].arg = arg;
    task_queue->tail = (task_queue->tail + 1) % pool->queue_size;
    task_queue->count++;
//唤醒一个休眠的线程
    if (pthread_cond_signal(&(pool->condition)) != 0) {
        pthread_mutex_unlock(&(pool->mutex));
        return -5;
    }
    pthread_mutex_unlock(&(pool->mutex));
    return 0;
}

thread_worker

pthread_cond_wait等待任务的唤醒
作为消费者, (*(task.func))(task.arg);执行任务

static void *thread_worker(void *thrd_pool) {
    thread_pool_t *pool = (thread_pool_t*)thrd_pool;
    task_queue_t *que;
    task_t task;
    for (;;) {
        pthread_mutex_lock(&(pool->mutex));
        que = &pool->task_queue;
        while (que->count == 0 && pool->closed == 0) {
            // 阻塞在 condition,等待任务队列添加任务
            pthread_cond_wait(&(pool->condition), &(pool->mutex));
        }
        if (pool->closed == 1 && que->count == 0) break;//没有任务,并且关闭标志打开,即跳出循环
        task = que->queue[que->head];
        que->head = (que->head + 1) % pool->queue_size;
        que->count--;
        pthread_mutex_unlock(&(pool->mutex));
        (*(task.func))(task.arg);//执行对应任务函数
    }
    pool->started--;//跳出循环之后,运行线程数需要减1
    pthread_mutex_unlock(&(pool->mutex));
    pthread_exit(NULL);
    return NULL;
}

thread_pool_destroy

销毁释放线程池,置 pool->closed = 1;
通过pthread_cond_broadcast唤醒线程池所有线程,这个和thread_pool_post里的pthread_cond_signal一样,并且broadcast会通知到所有的线程

int thread_pool_destroy(thread_pool_t *pool) {
    if (pool == NULL) {
        return -1;
    }

    if (pthread_mutex_lock(&(pool->mutex)) != 0) {
        return -2;
    }

    if (pool->closed) {
        thread_pool_free(pool);
        return -3;
    }
    pool->closed = 1;
//广播形式,通知所有阻塞在condition的线程接触阻塞
    if (pthread_cond_broadcast(&(pool->condition)) != 0 || 
            pthread_mutex_unlock(&(pool->mutex)) != 0) {
        thread_pool_free(pool);
        return -4;
    }
    wait_all_done(pool);
    thread_pool_free(pool);
    return 0;
}

wait_all_done

将所有线程通过pthread_join回收,所有子线程任务执行完毕,回收线程

int wait_all_done(thread_pool_t *pool) {
    printf("wait_all_done start!pool->thrd_count:%d\n", pool->thrd_count);
    int i, ret=0;
    for (i=0; i < pool->thrd_count; i++) {
        printf("wait_all_done doing! i:%d\n", i);
        if (pthread_join(pool->threads[i], NULL) != 0) {
            ret=1;
        }
        
    }
    printf("wait_all_done end!\n");
    return ret;
}

thread_pool_free

释放线程池空间

static void thread_pool_free(thread_pool_t *pool) {
    if (pool == NULL || pool->started > 0) {
        return;
    }

    if (pool->threads) {
        free(pool->threads);
        pool->threads = NULL;

        pthread_mutex_lock(&(pool->mutex));
        pthread_mutex_destroy(&pool->mutex);
        pthread_cond_destroy(&pool->condition);
    }

    if (pool->task_queue.queue) {
        free(pool->task_queue.queue);
        pool->task_queue.queue = NULL;
    }
    free(pool);
}

主函数调用

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#include "thrd_pool.h"

int nums = 0;
int done = 0;
int task_num = 100;

pthread_mutex_t lock;

void do_task(void *arg) {
    usleep(10000);
    pthread_mutex_lock(&lock);
    done++;
    printf("doing %d task\n", done);
    pthread_mutex_unlock(&lock);
}

int main(int argc, char **argv) {
    int threads = 8;
    int queue_size = 256;

    if (argc == 2) {
        threads = atoi(argv[1]);
        if (threads <= 0) {
            printf("threads number error: %d\n", threads);
            return 1;
        }
    } else if (argc > 2) {
        threads = atoi(argv[1]);
        queue_size = atoi(argv[1]);
        if (threads <= 0 || queue_size <= 0) {
            printf("threads number or queue size error: %d,%d\n", threads, queue_size);
            return 1;
        }
    }

    thread_pool_t *pool = thread_pool_create(threads, queue_size);
    if (pool == NULL) {
        printf("thread pool create error!\n");
        return 1;
    }

    while (thread_pool_post(pool, &do_task, NULL) == 0) {
        pthread_mutex_lock(&lock);
        nums++;
        pthread_mutex_unlock(&lock);
        if (nums > task_num) break;
    }

    printf("add %d tasks\n", nums);
    usleep(1000000);//延时等待所有的作业完成

    printf("did %d tasks\n", done);
    thread_pool_destroy(pool);
    return 0;
}

运行结果

使用指令编译文件:

gcc main.c thrd_pool.c -o main -lpthread

运行执行文件得到运行结果
在这里插入图片描述在这里插入图片描述

完整代码下载线程池Linux C语言简单版本

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/56635.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

React哲学——官方示例

在本篇技术博客中&#xff0c;我们将介绍React官方示例&#xff1a;React哲学。我们将深入探讨这个示例中使用的组件化、状态管理和数据流等核心概念。让我们一起开始吧&#xff01; 项目概览 React是一个流行的JavaScript库&#xff0c;用于构建用户界面。React的设计理念是…

组合模式(Composite)

组合模式是一种结构型设计模式&#xff0c;主要用来将多个对象组织成树形结构以表示“部分-整体”的层次结构&#xff0c;因此该模式也称为“部分-整体”模式。简言之&#xff0c;组合模式就是用来将一组对象组合成树状结构&#xff0c;并且能像使用独立对象一样使用它们。 Co…

排序进行曲-v2.0

小程一言 这篇文章是在排序进行曲1.0之后的续讲&#xff0c; 0之后的续讲,英语在上一篇讲的排序的基本概念与分类0之后的续讲, 英语在上一篇讲的排序的基本概念与分类这片主要是对0之后的续讲,英语在上一篇讲的排序的基本概念与分类这 篇主要是对几个简单的排序进行细致的分析…

HarmonyOS/OpenHarmony元服务开发-配置卡片的配置文件

卡片相关的配置文件主要包含FormExtensionAbility的配置和卡片的配置两部分&#xff1a; 1.卡片需要在module.json5配置文件中的extensionAbilities标签下&#xff0c;配置FormExtensionAbility相关信息。FormExtensionAbility需要填写metadata元信息标签&#xff0c;其中键名称…

企业电子招投标采购系统java spring cloud+spring boot功能模块功能描述+数字化采购管理 采购招投标

​功能模块&#xff1a; 待办消息&#xff0c;招标公告&#xff0c;中标公告&#xff0c;信息发布 描述&#xff1a; 全过程数字化采购管理&#xff0c;打造从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通供应商门户具备内外协同的能力&#xff0c;为外…

Java枚举解析:掌握枚举的绝佳指南!

申明&#xff1a;本人于公众号Java筑基期&#xff0c;CSDN先后发当前文章&#xff0c;标明原创&#xff0c;转载二次发文请注明转载公众号&#xff0c;另外请不要再标原创 &#xff0c;注意违规 枚举 在Java中&#xff0c;枚举&#xff08;Enumeration&#xff09;是一种特殊的…

Python实现单例模式

一、介绍 单例模式是一种常见的设计模式&#xff0c;它保证一个类只能被实例化一次&#xff0c;并提供了一个全局访问点来获取这个唯一的实例。在Python中&#xff0c;可以通过使用装饰器、元类或模块等方式实现单例模式。 二、Python实现单例模式的6种方法 1、使用模块实现…

git 忽略掉不需要的文件

第一步&#xff1a;创建.gitignore文件 touch .gitignore 第二步&#xff1a;使用vi编辑器 输入不需要的文件&#xff0c;或用通配符*来忽视一系列文件 效果&#xff1a;

什么是软件检测证明材料,如何才能获得软件检测证明材料?

一、什么是软件检测证明材料 软件检测证明材料是指在软件开发和发布过程中&#xff0c;为了证明软件的质量和合法性&#xff0c;进行的一系列检测和测试的结果的集合。它是软件开发者和用户之间信任的桥梁&#xff0c;可以帮助用户了解软件的性能和安全性&#xff0c;让用户放…

开放自动化软件的硬件平台

自动化行业的产品主要以嵌入式系统为主&#xff0c;历来对产品硬件的可靠性和性能都提出很高的要求。最典型的产品要数PLC。PLC 要求满足体积小&#xff0c;实时性&#xff0c;可靠性&#xff0c;可扩展性强&#xff0c;环境要求高等特点。它们通常采用工业级高性能嵌入式SoC 实…

LinearAlgebraMIT_6_ColumnSpaceAndNullSpace

这节课的两个重点是column space列空间和null space零空间。 x.1 pre-multiply/left multiply and post-multiply/right multiply 对于pre-multiply/left multiply左乘和post-multiply/right multiply右乘&#xff0c;如果用英文的pre-和post-是比较容易理解的&#xff0c; A…

Maven发布项目到Nexus私服

项目pom配置 在项目pom.xml中文件中的仓库配置&#xff0c;Nexus私服如何搭建在这里不介绍了可自行百度。 <distributionManagement><repository><id>releases</id><name>Nexus Release Repository</name><url>http://私服地址:34…

gitlab CI/CD 安装 gitlab runner

一、为什么需要安装gitlab runner &#xff1f; 极狐GitLab Runner 极狐GitLab Runner 是在流水线中运行作业的应用&#xff0c;与极狐GitLab CI/CD 配合运作。 说白了就是你部署的一个agent。 二、如何安装&#xff1f; 1.介绍通过helm部署github runner 2.helm添加仓库 h…

python小游戏课程设计报告,python游戏课程设计报告

大家好&#xff0c;给大家分享一下python2048游戏课程设计报告&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01;

Clion开发Stm32之温湿度传感器(DS18B20)驱动编写和测试

前言 涵盖之前文章: Clion开发STM32之HAL库GPIO宏定义封装(最新版)Clion开发stm32之微妙延迟(采用nop指令实现)Clion开发STM32之日志模块(参考RT-Thread) DSP18B20驱动文件 头文件 /*******************************************************************************Copy…

多线程(JavaEE初阶系列6)

目录 前言&#xff1a; 1.什么是线程池 2.标准库中的线程池 3.实现线程池 结束语&#xff1a; 前言&#xff1a; 在上一节中小编带着大家了解了一下Java标准库中的定时器的使用方式并给大家实现了一下&#xff0c;那么这节中小编将分享一下多线程中的线程池。给大家讲解一…

CNN卷积详解

转载自&#xff1a;https://blog.csdn.net/yilulvxing/article/details/107452153 仅用于自己学习过程中经典文章讲解的记录&#xff0c;防止原文失效。 1&#xff1a;单通道卷积 以单通道卷积为例&#xff0c;输入为&#xff08;1,5,5&#xff09;&#xff0c;分别表示1个通道…

安防视频监控平台EasyCVR修改参数提示database or disk is full的原因排查

EasyDarwin开源流媒体视频EasyCVR安防监控平台可提供视频监控直播、云端录像、云存储、录像检索与回看、智能告警、平台级联、云台控制、语音对讲、智能分析等能力。视频监控综合管理平台EasyCVR具备视频汇聚融合能力&#xff0c;平台基于云边端一体化架构&#xff0c;具有强大…

JDK各版本重要变革

各版本更新详情 JDK8(LTS)--2014/3 语法层面 lambda表达式(重要特色之一) 一种特殊的匿名内部类,语法更加简洁允许把函数作为一个方法的参数,将代码象数据一样传递&#xff0c;即将函数作为方法参数传递基本语法: <函数式接口> <变量名> (参数...) -> { 方法…

做虾皮你必须懂的五大流量运营逻辑!

一、竞品流量来源 商家排名一般有四个维度&#xff0c;弟一个维度是消量弟一&#xff0c;弟二个维度是销售额弟一&#xff0c;第三个维度是流量弟一&#xff0c;第四个维度利润弟一。只要我们找出来自我排名即可&#xff0c;然后打开生意参谋&#xff0c;到竞品分析添加成竞品…