一.生产者-消费者模型
生产者-消费者模型是一个十分经典的多线程并发协作模式。所谓的生产者-消费者,实际上包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域(临界区)。就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中获取数据,不需要关心生产者的行为。举一个简单的例子,在网络通信过程中消费者可以是多个数据写入线程,负责向输入缓冲区写入以太网数据。消费者为多个数据处理线程,负责网络数据解析与处理。需要注意,这里的生产者与消费者是两类线程,实现了数据收发与数据处理的解耦合。好处一方面在于某一线程的错误不会导致整体传输-解析架构的崩溃,另一方面是可以使得整体程序结构变得清晰明了利于后期维护。
二.实现方式(条件变量)
std::conditon_variable(c11)在头文件<condition_variable>定义,它是与std::mutex一起使用的同步原语。用于阻塞一个线程或者同时阻塞多个线程,直至另一个线程修改共享条件变量并通知。阻塞多个线程等待通知有利于减少CPU负载,特别是在多个线程的情况下。举个例子:生产者线程修改输入缓存区后,通知消费者线程进行处理。消费者线程在没有接到通知时,一直处在阻塞等待通知的状态,对比死循环加条件判断的形式更节省OS资源消耗。
有意修改条件变量状态的线程需要注意:
1.必须获得std::mutex所有权。(上锁)
2.在保有锁的时候进行修改操作。
3.在std::condition_variable上执行notify_one或notify_all(释放锁后通知)。
任何有意在std::condition_variable上等待的线程需要注意:
1.在用于保护共享变量的互斥体上获得std::unique_lock<std::mutex>。
2.执行检查:
1.检查条件。
2.调用wait,wait_for,wait_until。
3.检查条件,并在为满足的条件下继续阻塞等待。
示例:与mutex同步实现进程间通信
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
void worker_thread()
{
// 等待直至 main() 发送数据
std::unique_lock lk(m);
cv.wait(lk, []{ return ready; });
// 等待后,我们占有锁
std::cout << "工作线程正在处理数据\n";
data += "(处理后)";
// 发送数据回 main()
processed = true;
std::cout << "工作线程指示数据已经处理完成\n";
// 通知前完成手动解锁,以避免等待线程才被唤醒就阻塞(细节见 notify_one)
lk.unlock();
cv.notify_one();
}
int main()
{
std::thread worker(worker_thread);
data = "数据样例";
// 发送数据到 worker 线程
{
std::lock_guard lk(m);
ready = true;
std::cout << "main() 指示数据已准备好进行处理\n";
}
cv.notify_one();
// 等候 worker
{
std::unique_lock lk(m);
cv.wait(lk, []{ return processed; });
}
std::cout << "返回 main(),data = " << data << '\n';
worker.join();
}
需注意,在通知前先释放锁,不然会出现子线程唤醒后再次阻塞的风险(假通知)。这是一种常见的条件竞争。
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
std::condition_variable cv;
std::mutex cv_m;
int i = 0;
bool done = false;
void waits()
{
std::unique_lock<std::mutex> lk(cv_m);
std::cout << "等待... \n";
cv.wait(lk, []{ return i == 1; });
std::cout << "...结束等待; i == " << i << '\n';
done = true;
}
void signals()
{
std::this_thread::sleep_for(200ms);
std::cout << "假通知...\n";
cv.notify_one(); // 等待线程被通知且 i == 0。
// cv.wait 被唤醒,检查 i,再回到等待
std::unique_lock<std::mutex> lk(cv_m);
i = 1;
while (!done)
{
std::cout << "真的改动通知...\n";
lk.unlock();
cv.notify_one(); // 等待线程被通知且 i == 1,cv.wait 返回
std::this_thread::sleep_for(300ms);
lk.lock();
}
}
int main()
{
std::thread t1(waits), t2(signals);
t1.join();
t2.join();
}
三.使用条件变量实现简单的多线程通信示例
#include <iostream>
#include <string>
#include <sstream>
#include <list>
#include <thread>
#include <condition_variable>
#include <mutex>
std::mutex mux;
std::condition_variable cv;
std::list<std::string> buffer;
void thRead(int i){
for (;;) {
std::unique_lock<std::mutex> lock(mux);
cv.wait(lock, [](){ return !buffer.empty(); }); //wait解锁阻塞等待唤醒,上锁判断Lambda
//如果Lambda返回true获得锁进入处理逻辑
while (!buffer.empty()){
std::cout << "Read Thread " << i << "\n" << buffer.front() << std::endl;
buffer.pop_front();
}
}
}
void thWrite(){
for (int i = 0; ; ++i) {
std::unique_lock<std::mutex> lock(mux); //获取锁
std::stringstream ss;
ss << "Write " << i + 1 << " Times.";
buffer.push_back(ss.str());
lock.unlock();//先解锁再进行通知,防止读线程死锁
cv.notify_one();
std::this_thread::sleep_for(std::chrono::seconds(1)); //1s写入一次数据
}
}
int main() {
std::thread write(thWrite);
write.detach();
for (int i = 0; i < 3; ++i) {
std::thread th(thRead, i + 1);
th.detach();
}
getchar();
return 0;
}
四.通过信号量通知线程关闭
//
// Created by zty19 on 24-6-3.
//
#include "xmessage.h"
#include <iostream>
void XMessage::stop(){
exit_flag_ = true;
cv.notify_all();
wait();
}
void XMessage::send(const std::string &msg) {
std::unique_lock<std::mutex> lock(buffer_mutex_);
send_buffer_.push_back(std::move(msg));
lock.unlock();
cv.notify_one(); //发布唤醒标志
}
void XMessage::Main() {
while (!is_exit()) {
std::unique_lock<std::mutex> lock(buffer_mutex_);
//std::this_thread::sleep_for(std::chrono::microseconds(100));
cv.wait(lock, [this](){
if (is_exit()) return true;
return !send_buffer_.empty(); });
if (is_exit()) break;
while (!send_buffer_.empty()){
std::string tmp = send_buffer_.front();
send_buffer_.pop_front();
std::cout << tmp << std::endl;
}
}
}