生产者消费者模型和线程同步问题

文章目录

  • 线程同步概念
  • 生产者消费者模型
  • 条件变量
    • 使用条件变量
    • 唤醒条件变量
  • 阻塞队列

线程同步概念

互斥能保证安全,但是仅有安全不够,同步可以更高效的使用资源

生产者消费者模型

下面就基于生产者消费者来深入线程同步等概念:
在这里插入图片描述在这里插入图片描述如何理解生产消费者模型:
以函数调用为例:
在这里插入图片描述
在这里插入图片描述

两个线程之间要进行信息交互就需要引入一段内存空间(交易场所)
线程a将数据放入缓冲区(交易场所),线程b从缓冲区进行读取
这样线程a完成数据存放后就继续做自己的事情,线程b去读取数据
这样就能很好的实现多执行流之间的执行解耦
特点:很好的提高了处理数据的能力
支持忙闲不均

条件变量

条件变量:为了不让消费者的每次消费为无效消费.
所以对于生产者,在每次完成自己的任务之后对条件做出改变,当条件的变量达到一定条件后,消费者才进行有效消费
无效消费过程: 消费前(加锁)----尝试消费(无效消费)—消费结束(解锁)

条件变量的目的:
1.不做无效的锁申请
2.假设消费者很多,让他们有执行顺序
相当于条件变量给各个线程在调度他之前给一个提醒

条件变量本质是数据:可以理解为:

在这里插入图片描述

使用条件变量

认识接口
在这里插入图片描述
与互斥锁的创建和使用非常相似

pthread_cond_destroy();//创建布局条件变量要后要进行销毁
pthread_cond_init();//对局部的条件变量进行初始化
pthread_cond_t;//关键字 创建布局变量
全局就要提供PTHREAD_COND_INITIALIZER的宏来进行初始化

条件变量创建的前提是有线程安全,所以条件变量的接口和互斥锁的接口大致类似.

条件创建了还要有一个接口来等待条件成立:

在这里插入图片描述

pthread_cond_wait();//等待条件成立,参数为条件变量和互斥锁
上述所有的参数返回值都是在成功时返回0
失败返回错误原因

唤醒条件变量

在这里插入图片描述

pthread_cond_signal();//唤醒指定的条件变量,并唤醒一个线程
pthread_cond_broadcast();//是条件变量成立,并唤醒所有的线程

在没有条件变量的时候,打印信息如图:

在这里插入图片描述
可以看到线程的调度是不确定的,我们想让这个线程按照我们想要的顺序(如:一次Thread-1,Thread-2,Thread-3,这样)进行打印,那么就需要用到条件变量.
代码:

#include <iostream>
#include <unistd.h>
#include <string>
#include <pthread.h>

//创建条件变量和互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *ThreadRoutine(void* args)
{
    std::string name = static_cast<const char *>(args);
    while(true)
    {
        sleep(1);
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex);
        std::cout << "I am a new Thread, my name is " << name << std::endl;
        pthread_mutex_unlock(&mutex);
    }
}
int main()
{

    pthread_t t1, t2, t3;
    pthread_create(&t1, nullptr, ThreadRoutine, (void*)"Thread-1");
    pthread_create(&t2, nullptr, ThreadRoutine, (void*)"Thread-2");
    pthread_create(&t3, nullptr, ThreadRoutine, (void*)"Thread-3");


    while(true)
    {
        sleep(1);
        pthread_cond_signal(&cond);
    }
    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);
    pthread_join(t3, nullptr);



    return 0;
}

我们让代码按照t1, t2, t3的线程顺序来执行子线程的任务
在这里插入图片描述
换一个顺序验证也是如此
如果没有条件变量,这个也是按照顺序打印,不过是批次进行,和CPU时钟机制有关,所有使用条件变量更好

使用pthread_cond_broadcast();//唤醒全部线程
就跟加锁机制一样,不过每次是各个线程只执行一次后就会等待,并不想无锁那样批次打印

在这里插入图片描述

单纯的互斥能保证线程的安全, 但不一定合理或者高效.
pthread_cond_wait();//
在等待的时候,会释放这把锁(等待是在临界区内,释放锁是为了资源高效利用,再次加锁是不允许在有锁的临界区内有无锁的线程存在)

再被唤醒的时候,又会再次加锁
当被唤醒的时候,重新申请也是需要参与锁的竞争的(未解决这个问题, 看下main阻塞队列部分的讲解)

阻塞队列

这个队列只有为空,为满两种状态

为空:消费线程不能再消费
为满:生产线程不能在生产

这个场景也满足上述说明的所需的321原则
(3种关系,生产–生产,消费–消费
2中角色:生产者,消费者
1个环境(这个阻塞队列就是一个临界区))
单生产,单消费:
基于队列实现,阻塞队列的操作(消费者生产者实例):

伪唤醒:
在这里插入图片描述

在这份代码中,将来如果因为productor慢不满足生产, 多个线程在一个阻塞队列中等待,而有一个Push达到(有一个生产刚产出), 假设此时的代码是将全部线程都唤醒,那么除了第一个线程得到条件变量的满足和锁的满足,其他线程会在条件变量下的等待转化为竞争锁等待的情况, 假设此时若第一个线程完成pop且unlock速度快,那么这时后续的线程会在得到锁之后直接对空队列进行Pop操作,这是就会出现错误,这个状态就是伪唤醒(条件不满足,但是线程被唤醒了)

(虽然上述只是假设, 但是cpu的运行速度很快, 我们不防会有这样的情况发生)

所以修改Pop内的if(IsFull)代码和Push内的(IsEmpty)代码,还可以使用之前的锁封装的代码

在这里插入图片描述
此时的消费者生产者代码:

BlockQueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"

int defaultcap = 5; // for test

template <class T>
class BlockQueue
{
public:
    BlockQueue(int cap = defaultcap)
        : _capacity(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
    }
    bool IsEmpty()
    {
        return _q.size() == 0;//查看队列状态
    }
    bool IsFull()
    {
        return _q.size() == _capacity;//此时为满,
    }
    bool Pop(T *out)
    {
        LockGuard lg(&_mutex);
        while(IsEmpty())
        {
            //为空, 进行等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        //可以生产//可增加水准线进行响应的操作
        pthread_cond_signal(&_p_cond);
        //pthread_mutex_unlock(&_mutex);
        return true;
    }
    bool Push(const T &in)
    {
        // 当前变量进行加锁
        LockGuard lg(&_mutex);
        //pthread_mutex_lock(&_mutex);
        while(IsFull())
        {
            // 为满,进行阻塞等待
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        // 进行生产等待
        _q.push(in);
        //可以进行消费
        pthread_cond_signal(&_c_cond);
        //pthread_mutex_unlock(&_mutex);
        return true;
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }

private:
    std::queue<T> _q;
    int _capacity; // 为空时,不能再消费,为满时,不能再生产,状态是capacity与size进行比较
    pthread_mutex_t _mutex;
    pthread_cond_t _p_cond;
    pthread_cond_t _c_cond;
};

main.cc

#include "BlockQueue.hpp"
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>

void *productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        int data = rand() % 10 + 1;//[1,10];
        bq->Push(data);

        std::cout << "consumer data: " << data << std::endl;

        sleep(1);
    }
}
void *consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        int data = 0;
        bq->Pop(&data);

        std::cout << "product data: " << data << std::endl;
    }
}
int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());
    BlockQueue<int> * bq = new BlockQueue<int>();
    pthread_t c, p;//两个线程
    pthread_create(&p, nullptr, productor, bq);
    pthread_create(&c, nullptr, consumer, bq);


    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    return 0;
}

利用生产者消费者模型实现分派任务的操作()
在此基础上构建一个任务类型:

Task.hpp

#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
enum //设置退出码
{
    ok = 0,
    div_zero,
    mod_zero,
    unknow
};
const std::string opts = "+-*/%)[()]"; //设置随机运算
class Task
{
public:
    Task()//无参构造函数用于生成无参临时对象,如果有参构造是全缺省那么可以不用写这个无参构造函数
    {}
    Task(int x, int y, char op)
    :data_x(x), data_y(y), opt(op), result(0), code(ok)
    {}
    void Run()//任务主题内容
    {
        switch(opt)
        {
            case '+':
                result = data_x + data_y;
                break;
            case '-':
                result = data_x - data_y;
                break;
            case '*':
                result = data_x * data_y;
                break;
            case '/':
            {
                if(data_y == 0)
                {
                    code = div_zero;
                }
                else
                {
                    result = data_x / data_y;
                }
                break;
            }
            case '%':
            {
                if(data_y == 0)
                {
                    code = mod_zero;
                }
                else
                {
                    result = data_x % data_y;
                }
                break;
            }
            default:
                code = unknow;
                break;
        }
    }
    void operator()()
    {
        Run();
    }
    ~Task()
    {}
    //打印任务,用于更清晰的认识
    std::string print_task()
    {
        std::string s;
        s = std::to_string(data_x) + opt + std::to_string(data_y) + "=?\n";
        return s;
    }
    std::string print_result()
    {
        std::string s;
        s = std::to_string(data_x) + opt + std::to_string(data_y) + "=" + std::to_string(result) + "[" + std::to_string(code) + "]" + "\n";
        return s;
    }
private:
    int data_x;
    int data_y;
    char opt;

    int result;
    int code;
};

对上述代码的修改:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>

void *productor(void *args)//生产者
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);//阻塞队列的对象
    while(true)
    {
        int x = rand() % 11;//[0,10];
        int y = rand() % 11;//[0,10];
        char opt = opts[rand() % opts.size()];
        Task t(x, y, opt);
        std::cout << t.print_task() << std::endl;;
        bq->Push(t);//放入队列,队列size+1

        //usleep(1000);
    }
}
void *consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while(true)
    {
        sleep(1);
        Task t;
        //1.拿到消费数据
        bq->Pop(&t);

        //2.执行任务
        t();
        
        //3.打印任务信息
        std::cout << t.print_result() << std::endl;;
    }
}
int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());//伪随机种子
    BlockQueue<Task> * bq = new BlockQueue<Task>();//创建一个阻塞队列
    pthread_t c, p;//两个线程
    pthread_create(&p, nullptr, productor, bq);//两个线程模拟消费者生产者模型
    pthread_create(&c, nullptr, consumer, bq);


    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    return 0;
}

针对上述代码,生产者和消费者本身就是互斥的,也就是串行执行,怎么会高效呢?

探究这个问题,首先从消费者消费后去哪里?生产在生产之前从哪来?来考虑.
生产者的数据,在产生时是花费时间,消费者消费也要花时间.
在消费者处理数据时花时间的同时生产者在某个时刻刚好将数据传给临界区,生产者只需要保证自己完成传送就可以做其他自己的事,消费者自己继续处理数据
所以高效,并发不体现在同步互斥,而是在拿数据,处理数据这里.

多线程任务下的消费者生产者模型多对多:
在这里插入图片描述将bq和线程名字一起封装可以更好的观察:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本篇结束,下篇更精彩,关注我,带你飞~~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/788350.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LNMP搭建Discuz和Wordpress

1、LNMP L:linux操作系统 N&#xff1a;nginx展示前端页面web服务 M&#xff1a;mysql数据库&#xff0c;保存用户和密码&#xff0c;以及论坛相关的内容 P&#xff1a;php动态请求转发的中间件 数据库的作用&#xff1a; 登录时验证用户名和密码 创建用户和密码 发布和…

存储产品选型策略 OSS生命周期管理与运维

最近在看阿里云的 云存储通关实践认证训练营这个课程还是不错的。 存储产品选型策略、对象存储OSS入门、基于对象存储OSS快速搭建网盘、 如何做好权限控制、如何做好数据安全、如何做好数据管理、涉及对象存储OSS的权限控制、使用OSS完成静态网站托管、对OSS中存储的数据进行分…

ubuntu使用kubeadm搭建k8s集群

一、卸载k8s kubeadm reset -f modprobe -r ipip lsmod rm -rf ~/.kube/ rm -rf /etc/kubernetes/ rm -rf /etc/systemd/system/kubelet.service.d rm -rf /etc/systemd/system/kubelet.service rm -rf /usr/bin/kube* rm -rf /etc/cni rm -rf /opt/cni rm -rf /var/lib/etcd …

压缩感知2——算法模型

采集原理 其中Y就是压缩后的信号表示(M维)&#xff0c;Φ表示采集的测量矩阵&#xff0c;可以是一个随机矩阵&#xff0c;X代表原始的数字信号&#xff08;N维&#xff09;。 常见的测量矩阵——随机高斯矩阵 随机伯努利矩阵 稀疏随机矩阵等&#xff0c;矩阵需要满足与信号的稀…

57、基于概率神经网络(PNN)的分类(matlab)

1、基于概率神经网络(PNN)的分类简介 PNN&#xff08;Probabilistic Neural Network&#xff0c;概率神经网络&#xff09;是一种基于概率论的神经网络模型&#xff0c;主要用于解决分类问题。PNN最早由马科夫斯基和马西金在1993年提出&#xff0c;是一种非常有效的分类算法。…

OpenCV MEI相机模型(全向模型)

文章目录 一、简介二、实现代码三、实现效果参考文献一、简介 对于针孔相机模型,由于硬件上的限制(如进光量等),他的视野夹角往往有效区域只有140度左右,因此就有研究人员为每个针孔相机前面再添加一个镜片,如下所示: 通过折射的方式增加了相机成像的视野,虽然仍然达不…

精讲:java之多维数组的使用

一、多维数组简介 1.为什么需要二维数组 我们看下面这个例子&#xff1f;“ 某公司2022年全年各个月份的销售额进行登记。按月份存储&#xff0c;可以使用一维数组。如果改写为按季度为单位存储怎么办呢&#xff1f; 或许现在学习了一维数组的你只能申请四个一维数组去存储每…

修改服务器挂载目录

由于我们的项目通常需要挂载一个大容量的数据盘来存储文件数据&#xff0c;所以我们每台服务器都需要一个默认的挂载目录来存放这些数据&#xff0c;但是由于我们的误操作&#xff0c;导致挂载目录名字建错了&#xff0c;这时候后端就读不到挂载目录了&#xff0c;那我们我们的…

公司内部配置GitLab,通过SSH密钥来实现免密clone、push等操作

公司内部配置GitLab&#xff0c;通过SSH密钥来实现免密clone、push等操作。以下是配置SSH密钥以实现免密更新的步骤&#xff1a; 1.生成SSH密钥 在本地计算机上打开终端或命令提示符。输入以下命令以生成一个新的SSH密钥&#xff1a;ssh-keygen -t rsa -b 4096 -C "your…

AGE Cypher 查询格式

使用 ag_catalog 中的名为 cypher 的函数构建 Cypher 查询&#xff0c;该函数返回 Postgres 的记录集合。 Cypher() Cypher() 函数执行作为参数传递的 Cypher 查询。 语法&#xff1a;cypher(graph_name, query_string, parameters) 返回&#xff1a; A SETOF records 参…

高铁站客运枢纽IPTV电视系统-盐城高铁站西广场IP电视系统应用浅析

高铁站客运枢纽IPTV电视系统-盐城高铁站西广场IP电视系统应用浅析 由北京海特伟业科技有限公司任洪卓于2024年7月9日发布 随着科技的飞速发展&#xff0c;特别是“互联网”战略的深入推进&#xff0c;高铁站客运枢纽的信息化建设成为提升服务质量、增强乘客体验的重要手段。盐…

FTP与TFTP

1、TFTP&#xff08;简单文件传输协议&#xff09; TFTP是TCP/IP协议族中一个用来在客户机与服务器之间进行简单文件传输的协议&#xff0c;提供不复杂、开销不大的文件传输服务。 基于UDP协议 端口号&#xff1a;69 特点&#xff1a;简单、轻量级、易于实现 传输过程&…

20240710 每日AI必读资讯

&#x1f916;微软&#xff1a;不会像 OpenAI 一样阻止中国访问 AI 模型 - OpenAI 将于周二&#xff08;7 月 9 日&#xff09;开始阻止中国用户访问其 API。 - 微软发言人表示&#xff1a;Azure OpenAI API服务在中国的提供方式没有变化。 - 公司仍然通过部署在中国以外地区…

uniapp——银行卡号脱敏

样式 代码 {{bankNumber.replace(/(\d{4})(?\d)/g, "●●●● ").replace(/(\d{2})(?\d{2}$)/, " $1")}} 将银行卡号按照每四位一组的方式进行处理&#xff0c;前面的变成 剩下的正常显示

谷粒商城-个人笔记(集群部署篇三)

前言 ​学习视频&#xff1a;​Java项目《谷粒商城》架构师级Java项目实战&#xff0c;对标阿里P6-P7&#xff0c;全网最强​学习文档&#xff1a; 谷粒商城-个人笔记(基础篇一)谷粒商城-个人笔记(基础篇二)谷粒商城-个人笔记(基础篇三)谷粒商城-个人笔记(高级篇一)谷粒商城-个…

昇思MindSpore25天学习打卡Day17:K近邻算法实现红酒聚类

昇思MindSpore25天学习打卡Day17&#xff1a;K近邻算法实现红酒聚类 1 实验目地2 K近邻算法(KNN)原理介绍2.1 分类问题2.2 回归问题2.3 距离的定义 3 实验环境4 数据处理4.1 数据准备4.2 数据读取与处理4.2.1 导入MindSpore模块和辅助模块 5 模型构建--计算距离6 模型预测 及 打…

java算法day9

232.用栈实现队列 用队列实现栈 有效的括号 删除字符串中的所有相邻重复项 逆波兰表达式求值 解决栈和队列的基本数据结构 Queue&#xff08;队列&#xff09; 在java中是一个接口。定义的方法&#xff1a; //boolean add(E e): 将指定的元素插入此队列&#xff08;如果…

软考高级里《系统架构设计师》容易考吗?

我还是22年通过的架构考试。系统架构设计师属于软考高级科目&#xff0c;难度比初级和中级都要大&#xff0c;往年的通过率也比较低&#xff0c;一般在10-20%左右。从总体来说&#xff0c;这门科目确实是不好过的&#xff0c;大家如果想要备考系统架构设计师的话&#xff0c;还…

【界面态】霍尔效应表征氮化对SiC/SiO2界面陷阱的影响

引言 引言主要介绍了硅碳化物&#xff08;SiC&#xff09;金属-氧化物-半导体场效应晶体管&#xff08;MOSFETs&#xff09;作为新一代高压、低损耗功率器件的商业化背景。SiC MOSFETs因其优越的电气特性&#xff0c;在高电压和高温应用领域具有巨大的潜力。然而&#xff0c;尽…

Leedcode刷题——7 滑动窗口 双指针

注&#xff1a;以下代码均为c 1. 两数之和2&#xff08;输入有序数组&#xff09; // 法1&#xff1a;暴力 vector<int> twoSum1(vector<int>& numbers, int target) {vector<int> ans(2);int n numbers.size();for(int i 0; i < n-1; i){if(i ! 0…