来源:多线程学习
目录
condition_variable与其使用场景
生产者与消费者模型
C++11实现跨平台线程池
condition_variable与其使用场景
生产者与消费者模型
生产者-消费者模式是一种经典的多线程设计模式,用于解决多个线程之间的数据共享和协作问题。在生产者-消费者模式中,有两类线程:生产者线程和消费者线程。它们之间通过共享一个缓冲区(或队列)来协作,生产者将数据放入缓冲区,消费者从缓冲区取出数据并进行处理。
生产者-消费者模式的主要目标是实现生产者和消费者之间的解耦,使它们可以独立地进行工作,从而提高系统的性能和可维护性。
condition_variable的步骤如下:
- 创建一个condition_variable对象
- 创建一个互斥锁 mutex 对象,用来保护共享资源的访问。
-
在需要等待条件变量的地方:使用unique_lock<mutex>对象锁定互斥锁,并调用condition_variable::wait()、condition_variable::wait_for()或condition_variable::wait_until()函数等待条件变量。
-
在其他线程中需要通知等待的线程时,调用condition_variable::notify_one()或condition_variable::notify_all()函数通知等待的线程。
代码参考:
#include <iostream>
#include <thread>
#include <mutex>
#include <string>
#include <condition_variable>
#include <queue>
using namespace std;
queue<int> g_queue; //任务队列
condition_variable g_cv;
mutex mtx;
//实现生产者
void Producer() {
for (int i = 0; i < 10; ++i) {
{
unique_lock<mutex> lock(mtx);
g_queue.push(i);
//加任务的时候哦,需要通知消费者来取任务
g_cv.notify_one();
cout << "Producer: " << i << endl;
}
this_thread::sleep_for(chrono::microseconds(100)); //休眠100ms
}
}
//实现消费者
void Consumer() {
while (1) {
unique_lock<mutex> lock(mtx);
//如果队列为空,理应需要等待
//第二个参数是函数指针(可以用lambda表达式)返回的true则不堵塞,false则阻塞,注意阻塞的时候会释放资源所以不会发生死锁
g_cv.wait(lock, []() {return !g_queue.empty(); });
int value = g_queue.front();
g_queue.pop();
cout << "Consumer:" << value << endl;
}
}
int main() {
thread t1(Producer);
thread t2(Consumer);
t1.join();
t2.join();
return 0;
}
C++11实现跨平台线程池
线程池符合的就是生产者和消费者模型。线程池提前维护一个线程的数组和一个任务队列,不同的让线程去完成队列里的任务。
使用线程池可以解决不断销毁创建线程的消耗。
代码参考:
//实现线程池
#include <iostream>
#include <thread>
#include <mutex>
#include <string>
#include <condition_variable> //条件变量
#include <queue>
#include <thread>
#include<functional> //对象包装器
using namespace std;
//创先线程池的类
class ThreadPool {
public:
//构造函数
ThreadPool(int numThreads) : stop(false) {
for (int i = 0; i < numThreads; i++) {
threads.emplace_back([this] {
while (1) {
unique_lock<mutex> lock(mtx); //互斥锁,因为线程是操作任务队列的
condition.wait(lock, [this] { //判断任务队列里面是否有任务,且是否停止
return !tasks.empty() || stop;
});
if (stop && tasks.empty()) { //如果线程终止了,则结束线程
return;
}
function<void()> task(move(tasks.front())); //拷贝构造,但是使用move能够防止赋值
tasks.pop();
lock.unlock(); //取完任务之后,解锁,让其他线程可以接续取任务
task();
}
});
}
}
//析构函数
~ThreadPool() {
//手动加{}提供作用域
{
unique_lock<mutex> lock(mtx);
stop = true;
}
condition.notify_all(); //通知线程完成所有任务
for (thread& t : threads) { //这个地方要使用引用,因为线程是不可以复制的
t.join();
}
}
//加任务,加函数,使用模版可以实现可变参数
template<class F,class... Args>
void enqueue(F&& f, Args&&...args) { //&&万能引用
function<void()> task = bind(forward<F>(f), forward<Args>(args)...);
{
unique_lock <mutex> lock(mtx);
tasks.emplace(move(task));
}
condition.notify_one();
}
private:
vector<thread> threads; //线程数组
queue<function<void()>> tasks; //任务队列,队列里面包含的是函数模版
mutex mtx; //互斥量
condition_variable condition; //条件变量
bool stop;
};
int main() {
ThreadPool pool(4);
for (int i = 0; i < 10; i++) {
pool.enqueue([i] {
cout << "task: " << i << " start" << endl;
this_thread::sleep_for(chrono::seconds(1));
cout << "task: " << i << " down" << endl;
});
}
return 0;
}
知识点汇总:
1. thread线程库:vector<thread> threads; //线程数组
2.function函数模版:
3.mutex互斥锁
4.condtion_variable 条件变量:解决生产者消费者问题
5.lambda表达式-匿名函数
6.move移动语义
7.template<class F, class...Args>
8.&&右值引用,万能引用
9.bind函数适配器,绑定函数和函数参数
10.forward完美转发,配合&&实现万能引用