一、线程池的启动 (主线程)
// 启动线程池 (主线程)
void threadPoolRun(struct ThreadPool* pool) {
/*
线程池被创建出来之后,接下来就需要让线程池运行起来,
其实就是让线程池里的若干个子线程运行起来
*/
// 确保线程池未运行
assert(pool && !pool->isStart);
// 比较主线程的ID和当前线程ID是否相等
// 相等=>确保执行线程为主线程;不相等=>exit(0)
if(pool->mainLoop->threadID != pthread_self()) {
exit(0);
}
pool->isStart = true; // 标记为启动
if(pool->threadNum > 0) { // 线程数量大于零
for(int i=0;i<pool->threadNum;++i) {
workerThreadInit(&pool->workerThreads[i], i);// 初始化子线程
workerThreadRun(&pool->workerThreads[i]); // 启动子线程
}
}
}
## 学习笔记:线程池的运行机制
- 线程池被创建后,需要启动使其子线程运行。启动线程池的函数需要一个有效struct ThreadPool*类型指针pool作为参数,和threadNum代表子线程总个数
// 初始化线程池
struct ThreadPool* threadPoolInit(struct EventLoop* mainLoop, int threadNum);
- 确保线程池未运行且执行线程为主线程
// 确保线程池未运行
assert(pool && !pool->isStart);
// 比较主线程的ID和当前线程ID是否相等
// 相等=>确保执行线程为主线程;不相等=>exit(0)
if(pool->mainLoop->threadID != pthread_self()) {
exit(0);
}
如果条件满足,将线程池标记为已启动,并初始化并启动子线程;如果线程数量大于零,通过WorkerThread模块的workerThreadInit函数进行初始化,通过WorkerThread模块的workerThreadRun函数进行启动;如果线程数量为零,线程池可以提供主线程的反应堆模型(mainLoop)
pool->isStart = true; // 标记为启动
if(pool->threadNum > 0) { // 线程数量大于零
for(int i=0;i<pool->threadNum;++i) {
workerThreadInit(&pool->workerThreads[i], i);// 初始化子线程
workerThreadRun(&pool->workerThreads[i]); // 启动子线程
}
}
### 知识点:线程池的启动
- 启动线程池的函数需要确保传入的结构体指针有效且线程池未运行
- 执行线程需要判断是否为主线程,避免异常情况
- 成功启动后,需要初始化并启动子线程,通过WorkerThread模块的函数进行初始化和启动
- 如果线程数量为零,线程池可以提供主线程的反应堆模型(mainLoop)
二、从线程池中取出一个反应堆实例 (主线程)
此外,takeWorkerEventLoop函数 主线程可以从线程池中取出某个子线程的反应堆实例。
目的:这个函数是主线程调用的,因为主线程是拥有线程池的。因此主线程可以遍历线程池里边的子线程,从中挑选一个子线程,得到它的反应堆模型,再将处理的任务放到反应堆模型里边。
// 取出线程池中的某个子线程的反应堆实例
struct EventLoop* takeWorkerEventLoop(struct ThreadPool* pool) {
assert(pool->isStart);// 确保线程池是运行的
// 比较主线程的ID和当前线程ID是否相等
// 相等=>确保执行线程为主线程;不相等=>exit(0)
if(pool->mainLoop->threadID != pthread_self()) {
exit(0);
}
// 从线程池中找到一个子线程,然后取出里边的反应堆实例
struct EventLoop* evLoop = pool->mainLoop;
if(pool->threadNum > 0) {
evLoop = pool->workerThreads[pool->index].evLoop;
pool->index = ++pool->index % pool->threadNum;
}
return evLoop;
}
如果线程数量为零,evLoop 为主线程的反应堆模型(mainLoop),即:
evLoop=pool->mainLoop;
如果线程数量大于零,从线程池中的某个子线程获取其事件循环,并将其存储在evLoop
变量中。为了对线程池中的工作线程实现雨露均沾,故需要用到index这个变量,为了确保 pool->index 的值在合适的取值范围内,并且不会超出它的取值范围:先将 pool->index 的值加一,然后对 pool->threadNum取余数,并将结果赋值给 pool->index :
① pool->index++;② pool->index %= pool->threadNum;
=>③ pool->index = ++pool->index % pool->threadNum;
if(pool->threadNum > 0) {
evLoop = pool->workerThreads[pool->index].evLoop;
pool->index = ++pool->index % pool->threadNum;
}
### 知识点:子线程的反应堆实例的取出
- 可以通过takeWorkerEventLoop函数从线程池中取出子线程的反应堆实例
- 这个函数的核心是取出反应堆实例,用于处理任务
- 如果线程数量为零,evLoop 为主线程的反应堆模型(mainLoop)
- 对线程池中的工作线程实现雨露均沾地分配各个子线程调度,避免所有任务都由同一个线程处理,还确保index在合适的取值范围
>>总结:
本文主要讲述了线程池的启动过程和操作函数。在启动线程池时,需要传入有效指针,确保线程池未被运行,并判断执行线程的线程是否为主线程。通过WorkerThread模块中的函数对子线程进行初始化并启动。主线程可以方便地管理子线程,并从中选择一个子线程以获取其反应堆模型。
此外,还讲述了线程池操作的函数,包括初始化、启动和取出子线程等。整个处理流程需要确保每个任务都能被雨露均沾地分配给各个子线程,避免所有任务都由同一个线程处理,还确保了index在合适的取值范围。
>>线程池总结:
// 初始化线程池
struct ThreadPool* threadPoolInit(struct EventLoop* mainLoop, int threadNum);
// 启动线程池
void threadPoolRun(struct ThreadPool* pool);
// 取出线程池中的某个子线程的反应堆实例
struct EventLoop* takeWorkerEventLoop(struct ThreadPool* pool);
threadPoolInit函数、threadPoolRun函数、takeWorkerEventLoop函数都需要在主线程里边进行依次调用。首先调用threadPoolInit函数得到pool实例,之后调用threadPoolRun函数启动线程池(意味着里边的子线程会启动)。接着调用takeWorkerEventLoop函数取出线程池中的某个子线程的反应堆实例,再把这个实例给到调用者,调用者就可以通过这个实例,往它的任务队列里边添加任务,这个任务添加到evLoop的任务队列里边去了之后,就开始处理任务队列。然后再根据任务队列里边的节点类型来处理Dispatcher的检测集合。有三种情况:
- 情况1.往检测集合里边添加新的节点
- 情况2.从检测集合里边删除一个节点
- 情况3.修改检测集合里边某个文件描述符对应的事件
Dispatcher这个检测集合被处理完之后,这个反应堆模型开始进行一个循环,它需要循环调用底层的poll/epoll_wait/select函数来检测这个集合里边是否有激活的文件描述符。如果有激活的文件描述符,那么就通过这个文件描述符找到它所对应的channel。找到这个channel之后,再基于激活的事件调用事件对应的回调函数。这个函数调用完成之后,对应的事件也就处理完毕。这就走完了整个的处理流程