文章目录:
- 线程池了解
- 线程池模拟实现
线程池了解
线程池是一种常见的线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,以避免在处理短时间任务时频繁地创建和销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度,提高程序性能。
线程池通常由以下组件组成:
- 任务队列:用于存储待执行的任务。
- 工作线程:线程池维护的多个线程,等待着监督管理者分配可并发执行的任务。
- 监督管理者:负责管理线程池的工作线程,分配任务给空闲的线程执行,并监控工作线程的运行状态。
线程池的应用场景:
1️⃣ 需要大量的线程来完成任务,且完成任务的时间比较短。WEB 服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,可以想象一个热门网站的点击次数。但对于长时间的任务,比如一个 Telnet 连接请求,线程池的优点就不明显了。因为 Telnet 会话时间比线程的创建时间大多了。
2️⃣ 对性能要求苛刻的应用,比如要求服务器迅速相应客户请求。
3️⃣ 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将输出大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误。
4️⃣ 需要限制系统钟同时运行线程的数量,以避免系统资源被耗尽或因过多线程而导致系统性能能下降。
5️⃣ 需要异步执行任务,但又不想为每个任务创建一个新线程,因为这会消耗大量系统资源。
6️⃣ 需要在多个任务之间共享线程池之间共享线程池中的资源,例如共享数据库连接或共享线程安全的数据结构。
线程池的优点:
- 线程池避免了在处理短时间任务时创建于销毁线程的代价。
- 线程池不仅能够保证内核充分利用,还能防止过分调度。
注意:线程池中的可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。
线程池模拟实现
下面我们实现一个简单的线程池,线程池中提供一个任务队列,用于存储给出的任务,以及多个线程。
- 创建固定数量的线程池,循环从任务队列中获取任务对象。
- 获取到任务对象后,执行任务对象中的任务接口。
下面线程池中引入的任务 task.hpp 实现如下:
#pragma once
#include <iostream>
#include <string>
class Task
{
public:
Task(int one = 0, int two = 0, char op = '0')
: elemOne_(one), elemTwo_(two), operator_(op)
{
}
int operator()() { return run(); }
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
if (elemTwo_ == 0)
{
std::cout << "div zero,about " << std::endl;
result = -1;
}
else
result = elemOne_ / elemTwo_;
break;
case '%':
if (elemTwo_ == 0)
{
std::cout << "mod zero,about " << std::endl;
result = -1;
}
else
result = elemOne_ % elemTwo_;
break;
default:
std::cout << "非法操作:" << operator_ << std::endl;
break;
}
return result;
}
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
线程池代码实现如下:
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <cstdlib>
#include <memory>
#include <pthread.h>
#include <cassert>
#include <sys/prctl.h>
using namespace std;
int gThreadNum = 5; // 默认线程数
template <class T>
class ThreadPool
{
// 加锁和解锁互斥锁
void lockQueue() { pthread_mutex_lock(&mutex_); }
void unlockQueue() { pthread_mutex_unlock(&mutex_); }
// 检测队列是否为空
bool isEmpty() { return taskQueue_.empty(); }
// 等待任务队列中有新任务加入
void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
// 唤醒队列中的线程来处理任务
void choiceThreadForHandler() { pthread_cond_signal(&cond_); }
public:
// 构造函数,需要传入线程数
ThreadPool(int threadNum = gThreadNum) : isStart_(false), threadNum_(threadNum)
{
assert(threadNum_ > 0);
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
// 析构函数,销毁互斥锁和条件变量
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
// 线程执行函数,传入this指针
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self()); // 线程分离
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args); // 获取线程池对象
prctl(PR_SET_NAME, "follower"); // 设置线程名
while (1)
{
tp->lockQueue(); // 加锁
while (tp->isEmpty()) // 如果任务队列为空,则等待
{
tp->waitForTask();
}
T t = tp->pop(); // 取出任务
tp->unlockQueue(); // 解锁
// for debug
int one, two;
char oper;
t.get(&one, &two, &oper);
// 执行任务并输出结果
Log() << "完成计算任务:" << one << oper << two << "=" << t.run() << endl;
}
}
// 从任务队列中取出任务
T pop()
{
T temp = taskQueue_.front();
taskQueue_.pop();
return temp;
}
// 启动线程池
void start()
{
assert(!isStart_); // 确保线程池未启动
for (int i = 0; i < threadNum_; i++)
{
pthread_t temp;
pthread_create(&temp, nullptr, threadRoutine, this);
}
isStart_ = true;
}
// 添加任务到任务队列中
void push(const T &in)
{
lockQueue();
taskQueue_.push(in);
choiceThreadForHandler();
unlockQueue();
}
private:
bool isStart_; // 判断线程池是否启动
int threadNum_; // 线程数量
queue<T> taskQueue_; // 任务队列,存储待执行的任务
pthread_mutex_t mutex_; // 互斥锁,保证线程安全
pthread_cond_t cond_; // 条件变量,用于线程同步
};
线程池实现说明:
- 创建了一个模板类 ThreadPool ,该类中包含任务队列、互斥锁、条件变量等成员变量,以及相应的成员函数,用于添加任务、取出任务、启动线程池等操作。
- 任务队列是被多个线程共享的临界资源,为了保证多个线程对任务队列的安全访问,需要引入互斥锁来保证同一时间只有一个线程可以对任务队列进行操作。
- 线程池中的线程是通过不断地从任务队列中取出任务来进行处理的。当任务队列为空时,线程需要等待新的任务加入。为了实现该等待操作,需要引入条件变量来实现线程的同步。当任务队列为空时,线程将会等待条件变量,一旦有新的任务加入队列,就被通过条件变量被唤醒,从而继续执行任务。
- 在线程执行函数中使用 while 替代 if 进行条件判断,当线程被唤醒后,应该再次检测条件是否满足。因为线程可能会出现伪唤醒的情况,即线程在没有明确被唤醒的情况下醒来。因此,这里使用 while 来检测条件是否满足。
- 在获取任务后,应该尽快的释放锁,然后再处理任务,这样可以最大程度的减少临界区的持有时间,允许其它线程有机会执行并发操作,提高整体效率。
pthread_mutex_lock(&mutex);
// 获取任务
pthread_mutex_unlock(&mutex);
// 处理任务
- 需要将线程例程(threadRoutine)设置为静态方法。这是因为 pthread_create 函数的第三个参数需要的是一个全局函数,它不能接受非静态成员函数。因为非静态成员函数默认第一个参数是 this ,而 pthread_create 创建的线程函数只能接受一个 void* 参数,这两者是不匹配的,因此,我们需要将其设置为静态的,然后显示地传递线程池对象作为参数。
接下来对实现的线程池进行测试,测试代码如下:
#include "ThreadPool.hpp"
#include "task.hpp"
#include <ctime>
int main()
{
prctl(PR_SET_NAME,"master");
const string operators = "+-*/%";
unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
tp->start();
srand((unsigned long)time(nullptr) ^ getpid() ^ pthread_self());
// 派发任务的线程
while (true)
{
int one = rand() % 50;
int two = rand() % 10;
char oper = operators[rand() % operators.size()];
cout << "派发计算任务:" << one << oper << two << "=?" << endl;
Task t(one, two, oper);
tp->push(t);
sleep(1);
}
return 0;
}
当程序运行起来之后,该进程中有一个 master 主线程,和5个 follower 子线程进行处理任务。