⭐小白苦学IT的博客主页
⭐初学者必看:Linux操作系统入门
⭐代码仓库:线程池实现源码
❤关注我一起讨论和学习Linux系统
1.前言
你了解过Linux线程池吗?不了解?没有关系,先来带你了解一下我们的池化技术,以及为什么要用到池化技术,然后再了解具体的线程池是什么?有什么作用?用代码如何实现?如何手撕线程池?
2.池化技术是什么?
池化技术 (Pool) 是一种很常见的编程技巧,在请求量大时能明显优化应用性能,降低系统频繁建连的资源开销。我们日常学习或者工作过程中常见的有数据库连接池、线程池、对象池等,它们的特点都是将 “昂贵的”、“费时的” 的资源维护在一个特定的 “池子” 中,规定其最小连接数、最大连接数、阻塞队列等配置,方便进行统一管理和复用,通常还会附带一些探活机制、强制回收、监控一类的配套功能。
说了这么多其实总结下来就是一句话:
就是提前准备一些资源放在"池子"中,在需要使用的时候就可以多次重复使用这些准备好的资源了。
就拿我们的线程池来说,就是提前创建一些线程来进行备用,不至于要使用时再创建线程,如果需要大量线程就会耗费大量的创建线程的时间从而降低应用性能,加大系统频繁建连的资源开销。
池化技术的优点:
- 提高系统性能和资源利用率。
- 减少资源的创建和销毁开销。
- 支持并发访问和高并发处理。
- 简化系统设计和开发。
3.线程池解析
什么是线程池?
线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销。
举例说明
假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
线程池的使用场景
Web服务器:Web服务器需要处理大量的客户端请求。通过使用线程池,服务器可以同时处理多个请求,避免为每个请求都创建新的线程,从而提高服务器的性能和响应速度。
数据库操作:数据库操作通常涉及到复杂的查询和数据处理任务。通过线程池,可以并行执行多个数据库操作,提高数据库的整体性能。
图像处理与视频编码:在图像处理或视频编码等任务中,线程池可以用于并行处理图像的不同部分或视频的帧,从而加快处理速度。
网络通信:在网络编程中,线程池常用于处理并发的网络连接或数据传输,确保多个连接或数据传输任务能够同时进行。
任务调度系统:在任务调度系统中,线程池用于将任务分配给可用的线程执行,实现任务的并行处理和高效调度。
线程池的组成部分
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
4.手撕线程池
线程模板类
Thread.hpp
#pragma once
#include<iostream>
#include<string>
#include<functional>
#include<pthread.h>
template<class T>
using func_t = std::function<void(T&)>;//相当于typedef std::function<void()> func_t
template<class T>
class Thread
{
public:
Thread(const std::string & threadname,func_t<T> func,T& data)//初始化线程对象
:_tid(0)
,_isrunning(false)
,_threadname(threadname)
,_func(func)
,_data(data)
{}
static void* ThreadRoutine(void * args) //线程创建时开始执行的函数是一个返回值为void* ,参数是一个void*的函数
{
Thread* ts =static_cast<Thread*>(args);
ts->_func(ts->_data);
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid,nullptr,ThreadRoutine,this);//创建线程同时获取返回值
if(n==0) //通过创建线程函数返回值判断线程是否在运行。
{
_isrunning = true;
return true;
}
return false;
}
bool Join()
{
if(!_isrunning) return true; //如果线程没有运行就说明是终止成功了,所以返回true.
int n = pthread_join(_tid,nullptr);//如果线程还在运行就让线程终止
if(n==0)
{
_isrunning = false;//如果n==0说明终止线程成功了,返回true
return true;
}
return false;//否则终止失败了
}
std::string ThreadName()//向外提供线程名称的接口
{
return _threadname;
}
bool IsRunning()//向外提供判断线程是否正在运行的接口
{
return _isrunning;
}
~Thread()
{}
private:
pthread_t _tid;//线程ID
bool _isrunning;//判断线程是否正在运行
std::string _threadname;//线程名称
func_t<T> _func;//线程需要执行的函数
T _data;//线程需要处理的数据
};
将互斥锁封装成一个类,通过对象的生命周期管理互斥锁,也就是RAII的加锁方式:
LockGuard.hpp:
#pragma once
#include<pthread.h>
//不定义锁,默认认为外部会给我们传入锁对象
class Mutex
{
public:
Mutex(pthread_mutex_t * lock):_lock(lock)
{}
void Lock()
{
pthread_mutex_lock(_lock);//加锁
}
void Unlock()
{
pthread_mutex_unlock(_lock);//解锁
}
~Mutex(){}
private:
pthread_mutex_t * _lock;
};
//RAII风格的加锁方式
class LockGuard
{
public:
LockGuard(pthread_mutex_t * lock):_mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex _mutex;
};
ThreadPool.hpp
#pragma once
#include"Thread.hpp"
#include"LockGuard.hpp"
#include"Log.hpp"
#include<pthread.h>
#include<functional>
#include<vector>
#include<queue>
#include<string>
const int defaultnum = 5; //控制线程池中的线程数量
class ThreadData
{
public:
ThreadData(std::string name)
:_threadname(name)
{}
std::string GetThreadName() //封装获取线程名字接口
{
return _threadname;
}
~ThreadData(){}
private:
std::string _threadname;//线程名称
};
template<class T>
class ThreadPool
{
public:
ThreadPool(int thread_num = defaultnum)
:_thread_num(thread_num)
,_threads()
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
//构建指定个数的线程
for(int i = 0;i<_thread_num;i++)
{
std::string threadname = "thread-";
threadname+=std::to_string(i+1); //通过编号对线程进行动态命名
//待优化
ThreadData td(threadname);
// Thread<ThreadData> t(threadname,\
// std::bind(&ThreadPool<T>::ThreadRun,this,std::placeholders::_1),td);
// _threads.push_back(t);
_threads.emplace_back(threadname,\
std::bind(&ThreadPool<T>::ThreadRun,this,std::placeholders::_1),td);
log.LogMessage(DEBUG,"%s is created...\n",threadname.c_str());//打印日志消息
}
}
void ThreadWait(ThreadData& td)//让线程阻塞
{
log.LogMessage(DEBUG,"no task %s is sleeping...\n",td.GetThreadName().c_str());
pthread_cond_wait(&_cond,&_mutex);
}
void ThreadWakeUp()//唤醒一个线程
{
//log.LogMessage(DEBUG,"have task %s is wakeup...\n",td.GetThreadName().c_str());
pthread_cond_signal(&_cond);
}
bool Start()
{
//启动
for(auto & thread:_threads)
{
thread.Start();//创建线程并启动
log.LogMessage(INFO,"%s is running ...\n",thread.ThreadName().c_str());
}
return true;
}
void ThreadRun(ThreadData& td)//线程运行处理任务队列中的任务
{
while(true)
{
T t;
{
LockGuard lockguard(&_mutex);
while(_q.empty())//如果任务队列为空,那么就让进程停下来
{
ThreadWait(td);
log.LogMessage(DEBUG,"thread %s is wakeup...\n",td.GetThreadName().c_str());
}
t = _q.front();
_q.pop();
}
//处理任务
t();
log.LogMessage(INFO,"%s handler task %s done, result is : %s\n",\
td.GetThreadName().c_str(),t.PrintTask().c_str(),t.PrintResult().c_str());
}
}
void Push(T& in)
{
log.LogMessage(DEBUG,"other thread push a task: %s\n",in.PrintTask().c_str());
LockGuard lockguard(&_mutex);
_q.push(in);//将任务放入任务队列
ThreadWakeUp();//唤醒一个线程
}
void Wait()
{
for(auto & thread:_threads)
{
thread.Join();//让线程池中的线程一个一个的终止
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);//销毁锁
pthread_cond_destroy(&_cond);//销毁环境变量
}
private:
std::queue<T> _q;//任务队列
std::vector<Thread<ThreadData>> _threads;//用来存放线程的vector容器
int _thread_num;//线程数量
pthread_mutex_t _mutex;//互斥锁
pthread_cond_t _cond;//条件变量
};
对任务对两个数进行+-*/%操作的任务进行封装:
task.hpp:
#pragma once
#include<string>
const int defaultvalue = 0;
//用于判断任务执行结果是否正确的错误码,如果是正常结果,默认是0
enum
{
ok = 0,
div_zero,//除0错误
mod_zero,//模0错误
unknow //未知错误
};
const std::string opers = "+-*/%";
class Task
{
public:
Task(){}
Task(int x, int y, char op, int result=defaultvalue, int code=ok)
: _data_x(x), _data_y(y), _oper(op)
{
}
std::string PrintTask() //构建执行的任务字符串,方便测试观察
{
std::string s;
s= std::to_string(_data_x);
s+=_oper;
s+=std::to_string(_data_y);
s+="=?";
return s;
}
std::string PrintResult() //构建执行任务后的结果字符串,便于测试观察
{
std::string s;
s= std::to_string(_data_x);
s+=_oper;
s+=std::to_string(_data_y);
s+="=";
s+=std::to_string(_result);
s+=" [";
s+=std::to_string(_code);
s+="]";
return s;
}
void Run()
{
switch (_oper)
{
case '+':
_result = _data_x + _data_y;
break;
case '-':
_result = _data_x - _data_y;
break;
case '*':
_result = _data_x * _data_y;
break;
case '/':
{
if (_data_y == 0)
{
_code = div_zero;
}
else _result = _data_x / _data_y;
}
break;
case '%':
{
if (_data_y == 0)
{
_code = mod_zero;
}
else _result = _data_x % _data_y;
}
break;
default:
_code = unknow;
break;
}
}
void operator()()//运算符重载实现仿函数
{
Run();
}
~Task() {}
private:
int _data_x; // 操作数
int _data_y; // 操作数
char _oper; // 运算符
int _result; // 结果
int _code; // 结果码 0:可信 !0:不可信
};
对日志进行的封装
Log.hpp
#pragma once
#include <iostream>
#include <cstdarg>
#include <ctime>
#include <string>
#include <unistd.h>
#include <fstream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
enum
{
DEBUG = 0,
INFO,
WARNING,
ERROR,
FATAL
};
enum
{
Screen = 10,
Onefile,
Classfile
};
std::string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unknown";
}
}
const int defaultstyle = Screen;
const std::string default_filename = "log.";
const std::string logdir="log";
class Log
{
public:
Log():style(defaultstyle),filename(default_filename)
{
mkdir(logdir.c_str(),0775);
}
void Enable(int sty)
{
style = sty;
}
std::string TimestampToLocalTime()
{
time_t curr = time(nullptr);
struct tm *currtime = localtime(&curr);
char time_buffer[128];
snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
currtime->tm_year + 1900, currtime->tm_mon, currtime->tm_mday, currtime->tm_hour,
currtime->tm_min, currtime->tm_sec);
return time_buffer;
}
void WriteLog(const std::string &levelstr, const std::string &message)
{
switch (style)
{
case Screen:
std::cout << message;
break;
case Onefile:
WriteLogToOnefile("all", message);
break;
case Classfile:
WriteLogToClassfile(levelstr, message);
break;
default:
break;
}
}
void WriteLogToOnefile(const std::string &logname, const std::string &message)
{
umask(0);
int fd = open(logname.c_str(),O_CREAT | O_WRONLY | O_APPEND,0666);
if(fd<0)return;
write(fd,message.c_str(),message.size());
close(fd);
// std::ofstream out(logname);
// if (!out.is_open())
// return;
// out.write(message.c_str(), message.size());
// out.close();
}
void WriteLogToClassfile(const std::string &levelstr, const std::string &message)
{
std::string logname = logdir;
logname+="/";
logname+=filename;
logname += levelstr;
WriteLogToOnefile(logname, message);
}
void LogMessage(int level, const char *format, ...) // 类c的日志接口
{
char rightbuffer[1024];
va_list args;
va_start(args, format);
vsnprintf(rightbuffer, sizeof(rightbuffer), format, args);
va_end(args);
char leftbuffer[1024];
std::string curtime = TimestampToLocalTime();
std::string levelstr = LevelToString(level);
std::string idstr = std::to_string(getpid());
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s]",
levelstr.c_str(), curtime.c_str(), idstr.c_str());
std::string logInfo = leftbuffer;
logInfo += rightbuffer;
WriteLog(levelstr, logInfo);
}
~Log() {}
private:
int style;
std::string filename;
};
Log log;
class Conf
{
public:
Conf()
{
log.Enable(Screen);
}
~Conf(){}
};
Conf conf;
注意的测试代码:
Main.cc
#include <iostream>
#include <memory>
#include "task.hpp"
#include "ThreadPool.hpp"
#include <ctime>
int main()
{
std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
tp->Start();//创建线程并在任务队列中拿任务执行
srand((uint64_t)time(nullptr) ^ getpid()); //种下随机种子
while (true)
{
int data1 = rand() % 10;
usleep(rand() % 123);
int data2 = rand() % 10;
usleep(rand() % 123);
char oper = opers[rand() % opers.size()];
// 2.生产数据构造任务
Task t(data1, data2, oper);
tp->Push(t);//将任务放入任务队列
sleep(1);
}
tp->Wait();
return 0;
}
测试运行的结果:
我们通过日志消息可以知道,线程一开始5个被创建,然后运行任务,当线程发现任务队列中没有任务时会睡眠,当任务队列有任务之后又会被唤醒,执行的任务的内容以及执行任务的结果也会通过日志展示在了显示器当中。
5.单例模式
什么是单例模式?
单例模式是一种常见的设计模式,主要用于确保某个类仅有一个实例,并提供一个全局访问点。在软件系统中,经常需要一个对象来协调多个子系统之间的行为,这些子系统需要共享这个对象,以确保它们之间的行为一致。这种情况下,就需要使用单例模式来创建这个对象。
单例模式的实现需要满足以下三个特点:
- 单例类只能有一个实例。
- 单例类必须自己创建自己的唯一实例。
- 单例类必须给所有其他对象提供这一唯一实例。
单例模式的应用场景
- Windows的资源管理器就是一个单例模式的应用,它负责整个系统的资源供应。
- 在多线程的程序中,为了避免对共享资源的多重占用,也常使用单例模式来创建线程池、数据库连接池等。
- 网站的计数器,一般也是采用单例模式实现,否则难以同步。
6.设计成单例模式线程池
只需修改ThreadPool.hpp的部分代码和Main.cc的测试代码即可将线程池改为单例模式实现线程安全:
ThreadPool.hpp
#pragma once
#include"Thread.hpp"
#include"LockGuard.hpp"
#include"Log.hpp"
#include<pthread.h>
#include<functional>
#include<vector>
#include<queue>
#include<string>
const int defaultnum = 5; //控制线程池中的线程数量
class ThreadData
{
public:
ThreadData(std::string name)
:_threadname(name)
{}
std::string GetThreadName() //封装获取线程名字接口
{
return _threadname;
}
~ThreadData(){}
private:
std::string _threadname;//线程名称
};
template<class T>
class ThreadPool
{
public:
void ThreadWait(ThreadData& td)//让线程阻塞
{
log.LogMessage(DEBUG,"no task %s is sleeping...\n",td.GetThreadName().c_str());
pthread_cond_wait(&_cond,&_mutex);
}
void ThreadWakeUp()//唤醒一个线程
{
//log.LogMessage(DEBUG,"have task %s is wakeup...\n",td.GetThreadName().c_str());
pthread_cond_signal(&_cond);
}
bool Start()
{
//启动
for(auto & thread:_threads)
{
thread.Start();//创建线程并启动
log.LogMessage(INFO,"%s is running ...\n",thread.ThreadName().c_str());
}
return true;
}
void ThreadRun(ThreadData& td)//线程运行处理任务队列中的任务
{
while(true)
{
T t;
{
LockGuard lockguard(&_mutex);
while(_q.empty())//如果任务队列为空,那么就让进程停下来
{
ThreadWait(td);
log.LogMessage(DEBUG,"thread %s is wakeup...\n",td.GetThreadName().c_str());
}
t = _q.front();
_q.pop();
}
//处理任务
t();
log.LogMessage(INFO,"%s handler task %s done, result is : %s\n",\
td.GetThreadName().c_str(),t.PrintTask().c_str(),t.PrintResult().c_str());
}
}
void Push(T& in)
{
log.LogMessage(DEBUG,"other thread push a task: %s\n",in.PrintTask().c_str());
LockGuard lockguard(&_mutex);
_q.push(in);//将任务放入任务队列
ThreadWakeUp();//唤醒一个线程
}
void Wait()
{
for(auto & thread:_threads)
{
thread.Join();//让线程池中的线程一个一个的终止
}
}
static ThreadPool<T>* GetInstance()
{
if(nullptr == instance)
{
instance = new ThreadPool<T>();
}
return instance;
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);//销毁锁
pthread_cond_destroy(&_cond);//销毁环境变量
}
private:
ThreadPool(int thread_num = defaultnum)
:_thread_num(thread_num)
,_threads()
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
//构建指定个数的线程
for(int i = 0;i<_thread_num;i++)
{
std::string threadname = "thread-";
threadname+=std::to_string(i+1); //通过编号对线程进行动态命名
//待优化
ThreadData td(threadname);
// Thread<ThreadData> t(threadname,\
// std::bind(&ThreadPool<T>::ThreadRun,this,std::placeholders::_1),td);
// _threads.push_back(t);
_threads.emplace_back(threadname,\
std::bind(&ThreadPool<T>::ThreadRun,this,std::placeholders::_1),td);
log.LogMessage(DEBUG,"%s is created...\n",threadname.c_str());//打印日志消息
}
}
ThreadPool(const ThreadPool<T> &tp) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T>) = delete;
private:
std::queue<T> _q;//任务队列
std::vector<Thread<ThreadData>> _threads;//用来存放线程的vector容器
int _thread_num;//线程数量
pthread_mutex_t _mutex;//互斥锁
pthread_cond_t _cond;//条件变量
static ThreadPool<T>* instance;
};
template<class T>
ThreadPool<T>* ThreadPool<T>::instance = nullptr;
测试代码Main.cc
#include <iostream>
#include <memory>
#include "task.hpp"
#include "ThreadPool.hpp"
#include <ctime>
int main()
{
//单例模式下这样构造对象就不起作用了
//std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
ThreadPool<Task>::GetInstance()->Start();//创建线程并在任务队列中拿任务执行
srand((uint64_t)time(nullptr) ^ getpid()); //种下随机种子
while (true)
{
int data1 = rand() % 10;
usleep(rand() % 123);
int data2 = rand() % 10;
usleep(rand() % 123);
char oper = opers[rand() % opers.size()];
// 2.生产数据构造任务
Task t(data1, data2, oper);
ThreadPool<Task>::GetInstance()->Push(t);//将任务放入任务队列
sleep(1);
}
ThreadPool<Task>::GetInstance()->Wait();
return 0;
}
运行结果:
7.总结与扩展
线程池的扩展方向:
- 动态调整线程数量:线程池可以根据当前系统的负载情况动态地调整线程的数量。当任务队列堆积的任务较多时,可以增加线程数量以提高处理速度;当任务队列为空或线程空闲较多时,可以减少线程数量以节省资源。
- 优先级调度:线程池可以支持任务的优先级调度,允许根据任务的紧急程度或重要性来分配执行顺序。这对于需要处理不同优先级任务的应用场景非常有用。
- 任务依赖与分组:线程池可以支持任务之间的依赖关系,确保按照特定的顺序执行任务。同时,也可以支持任务分组,将一组相关的任务分配给同一个线程执行,以减少线程间的切换开销。