文章目录
- 0.回顾进程池
- 1.计算机层面的池化技术
- 2.线程池预备知识
- 2.1介绍线程池
- 2.2设计线程池的意义是什么?
- 2.3其他知识
- 3.回顾C++类与对象
- 3.1cpp什么情况下成员函数必须是静态的?
- 3.1可变参数列表
- 3.2格式化输出函数
- 3.3预定义符号
- 4.图解线程池运作原理
- 4.0完整代码
- Makefile
- log.hpp
- lockGuard.hpp
- Task.hpp
- thread.hpp
- stdThreadPool.hpp
- stdTestMain.cc
- 4.1详细图解
- 4.2运行结果
- 5.指针版的线程池
0.回顾进程池
模拟进程池
1.计算机层面的池化技术
在计算机层面,池化技术是一种常见且重要的编程和设计技巧。其核心思想在于提前准备和保存大量的资源,以备不时之需,同时实现资源的重复使用,提高资源使用效率。这些资源可以是内存、线程、数据库连接等,它们通常被组织在一个特定的“池子”中,方便进行统一管理和复用。
池化技术有多种应用形式,如内存池、线程池、连接池等。例如,在数据库连接池中,系统会预先创建一定数量的数据库连接,并存放在连接池中。当需要访问数据库时,程序可以直接从连接池中获取一个已存在的连接,而不是每次都重新创建新的连接。这样,可以显著降低系统频繁建连的资源开销,提高应用性能。
池化技术的优点:
提高性能。通过重用资源,减少了创建和销毁资源的时间,从而提高了资源的使用效率
降低系统开销, 避免了频繁地向操作系统申请和释放资源的开销
简化代码。通过封装资源管理逻辑,使得应用程序代码更简洁易懂
此外,在人工智能与机器学习领域,池化技术也有重要的应用。
在卷积神经网络(CNN)中,池化层用于对卷积层的输出进行下采样,以减少参数数量和计算量,同时保留模型的表达能力。这种池化技术对于图像处理、自然语言处理、计算机视觉等任务至关重要。
总的来说,池化技术通过提前创建和重复利用资源,提高了系统的性能和资源使用效率,是计算机领域中一种非常重要的技术。
2.线程池预备知识
2.1介绍线程池
在Linux背景下,线程池是一种优化线程管理的技术,旨在减少线程创建和销毁的开销,提高系统的响应能力和稳定性。线程池预先创建并维护一组线程,这些线程在应用程序需要执行并发任务时可以被复用。
线程池的核心思想是将任务队列与线程集合分离。当有新任务到达时,线程池会将其放入任务队列中,而不是立即创建新线程。线程池中的线程会不断从任务队列中取出任务并执行,直到队列为空或线程池被关闭。
什么是线程池
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度
线程池的设计考虑到了以下几点:
线程数量控制:线程池通过设定最大线程数来限制并发执行的任务数量,避免过多线程导致系统资源耗尽。
任务队列管理:线程池使用队列来存储待执行的任务,这允许任务以先入先出(FIFO)的顺序被处理,同时保证线程可以无锁地获取任务,提高并发性能。
线程复用:线程池中的线程在完成任务后不会立即销毁,而是继续等待新的任务,从而减少了线程的创建和销毁开销。
可扩展性和可配置性:线程池通常提供配置选项,允许开发者根据应用程序的需求调整线程数量、任务队列大小等参数。
在Linux环境下,线程池的实现可以依赖于底层的线程库(如pthread库)或更高级的并发框架。这些实现通常提供了线程池的创建、任务提交、线程管理等功能,使开发者能够更方便地利用线程池来优化应用程序的并发性能。
通过使用线程池,Linux应用程序可以更好地管理线程资源,提高系统的响应速度、吞吐量和稳定性,特别是在处理大量并发任务时表现出色。
2.2设计线程池的意义是什么?
在Linux下,线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的ThreadFactory创建一个新线程。
设计线程池的主要意义有以下几点:
资源复用:线程是一种宝贵的系统资源,频繁地创建和销毁线程会消耗大量的系统时间和资源。线程池通过预先创建一定数量的线程并保存在内存中,实现了线程的复用,避免了线程的频繁创建和销毁,从而提高了系统的性能。
控制最大并发数:线程池可以限制线程的数量,防止因为创建过多的线程而耗尽系统资源。通过线程池,我们可以设定一个最大并发数,确保系统的稳定性和可靠性。
提高响应速度:当任务到达时,如果线程池中有空闲线程,那么任务可以立即被处理,无需等待新线程的创建。这可以大大提高系统的响应速度。
便于管理:线程池提供了一种统一的方式来管理线程,包括线程的创建、销毁、调度等。这使得我们可以更方便地对线程进行监控和管理。
总的来说,线程池通过复用线程、控制最大并发数、提高响应速度和便于管理等方式,有效地提高了系统的性能和稳定性。在Linux下,我们可以利用一些库(如pthread库)或者框架(如C++11的std::thread)来方便地实现线程池。
2.3其他知识
- 申请内存要调用系统调用:嵌入内核/更改CPU状态/切换页表/内存管理算法(刷新缓冲区/进行IO/腾出空间)/整理内存碎片/杀掉不常用应用节省空间。这一系列操作要耗费资源(时间/空间)
- 创建线程时:创建/初始化各种数据结构 申请内存 维护各种关系
为什么要用线程池?
主要是为了以空间换时间,预先申请一批线程,新任务到来时,直接指派线程而非实时创建。什么是线程池?一次预先申请一批线程,如果有任务就处理,没任务等待。
线程池的目的:
减少系统调用的次数,提高使用内存的效率。
何为线程池?
- 线程池是一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。
- 线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。
- 线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。 类如WEB服务器完成网页请求这样的任务使用线程池技术是非常合适的。单个任务小,而任务数量巨大,一个热门网站的点击操作的任务量很小,但是次数很多。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器崩溃而产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但是短时间内产生大量线程可能使内存到达极限,出现错误。
线程池示例:
- 创建固定数量线程池,循环从任务队列中获取任务对象
- 获取到任务对象后,执行任务对象中的任务接口
3.回顾C++类与对象
3.1cpp什么情况下成员函数必须是静态的?
在C++项目中,成员函数被声明为静态的情况主要有以下几种:
- 无需访问对象状态:当成员函数不需要访问或修改类的非静态成员变量时,它可以被声明为静态。静态成员函数只能访问静态成员变量和其他静态成员函数。这是因为静态成员函数与任何特定的对象实例无关,它们属于类本身,而不是类的任何特定实例。
- 作为工具函数:静态成员函数经常作为工具函数使用,这些函数执行与类相关的某些任务,但不涉及任何特定的对象实例。例如,一个类可能有一个静态成员函数来解析或生成特定于该类的某种数据格式。
- 作为工厂函数:静态成员函数也经常用作工厂函数,用于创建和返回类的实例。这种方式可以隐藏类的构造函数,并提供更安全的对象创建方式,例如通过确保所有必要的参数都被提供,或者在创建对象之前执行某些初始化步骤。
- 作为回调函数:在某些情况下,需要将类的成员函数作为回调函数传递给其他函数或对象。由于非静态成员函数需要一个对象实例来调用,因此不能直接将它们作为回调函数。在这种情况下,可以创建一个静态成员函数作为回调函数的接口,然后在静态成员函数内部调用非静态成员函数。
请注意,虽然静态成员函数在某些情况下很有用,但它们也有一些限制。例如,静态成员函数不能调用非静态成员函数(除非通过对象实例),也不能直接访问非静态成员变量。因此,在决定使用静态成员函数时,需要仔细考虑这些限制以及它们是否适合你的特定用例。
在C++中,当需要将成员函数作为回调函数传递给其他函数或对象时,常常会遇到问题。这是因为非静态成员函数在调用时需要一个对象实例(通过this指针),而回调函数通常不会提供这样的实例。因此,直接将非静态成员函数作为回调函数是不可行的。
为了解决这个问题,一种常见的方法是使用静态成员函数作为回调函数的接口。静态成员函数不依赖于任何对象实例,因此可以直接作为回调函数传递。在静态成员函数内部,可以通过其他方式(如全局变量、单例模式、参数传递等)来访问或操作类的实例。
下面是一个简单的示例来说明这个概念:
cpp
#include <iostream>
#include <functional>
class MyClass {
public:
// 静态成员函数作为回调函数
static void StaticCallback(const std::function<void()>& func) {
// 在这里调用传递进来的函数对象
func();
}
// 非静态成员函数,我们想要在某个时候以回调的形式调用它
void NonStaticMemberFunction() {
std::cout << "NonStaticMemberFunction called!" << std::endl;
}
// 一个用于绑定非静态成员函数的辅助函数
std::function<void()> BindNonStaticMemberFunction() {
MyClass* instance = this; // 保存当前对象实例的指针
return [instance]() { // 使用lambda捕获实例指针
instance->NonStaticMemberFunction(); // 调用非静态成员函数
};
}
};
int main() {
MyClass obj;
// 将非静态成员函数的绑定作为参数传递给静态回调函数
MyClass::StaticCallback(obj.BindNonStaticMemberFunction());
return 0;
}
在上面的代码中,MyClass有一个静态成员函数StaticCallback,它接受一个std::function<void()>类型的参数。这个std::function可以包含任何可调用对象,包括lambda表达式、函数指针或绑定到对象的成员函数。
NonStaticMemberFunction是MyClass的一个非静态成员函数,我们想要在某个时候以回调的形式调用它。
BindNonStaticMemberFunction是一个辅助成员函数,它返回一个std::function<void()>对象,该对象内部是一个lambda表达式,捕获了MyClass的实例指针,并在调用时通过这个指针调用NonStaticMemberFunction。
在main函数中,我们创建了一个MyClass的实例obj,并通过调用obj.BindNonStaticMemberFunction()将非静态成员函数的绑定作为参数传递给静态回调函数MyClass::StaticCallback。这样,当StaticCallback被调用时,它实际上会调用我们绑定的非静态成员函数。
这种方法允许我们绕过非静态成员函数需要对象实例的限制,使得它们能够以回调的形式被使用。然而,这种方法需要小心处理对象的生命周期,确保在回调函数被调用时对象仍然有效。
3.1可变参数列表
- va_start函数
功能:用于初始化可变参数列表的访问。它设置了一个va_list类型的变量,使其指向可变参数列表的起始位置。
工作原理:在函数内部,参数是以栈的形式存储的,从右向左依次压入栈中。va_start通过获取最后一个固定参数的地址,然后计算出可变参数列表的起始地址,并将这个地址赋值给va_list类型的变量。这样,后续就可以通过这个变量来访问可变参数列表了。
- va_arg宏
功能:用于获取可变参数列表中的下一个参数,并将其转换为指定的类型。
工作原理:va_arg通过指针偏移的方式来访问可变参数列表中的数据。在调用va_arg时,它会根据当前va_list变量所指向的地址,以及所请求的类型的大小,计算出下一个参数的地址,并将va_list变量的值更新为这个新地址。然后,返回这个地址所指向的值,并将其转换为指定的类型。
- va_end函数
功能:用于结束可变参数的获取过程,并清理为va_list变量分配的内部数据。
工作原理:在遍历完可变参数列表后,需要调用va_end来释放与va_list变量相关的资源。这通常涉及到恢复栈的状态,确保在函数返回后,栈能够正确地返回到调用前的状态。如果未正确使用va_end,可能会导致程序崩溃或产生不可预测的行为。
- va_copy函数
功能:用于复制一个va_list变量的状态到另一个va_list变量,这样两个变量都可以用来遍历相同的可变参数列表。
工作原理:va_copy简单地将源va_list变量的值(即指向可变参数列表的指针)复制到目标va_list变量中。这样,两个变量就指向了相同的可变参数列表,可以独立地进行遍历操作。需要注意的是,在使用完复制的va_list变量后,也需要调用va_end来进行清理。
在使用这些函数和宏时,通常遵循以下步骤:首先使用va_start初始化va_list变量,然后使用va_arg逐个获取可变参数,最后使用va_end进行清理。如果需要同时遍历相同的可变参数列表,可以使用va_copy来复制va_list变量的状态。
下面是一个简单的示例,展示了如何使用这些函数来创建一个接受可变数量整数的函数,并计算它们的总和:
#include <stdio.h>
#include <stdarg.h>
int sum_of_ints(int count, ...)
{
int sum = 0;
va_list args;
va_start(args, count);
for (int i = 0; i < count; i++)
{
sum += va_arg(args, int);
}
va_end(args);
return sum;
}
int main()
{
printf("Sum: %d\n", sum_of_ints(3, 1, 2, 3)); // 输出: Sum: 6
return 0;
}
在这个示例中,sum_of_ints 函数接受一个整数 count,表示后面可变参数的数量,然后使用 va_start、va_arg 和 va_end 来遍历并计算这些参数的总和。
3.2格式化输出函数
#include <stdio.h>
int printf(const char *format, ...);
int fprintf(FILE *stream, const char *format, ...);
int sprintf(char *str, const char *format, ...);
int snprintf(char *str, size_t size, const char *format, ...);
#include <stdarg.h>
int vprintf(const char *format, va_list ap);
int vfprintf(FILE *stream, const char *format, va_list ap);
int vsprintf(char *str, const char *format, va_list ap);
int vsnprintf(char *str, size_t size, const char *format, va_list ap);
3.3预定义符号
4.图解线程池运作原理
4.0完整代码
Makefile
stdThreadPool:stdTestMain.cc
g++ -o $@ $^ -std=c++11 -lpthread -DDEBUG_COMPILE
.PHONY:clean
clean:
rm -f stdThreadPool
log.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cstdarg>
#include <ctime>
#include <string>
// 日志级别
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4
const char *gLevelMap[] = {
" DEBUG ",
" NORMAL",
"WARNING",
" ERROR ",
" FATAL "};
#define LOGFILE "./threadPool.log"
// 日志功能: 日志等级 时间 用户自定义(日志内容/文件名/文件行) 等
void logMsg(int level, const char *format, ...)
{
#ifndef DEBUG_COMPILE // 非调试编译下 不输出DEBUG信息
if (level == DEBUG)
return;
#endif
// 1.标准日志内容
char stdBuf[1024];
// 1.1获取时间戳
time_t timestamp = time(nullptr);
if (timestamp == std::time_t(-1))
{
std::cerr << "获取时间失败" << std::endl;
exit(1);
}
// 1.2获取格式化时间
struct tm *CLK = std::localtime(×tamp); // tm *localtime(const time_t *__timer)
//1.3将日志信息输出到日志文件
// snprintf(stdBuf, sizeof stdBuf, "[%s] [%ld] ", gLevelMap[level], timestamp);
snprintf(stdBuf, sizeof stdBuf, "[%s] [%d/%d/%d %d:%d:%d ", gLevelMap[level],
1900 + CLK->tm_year, 1 + CLK->tm_mon, CLK->tm_mday, CLK->tm_hour, CLK->tm_min, CLK->tm_sec);
// 2.用户自定义内容
va_list args;
va_start(args, format);
char logBuf[1024];
// int vsnprintf(char *str, size_t size, const char *format, va_list ap);
vsnprintf(logBuf, sizeof logBuf, format, args);
va_end(args);
FILE *fp = fopen(LOGFILE, "a");
// fprintf(stdout, "%s%s\n", stdBuf, logBuf);
fprintf(fp, "%s%s\n", stdBuf, logBuf);
fclose(fp);
}
lockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx)
: _pmtx(mtx)
{
}
void lock()
{
//std::cout << "加锁中..." << std::endl;
pthread_mutex_lock(_pmtx);
}
void unlock()
{
//std::cout << "解锁中..." << std::endl;
pthread_mutex_unlock(_pmtx);
}
~Mutex()
{
}
private:
pthread_mutex_t *_pmtx;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx)
: _mtx(mtx)
{
_mtx.lock();
}
~lockGuard()
{
_mtx.unlock();
}
private:
Mutex _mtx;
};
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include "log.hpp"
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task() {}
Task(int x, int y, func_t func)
: _x(x),
_y(y),
_startRoutine(func)
{
}
void operator()(const std::string &threadName)
{
logMsg(WARNING, "%s 处理任务: %d+%d=%d | %s | %d | %s | %s",
threadName.c_str(), _x, _y, _startRoutine(_x, _y), __FILE__, __LINE__, __DATE__, __TIME__);
}
public:
int _x;
int _y;
func_t _startRoutine;
};
thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
// typedef std::function<void* (void*)> fun_t;
typedef void *(*fun_t)(void *);
class ThreadInfo
{
public:
std::string _threadName;
void *_ptrThreadPool;
};
class Thread
{
public:
Thread(int index, fun_t startRoutine, void *ptrTotp)
: _startRoutine(startRoutine)
{
char nameBuf[64];
snprintf(nameBuf, sizeof nameBuf, "Thread-%d", index);
_name = nameBuf;
_tInfo._threadName = _name;
_tInfo._ptrThreadPool = ptrTotp;
}
void start()
{
pthread_create(&_tid, nullptr, _startRoutine, (void *)&_tInfo);
}
void join()
{
pthread_join(_tid, nullptr);
}
std::string name()
{
return _name;
}
~Thread()
{
}
private:
pthread_t _tid;
std::string _name;
fun_t _startRoutine;
ThreadInfo _tInfo;
};
stdThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
const int g_threadNum = 3;
template <class T>
class stdThreadPool
{
public:
pthread_mutex_t *getMutex()
{
return &lock;
}
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
bool isEmpty()
{
return _taskQueue.empty();
}
T getTask()
{
T task = _taskQueue.front();
_taskQueue.pop();
return task;
}
static void *startRoutine(void *args)
{
ThreadInfo *threadInfo = (ThreadInfo *)args;
stdThreadPool<T> *ptrTotp = (stdThreadPool<T> *)threadInfo->_ptrThreadPool;
while (true)
{
T task;
{
lockGuard lockguard(ptrTotp->getMutex());
while (ptrTotp->isEmpty())
ptrTotp->waitCond();
task = ptrTotp->getTask();
}
task(threadInfo->_threadName);
}
}
// 构造函数
stdThreadPool(int threadNum = g_threadNum)
: _threadNum(threadNum)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
for (int i = 1; i <= _threadNum; i++)
{
// 初始化列表区域 对象还未存在 走到函数块{}内 对象已存在 可以使用this指针
_threads.push_back(new Thread(i, startRoutine, this));
}
}
// 启动多线程
void run()
{
for (auto &iter : _threads)
{
iter->start();
logMsg(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
}
}
void pushTask(const T &task)
{
lockGuard lockguard(&lock);
_taskQueue.push(task);
pthread_cond_signal(&cond);
}
// 析构函数
~stdThreadPool()
{
for (auto &iter : _threads)
{
iter->join();
delete iter;
}
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
int _threadNum;
std::vector<Thread *> _threads;
std::queue<T> _taskQueue;
pthread_mutex_t lock;
pthread_cond_t cond;
};
stdTestMain.cc
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
#include "stdThreadPool.hpp"
#include "Task.hpp"
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
stdThreadPool<Task> *tp = new stdThreadPool<Task>();
tp->run();
while (true)
{
// 生产数据/制作任务 -- 耗费时间
int x = rand() % 10 + 1;
usleep(1000);
int y = rand() % 5 + 1;
Task t(x, y, [](int x, int y) -> int
{ return x + y; });
logMsg(DEBUG, "Main-Pro 发送任务: %d+%d=未知", x, y);
// 推送任务到线程池中
tp->pushTask(t);
sleep(1);
}
return 0;
}
4.1详细图解
4.2运行结果
5.指针版的线程池
- 搞两个queue1, queue2
- std::queue *p_queue, *c_queue; p_queue->queue1 ;c_queue->queue2
- p_queue->生产一批任务之后,swap(p_queue, c_queue), 唤醒所有线程 / 一个线程
- 消费者处理完毕,swap(p_queue, c_queue)
- 生产和消费用的是不同的队列,进行资源的处理【线程安全问题】的时候,仅仅是指针