【Linux】生产者消费者模型:基于阻塞队列,使用互斥锁和条件变量维护互斥与同步关系

目录

一、什么是生产者消费者模型

二、为什么要引入生产者消费者模型?

三、详解生产者消费者模型 ​编辑

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

生产者和消费者之间为什么会存在同步关系?

为什么生产者与消费者之间需要一个交易场所?

四、基于阻塞队列的生产者消费者模型

生产者消费者模型的使用场景示例:

五、理解生产者消费者模型代码


一、什么是生产者消费者模型

生产者-消费者模型是一种常见的并发设计模式,用于处理在生产和消费任务之间的协调。这个模型主要用来解决在多线程或多进程环境中,生产者和消费者之间的同步和数据共享问题。

在这个模型中:

  • 生产者:负责生成数据或任务,并将其放入一个共享的缓冲区(也称为队列)中。
  • 消费者:从共享缓冲区中取出数据或任务并进行处理。

缓冲区的作用是实现生产者和消费者之间的解耦,使得生产者和消费者可以在不同的速度下独立工作。

二、为什么要引入生产者消费者模型?

在没有这种模型的情况下,生产者和消费者可能会遇到以下问题:

  • 资源浪费:如果生产者生产速度过快,而消费者消费速度跟不上,可能会导致生产出的产品(通常是数据或任务)被丢弃,造成资源浪费。
  • 资源不足:如果消费者消费速度过快,而生产者生产速度跟不上,消费者可能会因为等待新资源而处于空闲状态,造成资源不足。
  • 竞态条件:在没有适当的同步机制的情况下,多个生产者或消费者同时访问共享资源(如队列)可能会导致数据不一致或状态错误。
  •  死锁:如果生产者和消费者在等待对方释放资源时都阻塞了,可能会导致死锁,即系统无法继续前进。
  • 饥饿:某些消费者可能因为其他消费者不断地获取资源而长时间得不到服务,这种现象称为饥饿。

生产者-消费者模型通过引入缓冲区(如队列)和同步机制(如信号量、互斥锁)来解决这些问题。模型通常包含以下几个关键组件:

  • 生产者:负责生成数据或任务。
  • 消费者:负责处理数据或任务。
  • 缓冲区:存储生产者生产的数据,供消费者使用。
  • 同步机制:确保生产者和消费者之间的协调,避免竞态条件和死锁。

通过这些组件,生产者-消费者模型可以有效地管理资源,确保生产者不会在缓冲区满时生产,消费者不会在缓冲区空时消费,从而实现生产者和消费者之间的平衡和同步。

三、详解生产者消费者模型 

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。(如阻塞队列、环形队列等)

我们在用代码编写生产者消费者模型的时,本质就是对这三个特点进行维护。

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

生产者将生产的数据放入容器中,而消费者又会从容器中取出数据,这就造成了在同一时刻中,容器中的数据可能被多个执行流访问。而该容器其实就是临界资源,对于临界资源,我们必须保护对临界资源操作的原子性,否则容易造成数据错乱。

因此我们必须保证生产者和消费者访问临界资源的操作是串行的。每次访问,我们只允许一个执行流进入临界区。如此,我们就保证了临界资源的安全。

所以,在访问临界区资源之前,无论是生产者还是消费者,必须先去竞争保护临界资源的那把互斥锁。即生产者和消费者会进行同一把锁的竞争!

生产者和消费者之间为什么会存在同步关系?

如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者再生产数据就会生产失败。反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。

虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

当缓冲区为满时,生产者需要在自己的条件变量下阻塞等待,直到有消费者进行消费,缓冲区有空间剩余时,才会继续与消费者竞争临界资源的管理权。

当缓冲区为空时,消费者需要在自己的条件变量下阻塞等待,直到有生产者进行生产,缓冲区有数据时,才会继续与生产者竞争临界资源的管理权。

【注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。】

为什么生产者与消费者之间需要一个交易场所?

生产者与消费者之间需要一个交易场所,通常称为缓冲区(Buffer),是因为这个缓冲区在多线程环境中起到了至关重要的作用。以下是缓冲区的几个关键作用:

  1. 协调生产和消费:缓冲区作为生产者和消费者之间的中介,允许生产者在缓冲区有空间时生产数据,消费者在缓冲区有数据时消费数据。这种协调机制避免了生产者和消费者之间的直接竞争,确保了生产和消费的有序进行。

  2. 解耦生产者和消费者:缓冲区使得生产者和消费者不必同时运行。生产者可以在没有消费者等待的情况下生产数据,消费者也可以在没有生产者生产的情况下消费数据。这种解耦提高了系统的灵活性和效率。

  3. 防止数据丢失:在没有缓冲区的情况下,如果生产者生产速度过快,消费者可能无法及时消费,导致数据丢失。缓冲区提供了一个存储空间,可以暂时保存生产者生产的数据,直到消费者准备好消费。

  4. 提高并行性:缓冲区允许生产者和消费者以不同的速度独立工作,提高了系统的并行性和吞吐量。生产者可以快速生产数据,而消费者可以根据自己的速度消费数据,两者互不干扰。

  5. 避免死锁和饥饿:通过适当的同步机制(如信号量和条件变量),缓冲区可以避免死锁和饥饿问题。生产者在缓冲区满时等待,消费者在缓冲区空时等待,这些等待条件可以通过同步机制得到管理,确保所有线程都能在适当的时候获得资源。

  6. 实现负载均衡:缓冲区可以平滑生产者和消费者之间的负载波动。在生产者负载高时,缓冲区可以存储额外的数据,而在消费者负载高时,缓冲区可以提供足够的数据供消费。

  7. 简化线程管理:缓冲区提供了一个明确的接口,使得线程管理更加简单。生产者和消费者只需要关注缓冲区的状态,而不需要直接管理其他线程。

总之,缓冲区作为生产者与消费者之间的交易场所,是实现高效、稳定和可扩展的多线程系统的关键组件。它通过协调生产和消费、解耦生产者和消费者、防止数据丢失、提高并行性、避免死锁和饥饿以及实现负载均衡等方式,提高了系统的整体性能和可靠性。

四、基于阻塞队列的生产者消费者模型

什么是阻塞队列?它的作用是什么?

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

生产者将数据放入队列,消费者从队列中取出数据。

  • 当队列为空时,消费者会阻塞等待,直到有数据可用;
  • 当队列满时,生产者会阻塞等待,直到有空间可用。 

通过阻塞操作,阻塞队列可以有效地管理资源,避免过度生产或消费,从而提高并发性能。

#pragma once
#include <pthread.h>
#include <iostream>
#include <queue>
#include "Thread.hpp"

static const int MAX_CAPACITY = 6;

template <typename T>
class BlockQueue
{
private:
    std::queue<T> _block_queue;//阻塞队列,临界资源
    int _max_capacity;//队列最大容量
    pthread_mutex_t _mutex;//生产者和消费者之间需要满足互斥关系。因为当生产者在生产时,消费者不能去消费,否则容易造成临界资源错乱。反之亦然
    pthread_cond_t _producer_cond;//当生产资源达到最大容量的时候,生产者需要在生产者的条件变量下等待,当有空余空间时,在进行生产,由消费者告知生产者来生产
    pthread_cond_t _consumer_cond;//当生产资源被消费完之后,消费者需要在消费者条件变量下等待,直到新的资源被生产,由生产者唤醒消费者来进行消费
    //在生产资源未到最大容量和生产资源未空时,消费者和生产者形成互斥关系,竞争临界资源的管理权
public:
    //构造
    BlockQueue(int capacity = MAX_CAPACITY)
    :_max_capacity(capacity)
    {
        pthread_mutex_init(&_mutex, nullptr);//初始化锁
        pthread_cond_init(&_producer_cond, nullptr);//初始化条件变量
        pthread_cond_init(&_consumer_cond, nullptr);
    }

    void Push(const T& in)
    {
        //生产者和消费者竞争同一把锁
        pthread_mutex_lock(&_mutex);

        while(IsFull())//为什么用while,不用if? 防止当有多个生产者时,使用broadcast会造成多个线程被唤醒,而此时多个线程已经经过了if判断。
        //使用if只能判断一次,无法避免时间差所带来的条件改变,因此需要循环检查。
        {
            //生产者在生产者的条件变量处等待,等待时释放锁。被唤醒时重新参与锁的竞争
            pthread_cond_wait(&_producer_cond, &_mutex);
        }
        //走到这里一定不为满
        _block_queue.push(in);
        pthread_mutex_unlock(&_mutex);

        //走到这个地方说明此时阻塞队列中一定有数据,可以唤醒消费者线程
        pthread_cond_signal(&_consumer_cond);
    }

    void Pop(T* out)    //输出型参数,将数据带出来
    {
        //生产者和消费者竞争同一把锁
        pthread_mutex_lock(&_mutex);
        while(IsEmpty())
        {
            //消费者在消费者的条件变量处等待,等待时释放锁,被唤醒时重新参与锁的竞争。竞争到锁之后才能返回
            pthread_cond_wait(&_consumer_cond, &_mutex);
        }
        //走到这里一定不为空
        *out = _block_queue.front();
        _block_queue.pop();
        pthread_mutex_unlock(&_mutex);

        //走到这个地方说明阻塞队列中一定有剩余空间,可以唤醒生产者
        pthread_cond_signal(&_producer_cond);
    }

    bool IsFull()
    {
        return _block_queue.size() == _max_capacity;
    }

    bool IsEmpty()
    {
        return _block_queue.empty();
    }

    //析构
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_producer_cond);
        pthread_cond_destroy(&_consumer_cond);
    }
};

需要注意的是,条件变量的等待必须要放在while循环中进行条件判断。

  • 在多线程环境中,多个线程可能同时操作共享资源。即使一个线程被唤醒,也可能因为其他线程的操作使得条件不再满足。因此,在条件变量的等待过程中,必须持续检查条件是否真的符合期望,确保线程在安全的条件下继续执行。
  • 在pthread_cond_wait函数的等待队列中可能存在多个等待线程。如果使用 if 进行条件判断,仅能在该执行流执行管理资源的代码前进行判断。假如此时阻塞队列中恰好生产了一个数据,而条件变量的等待队列中恰好有多个线程被其他线程使用pthread_cond_broadcast函数同时唤醒,无论

为什么不使用 if 判断呢?

  • pthread_cond_wait函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。
  • 其次,在多消费者的情况下,当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者,此时在pthread_cond_wait函数的等待队列中可能存在多个等待线程,因此就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。
  • 为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断。

生产者消费者模型的使用场景示例:

生产者承担的工作:不断地将创建的任务放入阻塞队列中。

消费者承担的工作:不断地从阻塞队列中提取任务,并进行执行。

阻塞队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行例程的参数进行传入。

#include "Blocking_Queue.hpp"
#include <functional>
#include <unistd.h>
#include <ctime>
using task_t = std::function<void()>;

void download()
{
    std::cout << "DownLooad ......" << std::endl;
}

void *Productor(void *block_queue)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(block_queue);
    while (true)
    {
        std::cout << "Producing ......" << std::endl;
        bq->Push(download);
        sleep(1);
    }
    return (void*)0;
}

void *Consumer(void *block_queue)
{
    BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(block_queue);
    while (true)
    {
        sleep(1);
        std::cout << "Consuming ......" << std::endl;
        task_t out;
        bq->Pop(&out);
        out();
    }
    return (void*)0;
}

int main()
{
    BlockQueue<task_t> block_queue;
    pthread_t productor_tid, consumer_tid;
    pthread_create(&productor_tid, nullptr, Productor, static_cast<void *>(&block_queue));
    pthread_create(&consumer_tid, nullptr, Consumer, static_cast<void *>(&block_queue));
    pthread_join(productor_tid, nullptr);
    pthread_join(consumer_tid, nullptr);
    return 0;
}

由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的。

需要注意的是:由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。这点可根据需求自行修改。

五、理解生产者消费者模型代码

看完上述内容后,同学们可能会有这种疑惑:生产者消费者模型不是为了实现高并发而设计的吗?为什么在生产者创建任务时和消费者提取任务时是串行的,而不是并行的?

在回答这个问题之前,我们需要了解一下什么是并发,什么又是并行。

并发(Concurrency):

定义:并发指的是系统能够处理多个任务的能力,这些任务可能是同时进行的,也可能是交替进行的。在并发的场景下,任务可以在同一时间段内交替执行,但不一定是同时的并发更关注的是任务的切换和管理。

并行(Parallelism)

定义:并行指的是系统能够真正地同时执行多个任务。在并行的场景下,多个任务在同一时间段内在不同的处理器或计算核心上同时执行。并行更多地关注的是计算任务的真正同时处理。

在生产者消费者模型中,生产者和消费者对阻塞队列的操作仅仅是向阻塞队列中添加和提取数据,而阻塞队列是共享资源,我们必须对其进行线程安全的保护,让各个执行流串行地去执行临界区代码,保证对临界资源操作的原子性。

添加与提取任务恰恰是生产者消费者模型中执行比较快速的操作。我们可以假设此时有多个生产者线程,此时只能有一个生产者线程能够进入临界区中。那么,在同一时刻,其他的生产者线程一定都在等待进入临界区中呢?答案是否定的!

我们需要知道,今天我们只是使用该模型执行了简单的任务。当遇见复杂任务时生产者需要时间来构建任务!反之,消费者也需要时间来执行任务! 在某个生产者线程向阻塞队列中添加任务时,其他生产者线程可能在等待,也可能在进行任务的生产。消费者线程亦然!

由此我们可以看出,上述代码实现的生产者消费者模型并不是低效的。相反,它有效地管理共享资源的访问、合理安排线程的任务,并确保系统在处理复杂任务时仍能保持高效和稳定。同时解决了生产者与消费者可能出现的忙闲不均的问题,实现了负载均衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/879043.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C++基础知识7 list

list 1. list的介绍及使用1.1 list的介绍1.2 list的使用1.2.1 list的构造1.2.2 list iterator的使用1.2.3 list capacity1.2.4 list element access1.2.5 list modifiers1.2.6 list的迭代器失效 2.1 模拟实现list 1. list的介绍及使用 1.1 list的介绍 1.2 list的使用 1.2.1 l…

基于扣子(Coze)打造第一个智能体——个性化对话机器人

文章目录 一&#xff0c;智能体体验二&#xff0c;动手打造一个自己的智能体1&#xff0c;主页点击创建机器人1.1 创建一个新的机器人1.2 修订Bot基础信息1.3 工具编排信息修订人设和回复逻辑、增补开场白等 2&#xff0c;使用插件优化机器人3&#xff0c;使用工作流优化机器人…

基于SpringBoot实现SpringMvc上传下载功能实现

目录 SpringMvc上传下载功能实现 1.创建新的项目 1&#xff09;项目信息填写 2&#xff09;选择所用的包 3&#xff09;创建controller包 4&#xff09;创建DownLoadController类 5&#xff09;创建UpLoadController类 6&#xff09;创建index.html 7&#xff09;创建upload.h…

dubbo三

dubbo dubbo架构各层说明 URL举例解析 消费者引用服务过程 项目初始化

nginx服务介绍

nginx 安装使用配置静态web服务器 Nginx是一个高性能的Web服务器和反向代理服务器&#xff0c;它最初是为了处理大量并发连接而设计的。Nginx还可以用作负载均衡器、邮件代理服务器和HTTP缓存。它以其轻量级、稳定性和高吞吐量而闻名&#xff0c;广泛用于大型网站和应用中 Ngin…

SpringCloud Feign 以及 一个标准的微服务的制作

一个标准的微服务制作 以一个咖啡小程序项目的订单模块为例&#xff0c;这个模块必将包括&#xff1a; 各种实体类&#xff08;pojo,dto,vo....&#xff09; 控制器 controller 服务类service ...... 其中控制器中有的接口需要提供给其他微服务&#xff0c;订单模块也需要…

55.【C语言】字符函数和字符串函数(strstr函数)

11.strstr函数 *简单使用 strstr: string string cplusplus的介绍 点我跳转 翻译: 函数 strstr const char * strstr ( const char * str1, const char * str2 ); 或另一个版本char * strstr ( char * str1, const char * str2 ); 寻找子字符串 返回指向第一次出现在字…

软件测试 | APP测试 —— Appium 的环境搭建及工具安装教程

大家应该都有同一种感觉&#xff0c;学习appium最大的难处之一在于环境的安装&#xff0c;安装流程比较繁琐&#xff0c;安装的工具和步骤也较多&#xff0c;以下是基于Windows系统下的Android手机端的安装流程。就像我们在用Selenium进行web自动化测试的时候一样&#xff0c;我…

计算机的错误计算(九十六)

摘要 探讨 的计算精度问题。 计算机的错误计算&#xff08;五十五&#xff09;与&#xff08;七十八&#xff09;分别列出了 IEEE 754-2019 中的一些函数与运算。下面再截图给出其另外3个运算。 例1. 已知 x-0.9999999999966 . 计算 不妨在Python下计算&#xff0c;则有&am…

phpstudy 建站使用 php8版本打开 phpMyAdmin后台出现网页提示致命错误:(phpMyAdmin这是版本问题导致的)

报错提示&#xff1a; 解决方法&#xff1a;官网下载phpmyadmin 5.2.1版本。 下载地址&#xff1a;phpMyAdmin 将网站根目录phpMyAdmin4.8.5里面的文件换成 官网下载的5.2.1版本即可。 重启网站&#xff0c;打开phpMyAdmin后台即可&#xff08;若打不开更改 mysql密码即可&am…

【有啥问啥】弱监督学习新突破:格灵深瞳多标签聚类辨别(Multi-Label Clustering and Discrimination, MLCD)方法

弱监督学习新突破&#xff1a;格灵深瞳多标签聚类辨别&#xff08;Multi-Label Clustering and Discrimination, MLCD&#xff09;方法 引言 在视觉大模型领域&#xff0c;如何有效利用海量无标签图像数据是一个亟待解决的问题。传统的深度学习模型依赖大量人工标注数据&…

rabbitmq容器化部署

需求 容器化部署rabbitmq服务 部署服务 找到如下官网信息版本 官网版本发布信息 这里看到最新版本是3.13版本&#xff0c;这里在3.13中找一个版本下载容器镜像即可。 找到dockrhub.com中 找到3.13.2版本镜像。 容器服务安装此处省略 现在下载容器镜像需要配置容器代理 ~#…

树莓派提示:error: externally-managed-environment 树莓派安装虚拟环境,树莓派flask报错

错误信息 raspberryraspberrypi:~ $ pip install flask error: externally-managed-environment脳 This environment is externally managed 鈺扳攢> To install Python packages system-wide, try apt install python3-xyz, where xyz is the package you are trying to i…

进程间关系与进程守护

一、进程组 1、理解 每一个进程除了有一个进程 ID(PID)之外 还属于一个进程组&#xff0c; 进程组是一个或者多个进程的集合&#xff0c; 一个进程组可以包含多个进程。 每一个进程组也有一个唯一的进程组 ID(PGID)&#xff0c; 并且这个 PGID 类似于进程 ID&#xff0c; 同样…

微信电脑版聊天图片DAT格式文件转为普通JPG图片

1-7 本文章主要教你如何恢复微信聊天中的聊天图片&#xff0c;主要应用场景是&#xff0c;当你的微信被封号了&#xff0c;或者无法登录了&#xff0c;会导致微信聊天中的聊天图片没办法再打开&#xff0c;如果是重要的图片&#xff0c;那就有损失了&#xff0c;所以有了本文的…

android 删除系统原有的debug.keystore,系统运行的时候,重新生成新的debug.keystore,来完成App的运行。

1、先上一个图&#xff1a;这个是keystore无效的原因 之前在安装这个旧版本android studio的时候呢&#xff0c;安装过一版最新的android studio&#xff0c;然后通过模拟器跑过测试的demo。 2、运行旧的项目到模拟器的时候&#xff0c;就报错了&#xff1a; Execution failed…

Fisco Bcos 2.11.0配置console控制台2.10.0及部署调用智能合约

Fisco Bcos 2.11.0配置console控制台2.10.0及部署调用智能合约 文章目录 Fisco Bcos 2.11.0配置console控制台2.10.0及部署调用智能合约前言版本适配一、启动FIsco Bcos区块链网络二、获取控制台文件三、配置控制台3.1 执行download_console.sh脚本3.2 拷贝控制台配置文件3.3 修…

react crash course 2024 (1)理论概念

state的作用 react hooks 而无需写一个class jsx 样式用 spa

WebGL系列教程六(纹理映射与立方体贴图)

目录 1 前言2 思考题3 纹理映射介绍4 怎么映射&#xff1f;5 开始绘制5.1 声明顶点着色器和片元着色器5.2 修改顶点的颜色为纹理坐标5.3 指定顶点位置和纹理坐标的值5.4 获取图片成功后进行绘制5.5 效果5.6 完整代码 6 总结 1 前言 上一讲我们讲了如何使用索引绘制彩色立方体&a…

TDengine 首席架构师肖波演讲整理:探索新型电力系统的五大关键场景与挑战

在 7 月 26 日的 TDengine 用户大会上&#xff0c;涛思数据&#xff08;TDengine&#xff09;首席架构师肖波进行了题为《TDengine 助力新型电力系统高质量发展》的主题演讲。他不仅分享了 TDengine 在新型电力系统中的应用案例&#xff0c;还深入探讨了如何利用 TDengine 的高…