【线程】基于阻塞队列的生产者消费者模型

文章目录

  • 1 生产者消费者模型
  • 2 阻塞队列
    • 2.1 成员变量
    • 2.2 消费者操作
    • 2.3 生产者生产
  • 3 总结

1 生产者消费者模型

在多线程环境中,生产者消费者模型是一种经典的线程同步模型,用于处理生产者线程与消费者线程之间的工作调度和资源共享问题。在这个模型中,生产者和消费者共享一个缓冲区,生产者往缓冲区中放入商品(或者数据),而消费者则从缓冲区中取出商品(或者数据)。为了确保线程安全,避免资源竞争,通常需要使用同步机制如互斥锁(mutex) 和 条件变量(condition variable)。

2 阻塞队列

阻塞队列在生产者消费者模型中是非常常见的一种设计,通过互斥锁条件变量来确保线程同步,避免数据竞争。生产者和消费者分别在合适的时机阻塞和唤醒彼此,使得生产者和消费者能平稳地进行数据的生产和消费。

2.1 成员变量

class BlockQueue
{
	static const int defaultnum = 20;
public:
	BlockQueue(int maxcap = defaultnum)
    :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }
private:
	std::queue<T> _q;  // 队列,存储生产者生产的数据
	int _maxcap;  // 队列的最大容量
	pthread_mutex_t _mutex;  // 互斥锁,用于保护队列的访问
	pthread_cond_t _c_cond;  // 消费者条件变量,用于阻塞消费	者
	pthread_cond_t _p_cond;  // 生产者条件变量,用于阻塞生产者

};
  • _q 是用于存储数据的队列。
  • _maxcap 是队列的最大容量。
  • _mutex 是互斥锁,用来保证生产者和消费者在访问队列时的互斥性。
  • _c_cond 是消费者的条件变量,当队列为空时,消费者会被阻塞,直到队列有数据。
  • _p_cond 是生产者的条件变量,当队列满时,生产者会被阻塞,直到队列有空间。

2.2 消费者操作

T pop()
    {
        //1.上锁  --> 消费的时候,需要给消费者上锁
        pthread_mutex_lock(&_mutex);

        while(_q.size() == 0)
        {
            //当商品为空的时候,就阻塞消费者
            pthread_cond_wait(&_c_cond, &_mutex);
        }

        //3.走到这里,两种情况 : 1.队列满了  2.被唤醒
        T out = _q.front();
        _q.pop();

        //4.当pop之后,队列就一定没满,因此可以唤醒生产者去生产了
        pthread_cond_signal(&_p_cond);

        pthread_mutex_unlock(&_mutex);

        return out;
    }
pthread_mutex_lock(&_mutex);

1. 先上锁,保证数据的安全

 while(_q.size() == 0)
        {
            //当商品为空的时候,就阻塞消费者
            pthread_cond_wait(&_c_cond, &_mutex);
        }

        T out = _q.front();
        _q.pop();

2. 上锁之后,可以进行消费,有两种情况:
case 1 : 队列为空,没有数据,则阻塞消费者
case 2: 队列不为空,进行消费

注意:这里的pthread_cond_wait(&_c_cond, &_mutex);在阻塞消费者的同时会释放mutex互斥锁,避免死锁的产生

 //当pop之后,队列就一定没满,因此可以唤醒生产者去生产了
pthread_cond_signal(&_p_cond);

3. 消费之后,队列一定不满(至少都有一个空位,因为刚刚消费了)。所以可以唤醒生产者进行生产

 pthread_mutex_unlock(&_mutex);

4. 所有操作结束之后,释放锁,避免死锁

2.3 生产者生产

void push(const T& in)
    {
        //1.上锁  --> 生产的时候,需要给生产者上锁
        pthread_mutex_lock(&_mutex);
        
        //2.当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态
        //伪唤醒的情况
        while(_q.size() == _maxcap)
        {
            //自动唤醒  --> 释放_p_cond持有的锁,进入阻塞状态
            pthread_cond_wait(&_p_cond, &_mutex);
        }

        //3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒
        _q.push(in);

        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }
pthread_mutex_lock(&_mutex);

1. 先上锁,保证数据的安全

 //当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态
        //伪唤醒的情况
        while(_q.size() == _maxcap)
        {
            //自动唤醒  --> 释放_p_cond持有的锁,进入阻塞状态
            pthread_cond_wait(&_p_cond, &_mutex);
        }

        //3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒
        _q.push(in);

2. 上锁之后,可以进行生产,有两种情况:
case 1 : 队列为满,不能继续生产,则阻塞生产者
case 2: 队列不为满,继续生产

为什么要使用while?

防止伪唤醒。

注意:这里的pthread_cond_wait(&_p_cond, &_mutex);在阻塞生产者的同时会释放mutex互斥锁,避免死锁的产生

pthread_cond_signal(&_c_cond);

3. 生产之后,队列一定不为空(至少有一个商品,因此可以继续消费)所以可以唤醒消费者进行消费

 pthread_mutex_unlock(&_mutex);

4. 所有操作结束之后,释放锁,避免死锁

3 总结

main.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>

void* Consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
    
    while(true)
    {
        Task t = bq->pop();

        std::cout << "消费了一个任务 : " << t.GetTask() << " 运算结果是 : " 
        << t.GetResult() << "thread id : " << pthread_self() << std::endl;

        sleep(1);
    }

}

void* Productor(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);

    int x = 10, y = 20;
    while(true)
    {
        int data1 = rand() % 10 + 1;  //控制data1为[1,10]之间
        usleep(10);
        int data2 = rand() % 10 + 1;  //控制data2为[1,10]之间
        char op = opers[rand() % opers.size()];  //随机选取一个运算符

        //构建任务
        Task t(data1, data2, op);
        bq->push(t);
        std::cout << "生产了一个任务 : "  << t.GetTask() << "thread id : " << pthread_self() << std::endl;
        sleep(1);
    }
}

int main()
{
    srand(time(nullptr));
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_t c[3], p[5];

    for (int i = 0; i < 3; ++ i)
    {
        pthread_create(c + i, nullptr, Consumer, bq);
    }

    for (int i = 0; i < 5; ++ i)
    {
        pthread_create(p + i, nullptr, Productor, bq);
    }

    for (int i = 0; i < 3; ++ i)
    {
        pthread_join(c[i], nullptr);
    }

    for (int i = 0; i < 5; ++ i)
    {
        pthread_join(p[i], nullptr);
    }

    delete bq;
    return 0;
}

BlockQueue.hpp

#include <iostream>
#include <queue>
#include <pthread.h>

template<class T>
class BlockQueue
{
    static const int defaultnum = 20;
public:
    BlockQueue(int maxcap = defaultnum)
    :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }

    T pop()
    {
        //1.上锁  --> 消费的时候,需要给消费者上锁
        pthread_mutex_lock(&_mutex);

        while(_q.size() == 0)
        {
            //当商品为空的时候,就阻塞消费者
            pthread_cond_wait(&_c_cond, &_mutex);
        }

        //3.走到这里,两种情况 : 1.队列满了  2.被唤醒
        T out = _q.front();
        _q.pop();

        //4.当pop之后,队列就一定没满,因此可以唤醒生产者去生产了
        pthread_cond_signal(&_p_cond);

        pthread_mutex_unlock(&_mutex);

        return out;
    }

    void push(const T& in)
    {
        //1.上锁  --> 生产的时候,需要给生产者上锁
        pthread_mutex_lock(&_mutex);
        
        //2.当条件满足的时候,释放与p_cond相关的互斥锁,使“生产”线程进入阻塞状态
        //伪唤醒的情况
        while(_q.size() == _maxcap)
        {
            //自动唤醒  --> 释放_p_cond持有的锁,进入阻塞状态
            pthread_cond_wait(&_p_cond, &_mutex);
        }

        //3.生产内容 --> 走到这一步有两种可能:1.队列未满,2.被唤醒
        _q.push(in);

        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }
private:
    std::queue<T> _q;  //共享资源,只有一个,但是可以被当成很多个
    int _maxcap;  //最大值

    pthread_mutex_t _mutex;   //锁
    pthread_cond_t _c_cond;    //consumer cond 消费者条件变量
    pthread_cond_t _p_cond;    //productor cond 生产者条件变量
};

Task.hpp

#include <iostream>
#include <string>

std::string opers = "+-*%";

enum
{
    DivZero = 1,
    ModZero,
    Unkown
};

class Task
{
public:
    Task(int x1, int x2, char oper)
    :_data1(x1)
    ,_data2(x2)
    ,_oper(oper)
    ,_result(0)
    ,_exitcode(0)
    {}

    void run()
    {
        switch(_oper)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        
        case '-':
            _result = _data1 - _data2;
            break;
        
        case '*':
            _result = _data1 * _data2;
            break;
        
        case '/':
            if (_data2 == 0)
                _exitcode = DivZero;
            else
                _result = _data1 / _data2;
            break;
        case '%':
            if (_data2 == 0)
                _exitcode = ModZero;
            else
                _result = _data1 % _data2;
            break;
        default:
            _exitcode = Unkown;
            break;
        }
    }
    

    //重载operator()
    void operator()()
    {
        run();
    }

    std::string GetTask()
    {
        std::string r = std::to_string(_data1);
        r += _oper;
        r += std::to_string(_data2);
        r += "= ?";

        return r;
    }

    std::string GetResult()
    {
        std::string r = std::to_string(_data1);
        r += _oper;
        r += std::to_string(_data2);
        r += "= ";
        r += std::to_string(_result);
        r += "[code: ";
        r += std::to_string(_exitcode);
        r += "]";

        return r;
    }

    ~Task()
    {}

private:
    int _data1;
    int _data2;
    char _oper;

    int _result;
    int _exitcode;
};

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/963878.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

解决PyG安装中torch-sparse安装失败问题:详细指南

1 问题描述 最近在学习GNN&#xff0c;需要使用PyTorch Geometric&#xff08;PyG&#xff09;库。在安装PyG的过程中&#xff0c;遇到了torch-sparse安装失败的问题&#xff0c;错误提示为&#xff1a; ERROR: Failed building wheel for torch-sparse本文将详细记录问题的解…

4 [危机13小时追踪一场GitHub投毒事件]

事件概要 自北京时间 2024.12.4 晚间6点起&#xff0c; GitHub 上不断出现“幽灵仓库”&#xff0c;仓库中没有任何代码&#xff0c;只有诱导性的病毒文件。当天&#xff0c;他们成为了 GitHub 上 star 增速最快的仓库。超过 180 个虚假僵尸账户正在传播病毒&#xff0c;等待不…

【B站保姆级视频教程:Jetson配置YOLOv11环境(六)PyTorchTorchvision安装】

Jetson配置YOLOv11环境&#xff08;6&#xff09;PyTorch&Torchvision安装 文章目录 1. 安装PyTorch1.1安装依赖项1.2 下载torch wheel 安装包1.3 安装 2. 安装torchvisiion2.1 安装依赖2.2 编译安装torchvision2.2.1 Torchvisiion版本选择2.2.2 下载torchvisiion到Downloa…

自动化软件测试的基本流程

一、自动化测试的准备 1.1 了解测试系统 首先对于需要测试的系统我们需要按照软件需求说明书明确软件功能。这里以智慧养老系统作为案例进行测试&#xff0c;先让我们看看该系统的登录界面和用户管理界面。 登录界面&#xff1a; 登录成功默认界面&#xff1a; 用户管理界面…

前端力扣刷题 | 6:hot100之 矩阵

73. 矩阵置零 给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 法一&#xff1a; var setZeroes function(matrix) {let setX new Set(); // 用于存储需要置零的行索引let setY new Set(); //…

SLAM技术栈 ——《视觉SLAM十四讲》学习笔记(一)

《视觉SLAM十四讲》学习笔记&#xff08;一&#xff09; 第2讲 初识SLAM习题部分 第3讲 三维空间刚体运动3.1 左手系与右手系3.2 齐次坐标3.3 旋转矩阵与变换矩阵3.4 正交群与欧式群3.5 旋转向量与欧拉角3.6 实践Eigen线性代数库3.6.1 QR分解(QR decomposition) 3.7 四元数到其…

自动驾驶---两轮自行车的自主导航

1 背景 无人驾驶汽车最早出现在DARPA的比赛中&#xff0c;从那个时刻开始&#xff0c;逐渐引起全球学者的注意&#xff0c;于是从上个世纪开始各大高校院所开始了无人汽车的研发。直到这两年&#xff0c;无人驾驶汽车才开始走进寻常百姓家&#xff0c;虽然目前市面上的乘用车还…

Spring Boot 2 快速教程:WebFlux处理流程(五)

WebFlux请求处理流程 下面是spring mvc的请求处理流程 具体步骤&#xff1a; 第一步&#xff1a;发起请求到前端控制器(DispatcherServlet) 第二步&#xff1a;前端控制器请求HandlerMapping查找 Handler &#xff08;可以根据xml配置、注解进行查找&#xff09; 匹配条件包括…

优选算法的灵动之章:双指针专题(一)

个人主页&#xff1a;手握风云 专栏&#xff1a;算法 一、双指针算法思想 双指针算法主要用于处理数组、链表等线性数据结构中的问题。它通过设置两个指针&#xff0c;在数据结构上进行遍历和操作&#xff0c;从而实现高效解决问题。 二、算法题精讲 2.1. 查找总价格为目标值…

Intel 与 Yocto 项目的深度融合:全面解析与平台对比

在嵌入式 Linux 领域&#xff0c;Yocto 项目已成为构建定制化 Linux 发行版的事实标准&#xff0c;广泛应用于不同架构的 SoC 平台。Intel 作为 x86 架构的领导者&#xff0c;在 Yocto 生态中投入了大量资源&#xff0c;为其嵌入式处理器、FPGA 和 AI 加速硬件提供了完整的支持…

算法刷题Day29:BM67 不同路径的数目(一)

题目链接 描述 解题思路&#xff1a; 二维dp数组初始化。 dp[i][0] 1, dp[0][j] 1 。因为到达第一行第一列的每个格子只能有一条路。状态转移 dp[i][j] dp[i-1][j] dp[i][j-1] 代码&#xff1a; class Solution: def uniquePaths(self , m: int, n: int) -> int: #…

98,【6】 buuctf web [ISITDTU 2019]EasyPHP

进入靶场 代码 <?php // 高亮显示当前 PHP 文件的源代码&#xff0c;通常用于调试或展示代码&#xff0c;方便用户查看代码逻辑 highlight_file(__FILE__);// 从 GET 请求中获取名为 _ 的参数值&#xff0c;并赋值给变量 $_ // 符号用于抑制可能出现的错误信息&#xff…

C++中常用的十大排序方法之4——希尔排序

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【&#x1f60a;///计算机爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于C中常用的排序方法之4——希尔排序的相…

字节iOS面试经验分享:HTTP与网络编程

字节iOS面试经验分享&#xff1a;HTTP与网络编程 &#x1f31f; 嗨&#xff0c;我是LucianaiB&#xff01; &#x1f30d; 总有人间一两风&#xff0c;填我十万八千梦。 &#x1f680; 路漫漫其修远兮&#xff0c;吾将上下而求索。 目录 字节iOS面试经验分享&#xff1a;HTT…

音视频入门基础:RTP专题(7)——RTP协议简介

一、引言 本文对RTP协议进行简介。在简介之前&#xff0c;请各位先下载RTP的官方文档《RFC 3550》和《RFC 3551》。《RFC 3550》总共有89页&#xff0c;《RFC 3551》总共有44页。本文下面所说的“页数”是指在pdf阅读器中显示的页数&#xff1a; 二、RTP协议简介 根据《RFC 35…

SQLGlot:用SQLGlot解析SQL

几十年来&#xff0c;结构化查询语言&#xff08;SQL&#xff09;一直是与数据库交互的实际语言。在一段时间内&#xff0c;不同的数据库在支持通用SQL语法的同时演变出了不同的SQL风格&#xff0c;也就是方言。这可能是SQL被广泛采用和流行的原因之一。 SQL解析是解构SQL查询…

Java 大视界 -- Java 大数据在智能电网中的应用与发展趋势(71)

&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎来到 青云交的博客&#xff01;能与诸位在此相逢&#xff0c;我倍感荣幸。在这飞速更迭的时代&#xff0c;我们都渴望一方心灵净土&#xff0c;而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识&#xff0c;也…

想表示消息返回值为Customer集合

道奈特(240***10) 14:34:55 EA中序列图。我想表示消息返回值为 Customer 集合。目前只有一个Customer实体类&#xff0c;我需要另外新建一个CustomerList 类吗&#xff1f; 潘加宇(35***47) 17:01:26 不需要。如果是分析&#xff0c;在类的操作中&#xff0c;定义一个参数&…

01.双Android容器解决方案

目录 写在前面 一&#xff0c;容器 1.1 容器的原理 1.1.1 Namespace 1.1.2 Cgroups&#xff08;Control Groups&#xff09; 1.1.3 联合文件系统&#xff08;Union File System&#xff09; 1.2 容器的应用 1.2.1 微服务架构 1.2.2 持续集成和持续部署&#xff08;CI/…

【Elasticsearch】硬件资源优化

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;精通Java编…