C++11 线程池:轻量级高并发解决方案
线程池(Thread Pool)是一种线程管理的机制,它包含了多个预先创建的线程,用于执行多个任务,这些任务被放入任务队列中等待执行。
满足我们的生产者和消费者模型。
线程池的核心组成部分。
- 任务队列 -----按顺序等待要处理的任务。
- 线程数组----- 多个已启动的线程,从任务队列拿取任务处理。
- 互斥锁。
- 条件变量。
- 任务。
线程池的好处:
- 减少线程的创建和销毁次数,提高系统的性能和效率。因为我们每次创建和销毁线程都是有开销的。
- 活动的线程需要消耗系统资源,如果启动太多,会导致系统由于过度消耗内存或切换过度而导致系统资源不足。
- 通过重复利用已创建的线程 ,避免了频繁创建和销毁线程的性能开销。
基于C++11实现的线程池:感受C++11的魅力
当然实际项目中的线程池可能会更复杂。但是对于初学者 ,你能够了解到这里已经足够了。
ThreadPool.h 线程池必要的接口 ,和属性。
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>
#include <functional>
class ThreadPool {
public:
// 创建线程池单例
static ThreadPool* getIntance(int num) {
static ThreadPool* threadPool = nullptr;
// call_once() ,保证函数只能执行一次 ,只能在多线程里面使用
std::call_once(flag_, init_threadPool, threadPool, num);
return threadPool;
}
// 往线程池中添任务
template <typename F , typename... Agrs>
void push_task(F &&f , Agrs&&... agrs);
private:
static void init_threadPool(ThreadPool*& p, int num) {
p = new ThreadPool(num);
}
ThreadPool(int num = 4); // 默认线程数是 4
~ThreadPool();
std::queue<std::function<void()>>task_queue_; // 任务队列
std::vector<std::thread>threads; // 线程数组
std::mutex mutex_; // 互斥锁
std::condition_variable c_variable_; // 条件变量
bool isStop_; // 线程池是否终止
static std::once_flag flag_; // call_once() 需要的flag
};
// 往任务队列添加任务
template <typename F, typename... Agrs>
void ThreadPool::push_task(F&& f, Agrs&&... agrs) {
// 将函数和参数进行绑定
// forward 实现完美转换
std::function<void()>task = std::bind(std::forward<F>(f), std::forward<Agrs>(agrs)...);
{
std::unique_lock<std::mutex>lock(mutex_);
// 加入任务队列
task_queue_.emplace(task);
}
c_variable_.notify_one(); // 唤醒一个线程来执行
}
#endif // !_THREAD_POOL_H
ThreadPool.cpp 相关接口的具体实现。
#include "ThreadPool.h"
std::once_flag ThreadPool::flag_;
ThreadPool::ThreadPool(int num):isStop_ ( false ) {
for (int i = 0; i < num ; i++) {
threads.emplace_back([this]( ) {
while ( true ) {
std::unique_lock<std::mutex>lock(mutex_);
c_variable_.wait(lock, [ = ]() {
return (!task_queue_.empty() || isStop_);
});
if ( isStop_ && task_queue_.empty() ) { // 如果被线程池终止了
return;
}
std::cout << "线程池处理" << std::endl;
// 从任务队列,拿出任务执行
std::function<void( )>task(std::move(task_queue_.front()));
task_queue_.pop();
task();
}
});
}
}
ThreadPool::~ThreadPool( ) {
{
std::unique_lock<std::mutex>lock(mutex_);
isStop_ = true;
if (!task_queue_.empty()) c_variable_.notify_all(); // 唤醒所有线程去执行任务
}
for ( auto& a : threads ) {
a.join();
}
}
测试代码:
#include "ThreadPool.h"
#include <iostream>
#include <algorithm>
#include <chrono>
void print(const char *name) {
std::cout << "name :" << name << std::endl;
}
int getMax(int a, int b) {
return std::max(a, b);
}
void sort(std::vector<int>*&value) {
// 默认是升序
std::sort(value->begin(), value->end()); //
}
int main() {
// 创建一个线程池 ,线程数为4
ThreadPool *pool = ThreadPool::getIntance( 4 );
std::vector<int>value;
//for (int i = 1; i < 10; i++) {
// pool->push_task( [i]() {
// std::cout << "task runing " << i << std::endl;
// std::this_thread::sleep_for(std::chrono::seconds(1)); // 休眠1秒
// std::cout << "task stop : " << i << std::endl;
// });
//}
for (int i = 10; i > 0; i--) {
value.emplace_back(i);
}
// 将任务投入线程池,让线程来处理
pool->push_task(print, "小美"); // 输出小美
pool->push_task(sort, &value); // 升序排序
std::this_thread::sleep_for(std::chrono::seconds(10)); // 休眠10秒
for (int i = 0; i < value.size(); i++) {
std::cout << value[i] << std::endl;
}
std::cout << "\n";
return 0;
}
测试结果:
效果达到预期 ,当然你也可以多写几个测试案例。