github看到一个项目(GitHub - markparticle/WebServer: C++ Linux WebServer服务器),内部使用的一个线程池看着不错,拿来学习一下。
/*
* @Author : mark
* @Date : 2020-06-15
* @copyleft Apache 2.0
*/
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <functional>
class ThreadPool {
public:
explicit ThreadPool(size_t threadCount = 8): pool_(std::make_shared<Pool>()) {
assert(threadCount > 0);
for(size_t i = 0; i < threadCount; i++) {
std::thread([pool = pool_] {
std::unique_lock<std::mutex> locker(pool->mtx);
while(true) {
if(!pool->tasks.empty()) {
auto task = std::move(pool->tasks.front());
pool->tasks.pop();
locker.unlock();
task();
locker.lock();
}
else if(pool->isClosed) break;
else pool->cond.wait(locker);
}
}).detach();
}
}
ThreadPool() = default;
ThreadPool(ThreadPool&&) = default;
~ThreadPool() {
if(static_cast<bool>(pool_)) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->isClosed = true;
}
pool_->cond.notify_all();
}
}
template<class F>
void AddTask(F&& task) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->tasks.emplace(std::forward<F>(task));
}
pool_->cond.notify_one();
}
private:
struct Pool {
std::mutex mtx;
std::condition_variable cond;
bool isClosed;
std::queue<std::function<void()>> tasks;
};
std::shared_ptr<Pool> pool_;
};
#endif //THREADPOOL_H
1,先看下 私有变量。
struct Pool {
std::mutex mtx;
std::condition_variable cond;
bool isClosed;
std::queue<std::function<void()>> tasks;
};
std::shared_ptr<Pool> pool_;
一个结构体 Pool 将用到的变量,全部包含进去。
mtx 一个锁,用于加入,取出任务。
cond 信号量,用于线程间同步。
isClosed 线程池退出的标志。
tasks 任务队列,存放所有需要在线程中执行的任务。
pool_ 智能指针,管理所有资源。
2,构造函数
(1),默认开启8个线程(size_t threadCount = 8),并在初始化列表中 初始化pool_对象(std::make_shared<Pool>())
(2),循环启动每个线程
for(){
std::thread({
//todo
}).detach();
}
(3),加锁 确保 ,各个线程对 任务队列(tasks) 不产生竞争,因为添加任务,取任务都要操作这个队列。
std::unique_lock<std::mutex> locker(pool->mtx);
(4),单独看一个线程。
首先判断任务队列是否为空,为空 则利用信号量阻塞当前线程。
else pool->cond.wait(locker);
如果线程池是退出状态,则跳出当前循环,当前线程也会退出。
else if(pool->isClosed) break;
如果任务队列不为空,则取出第一个任务,队列减1,然后解锁,执行任务,再加锁。
auto task = std::move(pool->tasks.front());
pool->tasks.pop();
locker.unlock();
task();
locker.lock();
std::move 用于移动语义,允许在不复制内存的情况下转移资源的所有权。这段代码之后,相当于8个线程,从任务队列tasks 中抢任务,抢到一个执行一个。比如A线程 抢到了任务1,任务1 在执行中过程中,由于队列处于未加锁状态,那B线程,就可以继续抢任务2。
3,添加任务
std::forward ,在一个函数中将参数以原始的形式传递给另一个函数,同时保持其值类别(lvalue 或 rvalue)和 const 修饰符。
这里将task 添加到 任务队列 tasks中。并且利用locker 进行保护。这里添加完成之后,上述的8个线程就会从这个任务队列中抢任务,然后执行。
template<class F>
void AddTask(F&& task) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->tasks.emplace(std::forward<F>(task));
}
pool_->cond.notify_one();
}
3,销毁
将isClosed 标志置为false,并通知所有线程继续执行,防止因为任务队列为空,造成阻塞无法退出。在上述循环中,检测到isClosed 为fasel,则while循环退出,线程退出。
~ThreadPool() {
if(static_cast<bool>(pool_)) {
{
std::lock_guard<std::mutex> locker(pool_->mtx);
pool_->isClosed = true;
}
pool_->cond.notify_all();
}
}
5,使用
构造完ThreadPool 之后,直接调用AddTask ,将任务传入其中即可。
std::bind 用于创建一个可调用对象,将其与特定的参数绑定在一起。
在C++11 之下,封装一个线程池 还是很优雅的。