目录
- 线程池
- locker.h
- threadpool.h
线程池
相比于动态地创建子线程,选择一个已经存在的子线程的代价显然要小得多。至于主线程选择哪个子线程来为新任务服务,有多种方式:
- 主线程使用某种算法来主动选择子线程。最简单、最常用的算法是随机算法和Round Robin(轮流选取)算法,但更优秀、更智能的算法将使任务在各个工作线程中更均匀地分配,从而减轻服务器的整体压力。
- 主线程和所有子线程通过一个共享的工作队列来同步,子线程都睡眠在该工作队列上。当有新的任务到来时,主线程将任务添加到工作队列中。这将唤醒正在等待任务的子线程,不过只有一个子线程将获得新任务的“接管权”,它可以从工作队列中取出任务并执行之,而其他子线程将继续睡眠在工作队列上。
线程间竞争的不是CPU的计算资源而是IO,IO的处理一般较慢。
池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源。
当服务器执行完之后,把相关的资源放回池中,无需执行系统调用释放资源。
创建一个webserver目录
locker.h
#ifndef LOCKER_H
#define LOCKER_H
#include<pthread.h>
#include<exception>
#include<semaphore.h>
//线程同步机制封装类
//互斥锁类
class locker{
public:
locker(){
if(pthread_mutex_init(&m_mutex,NULL)!=0){
throw std::exception();
}
}
~locker(){
pthread_mutex_destroy(&m_mutex);
}
bool lock(){
return pthread_mutex_lock(&m_mutex)==0;
}
bool unlock(){
return pthread_mutex_unlock(&m_mutex)==0;
}
pthread_mutex_t * get(){
return &m_mutex;
}
private:
pthread_mutex_t m_mutex;
};
//条件变量类
class cond{
public:
cond(){
if(pthread_cond_init(&m_cond,NULL)!=0){
throw std::exception();
}
}
~cond(){
pthread_cond_destroy(&m_cond);
}
bool wait(pthread_mutex_t * mutex){
return pthread_cond_wait(&m_cond,mutex)==0;
}
bool timewait(pthread_mutex_t * mutex,struct timespec t){
return pthread_cond_timedwait(&m_cond,mutex,&t)==0;
}
bool signal(){
return pthread_cond_signal(&m_cond)==0;
}
bool broadcast(){
return pthread_cond_broadcast(&m_cond)==0;
}
private:
pthread_cond_t m_cond;
};
//信号量类
class sem{
public:
sem(){
if(sem_init(&m_sem,0,0)!=0){
throw std::exception();
}
}
sem(int num){
if(sem_init(&m_sem,0,num)!=0){
throw std::exception();
}
}
~sem(){
sem_destroy(&m_sem);
}
//等待信号量
bool wait(){
return sem_wait(&m_sem)==0;
}
//增加信号量
bool post(){
return sem_post(&m_sem)==0;
}
private:
sem_t m_sem;
};
#endif
threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include<pthread.h>
#include<list>
#include<exception>
#include<cstdio>
#include"locker.h"
//线程池类,定义成模板类是为了代码的复用,模板参数T是任务类
template<typename T>
class threadpool{
public:
threadpool(int thread_number=8,int max_requests=10000);
~threadpool();
bool append(T* request);
private:
static void* worker(void * arg);
void run();
private:
//线程的数量
int m_thread_number;
//线程池数组,大小为m_thread_number
pthread_t * m_threads;
//请求队列中最多允许的,等待处理的请求数量
int m_max_requests;
//请求队列
std::list< T*> m_workqueue;
//互斥锁
locker m_queuelocker;
//信号量用来判断是否有任务需要处理
sem m_queuestat;
//是否结束线程
bool m_stop;
};
template<typename T>
threadpool<T>::threadpool(int thread_number,int max_requests) :
m_thread_number(thread_number),m_max_requests(max_requests),
m_stop(false),m_threads(NULL){
if((thread_number <=0)||(max_requests<=0)){
throw std::exception();
}
m_threads=new pthread_t[m_thread_number];
if(!m_threads){
throw std::exception();
}
//创建thread_number个线程,并将它们设置为线程脱离
for(int i=0;i<thread_number;i++){
printf("create the %dth thread\n",i);
if(pthread_create(m_threads + i,NULL,worker,this)!=0){
delete [] m_threads;
throw std::exception();
}
if(pthread_detach(m_threads[i])){
delete[] m_threads;
throw std::exception();
}
}
}
template<typename T>
threadpool<T>::~threadpool(){
delete[] m_threads;
m_stop=true;
}
template<typename T>
bool threadpool<T>::append(T *request){
m_queuelocker.lock();
if(m_workqueue.size()>m_max_requests){
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
template<typename T>
void* threadpool<T>::worker(void *arg){
threadpool * pool=(threadpool *)arg;
pool->run();
return pool;
}
template<typename T>
void threadpool<T>::run(){
while(!m_stop){
m_queuestat.wait();
m_queuelocker.lock();
if(m_workqueue.empty()){
m_queuelocker.unlock();
continue;
}
T* request=m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if(!request){
continue;
}
request->process();
}
}
#endif