目录
线程池的概念
线程池的优点
线程池的实现
【注意】
线程池的线程安全
日志文件的实现
线程池的概念
线程池也是一种池化技术,可以预先申请一批线程,当我们后续有任务的时候就可以直接用,这本质上是一种空间换时间的策略。
如果有任务来的时候再创建线程,那成本又要提高,又要初始化,又要创建数据结构。
线程池的优点
- 线程池避免了短时间内创建与销毁线程的代价。
- 线程池不仅能够保证内核充分利用,还能防止过分调度。
线程池的实现
我们这次要实现的线程池就是这样,让主线程派发任务,让线程池中的线程处理任务,这也是一个生产者消费者模型。
// thread.hpp // 把线程封装一下 #pragma once #include <iostream> #include <string> #include <cstdio> #include <vector> #include <queue> #include <unistd.h> using namespace std; typedef void*(*func_t)(void*); class ThreadData { public: string name_; void* args_; }; class Thread { public: Thread(int num, func_t callback, void* args) :func_(callback) { char nameBuffer[64]; snprintf(nameBuffer, sizeof(nameBuffer), "Thread-%d", num); name_ = nameBuffer; tdata_.args_ = args; tdata_.name_ = name_; } void start() { pthread_create(&tid_, nullptr, func_, (void*)&tdata_); } void join() { pthread_join(tid_, nullptr); } string name() { return name_; } ~Thread() {} private: string name_; pthread_t tid_; ThreadData tdata_; func_t func_; };
// threadPool.hpp #pragma once #include "thread.hpp" #include "lockGuard.hpp" #include "log.hpp" const int g_default_num = 3; template <class T> class ThreadPool { public: // 通过接口获得成员变量 pthread_mutex_t* getMutex() { return &lock_; } void waitCond() { pthread_cond_wait(&cond_, &lock_); } bool isEmpty() { return task_queue_.empty(); } public: ThreadPool(int thread_num = g_default_num) // 初始化后,就已经有了对象,也有了this指针 :num_(thread_num) { pthread_mutex_init(&lock_, nullptr); pthread_cond_init(&cond_, nullptr); for (int i = 0; i < num_; i++) { threads_.push_back(new Thread(i + 1, routine, this) ); // 通过传入this指针就可以拿到ThreadPool中的task_queue } } void run() { for (auto& iter : threads_) { iter->start(); cout << iter->name() << "启动成功" << endl; } } // 去掉this指针 // 消费的过程 static void* routine(void* args) { ThreadData* td = (ThreadData*)args; ThreadPool<T>* tq = (ThreadPool<T>*)td->args_; // 去掉this指针就无法访问成员方法了,通过创建线程的时候传入this拿到线程池对象 while (true) { T task; { lockGuard lockguard(tq->getMutex()); // 加锁 while (tq->isEmpty()) tq->waitCond(); // 检测 // 读取任务 task = tq->getTask(); } // 仿函数 cout << td->name_ << ", 消费者:" << task._x << " + " << task._y << " = " << task() << endl; // sleep(1); } } void pushTask(const T& task) { lockGuard lockguard(&lock_); task_queue_.push(task); pthread_cond_signal(&cond_); } T getTask() { T t = task_queue_.front(); task_queue_.pop(); return t; } void joins() { for (auto& iter : threads_) { iter->join(); } } ~ThreadPool() { for (auto& iter : threads_) { delete iter; } pthread_mutex_destroy(&lock_); pthread_cond_destroy(&cond_); } private: vector<Thread*> threads_; int num_; queue<T> task_queue_; // 任务队列 pthread_mutex_t lock_; // 互斥锁 pthread_cond_t cond_; // 条件变量 };
// testMain.cc #include "threadPool.hpp" #include "Task.hpp" #include <ctime> int Add(int x, int y) { return x + y; } int main() { srand((unsigned)time(nullptr)); cout << "hello thread pool" << endl; ThreadPool<Task> *tp = new ThreadPool<Task>(); tp->run(); while (true) { int x = rand() % 10 + 1; usleep(rand() % 1000); int y = rand() % 10 + 1; Task t(x, y, Add); tp->pushTask(t); cout << "生产者:" << x << " + " << y << " = ? " << endl; //sleep(1); } tp->joins(); return 0; }
【注意】
- 线程池中的任务队列会被多个执行流访问,因此我们需要互斥锁对任务队列进行保护。
- 线程池中的线程要从任务队列中拿任务,所以任务队列中必须要先有任务,必须要加锁循环检测,如果任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,这些操作都是通过加锁和条件变量完成的。
- 主线程向任务队列中push一个任务后,此时可能有线程正处于等待状态,所以在新增任务后需要唤醒在条件变量下等待的线程。
- 某线程从任务队列中拿到任务后,该任务就已经属于当前线程了,所以解锁之后再进行处理任务,让加锁的动作更细粒度,也因为处理任务的过程会耗费时间,所以不要将处理动作其放到临界区当中。
- 要给执行线程函数用static修饰,这个函数的类型必须是void* (*callback)(void*);如果放到类中,该函数就会多一个this指针。但是让他变成静态函数又不能访问线程池中的任务队列,所以要在线程创建的时候把线程池的对象指针传过去,因为初始化列表后已经有了对象,所以一定有this指针。也因为这个函数没有this指针,所以一些类内的操作要提供接口。
线程池的线程安全
我们也可以把线程池变成单例模式(懒汉模式)的,让整个进程只有一个线程池,但是如果以后有多个线程同时访问,同时判断这个单例对象存不存在,那就会有线程安全的问题。
class ThreadPool { // ... private: // 私有构造,删除拷贝构造和赋值重载 ThreadPool(int thread_num = g_default_num) // 初始化后,就已经有了对象,也有了this指针 :num_(thread_num) { pthread_mutex_init(&lock_, nullptr); pthread_cond_init(&cond_, nullptr); for (int i = 0; i < num_; i++) { threads_.push_back(new Thread(i + 1, routine, this) ); // 通过传入this指针就可以拿到ThreadPool中的task_queue } } ThreadPool(const ThreadPool<T>&) = delete; const ThreadPool<T> operator=(const ThreadPool<T>& ) = delete; public: static ThreadPool<T>* getThreadPool(int num = g_default_num) // 通过getThreadPool获取线程池 { // 只有第一次为空的时候才创建,如果不为空直接返回thread_ptr,这样指针就只有一个 { lockGuard lockguard(&mutex); if (nullptr == thread_ptr) { thread_ptr = new ThreadPool<T>(num); } } return thread_ptr; } // ... private: // 添加静态成员变量 static ThreadPool<T>* thread_ptr; // 单例模式 static pthread_mutex_t mutex; }; // 初始化 template<class T> ThreadPool<T>* ThreadPool<T>::thread_ptr = nullptr; template<class T> pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
这样就可以保证,第一次获取ThreadPool对象的时候,多个线程访问就是安全的。但这就带来了另一个问题,如果每次想要获取ThreadPool对象的时候就会申请释放锁,这个行为也是在浪费资源,所以还要再调整一下。
static ThreadPool<T>* getThreadPool(int num = g_default_num) { // 只有第一次为空的时候才创建,如果不为空直接返回thread_ptr,这样就只new了一次 if (nullptr == thread_ptr) // 多判断一次不就可以了吗,已经创建了就直接返回,没有就加锁创建 { lockGuard lockguard(&mutex); if (nullptr == thread_ptr) { thread_ptr = new ThreadPool<T>(num); } } return thread_ptr; }
这样使用双重判定空指针就减少了大量已经创建好单例,其他线程还在请求锁的行为。
日志文件的实现
我们需要用到下面这些接口。
// log.hpp #pragma once #include <iostream> #include <string> #include <cstdio> #include <cstdarg> #include <unistd.h> #include <sys/types.h> #include <fcntl.h> #include <ctime> // 日志级别 #define DEBUG 0 #define NORMAL 1 #define WARNING 2 #define ERROR 3 #define FATAL 4 const char* gLevelMap[] = { "DEBUG", "NORMAL", "WARNING", "ERROR", "FATAL" }; // 完整的日志功能,至少有:日志等级 时间 日志内容 支持用户自定义 void logMessage(int level, const char* format, ...) // 最后一个参数就是可变参数列表 { char stdBuffer[1024]; // 日志的标准部分 time_t timestamp = time(nullptr); // 时间戳 snprintf(stdBuffer, sizeof(stdBuffer), "[%s][%ld]", gLevelMap[level], timestamp); char logBuffer[1024]; // 自定义部分 va_list args; // 可变参数列表 va_start(args, format); vsnprintf(logBuffer, sizeof (logBuffer), format, args); // 用起来和printf相差不多 va_end(args); // printf("%s%s\n", stdBuffer, logBuffer); // 打印到显示器 FILE* fp = fopen("log.txt", "a"); fprintf(fp, "%s%s\n", stdBuffer, logBuffer); // 打印到文件 fclose(fp); }
所以以后如果要用到这些线程池、日志文件等,就直接用了。