线程池
相关文章
协议
Socket编程
高并发服务器实现
线程池
如果一个客户端建立连接使用创建一个线程用于处理这一个线程, 处理结束的时候把这一个线程删除, 这个时候会导致线程的创建以及销毁会消耗大量的时间
这时候可以一次性创建多个线程, 这几个线程统称线程池, 如果客户端建立一个连接, 线程池分配一个线程处理客户发过来的数据, 不处理的时候这几个线程阻塞
可以使用条件变量进行阻塞
线程的数量可以随着连接的个数, 时间等条件进行变换, 但是要有一个上限, 连接分个数增加的时候加线程, 如果忙的线程数量比较少的时候释放线程, 这一些处理使用一个adjust线程进行处理
实际实现
主函数
- 创建一个线程池
- 在里面添加任务
- 等待
- 关闭线程池
线程池
-
初始化各种数据
-
获取基础线程, 让他们阻塞等待任务
-
获取任务, 唤醒线程处理
-
处理结束接着阻塞
后台的控制线程(adjust_thread)根据线程的实际使用情况添加或者释放线程
/*************************************************************************
> File Name: thread_pool.c
> Author: XvSenfeng
> Mail: 1458612070@qq.com
> Created Time: Wed 10 Apr 2024 08:23:27 PM CST
************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
使用的头文件
//多长时间判断一下是否需要增加获取减少线程个数
//这一个值小一点看现象
#define DEFAULT_TIME 3
//最小的执行的线程的个数
#define MIN_WAIT_TASK_NUM 10
//增加减少的时候步长
#define DEFAULT_THREAD_VARY 10
#define true 1
#define false 0
使用的宏定义
//记录一个回调函数, 由于线程执行
typedef struct {
void *(*function)(void *);
void *arg;
} threadpool_task_t;
//线程池的控制结构体
typedef struct{
pthread_mutex_t lock; //这一个结构体使用
pthread_mutex_t thread_counter; //记录忙状态的线程的个数
pthread_cond_t queue_not_full; //队列,没有满的时候添加使用
pthread_cond_t queue_not_empty;
pthread_t *threads; //线程数组, 记录现在使用的线程的pid
pthread_t adjust_tid; //管理者的id
threadpool_task_t *task_queue; //线程执行任务的时候使用的环形队列
int min_thr_num; //最小的时候保留的线程个数
int max_thr_num; //最大可以支持的个数
int live_thr_num; //存活的线程的个数
int busy_thr_num; //执行任务的线程的个数
int wait_exit_thr_num; //需要释放的线程的个数
//队列使用
int queue_front;
int queue_rear;
int queue_size;
int queue_max_size;
int shutdown; //记录这一个线程池有没有使用
}threadpool_t;
线程池的控制结构体, 以及任务信息记录的结构体
void *adjust_thread(void *threadpool);
void threadpool_free(threadpool_t *pool);
//普通的线程处理函数
void *threadpool_thread(void *threadpool){
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
while(true){
pthread_mutex_lock(&(pool->lock));
//判断一下现在没有可以执行的任务, 并且这一个线程组还没有销毁
while((pool->queue_size == 0) && (!pool->shutdown)){
printf("thread %#x is waiting\n", (unsigned int)pthread_self());
//等待任务, 获取清除唤醒
pthread_cond_wait(&pool->queue_not_empty, &pool->lock);
//需要清除一部分线程
if(pool->wait_exit_thr_num > 0){
pool->wait_exit_thr_num--;//记录需要清除的任务的个数
if(pool->live_thr_num > pool->min_thr_num){
printf("thread %#x is exiting\n", (unsigned int)pthread_self());
pool->live_thr_num--;
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
}
}
//这时候是有待处理的任务, 或者这一个线程池被销毁了
if(pool->shutdown){
//销毁线程池
pthread_mutex_unlock(&pool->lock);
printf("thread %#x will exit\n", (unsigned int)pthread_self());
pthread_detach(pthread_self());
pthread_exit(NULL);
}
//处理一个待处理的任务
//获取一下这一个任务的处理函数
task.function = pool->task_queue[pool->queue_front].function;
task.arg = pool->task_queue[pool->queue_front].arg;
//调整一下环形缓冲区的指针
pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;
pool->queue_size--;
//发送一个任务队列现在有位置的信号, 这一个的阻塞在添加一个待处理任务的位置
pthread_cond_broadcast(&pool->queue_not_full);
pthread_mutex_unlock(&pool->lock);
printf("thread %#x start working\n", (unsigned int)pthread_self());
pthread_mutex_lock(&pool->thread_counter);
pool->busy_thr_num++;
pthread_mutex_unlock(&pool->thread_counter);
(*(task.function))(task.arg);//调用则一个任务的处理函数
printf("thread %#x end working\n", (unsigned int)pthread_self());
pthread_mutex_lock(&pool->thread_counter);
pool->busy_thr_num--;
pthread_mutex_unlock(&pool->thread_counter);
}
pthread_exit(NULL);
}
//创建一个线程池, 返回这一个线程池的操控结构体
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size){
int i;
threadpool_t *pool = NULL;
do{
printf("malloc thread pool\n");
if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL){
printf("malloc threadpool fail");
break;
}
//记录一下初始信息
pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->busy_thr_num = 0;
pool->live_thr_num = min_thr_num;
pool->wait_exit_thr_num = 0;
pool->queue_size = 0;
pool->queue_max_size = queue_max_size;
pool->queue_front = 0;
pool->queue_rear = 0;
pool->shutdown = false;
//获取线程数组
pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
if(pool->threads == NULL){
printf("malloc threads fail");
break;
}
memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
//获取任务队列
printf("malloc task queue\n");
pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
printf("success\n");
if(pool->task_queue == NULL){
printf("malloc queue fail");
break;
}
memset(pool->task_queue, 0, sizeof(threadpool_task_t)*queue_max_size);
//初始化一下各种锁
if(pthread_mutex_init(&(pool->thread_counter), NULL)!= 0 ||
pthread_mutex_init(&(pool->lock), NULL) != 0 ||
pthread_cond_init(&(pool->queue_not_empty), NULL) != 0 ||
pthread_cond_init(&(pool->queue_not_full), NULL) != 0
){
printf("init lock fail");
break;
}
//开启一定数量的线程
for(i = 0;i<min_thr_num ;i++){
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
printf("start thread %#x...\n", (unsigned int)pool->threads[i]);
}
//开启一个控制线程
pthread_create(&pool->adjust_tid, NULL, adjust_thread, (void *)pool);
return pool;
}while(0);
threadpool_free(pool);
return NULL;
}
//释放这一个线程池的空间
void threadpool_free(threadpool_t *pool){
if(pool == NULL){
return;
}
if(pool->task_queue){
free(pool->task_queue);
}
if(pool->threads){
free(pool->threads);
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
free(pool);
}
pool = NULL;
}
//销毁这一个线程池
void threadpool_destory(threadpool_t *pool){
if(pool == NULL){
return;
}
pool->shutdown = true;
//等待服务线程的结束
pthread_join(pool->adjust_tid, NULL);
int i;
for(i = 0; i < pool->live_thr_num; i++){
pthread_cond_broadcast(&(pool->queue_not_empty));
}
for(i = 0; i < pool->live_thr_num; i++){
pthread_join(pool->threads[i], NULL);
}
threadpool_free(pool);
}
//判断一个线程是不是还活着
int is_thread_alive(pthread_t tid){
int kill_rc = pthread_kill(tid, 0);
if(kill_rc == ESRCH){
return false;
}
return true;
}
//控制线程
void *adjust_thread(void *threadpool){
int i;
threadpool_t *pool = (threadpool_t *)threadpool;
//只要这一个线程池还在, 这一个线程就在
while(!pool->shutdown){
sleep(DEFAULT_TIME);
pthread_mutex_lock(&(pool->lock));
int queue_size = pool->queue_size;
int live_thr_num = pool->live_thr_num;
pthread_mutex_unlock(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
int busy_thr_num = pool->busy_thr_num; //获取忙线程的个数
pthread_mutex_unlock(&(pool->thread_counter));
//创建线程, 这时候等待队列中的任务数大于等待阈值,且存活的线程数小于最大线程数
if(queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num){
//需要创建的线程数
pthread_mutex_lock(&(pool->lock));
int add = 0;
//获取可以使用的数组位置
for(i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY && pool->live_thr_num < pool->max_thr_num; i++){
if(pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])){
pthread_create(&pool->threads[i], NULL, threadpool_thread, (void *)pool);
add++;
pool->live_thr_num++;
printf("new thread %#x is created\n", (unsigned int)pool->threads[i]);
}
}
pthread_mutex_unlock(&(pool->lock));
}
//销毁线程,如果忙线程*2 < 存活线程数且存活线程数大于最小线程数
if((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num){
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;//需要销毁的线程数
pthread_mutex_unlock(&(pool->lock));
//唤醒空闲线程,让其自杀
for(i = 0; i < DEFAULT_THREAD_VARY; i++){
pthread_cond_signal(&(pool->queue_not_empty));
}
}
}
}
//给线程池里面加一个待处理的任务
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg){
pthread_mutex_lock(&(pool->lock));
//如果队列满了,调用wait阻塞
while((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)){
pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
}
//线程池已经关闭
if(pool->shutdown){
pthread_cond_broadcast(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
//清空工作线程的回调函数的参数
if(pool->task_queue[pool->queue_rear].arg != NULL){
pool->task_queue[pool->queue_rear].arg = NULL;
}
//添加任务到任务队列中
pool->task_queue[pool->queue_rear].function = function;
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;
pool->queue_size++;
//添加任务后,队列不为空,唤醒线程池中的线程
pthread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
//模拟处理业务
void * process(void *arg){
printf("thread %x working on task %d\n", (int)pthread_self(), *(int *)arg);
sleep(1);
printf("task %d is end\n", *(int *)arg);
return NULL;
}
线程池的代码实现, 建议跟随main函数的流程分析
int main(void){
threadpool_t *thp = threadpool_create(3, 100, 100);
printf("pool inited\n");
int num[20];
//模拟添加任务
for(int i = 0; i < 20 ; i++){
num[i] = i;
printf("add task %d\n", i);
threadpool_add(thp, process, (void *)&num[i]); //添加任务
}
sleep(10);//等待处理结束
threadpool_destory(thp);
}
测试这一个线程池
执行分析
➜ 2024-4-7-MPIO ./a.out start thread 0x2b192700... #开启线程池的基础线程 thread 0x2b192700 is waiting start thread 0x2a991700... thread 0x2a991700 is waiting start thread 0x2a190700... thread 0x2a190700 is waiting pool inited #初始化结束 add task 0 #加入任务 add task 1 add task 2 thread 0x2b192700 start working #任务开始处理 thread 723068672 working on task 0 thread 0x2a991700 start working thread 714675968 working on task 1 thread 0x2a190700 start working add task 3 add task 4 add task 5 add task 6 add task 7 add task 8 add task 9 add task 10 add task 11 add task 12 add task 13 add task 14 add task 15 add task 16 add task 17 add task 18 add task 19 thread 706283264 working on task 2 #任务处理结束 task 0 is end thread 0x2b192700 end working thread 0x2b192700 start working thread 723068672 working on task 3 task 1 is end thread 0x2a991700 end working thread 0x2a991700 start working thread 714675968 working on task 4 task 2 is end thread 0x2a190700 end working thread 0x2a190700 start working thread 706283264 working on task 5 task 3 is end thread 0x2b192700 end working thread 0x2b192700 start working thread 723068672 working on task 6 task 4 is end thread 0x2a991700 end working thread 0x2a991700 start working thread 714675968 working on task 7 task 5 is end thread 0x2a190700 end working thread 0x2a190700 start working thread 706283264 working on task 8 new thread 0x2918e700 is created #由于线程数量比较少, 开始加入线程 new thread 0x2898d700 is created new thread 0x23fff700 is created new thread 0x237fe700 is created new thread 0x22ffd700 is created new thread 0x227fc700 is created new thread 0x21ffb700 is created new thread 0x217fa700 is created new thread 0x20ff9700 is created new thread 0x207f8700 is created thread 0x207f8700 start working thread 0x2898d700 start working thread 681105152 working on task 11 thread 0x23fff700 start working thread 603977472 working on task 12 thread 545228544 working on task 9 thread 0x237fe700 start working thread 595584768 working on task 13 thread 0x2918e700 start working thread 689497856 working on task 10 thread 0x22ffd700 start working thread 587192064 working on task 14 thread 0x227fc700 start working thread 578799360 working on task 15 thread 0x21ffb700 start working thread 570406656 working on task 16 thread 0x217fa700 start working thread 562013952 working on task 17 thread 0x20ff9700 start working thread 553621248 working on task 18 task 6 is end task 8 is end task 7 is end thread 0x2a190700 end working thread 0x2b192700 end working thread 0x2b192700 is waiting thread 0x2a991700 end working thread 0x2a991700 is waiting thread 0x2a190700 start working thread 706283264 working on task 19 task 11 is end thread 0x2898d700 end working task 10 is end thread 0x2918e700 end working task 16 is end thread 0x21ffb700 end working task 12 is end thread 0x23fff700 end working task 15 is end thread 0x227fc700 end working task 18 is end thread 0x20ff9700 end working task 17 is end thread 0x217fa700 end working task 13 is end thread 0x237fe700 end working thread 0x2898d700 is waiting #线程开始空闲 thread 0x2918e700 is waiting task 9 is end thread 0x207f8700 end working task 19 is end thread 0x2a190700 end working task 14 is end thread 0x22ffd700 end working thread 0x21ffb700 is waiting thread 0x23fff700 is waiting thread 0x227fc700 is waiting thread 0x20ff9700 is waiting thread 0x217fa700 is waiting thread 0x237fe700 is waiting thread 0x207f8700 is waiting thread 0x2a190700 is waiting thread 0x22ffd700 is waiting thread 0x2918e700 is exiting #空闲任务不需要了 thread 0x2b192700 is exiting thread 0x2a991700 is exiting thread 0x2898d700 is exiting thread 0x21ffb700 is exiting thread 0x23fff700 is exiting thread 0x227fc700 is exiting thread 0x217fa700 is exiting thread 0x237fe700 is exiting thread 0x20ff9700 is exiting thread 0x207f8700 will exit #销毁线程池的时候还有三个线程 thread 0x2a190700 will exit thread 0x22ffd700 will exit