线程池应该使用的地方 和 epoll 技术结合
线程池代码处理数据的地方。
线程池分析:
线程池代码1 threadpool_create
//Tencent8888 start threadpool_create函数的目的初始化线程池,对应的struct是 threadpool_t
/*
1.先malloc整个线程池的大小
2.这里使用的 do while实现的好处是,如果哪里有问题,则可以直接break出来,这里要学习的是这种技巧,实际上是为了避免使用goto 语句
3.锁子和 条件变量的初始化
4.将存放线程池中每个线程的tid。数组 进行分配空间。pthread_t *threads
5.将 任务队列 task_queue 进行分配空间
6.对于最小数量的thread,进行初始化
7.对管理线程进行初始化
8.如果有有问题 free相关的信息
9.下来我们就要看 thread 的初始化的回调函数 threadpool_thread
10.然后再看 管理线程 的初始化的回调函数 adjust_thread
*/
//Tencent8888 end
//Tencent8888 start threadpool_create函数的目的初始化线程池,对应的struct是 threadpool_t
/*
1.先malloc整个线程池的大小
2.这里使用的 do while实现的好处是,如果哪里有问题,则可以直接break出来,这里要学习的是这种技巧,实际上是为了避免使用goto 语句
3.锁子和 条件变量的初始化
4.将存放线程池中每个线程的tid。数组 进行分配空间。pthread_t *threads
5.将 任务队列 task_queue 进行分配空间
6.对于最小数量的thread,进行初始化
7.对管理线程进行初始化
8.如果有有问题 free相关的信息
9.下来我们就要看 thread 的初始化的回调函数 threadpool_thread
10.然后再看 管理线程 的初始化的回调函数 adjust_thread
*/
//Tencent8888 end
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{
int i;
threadpool_t *pool = NULL;
do {
if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
printf("malloc threadpool fail");
break;/*跳出do while*/
}
pool->min_thr_num = min_thr_num; /* 线程池最小线程数 */
pool->max_thr_num = max_thr_num; /* 线程池最大线程数 */
pool->busy_thr_num = 0; /* 忙状态线程个数 */
pool->live_thr_num = min_thr_num; /* 活着的线程数 初值=最小线程数 */
pool->queue_size = 0; /* 有0个产品 */
pool->queue_max_size = queue_max_size; /* task_queue队列可容纳任务数上限*/
pool->queue_front = 0; /* task_queue队头下标 */
pool->queue_rear = 0; /* task_queue队尾下标 */
pool->shutdown = false; /* 不关闭线程池 */
/* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
if (pool->threads == NULL) {
printf("malloc threads fail");
break;
}
memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
/* 队列开辟空间 */
pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
if (pool->task_queue == NULL) {
printf("malloc task_queue fail");
break;
}
/* 初始化互斥琐、条件变量 */
if (pthread_mutex_init(&(pool->lock), NULL) != 0
|| pthread_mutex_init(&(pool->thread_counter), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
|| pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
{
printf("init the lock or cond fail");
break;
}
/* 启动 min_thr_num 个 work thread */
for (i = 0; i < min_thr_num; i++) {
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);/*pool指向当前线程池*/
printf("start thread 0x%x...\n", (unsigned int)pool->threads[i]);
}
pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);/* 启动管理者线程 */
return pool;
} while (0);
threadpool_free(pool); /* 前面代码调用失败时,释放poll存储空间 */
return NULL;
}
线程池代码2 threadpool_thread
代码中的使用
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);//pool指向当前线程池
pthread_create的第一个参数为传输参数,保存系统给我们分配好的线程ID。
第二个参数为NULL。表示使用线程默认属性
第三个参数为函数指针,指向线程主函数,该函数执行完毕,则线程结束
第四个参数 为第三个函数指针需要的参数,当前传递的是整个线程池pool
先复习一下 pthread_create函数 的使用
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
返回值:成功:0; 失败:错误号 -----Linux环境下,所有线程特点,失败均直接返回错误号。
参数:
pthread_t:当前Linux中可理解为:typedef unsigned long int pthread_t;
参数1:传出参数,保存系统为我们分配好的线程ID
参数2:通常传NULL,表示使用线程默认属性。若想使用具体属性也可以修改该参数。
参数3:函数指针,指向线程主函数(线程体),该函数运行结束,则线程结束。
参数4:线程主函数执行期间所使用的参数。
下来看threadpool_thread 这个方法到底干了些啥?
1.先是将参数 threadpool 取出来,因为pthread_create使用的时候,会将整个线程池poll做为参数传递过来。这是一种poll使用的方法
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);//pool指向当前线程池
/* 线程池中各个工作线程 */
void *threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
while (true) {
/* Lock must be taken to wait on conditional variable */
/*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
pthread_mutex_lock(&(pool->lock));
/*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
while ((pool->queue_size == 0) && (!pool->shutdown)) {
printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
/*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
if (pool->wait_exit_thr_num > 0) {
pool->wait_exit_thr_num--;
/*如果线程池里线程个数大于最小值时可以结束当前线程*/
if (pool->live_thr_num > pool->min_thr_num) {
printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
}
}
}
/*如果指定了true,要关闭线程池里的每个线程,自行退出处理*/
if (pool->shutdown) {
pthread_mutex_unlock(&(pool->lock));
printf("thread 0x%x is exiting\n", (unsigned int)pthread_self());
pthread_exit(NULL); /* 线程自行结束 */
}
/*从任务队列里获取任务, 是一个出队操作*/
task.function = pool->task_queue[pool->queue_front].function;
task.arg = pool->task_queue[pool->queue_front].arg;
pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; /* 出队,模拟环形队列 */
pool->queue_size--;
/*通知可以有新的任务添加进来*/
pthread_cond_broadcast(&(pool->queue_not_full));
/*任务取出后,立即将 线程池琐 释放*/
pthread_mutex_unlock(&(pool->lock));
/*执行任务*/
printf("thread 0x%x start working\n", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread_counter)); /*忙状态线程数变量琐*/
pool->busy_thr_num++; /*忙状态线程数+1*/
pthread_mutex_unlock(&(pool->thread_counter));
(*(task.function))(task.arg); /*执行回调函数任务*/
//task.function(task.arg); /*执行回调函数任务*/
/*任务结束处理*/
printf("thread 0x%x end working\n", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread_counter));
pool->busy_thr_num--; /*处理掉一个任务,忙状态数线程数-1*/
pthread_mutex_unlock(&(pool->thread_counter));
}
pthread_exit(NULL);
}