线程系列:
Linux–线程的认识(一)
Linux–线程的分离、线程库的地址关系的理解、线程的简单封装(二)
线程的互斥:临界资源只能在同一时间被一个线程使用
生产消费模型
信号量
线程池
线程池(Thread Pool)是一种基于池化技术设计用于管理和复用线程的机制。
在多线程编程中,频繁地创建和销毁线程会消耗大量的系统资源,并且由于线程的创建和销毁需要时间,这也会降低程序的执行效率。线程池通过预先创建一定数量的线程并放入池中,需要时从池中取出线程执行任务,执行完毕后线程并不销毁而是重新放回池中等待下一次使用,从而避免了线程的频繁创建和销毁所带来的开销。
主要优点
- 降低资源消耗:通过复用已存在的线程,减少线程创建和销毁的开销。
- 提高响应速度:当任务到达时,可以立即分配线程进行处理,减少了等待时间。
- 提高线程的可管理性:线程池可以统一管理线程,包括线程的创建、调度、执行和销毁等。
关键参数
- 核心线程数:线程池维护线程的最少数量,即使这些线程处于空闲状态,线程池也不会回收它们(一般为主线程)。
- 最大线程数:线程池中允许的最大线程数。
- 阻塞队列:用于存放待执行的任务的队列。当线程池中的所有线程都忙时,新任务会被添加到这个队列中等待处理。
- 非核心线程空闲存活时间:当线程池中的线程数量超过核心线程数时,如果线程空闲时间超过这个时间,多余的线程将被终止。
- 线程工厂:用于创建新线程的工厂。
- 拒绝策略:当线程池和队列都满了时,对于新来的任务将采取的拒绝策略。
实现原理
线程池的实现原理通常包括以下几个关键部分:
- 线程池管理器:负责创建并管理线程池,包括初始化线程池、创建工作线程、销毁线程池等。
- 工作线程:线程池中的线程,负责执行具体的任务。工作线程通常会不断从任务队列中取出任务并执行,直- 到线程池被销毁或所有任务都执行完毕。
- 任务接口:每个任务必须实现的接口,用于定义任务的执行逻辑。在Java中,这通常是通过实现Runnable或Callable接口来实现的。
- 任务队列:用于存放待处理的任务。当线程池中的工作线程数量达到最大值时,新到达的任务会被放入任务队列中等待处理。任务队列的实现通常依赖于Java的BlockingQueue接口。
实现流程
- 当有新任务提交给线程池时,线程池会首先判断当前正在运行的线程数量是否小于核心线程数。如果是,则直接创建新的线程来执行任务;否则,将任务加入任务队列等待处理。
- 如果任务队列已满,且当前正在运行的线程数量小于最大线程数(maximumPoolSize),则创建新的线程来处理任务;如果线程数量已经达到最大线程数,则根据配置的拒绝策略来处理新任务(如抛出异常、直接丢弃等)。
- 当一个线程完成任务后,它会从任务队列中取下一个任务来执行;如果没有任务可供执行,并且线程池中的线程数量超过了核心线程数,且这些线程空闲时间超过了设定的存活时间,则这些线程会被销毁,直到线程池中的线程数量减少到核心线程数为止。
实例
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include<iostream>
#include<string>
#include<pthread.h>
#include<functional>
#include<unistd.h>
using namespace std;
namespace ThreadMdule
{
using func_t = std::function<void(string)>;
class Thread
{
public:
void Excute()
{
_func(_threadname);
}
Thread(func_t func, const std::string &name="none-name")
: _func(func), _threadname(name), _stop(true)
{}
static void* threadroutine(void* args)
{
Thread* self=static_cast<Thread*>(args);
self->Excute();
return nullptr;
}
bool start()
{
int n=pthread_create(&_tid,nullptr,threadroutine,this);
if(!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if(!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if(!_stop)
{
pthread_join(_tid,nullptr);
}
}
string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
func_t _func;
bool _stop;
};
}
#endif
ThreadPool.hpp
#pragma once
#include<iostream>
#include<vector>
#include<queue>
#include<pthread.h>
#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"
using namespace ThreadMdule;
using namespace std;
const static int gdefaultthreadnum=3;//默认线程池的线程数
template <class T>
class ThreadPool
{
public:
ThreadPool(int threadnum=gdefaultthreadnum) :_threadnum(threadnum),_waitnum(0),_isrunning(false)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
LOG(INFO,"ThreadPool COnstruct.");
}
//各个线程独立的任务函数
void HandlerTask(string name)
{
LOG(INFO,"%s is running...",name.c_str());
while(true)
{
LockQueue();//开启保护
//等到有任务时才退出循环执行下列语句
while(_task_queue.empty()&&_isrunning)
{
_waitnum++;
ThreadSleep();
_waitnum--;
}
//当任务队列空并且线程池停止时线程退出
if(_task_queue.empty()&&!_isrunning)
{
UnlockQueue();
cout<<name<<" quit "<<endl;
sleep(1);
break;
}
//1.任务队列不为空&&线程池开启
//2.任务队列不为空&&线程池关闭,直到任务队列为空
//所以,只要有任务,就要处理任务
T t=_task_queue.front();//取出对应任务
_task_queue.pop();
UnlockQueue();
LOG(DEBUG,"%s get a task",name.c_str());
//处理任务
t();
LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString().c_str());
}
}
//线程池中线程的构建
void InitThreadPool()
{
for(int i=0;i<_threadnum;i++)
{
string name="thread-"+to_string(i+1);
_threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);
LOG(INFO,"init thread %s done",name.c_str());
}
_isrunning=true;
}
//线程池的启动
void Start()
{
for(auto& thread:_threads)
{
thread.start();
}
}
//线程池停止
void Stop()
{
LockQueue();
_isrunning=false;
ThreadWakeupAll();
UnlockQueue();
}
void Wait()
{
for(auto& thread:_threads)
{
thread.Join();
LOG(INFO,"%s is quit...",thread.name().c_str());
}
}
//将任务入队列
bool Enqueue(const T& t)
{
bool ret=false;
LockQueue();
if(_isrunning)
{
_task_queue.push(t);
//如果有空闲的线程,那么唤醒线程让其执行任务
if(_waitnum>0)
{
ThreadWakeup();
}
LOG(DEBUG,"enqueue task success");
ret=true;
}
UnlockQueue();
return ret;
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWakeup()
{
pthread_cond_signal(&_cond);
}
void ThreadWakeupAll()
{
pthread_cond_broadcast(&_cond);
}
int _threadnum;//线程数
vector<Thread> _threads;//存储线程的vector
queue<T> _task_queue;//输入的任务队列
pthread_mutex_t _mutex;//互斥锁
pthread_cond_t _cond;//条件变量
int _waitnum;//空闲的线程数
bool _isrunning;//表示线程池是否启动
};
Task.hpp
#include<iostream>
#include<string>
#include<functional>
using namespace std;
class Task
{
public:
Task(){}
Task(int a,int b): _a(a),_b(b),_result(0)
{}
void Excute()
{
_result=_a+_b;
}
string ResultToString()
{
return to_string(_a) + "+"+to_string(_b)+"="+to_string(_result);
}
string DebugToString()
{
return to_string(_a) + "+" + to_string(_b) + "= ?";
}
void operator()()
{
Excute();
}
private:
int _a;
int _b;
int _result;
};
main.cc
int main()
{
srand(time(nullptr)^getpid()^pthread_self());
//EnableScreen();
EnableFile();
unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>(5));
tp->InitThreadPool();
tp->Start();
int tasknum=10;
while(tasknum)
{
sleep(1);
int a=rand()%10+1;
usleep(1024);
int b=rand()%20+1;
Task t(a,b);
LOG(INFO,"main thread push task: %s",t.DebugToString().c_str());
tp->Enqueue(t);
tasknum--;
}
tp->Stop();
tp->Wait();
}
Loh.hpp
#pragma once
#include<iostream>
#include<fstream>
#include<ctime>
#include<cstdarg>
#include<string>
#include<sys/types.h>
#include<unistd.h>
#include<cstdio>
#include"LockGuard.hpp"
using namespace std;
bool gIsSave=false;//默认输出到屏幕
const string logname="log.txt";
//1.日志是有等级的
enum Level
{
DEBUG=0,
INFO,
WARNING,
ERROR,
FATAL
};
void SaveFile(const string& filename,const string& messages)
{
ofstream out(filename,ios::app);
if(!out.is_open())
{
return;
}
out<<messages;
out.close();
}
//等级转化为字符串
string LevelToString(int level)
{
switch (level)
{
case DEBUG:
return "Debug";
case INFO:
return "Info";
case WARNING:
return "Warning";
case ERROR:
return "Error";
case FATAL:
return "Fatal";
default:
return "Unkonwn";
}
}
//获取当前时间
string GetTimeString()
{
time_t curr_time=time(nullptr);//时间戳
struct tm* format_time=localtime(&curr_time);//转化为时间结构
if(format_time==nullptr)
return "None";
char time_buffer[1024];
snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
format_time->tm_year + 1900,
format_time->tm_mon + 1,
format_time->tm_mday,
format_time->tm_hour,
format_time->tm_min,
format_time->tm_sec);
return time_buffer;
}
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
//获取日志信息
void LogMessage(string filename,int line,bool issave,int level,char* format,...)
{
string levelstr=LevelToString(level);
string timestr=GetTimeString();
pid_t selfid=getpid();
char buffer[1024];
va_list arg;
va_start(arg,format);
vsnprintf(buffer,sizeof(buffer),format,arg);
va_end(arg);
string message= "[" + timestr + "]" + "[" + levelstr + "]" +
"[" + std::to_string(selfid) + "]" +
"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
LockGuard lockguard(&lock);
if(!issave)
{
cout<<message;
}
else
{
SaveFile(logname,message);
}
}
#define LOG(level,format,...) \
do \
{ \
LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__); \
} while (0)
#define EnableFile() \
do \
{ \
gIsSave=true; \
} while (0)
#define EnableScreen() \
do \
{ \
gIsSave=false; \
} while (0)
线程池解释(代码)
日志解释(代码)
日志是系统或程序记录所有发生事件的文件: