【Linux】基于阻塞队列的生产者消费者模型

​🌠 作者:@阿亮joy.
🎆专栏:《学会Linux》
🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根
在这里插入图片描述

目录

    • 👉为何要使用生产者消费者模型👈
    • 👉生产者消费者模型的优点👈
    • 👉基于阻塞队列的生产者和消费者模型👈
    • 👉总结👈

👉为何要使用生产者消费者模型👈

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型在生活中是相当常见的,比如客户去超市里买商品就是一个很好的例子。在这个例子中,供货商就是生产者,顾客就是消费者,超市就是一个交易场所,本质是一个商品的缓冲区。超市存在的意义就是让生产者和消费者解耦,以提高效率。


在生产者消费者模型中,有一个交易场所超市,生产者和消费者两种角色,生产者和生产者、消费者和消费者、生产者和消费者三种关系。其中生产者和生产者之间的关系是竞争互斥关系,消费者和消费者之间的关系是竞争互斥关系,生产者和消费者之间的关系是互斥和同步关系。当生产者进行生产时,消费者不能进行消费(保证安全),所以生产者和消费者是互斥关系。当商品很少时,生产者需要进行生产,消费者需要进行等待;而当商品很多时,生产者不能进行生产,消费者进行消费,所以生产行为和消费行为具有一定的顺序性,也就是说生产者和消费者是同步关系。

在计算机世界里,生产者和消费者都是通过线程模拟出来的。当生产者线程生产完商品后,就可以通知消费者线程来进行消费;而当消费者线程消费完商品后,就可以通知生产者线程来进行生产,这个过程就是通过条件变量来实现的。

👉生产者消费者模型的优点👈

想要真正理解生产者消费者模型优点,我们需要知道生产者生产数据和消费者消费数据都是想要消耗时间的。那么当消费者线程进行消费数据时,该线程是不会访问阻塞队列的。所以当消费者线程进行消费时,生产者线程从其他地方获取数据并将该数据放入到阻塞队列中,这样就可以提高生产者线程和消费者线程的并发度了。所以生产者消费者模型具有一下优点:解耦、支持并发。

👉基于阻塞队列的生产者和消费者模型👈

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。注:管道本身就是一个阻塞队列,有数据就消费,没数据就等待。管道内部已经实现了互斥和同步的功能。

在这里插入图片描述

单生产者单消费者模型

// BlockQueue.hpp
#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>

const int DefaultCap = 5;

template <class T>
class BlockQueue
{
private:
    bool isEmpty()
    {
        return _bq.empty();
    }

    bool isFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = DefaultCap)
        : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_empty, nullptr);
        pthread_cond_init(&_full, nullptr);
    }

    // 生产者
    void push(const T& in)
    {
        // 访问临界资源需要进行加锁保护
        pthread_mutex_lock(&_mtx);
        // 1.先检查当前的临界资源是否满足访问条件
        // 检查临界资源是否满足访问条件,也是访问临界资源
        // 所以它也需要在加锁和解锁之间
        // pthread_cond_wait函数是让线程在特定的条件变量下阻塞等待
        // 当进行等待时,线程所持有的锁会被自动释放掉.当条件变量满足
        // 时,线程会被在阻塞挂起的地方被唤醒.线程被唤醒的时候,是在临
        // 界区中的,pthread_cond_wait会自动帮助线程获取锁
        while(isFull())
            pthread_cond_wait(&_full, &_mtx);
        // pthread_cond_wait是一个函数,它就可能会调用失败,从而出现
        // 伪唤醒(临界资源不满足访问条件,却往下进行临界资源的访问)的
        // 情况,所以需要通过while来保证满足访问临界资源的条件,而不能
        // 通过if来判断临界资源是否能被访问
        
        // 2.访问临界资源(来到这里,临界资源100%是就绪的!)
        _bq.push(in);
        // 可以指定特定的唤醒线程的策略,如数据的个数大于容量的一半
        // if(2 * _bq.size() >= _capacity) pthread_cond_signal(&_empty);
        // 唤醒线程可以在释放锁之前,也可以在释放锁之后
        pthread_cond_signal(&_empty);

        pthread_mutex_unlock(&_mtx);
    }

    // 消费者
    void pop(T* out)
    {
        pthread_mutex_lock(&_mtx);
        while(isEmpty())
            pthread_cond_wait(&_empty, &_mtx);

        *out = _bq.front();
        _bq.pop();
        pthread_cond_signal(&_full);

        pthread_mutex_unlock(&_mtx);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_empty);
        pthread_cond_destroy(&_full);
    }

private:
    std::queue<T> _bq;     // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)
    int _capacity;         // 容量上线
    pthread_mutex_t _mtx;  // 互斥锁保护队列的安全
    pthread_cond_t _empty; // 该条件变量表示bq是否为空
    pthread_cond_t _full;  // 该条件变量表示bq是否为满
};

// ConProd.cc
#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>

void* consume(void* args)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)args;
    while(1)
    {
        int a;
        bq->pop(&a);
        std::cout << "消费了一个数据:" << a << std::endl;
        sleep(1);
    }
    return nullptr;
}

void* produce(void* args)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)args;
    int a = 1;
    while(1)
    {
        bq->push(a);
        std::cout << "生产了一个数据:" << a << std::endl;
        ++a;
    }
    return nullptr;
}

int main()
{
    BlockQueue<int>* bq = new BlockQueue<int>();
    pthread_t c, p;

    pthread_create(&c, nullptr, consume, (void*)bq);
    pthread_create(&p, nullptr, produce, (void*)bq);
	
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq;
    
    return 0;
}

在这里插入图片描述

增加任务类

// Task.hpp
#pragma once

#include <iostream>
#include <functional>

typedef std::function<int(int, int)> func_t;

class Task
{
public:
    Task() = default;

    Task(int x, int y, func_t func)
        : _x(x)
        , _y(y)
        , _func(func)
    {}

    int operator()()
    {
        return _func(_x, _y);
    }

public:
    int _x;
    int _y;
    func_t _func;
};

// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

int Add(int x, int y)
{
    return x + y;
}

void* consume(void* args)
{
    BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
    while(1)
    {
        // 获取任务
        Task t;
        bq->pop(&t);
        // 完成任务
        std::cout << "consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;
    }
    return nullptr;
}

void* produce(void* args)
{
    BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
    while(1)
    {
        // 制作任务 -- 不一定从生产者来的,可能是从网络来的
        int x = rand() % 10 + 1;
        int y = rand() % 20 + 1;
        // 生产任务
        Task t(x, y, Add);
        bq->push(t);
        std::cout << "productor: " << x << " + " << y << " = ?" << std::endl; 
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);
    BlockQueue<Task>* bq = new BlockQueue<Task>();
    pthread_t c, p;

    pthread_create(&c, nullptr, consume, (void*)bq);
    pthread_create(&p, nullptr, produce, (void*)bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq;

    return 0;
}

在这里插入图片描述

多生产者多消费者

// ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

int Add(int x, int y)
{
    return x + y;
}

void* consume(void* args)
{
    BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
    while(1)
    {
        // 获取任务
        Task t;
        bq->pop(&t);
        // 完成任务
        std::cout << pthread_self() << " consumer: " << t._x << " + " << t._y << " = " << t() << std::endl;
    }
    return nullptr;
}

void* produce(void* args)
{
    BlockQueue<Task>* bq = (BlockQueue<Task>*)args;
    while(1)
    {
        // 制作任务 -- 不一定从生产者来的,可能是从网络来的
        int x = rand() % 10 + 1;
        int y = rand() % 20 + 1;
        // int x, y;
        // std::cout << "Please Enter x: ";
        // std::cin >> x;
        // std::cout << "Please Enter y: ";
        // std::cin >> y;
        // 生产任务
        Task t(x, y, Add);
        bq->push(t);
        std::cout << pthread_self() << " productor: " << x << " + " << y << " = ?" << std::endl; 
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr) ^ getpid() ^ 0x123);
    BlockQueue<Task>* bq = new BlockQueue<Task>();
    // pthread_t c, p;

    pthread_t c[2], p[2];
    pthread_create(c, nullptr, consume, (void*)bq);
    pthread_create(c + 1, nullptr, consume, (void*)bq);
    pthread_create(p, nullptr, produce, (void*)bq);
    pthread_create(p + 1, nullptr, produce, (void*)bq);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);
    delete bq;

    return 0;
}

多生产者多消费者模型的意义就是让生产者并发地获取和制作任务,让消费者并发地完成消费任务。多生产者多消费者模型主要用于处理消费任务或者获取和制作任务比较耗时的场景。

锁的封装

// LockGuard.hpp
#pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t* pmtx)
        : _pmtx(pmtx)
    {}

    void lock()
    {
        std::cout << "进行加锁" << std::endl;
        pthread_mutex_lock(_pmtx);
    }

    void unlock()
    {
        std::cout << "进行解锁" << std::endl;
        pthread_mutex_unlock(_pmtx);
    }

    ~Mutex()
    {}

private:
    pthread_mutex_t* _pmtx;
};

// RAII的加锁方式
class LockGuard
{
public:
    LockGuard(pthread_mutex_t* pmtx)
        : _mtx(pmtx)
    {
        _mtx.lock();
    }

    ~LockGuard()
    {
        _mtx.unlock();
    }

private:
    Mutex _mtx;
};

// BlockQueue.hpp
#pragma once

#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"

const int DefaultCap = 5;

template <class T>
class BlockQueue
{
private:
    bool isEmpty()
    {
        return _bq.empty();
    }

    bool isFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = DefaultCap)
        : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_empty, nullptr);
        pthread_cond_init(&_full, nullptr);
    }

    // 生产者
    void push(const T &in)
    {
        // 出了函数的作用域会自动解锁
        LockGuard lockGuard(&_mtx);
        while (isFull())
            pthread_cond_wait(&_full, &_mtx);

        _bq.push(in);
        pthread_cond_signal(&_empty);
    }

    // 消费者
    void pop(T *out)
    {
        // 出了函数的作用域会自动解锁
        LockGuard lockGuard(&_mtx);
        while (isEmpty())
            pthread_cond_wait(&_empty, &_mtx);

        *out = _bq.front();
        _bq.pop();
        pthread_cond_signal(&_full);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_empty);
        pthread_cond_destroy(&_full);
    }

private:
    std::queue<T> _bq;     // 阻塞队列(STL中的容器并不是线程安全的,需要加锁进行保护)
    int _capacity;         // 容量上线
    pthread_mutex_t _mtx;  // 互斥锁保护队列的安全
    pthread_cond_t _empty; // 该条件变量表示bq是否为空
    pthread_cond_t _full;  // 该条件变量表示bq是否为满
};

👉总结👈

本篇博客主要讲解了为什么要使用生产者消费者模型、基于阻塞队列的生产者和消费者模型以及 RAII 的加锁方式等等。那么以上就是本篇博客的全部内容了,如果大家觉得有收获的话,可以点个三连支持一下!谢谢大家!💖💝❣️

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/2974.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

谈谈低代码的安全问题,一文全给你解决喽

低代码是一种软件开发方法&#xff0c;通过使用图形化用户界面和可视化建模工具&#xff0c;以及自动生成代码的技术&#xff0c;使得开发人员可以更快速地构建和发布应用程序。 作为近些年软件开发市场热门之一&#xff0c;市面上也涌现了许多低代码产品&#xff0c;诸如简道云…

SpringCloud:统一网关Gateway

目录 1、网关介绍 2、搭建网关服务 3、路由断言工厂 4、路由过滤器 5、全局过滤器GlobalFilter 6、过滤器执行顺序 7、跨域问题处理 1、网关介绍 网关(Gateway)又称网间连接器、协议转换器。网关在网络层以上实现网络互连&#xff0c;是复杂的网络互 连设备&#xff0…

常见背包问题

一.前言若你想学习或正在学习动态规划&#xff0c;背包问题一定是你需要了解的一种题型&#xff0c;并且大多数人最初都是从背包问题入坑进而打开动态规划这一大门。背包问题分为多种&#xff0c;你可以先掌握最常见的主要是三类&#xff1a;01背包、完全背包、多重背包二.分析…

C语言--动态内存管理1

目录前言动态内存函数介绍mallocfreecallocrealloc常见的动态内存错误对NULL指针的解引用操作对动态开辟空间的越界访问对非动态开辟内存使用free释放使用free释放一块动态开辟内存的一部分对同一块动态内存多次释放动态开辟内存忘记释放&#xff08;内存泄漏&#xff09;对通讯…

TCP和UDP协议的区别?

是否面向连接&#xff1a; TCP 是面向连接的传输&#xff0c;UDP 是面向无连接的传输。 是否是可靠传输&#xff1a;TCP是可靠的传输服务&#xff0c;在传递数据之前&#xff0c;会有三次握手来建立连接&#xff1b;在数据传递时&#xff0c;有确认、窗口、重传、拥塞控制机制…

Linux编辑器-vim

一、vim简述1&#xff09;vi/vim2&#xff09;检查vim是否安装2)如何用vim打开文件3)vim的几种模式命令模式插入模式末行模式可视化模式二、vim的基本操作1)进入vim&#xff08;命令行模式&#xff09;2)[命令行模式]切换至[插入模式]3)[插入模式]切换至[命令行模式]4)[命令行模…

【C语言进阶】动态内存管理

真正的人生&#xff0c;只有在经过艰难卓绝的斗争之后才能实现。 ——塞涅卡 目录 一.为什么存在动态内存分配&#xff1f; 二.动态内存管理的函数 1.malloc函数 2.free函数 ​3.calloc函数 4.realloc函数 三.常见的动态内存错误 1.对N…

python编程:使用pyecharts绘制拟合曲线图

pyecharts库是python下实现的echarts图表绘制库&#xff0c;接下来&#xff0c;我们使用pyecharts来绘制一条曲线&#xff0c;来体验一下pyecharts的基本使用效果。 1、首先&#xff0c;我们要安装下pyecharts库&#xff0c;在pycharm终端输入安装命令&#xff1a; pip install…

pytorch实现深度神经网络与训练

目录 1. 随机梯度下降算法 2.优化器 3. 损失函数 3.1 均方误差损失 3.2 交叉熵损失 4.防止过拟合 4.1 过拟合的概念 4.2 防止过拟合的方法 5. 网络参数初始化 5.1 网络参数初始化方法 5.2 参数初始化方法应用实例 1.针对某一层的权重进行初始化 2.针对一个网络的权…

基于ESP32做低功耗墨水屏时钟

基于ESP32做低功耗墨水屏时钟电子墨水屏概述ESP32实验低功耗电子时钟功能描述接线开发实验结果电子墨水屏 概述 电子墨水是一种革新信息显示的新方法和技术。和传统纸差异是电子墨水在通电时改变颜色&#xff0c;并且可以显示变化的图象&#xff0c;像计算器或手机那样的显示。…

使用ArcGIS为科研论文制作正确、美观、详细的插图

科研论文中的插图&#xff0c;如果图中包含地理信息&#xff0c;那么首先需要在图中标明指北针、比例尺、图例&#xff0c;然后在此基础上再对作的图进一步的美化和修改。 来源&#xff1a;https://doi.org/10.1016/j.uclim.2022.101326 这种就是属于是最常见的研究区概况图&a…

(只需五步)注册谷歌账号详细步骤,解决“此电话号码无法验证”问题

目录 第一步&#xff1a;打开google浏览器 第二步&#xff1a;设置语言为英语&#xff08;美国&#xff09; 第三步&#xff1a;点击重新启动&#xff0c;重启浏览器 第四步&#xff1a;开始注册 第五步&#xff0c;成功登录google账号&#xff01; 如果出现这样的原因&…

java多线程之线程安全(重点,难点)

线程安全1. 线程不安全的原因:1.1 抢占式执行1.2 多个线程修改同一个变量1.3 修改操作不是原子的锁(synchronized)1.一个锁对应一个锁对象.2.多个锁对应一个锁对象.2.多个锁对应多个锁对象.4. 找出代码错误5. 锁的另一种用法1.4 内存可见性解决内存可见性引发的线程安全问题(vo…

乐观锁和悲观锁 面试题

Mysql的乐观锁和悲观锁 实现方式加锁时机常见的调用方式优势不足适用场景乐观锁开发自定义更新数据的时候sql语句中进行version的判断高并发容易出现不一致的问题高并发读&#xff0c;少写悲观锁Mysql内置查询数据的开始select * for update保证一致性低并发互联网高并发场景极…

linux实验之shell编程基础

这世间&#xff0c;青山灼灼&#xff0c;星光杳杳&#xff0c;秋风渐渐&#xff0c;晚风慢慢 shell编程基础熟悉shell编程的有关机制&#xff0c;如标准流。学习Linux环境变量设置文件及其内容/etc/profile/etc/bashrc/etc/environment~/.profile~/.bashrc熟悉编程有关基础命令…

JVM类加载机制

文章目录定义类加载过程加载链接验证准备解析初始化类加载器双亲委派模型定义 Java 虚拟机把描述类的数据从 Class 文件加载到内存&#xff0c;并对数据进行校验、转换解析和初始化&#xff0c;最终形成可以被虚拟机直接使用的 Java 类型&#xff0c;这个过程被称为虚拟机的类…

有手就行 -- 搭建图床(PicGo+腾讯云)

&#x1f373;作者&#xff1a;贤蛋大眼萌&#xff0c;一名很普通但不想普通的程序媛\color{#FF0000}{贤蛋 大眼萌 &#xff0c;一名很普通但不想普通的程序媛}贤蛋大眼萌&#xff0c;一名很普通但不想普通的程序媛&#x1f933; &#x1f64a;语录&#xff1a;多一些不为什么的…

2023最新最详细【接口测试总结】

序章 ​ 说起接口测试&#xff0c;网上有很多例子&#xff0c;但是当初做为新手的我来说&#xff0c;看了不不知道他们说的什么&#xff0c;觉得接口测试&#xff0c;好高大上。认为学会了接口测试就能屌丝逆袭&#xff0c;走上人生巅峰&#xff0c;迎娶白富美。因此学了点开发…

嵌入式学习笔记——SysTick(系统滴答)

系统滴答前言SysTick概述SysTick是个啥SysTick结构框图1. 时钟选择2.计数器部分3.中断部分工作一个计数周期&#xff08;从重装载值减到0&#xff09;的最大延时时间工作流程SysTick寄存器1.控制和状态寄存器SysTick->CTRL2.重装载值寄存器SysTick->LOAD3.当前值寄存器Sy…

async与await异步编程

ECMA2017中新加入了两个关键字async与await 简单来说它们是基于promise之上的的语法糖&#xff0c;可以让异步操作更加地简单明了 首先我们需要用async关键字&#xff0c;将函数标记为异步函数 async function f() {} f()异步函数就是指&#xff1a;返回值为promise对象的函…