线程池是一种池式组件,通过创建和维护一定数量的线程,实现这些线程的重复使用,避免了频繁创建和销毁线程的开销,从而提升了性能
线程池的作用:
1.复用线程资源;
2.减少线程创建和销毁的开销;
3.可异步处理生产者线程的任务;
4.减少了多个任务(不是一个任务)的执行时间;
代码实现
一、结构体定义
1.任务结构体:
typedef struct task_s {
void *next; // 指向下一个task
handler_pt func; // 该task执行的函数
void *arg; // 函数的参数
}task_t;
2.队列结构体:
typedef struct task_queue_s { // task队列
void *head; // 一级指针,指向队列的第一个task结构体
void **tail; // 二级指针,指向队列的最后一个task结构体的第一个成员void *next指针
int block; // 阻塞标志
spinlock_t lock; // 自旋锁变量
pthread_mutex_t mutex; // 互斥锁变量
pthread_cond_t cond; // 条件变量
} task_queue_t;
3.线程池结构体:
struct threadpool_s {
task_queue_t *task_queue; // task队列指针
atomic_int quit; // 原子变量
int thread_count; // 线程池中的线程数量
pthread_t *threads; // 线程句柄数组
};
二、资源创建和销毁
1.创建任务队列:
static task_queue_t *__taskqueue_create() { // 创建一个任务队列
int ret;
task_queue_t *queue = (task_queue_t *)malloc(sizeof(task_queue_t));
// 回滚式编程,创建资源
if (queue) {
ret = pthread_mutex_init(&queue->mutex, NULL);
if (ret == 0) {
ret = pthread_cond_init(&queue->cond, NULL);
if (ret == 0) { // 全部资源创建成功
spinlock_init(&queue->lock);
queue->head = NULL; // 队列为空
queue->tail = &queue->head; // tail是二级指针,大小是一个指针八字节,队列为空时指向head指针首地址
queue->block = 1; // 设置为阻塞
return queue;
}
pthread_mutex_destroy(&queue->mutex); // queue和init成功,但此{}内其它资源至少有一个创建失败
}
free(queue); // queue创建成功,但此{}内其它资源没有全部创建成功,至少有一个失败
}
return NULL; // queue创建失败
}
2.销毁任务队列:
static void __taskqueue_destroy(task_queue_t *queue) { // 销毁一个任务队列
task_t *task;
// 先将任务队列中的任务全部销毁
while ((task = __pop_task(queue))) {
free(task);
}
// 销毁任务队列的成员
spinlock_destroy(&queue->lock);
pthread_cond_destroy(&queue->cond);
pthread_mutex_destroy(&queue->mutex);
free(queue); // 释放队列结构体空间
}
3.创建线程池:
static int __threads_create(threadpool_t *pool, size_t thread_count) { // 为线程池创建若干线程
pthread_attr_t attr; // 用于设置线程属性的变量,作为pthread_create的第二个参数
int ret;
ret = pthread_attr_init(&attr); // 初始化线程属性变量
// 资源创建-->回滚式编程
if (ret == 0) {
// 为线程句柄数组分配空间
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
if (pool->threads) {
// 创建若干线程开始执行__threadpool_worker函数
int i = 0;
for (; i < thread_count; i++) {
if (pthread_create(&pool->threads[i], &attr, __threadpool_worker, pool) != 0) {
break; // 出现创建失败的情况
}
}
pool->thread_count = i; // 更新线程数量,因为可能中途创建失败,数量小于预期值
pthread_attr_destroy(&attr); // 销毁线程属性变量
if (i == thread_count)
return 0; // 资源全部创建成功
__threads_terminate(pool);
free(pool->threads);
}
ret = -1; // 为线程句柄数组分配空间失败
}
return ret; // 初始化线程属性变量失败
}
threadpool_t *threadpool_create(int thread_count) { // 创建一个线程池,参数是线程数量
threadpool_t *pool;
pool = (threadpool_t *)malloc(sizeof(*pool)); // 为线程池分配内存
if (pool) {
task_queue_t *queue = __taskqueue_create(); // 创建线程池的工作队列
if (queue) {
pool->task_queue = queue;
atomic_init(&pool->quit, 0);
if (__threads_create(pool, thread_count) == 0) // 为线程池创建若干线程
return pool;
__taskqueue_destroy(queue); // 线程创建失败则逐层释放已经创建的资源
}
free(pool);
}
return NULL;
}
4.回收线程池:
static void __threads_terminate(threadpool_t *pool) { // 回收线程池的所有线程
atomic_store(&pool->quit, 1); // 设置quit状态,使得不再有新的线程开始工作
__nonblock(pool->task_queue); // 设置队列为非阻塞,使得get_task的线程不会进入条件等待,直接退出
int i;
for (i = 0; i < pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL); // 等待所有线程结束
}
}
三、任务的添加、删除和执行
1.向任务队列中添加一个任务:
static inline void __add_task(task_queue_t *queue, void *task) { // 向任务队列中添加一个任务
// void *,不限定任务类型,只要该任务的结构体起始内存是一个用于链接下一个节点的指针
void **link = (void **)task; //转换task指向的类型从结构体变为指针:所指范围是8字节,也就是task_t结构体对象的前8个字节,此时task相当于指向其结构体的第一个成员next,与tail用法相同
*link = NULL; // (*link等价于next指针)将该task指向的next指针成员指向NULL
// 修改共享变量queue时加锁
spinlock_lock(&queue->lock);
*queue->tail = link; // 等价于 queue->tail->next = link, 因为二级指针tail实际上指向一个task_t结构体的第一个成员:next指针,link指向这个task的第一个成员next指针
queue->tail = link; // 实际上第一行的next指针和第二行的tail指针都保存着link也就是task的首地址
spinlock_unlock(&queue->lock);
pthread_cond_signal(&queue->cond); // 添加一个任务后,广播唤醒处于等待状态的任务
}
int threadpool_post(threadpool_t *pool, handler_pt func, void *arg) { // 向线程池添加一个任务
if (atomic_load(&pool->quit) == 1) // 如果线程池的状态是quit:已终止,则退出
return -1;
task_t *task = (task_t *)malloc(sizeof(task_t));
if (!task) return -1;
task->func = func;
task->arg = arg;
__add_task(pool->task_queue, task);
return 0;
}
2.从任务队列中取出一个任务:
static inline void *__pop_task(task_queue_t *queue) { // 从队列中取出一个task
// 上锁
spinlock_lock(&queue->lock);
if (queue->head == NULL) { // 队列为空
spinlock_unlock(&queue->lock);
return NULL;
}
task_t *task;
task = queue->head; // 取出第一个
void **link = (void **)task; // link指向task即head的下一个task
queue->head = *link; // *link即link指向的task指针
if (queue->head == NULL) { // 判断取出一个节点后队列是否为空,很必要!
queue->tail = &queue->head;
}
spinlock_unlock(&queue->lock);
return task;
}
static inline void *__get_task(task_queue_t *queue) { // 从队列中获取一个task,调用了pop_task
task_t *task;
// 使用while, 一定不能用if,如果被虚假唤醒,队列仍为空,返回的task == NULL
while ((task = __pop_task(queue)) == NULL) { // 队列为空
// 上锁 ,一定要在while内此处上锁,如果在while外,则其它没有抢到锁的线程会直接获得一个未初始化的野指针task
pthread_mutex_lock(&queue->mutex);
if (queue->block == 0) { // 队列为非阻塞状态
pthread_mutex_unlock(&queue->mutex);
return NULL; // 队列为空就直接返回
}
// 队列为阻塞状态
pthread_cond_wait(&queue->cond, &queue->mutex);
// pthread_cond_wait函数执行的内容:
// 1.先进行unlock(&queue->mutex)
// 2.再使当前线程休眠
// --- 接收到broadcast广播时唤醒线程
// 3.唤醒当前线程
// 4.加上lock(&queue->mutex)
// 5.函数返回
// 6.pthread_cond_wait的执行并不是原子性的,所以需要使用锁
pthread_mutex_unlock(&queue->mutex);
}
return task;
}
3.执行任务(每一个消费者线程所执行的函数)
static void *__threadpool_worker(void *arg) { // 线程池中的全部线程所执行的函数
threadpool_t *pool = (threadpool_t *)arg; // 传入一个线程池的指针
task_t *task;
void *ctx;
while (atomic_load(&pool->quit) == 0) { // 线程池没有停止运转(被销毁前停止、被终止前停止)
task = (task_t *)__get_task(pool->task_queue); // 从线程池的任务队列中获取一个任务
if (!task) break; // 线程池被标记终止"quit",get_task返回NULL
handler_pt func = task->func; // 获取该任务所执行的函数
ctx = task->arg; // 任务执行函数所需的参数
free(task); // 任务被执行后销毁
func(ctx); // 执行任务函数
}
return NULL;
}
推荐学习 https://xxetb.xetslk.com/s/p5Ibb