零基础Linux_24(多线程)线程同步+条件变量+生产者消费模型_阻塞队列版

目录

1. 线程同步和生产者消费者模型

1.1 生产者消费者模型的概念

1.2 线程同步的概念

1.3 生产者消费者模型的优点

2. 线程同步的应用

2.1 条件变量的概念

2.2 条件变量操作接口

3. 生产者消费者模型_阻塞队列

3.1 前期代码(轮廓)

3.2 中期代码(简单使用)

BlockQueue.hpp:

3.3 生产者消费者模型高效的体现

3.4 后期代码(处理任务)

Task.hpp

ProdCon.cc

3.5 最终代码(RAII风格的锁)

lockGuard.hpp

BlockQueue.hpp

本篇完。


1. 线程同步和生产者消费者模型

1.1 生产者消费者模型的概念

以生活中消费者生产者为例:

生活中,我们大部分人都扮演着消费者的角色,会经常在超市买东西,比如买方便面,而超市的方便面是由供应商生成的。此时我们就是消费者,供应商就是生产者,而超市就是一个交易场所。

  • 将读取数据的线程叫做消费者线程
  • 将产生数据的线程叫做生产者线程
  • 将共享的特定数据结构叫做缓冲区

超市的供应商肯定不止一家,即使同一种商品的供应上也不止一家,不同牌子方便面的生产者它们之间的关系是竞争关系,竞争的表现就是互斥。

站在超市的角度,假设只有一块区域是买方便面的,当生产者来供货的时候,这块区域只能让一家供应商来供货,否则就会导致不同的东西混着放,对消费者来说很不友好。

  • 生产者线程和生产者线程之间是互斥关系
  • 在同一时间只能有一个生产者线程来访问缓冲区。

假设现在超市只有一包方便面了,但是同时来了好多消费者都要买方便面,此时这些消费者之间的关系也是竞争关系,我买上你就买不上了。所以当只有一包方便面的时候,只能有一个买方便面的消费者进入超市。

  • 消费者线程和消费者线程之间是互斥关系
  • 在同一时间只能由一个消费者线程来访问缓冲区。

再假设,超市的方便面卖完了,生产者正在给超市供货,而消费者也正在买方便面,那消费者到底买没买到方便面?有可能生产者刚把方便面搬下来,还没来及摆上去,那么消费者就没有买到,也由可能生产者把方便面摆上去了,那么消费者就买到了。所以最好的办法就是生产者供货的时候,不让消费者进来。

在Linux中,缓冲区存放的都是数据,数据是可以覆盖的,比如消费者线程在读取缓冲区中的数据时,数据是"hello world",刚刚读取完"hello",生产者线程把"world"改成了"rtx",那么消费者线程读取到的就成了"hello rtx",就出错了。所以最好的办法就是当消费者线程访问缓冲区的时候,生产者线程不能访问缓冲区。

  • 消费者线程和生产者线程之间是互斥关系
  • 在同一时间内只有一个线程可以访问缓冲区。

1.2 线程同步的概念

保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。(在计算机操作系统中,饥饿(Starvation)是指某些进程或线程由于资源分配不公平或调度策略不合理而无法获得所需资源或执行时间的现象。当某个进程或线程长时间无法满足其资源需求时,就会出现饥饿问题。)

就像上篇博客中抢票的互斥代码,在每个线程抢完票以后没有进行延时代替其他处理动作时,所有票都被一个线程抢了,其他线程没有机会抢。

由于竞争能力弱而缺乏调度的线程就处于饥饿状态。

而同步就是让所有线程按照一定顺序来抢票,做到尽量公平,避免线程饥饿问题产生。具体如何实现后面会详细讲解。

继续拿超市来说,生产者不能无休止的向超市中供货,否则消费者无法进来消费,最终方便面会放不下。同样,消费者也不能无休止来买方便面,否则生产者进不来,方便面就会卖完,而且没有人来供货。所以最好的办法就是生产者供货,当货架摆满了就不供货了,让消费者来买,当方便面卖完了再让生产者来供货,从而让消费者和生产者协同起来。

  • 消费者线程和生产者线程之间又是同步关系
  • 生产者线程和消费者线程按照一定顺序去访问缓冲区。

根据上面例子和分析,对于生产者消费模型的本质可以总结为123原则(非官方版)

  • 1个交易场所:一段特定结构的缓冲区(上面例子就是超市)
  • 2种角色:生产者和消费者(上面例子就是客人和供货商)
  • 3种关系:生产者和生产者(互斥关系),消费者和消费者(互斥关系),生产者和消费者(互斥+同步关系)。

1.3 生产者消费者模型的优点

有了超市这个交易场所,生产者只要给超市供大量的货即可,比如几万包方便面,不用关心是消费者什么时候来买,只需要专注自己的生成即可。

对于消费者而言,只需要直接去超市买方便面就行,也不用等待方便面的生产。

超市只需要做的就是方便面卖完了,告诉生产者来上货,然后告诉消费者来买。消费者和生产者完全独立,不存在任何交集。

生产者消费者模型实现了消费者线程和生产者线程之间的解耦。(低耦合,高内聚)

我们平时写的C/C++代码,如果将main函数看成是生产者,普通函数看出是消费者,那么它两就存在高度耦合。

比如main函数里调用func函数:当执行func函数的时候,main函数在等待,只有func执行完毕以后main函数才能继续执行下去。如果将这两个函数看出两个执行流,那么它们就存在高度耦合。

而生产者消费者模型就成功的让生产者执行流和消费者执行流解耦了,生产者只管向缓冲区生产数据,消费者只管从缓冲区消耗数据,不用关心对方的状态。

再比如大部分人在周一到周五上班,在周六日休息,上班时候时间比较少,去超市消费的人也比较少,由于消费者和生产者互斥,所以就可以让生产者在周一到周五的时候来上货。

当周六日消费者休息的时候,去超市消费的人就比较多,方便面也卖的比较快,但是由于生产者供货量足够,所以并不会因为买的人多了就不够了的情况。

生产者消费者模型有助于解决生产者线程和消费者线程忙闲不均的问题。因为缓冲区能够缓存一定量的数据。

再比如我们买东西肯定不会直接去找供应商,因为人家不零售,因为生产者如果零售的话,每次开机器就仅生成几包方便面,成本高,效率低。

对于消费者而言,直接去找生产者还需等待生成者完成商品生成,消耗时间成本高,效率也低。

生产者消费者模型提高了了生产者线程和消费者线程的执行效率。


2. 线程同步的应用

同步是为了让多线程按照一定顺序互斥访问临界资源,在上面的生产者消费者模型中,如何实现同步呢?就要涉及下面的条件变量。

2.1 条件变量的概念

条件变量:用来描述某种临界资源是否就绪的一种数据化描述

当一个线程互斥地访问某个临界资源时,它可能发现在其它线程改变状态之前,它什么也做不了。

例如,存在一个共享的队列,生产者线程负责生产数据到队列中,消费者线程负责从队列中读取数据,当消费者线程发现队列为空时就不要再去竞争锁去访问了,而是应该等待,直到生产者线程将数据生成到队列中。

要想让消费者线程等待,就需要使用到条件变量。

那么条件变量是什么呢?继续拿超市举例:假设现在超出的架子上一次只放一包方便面,只有这包方便面被人买走了,才会放上新的方便面。

此时来了一堆消费者消费者都要买方便面,因为只有一包,所以只能去竞争了,那些竞争能力强的才能买上方便面,甚至不停的抢不停的买,此时那些竞争能力弱的消费者就会始终都买不到方便面。

竞争能力弱的消费者就会始终抢不到锁,就会产生饥饿问题。

为了维持秩序,超市的工作人员设置了一个等待区,所有消费者都在这里排队购买,方便面被摆出来了,工作人员让一个消费者进去买,没有摆出来就等着。如果消费者想买两包甚至多包,只能重新排队。

等待区及工作人员就相当于条件变量

多线程互斥访问一个临界资源时,为了让这些线程按照一定顺序访问,将这些线程都放在条件变量的等待队列中,当另一个线程让条件变量符合条件(唤醒线程)时,队列中的第一个线程就去访问临界资源。


2.2 条件变量操作接口

条件变量同样是由POSIX线程库维护的,所以使用的是POSIX标准,和互斥锁的接口非常相似。

创建条件变量:

pthread_cond_t cond;
  • 同样要加pthread_。同样是类似int a;
  • cond是英文condition的缩写。

条件变量的初始化,释放:man pthread_cond_init:

使用类似互斥锁,只是传递的参数是创建好的条件变量。

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);

cond:要初始化的条件变量

attr:nullptr

返回值:成功返回0,失败返回错误码

man pthread_cond_wait:

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
  • 第一个参数cond:创建的条件变量地址。
  • 第二个参数mutex:互斥锁的地址,这个必须有,后面再讲解为什么必须传锁。
  • 返回值:放入条件变量的等待队列成功返回0,失败返回错误码。

pthread_cond_wait的作用:调用该接口的线程放入传入条件变量的等待队列中。

唤醒条件变量等待队列中的一个线程:

man pthread_cond_signal:

int pthread_cond_signal(pthread_cond_t *cond);
  • 参数cond:所在等待队列的条件变量地址
  • 返回值:唤醒成功返回0,失败返回错误码

pthread_cond_signal作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的一个线程。

唤醒条件变量等待队列中的所有线程:

int pthread_cond_broadcast(pthread_cond_t *cond);
  • 参数:所在等待队列的条件变量地址
  • 返回值:唤醒成功返回0,失败返回错误码

pthread_cond_broadcast作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的所有线程。

将条件变量用到上一篇抢票的代码中,实现多线程按照一定顺序互斥抢票:

  •  创建全局的条件变量(后面就不创建成全局的了)。
  •  每个线程一抢上锁以后就进入条件变量的等待队列。
  •  主线程每个一秒钟唤醒一个等待的线程进行抢票。
#include <iostream>
#include <cstdio>
#include <cerrno>
#include <cstring>
#include <cassert>
#include <thread>
#include <unistd.h>
#include <pthread.h>
using namespace std;

#define THREAD_NUM 4
int tickets = 10000; // 在并发访问的时候,导致了我们数据不一致的问题 -> 临界资源
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 创建条件变量

class ThreadData
{
public:
    ThreadData(const std::string &n,pthread_mutex_t *pm):tname(n), pmtx(pm)
    {}
public:
    std::string tname;
    pthread_mutex_t *pmtx;
};

void *getTickets(void *args)
{
    ThreadData *td = (ThreadData*)args;
    while(true) // 抢票逻辑
    {
        int n = pthread_mutex_lock(td->pmtx); // 加锁
        assert(n == 0);

        pthread_cond_wait(&cond, td->pmtx); // 进入等待队列
        // 临界区
        if(tickets > 0) // 判断的本质也是计算的一种
        {
            usleep(rand()%1500);
            printf("%s: %d\n", td->tname.c_str(), tickets);
            tickets--; // 也可能出现问题
            n = pthread_mutex_unlock(td->pmtx); // 解锁
            assert(n == 0);
        }
        else
        {
            n = pthread_mutex_unlock(td->pmtx);  // break之前解锁
            assert(n == 0);
            break;
        }
        
        usleep(rand()%2000); // 抢完票,其实还需要后续的动作
    }
    delete td;
    return nullptr;
}

int main()
{
    time_t start = time(nullptr);
    pthread_mutex_t mtx;
    pthread_mutex_init(&mtx, nullptr);

    pthread_t t[THREAD_NUM];

    for(int i = 0; i < THREAD_NUM; i++) // 多线程抢票的逻辑
    {
        std::string name = "thread ";
        name += std::to_string(i+1);
        ThreadData *td = new ThreadData(name, &mtx);
        pthread_create(t + i, nullptr, getTickets, (void*)td);
    }

    while(true)
    {
        sleep(1);
        pthread_cond_signal(&cond); // 唤醒一个等待线程
        cout << "main thread wakeup a new thread" << endl;
    }

    for(int i = 0; i < THREAD_NUM; i++)
    {
        pthread_join(t[i], nullptr);
    }

    pthread_mutex_destroy(&mtx);
    return 0;
}

此时就按照4 1 2 3 的顺序抢票了。

  • 每个线程都会被先挂起到等待队列中,等待主线程的唤醒。
  • 唤醒一个线程抢完票以后会继续进入等待队列,并且排在队列的后面。

如果不使用同步,就可能会只有一个线程在抢票,其他线程就会处于饥饿状态。

再放一份代码:(条件变量不再是全局的,还加了函数指针的方法让新线程执行不同的任务)

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>

#define TNUM 4
typedef void (*func_t)(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit = false;

// pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 可以定义成全局的,这样很多地方不用传参了,但是这里写正式点
// pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

class ThreadData
{
public:
    ThreadData(const std::string &name, func_t func, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
        : name_(name), func_(func), pmtx_(pmtx), pcond_(pcond)
    {
    }

public:
    std::string name_;
    func_t func_;
    pthread_mutex_t *pmtx_;
    pthread_cond_t *pcond_;
};

void func1(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while (!quit)
    {
        // wait一定要在加锁和解锁之间进行wait
        pthread_mutex_lock(pmtx);
        // if(临界资源是否就绪 -> 没就绪) pthread_cond_wait 但是现在没有这样的判断
        if (!quit) // 这个if加不加不重要
        {
            pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        }
        std::cout << name << " running  --  a播放" << std::endl;
        pthread_mutex_unlock(pmtx);
    }
}
void func2(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while (!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        std::cout << name << " running  -- b下载" << std::endl;
        pthread_mutex_unlock(pmtx);
    }
}
void func3(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while (!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        std::cout << name << " running  -- c刷新" << std::endl;
        pthread_mutex_unlock(pmtx);
    }
}
void func4(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{
    while (!quit)
    {
        pthread_mutex_lock(pmtx);
        pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞
        std::cout << name << " running  -- d扫描" << std::endl;
        pthread_mutex_unlock(pmtx);
    }
}

void *Entry(void *args) // 添加了一个类似软件层的东西,只是演示下
{
    ThreadData *td = (ThreadData *)args;         // td在每一个线程自己私有的栈空间中保存
    td->func_(td->name_, td->pmtx_, td->pcond_); // 它是一个函数,调用完成就要返回
    delete td;
    return nullptr;
}

int main()
{
    pthread_mutex_t mtx;
    pthread_cond_t cond;
    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    pthread_t tids[TNUM];
    func_t funcs[TNUM] = {func1, func2, func3, func4};
    for (int i = 0; i < TNUM; i++)
    {
        std::string name = "Thread ";
        name += std::to_string(i + 1);
        ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);
        pthread_create(tids + i, nullptr, Entry, (void *)td);
    }

    sleep(5); // 主线程和所有新线程都休眠五秒 -> 新线程阻塞等待

    // ctrl new thread
    int cnt = 10;
    while (cnt)
    {
        std::cout << "resume thread run code ...." << cnt-- << std::endl;
        pthread_cond_signal(&cond);
        sleep(1); // 每隔一秒唤醒一个线程,线程不用自己休眠了

        // pthread_cond_broadcast(&cond); // 全部唤醒 -> 设置条件变量有效
    }

    std::cout << "ctrl done" << std::endl; // 控制结束
    quit = true;
    pthread_cond_broadcast(&cond); // quit = true;后再全部唤醒一次

    for (int i = 0; i < TNUM; i++)
    {
        pthread_join(tids[i], nullptr);
        std::cout << "thread: " << tids[i] << "quit" << std::endl;
    }

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

这个代码只是演示一下不同的使用方法和场景、


3. 生产者消费者模型_阻塞队列

上图所示就是要实现的模型,有一个生产者线程,一个消费者线程,还有一个阻塞队列。

  • 阻塞队列这里使用C++STL容器中的queue来实现。
  • 阻塞队列是公共资源,所以要保证它的安全,线程A和线程B要互斥访问,只需要一把锁就能实现生产者和消费者,生产者和生产者,消费者和消费者之间的互斥。
  • 阻塞队列中有数据消费者才能读,此时生产者不能进行生产,生产者线程要进入它的等待队列中。
  • 阻塞队列中没有数据或者不满时,生产者才能进行生产,消费者在生产的时候不能读,要进入它的等待队列。

3.1 前期代码(轮廓)

先敲个轮廓出来就是这样的:(代码具体就不讲解了,看注释就行)

Makefile:

ProdCon:ProdCon.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f ProdCon

BlockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>

const int gDefaultCap = 7;

template <class T>
class BlockQueue
{
public:
    BlockQueue(int capacity = gDefaultCap) 
        : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }
    void push(const T& in)
    {
    }
    void pop(T* out)
    {
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }

protected:
    std::queue<T> _bq;     // 阻塞队列
    int _capacity;         // 容量上限
    pthread_mutex_t _mtx;  // 保证队列安全
    pthread_cond_t _Empty; // 表示bq 是否空的条件
    pthread_cond_t _Full;  // 表示bq 是否满的条件
};

ProdCon.cc:(Producer consumer model)

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>

void* consumer(void *args)
{
    BlockQueue<int> *bqueue = (BlockQueue<int> *)args;
    while(true) // 获取任务
    {
        sleep(1);
    }

    return nullptr;
}

void* productor(void *args)
{
    BlockQueue<int> *bqueue = (BlockQueue<int> *)args;
    while(true) // 制作任务
    {
        sleep(1);
    }

    return nullptr;
}

int main()
{
    BlockQueue<int> *bqueue = new BlockQueue<int>();

    pthread_t c[2],p[2];
    pthread_create(c, nullptr, consumer, bqueue);
    pthread_create(c + 1, nullptr, consumer, bqueue);
    pthread_create(p, nullptr, productor, bqueue);
    pthread_create(p + 1, nullptr, productor, bqueue);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);

    delete bqueue;
    return 0;
}

3.2 中期代码(简单使用)

然后现在来写一写关键的pop和push:

BlockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>

const int gDefaultCap = 7;

template <class T>
class BlockQueue
{
private:
    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }
    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = gDefaultCap) : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }
    void push(const T& in) // 生产者
    {
        pthread_mutex_lock(&_mtx);

        // pthread_cond_wait: 只要是一个函数,就可能调用失败,可能存在 伪唤醒 的情况,所以用while
        while(isQueueFull()) //1. 先检测当前的临界资源是否能够满足访问条件
        {
            pthread_cond_wait(&_Full, &_mtx); // 满的时候就在_Full这个条件变量下等待
            // 此时思考:我们是在临界区中,我是持有锁的,如果我去等待了,锁该怎么办呢?
            // 所以pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放
            // 当我被唤醒时,我从哪里醒来呢?->从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的
            // 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁
        }

        _bq.push(in); // 2. 队列不为空或者被唤醒 -> 访问临界资源,100%确定,资源是就绪的

        pthread_cond_signal(&_Empty); // 唤醒
        pthread_mutex_unlock(&_mtx); // 解锁
    }

    void pop(T* out)
    {
        pthread_mutex_lock(&_mtx);
        while (isQueueEmpty())
        {
            pthread_cond_wait(&_Empty, &_mtx);
        }

        *out = _bq.front(); // 访问临界资源
        _bq.pop();

        pthread_cond_signal(&_Full); // 唤醒
        pthread_mutex_unlock(&_mtx); // 解锁
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }

protected:
    std::queue<T> _bq;     // 阻塞队列
    int _capacity;         // 容量上限
    pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全
    pthread_cond_t _Empty; // 用它来表示bq 是否空的条件
    pthread_cond_t _Full;  //  用它来表示bq 是否满的条件
};

ProdCon.cc:

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>

void* consumer(void *args)
{
    BlockQueue<int> *bqueue = (BlockQueue<int> *)args;
    while(true) // 获取任务
    {
        int a = 0;
        bqueue->pop(&a);
        std::cout << "消费一个数据" << a << std::endl;
        sleep(1);
    }
    return nullptr;
}

void* productor(void *args)
{
    BlockQueue<int> *bqueue = (BlockQueue<int> *)args;
    int a = 1;
    while(true) // 制作任务
    {
        bqueue->push(a);
        std::cout << "生产一个数据" << a++ << std::endl;
        // sleep(1); // 两个换着sleep看看能不能看到约束的效果
    }
    return nullptr;
}

int main()
{
    BlockQueue<int> *bqueue = new BlockQueue<int>();

    pthread_t c[2],p[2];
    pthread_create(c, nullptr, consumer, bqueue);
    pthread_create(c + 1, nullptr, consumer, bqueue);
    pthread_create(p, nullptr, productor, bqueue);
    pthread_create(p + 1, nullptr, productor, bqueue);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);

    delete bqueue;
    return 0;
}

编译运行试试效果:

此时就实现了生产者生产一批数据,然后两个线程一人消费一个,然后再生成,再消费。

把代码里生产者和消费者的sleep注释换一下:(此时就应该是消费者跟着生产者走了)

编译运行:

也成功达到了预想的效果。

如果不想类似生产一个消费一个的话还可以在生产者或者消费者里的唤醒线程前加if判断语句,比如:在生产者线程里:if(_bq.size() >= _capacity / 2) pthread_cond_signal(&_Empty);


3.3 生产者消费者模型高效的体现

前面在分析生产者消费者模型时,一直都在说该模型高效,那么到底体现在什么地方呢?

多个生产者线程向阻塞队列中生成数据,多个消费者线程从阻塞队列中消费数据。

该模型的三种关系决定了访问阻塞队列的线程同一时间只有一个。

尤其是上面代码现象中,消费和生产是一前一后的,对于阻塞队列的访问是串行的,凭什么说这个模型是高效的呢?

因为在生产者线程和消费者线程中,访问阻塞队列临界资源的代码都只有一条,只有临界区的代码才是串行访问的。

除了临界区的代码,其他部分代码所有线程都是并发执行的。

实际的线程中,临界区之外的代码会有很多,而且有可能会非常耗时,但是这些代码是可以多线程并发执行的,该模型的效率就会很高。

生产者消费者模型的高效体现在:非临界区的代码,多线程可以并发执行。

该模型的高效并不体现在对临界资源(阻塞队列)的访问上。


3.4 后期代码(处理任务)

生产者消费者模型实际上并不是仅仅用来生产消费整型数据的,它更多的是处理任务的。

这里创建一个计算任务的类Task(这里写在了一个头文件下,只弄了加法)。在类中的仿函数调用回调函数执行具体的计算逻辑,还有一个显示任务的接口。把传给阻塞队列的int传成Task

BlockQueue.hpp和前面一样

Task.hpp

#pragma once
#include <iostream>
#include <functional>
typedef std::function<int(int, int)> func_t;
class Task
{
public:
    Task() 
    {}
    Task(int x, int y, func_t func)
        : _x(x)
        , _y(y)
        , _func(func)
    {}

    int operator()()
    {
        return _func(_x, _y);
    }

public: // 不想写get接口就直接弄公有了
    int _x;
    int _y;
    // int type;
    func_t _func;
};

ProdCon.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>

inline int myAdd(int x, int y)
{
    return x + y;
}
void* consumer(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
    while(true) // 获取任务
    {
        Task t;
        bqueue->pop(&t);
        std::cout << pthread_self() <<" consumer: "<< t._x << " + " << t._y << " = " << t() << std::endl;
        // sleep(1);
    }
    return nullptr;
}

void* productor(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
    int a = 1;
    while(true) // 制作任务
    {
        int x = rand() % 10 + 1;
        usleep(rand() % 1000);
        int y = rand() % 5 + 1;
        Task t(x, y, myAdd);
        bqueue->push(t);
        std::cout <<pthread_self() <<" productor: "<< t._x << " + " << t._y << " = ?" << std::endl;
        sleep(1); // 两个换着sleep看看能不能看到约束的效果
    }
    return nullptr;
}

int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ 0x777); // 只是为了更随机
    BlockQueue<Task> *bqueue = new BlockQueue<Task>();

    pthread_t c[2],p[2];
    pthread_create(c, nullptr, consumer, bqueue);
    pthread_create(c + 1, nullptr, consumer, bqueue);
    pthread_create(p, nullptr, productor, bqueue);
    pthread_create(p + 1, nullptr, productor, bqueue);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);

    delete bqueue;
    return 0;
}

编译运行:


3.5 最终代码(RAII风格的锁)

ProdCon.cc和Task.hpp跟前面一样,

lockGuard.hpp

#pragma once
#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t* mtx) 
        :_pmtx(mtx)
    {}
    void lock()
    {
        pthread_mutex_lock(_pmtx);
        std::cout << "进行加锁成功" << std::endl;
    }
    void unlock()
    {
        pthread_mutex_unlock(_pmtx);
        std::cout << "进行解锁成功" << std::endl;
    }
    ~Mutex()
    {}
protected:
    pthread_mutex_t* _pmtx;
};

class lockGuard // RAII风格的加锁方式
{
public:
    lockGuard(pthread_mutex_t* mtx) // 因为不是全局的锁,所以传进来,初始化
        :_mtx(mtx)
    {
        _mtx.lock();
    }
    ~lockGuard()
    {
        _mtx.unlock();
    }
protected:
    Mutex _mtx;
};

BlockQueue.hpp

(只是加锁方式变了,不用自己解锁了)

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"
const int gDefaultCap = 7;

template <class T>
class BlockQueue
{
private:
    bool isQueueEmpty()
    {
        return _bq.size() == 0;
    }
    bool isQueueFull()
    {
        return _bq.size() == _capacity;
    }

public:
    BlockQueue(int capacity = gDefaultCap) 
        : _capacity(capacity)
    {
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Empty, nullptr);
        pthread_cond_init(&_Full, nullptr);
    }
    void push(const T& in) // 生产者
    {
        lockGuard lockgrard(&_mtx); // 自动调用构造函数
        //pthread_mutex_lock(&_mtx);

        // pthread_cond_wait: 只要是一个函数,就可能调用失败,可能存在 伪唤醒 的情况,所以用while
        while(isQueueFull()) //1. 先检测当前的临界资源是否能够满足访问条件
        {
            pthread_cond_wait(&_Full, &_mtx); // 满的时候就在_Full这个条件变量下等待
            // 此时思考:我们是在临界区中,我是持有锁的,如果我去等待了,锁该怎么办呢?
            // 所以pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放
            // 当我被唤醒时,我从哪里醒来呢?->从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的
            // 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁
        }

        _bq.push(in); // 2. 队列不为空或者被唤醒 -> 访问临界资源,100%确定,资源是就绪的

        pthread_cond_signal(&_Empty); // 唤醒
        // pthread_mutex_unlock(&_mtx); // 解锁
    } // 出了代码块自动调用析构函数

    void pop(T* out)
    {
        lockGuard lockgrard(&_mtx); // 自动调用构造函数
        // pthread_mutex_lock(&_mtx);

        while (isQueueEmpty())
        {
            pthread_cond_wait(&_Empty, &_mtx);
        }

        *out = _bq.front(); // 访问临界资源
        _bq.pop();

        pthread_cond_signal(&_Full); // 唤醒
        // pthread_mutex_unlock(&_mtx); // 解锁
    } // 出了代码块自动调用析构函数
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Empty);
        pthread_cond_destroy(&_Full);
    }

protected:
    std::queue<T> _bq;     // 阻塞队列
    int _capacity;         // 容量上限
    pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全
    pthread_cond_t _Empty; // 用它来表示bq 是否空的条件
    pthread_cond_t _Full;  //  用它来表示bq 是否满的条件
};

成功运行。


本篇完。

下一篇:零基础Linux_25(多线程)信号量+自选锁+读写锁(基于环形队列的生产者消费模型)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/109893.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Go学习第十五章——Gin参数绑定bind与验证器

Go web框架——Gin&#xff08;参数绑定bind与验证器&#xff09; 1 bind参数绑定1.1 JSON参数1.2 Query参数1.3 Uri绑定动态参数1.4 ShouldBind自动绑定 2 验证器2.1 常用验证器2.2 gin内置验证器2.3 自定义验证的错误信息2.4 自定义验证器 1 bind参数绑定 在Gin框架中&#…

数据结构Demo——简单计算器

简单计算器 一、项目介绍二、技术使用三、具体代码实现1.前端部分2.后端部分 一、项目介绍 本项目实现了一个通过网页访问的简单计算器&#xff0c;它可以对带括号的加减乘除表达式进行计算并将计算结果返回给用户&#xff0c;并且可以对用户输入的表达式进行合法性判断&#…

Maven第二章:Maven基本概念与生命周期

Maven第二章&#xff1a;Maven基本概念与生命周期 前言 本章主要内容&#xff0c;介绍Maven基本概念&#xff0c;包括maven坐标含义&#xff0c;命名规则&#xff0c;继承与聚合、了解与理解生命周期&#xff0c;如何通过Maven进行依赖和版本管理。 什么是Maven的坐标&#xf…

【第25例】IPD体系进阶:需求分析团队RAT

目录 简介 RAT CSDN学院相关内容推荐 作者简介 简介 RAT是英文Requirement Analysis Team英文首字母的简称,也即需求分析团队,每个产品线都需要设定对应的一个RAT的组织。 RAT主要负责产品领域内需求的分析活动,是RMT的支撑团队: 这个时候可以将RAT细化为PL-RAT团队,…

Ansible的安装和部署

目录 1.Ansible的安装 2.构建Ansible清单 直接书写受管主机名或ip 设定受管主机的组[组名称] 主机规格的范围化操作 指定其他清单文件 ansible命令指定清单的正则表达式 3.Ansible配置文件参数详解 配置文件的分类与优先级 常用配置参数 4.构建用户级Ansible操作环…

Mysql数据库 4.SQL语言 DQL数据查询语言 查询

DQL数据查询语言 从数据表中提取满足特定条件的记录 1.单表查询 2.多表查询 查询基础语法 select 关键字后指定要查询到的记录的哪些列 语法&#xff1a;select 列名&#xff08;字段名&#xff09;/某几列/全部列 from 表名 [具体条件]&#xff1b; select colnumName…

linux进程概念

文章目录 1、冯诺依曼体系结构2、操作系统(Operator System)2.1、概念2.2、设计OS的目的2.3、定位2.4、如何理解 "管理"2.5、总结 3、系统调用和库函数概念4、进程4.1、基本概念4.2、描述进程-PCB4.2.1、task_struct-PCB的一种4.2.2、task_ struct内容分类 4.3、组织…

【Linux】第六站:Centos系统如何安装软件?

文章目录 1.Linux安装软件的方式2.Linux的软件生态3. yum4. rzsz软件的安装与卸载5.yum如何知道去哪里下载软件&#xff1f; 1.Linux安装软件的方式 在linux中安装软件常用的有三种方式 源代码安装&#xff08;我们还需要进行编译运行后才可以&#xff0c;很麻烦&#xff09; …

H5游戏源码分享-跳得更高

H5游戏源码分享-跳得更高 控制跳动踩到云朵上 <!DOCTYPE html> <html> <head><meta http-equiv"Content-Type" content"text/html; charsetUTF-8"><meta http-equiv"Content-Type" content"text/html;"&g…

基于SSM的养老院管理系统

基于SSM的养老院管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringSpringMVCMyBatisVUE工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 摘要 养老院管理系统是一个基于SSM&#xff08;Spring、Spring MVC、MyBatis&…

[量化投资-学习笔记002]Python+TDengine从零开始搭建量化分析平台-MA均线的多种实现方式

MA 均线时最基本的技术指标&#xff0c;也是最简单&#xff0c;最不常用的&#xff08;通常使用EMA、SMA&#xff09;。 以下用两种不同的计算方法和两种不同的画图方法进行展示和说明。 MA 均线指标公式 MA (N)(C1 C2 C3 …C N )/N目录 方式一1.SQL 直接查询均值2.使用 pyp…

java八股文(基础篇)

面向过程和面向对象的区别 面向过程&#xff1a;在解决问题时&#xff0c;特别自定义函数编写一步一步的步骤解决问题。 面向对象&#xff1a;其特点就是 继承&#xff0c;多态&#xff0c;继承&#xff0c;在解决问题时&#xff0c;不再注重函数的编写&#xff0c;而在于注重…

这么理解矩阵乘法,让你吊打面试官

大家好啊&#xff0c;我是董董灿。 很多与深度学习算法相关的面试&#xff0c;面试官可能都会问一个问题&#xff0c;那就是你是如何理解矩阵乘算法的。 更有甚者&#xff0c;会让你当场手写矩阵乘算法&#xff0c;然后问细节&#xff0c;问如何优化&#xff0c;面试现场&…

治疗红斑性肢痛症的【Chromocell】申请870万美元纳斯达克IPO上市

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 猛兽财经获悉&#xff0c;总部位于美国的生物制药公司Chromocell Therapeutics Corporation&#xff08;简称&#xff1a;Chromocell&#xff09;近期已向美国证券交易委员会&#xff08;SEC&#xff09;提交招股书&#x…

VM搭建虚拟机2(自定义安装)

文章目录 自定义安装选择你的centos下载目录设置用户名密码自定义安装目录注意&#xff0c;尽量别再同一位置安装虚拟机设置处理器数量内存根据所需配置&#xff08;默认1G&#xff09;NAT按需设置磁盘大小点击完成即可等待安装即可 VMware、centos、典型安装 自定义安装 选择你…

【机器学习(二) 线性代数基础I(Linear Algebra Foundations)】

机器学习&#xff08;二&#xff09; 线性代数基础I&#xff08;Linear Algebra Foundations) 这一节主要介绍一些线性代数的基础。 目录 机器学习&#xff08;二&#xff09; 线性代数基础I&#xff08;Linear Algebra Foundations)1. 向量 Vectors2. 复杂度 Complexity3.线…

【Linux】第七站:vim的使用以及配置

文章目录 一、vim1.vim的介绍2.vim基本使用3.vim的命令模式常用命令4.底行模式 二、vim的配置 一、vim 1.vim的介绍 vim编辑器&#xff0c;用来文本编写&#xff0c;可以写代码 它是一个多模式的编辑器 它有很多的模&#xff0c;不过我们暂时先只考虑这三种模式 命令模式插入模…

2023年阿里云双11有什么优惠活动?详细攻略来了!

随着双十一的临近&#xff0c;阿里云也正式开启了双11大促&#xff0c;推出了“金秋云创季”活动&#xff0c;那么&#xff0c;2023年阿里云双11的优惠活动究竟有哪些呢&#xff1f;本文将为大家详细介绍。 一、阿里云双11活动时间 1、2023年10月27日-2023年10月31日&#xff…

洛谷 B2009 计算 (a+b)/c 的值 C++代码

目录 题目描述 AC Code 切记 题目描述 题目网址&#xff1a;计算 (ab)/c 的值 - 洛谷 AC Code #include<bits/stdc.h> using namespace std; int main() {int a,b,c;cin>>a>>b>>c;cout<<(ab)/c<<endl;return 0; } 切记 不要复制题…

[论文阅读]Ghost-free High Dynamic Range Imaging with Context-aware Transformer

Ghost-free HDRI with Context-aware Transformer 背景介绍已有算法本文算法实验对比 背景介绍 高动态范围成像&#xff08;HDR&#xff09;是一种图像技术&#xff0c;它能够捕捉到比传统图像更广泛的亮度范围。1997年&#xff0c;Paul Debevec在他的论文《Recovering High D…