目录
前提知识:
一,线程池意义
二,实现流程
阶段一,搭建基本框架
1. 利用linux第三方库,将pthread_creat线程接口封装
2. 实现基本主类ThreadPool基本结构
阶段二,完善多线程安全
1. 日志信息打印——模拟企业级日常日志记录
2. C/C++的格式化输出
3. C,C++接口套用时,考虑this指针
阶段三,优化为单例模式——懒汉
四,源码
嗨!收到一张超美的风景图,愿你每天都能顺心!
前提知识:
C/C++线程接口使用,可参考多线程基础入门【Linux之旅】——上篇【线程控制 || 线程互斥 || 线程安全】-CSDN博客
互斥锁,信号量知识,可参考多线程基础入门【Linux之旅】——下篇【死锁 || 条件变量 || 生产消费者模型 || 信号量】-CSDN博客
C++STL接口使用,如:queue,string等
单例模式,懒汉与饿汉模式,可参考C++特殊类设计【特殊类 || 单例对象 || 饿汉模式 || 懒汉模式】-CSDN博客
一,线程池意义
1. 创建固定数量线程池,循环从任务队列中获取任务对象。2. 获取到任务对象后,执行任务对象中的任务接口。
二,实现流程
阶段一,搭建基本框架
目标:
1. 利用linux第三方库,将pthread_creat线程接口封装
#include <iostream>
#include <pthread.h>
#include <functional>
#include <string>
#include <unistd.h>
typedef void* (*func_t)(void*); // pthread_creat() C接口不接受包装器functional
// 当线程处理函数运行时需要便捷得知当前线程的信息,
// 因此我们将传入一个结构体。
class ThreadDate
{
public:
void* args; // 线程任务函数参数
std::string _name; // 线程名字
pthread_t _id; // 线程ID
// void* _TP; // 主进程对象
};
class Thread
{
public:
Thread(int pid, func_t calltalk, void* args):func_(calltalk)
{
char name[64];
snprintf(name, sizeof name, "Thread - %d ", pid); // 将线程编号导入到名字中
trda_._name = name;
trda_._id = pid;
}
void start()
{
pthread_create(&trda_._id, nullptr, func_ , (void*)&trda_);
}
void join()
{
pthread_join(trda_._id ,nullptr);
}
const std::string& name()
{
return trda_._name;
}
private:
func_t func_; // 处理函数
ThreadDate trda_; // 给任务提供的线程等其他信息
};
当然 我们也可以直接使用C++11提供的thread库,这会简单许多。
2. 实现基本主类ThreadPool基本结构
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = THREADPOOL_NUM):_pnum(num)
{
for (int i = 0; i < num; i++)
{
_thr_pool.push_back(new Thread(i + 1, routine, nullptr));
}
}
// 多线程处理函数——重要函数
static void* routine(void* trda)
// 关于 routine函数,要设置为静态的原因:
// 在Thread构造中,第二位为void*(*func_t)(void*)类型
// 而类中成员函数都隐藏了一个this指针,这样会导致routine类型不匹配的问题
{
ThreadDate* _trda = static_cast<ThreadDate*>(trda);
std::cout << _trda->_name << std::endl;
return nullptr;
}
// 线程池维护区
void Run()
{
for (auto& e : _thr_pool)
{
e->start();
}
}
~ThreadPool()
{
for (auto& e : _thr_pool)
{
e->join();
delete(e);
}
}
private:
std::vector<Thread*> _thr_pool;
int _pnum; // 有效线程数
};
#endif
阶段二,完善多线程安全
static void* routine(void* trda)
// 关于 routine函数,要设置为静态的原因:
// 在Thread构造中,第二位为void*(*func_t)(void*)类型
// 而类中成员函数都隐藏了一个this指针,这样会导致routine类型不匹配的问题
{
ThreadDate* _trda = static_cast<ThreadDate*>(trda);
ThreadPool<T>* th = static_cast<ThreadPool<T>*>(_trda->args);
T task;
while (1) //不断获取任务
{
{
LockGuard lk(th->get_mutex()); //自己封装的一个加锁类
while (th->get_queue_empty())
{
th->wait_cond();
}
task = th->get_task(); //获取任务,执行...
}
(*task)(_trda->_name);
sleep(1);
}
return nullptr;
}
小知识积累:
1. 日志信息打印——模拟企业级日常日志记录
下面是比较标准的日志模式打印,我们可以利用条件编译的方式选择日志输出格式(终端,文件),(不过一般还好全编译为好,除非两者互斥)
// C语言形式实现一个比较标准的日志格式
// 日志级别
#define NOWAIN 1 // 正常日志
#define DEBUG 2 // debug日志
#define WAIN 3 // 警告但能运行
#define ERROR 4 // 错误但能运行
#define FATIL 0 // 致命错误,运行停止
const char* GetlevelMap[] = {
"FATIL",
"NOWAIN",
"DEBUG",
"WAIN",
"ERROR"
};
//公司常见的日志信息:级别,时间,标准内容(文件名代码位置) + 用户自定义内容
void Logmessage(int level, const char* format , ...)
{
#ifdef REALSE // 默认DEBUG
// 标准日志
char normal[1024];
time_t tm = time(0); // 获取时间戳
snprintf(normal, sizeof normal, "[%s] time[%d]", GetlevelMap[level], tm);
printf("%s\n", normal);
#else
// 标准日志
char normal[1024];
time_t tm = time(0); // 获取时间戳
snprintf(normal, sizeof normal, "[%s] time[%d]", GetlevelMap[level], tm);
// 自定义日志——DEBUG阶段
char custom_log[1024];
va_list v_li; // 本质是 char*
va_start(v_li, format); // 设置成format这样将打印全部自定义内容
vsnprintf(custom_log, sizeof custom_log, format, v_li);
printf("%s %s\n", normal, custom_log);
va_end(v_li);
#endif
}
2. C/C++的格式化输出
关于C语言,C++的格式化输出,在格式化输出方面,C做的相对较好,C++就只有个cout。下面是C格式输出的一些常用接口:
3. C,C++接口套用时,考虑this指针
主要是C接口调用C++类中成员函数出的问题,因为类成员函数参数里面隐藏了this指针。
阶段三,优化为单例模式——懒汉
......
// 通过该指令加载,使用时才进行加载——懒汉模式
static ThreadPool<T>* Get_Instance(int num = THREADPOOL_NUM)
{
if (st == nullptr)
{
mutex_inital.lock();
if (st == nullptr)
{
st = new ThreadPool<T>(num);
}
mutex_inital.unlock();
}
return st;
}
......
......
int Get_queue_task_size(){return task_->size();}
int Get_queue_task_reserver_size(){return task_reserver->size();}
// 1.禁用拷贝&赋值
ThreadPool<T>(const ThreadPool<T>& it) = delete;
ThreadPool<T>& operator= (const ThreadPool<T>& it) = delete;
private:
// 2.构造函数私有
ThreadPool(int num = THREADPOOL_NUM):_pnum(num), task_(new std::queue<T>),
task_reserver(new std::queue<T>)
{
for (int i = 0; i < num; i++)
{
_thr_pool.push_back(new Thread(i + 1, routine, this));
// 考虑到处理方法可能还会获取进程池的信息,因此我们将进程池传入
}
pthread_mutex_init(&mtx_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
std::vector<Thread*> _thr_pool;
int _pnum; // 有效线程数
std::queue<T>* task_;
std::queue<T>* task_reserver;
pthread_mutex_t mtx_;
pthread_cond_t cond_; // 估计就是设置的资源量
static ThreadPool<T>* st;
static std::mutex mutex_inital; //这里直接用C++的锁,防止覆盖锁
};
template <class T>
ThreadPool<T>* ThreadPool<T>::st = nullptr;
template <class T>
std::mutex ThreadPool<T>::mutex_inital;
debug踩坑分享
1. 调试,不如直接打印强
2. 关于任务存放的问题,首先让我们看看上面程序的测试代码,testmain
int main()
{
ThreadPool<Task_add*>* st = ThreadPool<Task_add*>::Get_Instance(); //加载
st->Run(); //线程池启动
// 主函数就负责生产任务,向任务队列输入任务
// 未来可能是网络端获取任务
srand(time(0) * 131 + 1);
while (1)
{
if (st->Get_queue_task_reserver_size() < QUEUE_TASK_NUM)
{
int x = rand() / 20 + 1;
usleep(7777);
int y = rand() / 30 + 4;
Task_add tk (x, y, [](int a, int b){
return a + b;
});
st->push(&tk);
}
// 备用队列超过5个任务再交换队列
if (st->Get_queue_task_reserver_size() >= QUEUE_TASK_NUM / 2 &&
st->Get_queue_task_size() == 0 &&
st->Get_queue_task_reserver_size() <= QUEUE_TASK_NUM)
{
st->swap_queue();
}
}
不卖关子了,问题出在tk变量,我选择储存在栈上,同时循环的使用tk来创建任务,但任务队列导入的是tk地址,这就导致线程都是获取第5个任务的相同数据,解决方法:改成堆上储存,然后手动释放,这样基本上就不会出现数据覆盖的问题。
四,源码
源码: ThreadPool · 逆光/Linux - 码云 - 开源中国 (gitee.com)
结语
本小节就到这里了,感谢小伙伴的浏览,如果有什么建议,欢迎在评论区评论,如果给小伙伴带来一些收获请留下你的小赞,你的点赞和关注将会成为博主创作的动力