线程池
- 一、线程池的概念
- 1、线程池的优点
- 2、线程池的应用场景
- 二、线程池的实现
- 1、实现逻辑
- 2、threadpool线程池
- (1)threadpool.hpp代码
- (2)为什么线程池中需要有互斥锁和条件变量?
- (3)注意点
- (4)为什么线程池中的线程执行例程需要设置为静态方法?
- (5)任务类型的设计(Task.hpp)
- (6)主线程逻辑(main.cc)
- (7)显示结果
一、线程池的概念
线程池是一种线程使用模式。
线程过多会带来调度开销,进而影响缓存局部和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
1、线程池的优点
- 线程池避免了在处理短时间任务时创建与销毁线程的代价。
- 线程池不仅能够保证内核充分利用,还能防止过分调度。
2、线程池的应用场景
线程池常见的应用场景如下:
- 需要大量的线程来完成任务,且完成任务的时间比较短。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。
总结:
- 像Web服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。
- 对于长时间的任务,比如Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但短时间内产生大量线程可能使内存到达极限,出现错误。
二、线程池的实现
1、实现逻辑
下面我们实现一个简单的线程池,线程池中提供了一个任务队列,以及若干个线程(多线程)。
- 线程池中的多个线程负责从任务队列当中拿任务,并将拿到的任务进行处理。
- 线程池对外提供一个Push接口,用于让外部线程能够将任务Push到任务队列当中。
2、threadpool线程池
(1)threadpool.hpp代码
#pragma once
#include <iostream>
#include <unistd.h>
#include <queue>
#include <pthread.h>
#define NUM 5
// 线程池
template<class T>
class ThreadPool
{
private:
std::queue<T> _task_q; // 任务队列
int _thread_num; // 线程池中的线程数量
pthread_mutex_t _mutex; // 锁
pthread_cond_t _cond; // 条件变量
private:
// 判空函数
bool IsEmpty()
{
return _task_q.size() == 0;
}
// 锁队列
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
// 解锁队列
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
// 进程等待
void WaitThread()
{
pthread_cond_wait(&_cond, &_mutex);
}
// 进程唤醒
void WakeThread()
{
pthread_cond_signal(&_cond);
}
public:
// 构造函数
ThreadPool(int num = NUM)
:_thread_num(num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
// 析构函数
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
// 线程执行例程
static void* Routine(void* arg)
{
pthread_detach(pthread_self());
ThreadPool* Self = (ThreadPool*)arg;
while(1)
{
// 加锁
Self->LockQueue();
// 等待
while(Self->IsEmpty())
{
Self->WaitThread();
}
// 获取任务
T task;
Self->Pop(task);
// 解锁
Self->UnlockQueue();
// 执行任务
task.Run();
}
}
// 线程初始化
void ThreadPoolInit()
{
pthread_t tid;
for(int i = 0; i < _thread_num; i++)
{
pthread_create(&tid, nullptr, Routine, this); // this指针
}
}
// 往任务队列塞任务(主线程调用)
void Push(const T& task)
{
LockQueue();
_task_q.push(task);
UnlockQueue();
WakeThread();
}
//从任务队列获取任务(线程池中的线程调用)
void Pop(T& task)
{
task = _task_q.front();
_task_q.pop();
}
};
(2)为什么线程池中需要有互斥锁和条件变量?
线程池中的任务队列是会被多个执行流同时访问的临界资源,因此我们需要引入互斥锁对任务队列进行保护。
线程池当中的线程要从任务队列里拿任务,前提条件是任务队列中必须要有任务,因此线程池当中的线程在拿任务之前,需要先判断任务队列当中是否有任务,若此时任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,因此我们需要引入条件变量。
当外部线程向任务队列中Push一个任务后,此时可能有线程正处于等待状态,因此在新增任务后需要唤醒在条件变量下等待的线程。
(3)注意点
- 当某线程被唤醒时,其可能是异常或是伪唤醒,或者是一些广播类的唤醒线程操作而导致所有线程被唤醒,使得在被唤醒的若干线程中,只有个别线程能拿到任务。此时应该让被唤醒的线程再次判断是否满足被唤醒条件,所以在判断任务队列是否为空时,应该使用while进行判断,而不是if。
- pthread_cond_broadcast函数的作用是唤醒条件变量下的所有线程,而外部可能只Push了一个任务,我们却把全部在等待的线程都唤醒了,此时这些线程就都会去任务队列获取任务,但最终只有一个线程能得到任务。一瞬间唤醒大量的线程可能会导致系统震荡,这叫做惊群效应。因此在唤醒线程时最好使用pthread_cond_signal函数唤醒正在等待的一个线程即可。
- 当线程从任务队列中拿到任务后,该任务就已经属于当前线程了,与其他线程已经没有关系了,因此应该在解锁之后再进行处理任务,而不是在解锁之前进行。因为处理任务的过程可能会耗费一定的时间,所以我们不要将其放到临界区当中。
- 如果将处理任务的过程放到临界区当中,那么当某一线程从任务队列拿到任务后,其他线程还需要等待该线程将任务处理完后,才有机会进入临界区。此时虽然是线程池,但最终我们可能并没有让多线程并行的执行起来。
(4)为什么线程池中的线程执行例程需要设置为静态方法?
静态方法是一个很巧妙的方法。
我们先来回顾一下前面的知识体系结构,我们的Routine方法是void*返回类型的,其参数也是void 的。
而我们此时将Routine方法放到类里面的话,第一个参数是一个隐藏的this指针!此时的Routine是有两个参数的,而不是仅有一个参数,这样我们就没有办法通过编译了呀,因为Routine执行例程的方法只允许一个void * 的参数。此时直接将该Routine函数作为创建线程时的执行例程是不行的,无法通过编译。
我们知道,静态函数的方法的话,其是不会出现this指针的,因为静态函数属于类而不属于某个对象,也就是说静态成员函数是没有隐藏的this指针的,因此我们需要将Routine设置为静态方法,此时Routine函数才真正只有一个参数类型为void的参数。
但是会出现一个问题:我们在静态成员函数内部无法调用非静态成员函数,而我们需要在Routine函数当中调用该类的某些非静态成员函数,比如Pop。因此我们在进行创建线程的时候,我们把this指针带上,此时我们就能够通过该this指针在Routine函数内部调用非静态成员函数了。
(5)任务类型的设计(Task.hpp)
计算类就是简单的计算过程,我们要处理这方面的任务类的话,我们仅仅需要调用我们的Run函数即可,我们看下面代码实现的逻辑,并且这个线程池内的线程不断从任务队列拿出任务进行处理,而它们并不需要关心这些任务是哪来的,它们只需要拿到任务后执行对应的Run方法即可。
#pragma once
//#include"threadpool.hpp"
#include<iostream>
// 任务类
class Task
{
private:
int _x; // 左数
int _y; // 右数
char _op; // 操作数
public:
// 构造
Task(int x = 0, int y = 0, char op = '0')
:_x(x)
,_y(y)
,_op(op)
{}
// 析构
~Task()
{}
// 任务处理方法
void Run()
{
int result = 0;
switch(_op)
{
case '+':
{
result = _x + _y;
break;
}
case '-':
{
result = _x - _y;
break;
}
case '*':
{
result = _x * _y;
break;
}
case '/':
{
if(_y == 0)
{
std::cout << "div zero error" << std::endl;
return;
}
else
{
result = _x / _y;
break;
}
}
case '%':
{
if(_y == 0)
{
std::cout << "mod zero error" << std::endl;
return;
}
else
{
result = _x % _y;
break;
}
}
default:
{
std::cout << "please enter:" << " +-*/% " << std::endl;
}
}
std::cout << "thread[" << pthread_self() << "]# " << _x << _op << _y << "=" << result << std::endl;
}
};
(6)主线程逻辑(main.cc)
主线程就负责不断向任务队列当中Push任务就行了,此后线程池当中的线程会从任务队列当中获取到这些任务并进行处理。
#include"threadpool.hpp"
#include"Task.hpp"
int main()
{
srand((unsigned int)time(nullptr));
ThreadPool<Task>* tp = new ThreadPool<Task>; // 线程池
// 初始化
tp->ThreadPoolInit();
const char* option = "+-*/%";
while(1)
{
sleep(1);
int x = rand() % 100 + 1; // 左数
int y = rand() % 100 + 1; // 右数
int index = rand() % 5; // 下标
Task task(x, y, option[index]);
tp->Push(task);
}
return 0;
}
(7)显示结果
while :; do ps -aL | head -1 && ps -aL | grep threadpool | grep -v grep; sleep 1; done
运行代码后一瞬间就有六个线程,其中一个是主线程,另外五个是线程池内处理任务的线程。
并且我们会发现这五个线程在处理时会呈现出一定的顺序性,因为主线程是每秒Push一个任务,这五个线程只会有一个线程获取到该任务,其他线程都会在等待队列中进行等待,当该线程处理完任务后就会因为任务队列为空而排到等待队列的最后,当主线程再次Push一个任务后会唤醒等待队列首部的一个线程,这个线程处理完任务后又会排到等待队列的最后,因此这五个线程在处理任务时会呈现出一定的顺序性。
注意: 此后我们如果想让线程池处理其他不同的任务请求时,我们只需要提供一个任务类,在该任务类当中提供对应的任务处理方法就行了。