一、线程池原理
我们使用线程的时候就去创建一个线程,这样实现起来非常简便。但如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
线程池就是为了让线程可以复用,不在线程的创建和销毁上浪费时间。
线程池主要分为三个部分
- 任务队列:存储需要处理的任务
- 工作线程:任务队列的消费者,也可以说是处理任务的线程
- 线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
- 如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
- 如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
- 管理线程:任务队列的生产者,也可以说是创造任务的线程
- 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
- 当任务过多的时候, 可以适当的创建一些新的工作线程
- 当任务过少的时候, 可以适当的销毁一些工作的线程
二、任务队列
2.1、任务
任务就是函数与参数组成的结构体。
// 定义任务结构体
using callback = void (*)(void *);
struct Task
{
callback function;
void *arg;
Task() : function(nullptr), arg(nullptr){};
Task(callback f, void *arg_f) : function(f), arg(arg_f){};
};
2.2、任务队列
任务队列是对 queue
的进一步封装
// 任务队列
class TaskQueue
{
public:
TaskQueue()
{
pthread_mutex_init(&m_mutex, NULL);
};
~TaskQueue()
{
pthread_mutex_destroy(&m_mutex);
};
// 添加任务
inline void addTask(Task &task)
{
pthread_mutex_lock(&m_mutex);
m_queue.push(task);
pthread_mutex_unlock(&m_mutex);
}
// 添加任务
inline void addTask(callback func, void *arg)
{
pthread_mutex_lock(&m_mutex);
Task task(func, arg);
m_queue.push(task);
pthread_mutex_unlock(&m_mutex);
};
// 取出一个任务
inline Task takeTask()
{
Task t;
pthread_mutex_lock(&m_mutex);
if (!m_queue.empty())
{
t = m_queue.front();
m_queue.pop();
}
pthread_mutex_unlock(&m_mutex);
return t;
};
// 获取当前队列中任务个数
inline int taskNumber()
{
return m_queue.size();
}
inline bool empty() {
return m_queue.empty();
}
private:
pthread_mutex_t m_mutex; // 互斥锁
std::queue<Task> m_queue; // 任务队列
};
三、线程池定义
class ThreadPool
{
public:
ThreadPool(int min, int max);
ThreadPool() : ThreadPool(5, 20) {}
~ThreadPool();
// 添加任务
void addTask(Task task);
// 添加任务
void addTask(callback func, void *arg);
// 获取忙线程的个数
int getBusyNumber();
// 获取活着的线程个数
int getAliveNumber();
private:
// 工作的线程的任务函数
static void *worker(void *arg);
// 管理者线程的任务函数
static void *manager(void *arg);
void threadExit();
private:
pthread_mutex_t m_lock;
pthread_cond_t m_notEmpty;
pthread_t *m_threadIDs;
pthread_t m_managerID;
TaskQueue *m_taskQ;
int m_minNum;
int m_maxNum;
int m_busyNum;
int m_aliveNum;
int m_exitNum;
bool m_shutdown;
};
四、线程池的实现
4.1、构造函数
ThreadPool::ThreadPool(int min, int max) : m_minNum(min), m_maxNum(max), m_busyNum(0), m_aliveNum(min), m_exitNum(0), m_shutdown(false)
{
// 实例化任务队列
m_taskQ = new TaskQueue;
// 给线程数组分配内存
m_threadIDs = new pthread_t[m_maxNum];
memset(m_threadIDs, 0, sizeof(pthread_t) * m_maxNum);
// 初始化锁和条件变量
pthread_mutex_init(&m_lock, NULL);
pthread_cond_init(&m_notEmpty,NULL);
// 创建管理者线程
pthread_create(&m_managerID, NULL, manager, this);
// 创建工作者线程
for (int i=0; i<min; i++) {
pthread_create(&m_threadIDs[i], NULL, worker, this);
}
}
4.2、析构函数
ThreadPool::~ThreadPool() {
this->m_shutdown = true;
// 销毁管理者线程
pthread_join(m_managerID, NULL);
// 唤醒所有的消费者线程
for (int i = 0; i < m_aliveNum; ++i) {
pthread_cond_signal(&m_notEmpty);
}
// 销毁任务队列
if (m_taskQ) delete m_taskQ;
// 销毁保存消费者ID的数组
if (m_threadIDs) delete[]m_threadIDs;
// 销毁锁
pthread_mutex_destroy(&m_lock);
// 销毁条件变量
pthread_cond_destroy(&m_notEmpty);
}
4.3、添加任务
void ThreadPool::addTask(Task task)
{
if (m_shutdown) return;
// 添加任务
m_taskQ->addTask(task);
// 唤醒一个工作处理线程
pthread_cond_signal(&m_notEmpty);
}
void ThreadPool::addTask(callback func, void *arg)
{
if (m_shutdown) return;
// 添加任务
m_taskQ->addTask(func,arg);
// 唤醒一个工作处理线程
pthread_cond_signal(&m_notEmpty);
}
4.4、工作线程函数
// 工作线程任务函数
void* ThreadPool::worker(void* arg) {
// 将传入的参数强制转换为ThreadPool*指针类型
ThreadPool* pool = static_cast<ThreadPool*>(arg);
while (true) {
// 访问任务队列先要加锁
pthread_mutex_lock(&pool->m_lock);
// 任务为空则线程阻塞
while (pool->m_taskQ->empty() && !pool->m_shutdown) {
// 阻塞线程在信号量m_notEmpty上
pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);
// 解除阻塞之后判断是否要销毁线程
if (pool->m_exitNum > 0) {
pool->m_exitNum--;
if (pool->m_aliveNum > pool->m_minNum) {
pool->m_aliveNum--;
pthread_mutex_unlock(&pool->m_lock);
pool->threadExit();
}
}
}
// 如果线程池要结束了
if (pool->m_shutdown) {
// 解锁
pthread_mutex_unlock(&pool->m_lock);
// 销毁线程
pool->threadExit();
}
// 从任务队列中取出一个任务
Task task = pool->m_taskQ->takeTask();
// 工作的线程加1
pool->m_busyNum++;
// 解锁,下面要开始执行了
pthread_mutex_unlock(&pool->m_lock);
// 执行任务
task.function(task.arg);
// 销毁参数指针
free(task.arg);
task.arg = nullptr;
// 工作的线程减1
pthread_mutex_lock(&pool->m_lock);
pool->m_busyNum--;
pthread_mutex_unlock(&pool->m_lock);
}
return nullptr;
}
4.5、管理线程函数
// 管理者线程任务函数
void* ThreadPool::manager(void* arg)
{
ThreadPool* pool = static_cast<ThreadPool*>(arg);
// 如果线程池没有关闭就一直检测
while (!pool->m_shutdown) {
// 每5s监控一次线程池状态
sleep(5);
// 取出任务数量和线程数量
pthread_mutex_lock(&pool->m_lock);
int queuesize = pool->m_taskQ->taskNumber();
int liveNum = pool->m_aliveNum;
int busyNum = pool->m_busyNum;
pthread_mutex_unlock(&pool->m_lock);
// 创建线程
const int NUMBER = 2;
// 当前任务太多了,需要增加线程处理
if (queuesize > liveNum && liveNum < pool->m_maxNum) {
// 线程池加锁
pthread_mutex_lock(&pool->m_lock);
int num = 0;
for (int i = 0; i < pool->m_maxNum && num < NUMBER && pool->m_aliveNum < pool->m_maxNum; ++i)
{
if (pool->m_threadIDs[i] == 0)
{
pthread_create(&pool->m_threadIDs[i], NULL, worker, pool);
num++;
pool->m_aliveNum++;
}
}
// 线程池解锁
pthread_mutex_unlock(&pool->m_lock);
}
// 当前任务太少了,需要减少线程,减轻系统负担
if (busyNum * 2 < liveNum && liveNum > pool->m_minNum + NUMBER) {
// 线程池加锁
pthread_mutex_lock(&pool->m_lock);
pool->m_exitNum = NUMBER;
// 线程池解锁
pthread_mutex_unlock(&pool->m_lock);
// 唤醒线程,自动删除自己
for (int i = 0; i < NUMBER; ++i) {
pthread_cond_signal(&pool->m_notEmpty);
}
}
}
return nullptr;
}
4.6、线程自我销毁
void ThreadPool::threadExit()
{
pthread_t tid = pthread_self();
for (int i = 0; i < m_maxNum; ++i)
{
if (m_threadIDs[i] == tid) {
m_threadIDs[i] = 0;
break;
}
}
pthread_exit(NULL);
}
五、测试代码
void taskFunc(void* arg)
{
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n", pthread_self(), num);
sleep(1);
}
int main()
{
// 创建线程池
ThreadPool* pool = new ThreadPool(2, 5);
for (int i = 0; i < 100; ++i)
{
int* num = (int*)malloc(sizeof(int));
*num = i;
pool->addTask( taskFunc, num);
}
sleep(30);
delete pool;
return 0;
}
运行结果: