【Linux取经路】基于信号量和环形队列的生产消费者模型

文章目录

  • 一、POSIX 信号量
  • 二、POSIX 信号量的接口
    • 2.1 sem_init——初始化信号量
    • 2.2 sem_destroy——销毁信号量
    • 2.3 sem_wait——等待信号量
    • 2.4 sem_post——发布信号量
  • 三、基于环形队列的生产消费者模型
    • 3.1 单生产单消费模型
    • 3.2 多生产多消费模型
    • 3.3 基于任务的多生产多消费模型
  • 四、结语

在这里插入图片描述

一、POSIX 信号量

共享资源也可以被看成多份,只要规定好每个线程的访问区域即可,此时就可以让多线程去并发的访问临界资源。

POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。但 POSIX 可以用于线程间同步。信号量本质是一把计数器,用来描述可用资源数目的,申请信号量时,其实就已经在间接的做判断,看资源是否就绪了,只要申请到信号量,那么说明资源一定是就绪的。

信号量只能保证,不让多余的线程来访问共享资源,即,当前共享资源有十份,信号量不会允许同时有十一个线程来访问临界资源。但是具体的资源分配是通过程序员编码去实现的。如果出现一个共享资源同时被两个线程访问,就属于程序员的编码 Bug。

二、POSIX 信号量的接口

2.1 sem_init——初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
  • sem:要初始化的信号量

  • pshared:0表示线程间共享,非0表示进程间共享。

  • value:信号量初始值

2.2 sem_destroy——销毁信号量

int sem_destroy(sem_t *sem);

2.3 sem_wait——等待信号量

int sem_wait(sem_t *sem); //P()
  • 功能:会将信号量的值减1

2.4 sem_post——发布信号量

int sem_post(sem_t *sem);//V() 
  • 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量的值加1

三、基于环形队列的生产消费者模型

只要生产和消费不访问同一个格子,那么生产和消费就可以同时进行。那生产和消费什么时候会指向同一个数据呢?答案是队列为空和为满的时候。

image-20240315221954915

image-20240315222028878

基于环形队列的生产消费者模型必须遵守以下三个原则:

  • 当生产和消费指向同一个资源的时候,只能一个人访问。为空的时候,由生产者去访问;为满的时候,由消费者去访问

  • 消费者不能超过生产者

  • 生产者不能把消费者套圈,因为这样会导致数据被覆盖

生产者最关心还剩多少空间(空间数量);消费者最关系还剩多少数据(数据数量)。因为有两种资源,所以需要定义两个信号量。

3.1 单生产单消费模型

// RingQueue.hpp
#pragma once

#include <pthread.h>
#include <vector>
#include <semaphore.h>

template<class T>
class RingQueue 
{
private:
    static const int defaultcap = 5;
    void P(sem_t *sem) // 申请一个信号量
    {
        sem_wait(sem); 
    }

    void V(sem_t *sem) // 归还一个信号量
    {
        sem_post(sem);
    }
public:
    RingQueue(int cap = defaultcap)
    :ringqueue_(cap), cap_(cap), c_step(0), p_step(0)
    {
        sem_init(&cdata_sem, 0, 0);
        sem_init(&pspace_sem, 0, cap_);
    }

    void Push(const T &data) // 生产行为
    {
        P(&pspace_sem);
        ringqueue_[p_step] = data;
        V(&cdata_sem);

        p_step++;
        p_step %= cap_;
    }

    void Pop(T *out) // 消费行为
    {
        P(&cdata_sem);
        *out = ringqueue_[c_step];
        V(&pspace_sem);
        c_step++;
        c_step %= cap_;
    }

    ~RingQueue()
    {
        sem_destroy(&cdata_sem);
        sem_destroy(&pspace_sem);
    }
private:
    std::vector<T> ringqueue_; // 环形队列
    int cap_; // 容量
    int c_step; // 消费者下一个要消费的位置
    int p_step; // 生产者下一个要生产的位置

    sem_t cdata_sem; // 数据资源
    sem_t pspace_sem; // 空间资源
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>

using namespace std;

void *Consumer(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);

    while(true)
    {
        int data = 0;
        rq->Pop(&data);
        cout << "Consumer is running... get a data: " << data << endl;

        // 模拟处理数据
        usleep(1000000);
    }
}

void *Productor(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);
    while(true)
    {
        // 获取数据
        usleep(10000); // 模拟获取数据
        int data = rand() % 10;
        rq->Push(data);
        cout << "Productor is running... produce a data: " << data << endl;
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    pthread_t c, p;
    RingQueue<int> *rq = new RingQueue<int>();
    pthread_create(&c, nullptr, Consumer, rq);
    pthread_create(&p, nullptr, Productor, rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

基于环形队列的单生产单消费模型

互斥与同步的体现:当生产下标和消费下标相同的时候,只允许一个来访问,这就是互斥性的体现。当队列为空的时候,让生产者去访问资源,当队列为满的时候,让消费者去访问资源,这就是在指向同一个位置时,让生产和消费具有一定的顺序性,这就是同步性的体现。当队列不为空或不为满的时候,生产下标和消费下标不同,此时两个线程并发执行,并没有体现出很强的互斥特性。

3.2 多生产多消费模型

此时需要对下标资源进行保护。因为生产下标和消费下标各自只有一份,不允许同时有多个生产线程去访问生产下标,消费线程也一样。因此需要通过加锁来实现生产线程之间的互斥和消费线程之间的互斥。

先加锁还是先申请信号量?答案是先申请信号量,以生产线程为例,这样可以让所有生产线程并发的去执行,什么意思呢?如果是先加锁再申请信号量的话,因为始终只有一个生产者线程能够申请到锁,所以也就只有一个生产者线程能去申请信号量,其他生产者线程只能干巴巴的等待锁被释放。这时申请锁和申请信号量的动作是串行的。而先申请信号量的话,可以保证虽然只有一个线程能够申请到锁,但是其他没有锁的线程也可以不用闲着,可以先去申请信号量,因为信号量的申请是原子的,因此也不需要加锁进行保护,只要能申请到信号量,就说明资源还有,此时那些申请到信号量的线程就可能等待锁被释放,拿到锁之后就可以去执行相应的代码了。

// RingQueue.hpp
#pragma once

#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <string>

template<class T>
class RingQueue 
{
private:
    static const int defaultcap = 5;
    void P(sem_t *sem) // 申请一个信号量
    {
        sem_wait(sem); 
    }

    void V(sem_t *sem) // 归还一个信号量
    {
        sem_post(sem);
    }

    void Lock(pthread_mutex_t *mutex)
    {
        pthread_mutex_lock(mutex);
    }

    void Unlock(pthread_mutex_t *mutex)
    {
        pthread_mutex_unlock(mutex);
    }
public:
    RingQueue(int cap = defaultcap)
    :ringqueue_(cap), cap_(cap), c_step(0), p_step(0)
    {
        sem_init(&cdata_sem, 0, 0);
        sem_init(&pspace_sem, 0, cap_);
        pthread_mutex_init(&c_mutex, nullptr);
        pthread_mutex_init(&p_mutex, nullptr);
    }

    void Push(const T &data) // 生产行为
    {
        P(&pspace_sem);
        Lock(&p_mutex);
        ringqueue_[p_step] = data;
        p_step++;
        p_step %= cap_;
        Unlock(&p_mutex);
        V(&cdata_sem);
    }

    void Pop(T *out) // 消费行为
    {
        P(&cdata_sem); // 信号量资源是不需要保护的,因为它的操作是原子的,临界区中的代码要尽可能的少,所以不需要把信号量的申请放在加锁之后
        Lock(&c_mutex);
        *out = ringqueue_[c_step];
        c_step++;
        c_step %= cap_;
        Unlock(&c_mutex);
        V(&pspace_sem);
    }

    ~RingQueue()
    {
        sem_destroy(&cdata_sem);
        sem_destroy(&pspace_sem);

        pthread_mutex_destroy(&c_mutex);
        pthread_mutex_destroy(&p_mutex);
    }
private:
    std::vector<T> ringqueue_; // 环形队列
    int cap_; // 容量
    int c_step; // 消费者下一个要消费的位置
    int p_step; // 生产者下一个要生产的位置

    sem_t cdata_sem; // 数据资源
    sem_t pspace_sem; // 空间资源

    pthread_mutex_t c_mutex; // 对消费下标的保护
    pthread_mutex_t p_mutex; // 对生产下标的保护
};

template <class T>
class Message
{
public:
    Message(std::string thread_name, RingQueue<T> *ringqueue)
    :thread_name_(thread_name), ringqueue_(ringqueue)
    {}

    std::string &get_thread_name()
    {
        return thread_name_;
    }

    RingQueue<T> *get_ringqueue()
    {
        return ringqueue_;
    }
private:
    std::string thread_name_;
    RingQueue<T> *ringqueue_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>

using namespace std;

void *Consumer(void *args)
{
    Message<int> *message = static_cast<Message<int> *>(args);
    RingQueue<int> *rq = message->get_ringqueue();
    string name = message->get_thread_name();
    while (true)
    {
        int data = 0;
        rq->Pop(&data);
        printf("%s is running... get a data: %d\n", name.c_str(), data);

        // 模拟处理数据
        // usleep(1000000);
    }
}

void *Productor(void *args)
{
    Message<int> *message = static_cast<Message<int> *>(args);
    RingQueue<int> *rq = message->get_ringqueue();
    string name = message->get_thread_name();
    while (true)
    {
        // 获取数据
        // usleep(1000000); // 模拟获取数据
        int data = rand() % 10;
        rq->Push(data);
        printf("%s is running... produce a data: %d\n", name.c_str(), data);
        usleep(1000000);
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    pthread_t c[3], p[5];
    RingQueue<int> *rq = new RingQueue<int>(); 
    vector<Message<int>*> messages; 

    for (int i = 0; i < 5; i++)
    {
        Message<int> *message = new Message<int>("Produttor Thread "+to_string(i), rq);
        pthread_create(p + i, nullptr, Productor, message);
        messages.push_back(message);
    }

    for (int i = 0; i < 3; i++)
    {
        Message<int> *message = new Message<int>("Consumer Thread "+to_string(i), rq);
        pthread_create(c + i, nullptr, Consumer, message);
        messages.push_back(message);
    }


    for (int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }

    for (auto message : messages)
    {
        delete message;
    }

    delete rq;
    return 0;
}

基于环形队列的多生产多消费模型

3.3 基于任务的多生产多消费模型

RingQueue 的内容不变

// Task.h
#include <iostream>
#include <string>

enum
{
    DIVERROR = 1,
    MODERROR,
    UNKNOWERRROR
};

class Task
{
public:
    Task(int a = 0, int b = 0, char op = '+')
    :data1_(a), data2_(b), op_(op), result_(0), exitcode_(0)
    {}

    void run()
    {
        switch(op_)
        {
            case '+':
                result_ = data1_ + data2_;
                break;
            case '-':
                result_ = data1_ - data2_;
                break;
            case '*':
                result_ = data1_ * data2_;
                break;
            case '/':
                if(data2_ == 0) exitcode_ = DIVERROR;
                else result_ = data1_ / data2_;
                break;
            case '%':
                if(data2_ == 0) exitcode_ = MODERROR;
                else result_ = data1_ % data2_;
                break;
            default:
                exitcode_ = UNKNOWERRROR;
                break;
        }
    }

    std::string result_to_string()
    {
        std::string ret = std::to_string(data1_);
        ret += ' ';
        ret += op_;
        ret += ' ';
        ret += std::to_string(data2_);
        ret += ' ';
        ret += '=';
        ret += ' ';
        ret += std::to_string(result_);
        ret += "[exitcode: ";
        ret += std::to_string(exitcode_);
        ret += ']';

        return ret;
    }

    std::string get_task()
    {
        std::string ret = std::to_string(data1_);
        ret += ' ';
        ret += op_;
        ret += ' ';
        ret += std::to_string(data2_);
        ret += ' ';
        ret += '=';
        ret += ' ';
        ret += '?';
        return ret;
    }    
private:
    int data1_;
    int data2_;
    char op_;
    int result_;
    int exitcode_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>
#include "Task.h"

using namespace std;

const std::string opers = "+-*/%";

void *Consumer(void *args)
{
    Message<Task> *message = static_cast<Message<Task> *>(args);
    RingQueue<Task> *rq = message->get_ringqueue();
    string name = message->get_thread_name();
    while (true)
    {
        // 获取任务
        // int data = 0;
        Task task;
        rq->Pop(&task);

        // 对任务做处理
        task.run();
        printf("%s is running... get a data: %s\n", name.c_str(), task.result_to_string().c_str());

        // 模拟处理数据
        // usleep(1000000);
    }
}

void *Productor(void *args)
{
    Message<Task> *message = static_cast<Message<Task> *>(args);
    RingQueue<Task> *rq = message->get_ringqueue();
    string name = message->get_thread_name();
    int len = opers.size();
    while (true)
    {
        // 获取数据
        // usleep(1000000); // 模拟获取数据
        // int data = rand() % 10;

        // 模拟获取数据
        int data1 = rand() % 10 + 1; // [1, 10]
        usleep(10);

        int data2 = rand() % 13; // [0, 13]
        usleep(10);

        char op = opers[rand() % len];

        Task task(data1, data2, op);
        
        // 生产数据
        rq->Push(task);
        // printf("%s is running... produce a data: %d\n", name.c_str(), data);
        printf("%s is running... produce a Task: %s\n", name.c_str(), task.get_task().c_str());
        usleep(1000000);
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    pthread_t c[3], p[2];
    RingQueue<Task> *rq = new RingQueue<Task>(); 
    vector<Message<Task>*> messages; 

    for (int i = 0; i < 5; i++)
    {
        Message<Task> *message = new Message<Task>("Produttor Thread "+to_string(i), rq);
        pthread_create(p + i, nullptr, Productor, message);
        messages.push_back(message);
    }

    for (int i = 0; i < 3; i++)
    {
        Message<Task> *message = new Message<Task>("Consumer Thread "+to_string(i), rq);
        pthread_create(c + i, nullptr, Consumer, message);
        messages.push_back(message);
    }

    // 等待子线程
    for (int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }

    // 释放资源
    for (auto message : messages)
    {
        delete message;
    }

    delete rq;
    return 0; 
}

基于环形队列的多生产多消费模型(基于任务的)

四、结语

今天的分享到这里就结束啦!如果觉得文章还不错的话,可以三连支持一下,春人的主页还有很多有趣的文章,欢迎小伙伴们前去点评,您的支持就是春人前进的动力!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/641443.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C# 利用Xejen框架源码,我们来开发一个基于Dapper技术的数据库通用的帮助访问类,通过Dapper的增删改查,可以访问Sqlite数据库

Dapper 是一个轻量级的对象关系映射&#xff08;ORM&#xff09;工具&#xff0c;适用于 .NET 平台。它由 Stack Overflow 团队开发&#xff0c;旨在提供简单、高效的数据访问功能。与其他重量级 ORM&#xff08;如 Entity Framework&#xff09;相比&#xff0c;Dapper 更加轻…

用这8种方法在海外媒体推广发稿平台上获得突破-华媒舍

在今天的数字时代&#xff0c;海外媒体推广发稿平台已经成为了许多机构和个人宣传和推广的有效途径。如何在这些平台上获得突破并吸引更多的关注是一个关键问题。本文将介绍8种方法&#xff0c;帮助您在海外媒体推广发稿平台上实现突破。 1. 确定目标受众 在开始使用海外媒体推…

C++语法|虚函数与多态详细讲解(六)|如何解释多态?(面试向)

系列汇总讲解&#xff0c;请移步&#xff1a; C语法&#xff5c;虚函数与多态详细讲解系列&#xff08;包含多重继承内容&#xff09; 多态分为了两种&#xff0c;一种是静态的多态&#xff0c;一种是动态的多态。 静态&#xff08;编译时期&#xff09;的多态 函数重载 boo…

pands使用openpyxl引擎实现EXCEL条件格式

通过python的openpyxl库&#xff0c;实现公式条件格式。 实现内容&#xff1a;D列单元格不等于E列同行单元格时标红。 #重点是formula后面的公式不需要“”号。 from openpyxl.styles import Color, PatternFill, Font, Border from openpyxl.styles.differential import Dif…

【设计模式】JAVA Design Patterns——Bytecode(字节码模式)

&#x1f50d;目的 允许编码行为作为虚拟机的指令 &#x1f50d;解释 真实世界例子 一个团队正在开发一款新的巫师对战游戏。巫师的行为需要经过精心的调整和上百次的游玩测试。每次当游戏设计师想改变巫师行为时都让程序员去修改代码这是不妥的&#xff0c;所以巫师行为以数据…

git push后一直卡在在Writing objects:问题

git push后一直卡在Writing objects: 解决&#xff1a;设置 git config --global http.postBuffer 5242880000在执行git push。 一般设置后就可以成功了&#xff0c;后面不用看。 2. 我这里结果又报错&#xff1a; fatal: protocol error: bad line length 8192 MiB | 107.46 …

【C++】d1

关键字&#xff1a; 运行、前缀、输入输出、换行 运行f10 前缀必须项&#xff1a; #include <iostream> using namespace std; 输入/输出&#xff1a; cin >> 输入 cout << 输出 语句通过>>或<<分开 换行 endl或者"\n"

想当安卓开发工程师?学习路线分享!

安卓开发学习路线 在前几篇文章中,对安卓开发岗位的岗位要求做了一些科普,本节文章将介绍安卓开发岗位的学习路线。 目前,网络上有很多面经、算法题解、算法课等学习资料,如何合理利用这些资料成为技术求职者的一大困惑。笔者整理了一份安卓开发岗位学习路线供大家参考,…

第四课 communcation服务-can配置第二弹

Davinci配置目标: 介绍DBC基本属性,并且配置出一个DBC。 将DBC导入到vector的davinci工具,生成我们想要的代码。 Davinci配置步骤: 1. 编辑DBC文件 DBC文件是一种非常重要的工具,所谓DBC就是Database CAN,CAN网络的数据库文件,定义了CAN网络的节点、消息、信号的所有…

网络安全知识核心20要点

1、什么是SQL注入攻击 概述 攻击者在 HTTP 请求中注入恶意的 SQL 代码&#xff0c;服务器使用参数构建数据库 SQL 命令时&#xff0c;恶意SQL 被一起构造&#xff0c;并在数据库中执行。 注入方法 用户登录&#xff0c;输入用户名 lianggzone&#xff0c;密码‘ or ‘1’’…

axios如何传递数组作为参数,后端又如何接收呢????

前端的参数是一个数组。 前端编写&#xff1a; 后端接收&#xff1a;

git解决版本冲突 -git pull

当在Git中遇到版本冲突时&#xff0c;通常是因为两个或更多的开发者在同一时间修改了同一个文件的相同部分&#xff0c;并将这些修改推送到远程仓库。Git无法自动合并这些修改&#xff0c;因此会产生冲突。以下是解决Git版本冲突的基本步骤&#xff1a; 拉取最新代码&#xff…

mybatis-plus 优雅的写service接口中方法(3)

多表联查 上文讲过了自定义sql &#xff0c;和wrapper的使用&#xff0c;但是我们可以发现 我们查询的都是数据库中的一张表&#xff0c;那么怎么进行多表联查呢&#xff0c;当然也是用自定义sql来进行实现 比如说 查询 id 为 1 2 4 的用户 并且 地址在北京 的 用户名称 普…

AI崛起,掌握它,开启智能新生活!

AI崛起&#xff0c;掌握它&#xff0c;开启智能新生活&#xff01; &#x1f604;生命不息&#xff0c;写作不止 &#x1f525; 继续踏上学习之路&#xff0c;学之分享笔记 &#x1f44a; 总有一天我也能像各位大佬一样 &#x1f3c6; 博客首页 怒放吧德德 To记录领地 &…

LDAP: error code 32 - No Such Object

目前我的项目版本&#xff1a; Spring版本:5.3.15SpringBoot版本:2.6.3 完整错误 org.springframework.ldap.NameNotFoundException: [LDAP: error code 32 - No Such Object]; nested exception is javax.naming.NameNotFoundException: [LDAP: error code 32 - No Such Objec…

Java进阶学习笔记9——子类中访问其他成员遵循就近原则

正确访问成员的方法。 在子类方法中访问其他成员&#xff08;成员变量、成员方法&#xff09;&#xff0c;是依照就近原则的。 F类&#xff1a; package cn.ensource.d13_extends_visit;public class F {String name "父类名字";public void print() {System.out.p…

[算法][前缀和] [leetcode]724. 寻找数组的中心下标

题目地址 https://leetcode.cn/problems/find-pivot-index/description/ 题目描述 代码 class Solution {public int pivotIndex(int[] nums) {int total Arrays.stream(nums).sum();//前缀和int prefixSum 0;int len nums.length;for(int i 0;i<len;i){if (i-1>0){p…

【已解决】C#设置Halcon显示区域Region的颜色

前言 在开发过程中&#xff0c;突然发现我需要显示的筛选区域的颜色是白色的&#xff0c;如下图示&#xff0c;这对我们来说不明显会导致我的二值化筛选的时候存在误差&#xff0c;因此我们需要更换成红色显示这样的话就可以更加的明显&#xff0c;二值化筛选更加的准确。 解…

若依 Ruoyi-Vue PageHelper 分页失效 total为记录数

分页插件PageHelper返回记录总数total竟然出错了 执行控制台的SQL&#xff0c;查询出来的total数量是对的&#xff0c;很奇怪分页的total设置为查询到的记录数。 怀疑对list.stream操作&#xff0c;影响了分页&#xff0c;代码发现确实是这样&#xff0c;debug&#xff0c;居然…