线程池基本信息
线程池是一种结合池化思想衍生出来的一种线程管理及使用的方案
其主要针对服务器端多线程场景下,服务器频繁接收请求,每个请求都分配一个单独的线程去处理。
使用线程的开销:
- 创建和销毁线程
- 调度线程
线程池主要解决的核心问题是资源管理的问题。在并发环境下,系统不能确定在任意时刻中,有多少任务需要处理,有多少资源需要投入,这种不确定性带来以下问题:
- 频繁申请/销毁/调度资源,将带来执行业务之外的开销,线程数量过多时,这部分消耗非常巨大
- 对资源申请缺少抑制手段,容易引发资源耗尽的风险
- 系统无法合理的管理内部资源的分布,会降低系统的稳定性
线程池的几个概念
- 线程池管理器
用于初始化一定数量的线程资源,提供启动线程,停止线程、调配任务的方法。 - 工作线程
线程池中等待并执行分配任务的线程 - 任务接口
添加任务的接口,便于工作线程调度任务的执行 - 任务队列
用于存放等待处理的任务(区分任务的优先级)
线程池工作的四种场景
-
线程池空闲
主程序中没有任务需要执行,任务队列为空闲状态
-
线程池未饱和工作
主程序添加小于线程池中线程数量的任务
-
线程池饱和,启用任务缓冲
主程序添加的任务数量大于当前线程池中线程数量
-
** 任务缓冲队列饱和**
主程序添加的任务数量大于当前线程池的中线程数量,且任务缓冲队列已满
线程池的实现
- 头文件
#pragma once
#include <vector>
#include <functional>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
namespace MyThreadPool
{
static const int kiInitThreadSize = 3;
enum TaskPriority
{
Level0,
Level1,
Level2
};
typedef std::function<void()> Task;
typedef std::pair<TaskPriority, Task> TaskPair;
typedef std::vector<std::thread*> Threads;
class ThreadPool
{
public:
ThreadPool();
virtual ~ThreadPool();
void Start();
void Stop();
void AddTask(const Task& task);
void AddTask(const TaskPair& taskPair);
private:
ThreadPool(const ThreadPool&); // 拷贝构造定义为私有,禁止该类对象进行复制拷贝
const ThreadPool& operator=(const ThreadPool&);
struct TaskPriorityCmp
{
bool operator()(const TaskPair& p1, const TaskPair& p2)
{
return p1.first > p2.first;
}
};
void ThreadLoop();
Task Take();
typedef std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> Tasks; // 优先队列
Threads m_threads;
Tasks m_tasks;
std::mutex m_mutex;
std::condition_variable m_cond;
bool m_bIsStarted;
};
}
- 源文件
#include "my_thread_pool.h"
#pragma once
#include <vector>
#include <functional>
#include <thread>
#include <queue>
#include <iostream>
#include <condition_variable>
namespace MyThreadPool
{
ThreadPool::ThreadPool() : m_mutex(), m_bIsStarted(false)
{
}
ThreadPool::~ThreadPool()
{
if (m_bIsStarted)
{
Stop();
}
}
void ThreadPool::Start()
{
if (!m_threads.empty())
{
return;
}
m_bIsStarted = true;
m_threads.reserve(kiInitThreadSize);
for (int i = 0; i < kiInitThreadSize; ++i)
{
m_threads.push_back(new std::thread(std::bind(&ThreadPool::ThreadLoop, this)));
}
}
void ThreadPool::Stop()
{
std::cout << "ThreadPool Stop()!" << std::endl;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_bIsStarted = false;
m_cond.notify_all();
}
for (auto it = m_threads.begin(); it != m_threads.end(); ++it)
{
(*it)->join();
delete (*it);
}
m_threads.clear();
}
void ThreadPool::AddTask(const Task& task)
{
std::unique_lock<std::mutex> lock(m_mutex);
TaskPair taskPair(Level2, task);
m_tasks.push(taskPair);
m_cond.notify_one();
}
void ThreadPool::AddTask(const TaskPair& taskPair)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_tasks.push(taskPair);
m_cond.notify_one();
}
void ThreadPool::ThreadLoop()
{
std::cout << "ThreadPool::ThreadLoop() tid is " << std::this_thread::get_id() << " start!" << std::endl;
while (m_bIsStarted)
{
Task oneTask = Take();
if (oneTask)
{
oneTask();
}
}
std::cout << "ThreadPool::ThreadLoop() tid is " << std::this_thread::get_id() << " exit!" << std::endl;
}
Task ThreadPool::Take()
{
std::unique_lock<std::mutex> lock(m_mutex);
while (m_tasks.empty() && m_bIsStarted)
{
std::cout << "ThreadPool::Take tid : " << std::this_thread::get_id() << " wait" << std::endl;
m_cond.wait(lock);
}
std::cout << "ThreadPool::Take tid : " << std::this_thread::get_id() << " wake up" << std::endl;
Task taskTmp;
Tasks::size_type size = m_tasks.size();
if (!m_tasks.empty() && m_bIsStarted)
{
taskTmp = m_tasks.top().second;
m_tasks.pop();
}
return taskTmp;
}
}