目录
线程池
介绍
图解
过程
本质
模拟实现
思路
注意点
解决方法
代码
pthread_pool.hpp
task.hpp
main.cpp
示例
线程池
介绍
线程池是一种并发编程的设计模式,用于管理和重复使用线程,以提高多线程应用程序的性能和效率
线程池主要用于控制并发执行的线程数量,避免在需要执行任务时频繁创建和销毁线程,从而减少系统资源的开销
简单来说,就是用空间换时间
- 比如说,malloc封装的系统调用是有代价的
- 所以每次开辟时都会额外开辟一些空间,这样可以有效减少调用的次数,减少成本
图解
过程
主线程只负责push任务,由线程池完成其他工作
- 创建线程
- 分配任务 (当队列中有任务时,就让某个线程去处理;没有时,线程进行等待)
本质
- 上面的过程听着是不是很熟悉,将任务push进队列,线程又将任务pop出来去处理
- 其实就是我们已经学习了两种的生产消费者模型
- 只是要将多线程封装进类中而已
模拟实现
思路
- 还是熟悉的模型,类中要实现核心的push和pop操作
- 以及处理好线程的创建和销毁
注意点
我们的所有操作都是放在类中的 -- 这是一个大前提
- 其中,pthread_create创造出的线程,其执行任务的函数要求参数有且仅有void*这一个类型
- 但是该函数是被我们放在类内的,会强制添加一个this指针参数
- 所以,参数个数就不满足要求了
解决方法
- 放在类外(全局函数)
- 静态成员函数
代码
pthread_pool.hpp
#include <pthread.h> #include <vector> #include <queue> #include <stdlib.h> #include <string> #include <unistd.h> #include <semaphore.h> #include <iostream> struct thread { pthread_t tid_; std::string name_; }; template <class T> class thread_pool { private: void lock() { pthread_mutex_lock(&mutex_); } void unlock() { pthread_mutex_unlock(&mutex_); } void wait() { pthread_cond_wait(&cond_, &mutex_); } void signal() { pthread_cond_signal(&cond_); } T pop() { T t = task_.front(); task_.pop(); return t; } bool is_empty() { return task_.size() == 0; } std::string get_name(pthread_t tid) { for (auto &it : threads_) { if (it.tid_ == tid) { return it.name_; } } return "none"; } static void *entry(void *args) // 类成员会有this参数,但入口函数不允许有多余参数 { thread_pool<T> *tp = static_cast<thread_pool<T> *>(args); // this指针,用于拿到成员变量/函数 while (true) { tp->lock(); while (tp->is_empty()) { tp->wait(); } T t = tp->pop(); tp->unlock(); t(); std::cout << "im " << tp->get_name(pthread_self()) << ",task is " << t.get_task() << " ,result is " << t.get_result() << " ,code is " << t.get_code() << std::endl; usleep(20); } return nullptr; } public: thread_pool(int num = 5) : num_(num), threads_(num) { pthread_cond_init(&cond_, nullptr); pthread_mutex_init(&mutex_, nullptr); } ~thread_pool() { pthread_cond_destroy(&cond_); pthread_mutex_destroy(&mutex_); } void init() { for (size_t i = 0; i < num_; ++i) { pthread_create(&(threads_[i].tid_), nullptr, entry, this); std::string name = "thread-" + std::to_string(i + 1); threads_[i].name_ = name; } } void join() { for (size_t i = 0; i < num_; ++i) { pthread_join(threads_[i].tid_, nullptr); } } void push(const T data) { lock(); task_.push(data); signal(); // 放在锁内,确保只有当前线程执行唤醒操作,不然可能会有多次操作 unlock(); } private: std::vector<thread> threads_; std::queue<T> task_; int num_; pthread_cond_t cond_; pthread_mutex_t mutex_; };
task.hpp
#pragma once // 生成二元整数运算任务(加减乘除),有错误码提示 // 1为/0操作,2为%0操作,3为非法错误 #include <iostream> #include <string> using namespace std; string symbol = "+-*/%"; int sym_size = symbol.size(); class Task { public: Task() {} // 方便只是为了接收传参而定义一个对象 Task(int x, int y, char c) : x_(x), y_(y), code_(0), op_(c), res_(0) { } int get_result() { return res_; } int get_code() { return code_; } string get_task() { string task = to_string(x_) + op_ + to_string(y_) + " = ?"; return task; } void operator()() { switch (op_) { case '+': res_ = x_ + y_; break; case '-': res_ = x_ - y_; break; case '*': res_ = x_ * y_; break; case '/': if (y_ == 0) { code_ = 1; break; } res_ = x_ / y_; break; case '%': if (y_ == 0) { code_ = 2; break; } res_ = x_ % y_; break; default: code_ = 3; break; } } private: int x_; int y_; int res_; int code_; char op_; };
main.cpp
#include "thread_pool.hpp" #include "Task.hpp" #include <random> #include <time.h> #include <unistd.h> int main() { thread_pool<Task> *tp = new thread_pool<Task>; tp->init(); while (true) { int x = rand() % 10 + 1; usleep(30); int y = rand() % 5; usleep(30); char op = symbol[rand() % (sym_size - 1)]; Task task(x, y, op); cout << "task is " << task.get_task() << endl; tp->push(task); sleep(1); } tp->join(); return 0; }
示例
按照预想的那样,主线程放入一个任务,就会有一个线程去处理: