40 生产者消费者模型

生产者消费者模型

概念

为何要使用生产者消费者模型,这个是用过一个容器解决生产者和消费的强耦合问题。生产者和消费者之间不需要通讯,通过阻塞队列通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列取,阻塞队列相当于缓冲区,平衡了双方的能力,用来解耦的
在这里插入图片描述

上面超市的例子。消费者需要泡面的话不用去找供货商要货,而是去超市取。如果找供货商,消费者只需要一包,供货商开启生产设备只生产一包,多次这样很浪费效率也不高。超市作为存储,需要一万包泡面,供货商生产1万包摆到超市里,将超市塞满,缓存起来,调整供货商和消费者的速度不一致导致的效率问题。供货商就可以休息下来。消费者需要几包去超市取,支持了一种忙闲不均的状态。供货商关注超市有多少空位置,需要多少货,消费者关注现有商品的数量。供货商在生产的时候,和消费者没关系,消费者购买的时候和供应商也没关系,双方不需要互相考虑,只完成自己的事情,就减少了依赖性,解耦。

在计算机里,生产者和消费者都是由线程承担,超市是一种特殊结构的内存空间,这个结构是一种共享资源,整个过程就是执行流在通信,如何安全高效的通信。共享资源就有并发的问题,这种并发有三种关系:

生产者和生产者:互斥关系。一个在供货的时候另一个需要等待
生产者和消费者:互斥和同步关系。如果供货商正在摆一个商品,消费者有没有得到。有一种不确定性,生产者要确定,数据安全,只有生产了和没有生产,消费者一定可以得到货物。供货商不停联系超市需不需要货,超市已经满了还在不停询问,占用了消费者询问的机会,导致消费者饥饿问题。所以要同步,保证顺序性。供货商刚供货一次再询问时,告知一段时间之内不要询问,消费者询问没有商品时,也告知没有并一段时间不要询问,安全才是本质
消费者和消费者:互斥关系

321原则
3种关系2种角色1个交易场所
3种关系,生产者和消费者之间互相搭配
2种角色:生产和消费
1个交易场所:特定结构的内存空间

优点:
1.支持忙闲不均
2.支持并发
3.生产和消费进行解耦

什么是解耦。main函数内调用一个函数,传入参数,需要等到函数返回才能继续往下执行,可以将两个分开为线程,参数用一段空间缓存,放到缓冲区里,函数调用时在空间里取,这就是解耦

基于BlockingQueue的生产者模型

BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
类似于管道

在这里插入图片描述

类的设计
首先需要数据存储的结构,这个用队列,一个容量设置为队列允许存放的最多数量。对队列的访问同一时间只能有一方,所以需要一个锁。类提供存入数据和取出数据的功能,生产者关心的是还能放多少数据,如果大于最大容量就要停止,所以要判断队列的现有数量,这是对共享资源的访问,加锁和释放锁,判断大于容量时就去条件变量队列等待,同样,消费者取物品也需要一个条件变量,消费者判断有没有商品,没有就到消费者的条件变量等待。当生产者生产出一个商品放入后就唤醒消费者取,消费者取完唤醒生产者生产

#include <queue>
#include <pthread.h>

template<class T>
class BlockQueue
{
    static const int defaultnum = 20;
public:
    BlockQueue(int cap = defaultnum)
    {
        _maxcap = cap;
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
        _lowwater = _maxcap / 3;
        _highwater = _maxcap / 3 * 2;
    }

    void push(const T& x)
    {
        pthread_mutex_lock(&_mutex);
        if (_que.size() == _maxcap) //防止被伪唤醒的状态 
        {
            pthread_cond_wait(&_pcond, &_mutex); //调用,自动释放锁
        }
        _que.push(x);  //确保生产条件满足才能生产
        //if (_que.size() > _highwater)
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }

    T pop()
    {
        pthread_mutex_lock(&_mutex);
        if (_que.size() == 0)
        {
            pthread_cond_wait(&_ccond, &_mutex);
        }
        T tmp = _que.front();
        _que.pop();
        //if (_que.size() < _lowwater)
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
        return tmp;
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

private:
    std::queue<T> _que;
    int _maxcap;   //最大容量
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;
    pthread_cond_t _ccond;
    int _lowwater;
    int _highwater; //控制水位线
};

阻塞队列设置为了模板,不只可以放入内置类型,也可以是自定义类型。弄一个任务类,有两个操作数,操作符加减乘除随机。一个变量记录结果,一个记录可靠性,如果有除0错误设置为对应值。提供返回string类型整个表达式的内容功能

#pragma once
#include <stdio.h>
#include <string>

enum
{
    DIVZERO = 1,
    UNKNOW
};
std::string g_op = "+-*/";
struct task
{
public:
    task(int a, int b, char op)
    :_a(a), _b(b), _op(op), _result(0), _exitcode(0)
    {}

    void run()
    {
        switch(_op)
        {
            case '+':
                _result = _a + _b;
                break;
            case '-':
                _result = _a - _b;
                break;
            case '*':
                _result = _a * _b;
                break;
            case '/':
                if (_b == 0)
                {
                    _exitcode = DIVZERO;
                }
                else
                {
                     _result = _a / _b;
                }      
                break;
            default:
                _exitcode = UNKNOW;
                break;
            }
        //printf("%d+%d结果:%d\n", _a, _b, _a + _b);
    }

    std::string getresult()
    {
        std::string str = std::to_string(_a) + _op  + std::to_string(_b);
        str += "=";
        str += std::to_string(_result);
        str += " [exit:";
        str += std::to_string(_exitcode);
        str += "]";

        return str;
    }

    std::string gettask()
    {
        std::string str = std::to_string(_a) + _op  + std::to_string(_b);
        return str;
    }

private:
    int _a;
    int _b;
    char _op;

    int _result;
    int _exitcode;
};

main文件生成两个线程,生产和消费,传入阻塞队列的实例,生产出一个任务,消费者完成

#include <unistd.h>
#include <cstdlib>
#include <iostream>
#include <ctime>
#include "blockqueue.hpp"
#include "task.hpp"

void *produce(void *bk)
{
    BlockQueue<task>* block = static_cast<BlockQueue<task>*>(bk);
    
    while (true)
    {
        int x1 = rand() % 10;
        usleep(10);
        int x2 = rand() % 10 + 1;
        char op = g_op[rand() % 4];
        task t(x1, x2, op);

        //生产
        printf("生产任务:%s\n", t.gettask().c_str());
        block->push(t);
        sleep(1);
    }
}

void* consume(void* bk)
{
    BlockQueue<task>* block = static_cast<BlockQueue<task>*>(bk);
    while (true)
    {
        //消费
        task n = block->pop();
        n.run();
        printf("完成任务:%s\n", n.getresult().c_str());
        sleep(1);
    }
}

int main()
{
    srand(time(NULL));
    pthread_t ptid, ctid;
    BlockQueue<task>* block = new BlockQueue<task>();
    pthread_create(&ptid, nullptr, produce, block);
    pthread_create(&ctid, nullptr, consume, block);

    while (true)
    {
        sleep(1);
    }

    delete block;
    return 0;
}

结果:
在这里插入图片描述

伪唤醒

当队列里只剩一个位置的时候,生产者如果不小心唤醒了多个生产者。这时它们都会去竞争锁,拿到锁的线程去生产然后放入,接着释放锁。正常情况下,应该消费线程拿到这个锁取数据,但因为刚刚唤醒了多个线程,可能会抢到锁继续放入数据,这时就会超出最大容量出现错误。所以要将if处改为循环,释放锁后判断如果满了就调条件变量里休眠

多生产多消费

将上面的单生成单消费改为多生产多消费版本

#include <unistd.h>
#include <cstdlib>
#include <iostream>
#include <ctime>
#include "blockqueue.hpp"
#include "task.hpp"

void *produce(void *bk)
{
    BlockQueue<task>* block = static_cast<BlockQueue<task>*>(bk);
    
    while (true)
    {
        int x1 = rand() % 10;
        usleep(10);
        int x2 = rand() % 10 + 1;
        char op = g_op[rand() % 4];
        task t(x1, x2, op);

        //生产
        printf("%p生产任务:%s\n", pthread_self(), t.gettask().c_str());
        block->push(t);
        sleep(1);
    }
}

void* consume(void* bk)
{
    BlockQueue<task>* block = static_cast<BlockQueue<task>*>(bk);
    while (true)
    {
        //消费
        task n = block->pop();
        n.run();
        printf("%p完成任务:%s\n", pthread_self(), n.getresult().c_str());
        //sleep(1);
    }
}

int main()
{
    srand(time(NULL));
    BlockQueue<task>* block = new BlockQueue<task>();
    pthread_t ptid[3], ctid[5];
    for (int i = 0; i < 3; i++)
    {
        pthread_create(&ptid[i], nullptr, produce, block);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_create(&ctid[i], nullptr, consume, block);
    }

     for (int i = 0; i < 3; i++)
    {
        pthread_join(ptid[i], nullptr);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_join(ctid[i], nullptr);
    } 
    

    delete block;
    return 0;
}

优势

虽然同一时间只能有一个执行流访问阻塞队列,多个生产者也只能有一个访问队列,那多个生产者和消费者的优势体现在什么地方。
生产者的数据从用户网络等地方获得,数据的获取也需要时间,当一个生产者往队列里放入数据时,其他生产者可以同时获取数据,后面只需要放入数据即可。消费者方拿到数据后要对数据加工处理,这部分也是需要花费时间,同样一个线程获取数据时,其他的可能正在处理获得的数据。所以说,这个模型提高了效率,并发程度,是高效的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/590408.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

41.WEB渗透测试-信息收集-域名、指纹收集(3)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;40.WEB渗透测试-信息收集-域名、指纹收集&#xff08;2&#xff09; 关于oneforall的安装…

DRF视图源码分析

DRF视图源码分析 1 APIView class GenericAPIView(APIView):pass # 10功能class GenericViewSet(xxxx.View-2个功能, GenericAPIView):pass # 5功能能class UserView(GenericViewSet):def get(self,request):passAPIView是drf中 “顶层” 的视图类&#xff0c;在他的内部主要…

SpringBoot+阿里云实现验证码登录注册及重置密码

开通阿里云短信服务 阿里云官网 创建API的Key 可以使用手机号或者刷脸来进行创建Key 创建成功 开通完成以后接下来实现代码请求阶段 配置maven依赖 <!-- 阿里云 oss 短信 依赖--><dependency><groupId>com.aliyun</groupId><artifactId>dysm…

Python的使用

1、打印&#xff1a;print&#xff08;‘hello’&#xff09; 2、Python的除法是数学意义上的除法 print&#xff08;2/3&#xff09; 输出&#xff1a;0.6666... 3、a18 a‘hello’ print(a) 可以直接输出 4、**2 表示2的平方 5、打印类型 print&#xff08;type&am…

【深度学习】第二门课 改善深层神经网络 Week 1 深度学习的实践层面

&#x1f680;Write In Front&#x1f680; &#x1f4dd;个人主页&#xff1a;令夏二十三 &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd; &#x1f4e3;系列专栏&#xff1a;深度学习 &#x1f4ac;总结&#xff1a;希望你看完之后&#xff0c;能对…

R语言学习—6—多元相关与回归分析

1、引子 xc(171,175,159,155,152,158,154,164,168,166,159,164) #身高 yc(57,64,41,38,35,44,41,51,57,49,47,46) #体重 par(marc(5,4,2,1)) #设定图距离画布边缘的距离&#xff1a;下5&#xff0c;左4&#xff0c;上2&#xff0c;右1 plot(x,y) 2、相关…

【华为 ICT HCIA eNSP 习题汇总】——题目集20

1、&#xff08;多选&#xff09;若两个虚拟机能够互相ping通&#xff0c;则通讯过程中会使用&#xff08;&#xff09;。 A、虚拟网卡 B、物理网卡 C、物理交换机 D、分布式虚拟交换机 考点&#xff1a;数据通信 解析&#xff1a;&#xff08;AD&#xff09; 物理网卡是硬件设…

webpack 常用插件

clean-webpack-plugin 这个插件的主要作用是清除构建目录中的旧文件&#xff0c;以确保每次构建时都能得到一个干净的环境。 var { CleanWebpackPlugin } require("clean-webpack-plugin") const path require("path");module.exports {mode: "de…

docker 基础命令

docker 安装 更新系统 sudo apt update sudo apt -y dist-upgrade安装docker sudo apt-get -y install ca-certificates curl gnupg lsb-release sudo mkdir -p /etc/apt/keyrings curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/…

数据结构——链表(精简易懂版)

文章目录 链表概述链表的实现链表的节点&#xff08;单个积木&#xff09;链表的构建直接构建尾插法构建头插法构建 链表的插入 总结 链表概述 1&#xff0c;链表&#xff08;Linked List&#xff09;是一种常见的数据结构&#xff0c;用于存储一系列元素。它由一系列节点&…

双链表的应用

cf edu161 D. Berserk Monsters 思路&#xff1a; 因为考虑到&#xff0c;每个怪是否死亡与其左右的怪息息相关&#xff0c;再者&#xff0c;若当前怪死亡&#xff0c;周围怪的相邻信息也会产生变化&#xff0c;由此可以想到使用双链表进行维护&#xff0c;双链表的维护方式有…

STM32——中断篇

技术笔记&#xff01; 1 中断相关概念 1.1 什么是中断&#xff1f; 中断是单片机正在执行程序时&#xff0c;由于内部或外部事件的触发&#xff0c;打断当前程序&#xff0c;转而去处理这一事件&#xff0c;当处理完成后再回到原来被打断的地方继续执行原程序的过程。 在AR…

算法学习系列(五十四):单源最短路的综合应用

目录 引言一、新年好二、通信线路三、道路与航线四、最优贸易 引言 关于这个单源最短路的综合应用&#xff0c;其实最短路问题最简单的就是模板了&#xff0c;这是一个基础&#xff0c;然后会与各种算法结合到一块&#xff0c;就是不再考察单个知识点了&#xff0c;而是各种知…

ICode国际青少年编程竞赛- Python-1级训练场-基础训练1

ICode国际青少年编程竞赛- Python-1级训练场-基础训练1 1、 Dev.step(4)2、 Dev.step(-4) Dev.step(8)3、 Dev.turnLeft() Dev.step(4)4、 Dev.step(3) Dev.turnLeft() Dev.step(-1) Dev.step(4)5、 Dev.step(-1) Dev.step(3) Dev.step(-2) Dev.turnLeft() Dev.step(…

su03t语音模块烧录识别不出问题解决方法

今天被su03t模块的烧写问题&#xff0c;卡了一下午&#xff0c;也是非常困惑。所幸到现在已经能够解决问题&#xff0c;并且有一些心得&#xff0c;因此想要记录一下&#xff0c;也可以帮助有同样困惑的小伙伴。 首先我们来说一下接线问题&#xff0c;因为要利用到ch340&#x…

使用DataGrip连接DM达梦数据库

前言 达梦数据库虽然提供了官方的数据库管理工具"DM管理工具"&#xff0c;但是该软件经常莫名卡顿&#xff0c;影响开发效率和心情。所以&#xff0c;本人一般使用DataGrip进行数据库操作。DataGrip是JetBrains公司开发的一款强大的数据库IDE&#xff0c;支持多种数…

SpringBoot 打包所有依赖

SpringBoot 项目打包的时候可以通过插件 spring-boot-maven-plugin 来 repackage 项目&#xff0c;使得打的包中包含所有依赖&#xff0c;可以直接运行。例如&#xff1a; <plugins><plugin><groupId>org.springframework.boot</groupId><artifact…

Appium + mitmProxy 实现APP接口稳定性测试

随着 App 用户量的不断增长&#xff0c;任何小的问题都可能放大成严重的线上事故&#xff0c;为了避免对App造成损害的任何可能性&#xff0c;我们必须从各个方面去思考 App 的稳定性建设&#xff0c;尽可能减少任何潜在的威胁。 1.背景介绍 为了保障 App 的稳定性&#xff0c…

Linux的Shell脚本详解

本文目录 一、什么是 Shell 脚本文件 &#xff1f;二、编写Shell脚本1. 基本规则2. shell 变量&#xff08;1&#xff09;创建变量&#xff08;2&#xff09;引用变量&#xff08;3&#xff09;删除变量&#xff08;4&#xff09;从键盘读取变量&#xff08;5&#xff09;特殊变…

【USB 3.2 Type-C】 端口实施挑战的集成解决方案 (补充一)

USB 3.2 Type-C 端口集成 补充&#xff0c;上一篇感觉还有没理解到位的一部分&#xff1b; 一、只做正反插的通信&#xff0c;已经差不多够了&#xff0c;但是这并不是完整的TYPE-C,必须要补充上PD; 参考连接&#xff1a; TYPE-C PD浅谈&#xff08;一&#xff09;https://w…