0501 多线程管理
专栏内容:
- postgresql使用入门基础
- 手写数据库toadb
- 并发编程
个人主页:我的主页
管理社区:开源数据库
座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物.
文章目录
- 0501 多线程管理
- 一、概述
- 二、 原理与机制
- 三、多条流水线的工厂
- 3.1 Worker信息结构定义
- 3.2 工厂的结构定义
- 3.3 工厂的建立
- 四、分发任务
- 五、执行任务
- 六、总结
- 结尾
一、概述
现代的CPU都会采用多个core的形式具有并行执行的能力,同一时间可以打开多个应用程序,即使是我们的手机,它的CPU也是非常强大的多核处理器。
如何让我们开发的应用程序充分利用多核CPU呢,这就不得不说多线程模型。
本文就来分享一下多线程模型的搭建与使用。
二、 原理与机制
在应用程序的架构中,一般采用分层原则,业务以任务的形式发布,而执行者接收任务只负责执行,并记录结果。
基于这样的总体架构设计,对于多线程的使用分为两种方式:
- 当任务产生时,再启动线程,任务执行完成后,线程也随之结束;
- 希望在应用程序启动时,有一定数量的执行者线程就开始处理待命状态,这个数量也决定了应用程序的并发,也就是处理任务的吞吐量;
第一种模式,适合一些任务量不大的业务逻辑,没有业务任务时,不需要占用系统资源;
而对于第二种模式,适合大量任务的场景,频繁的启动和销毁线程反而会带来大量的开销,最好是提前准备好线程,每个线程能执行不同的任务,线程是可重入的。
对于数据库程序而言,部分业务应用会以短连接的形式连接到数据库,可能执行一条或几条SQL就断开了,面对这样大量的短连接时,数据库内核需要保持一定数量的工作线程,来提升处理性能。
如何让多个线程保持等待状态,同时当有任务时还可以唤醒呢?
下面让一步步分解来看。
三、多条流水线的工厂
对于执行任务的线程,我们叫它Worker线程,它们在程序启动时就会创建,然后不停的执行任务,类似于流水线生产一样。
而在一个工厂会有多条这样的流水线,当工厂接到订单时,就会派发给其中一个生产线,并制定生产计划。
下面我们来看Worker线程的定义和工厂的定义,以及它们的初始化。
3.1 Worker信息结构定义
工作者线程需要记录一些信息,如运行状态,线程ID,还有对应的处理接口等,当然每个工作者会有一个唤醒器,也就是信号量。
typedef enum WORKER_STATE
{
TW_IDLE,
TW_RUNNING,
TW_UNKNOWN
}WS_STATE;
typedef struct ThreadWorkerInfo
{
unsigned int tw_threadid;
volatile WS_STATE tw_state;
SemLock taskIdleLock;
TaskProcess taskEntry;
}ThreadWorkerInfo;
说明
- tw_threadid, 是创建工作者线程的ID;
- tw_state,工作者线程的状态,运行之后是idle状态;当有任务执行时,为running状态;执行结束后,又回到了idle状态;
- taskIdleLock,当工作者空闲时,会设置有效,有任务时唤醒工作者。这里可以使用信号量,初始化计数器为0;
- taskEntry,任务处理接口;当有任务时,调用对应的任务处理接口进行处理;
注意,这里的tw_state的存储类型采用 volatile ,后面会看到这个值会被两个线程修改和访问,因为并没有竞争,所以没有进行加锁保护,为了数据的一致性,每次都会从内存进行读取。
3.2 工厂的结构定义
工厂记录了所有工作者的信息,当有任务产生时,来选择空闲的工作者进行派发。
#define WORK_THREAD_NUM 16
typedef struct ThreadFactoryInfo
{
ThreadWorkerInfo workerInfoList[WORK_THREAD_NUM];
}ThreadFactoryInfo;
说明
- 工作者数量为静态定义,也可以动态数组的形式定义;
- 当有任务产生时,遍历数组,找到空闲工作者进行派发;
3.3 工厂的建立
在程序启动时,我们将工厂进行建立,此时流水线工作者准备就绪,都处于空闲状态。
工厂遍历数组,初始化每一个工作者。
int CreeateWorkerThread(ThreadWorkerInfo *work)
{
int ret = 0;
pthread_t threadId;
if(NULL == InitializeSem(0, &worker->taskIdleLock))
{
return -1;
}
ret = pthread_create(&threadId, NULL, threadEntry, (void *)worker);
if (ret != 0)
{
return -1;
}
worker->tw_threadid = (unsigned int)threadId;
worker->tw_state = TW_IDLE;
worker->taskEntry = NULL;
return 0;
}
工作者的初始化说明
- 信号量的初始化,初始计数器为0;
- 启动线程,这里的线程的执行入口为threadEntry,在下一小节介绍;
- 线程的入参为worker信息本身;
- 初始化线程状态为idle, 此时线程的任务处理接口为NULL;
当然,在程序结束时,我们需要对创建的信号量和线程资源进行回收。
int DestoryWorkerThread(ThreadWorkerInfo *worker)
{
int* ret = 0;
if(NULL == worker)
return 0;
if(worker->tw_threadid > 0)
{
pthread_join((pthread_t)&worker->tw_threadid, (void **)&ret);
}
DestorySem(&workerInfo->taskIdleLock);
return 0;
}
线程默认情况下需要通过pthread_join进行回收资源,当然也可以设置为分离状态,这里就不再对线程关注。
四、分发任务
任务的产生和分发,可以由主线程进行,当接收到网络消息或键盘指令后,生成任务,然后进行派发。
派发的流程
- 准备任务;
- 查找空闲工作者;
- 找到后空闲工作者后,将任务派发给工作者;
- 唤醒工作者;
代码实现
static ThreadFactoryInfo factory;
ThreadWorkerInfo * GetIdleWorker()
{
int index = WORK_THREAD_NUM - 1;
for(; index >= 0; index --)
{
if(factory->workerInfoList[index].tw_state == TW_IDLE)
return &factory->workerInfoList[index];
}
return NULL;
}
int PushTask(TaskProcess taskProc)
{
ThreadWorkerInfo *idleWorker = NULL;
idleWorker = GetIdleWorker();
if(NULL == idleWorker)
{
return -1;
}
idleWorker->taskEntry = taskProc;
worker->tw_state = TW_RUNNING;
PostSem(&idleWorker->taskIdleLock);
return 0;
}
注意
在派发任务时,要注意操作的顺序;
先赋值任务处理接口和运行状态,再进行唤醒;
这样就不会竞争访问taskEntry,同时在信号量的唤醒操作中默认带有内存同步操作。
五、执行任务
工作者线程创建后,调用线程主函数threadEntry,在此处工作者处于就绪状态。
代码实现如下:
static void* threadEntry(void *arg)
{
ThreadWorkerInfo *worker = (ThreadWorkerInfo*)arg;
int ret = 0;
if(NULL == worker)
return NULL;
while(worker->tw_threadid > 0)
{
ret = WaitSem(&worker->taskIdleLock);
if(ret < 0)
{
break;
}
if(NULL != worker->taskEntry)
{
worker->taskEntry(&workerInfo->taskContext);
worker->tw_state = TW_IDLE;
worker->taskEntry = NULL;
}
}
return NULL;
}
说明
- 在线程启动后,会等待信号量的通知;
- 如果信号量被通知,此时检查任务是否被分发;
- 有任务时,调用任务处理接口,执行任务;
- 当任务执行完成后,继续等待信号量通知;
六、总结
本文分享了并发编程模型中,分发-并发执行的经典架构;
在这一架构中,工作者线程通过信号量的等待处理就绪状态;
分发者当有任务产生时,先派发任务,再唤醒工作者。
结尾
非常感谢大家的支持,在浏览的同时别忘了留下您宝贵的评论,如果觉得值得鼓励,请点赞,收藏,我会更加努力!
作者邮箱:study@senllang.onaliyun.com
如有错误或者疏漏欢迎指出,互相学习。
注:未经同意,不得转载!