参考 手写线程池 - C语言版 | 爱编程的大丙 (subingwen.cn)
1.为什么需要线程池?
1)线程问题:
(1)如果只使用线程创建函数,在不断有新的任务进来的时候,需要不断的创建任务;任务在结束之后,为了避免占用资源,需要销毁线程任务。导致频繁操作,占用程序运行时间。
(2)在线程创建之后,如果任务已经运行完毕,线程不去销毁,则会白白浪费资源。
2)如何解决线程问题(线程池的优势):
(1)使用线程池之后,将任务和线程分离,建立一定数量的线程,使线程重复利用(不销毁),不断将任务添加到线程中。解决线程频繁创建、销毁问题。提供系统执行效率。
(2)根据任务数量增减,自动添加或者减少线程,使得线程维持在最优数量,节约系统资源。
2.线程池是什么?
1)线程的基本组成:
(1)任务队列(负责保存要处理的任务,并将任务交给工作线程去处理)
(a)通过线程池提供的api函数,将待处理的任务添加到任务队列,或者从任务队列删除;
(b)已经处理的任务会被从任务队列删除;
(c)线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
(2)工作者线程(任务队列的消费者、执行人员,动态创建N个)
(a)线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理;
(b)工作的线程相当于是任务队列的消费者角色;
(c)如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞);
(d)如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作;
(3)管理者线程(管理整个线程池,1个)
(a)周期性地 对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测;
(b)根据任务数量的多少,增减线程数量;
3.线程池怎么实现?(linux/C语言版本)
1)单个任务元素结构体
// 任务结构体
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
2)线程池结构体(任务队列+管理线程+工作线程)
// 线程池结构体
struct ThreadPool
{
Task* taskQ; // 任务队列 后面利用堆新建数组
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头 -> 取数据
int queueRear; // 队尾 -> 放数据
pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID 后面利用堆新建数组
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数
pthread_mutex_t mutexPool; // 锁整个的线程池
pthread_mutex_t mutexBusy; // 锁busyNum变量
pthread_cond_t notFull; // 任务队列是不是满了
pthread_cond_t notEmpty; // 任务队列是不是空了
int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
};
3)API声明
typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueSize);
// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);
// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);
//
// 工作的线程(消费者线程)任务函数
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);
4)创建线程池
(1)主要操作:
(a)创建线程池对象:ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
(b)创建工作者线程队列(pthread_t):只是为了便于管理线程,并没有创建线程。
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
(c)创建任务队列(task类型):pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
(d)创建管理者线程:pthread_create(&pool->managerID, NULL, manager, pool);
(e)创建工作者线程:按照最小数量minNum进行for循环;
pthread_create(&pool->threadIDs[i], NULL, worker, pool);(f)初始化互斥量/条件变量:mutexPool、mutexBusy、notEmpty、notFull
(g)其余工作:初始化一些控制变量
(h)鲁棒性工作:检查指针,及时释放内存
(2)说明:
(a)为什么使用do while ?可以使用break,最后统一销毁资源;
(b)memset,后续根据指针是不是0 判断是不是有线程闲置
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
//实例化线程池对象
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
//判断pool有没有指向有效内存
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
//创建工作者线程队列,按照最多线程数量 创建
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
//判断threadIDs有没有指向有效内存
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
//初始化相关参数
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 和最小个数相等
pool->exitNum = 0;
//创建互斥量、条件变量
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}
创建任务队列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
//创建管理者线程
pthread_create(&pool->managerID, NULL, manager, pool);
//创建工作者线程
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
} while (0);
// 释放资源
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);
return NULL;
}
5)工作线程执行函数worker
(1)主要操作:
(a)判断任务队列是否为空。假如为空,阻塞线程。并进一步判断,是否要销毁线程池。
(b)如果任务队列不为空,取出任务,执行任务回调函数。
(2)说明:
(a)线程池属于公共资源,在访问线程池的时候要加锁;访问结束之后解锁。
(b)while (pool->queueSize == 0 && !pool->shutdown) 。在pthread_cond_wait这个地方,为什么采用while而不是if。关于该点,进行解释。
首先,我们需要了解pthread_cond_wait的执行流程:
-》pthread_cond_wait()函数会将当前线程挂起,使它处于休眠状态。
-》在当前线程被挂起之前,函数会自动调用pthread_mutex_lock()函数将关联的互斥锁上锁,保证条件变量的独占访问。
-》函数会将当前线程加入条件变量的等待队列中,并释放关联的互斥锁。
-》当前线程进入休眠状态,等待其他线程在该条件变量上发出信号或广播。
-》当另一个线程在该条件变量上发出信号或广播时,pthread_cond_wait()函数会唤醒一个等待在该条件变量上的线程。
-》唤醒的线程重新获得关联的互斥锁的所有权,并继续执行。
-》pthread_cond_wait()函数返回。其次,我们需要了解pthread_cond_signal的作用:至少唤醒一个线程。
综合上面两点,设想以下场景。
初始临界变量x = 0; 线程1:如果x< 1,阻塞。否则,继续执行,并设置x = 0; 线程2:如果x< 1,阻塞。否则,继续执行,并设置x = 0; 线程3:设置x = 1,调用pthread_cond_signal唤醒线程。
情况1,采用if语句:线程1收到了线程唤醒,线程2收到了线程唤醒,线程1抢到了互斥锁,他正常进行,并把x设置为0;然后线程2因为收到了线程唤醒,他也开始执行。问题出现了,线程1不应该执行。因为他的判断条件是 x<1的情况下,应该进行阻塞。可是他却开始执行了。
情况2,采用while语句:线程1收到了线程唤醒,线程2收到了线程唤醒,线程1抢到了互斥锁,因为之前被阻塞了,while语句不满足,他又检查了一次while条件,发现不用阻塞了,他正常进行,并把x设置为0。然后线程2因为收到了线程唤醒,他也开始执行。因为之前也被阻塞了,while语句不满足,他有检查一次条件,发现x被修改了,x=0,需要被阻塞了。所以他没法往下执行了。所以接着被阻塞了。这种情况,就可以保证,条件变量只能唤醒一个线程。防止“伪唤醒”。
回到本例子中,情况一样。线程池会唤醒多个工作线程,让他们去执行任务。但是有可能任务数<线程数。如果使用while循环,就能保证每个任务对应一个线程。如果使用if判断,有可能唤醒多余的线程,并引发程序崩溃。
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (1)
{
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列是否为空
while (pool->queueSize == 0 && !pool->shutdown)
{
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// 判断是不是要销毁线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
//线程退出函数。
threadExit(pool);
}
}
}
// 判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
// 从任务队列中取出一个任务
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动头结点 数组变成了循环队列
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
//告诉生产者,可以添加任务了
pthread_cond_signal(&pool->notFull);
//解锁
pthread_mutex_unlock(&pool->mutexPool);
//工作线程数量+1
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
//函数任务调用
task.function(task.arg);
//释放函数堆内存
free(task.arg);
task.arg = NULL;
//工作线程数量-1
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
6)管理者任务
(1)主要操作:
(a)添加任务线程;
(b)销毁任务线程;
(2)说明:
销毁任务线程的思路:发出条件变量信号,唤醒所有的工作线程;并将shutdown参数,告诉工作线程,现在要关闭。让所有的线程自杀。
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);
// 取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
// 取出忙的线程的数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
// 添加线程
// 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER
&& pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁线程
// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 让工作的线程自杀
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return NULL;
}
7)单线程退出函数
(1)主要操作:
前面判断线程是不是在工作,主要是判断线程ID是不是空。如果线程ID是空,表明为空线程;如果线程不为空,则为忙线程。因此,在退出删除线程的时候,需要修改线程队列中对应的线程ID,将其改为ID=0;便于后面的继续使用。
(2)说明
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}
8)线程池添加函数
(1)主要操作:
(a)如果线程池中的工作线程全部被占用,阻塞添加任务,即阻塞生产者;
(b)如果工作线程释放了notFull条件变量,则说明工作线程有空余,接触生产者
(2)说明:
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// 阻塞生产者线程
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
4.线程池怎么用?