文章目录
- 一、如何实现一个线程
- 1、基本结构
- 2、实现成员函数
- 3、演示
- 4、代码总汇
- `Thread.hpp`
- `Main.cc`
- 二、如何封装线程池
- 1、设计成员变量
- 2、构造函数与析构函数
- 3、初始化
- 4、启动与回收
- 5、主线程放入任务
- 6、子线程读取任务
- 7、终止线程池
- 三、测试
- 四、线程池总代码
- 1、`ThreadPool.hpp`
- 2、`Main.cc`
一、如何实现一个线程
1、基本结构
Thread类是用来描述一个线程的,所以成员变量中需要有tid和线程名。所以第一个成员变量就是==_tid==;
一个线程创建出来,肯定是用来执行对于的任务的,那么我们还需要一个成员变量来接收传递的函数,所以第三个成员变量就是==_func==,是一个void(T&)类型的参数,所以第四个成员变量就是传递过来的参数_data。
template<class T>
using func_t = std::function<void(T&)>;//模版方法
template<class T>
class Thread
{
public:
Thread(){}
static void* ThreadRoutinue(){}
void Start(){}
void Join(){}
void Detach(){}
~Thread(){}
private:
pthread_t _tid; //线程tid
std::string _threadname; //线程名
func_t<T> _func; //线程执行的函数
T _data; //需要处理的数据
};
2、实现成员函数
成员函数中我们需要注意的就是
pthread_create()
函数,它的第三个参数是一个参数为void *,返回值为void *的一个函数。但是我们将这个函数
ThreadRoutinue()
定义在这个类里,他就是一个成员函数,成员函数的参数,会隐藏this指针,所以如果我们正常写,就会报错,因为这个函数不满足pthread_create()
函数的条件。但是只要让
ThreadRoutinue()
的参数中没有this指针就可以了,那么该如何实现呢?在前面加上static就可以,变成静态成员变量,就不会有this指针了。那么问题又来了,没有了this指针,我们又该如何访问到成员变量呢?
别忘了这个函数可以传递一个返回值为void*的参数,我们只需要将this指针传递过去,在函数内部强转一下就可以了。
template<class T>
using func_t = std::function<void(T&)>;//模版方法
template<class T>
class Thread
{
public:
//thread(func,5,"thread-1");
Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
:_func(func),_data(data),_threadname(name)
{}
//需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了
static void* ThreadRoutinue(void* args)
{
//将传过来的this指针强转一下,然后就可以访问到_func和_data了
Thread<T>* self = static_cast<Thread<T>*>(args);
self->_func(self->_data);
return nullptr;
}
void Start()
{
//创建线程
int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);
return ret==0;
}
void Join()
{
pthread_join(_tid,nullptr);
}
void Detach()
{
pthread_detach(_tid);
}
~Thread(){}
private:
pthread_t _tid; //线程tid
std::string _threadname; //线程名
func_t<T> _func; //线程执行的函数
T _data; //需要处理的数据
};
3、演示
让我们写一段测试代码,来看一下效果:
#include"Thread.hpp"
void test(int x)
{
while(true)
{
std::cout<<x<<std::endl;
sleep(1);
}
}
int main()
{
MyThread::Thread<int> mt(test,2025,"thread-1");
if(mt.Start() == true)
{
std::cout<<"MyThread start success!\n";
}
else
{
std::cout<<"MyThread start failed!\n";
}
mt.Join();
return 0;
}
运行结果:
让我们使用ps -aL
指令来查看一下是否真的创建了线程:
可以看到,程序运行之后,真的创建出了两个名为mythread的线程。
4、代码总汇
Thread.hpp
#pragma once
#include<string>
#include<pthread.h>
#include<unistd.h>
#include<iostream>
#include<functional>
namespace MyThread
{
template<class T>
using func_t = std::function<void(T&)>;//模版方法
template<class T>
class Thread
{
public:
//thread(func,5,"thread-1");
Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
:_func(func),_data(data),_threadname(name)
{}
//需要设置成static静态成员函数,否则参数会多一个this指针,就不符合pthread_create的要求了
static void* ThreadRoutinue(void* args)
{
//将传过来的this指针强转一下,然后就可以访问到_func和_data了
Thread<T>* self = static_cast<Thread<T>*>(args);
self->_func(self->_data);
return nullptr;
}
bool Start()
{
//创建线程
int ret = pthread_create(&_tid,nullptr,ThreadRoutinue,this);
return ret==0;
}
void Join()
{
pthread_join(_tid,nullptr);
}
void Detach()
{
pthread_detach(_tid);
}
~Thread(){}
private:
pthread_t _tid; //线程tid
std::string _threadname; //线程名
func_t<T> _func; //线程执行的函数
T _data; //需要处理的数据
};
}
Main.cc
#include"Thread.hpp"
void test(int x)
{
while(true)
{
std::cout<<x<<std::endl;
sleep(1);
}
}
int main()
{
MyThread::Thread<int> mt(test,2025,"thread-1");
if(mt.Start() == true)
{
std::cout<<"MyThread start success!\n";
}
else
{
std::cout<<"MyThread start failed!\n";
}
mt.Join();
return 0;
}
二、如何封装线程池
1、设计成员变量
线程池内部维护多个线程和一个任务队列,主线程将任务放入任务队列当中,然后子线程就从任务队列中拿取任务进行处理。
所以需要一个数组来管理多个线程:
_threads
以及一个任务队列:
_taskQueue
此外我们还需要知道一共有多少个线程:
_threadNum
然后还可以设置一个变量来查看真在等待任务的线程数目:
_waitNum
最后我们再设置一个变量来判断当前线程池是否运行,如果已经退出了,我们需要将任务队列中的任务处理完再退出:
_isRunning
定义这些变量就够了吗?
我们忽略了一个多线程编程中最重要的问题:线程之间的互斥和同步
我们的任务是:主线程往任务队列中放入任务,子线程从任务队列中拿取任务。那么我们思考一下下面几个问题:
多个线程之间可以同时从任务队列中拿任务吗?
答:不能,任务队列是临界资源,线程和线程之间要互斥,否则会出现不同的线程拿取同一个任务的情况。
主线程放入任务时,子线程可以同时拿取任务吗?
答:不能,主线程和子线程之间也需要互斥。
因为他们都是竞争任务队列这一个资源,所以我们只要定一个一把锁就可以了。这样互斥的问题就解决了。
那么同步呢?
是不是只有任务队列中有任务时,子线程才能获取任务,所以需要主线程先放任务,子线程才能拿任务,这就需要一个条件变量来维护。
综上:
我们还需要两个成员变量:
_mutex
和_cond
#include"Thread.hpp"
#include<vector>
template<class T>
class ThreadPool
{
private:
std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程
std::queue<T> _taskQueue;//任务队列
int _threadNum;//线程数
int _waitNum;//等待的线程数
bool _isRunning;//线程池是否在运行
pthread_mutex_t _mutex;//互斥锁
pthread_cond_t _cond;//条件变量
};
2、构造函数与析构函数
构造和析构的主要作用就是对
_mutex
和_cond
的初始化和销毁。同时我们还需要知道这个线程池需要创建多少个线程,所以需要外部传递参数来告诉我们。
然后就是构造函数对其他成员变量进行初始化。
template<class T>
class ThreadPool
{
public:
ThreadPool(const int num = 5)
:_threadNum(num),_waitNum(0),_isRunning(false)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
}
3、初始化
上面的构造函数只是创建了锁和条件变量,以及部分变量的初始化,并没有创建出线程对象。
我们可以定义一个
ThreadInit()
函数来创建线程。让我们先回顾一下
Thread
的构造函数需要哪些变量:Thread(func_t<T> func,const T &data,const std::string &name = "none-name")
func
参数是需要调用的函数,data
是这个函数需要处理的数据,name
是线程名。在此,我们让线程去执行一个叫做
handerTask
的函数,这个函数内部实现线程到任务队列中获取任务的过程。而
handerTask
的第一个参数也是线程的名字,以便在handerTask
内部查看是哪个线程执行了任务。这里我们使用
bind
函数来将HanderTask
函数与this
参数绑定在一起,并且将这个参数绑定到HandleTask
的第一个参数位置。
void HanderTask(std::string)
{
//执行任务队列的任务
}
void InitThread()
{
for(int i=0;i<_threadNum;i++)
{
auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);
std::string name = "Thread-"+std::to_string(i);
//_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员
_threads.emplace_back(func,name,name);
}
_isRunning = true;
}
4、启动与回收
我们已经创建出来了一批线程,接下来还需要启动这一批线程,并且回收。
因此还需要定义成员函数
StartAll
和JoinAll
来启动和等待这批线程。
void StartAll()
{
for(auto& thread : _threads)
{
thread.Start();
}
}
void JoinAll()
{
for(auto& thread : _threads)
{
thread.Join();
}
}
5、主线程放入任务
我们可以定义一个
EnQueue
,用来让主线程往任务队列中投放任务。投放任务的要求:
- 访问队列时需要与其他线程互斥,即对
_mutex
加锁;- 添加任务后,就可以唤醒在等待的线程了
void EnQueue(const T& task)
{
pthread_mutex_lock(&_mutex);
if(_isRunning)
{
_taskQueue.push(task);
if(_waitNum > 0)
{
pthread_cond_signal(&_cond);
}
}
pthread_mutex_unlock(&_mutex);
}
6、子线程读取任务
子线程读取任务的要求如下:
- 保持互斥,从任务队列获取数据前需要加锁,获取结束后解锁;
- 保持同步,如果任务队列中没有数据,就去
_cond
下等待,等待被唤醒。
void HanderTask(std::string name)
{
//子线程需要一直处理,所以这里使用死循环
while(true)
{
pthread_mutex_lock(&_mutex);
while(_taskQueue.empty())//这里是while循环,不是if判断,避免伪唤醒
{
_waitNum++;
pthread_cond_wait(&_cond,&_mutex);
_waitNum--;
}
T task = _taskQueue.front();
_taskQueue.pop();
std::cout<<name<<"get a task..."<<std::endl;
pthread_mutex_unlock(&_mutex);
task();
}
}
这里需要注意一点,判断当前任务队列是否为空时,使用的是while循环,而不是if语句,因为当前线程被主线程唤醒之后,可能会发生伪唤醒,其实任务队列中根本没有任务。所以还要进入下一次while判断,确保访问任务队列时,一定是有任务的。
但是目前还有一个问题,如果线程访问任务队列时,线程池被终止了怎么办?
我们可以通过
_isRunning
来判定,在执行任务时判断一下_isRunning
的值:
- 如果为true:正常运行
- 如果为false:
- 如果任务队列中还有任务:把任务执行完
- 如果没有任务:当前线程退出
于是我们的代码改进为:
void HanderTask(std::string name)
{
//子线程需要一直处理,所以这里使用死循环
while(true)
{
pthread_mutex_lock(&_mutex);
while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒
{
_waitNum++;
pthread_cond_wait(&_cond,&_mutex);
_waitNum--;
}
//线程池终止了,并且任务队列中没有任务 --> 线程退出
if(_taskQueue.empty()&&!_isRunning)
{
pthread_mutex_unlock(&_mutex);
std::coud<<name<<" quit..."<<std::endl;
break;
}
//走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出
T task = _taskQueue.front();
_taskQueue.pop();
std::cout<<name<<" get a task..."<<std::endl;
pthread_mutex_unlock(&_mutex);
task();
}
}
7、终止线程池
终止线程池不仅仅是将
_isRunning
设置为false这么简单,需要考虑以下问题:
如果在
Stop
的时候,有线程正在调用HanderTask
函数怎么办?答:此时多个线程访问变量
_isRunning
,就有可能会造成线程安全问题,所以访问_isRunning
时也要加锁,由于之前所有的访问_isRunning
的操作,都在_mutex
锁中,所以和之前共用同一把锁就行。如果
Stop
之后,还有线程在_cond
下面等待怎么办?答:如果线程一直在
_cond
下面等待,就会导致无法退出,此时在_isRunning = false
之后,还要通过pthread_cond_broadcast
唤醒所有等待的线程,让他们重新执行HanderTask
的逻辑,从而正常退出。
void Stop()
{
pthread_mutex_lock(&_mutex);
_isRunning = false;//终止线程池
pthread_cond_broadcast(&_cond);//唤醒所有等待的线程
pthread_mutex_unlock(&_mutex);
}
三、测试
我们可以用以下代码进行测试:
#include <iostream>
#include <vector>
#include <string>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include"ThreadPool.hpp"
int Add()
{
int a = rand() % 100 + 1;
int b = rand() % 100 + 1;
std::cout<<a<<" + "<<b<<" = "<<a+b<<std::endl;
return a+b;
}
int main()
{
srand(static_cast<unsigned int>(time(nullptr)));
ThreadPool<int(*)(void)> tp(3);
tp.ThreadInit();
tp.StartAll();
for (int i = 0; i < 10; i++)
{
tp.EnQueue(Add);
sleep(1);
}
tp.Stop();
tp.JoinAll();
return 0;
}
通过
ThreadPool<int(*)(void)> tp(3);
创建有三个线程的线程池,执行的任务类型为int(void)
,但是要注意,此处要传入可调用对象,C++的可调用对象有:函数指针,仿函数,lambda表达式。此处我用了函数指针int(*)(void)
。接着
ThreadInit
初始化线程池,此时线程对象Thread已经创建出来了,但是还有没创建线程。随后调用StartAll
,此时才真正创建了线程。然后进入一个for循环,给任务队列派发任务,总共派发十个任务,都是函数
Add
,其中生成两个随机数的加法。最后调用
Stop
终止退出线程池,此时线程也会一个个退出,然后调用JoinAll
回收所有线程。
运行结果:
四、线程池总代码
1、ThreadPool.hpp
#include"Thread.hpp"
#include<vector>
#include<queue>
#include<string>
#include <unistd.h>
#include <pthread.h>
template<class T>
class ThreadPool
{
public:
ThreadPool(const int num = 5)
:_threadNum(num),_waitNum(0),_isRunning(false)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond,nullptr);
}
void HanderTask(std::string name)
{
//子线程需要一直处理,所以这里使用死循环
while(true)
{
pthread_mutex_lock(&_mutex);
while(_taskQueue.empty()&&_isRunning)//这里是while循环,不是if判断,避免伪唤醒
{
_waitNum++;
pthread_cond_wait(&_cond,&_mutex);
_waitNum--;
}
//线程池终止了,并且任务队列中没有任务 --> 线程退出
if(_taskQueue.empty()&&!_isRunning)
{
pthread_mutex_unlock(&_mutex);
std::cout<<name<<" quit..."<<std::endl;
break;
}
//走到这里无论线程池是否终止,都一定还有任务要执行,将任务执行完再退出
T task = _taskQueue.front();
_taskQueue.pop();
std::cout<<name<<" get a task..."<<std::endl;
pthread_mutex_unlock(&_mutex);
task();
}
}
void ThreadInit()
{
for(int i=0;i<_threadNum;i++)
{
auto func = bind(&ThreadPool::HanderTask,this,std::placeholders::_1);
std::string name = "Thread-"+std::to_string(i);
//_threads.push_back(HanderTask,name,name);//第一个name是handerTask的参数,第二个name是Thread内部的成员
_threads.emplace_back(func,name,name);
}
_isRunning = true;
}
void StartAll()
{
for(auto& thread : _threads)
{
thread.Start();
}
}
void JoinAll()
{
for(auto& thread : _threads)
{
thread.Join();
}
}
void EnQueue(const T& task)
{
pthread_mutex_lock(&_mutex);
if(_isRunning)
{
_taskQueue.push(task);
if(_waitNum > 0)
{
pthread_cond_signal(&_cond);
}
}
pthread_mutex_unlock(&_mutex);
}
void Stop()
{
pthread_mutex_lock(&_mutex);
_isRunning = false;//终止线程池
pthread_cond_broadcast(&_cond);//唤醒所有等待的线程
pthread_mutex_unlock(&_mutex);
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
std::vector<MyThread::Thread<std::string>> _threads;//用数组管理多个线程
std::queue<T> _taskQueue;//任务队列
int _threadNum;//线程数
int _waitNum;//等待的线程数
bool _isRunning;//线程池是否在运行
pthread_mutex_t _mutex;//互斥锁
pthread_cond_t _cond;//条件变量
};
2、Main.cc
#include <iostream>
#include <vector>
#include <string>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include"ThreadPool.hpp"
int Add()
{
int a = rand() % 100 + 1;
int b = rand() % 100 + 1;
std::cout<<a<<" + "<<b<<" = "<<a+b<<std::endl;
return a+b;
}
int main()
{
srand(static_cast<unsigned int>(time(nullptr)));
ThreadPool<int(*)(void)> tp(3);
tp.ThreadInit();
tp.StartAll();
for (int i = 0; i < 10; i++)
{
tp.EnQueue(Add);
sleep(1);
}
tp.Stop();
tp.JoinAll();
return 0;
}