C++ Thread多线程并发记录(8)生产者-消费者模型与信号量(条件变量)

一.生产者-消费者模型

        生产者-消费者模型是一个十分经典的多线程并发协作模式。所谓的生产者-消费者,实际上包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据为了解耦生产者和消费者的关系,通常会采用共享的数据区域(临界区)。就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中获取数据,不需要关心生产者的行为。举一个简单的例子,在网络通信过程中消费者可以是多个数据写入线程,负责向输入缓冲区写入以太网数据。消费者为多个数据处理线程,负责网络数据解析与处理。需要注意,这里的生产者与消费者是两类线程,实现了数据收发与数据处理的解耦合。好处一方面在于某一线程的错误不会导致整体传输-解析架构的崩溃,另一方面是可以使得整体程序结构变得清晰明了利于后期维护。

二.实现方式(条件变量)

        std::conditon_variable(c11)在头文件<condition_variable>定义,它是与std::mutex一起使用的同步原语。用于阻塞一个线程或者同时阻塞多个线程,直至另一个线程修改共享条件变量并通知。阻塞多个线程等待通知有利于减少CPU负载,特别是在多个线程的情况下。举个例子:生产者线程修改输入缓存区后,通知消费者线程进行处理。消费者线程在没有接到通知时,一直处在阻塞等待通知的状态,对比死循环加条件判断的形式更节省OS资源消耗。

有意修改条件变量状态的线程需要注意:

1.必须获得std::mutex所有权。(上锁)

2.在保有锁的时候进行修改操作。

3.在std::condition_variable上执行notify_one或notify_all(释放锁后通知)。

任何有意在std::condition_variable上等待的线程需要注意:

1.在用于保护共享变量的互斥体上获得std::unique_lock<std::mutex>。

2.执行检查:

        1.检查条件。

        2.调用wait,wait_for,wait_until。

        3.检查条件,并在为满足的条件下继续阻塞等待。

示例:与mutex同步实现进程间通信

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
 
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
 
void worker_thread()
{
    // 等待直至 main() 发送数据
    std::unique_lock lk(m);
    cv.wait(lk, []{ return ready; });
 
    // 等待后,我们占有锁
    std::cout << "工作线程正在处理数据\n";
    data += "(处理后)";
 
    // 发送数据回 main()
    processed = true;
    std::cout << "工作线程指示数据已经处理完成\n";
 
    // 通知前完成手动解锁,以避免等待线程才被唤醒就阻塞(细节见 notify_one)
    lk.unlock();
    cv.notify_one();
}
 
int main()
{
    std::thread worker(worker_thread);
 
    data = "数据样例";
    // 发送数据到 worker 线程
    {
        std::lock_guard lk(m);
        ready = true;
        std::cout << "main() 指示数据已准备好进行处理\n";
    }
    cv.notify_one();
 
    // 等候 worker
    {
        std::unique_lock lk(m);
        cv.wait(lk, []{ return processed; });
    }
    std::cout << "返回 main(),data = " << data << '\n';
 
    worker.join();
}

需注意,在通知前先释放锁,不然会出现子线程唤醒后再次阻塞的风险(假通知)。这是一种常见的条件竞争。

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
 
std::condition_variable cv;
std::mutex cv_m;
int i = 0;
bool done = false;
 
void waits()
{
    std::unique_lock<std::mutex> lk(cv_m);
    std::cout << "等待... \n";
    cv.wait(lk, []{ return i == 1; });
    std::cout << "...结束等待; i == " << i << '\n';
    done = true;
}
 
void signals()
{
    std::this_thread::sleep_for(200ms);
    std::cout << "假通知...\n";
    cv.notify_one(); // 等待线程被通知且 i == 0。
                     // cv.wait 被唤醒,检查 i,再回到等待
 
    std::unique_lock<std::mutex> lk(cv_m);
    i = 1;
    while (!done) 
    {
        std::cout << "真的改动通知...\n";
        lk.unlock();
        cv.notify_one(); // 等待线程被通知且 i == 1,cv.wait 返回
        std::this_thread::sleep_for(300ms);
        lk.lock();
    }
}
 
int main()
{
    std::thread t1(waits), t2(signals);
    t1.join(); 
    t2.join();
}

三.使用条件变量实现简单的多线程通信示例

#include <iostream>
#include <string>
#include <sstream>
#include <list>
#include <thread>
#include <condition_variable>
#include <mutex>

std::mutex mux;
std::condition_variable cv;
std::list<std::string> buffer;

void thRead(int i){
    for (;;) {
        std::unique_lock<std::mutex> lock(mux);
        cv.wait(lock, [](){ return !buffer.empty(); });  //wait解锁阻塞等待唤醒,上锁判断Lambda
        //如果Lambda返回true获得锁进入处理逻辑
        while (!buffer.empty()){
            std::cout << "Read Thread " << i << "\n" << buffer.front() << std::endl;
            buffer.pop_front();
        }
    }
}
void thWrite(){
    for (int i = 0; ; ++i) {
        std::unique_lock<std::mutex> lock(mux); //获取锁
        std::stringstream ss;
        ss << "Write " << i + 1 << " Times.";
        buffer.push_back(ss.str());

        lock.unlock();//先解锁再进行通知,防止读线程死锁
        cv.notify_one();

        std::this_thread::sleep_for(std::chrono::seconds(1));   //1s写入一次数据
    }
}


int main() {
    std::thread write(thWrite);
    write.detach();
    for (int i = 0; i < 3; ++i) {
        std::thread th(thRead, i + 1);
        th.detach();
    }

    getchar();
    return 0;
}

四.通过信号量通知线程关闭

//
// Created by zty19 on 24-6-3.
//
#include "xmessage.h"
#include <iostream>

void XMessage::stop(){
    exit_flag_ = true;
    cv.notify_all();
    wait();
}

void XMessage::send(const std::string &msg) {
    std::unique_lock<std::mutex> lock(buffer_mutex_);
    send_buffer_.push_back(std::move(msg));
    lock.unlock();
    cv.notify_one(); //发布唤醒标志
}

void XMessage::Main() {
    while (!is_exit()) {
        std::unique_lock<std::mutex> lock(buffer_mutex_);
        //std::this_thread::sleep_for(std::chrono::microseconds(100));
        cv.wait(lock, [this](){
            if (is_exit()) return true;
            return !send_buffer_.empty(); });
        if (is_exit()) break;
        while (!send_buffer_.empty()){
            std::string tmp = send_buffer_.front();
            send_buffer_.pop_front();
            std::cout << tmp << std::endl;
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/688985.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

苹果Safari怎么清理缓存?原来快速清除浏览器的历史记录那么容易

在数字化时代&#xff0c;互联网已经成为我们日常生活中不可或缺的一部分。我们使用各种设备&#xff0c;如智能手机、平板电脑和笔记本电脑来浏览网页、获取信息、娱乐和社交。而在这些设备中&#xff0c;iPhone无疑是最受欢迎的选择之一。iPhone搭载的Safari浏览器以其简洁的…

three.js官方案例(animation / multiple)webgl_animation_multiple.html学习笔记

目录 ​编辑 1 骨架工具&#xff08;SkeletonUtils&#xff09; 1.1 clone方法 2 蒙皮网格&#xff08;SkinnedMesh&#xff09; 3 自测 4 webgl_animation_multiple.html全部脚本 1 骨架工具&#xff08;SkeletonUtils&#xff09; 用于操控 Skeleton、 SkinnedMesh、和…

Spring AI 第二讲 之 Chat Model API 第八节Anthropic 3 Chat

Anthropic Claude 是一系列基础人工智能模型&#xff0c;可用于各种应用。对于开发人员和企业来说&#xff0c;您可以利用 API 访问&#xff0c;直接在 Anthropic 的人工智能基础架构之上进行构建。 Spring AI 支持用于同步和流式文本生成的 Anthropic 消息 API。 Anthropic …

因为宇宙一片漆黑,所以地球才有昼夜之分,宇宙为什么是黑的?

因为宇宙一片漆黑&#xff0c;所以地球才有昼夜之分&#xff0c;宇宙为什么是黑的&#xff1f; 地球为何会有昼夜之分&#xff1f; 乍一看&#xff0c;这个问题很是简单&#xff0c;当然是因为地球一直在自转了&#xff0c;当地球的一部分被太阳照射时就是白昼&#xff0c;而…

Servlet搭建博客系统

现在我们可以使用Servlet来搭建一个动态(前后端可以交互)的博客系统了(使用Hexo只能实现一个纯静态的网页,即只能在后台自己上传博客)。有一种"多年媳妇熬成婆"的感觉。 一、准备工作 首先创建好项目,引入相关依赖。具体过程在"Servlet的创建"中介绍了。…

【Vue】单页应用程序介绍

通常基于Vue去开发一整个网站&#xff0c;开发出来的这整个网站应用&#xff0c;我们都会叫做单页应用程序 概念 单页应用程序&#xff1a;SPA【Single Page Application】是指所有的功能都在一个html页面上实现 我们可以将页面共用的部分封装成组件&#xff0c;底下要切换的也…

A6500-LC LVDT 前置器,用于A6500-UM, 导轨安装

电源 22.5V to 32VDC <30mA <0.1%/V <60V( 使用SELV/PELV 供电电源) 约2.2Vrms,5kHz IP20 IEC 60529 -35C to 75C(-31F to 167F) -35C to 85C(-31F to 185F) 电流损耗 供电电压对 运行温度 存储温度 0.35mm(0.014 in ),10 to 55Hz 15g 根据 EN 60068-2-27 根据IEC 613…

nginx配置WebSocket参数wss连接

目录 一、原文连接 二、 配置参数 三、实践 四、重启nginx 五、连接websocket 一、原文连接 nginx配置websocket支持wss-腾讯云开发者社区-腾讯云 二、 配置参数 map $http_upgrade $connection_upgrade { default upgrade; close; } upstream websocket { se…

大数据处理学习笔记

sudo tar -zxvf hadoop-1.1.2.tar.gz -C / #解压到/usr/local目录下 sudo mv hadoop-1.1.2 hadoop #重命名为hadoop sudo chown -R python ./hadoop #修改文件权限 //java安装同上给hadoop配置环境变量&#xff0c;将下面代…

Thinkphp使用Elasticsearch查询

在Thinkphp中调用ES&#xff0c;如果自己手写json格式的query肯定是很麻烦的。我这里使用的是ONGR ElasticsearchDSL 构建 ES 查询。ongr ElasticsearchDSL 的开源项目地址&#xff1a;GitHub - ongr-io/ElasticsearchDSL: Query DSL library for Elasticsearch。ONGR Elastics…

分布式数据库架构:从单实例到分布式,开发人员需及早掌握?

现在互联网应用已经普及,数据量不断增大。对淘宝、美团、百度等互联网业务来说,传统单实例数据库很难支撑其性能和存储的要求,所以分布式架构得到了很大发展。而开发人员、项目经理,一定要认识到数据库技术正在经历一场较大的变革,及早掌握好分布式架构设计,帮助公司从古…

DSP28335模块配置模板系列——定时器中断配置模板

一、配置步骤&#xff1a; 1.使能定时器时钟 EALLOW;SysCtrlRegs.PCLKCR3.bit.CPUTIMER2ENCLK 1; // CPU Timer 2EDIS; 2.设置定时器的中断向量 EALLOW;PieVectTable.TINT2 &TIM2_IRQn;EDIS;其中TIM2_IRQn时定时器中断服务程序的名称 &#xff0c;将中断服务函数的地址…

【回溯算法】N皇后问题·构建多叉决策树,遍历决策节点,做出决策(边),收集答案

0、前言 在由树形解空间入手&#xff0c;深入分析回溯、动态规划、分治算法的共同点和不同点这篇博客&#xff0c;其实已经对回溯算法的思想、做题框架做出了详细的阐述。这篇文章我们再从N皇后问题&#xff0c;加深我们对其理解。 这里在简单再次对其进行概述&#xff1a; …

dataphin是什么及其简单使用示例

1.1dataphin是什么&#xff1f; Dataphin是由阿里研发的智能大数据建设平台&#xff0c;提供一站式数据中台&#xff08;大数据平台&#xff09;建设服务。Dataphin通过沙箱&#xff08;项目&#xff09;实现业务及作业资源隔离&#xff0c;运行更快&#xff0c;且数据同步到D…

代码随想录算法训练营第四十八 | ● 121. 买卖股票的最佳时机 ● 122.买卖股票的最佳时机II

121. 买卖股票的最佳时机 买卖股票的最佳时机 视频讲解&#xff1a;https://www.bilibili.com/video/BV1Xe4y1u77q https://programmercarl.com/0121.%E4%B9%B0%E5%8D%96%E8%82%A1%E7%A5%A8%E7%9A%84%E6%9C%80%E4%BD%B3%E6%97%B6%E6%9C%BA.html class Solution { public:int ma…

因你而变 共赴新程 | AidLux全新版本震撼发布!

历经400多个日夜&#xff0c;AidLux 2.0&#xff08;基础版&#xff09;终于要与大家见面了。 开发者们问过无数次&#xff0c;新版本何时发布&#xff0c;期待的功能何时上线……在此&#xff0c;让我先真诚地感谢大家长期以来的期待与关心&#xff01; 一年多以来&#xff…

如何从官网下载 mysql 二进制安装包

一.下载二进行包 1. 官网网址: https://www.mysql.com/ 如图所示进入官网 2. 点击 DOWNLOADS ,进入如下图 在该页面找到 MySQL Community (GPL) Downloads 点进去 如上图页面&#xff0c;找到 MySQL Community Server 在点进去 下载 linux 通用版 点击最下面 Compressed …

服务监控-微服务小白入门(5)

背景 什么是服务监控 监视当前系统应用状态、内存、线程、堆栈、日志等等相关信息&#xff0c;主要目的在服务出现问题或者快要出现问题时能够准确快速地发现以减小影响范围。 为什么要使用服务监控 服务监控在微服务改造过程中的重要性不言而喻&#xff0c;没有强大的监控…

kafka-生产者拦截器(SpringBoot整合Kafka)

文章目录 1、生产者拦截器1.1、创建生产者拦截器1.2、KafkaTemplate配置生产者拦截器1.3、使用Java代码创建主题分区副本1.4、application.yml配置----v1版1.5、屏蔽 kafka debug 日志 logback.xml1.6、引入spring-kafka依赖1.7、控制台日志 1、生产者拦截器 1.1、创建生产者拦…

SkyWalking之P0核心业务场景输出调用链路应用

延伸扩展&#xff1a;XX核心业务场景 路由标签打标、传播、检索 链路标签染色与传播 SW: SkyWalking的简写 用户请求携带HTTP头信息X-sw8-correlation “X-sw8-correlation: key1value1,key2value2,key3value3” 网关侧读取解析HTTP头信息X-sw8-correlation&#xff0c;然后通…