Linux 生产消费者模型

 💓博主CSDN主页:麻辣韭菜💓

⏩专栏分类:Linux初窥门径

🚚代码仓库:Linux代码练习🚚

🌹关注我🫵带你学习更多Linux知识
  🔝 

前言

 1. 生产消费者模型

1.1  什么是生产消费者模型?

1.2 生产消费者模型原则

1.3 生产消费者模型的优点 

 2. 基于阻塞队列实现生产消费者模型

2.1 单生产单消费模型  

2.2 多生产多消费 

3. POSIX 信号量

POSIX 信号量有两种类型:

POSIX 信号量的基本操作:

4. 基于循环队列实现生产消费者模型

4.1 多生产多消费

环形队列的优缺点:

阻塞队列的优缺点:


前言

生产者-消费者模型是一个经典的并发编程问题,它描述了两种角色:生产者和消费者。生产者负责生成数据,而消费者则负责消费这些数据。这个模型通常用于处理多线程或多进程环境中的资源分配问题。 

 1. 生产消费者模型

1.1  什么是生产消费者模型?

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

上面的名词有些抽象,我们直接用生活中案例来举例子,大家就会豁然开朗。

 超市工作模式:

超市需要从工厂拿货,工厂则需要提供给超市商品

消费者在超市消费,超市需要向顾客提供商品 

 超市的作用就是平衡消费者和工厂供需平衡

为什么这么说?

简单来说就是要做到 顾客可以在超市买到想要购买的商品,工厂也能同超市完成足量的需求订单,超市这样就可以为双方提供便利。

顾客再也不用到工厂去买商品

工厂也不需要将商品亲自送到顾客手中。

如果没有超市,顾客直接去工厂消费,工厂生产出来商品再送到顾客手中,这种关系就是高度相互依赖,离开谁都不能干。这就是传说中的强耦合关系。

超市的出现,极大了提高效率,从而顾客工厂之间不再单方面的依赖。使得它们之间依赖度降低。而这就是传说的中解耦。 

生产者消费者模型的本质:忙闲不均 

我们再回到编程的视角

  •  工厂 —> 生产者
  •  顾客 —> 消费者
  •  超市 —> 某种容器  

这样我们就可以利用线程来干事了,线程充当生产者和消费者。利用STL的队列容器(缓冲区)充当超市。 常见的有 阻塞队列 和 环形队列

在实现中,超市不可能只面向一个顾客,一个工厂。在多线程中,也就意味着它们都能看到这个队列(超市),那么必须就要让线程之间存在互斥与同步。对于互斥与同步不理解的可以看 Linux 线程的同步与互斥

从上面我们就可以的得出它们之间关系。

生产者VS生产者:互斥

 一张图解释一切,这么多汽车生产商,相互竞争,对于多线程之间也是一样,所以需要互斥。

消费者VS消费者:互斥 

比如宝马4S店里,只剩最后一辆宝马7系,如果这时来了两个消费者,张三李四都想要这辆车,如果是张三先交了订金,那么李四就没有机会了,但是如果李四私下愿意加钱。那么张三和李四之间存在竞争。对于线程来说,我们需要互斥。

生产者VS消费者:互斥、同步

我们假设李四拿到了车,但是张三是个非常执着的人,其他车都不要,就要宝马7系。对于4S店来说,它就应该给工厂发消息生产7系车。然后再告诉张三有车了,进而消费。就对于生产线程和消费线程那就是同步

如果宝马一直疯狂生产,也不管4S店到底卖出去没有,也不管消费者到底买不买,那么这样就乱套了。结局只有破产!!!所以需要根据消费者的需求来进行合理生产。反过来消费者和宝马也是同理。而这对于多线程来说,那就是互斥

1.2 生产消费者模型原则

生产消费者模型原则:321原则

三种关系:

  • 生产者VS生产者:互斥
  • 消费者VS消费者:互斥
  • 生产者VS消费者:同步、互斥

 两种角色:

  • 生产者
  • 消费者

 一个交易场所:

  • 特定的容器:阻塞队列、环形队列 

生产消费者模型原则,书本是没有这个概念,为了方便记忆,大牛提炼总结出来的。

1.3 生产消费者模型的优点 

 为什么生产消费者模型高效?

  • 生产者、消费者 可以在同一个交易场所中进行操作
  • 生产者在生产时,无需关注消费者的状态,只需关注交易场所中是否有空闲位置
  • 消费者在消费时,无需关注生产者的状态,只需关注交易场所中是否有就绪数据
  • 可以根据不同的策略,调整生产者于与消费者间的协同关系

 生产消费者模型可以根据供需关系灵活调整策略做到忙闲不均。生产者和消费者无需关心他人的状态,做到并发。

 2. 基于阻塞队列实现生产消费者模型

在正式编写代码前,我们先了解阻塞队列与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

2.1 单生产单消费模型  

为了方便理解我们先用单生产、单消费的方式来讲解

先创建Blockqueue.hpp的头文件。

#include <iostream>
#include <queue>
#include <pthread.h>

template <class T>
class Blockqueue
{
    static const int defaultnum= 10;

public:
    Blockqueue(int maxcap = defaultnum)
        : _maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
    }
    void push(const T &data) //生产数据
    {
    }
    T pop() //取数据
    {
    }
    ~Blockqueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_c_cond);
        pthread_cond_destroy(&_p_cond);
    }

private:
    std::queue<T> _q;
    int _maxcap; // 极值
    pthread_mutex_t _mutex;
    pthread_cond_t _c_cond; // 消费者
    pthread_cond_t _p_cond; // 生产者
};

阻塞队列框架搭建出来后生产和消费我们后面实现。

由于我们是单生产单消费的生产消费者模型。所以

mian.cc主函数中创建两个线程

#include "Blockqueue.cpp"

void * Consumer(void *args) //消费者
{

}
void * Productor(void *args) //生产者
{

}

int main()
{
    Blockqueue<int> *bq = new Blockqueue<int>;
    //创建线程(生产、消费)
    pthread_t c,p;
    pthread_create(&c,nullptr,Consumer,bq);
    pthread_create(&p,nullptr,Productor,bq);
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    delete bq;
    return 0;
}

上面就是生产消费者模型的大致框架,我们在实现具体细节之前,我们先要明白一个关键问题。

生产和消费要不要耗费时间?

生产和消费是肯定要耗费时间的,一辆车不会平白无故的出现,车从生产到成品这个过程是要耗费大量的数据,同理作为消费者使用车,也是要耗费时间的。开车不需要耗费时间吗?

所以在代码层面角度来说:生产和消费都是需要耗费时间的,并不是一味的在阻塞队列里进行生产和消费。而是生产者在生产数据之前,要对数据做加工,做完之后才放进阻塞队列,消费者也不是从阻塞队列拿到数据就完事了,而是拿到数据之后,对数据做分析,然后决策。 

为什么生产和消费只需要同一把锁? 

因为它们两个是基于阻塞队列的,我们可以把阻塞队列看成一份整体资源,所以只需要一把锁,但是共享资源也可以被看做多份。

为什么生产和消费各自需要一个条件变量?

这就是为什么叫做阻塞队列。两个线程各自基于自己的条件变量,当条件不满足时候,那么就会阻塞等待。

明白这点之后 我们来实现生产和消费

生产和消费都能看到同一个阻塞队列,之前我们也说了生产和消费是既有同步又互斥的关系,那么生产线程和消费线程在访问阻塞队列时,只能是只有一个在访问。那么必然要互斥

   void push(const T &data) //生产数据
    {
        pthread_mutex_lock(&_mutex);
        _q.push(data);
        pthread_mutex_unlock(&_mutex);
    }

 生产是想生产就能生产的吗?

当然不是,阻塞队列如同超市一样,商品在货架上都放满了,生产出来的商品没有人买,那不是妥妥亏钱?

所以在生产之前还得问问超市,条件满足不?满足生产,不满足堵塞等待被唤醒

   void push(const T &data) //生产数据
    {
        pthread_mutex_lock(&_mutex);
        if(_q.size() == _maxcap) 
        {
            pthread_cond_wait(&_p_cond,&_mutex);//不满足阻塞
        }
        _q.push(data);
        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }

当生产条件不满足的时候,那么生产线程要去等待。这里就有个问题,生产线程在访问条件满不满足的时候,是已经拿到了锁的,不释放锁去等待,那么会造成死锁的问题。所以我们利用

pthread_cond_wait函数 ,等待的同时解锁。


同理消费数据也是一样。 

 T pop() //消费数据
    {
        pthread_mutex_lock(&_mutex);
        if(_q.size() == 0) 
        {
            pthread_cond_wait(&_c_cond,&_mutex);//不满足阻塞
        }
        T out = _q.front();
        _q.pop();
        pthread_cond_signal(&_p_cond);
        pthread_mutex_unlock(&_mutex);
        return out;
    }

那么我们在实现了生产和消费之后,就需要在mian.cc中实现生产消费的回调函数 

我们先srand函数模拟随机数

srand(time(nullptr) ^ getpid());
#include <ctime>
#include <unistd.h>
void *Consumer(void *args) // 消费者
{
    Blockqueue<int> *bq = static_cast<Blockqueue<int> *>(args);
    while (true)
    {
        int t = bq->pop();
        std::cout << "消费了一个数据..." << t << std::endl;
    }
}
void *Productor(void *args) // 生产者
{
    Blockqueue<int> *bq = static_cast<Blockqueue<int> *>(args);
    while (true)
    {
        int data = rand() % 10 + 1;
        bq->push(data);
        std::cout << "生产了一个数据..." << data << std::endl;
        sleep(1);
    }
}

 结果符合预期,生产和消费实现了同步互斥。但是我们就传入个整数,未免有点锉了,我们是用C++写的,而且我们blockqueue是带模板,我们可以传入对象。

先创建一个Task.hpp的头文件

我们在Task.hpp这个头文件中,创建一个Task类。在这个类中实现一些加减乘除的函数方法,由生产者生产任务。然后消费者拿到任务数据做加工

#pragma once
#include <iostream>
#include <string>

std::string opers = "+-*/%";

enum
{
    DivZero = 1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task(int data1, int data2, char oper)
        : _data1(data1), _data2(data2), _oper(oper), _result(0), _exitcode(0)

    {
    }

    void run()
    {
        switch (_oper)
        {
        case '+':
            _result = _data1 + _data2;
            break;
        case '-':
            _result = _data1 - _data2;
            break;
        case '*':
            _result = _data1 * _data2;
            break;
        case '/':
        {
            if (_data2 == 0)
                _exitcode = DivZero;
            else
                _result = _data1 / _data2;
        }
        break;

        case '%':
        {
            if (_data2 == 0)
                _exitcode = ModZero;
            else
                _result = _data1 % _data2;
        }
        break;

        default:
            _exitcode = Unknown;
            break;
        }
    }
    std::string GetResult()
    {
        std::string r = std::to_string(_data1);
        r += _oper;
        r += std::to_string(_data2);
        r += "=";
        r += std::to_string(_result);
        r += "[code: ";
        r += std::to_string(_exitcode);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(_data1);
        r += _oper;
        r += std::to_string(_data2);
        r += "=?";
        return r;
    }
    void operator()() //运算符重载让对象像函数一样使用
    {
        run();
    }
    ~Task()
    {
    }

private:
    int _data1;
    int _data2;
    char _oper;
    int _result;
    int _exitcode;
};

void *Consumer(void *args) // 消费者
{
    // Blockqueue<int> *bq = static_cast<Blockqueue<int> *>(args);
    Blockqueue<Task> *bq = static_cast<Blockqueue<Task> *>(args);

    while (true)
    {
        Task t = bq->pop();
        t();
        std::cout << "处理任务: " << t.GetTask() << " 运算结果是: "
         << t.GetResult() << " thread id: " << pthread_self() << std::endl;
    }
}
void *Productor(void *args) // 生产者
{
    // Blockqueue<int> *bq = static_cast<Blockqueue<int> *>(args);
    Blockqueue<Task> *bq = static_cast<Blockqueue<Task> *>(args);
    int len = opers.size();
    while (true)
    {
        int data1 = rand() % 10 + 1;
        int data2 = rand() % 10;
        char oper = opers[rand() % len];
        Task t(data1, data2, oper);
        bq->push(t);
        std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " << pthread_self() << std::endl;
        sleep(1);
    }
}

注:

 其实我们不用非要等到满了,才停止生产。我们可以定策略,就如同水库的警戒线,当河水上涨到警戒线时,就开闸放水,而不是等到水库满了才放。消费也是同理。

 int low_water_;
 int high_water_;

2.2 多生产多消费 

我们实现了单生产单消费,这里改成多生产多消费,非常简单。只需要在mian.cc这里循环创建线程即可

int main()
{
    srand(time(nullptr) ^ getpid());
    Blockqueue<Task> *bq = new Blockqueue<Task>;
    // 创建线程(生产、消费)
    pthread_t c[3], p[5];
    for (int i = 0; i < 3; i++)
    {
        pthread_create(c + i, nullptr, Consumer, bq);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_create(p + i, nullptr, Productor, bq);
    }
    for (int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }
    delete bq;
    return 0;
}

出现上面的错误是因为伪唤醒的原因

为什么会出现伪唤醒的?

现在是多个线程了,也就是说当阻塞队列满时,所有的生产线程被阻塞等待被唤醒。消费线程这时消费一个数据,当阻塞队列不满时,那么就会唤醒所有的生产线程,3个线程只有一个线程能拿到锁,其中一个拿到锁线程进行生产此时阻塞队列已经满了。等其他线程拿到锁后,条件不满足。生产不了,这就是伪唤醒。

所以我们把if改成while 循环判断防止伪唤醒

void push(const T &data) // 生产数据
    {
        pthread_mutex_lock(&_mutex);
        while (_q.size() == _maxcap) // 用while防止伪唤醒,判断条件满不满足
        {
            pthread_cond_wait(&_p_cond, &_mutex); // 不满足阻塞
        }
        _q.push(data);
        pthread_cond_signal(&_c_cond);
        pthread_mutex_unlock(&_mutex);
    }
    T pop() // 消费数据
    {
        pthread_mutex_lock(&_mutex);
        while (_q.size() == 0) // 用while防止伪唤醒,判断条件满不满足
        {
            pthread_cond_wait(&_c_cond, &_mutex); // 不满足阻塞
        }
        T out = _q.front();
        _q.pop();
        pthread_cond_signal(&_p_cond);
        pthread_mutex_unlock(&_mutex);
        return out;
    }

 

这里我们直接用C++的锁。

 

std:: mutex _mutex;
void *Consumer(void *args) // 消费者
{
    // Blockqueue<int> *bq = static_cast<Blockqueue<int> *>(args);
    Blockqueue<Task> *bq = static_cast<Blockqueue<Task> *>(args);

    while (true)
    {
        Task t = bq->pop();
        t();
        std::lock_guard<std::mutex> guard(_mutex);
        std::cout << "处理任务: " << t.GetTask() << " 运算结果是: " 
        << t.GetResult() << " thread id: "<< std::hex  << pthread_self() << std::endl;
       
    }
}
void *Productor(void *args) // 生产者
{
    int len = opers.size();
    // Blockqueue<int> *bq = static_cast<Blockqueue<int> *>(args);
    Blockqueue<Task> *bq = static_cast<Blockqueue<Task> *>(args);
    while (true)
    {   sleep(1);
        int data1 = rand() % 10 + 1;
        int data2 = rand() % 10;
        char oper = opers[rand() % len];
        Task t(data1, data2, oper);
        bq->push(t);
        std::lock_guard<std::mutex> guard(_mutex);
        std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " << std::hex << pthread_self() << std::endl;
        
    }
}

 

为什么只修改线程创建的代码,多线程就能适应原来的消费场景?

原因有2点:

  1. 生产者、消费者都是在对同一个 _queue 操作,用一把锁,保护一个临界资源,足够了
  2. 当前的 _queue 始终是被当作一个整体使用的,无需再增加锁区分

当然也可以让生产者和消费者各自拿一把锁,但是都是基于_queue的完全没有必要,画蛇添足。

3. POSIX 信号量

在 POSIX 标准中,信号量(semaphore)是一种用于控制多个进程或线程对共享资源访问的同步机制。信号量是一个计数器,它可以跟踪一定数量的资源或信号量单位。进程或线程可以通过原子操作对信号量进行增加或减少,从而实现对共享资源的协调访问。 

也就是说,让线程的同步的方法,不仅仅只有条件变量,还有信号量。 

POSIX 信号量有两种类型:

  1. 无名信号量(Unnamed semaphores):也称为进程间信号量,因为它们可以在不同的进程之间共享。无名信号量使用 sem_t 类型表示,并通过 sem_init() 函数初始化,使用 sem_destroy() 函数销毁。无名信号量需要一个与之关联的键值来标识,这个键值可以通过 ftok()shmget() 函数获得。

  2. 命名信号量(Named semaphores):也称为系统V信号量,它们是系统范围内唯一的,并且可以跨会话使用。命名信号量通过 semget() 函数创建,使用 semctl() 函数控制,使用 semop() 函数进行操作。

文档的话太抽象了,下面我用大白话来解释信号量

我们将阻塞队列比喻成电影院,而信号量就如同电影票,电影院是一个整体的公共资源,那么电影院的座位就把电影院这个整体划分为无数份的资源。而信号量就是预定座位资源。 

那么当我们购买电影票成功或不成功,对应编程来说,其实就是在访问临界资源的同时进行了临界资源就绪或者不就绪判断。

就绪意味者线程可以访问

不就绪意味着线程不可访问

POSIX 信号量的基本操作:

初始化:使用 sem_init() 初始化一个无名信号量

int sem_init(sem_t *sem, int pshared, unsigned int value);
  • sem:指向信号量变量的指针。
  • pshared:非零表示信号量可以被其他进程访问,零表示只能在当前进程内访问。
  • value:信号量的初始值。

 等待(减):使用 sem_wait()sem_trywait() 减少信号量,如果信号量的值大于零,则减少其值,否则进程将等待。

int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);

信号量值增加(信号):使用 sem_post() 增加信号量的值,如果其他进程因为信号量的值小于或等于零而等待,则其中一个进程将被唤醒。 

int sem_post(sem_t *sem);

获取信号量值:使用 sem_getvalue() 获取信号量的当前值。 

int sem_getvalue(sem_t *sem, int *sval);

销毁信号量:使用 sem_destroy() 销毁一个无名信号量。 

int sem_destroy(sem_t *sem);

   这些接口使用起来还是比较简单,下面我们用信号量来实现生产消费者模型。前面用的是阻塞队列,我们用信号量实现基于循环队列版本。

4. 基于循环队列实现生产消费者模型

在实现之前我们先了解循环队列这种数据结构。我们利用数组这种数据结构,然后对下标进行取模可以让数组变成循环的结构 

 

一张动图搞定循环队列这种数据结构

这里有几个关键问题:

问题1:生产者关注什么资源?消费者关注什么资源?

生产者关注的是数组还有多少空间、消费者关注的是数组还有多少数据。 

问题2:生产者和消费者什么时候才会指向同一个位置?

要么数组为空、要么数组为满。(这两种状态只能是生产和消费其中一个进行访问,空生产者访问、满消费者访问。)

反之一定是指向不同的位置 (这句话非常重要,意味着生产和消费可以同时访问)

那么循环队列要正常运行必须满足3个条件

1. 空或者满只能有一个人访问

2. 消费者一定不能超过生产者

3. 生产者一定不能套圈消费者 

如果消费者超过生产者,前面都没有数据,访问什么?

为什么这么说?因为最开始一定为空。那么一定是生产者先走!毫无疑问

如果生产者套圈消费者意味着生产速度大于消费速度之前没有消费的数据要被覆盖。数据出现覆盖,严重错误。 

 理解了这些问题我们直接多生产多消费来实现

4.1 多生产多消费

老规矩先创建RingQueue.hpp头文件 

#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
const static int defaultcap = 5;
template <class T>
class RingQueue
{
public:
    RingQueue(int cap = defaultcap)
        : _ringqueue(cap), _cap(cap), _c_step(0), _p_step(0)
        sem_init(&_cdata_sem, 0, 0);
        sem_init(&_pspace_sem, 0, cap);
        pthread_mutex_init(&_c_mutex, nullptr);
        pthread_mutex_init(&_p_mutex, nullptr);
    }
    void push(const T& data)
    {

    }
    T pop(T* out)
    {
        
    }
    ~RingQueue()
    {
        sem_destroy(&_cdata_sem);
        sem_destroy(&_pspace_sem);
        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }

private:
    std::vector<T> _ringqueue; // 循环队列
    int _cap;                  // 循环队列容量
    int _c_step;               // 消费者下标
    int _p_step;               // 生产者下标
    sem_t _cdata_sem;          // 消费者关注的数据资源
    sem_t _pspace_sem;         // 生产者关注的空间资源
    pthread_mutex_t _c_mutex;  // 消费者锁
    pthread_mutex_t _p_mutex;  // 生产者锁
};

框架大致构建出来,为了方便生产消费的互斥与同步。我们接下来对生产和消费线程互斥与同步的函数进行封装 

void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void UnLock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
    void P(sem_t &sem) //减少
    {
        sem_wait(&sem);
    }
    void v(sem_t &sem) //增加
    {
        sem_post(&sem);
    }

实现push 和 pop函数 

 void Push(const T &data)
    {
        P(_pspace_sem);
        Lock(_p_mutex);
        _ringqueue[_p_step++] = data;
        _p_step %= _cap;
        UnLock(_p_mutex);
        V(_cdata_sem);
    }
    T Pop(T *out)
    {
        P(_cdata_sem);
        Lock(_c_mutex);
        *out = _ringqueue[_c_step++];
        _c_step %= _cap;
        Unlock(_c_mutex);
        V(_pspace_sem);
        return out;
    }

 这里解释push函数P操作为什么传入的是空间信号量,很简单生产者关注的是空间资源,所以这里P判断空间资源就不就绪,V为什么传入的是数据信号量?当P申请成功意味着可以生产,那么对应空间资源减少,数据资源增加。

同理pop也是一样。

我们mian.cc创建线程 和回调函数

#include <unistd.h>
#include <mutex>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"
std::mutex _mutex;
void *consumer(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        Task t;
        rq->Pop(&t);
        t();
        std::lock_guard<std::mutex> guard(_mutex);
        std::cout << "处理任务: " << t.GetTask() << " 运算结果是: " 
        << t.GetResult() << " thread id: "<< std::hex  << pthread_self() << std::endl;
    }
}
void *productor(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
     int len = opers.size();
    while (true)
    {
        sleep(1);
        int data1 = rand() % 10 + 1;
        int data2 = rand() % 10;
        char oper = opers[rand() % len];
        Task t(data1, data2, oper);
        rq->Push(t);
        std::lock_guard<std::mutex> guard(_mutex);
        std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " << std::hex << pthread_self() << std::endl;
    }
}

int main()
{
    srand(time(nullptr) ^ getpid()); // 随机数种子
    RingQueue<Task> *rq = new RingQueue<Task>(40);
    pthread_t c[3], p[3];
    for (int i = 0; i < 3; i++)
    {
        pthread_create(c + i, nullptr, consumer, rq);
    }
    for (int i = 0; i < 3; i++)
    {
        pthread_create(p + i, nullptr, productor, rq);
    }
    for (int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    for (int i = 0; i < 3; i++)
    {
        pthread_join(p[i], nullptr);
    }
    delete rq;
    return 0;
}

 这里打印只打印了线程ID,我们可以重新创建一个线程名字的类。把线程名字加入进去

struct ThreadData
{

    RingQueue<Task> *rq;
    std::string threadname;
};

细节: 加锁行为放在信号量申请成功之后,可以提高并发度

 

为什么这么说,信号量在加锁之前就好比,没进电影院之前就已经选好了座位,如果在加锁之后,那就如同进到电影院之后在选座位,而再选座位就又得排队买票。而且信号量本身就是原子操作

那既然阻塞队列也能实现生产消费者模型,那搞出来个循坏队列又有什么用?

环形队列的优缺点:

优点

  1. 空间利用率高:由于是环形结构,已使用的空间可以重复利用,不会像普通队列一样造成空间的浪费。
  2. 插入和删除速度快:由于是线性结构,环形队列的插入和删除操作通常很快,因为它们只涉及到头尾指针的移动。
  3. 固定大小的存储空间:可以避免内存泄漏等问题,因为不会动态地分配和回收内存。

缺点

  1. 需要额外的指针维护状态:增加了复杂度,需要维护队列头和队尾的指针。
  2. 存储空间可能未被充分利用:一旦队列满了,就需要覆盖队列头的元素,这可能导致存储空间没有被完全利用。
  3. 队列大小必须预先定义:难以动态调整大小,这在某些需要灵活内存使用的场景下可能是一个限制。

阻塞队列的优缺点:

优点

  1. 线程同步:阻塞队列可以很好地实现线程之间的同步,简化了生产者和消费者之间的数据传递和通信。
  2. 解耦合:作为生产者消费者模式的缓冲空间,阻塞队列降低了生产者和消费者之间的耦合性。
  3. 削峰填谷:由于阻塞队列的大小是有限的,它可以起到限制作用,平衡突发的流量高峰。

缺点

  1. 可能引发死锁:如果使用不当,比如生产者和消费者互相等待对方释放资源时,可能会发生死锁。
  2. 对性能的影响:线程的挂起和唤醒操作可能会对系统性能产生影响,尤其是在高并发场景下。
  3. 处理超时操作较复杂:在设置了超时时间的情况下,需要处理超时异常并进行相应的补偿或回滚操作,增加了编程复杂性。

每种数据结构都有其特定的使用场景和限制,开发者在选择时应根据具体需求和上下文来决定使用哪一种。

本篇我们学习了什么是生产消费者模型,基于两种数据结构,分别实现了生产消费者模型,

还掌握了一个线程同步神奇——信号量。这对于提高线程之间的并发度非常有用。再次理解了生产消费者模型为什么高效?总之生产消费者模型非常值得我们学习。

 


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/756756.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

stm32学习笔记---ADC模数转换器(代码部分)AD单通道/多通道

目录 第一个代码&#xff1a;AD单通道 ADC初始化步骤 ADC相关的库函数 RCC_ADCCLKConfig 三个初始化相关函数 ADC_Cmd ADC_DMACmd ADC_ITConfig 四个校准相关函数 ADC_SoftwareStartConvCmd ADC_GetSoftwareStartConvStatus ADC_GetFlagStatus ADC_RegularChannel…

探索 Electron:将 Web 技术带入桌面应用

Electron是一个开源的桌面应用程序开发框架&#xff0c;它允许开发者使用Web技术&#xff08;如 HTML、CSS 和 JavaScript&#xff09;构建跨平台的桌面应用程序&#xff0c;它的出现极大地简化了桌面应用程序的开发流程&#xff0c;让更多的开发者能够利用已有的 Web 开发技能…

iOS17系统适配

iOS17 新功能 文章目录 iOS17 新功能iOS17支持哪几款机型Xcode15新特性iOS17-开发适配指南 横屏待机 在iOS 17中&#xff0c;还带来了横屏待机功能&#xff0c;苹果将这个新功能命名为“Standby”模式&#xff0c;为 iPhone 带来了全新的玩法。iPhone启用之后&#xff0c;默认情…

文件加密|电脑文件夹怎么设置密码?5个文件加密软件,新手必看!

电脑文件夹怎么设置密码&#xff1f;您是否希望更好地在电脑上保护您的个人或敏感文件&#xff1f;设置电脑文件夹密码是一种简单而有效的方式来确保你的隐私不被侵犯。通过使用文件加密软件&#xff0c;您可以轻松地为您的文件和文件夹设置密码保护。因此&#xff0c;本文将介…

快速应用开发(RAD):加速软件开发的关键方法

目录 前言1. 快速应用开发的概念1.1 什么是快速应用开发&#xff1f;1.2 RAD与传统开发方法的对比 2. 快速应用开发的实施步骤2.1 需求分析与规划2.2 快速原型开发2.3 用户评估与反馈2.4 迭代开发与改进2.5 最终交付与维护 3. 快速应用开发的优点与应用场景3.1 优点3.2 应用场景…

Python逻辑控制语句 之 判断语句--if elif else 结构(多重判断)

1.if elif else 的介绍 # if elif else 如果 ... 如果 ... 否则 .... # 多个如果之间存在关系 应用场景&#xff1a;在判断条件时, 需要判断 多个条件, 并且对应不同条件要执行 不同的代码 2.if elif else 的语法 if 判断条件1: 判断条件1成立&#xff0c;执行的代码 elif 判…

React@16.x(44)路由v5.x(9)源码(1)- path-to-regexp

目录 1&#xff0c;作用2&#xff0c;实现获取 match 对象2.1&#xff0c;match 对象的内容2.2&#xff0c;注意点2.3&#xff0c;实现 1&#xff0c;作用 之前在介绍 2.3 match 对象 时&#xff0c;提到了 react-router 使用第3方库 path-to-regexp 来匹配路径正则。 我们也…

【漏洞复现】科立讯通信有限公司指挥调度管理平台uploadgps.php存在SQL注入

0x01 产品简介 科立讯通信指挥调度管理平台是一个专门针对通信行业的管理平台。该产品旨在提供高效的指挥调度和管理解决方案&#xff0c;以帮助通信运营商或相关机构实现更好的运营效率和服务质量。该平台提供强大的指挥调度功能&#xff0c;可以实时监控和管理通信网络设备、…

【Android面试八股文】请描述一下Service的生命周期是什么样的?

文章目录 一、Service的生命周期是什么样的?1.1 通过 `startService` 启动的 Service 生命周期:1.1.1 相关方法说明1.1.2 流程1.1.3 总结1.2 通过 bindService 启动的 Service 生命周期1.2.1 相关方法说明1.2.2 流程1.3 生命周期调用1.4 总结一、Service的生命周期是什么样的…

20240629在飞凌开发板OK3588-C上使用Rockchip原厂的SDK跑通I2C扩展GPIO芯片TCA6424ARGJRR

20240629在飞凌开发板OK3588-C上使用Rockchip原厂的SDK跑通I2C扩展GPIO芯片TCA6424ARGJRR 2024/6/29 18:02 1、替换DTS了&#xff1a; Z:\repo_RK3588_Buildroot20240508\kernel\arch\arm64\boot\dts\rockchip viewproviewpro-ThinkBook-16-G5-IRH:~/repo_RK3588_Buildroot2024…

Unity WebGL项目问题记录

一、资源优化 可通过转换工具配套提供的资源优化工具&#xff0c;将游戏内纹理资源针对webgl导出做优化。 工具入口&#xff1a; 工具介绍 Texture 搜索规则介绍 已开启MipMap: 搜索已开启了MipMap的纹理。 NPOT: 搜索非POT图片。 isReadable: 搜索已开启readable纹理。 …

【机器学习】大模型训练的深入探讨——Fine-tuning技术阐述与Dify平台介绍

目录 引言 Fine-tuning技术的原理阐 预训练模型 迁移学习 模型初始化 模型微调 超参数调整 任务设计 数学模型公式 Dify平台介绍 Dify部署 创建AI 接入大模型api 选择知识库 个人主页链接&#xff1a;东洛的克莱斯韦克-CSDN博客 引言 Fine-tuning技术允许用户根…

Day 48 消息队列集群RabbitMQ

消息队列集群-RabbitMQ 一、消息中间件 中间件 tomcat java web中间件 web容器 mysql php php mysql uwsgi python mysql mycat 数据库中间件 rabbitMQ 消息中间件 1、简介 MQ 全称为&#xff08;Message Queue消息队列&#xff09;。是一种应用程序对应用程序的通信方…

Python之父推荐!Star 60k!这本 CPython 书把内部实现全讲透了!

都说 Python 是人工智能的“天选”语言&#xff0c;为什么呢&#xff1f; 可能很多读者都知道&#xff0c;Python 的解释器是用 C 语言写的&#xff0c;所以其实我们在谈论 “Python” 的时候&#xff0c;99.9% 的情况说的就是 “CPython”&#xff01; CPython 是目前最流行的…

OpenAI推出自我改进AI- CriticGPT

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

学习gateway网关路由时遇到的问题

遇到这个问题先别慌&#xff0c;我们首先要检查是哪里出问题了&#xff0c;从报错信息中我们可以看到&#xff0c;他说 Unable to find GatewayFilterFactory with name -AddRequestHeader 找不到这个路由过滤器&#xff0c;所以导致网关设置失败&#xff0c;从这条信息上我…

myCrayon个人博客项目基于springBoot+Vue全栈开发

目录 项目介绍 简介 项目架构 项目模块组成 数据库设计 项目展示 首页 用户登录与注册 个人信息模块 商城展示 博客模块 博客浏览 博客发布与编辑 博客搜索 社区模块 新闻模块 后台管理系统 部署方式 结语 项目介绍 简介 项目类似于CSDN&#xff0c;支持所…

【反者道之动,弱者道之用】统计学中的哲理——回归均值 Regression to the mean

&#x1f4a1;&#x1f4a1;在统计学中&#xff0c;回归均值(Regression toward the Mean/Regression to the Mean) 指的是如果变量在其第一次测量时是极端的&#xff0c;则在第二次测量时会趋向于接近平均值的现象。   在金融学中&#xff0c; 回归均值是指股票价格无论高于…

基于Java毕业生生活用品出售网站的设计和实现(源码+LW+调试文档+讲解等)

&#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN作者、博客专家、全栈领域优质创作者&#xff0c;博客之星、平台优质作者、专注于Java、小程序技术领域和毕业项目实战✌&#x1f497; &#x1f31f;文末获取源码数据库&#x1f31f; 感兴趣的可以先收藏起来&#xff0c;…

个人搭建cppreference网站

近日,由于购买的腾讯云服务器要过期了,之前在服务器搭建的cppreference也要重新搭建,故写下此文章 cppreference的访问速度也慢,故自己WSL子系统简单搭键一下是个不错的选择 环境准备 首先,自己先安装Nginx,在网上找安装教程即可下载cppreference网站资源包:https://pan.baidu…