序言
在之前我们实现过线程池,但是非常基础。答题思路就是实现一个安全的队列,再通过 ThreadPool 来管理队列和线程,对外提供一个接口放入需要执行的函数,但是这个函数是无参无返回值的。
参数的问题我们可以使用 bind 来封装,但是函数返回值的问题需要我们解决。
一、为什么需要线程池?
假如你在运行一个应用程序,其中主线程可能正在运行重要的程序逻辑和更新 UI。但是现在有一个消耗较大的任务需要执行,比如加载一个文件。如果你的主线程去执行该任务,那么应用程序的界面就会出现卡住的情况。这对于用户来说不是一个友好的使用体验,这时候就可以创建一个子线程去执行该加载任务,避免占用主线程。
在程序执行的过程中可能会有大量的其他的任务需要我们去执行,这时候就可以使用子线程去执行该任务。但是每次执行的时候我们都需要创建一个线程,当任务密集的时候,线程创建和销毁的开销就会急剧上升,线程池因此而生。
线程池的主要逻辑是预先创建一批线程,当任务到达时将任务放入队列中,线程池中的线程从队列中获取任务并执行。这种设计确保了线程的复用,从而减少了频繁创建和销毁线程的开销,如下所示:
二、简单线程池的实现
2.1 模块分析
一个线程池的模块最重要的可以分为三部分:
- 执行的任务:一个需要执行的任务
- 任务队列:存放需要执行的任务
- 线程池:管理任务队列和线程,对外提供增加任务的函数
2.2 模块实现
首先是初始化函数
explicit ThreadPool(int numThreads = MAXTHREADNUM)
: _threadNum(numThreads)
, _threads(_threadNum)
, _running(false)
{}
我们创建了对应大小的线程数组来管理线程,并且初始时,线程池没有开启工作状态。
之后是启动函数,也是线程开始工作的函数:
// 线程入口函数
void threadEntrance() {
while (true) {
std::unique_lock<std::mutex> lck(_mtx);
// 执行条件
_cond.wait(lck, [this]{ return !_queue.empty() || !_running; });
// 避免泄露任务
if (!_running && _queue.empty()) {
return;
}
// 取出任务执行
Task tsk = std::move(_queue.front());
_queue.pop();
lck.unlock();
tsk();
}
}
void Start() {
// 开始运行
_running = true;
for (int i = 0; i < _threadNum; i++) {
// 每一个线程执行入口函数
_threads[i] = std::thread(&ThreadPool::threadEntrance, this);
}
}
创建指定数量的线程,并且为每一个线程指定入口函数,入口函数的逻辑大体是:
- 询问执行任务,取出任务执行
- 不存在执行任务,如果是运行态阻塞,非运行态退出
我们来看一下我认为是较为关键的代码逻辑:
// 执行条件
_cond.wait(lck, [this]{ return !_queue.empty() || !_running; });
// 避免泄露任务
if (!_running && _queue.empty()) {
return;
}
不太冷的冷知识💡:
||
操作符在左边为true
时直接返回,只有左边为false
时,才会判断右边的真假
只有队列为空的时候我们才会判断 !_running
的真假:
- true:往后执行
- false:继续等待任务队列有新的任务后唤醒
之后还需要判断避免任务队列为空,线程才退出。这样做的目的是 — 避免任务队列不为空退出,这样会造成任务的泄漏。
之后是较为简单的添加任务函数:
// 添加任务
void addTask(Task task) {
if (!_running) {
// 不合理的请求,抛出异常处理
throw std::runtime_error("Its not start.");
}
// 添加任务
std::unique_lock<std::mutex> lck(_mtx);
_queue.push(std::move(task));
lck.unlock();
_cond.notify_all();
}
保证线程安全即可。
最后是析构函数:
void stop() {
// 停止执行
_running = false;
_cond.notify_all();
}
~ThreadPool() {
stop();
// 回收线程
for (std::thread &th : _threads) {
th.join();
}
}
在正式回收线程之前,我们需要通知其他线程,停止了,大家收工了。这里需要 _cond.notify_all();
的原因是以防线程都因为队列为空在等待的情况。
2.3 不足之处
这正如标题所言,这是一个最简单的实现方式,存在以下问题:
- 用户需要传递一个无参的函数,有参的话可以使用
std::bind
- 用户需要传递一个无返回的函数,这个怎么实现呢?
我们需要一个更为全面的线程池来解决上述两个问题。第一个问题我们可以使用可变参数来让用户传递参数,如:
void add(int x, int y) {
int z = x + y
return;
}
int x = 1;
int y = 2;
// 这样就方便多了
pool.addTask(add, x, y);
第二个问题我们需要使用到异步编程的函数,接下来就会介绍到。
三、异步编程 — std::packaged_task
3.1 功能
用于封装一个可调用对象,并与一个 future
关联。通过 packaged_task
,可以异步执行某个任务,并在任务完成时通过 future
获取结果。
3.2 示例
没有什么比得上一个示例更为直观的了:
int add(int x, int y) {
std::this_thread::sleep_for(std::chrono::seconds(3));
return x + y;
}
int main() {
// 将一个可调用对象封装为 packaged_task
std::packaged_task<int(int, int)> tsk(add);
// 获取与之相对应的 future
std::future res = tsk.get_future();
// 执行该任务
int x = 1, y = 2;
std::thread th(tsk, x, y);
printf("I am waiting!\n");
int result = res.get();
printf("The result is %d\n", result);
th.join();
return 0;
}
这里需要注意,当我们的异步获取的结果未准备时,在主线程获取会被阻塞住。经过使用,我们也可以发现这个封装了的函数和普通的区别是:可以传递返回值。
这个功能还挺实用的,后面可以探索一下底层怎么实现。
四、Plus 版线程池
我们需要修改的唯一地方是 addTask
函数,现在我先展示最后的结果:
/*
使用模板:可以传递任意类型的函数类型
可变参数:传递任意数量的参数
*/
template<class Func, class... Args>
auto addTask(Func &&f, Args&&... args)-> std::future<decltype(f(args...))> {
if (!_running) {
// 不合理的请求,抛出异常处理
throw std::runtime_error("ThreadPool is not running.");
}
// 推断返回类型
using returnType = decltype(f(args...));
// 使用 std::packaged_task 来封装任务
// 再使用 shared_ptr 来指向该任务,以便后续传参
auto task = std::make_shared<std::packaged_task<returnType()>>(std::bind(std::forward<Func>(f), std::forward<Args>(args)...));
std::future<returnType> fut = task->get_future();
// 添加任务到队列
std::unique_lock<std::mutex> lck(_mtx);
_queue.emplace([task](){ (*task)(); });
lck.unlock();
// 通知一个等待的线程
_cond.notify_all();
return fut;
}
是的很奇怪,不管是看起来还是用起来都是非常的奇怪,让只是熟悉 C++11
之前的版本的人来说,仿佛让自己感觉是原始人。
首先是函数头部分:
template<class Func, class... Args>
auto addTask(Func &&f, Args&&... args)-> std::future<decltype(f(args...))>
Func
代表调用可调用的对象(函数,lambda表达式,函数对象等),Args
代表可变参数。auto
是代表推导函数返回值,那 -> std::future<decltype(f(args...))>
这是什么玩意儿呀?这代表尾返回类型,也是推导函数的返回类型。
问题一:好的,现在我知道 ->( returnType) 也是代表一个返回类型,那么为什么有了 auto 还需要未返回类型呢?不会和 auto 冲突吗?
答:我已踩坑。auto 满足大多数简单的场景,但是对于比较复杂的场景,比如这里的返回类型依赖于函数的模板参数,他无法推导出正确的类型。当两者都共存时,会首先采用尾返回类型。
问题二:std::future<decltype(f(args…))> 这是一个什么类型呢?
答:decltype 是一个关键字,可以根据表达式推导类型,所以这里 decltype(f(args…)) 的含义是:根据传入的函数以及相应的参数推导出函数返回值类型。future 代表是一个异步返回的结果。
好的我们现在来看另外一个重要的部分:
auto task = std::make_shared<std::packaged_task<returnType()>>(std::bind(std::forward<Func>(f), std::forward<Args>(args)...));
std::future<returnType> fut = task->get_future();
我们首先使用 bind
函数将该函数的参数绑定,因为我们队列中的任务对象是一个无参的。之后我们将使用 bind
封装了的函数再使用 packaged_task
封装,因为我们需要返回一个异步的结果。最后我们在使用 shared_ptr
来管理 packaged_task
,因为他是不能够被拷贝的,所以我们一共封装了三层。最后获取一个异步结果作为返回值。
现在是最后一个重要的地方了:
_queue.emplace([task](){ (*task)(); });
我们封装了一个 lambda
函数,值捕获了 task
,函数的内容是执行 task
函数,因为他是一个指针,所以我们需要先解引用再执行:
为什么需要这样呢?因为我任务队列存储的类型是 std::function<void()>
,而我们的任务对象是 std::packaged_task<returnType()>
所以还需要封装一下。
五、总结
总结下来,难的不是逻辑,而是需要层层封装,以及需要明白为什么要这样做,还需要了解某些新特性。