【Linux多线程】生产者消费者模型

【Linux多线程】生产者消费者模型

目录

  • 【Linux多线程】生产者消费者模型
    • 生产者消费者模型
      • 为何要使用生产者消费者模型
      • 生产者消费者的三种关系
      • 生产者消费者模型优点
      • 基于BlockingQueue的生产者消费者模型
        • C++ queue模拟阻塞队列的生产消费模型
      • 伪唤醒情况(多生产多消费的情况下)

作者:爱写代码的刚子

时间:2024.3.29

前言:本篇博客将会介绍Linux多线程中一个非常重要的模型——生产者消费者模型

生产者消费者模型

  • 321原则(方便记忆):3种关系,2种角色(生产者和消费者),1个交易场所(特定结构的内存空间)

为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者的三种关系

  • 生产者VS生产者 :互斥
  • 消费者VS消费者:互斥
  • 生产者VS消费者:互斥,同步

生产者消费者模型优点

  • 生产和消费进行解耦(多线程其实也是一种解耦)
  • 支持并发
  • 支持忙闲不均

在这里插入图片描述

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构(有点类似于管道)。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

在这里插入图片描述

C++ queue模拟阻塞队列的生产消费模型

代码:

以下代码以单生产者,单消费者为例:

  • 代码一:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
template <class T>
class BlockQueue
{
    static const int defalutnum = 5;
public:
    BlockQueue(int maxcap=defalutnum):maxcap_(maxcap)
    {
        pthread_mutex_init(&mutex_,nullptr);
        pthread_cond_init(&c_cond_,nullptr);
        pthread_cond_init(&p_cond_,nullptr);
    }
    T pop()
    {
        pthread_mutex_lock(&mutex_);
        
        if(q_.size()== 0 )
        {
             pthread_cond_wait(&c_cond_,&mutex_);//生产和消费要使用不同的等待队列
        }
        
        T out = q_.front();
        q_.pop();
        pthread_cond_signal(&p_cond_);
        
        
        pthread_mutex_unlock(&mutex_);
        return out;
    }
    void push(const T &in)
    {
        pthread_mutex_lock(&mutex_);
        
        if(q_.size()==maxcap_)//判断本身也是访问临界资源
        {
            pthread_cond_wait(&p_cond_,&mutex_);//调度时自动释放锁
        }
        
        //1.队列没满 2.被唤醒
        
        q_.push(in); 
        pthread_cond_signal(&c_cond_);
        pthread_mutex_unlock(&mutex_);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }
private:
    std::queue<T> q_;//我们不直接使用stl中的queue是因为它本身不是线程安全的,共享资源
    //int mincap_;
    int maxcap_;//队列中的极值
    pthread_mutex_t mutex_;
    pthread_cond_t c_cond_;
    pthread_cond_t p_cond_;
};
#include "BlockQueue.hpp"
#include <unistd.h>
void *Consumer(void *args)
{
    BlockQueue<int> *bq =  static_cast<BlockQueue<int>*>(args); 
    while(true)
    {
        sleep(2);// 由于两个线程谁先执行是不确定的,我们让生产者先执行
        int data = bq->pop();
        std::cout<<"消费了一个数据: "<<data<<std::endl;
        
    }
}
void *Productor(void *args)
{
    BlockQueue<int> *bq =  static_cast<BlockQueue<int>*>(args); 

    int data=0;
    while(true)
    {
        ++data;
        bq->push(data);
        std::cout<<"生产了一个数据: "<<data<<std::endl;
    }
}
int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>(); 
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    
    delete bq;
    
    return 0;
}

在这里插入图片描述

  • 调整代码,使其生产者生产的数据到达一定范围通知消费者,消费者消费了一定的数据通知生产者:

在这里插入图片描述

在这里插入图片描述

【问】:生产者的数据从哪里来?

用户,或者网络等。生产者生产的数据也是要花时间获取的!,所以生产者要做两件事:1. 获取数据 2. 生产数据到队列

  • 同时消费者拿到数据要做加工处理,也要花时间!,消费者要做两件事:1. 消费数据 2. 加工处理数据

【问】:生产者消费者模型为什么是高效的?

存在一个线程访问临界区的代码,一个线程正在处理数据,高效并发。

虽然互斥和同步谈不上高效,更何况加了锁,但是一个线程正在生产数据,一个线程正在消费数据,两者解偶且互不影响。(在更多的生产者消费者情况下,只有少量的执行流在互斥和同步,而大量的执行流都在并发访问)

  • 再次完善代码,使该生产者消费者模型能够执行相应的任务:

BlockQueue.hpp

#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
template <class T>
class BlockQueue
{
    static const int defalutnum = 20;
public:
    BlockQueue(int maxcap=defalutnum):maxcap_(maxcap)
    {
        pthread_mutex_init(&mutex_,nullptr);
        pthread_cond_init(&c_cond_,nullptr);
        pthread_cond_init(&p_cond_,nullptr);
        low_water_ =maxcap_/3;
        high_water_ =(maxcap_*2)/3;
    }
    T pop()
    {
        pthread_mutex_lock(&mutex_);
        
        if(q_.size()== 0 )
        {
             pthread_cond_wait(&c_cond_,&mutex_);//生产和消费要使用不同的等待队列
        }
        
        
        T out = q_.front();
        q_.pop();

        if(q_.size()<low_water_)
        {
            pthread_cond_signal(&p_cond_);
        }
        
        
        pthread_mutex_unlock(&mutex_);
        return out;
    }
    void push(const T &in)
    {
        pthread_mutex_lock(&mutex_);
        
        if(q_.size()==maxcap_)//判断本身也是访问临界资源
        {
            pthread_cond_wait(&p_cond_,&mutex_);//调度时自动释放锁
        }
        
        //1.队列没满 2.被唤醒
        
        q_.push(in); 
        if(q_.size()>high_water_)
        {
            pthread_cond_signal(&c_cond_);
        }
        pthread_mutex_unlock(&mutex_);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }
    
private:
    std::queue<T> q_;//我们不直接使用stl中的queue是因为它本身不是线程安全的,共享资源
    //int mincap_;
    int maxcap_;//队列中的极值
    pthread_mutex_t mutex_;
    pthread_cond_t c_cond_;
    pthread_cond_t p_cond_;

    int high_water_;
    int low_water_;
};

main.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
void *Consumer(void *args)
{
    BlockQueue<Task> *bq =  static_cast<BlockQueue<Task>*>(args); 
    while(true)
    {
        Task t=bq->pop();


        //t.run();
        t();
        std::cout<<"处理任务: "<<t.GetTask()<<" 运算结果是: "<<t.GetResult()<<std::endl;
        
        //sleep(2);// 由于两个线程谁先执行是不确定的,我们让生产者先执行
        //std::cout<<"消费了一个数据: "<<data<<std::endl;
        
    }
}
void *Productor(void *args)
{
    int len = opers.size();
    BlockQueue<Task> *bq =  static_cast<BlockQueue<Task>*>(args); 
    
    int data=0;
    while(true)
    {
        int data1=rand()%10+1;
        usleep(10);
        int data2=rand() % 10;
        char op =opers[rand() % len];
        Task t(data1,data2,op);
        //++data;
        bq->push(t);
        //std::cout<<"生产了一个数据: "<<data<<std::endl;
        std::cout<<"生产了一个任务:"<< t.GetTask() <<std::endl;
         
        sleep(1);
    }
}
int main()
{
    srand(time(nullptr));
    BlockQueue<Task> *bq = new BlockQueue<Task>(); 
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    
    delete bq;
    
    return 0;
}

Task.hpp

#pragma once
#include <iostream>
#include <string>

std::string opers = "+-*/%";

enum
{
    DivZero = 1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
            case '+':
                result_ = data1_ + data2_;
                break;
            case '-':
                result_ = data1_ - data2_;
                break;
            case '*':
                result_ = data1_ * data2_;
                break;
            case '/':
            {
                if (data2_ == 0)
                    exitcode_ = DivZero;
                else
                    result_ = data1_ / data2_;
            }
            break;
            case '%':
            {
                if (data2_ == 0)
                    exitcode_ = ModZero;
                else
                    result_ = data1_ % data2_;
            }
            break;
            default:
                exitcode_ = Unknown;
                break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
         std::string r = std::to_string(data1_);
        r += oper_;
        r += std::to_string(data2_);
        r += "=";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";
        return r;
    }

    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r+=oper_;
        r += std::to_string(data2_);
        r += "=?";
        return r;
    }

    ~Task()
    {}

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

在这里插入图片描述

一定要记得,判断临界资源是否满足,也是在访问临界资源!!!

伪唤醒情况(多生产多消费的情况下)

多生产多消费的情况下:

举例:生产者只生产了一个数据,但是唤醒了多个消费者,多个消费者都在等待队列上,生产者将锁解开,多个消费者竞争这一把锁,其中一个消费者抢到了这把锁消费了一个数据,把锁解开,同时其他刚被唤醒的消费者其中又抢到了锁,进行消费,可是已经没有数据了(条件并不满足了),造成了伪唤醒的情况。(处于等待队列中的线程申请锁失败了会继续在条件变量中的等待队列中等)

或者说可能存在等待失败但是继续向下走的情况。

如何防止线程出现这种情况?

将if改成while(进行重复判断):

在这里插入图片描述

在这里插入图片描述

【问题】:无论是多生产多消费还是单生产单消费,本质上都是一个线程访问临界资源,那意义在哪?

重点是并发生产,并发消费,只是访问临界资源时是单个线程。重点不是获取数据本身,而在于处理数据!!!(本质)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/506839.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【手册】——mq延迟队列

目录 一、背景介绍二、思路&方案三、过程1.项目为啥用延迟队列&#xff1f;2.项目为啥用三方延迟队列&#xff1f;3.项目中为啥用rabbitmq延迟队列&#xff1f;4.rabbitmq延迟队列的安装5.rabbitmq的延迟队列配置方式5.1.exchange配置5.2.queues配置5.3.exchange和queues的…

文件操作(1)【文件打开和关闭】【文件的顺序读写(各种函数)】【sprintf和sscanf的理解】

一.什么是文件&#xff1f; 在程序设计中我们一般谈的文件有两种&#xff1a;程序文件和数据文件 1.程序文件 程序文件是包含计算机程序代码的文件。它通常包含一系列指令和算法&#xff0c;用于执行特定的任务或实现特定的功能。程序文件可以由不同的编程语言编写&#xff…

【C语言环境】Sublime中运行C语言时MinGW环境的安装

要知道&#xff0c;GCC 官网提供的 GCC 编译器是无法直接安装到 Windows 平台上的&#xff0c;如果我们想在 Windows 平台使用 GCC 编译器&#xff0c;可以安装 GCC 的移植版本。 目前适用于 Windows 平台、受欢迎的 GCC 移植版主要有 2 种&#xff0c;分别为 MinGW 和 Cygwin…

【Python】——变量名的命名规则

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

Linux shell编程学习笔记42:md5sum

0 前言 前几天在国产电脑上遇到一个问题&#xff0c;先后接到两个文件&#xff0c;如何判断这两个文件内容是否相同&#xff1f; 如果是在Windows系统&#xff0c;可以用fc命令&#xff0c;或者用我自己写的FileInfo&#xff0c;提取两个文件有MD5、SHA1、CRC32值进行比较来判…

GANs和Diffusion模型(3)

接GANs和Diffusion模型&#xff08;2&#xff09; 扩散(Diffusion)模型 生成学习三重困难(Trilemma) 指生成学习(genrative learning)的模型都需要满足三个需求&#xff1a; 高质量的采样(High Quality Samples)&#xff1a;模型应该能生成非常高质量的采样快速采样(Fast S…

使用 Python 模拟布朗运动(和股票价格)

一、说明 本文先介绍布朗运动的概念&#xff0c;紧接着应用布朗方程到股票的随机斩落模型。进而用python实现&#xff0c;并给出各种各样的条件模型。从中烘托出股票模型的规律所在。 二、什么是布朗运动&#xff1f; 布朗运动以罗伯特布朗的名字命名&#xff0c;他是第一个在通…

持续交付与持续部署相关概念(CD)

目录 一、概述 二、持续交付基本概念 2.1 持续交付的含义 2.1.1 项目管理的视角 2.1.2 产品研发的视角 2.1.3 总结 2.2 持续交付涉及的运作环境 2.2.1 开发环境 2.2.2 测试环境 2.2.3 UAT环境 2.2.4 准生产环境 2.2.5 生产环境 2.3 总结 三、持续部署基本概念 3.…

创新之路:云边对接与行业生态的前沿探索

全球 80% 的数据来自物联网&#xff0c;不论是传统行业还是新兴行业&#xff0c;都将利用更多有价值的数据来驱动业务&#xff0c;实现降本增效。智慧教育、资产追踪、环境监测、工业物联网、智慧城市、家居互联、智慧电力、智慧农业。从智能电表到智能家居&#xff0c;从机器人…

RAG:检索增强生成系统如何工作

随着大型语言模型&#xff08;LLM&#xff09;的发展&#xff0c;人工智能世界取得了巨大的飞跃。经过大量数据的训练&#xff0c;LLM可以发现语言模式和关系&#xff0c;使人工智能工具能够生成更准确、与上下文相关的响应。 但LLM也给人工智能工程师带来了新的挑战&#xff…

shopee、lazada、temu测评自养号策略解析

在跨境电商领域&#xff0c;测评作为提升销量的重要手段&#xff0c;其策略的制定和实施显得尤为重要。特别是对于Shopee和Lazada两大主流平台上的卖家而言&#xff0c;如何有效利用测评策略提升产品销量成为了一大挑战。 自养号测评系统可以批量注册买家账号、模拟真实人工操…

U8二次开发-钉钉集成

钉钉开放平台作为企业沟通和协作的重要工具,其技术的每一次迭代都为企业带来了新的机遇和挑战。随着企业对于高效沟通和智能化管理的需求日益增长,钉钉平台的SDK更新显得尤为重要。把传统的U8与钉钉平台集成,可以有效的将业务功能和角色进行前移,打破应用系统二八原则,即8…

Vue(十二):脚手架配置代理,github案例,插槽

一、脚手架配置代理 老师讲的主要有两种方法&#xff1a; 但是我的没有proxy&#xff0c;只有proxyTable,之前一直不成功&#xff0c;现在我是这样配置的&#xff1a; config文件夹下的index.js: App.vue: 然后就成功了&#xff1a;&#xff08;我真服了&#xff0c;之前在这…

Linux中xz一次恶意后门处理的名场面-尚文网络xUP楠哥

进Q群11372462领取专属报名福利! 说在前面 Linux系统中所使用的xz软件是用于日常文件的归档压缩工具&#xff0c;据悉就在今日&#xff0c;Utils 5.6.0、5.6.1版本存在恶意后门植入漏洞&#xff08;CVE-2024-3094&#xff09;。开发人员在调查SSH性能问题时发现了涉及XZ Util…

Taro多行文本最多展示5行,超出“查看更多”展示,点击弹层

Taro中&#xff0c;页面需求&#xff1a; 多行文本&#xff0c;展示最多展示5行&#xff0c;超出5行&#xff0c;展示“查看更多”按钮&#xff0c;点击弹层展示文本详细信息。 弹层代码就不说了&#xff0c;着重说一下怎么获取区域高度&#xff5e; 1.区域设置max-height&am…

2_2.Linux中的远程登录服务

# 一.Openssh的功能 # 1.sshd服务的用途# #作用&#xff1a;可以实现通过网络在远程主机中开启安全shell的操作 Secure SHell >ssh ##客户端 Secure SHell daemon >sshd ##服务端 2.安装包# openssh-server 3.主配置文件# /etc/ssh/sshd_conf 4.…

嵌入式|蓝桥杯STM32G431(HAL库开发)——CT117E学习笔记12:DAC数模转换

系列文章目录 嵌入式|蓝桥杯STM32G431&#xff08;HAL库开发&#xff09;——CT117E学习笔记01&#xff1a;赛事介绍与硬件平台 嵌入式|蓝桥杯STM32G431&#xff08;HAL库开发&#xff09;——CT117E学习笔记02&#xff1a;开发环境安装 嵌入式|蓝桥杯STM32G431&#xff08;…

Php_Code_challenge12

题目&#xff1a; 答案&#xff1a; 解析&#xff1a; 字符串拼接。

iPhone设备中调试应用程序崩溃日志的高效方法探究

​ 目录 如何在iPhone设备中查看崩溃日志 摘要 引言 导致iPhone设备崩溃的主要原因是什么&#xff1f; 使用克魔助手查看iPhone设备中的崩溃日志 奔溃日志分析 总结 摘要 本文介绍了如何在iPhone设备中查看崩溃日志&#xff0c;以便调查崩溃的原因。我们将展示三种不同的…

Windows 上路由、端口转发配置,跨网络地址段

一、背景 有时候我们会遇到这样的场景&#xff0c;一批同一局域网中只有某一台主机带外且系统为windows&#xff0c;局域网中其他非带外的主机要想访问外网&#xff0c;本文将介绍如何配置在带外主机上开启路由及端口转发。 二、配置操作 2.1、带外主机开启路由转发 1&#x…