文章目录
- 一、引言
- 二、生产者消费者模型概述
- 1、基本概念和核心思想
- 2、生产者消费者模型的优点
- 三、消费者和生产者之间的同步与互斥
- 四、代码实现
- 1、事前准备
- 2、环形队列的实现
- 3、阻塞队列的实现
- 4、两种实现方式的区别
一、引言
在现代计算机系统中,很多任务需要同时进行或者依赖于异步的处理方式。生产者消费者模型通过有效地管理和协调多个线程之间的数据流动,提供了一种可靠的并发编程解决方案。它不仅可以优化系统资源的利用,还能够避免竞争条件和死锁等问题的发生,从而提升程序的性能和可靠性。 同步以及互斥的概念,以及互斥锁,条件变量,信号量的使用
二、生产者消费者模型概述
生产者消费者模型是一种经典的并发编程模式,用于解决多线程环境下的数据交换与同步问题。它通过合理的线程协作机制,有效地管理共享的有限缓冲区,使得生产者和消费者之间能够安全、有效地进行数据交换。
并发数据的传递问题是指在多线程或多进程环境中,数据如何在这些并发单元之间安全、有效地传递。在并发编程中,数据的传递和共享是一个重要的挑战,因为多个线程或进程可能同时访问和修改同一份数据,这可能导致数据的不一致、冲突和错误。
1、基本概念和核心思想
生产者消费者模型包含三个核心组件:
- 生产者(Producer):负责生成数据或执行任务,并将其放入共享的缓冲区中。
- 生产者负责生成需要被消费者处理的数据或任务。这些数据可以是任何类型的信息,如计算结果、消息、事件等。
- 消费者(Consumer):从共享的缓冲区中获取数据或任务,并进行相应的处理。
- 共享缓冲区(Buffer):用于存放生产者生成的数据或任务,以便消费者能够访问和处理。
- 缓冲区作为生产者和消费者之间的中介,起着存储和传递数据的作用。在生产者消费者模型中,缓冲区通常是一个队列,具有以下特点:
- 作用:用于临时存储生产者生成的数据或任务,以便消费者能够按照一定的顺序或条件进行处理。
- 队列结构:可以是不同类型的队列,如有界队列(固定大小)或无界队列(动态增长)等,具体选择依据于应用的需求和性能考量。
- 缓冲区的设计直接影响到生产者和消费者之间的数据交换效率和系统的整体性能。因此,在实际应用中,需要根据具体的场景和需求来选择合适的队列结构和实现方式。
- 缓冲区作为生产者和消费者之间的中介,起着存储和传递数据的作用。在生产者消费者模型中,缓冲区通常是一个队列,具有以下特点:
生产者和消费者之间通过共享缓冲区进行通信,但是需要注意的是生产者和消费者的执行速度可能不同,因此必须确保在缓冲区为空或已满时进行适当的等待和唤醒操作,以避免资源竞争和死锁。
2、生产者消费者模型的优点
生产者消费者模式就是通过一个缓冲区来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过缓冲区来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给缓冲区,消费者不找生产者要数据,而是直接从缓冲区中取,平衡了生产者和消费者的处理能力。这个缓冲区就是用来给生产者和消费者解耦的。
- 解耦生产者和消费者:生产者和消费者之间的解耦使得它们可以独立地运行和扩展。生产者可以专注于生成任务,而消费者可以专注于处理任务,两者之间的同步和协调通过共享的数据缓冲区来实现。即:生产者和消费者不直接通信,而是通过缓冲区进行间接通信。生产者在生产完数据之后,将数据放入缓冲区,而不需要等待消费者立即处理。
- 增加系统的并发性:允许多个生产者和消费者同时运行,有效利用多核处理器和多线程环境,提高系统的效率。
- 解决生产者-消费者速度差异:生产者和消费者在速度上可能不一致,缓冲区的存在可以平衡二者之间的速度差异,避免资源浪费或阻塞。
- 缓冲区的作用:缓冲区充当了一个中介的角色,存储生产者生产的数据,供消费者需要时进行获取。它能够暂时存储数据,平衡生产者和消费者之间的速度差异,防止因为速度不一致而导致的资源浪费或者阻塞现象。
- 简化同步问题:通过使用同步机制(如信号量、互斥锁等),生产者消费者模型可以有效地管理和保护共享资源,避免数据竞争和不一致状态。
为什么生产者消费者模型能提供较好的并发度呢?
首先,让我们描述一下通常的生产者消费者模型中的并发场景:
- 生产者多线程并发获取任务:生产者线程负责生成任务,并互斥的将这些任务放入共享的数据缓冲区中。由于生产者线程是并发的,它们可以同时运行,从而提高了任务的生成速度。
- 并行性:生产者线程可以并行地运行,每个线程都可以独立地生成任务。这种并行性能够显著提高任务的生成速度,尤其是在任务生成逻辑较为复杂或者需要处理大量数据的情况下。
- 互斥访问:生产者线程在将任务放入共享数据缓冲区时,需要使用互斥锁或其他同步机制来确保数据的一致性和正确性。虽然互斥访问可能会引入一些开销,但它是必要的,以防止多个生产者线程同时写入缓冲区导致的数据混乱。
- 消费者多线程并发处理各自的任务:消费者线程互斥的从共享的数据缓冲区中取出任务并进行处理。同样,由于消费者线程是并发的,它们可以同时运行,从而提高了任务的处理速度。
- 并行处理:消费者线程也可以并行地运行,每个线程都可以独立地从共享数据缓冲区中取出任务并进行处理。这种并行处理能够充分利用多核CPU的资源,提高系统的整体处理能力。
- 负载均衡:当消费者线程数量与生产者线程数量相匹配时,可以实现负载均衡,即每个消费者线程都能获得相对均衡的任务量,从而提高系统的整体效率和吞吐量。
- 互斥访问:与生产者线程类似,消费者线程在从共享数据缓冲区中取出任务时也需要使用互斥锁或其他同步机制来确保数据的一致性和正确性。这种互斥访问同样会引入一些开销,但它是必要的,以防止多个消费者线程同时读取缓冲区导致的竞争条件。
现在,让我们解释为什么这种模式能提供较好的并发度:
-
并行处理:生产者和消费者线程可以同时运行,从而实现了并行处理。生产者线程在生成任务时不会阻塞消费者线程,消费者线程在处理任务时也不会阻塞生产者线程。这大大提高了系统的吞吐量。
-
任务缓冲:共享的数据缓冲区作为任务的中转站,使得生产者和消费者可以解耦。生产者可以将任务放入缓冲区中,而无需等待消费者立即处理。同样,消费者可以从缓冲区中取出任务进行处理,而无需等待生产者生成新的任务。这种解耦使得生产者和消费者可以独立地运行,互不干扰。
-
资源利用率:由于生产者和消费者线程是并发的,它们可以充分利用多核CPU的资源。生产者线程可以并行地生成任务,而消费者线程可以并行地处理任务,从而提高了系统的整体性能。
-
负载均衡:通过调整生产者和消费者线程的数量,可以实现系统的负载均衡。如果任务生成速度较快,可以增加消费者线程的数量来加快处理速度。反之,如果处理速度较快,可以减少消费者线程的数量以减少不必要的资源消耗。
三、消费者和生产者之间的同步与互斥
在生产者-消费者模型中,确保生产者和消费者之间正确的互斥和同步是至关重要的,以避免数据竞争、死锁和其他并发问题。让我们详细讨论如何实现这些关系和需求:
生产者和生产者之间 — 互斥
多个生产者竞争向共享缓冲区写入数据时,需要确保只有一个生产者能够访问缓冲区,以防止数据写入的冲突和覆盖。
当多个生产者线程试图同时向共享缓冲区写入数据时,可能会发生数据竞争和覆盖的情况。因此,需要使用某种形式的互斥机制来确保在任何时候只有一个生产者可以访问缓冲区。
不需要同步:因为生产者之间只需要确保互斥访问,不需要考虑数据的顺序或等待其他生产者。
消费者和消费者之间 — 互斥
多个消费者竞争从共享缓冲区读取数据时,需要确保只有一个消费者能够访问缓冲区,以避免读取到不一致或错误的数据。
如果多个消费者线程试图同时从共享缓冲区读取数据,也可能会导致数据不一致或其他并发问题。因此,需要使用互斥机制来确保在任何时候只有一个消费者可以访问缓冲区。
不需要同步:消费者之间通常不需要同步,因为每个消费者都是独立地读取数据,不需要等待其他消费者完成。
消费者和生产者之间 — 同步 && 互斥
生产者和消费者之间的同步是生产者消费者模型中至关重要的一部分。这确保了生产者和消费者在合适的时机进行数据的生产和消费,避免了潜在的数据处理问题和资源浪费。
-
互斥:生产者和消费者都需要确保在访问缓冲区时是互斥的,以避免数据竞争。
-
同步:生产者和消费者之间还需要某种形式的同步机制来协调他们的活动。例如,当缓冲区为空时,消费者应该等待生产者生成数据;而当缓冲区满时,生产者应该等待消费者消耗数据。这种同步可以通过信号量、条件变量或其他同步原语来实现。
四、代码实现
1、事前准备
在实现环形队列和阻塞队列之前,我们首先需要创建几个基本的文件和类来管理线程、构建项目(使用Makefile)、以及任务的定义和处理。
Thread.hpp
这个文件包含了线程管理的相关类和函数。我们将创建一个简单的 Thread
类来管理线程的创建和销毁。
#pragma once
#include <pthread.h>
#include <functional>
#include <string>
#include <iostream>
#include <string>
namespace ThreadMoudle
{
template <class T>
using func_t = std::function<void(T &, const std::string &)>;
template <class T>
class Thread
{
void Excute() { _func(_data, _threadname); }
public:
Thread(func_t<T> func, T &data, const std::string &name = "none_name")
: _func(func), _data(data), _threadname(name) {}
static void *threadroutine(void *args)
{
Thread *t = static_cast<Thread *>(args);
t->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if (!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach() { pthread_detach(_tid); }
void Join()
{
if (!_stop)
{
pthread_join(_tid, nullptr);
}
}
void Stop() { _stop = true; }
~Thread() {}
std::string name() { return _threadname; }
private:
pthread_t _tid;
std::string _threadname;
T &_data;
func_t<T> _func;
bool _stop = true;
};
}
task.hpp
这个文件定义了任务的接口和基本的任务处理类。
#pragma once
#include <iostream>
#include <string>
#include <functional>
// typedef std::function<void()> Task;
using Task = std::function<void()>; // 同上
void PrintW()
{
std::cout << "Hello World ! " << std::endl;
}
void PrintL()
{
std::cout << "Hello Linux !" << std::endl;
}
void PrintC()
{
std::cout << "Hello C++ !" << std::endl;
}
Task tasks[3] = {PrintW, PrintL, PrintC};
Makefile
这个Makefile用于编译和链接我们的项目。这里简单示范一个基本的Makefile。
cp:main.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -rf cp
2、环形队列的实现
#pragma once
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <vector>
template <class T>
class RingQueue
{
void P(sem_t &sem) { sem_wait(&sem); }
void V(sem_t &sem) { sem_post(&sem); }
void Lock(pthread_mutex_t &mutex) { pthread_mutex_lock(&mutex); }
void Unlock(pthread_mutex_t &mutex) { pthread_mutex_unlock(&mutex); }
public:
RingQueue(int cap)
: _ring_queue(cap), _cap(cap), _productor_step(0), _consumer_step(0)
{
sem_init(&_room_sem, 0, _cap);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_productor_mutex, nullptr);
pthread_mutex_init(&_consumer_mutex, nullptr);
}
void Enqueue(const T &in)
{
P(_room_sem);
Lock(_productor_mutex);
_ring_queue[_productor_step++] = in; // 生产
_productor_step %= _cap;
Unlock(_productor_mutex);
V(_data_sem);
}
void Pop(T *out)
{
// 消费
P(_data_sem);
Lock(_consumer_mutex);
*out = _ring_queue[_consumer_step++];
_consumer_step %= _cap;
Unlock(_consumer_mutex);
V(_room_sem);
}
private:
std::vector<T> _ring_queue; //环形队列
int _cap; //环形队列的大小
int _productor_step; //生产者在环形队列中的位置
int _consumer_step; //消费者在环形队列中的位置
sem_t _room_sem;
//表示剩余空间的信号量。每当生产者向队列添加一个元素时,该信号量减少
sem_t _data_sem;
//表示可用数据的信号量。每当消费者从队列取出一个元素时,该信号量增加。
pthread_mutex_t _productor_mutex;
pthread_mutex_t _consumer_mutex;
//分别用于保护生产者和消费者在队列中的操作,
//确保同一时间只有一个线程可以访问_productor_step和 _consumer_step
};
这是一个使用信号量和互斥锁实现的环形队列 (RingQueue
)。
方法解释
-
P(sem_t &sem)
和V(sem_t &sem)
:P
和V
分别是 P 操作(等待)和 V 操作(发送信号),用于操作信号量。在Enqueue
和Pop
方法中,P
和V
分别对应于等待剩余空间和可用数据的信号量。 -
Lock(pthread_mutex_t &mutex)
和Unlock(pthread_mutex_t &mutex)
: 这两个函数分别用于加锁和解锁互斥锁,保护对_productor_step
和_consumer_step
的操作,确保线程安全。
入队列和出队列
void Enqueue(const T &in)
{
P(_room_sem); // 等待剩余空间
Lock(_productor_mutex); // 加锁生产者互斥锁
_ring_queue[_productor_step++] = in; // 生产,将元素放入队列
_productor_step %= _cap; // 环形队列,更新生产者位置
Unlock(_productor_mutex); // 解锁生产者互斥锁
V(_data_sem); // 发送数据信号量,表示有数据可用
}
P(_room_sem)
确保在队列有剩余空间之前,生产者线程被阻塞。一旦队列有空间,生产者线程将继续执行。Lock(_productor_mutex)
和Unlock(_productor_mutex)
用于保护_productor_step
的修改,确保在多线程环境下的安全性。V(_data_sem)
发送信号量,通知消费者有新的数据可用。
void Pop(T *out)
{
P(_data_sem); // 等待有可用数据
Lock(_consumer_mutex); // 加锁消费者互斥锁
*out = _ring_queue[_consumer_step++]; // 消费,从队列中取出元素
_consumer_step %= _cap; // 环形队列,更新消费者位置
Unlock(_consumer_mutex); // 解锁消费者互斥锁
V(_room_sem); // 发送空间信号量,表示有空间可用
}
P(_data_sem)
确保在队列有可用数据之前,消费者线程被阻塞。一旦队列有数据可用,消费者线程将继续执行。Lock(_consumer_mutex)
和Unlock(_consumer_mutex)
用于保护_consumer_step
的修改,确保在多线程环境下的安全性。V(_room_sem)
发送信号量,通知生产者有空间可用。
为什么使用两个信号量和两个互斥锁?
在使用两个信号量和两个互斥锁的环形队列实现中,每个信号量和互斥锁有其特定的作用,确保了生产者和消费者之间的正确同步和队列访问的线程安全性:
-
两个互斥锁 (
_productor_mutex
和_consumer_mutex
):- 目的: 互斥锁用于保护对共享资源(即环形队列
_ring_queue
)的访问,确保在任何时刻只有一个线程可以访问队列,从而避免数据竞争和队列操作的冲突。 - 使用场景:
_productor_mutex
用于保护生产者对队列的写操作(Enqueue
方法)。_consumer_mutex
用于保护消费者对队列的读操作(Pop
方法)。
- 效果: 生产者和消费者在访问队列时,先获取相应的互斥锁,这样可以确保生产者和消费者的操作互不干扰,从而保证了队列的线程安全性。
- 目的: 互斥锁用于保护对共享资源(即环形队列
-
两个信号量 (
_room_sem
和_data_sem
):- 目的: 信号量用于实现生产者和消费者之间的同步和协调。
- 使用场景:
_room_sem
用于控制生产者等待队列有空闲空间(Enqueue
方法中使用)。_data_sem
用于控制消费者等待队列有可用数据(Pop
方法中使用)。
- 效果:
- 当队列已满时,生产者调用
P(_room_sem)
来等待,直到有空闲空间。 - 当队列为空时,消费者调用
P(_data_sem)
来等待,直到有可用数据。 - 生产者在向队列中添加数据时,通过
V(_data_sem)
通知消费者有新的数据可用。 - 消费者在取出数据时,通过
V(_room_sem)
通知生产者有空闲空间可以继续生产。
- 当队列已满时,生产者调用
通过使用两个互斥锁和两个信号量,分别保护队列的访问和实现生产者消费者之间的同步。
为什么使用信号量就不需要生产者和消费者之间互斥
在这种基于信号量和互斥锁的环形队列实现中,生产者和消费者之间不需要显式的互斥(即不需要对生产者和消费者的操作进行互斥锁保护),这是因为它们分别使用了两个不同的信号量来进行同步控制:
- 队列访问互斥已经由互斥锁保护: 生产者和消费者在访问队列的时候,都会先获取相应的互斥锁 (
_productor_mutex
和_consumer_mutex
),这确保了在任意时刻只有一个线程可以访问队列。因此,即使生产者和消费者同时操作队列,由于互斥锁的保护,不会发生数据竞争或者队列操作的冲突。 - 互斥和信号量的配合: 生产者在
Enqueue
方法中通过P(_room_sem)
等待有剩余空间,消费者在Pop
方法中通过P(_data_sem)
等待有可用数据。这两个操作利用了信号量的特性,保证了生产者和消费者之间的同步。信号量sem_t
的作用在于控制生产者和消费者的活动,而互斥锁pthread_mutex_t
的作用在于保护共享资源(即队列本身)的访问。 - 生产者和消费者的独立性: 生产者和消费者在向队列中添加或者取出元素时,不需要相互等待或者协调。每个生产者和消费者线程根据自己的需求和信号量的状态进行操作,彼此之间是独立的。生产者和消费者之间的同步是通过信号量来实现的,而不是通过互斥来限制彼此的访问。
对于生产者和消费者来说,它们关注的资源是不同的:
生产者关注空间资源:
- 生产者在向队列中放置数据时,会检查队列是否有足够的空间(即队列是否已满)。如果队列已满,生产者会调用
P(_room_sem)
来等待,直到有空闲的空间可以继续生产。这里的_room_sem
信号量确保了生产者只有在队列有足够空间时才会进行生产操作。消费者关注数据资源:
- 消费者在从队列中取出数据时,会检查队列是否有可用的数据(即队列是否为空)。如果队列为空,消费者会调用
P(_data_sem)
来等待,直到有数据可供消费。这里的_data_sem
信号量确保了消费者只有在队列有数据可用时才会进行消费操作。因此,生产者和消费者通过不同的信号量来进行等待和唤醒操作,彼此之间不会造成竞争或冲突,也不需要额外的互斥锁来保护队列的访问。这种设计有效地分离了生产者和消费者的关注点,使它们能够独立地等待和通知,从而实现了高效的同步和互操作。
下面段代码展示了如何使用之前实现的阻塞队列 RingQueue
和线程类 Thread
来实现生产者-消费者模型。
#include "Thread.hpp"
#include "RingQueue.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include <ctime>
// #include <iostream>
using namespace ThreadMoudle;
using ringqueue_t = RingQueue<Task>;
void Consumer(ringqueue_t &rq, std::string name)
{
while (true)
{
sleep(2);
Task task;
rq.Pop(&task);
std::cout << " [" << name << "] : ";
task();
}
}
void Productor(ringqueue_t &rq, std::string name)
{
srand(time(nullptr) ^ pthread_self());
while (true)
{
// 获取任务
// 生产任务
int tmp = rand() % 3;
rq.Enqueue(tasks[tmp]);
std::cout << " [" << name << "] : " << tmp << std::endl;
}
}
void InitComm(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq, func_t<ringqueue_t> func, const std::string &who)
{
for (int i = 0; i < num; i++)
{
std::string name = who + "-" + std::to_string(i + 1);
threads->emplace_back(func, rq, name);
}
}
void InitConsumer(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
InitComm(threads, num, rq, Consumer, "consumer");
}
void InitProductor(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{
InitComm(threads, num, rq, Productor, "productor");
}
void WaitAllThread(std::vector<Thread<ringqueue_t>> &threads)
{
for (auto &thread : threads)
{
thread.Join();
}
}
void StartAll(std::vector<Thread<ringqueue_t>> &threads)
{
for (auto &thread : threads)
{
std::cout << "start: " << thread.name() << std::endl;
thread.Start();
}
}
int main()
{
ringqueue_t *rq = new ringqueue_t(10);
std::vector<Thread<ringqueue_t>> threads;
InitProductor(&threads, 2, *rq);
InitConsumer(&threads, 3, *rq);
StartAll(threads);
WaitAllThread(threads);
return 0;
}
3、阻塞队列的实现
#pragma once
#include <iostream>
#include <pthread.h>
#include <string>
#include <queue>
template <class T>
class BlockQueue
{
bool IsFull() { return _block_queue.size() == _cap; }
bool IsEmpty() { return _block_queue.empty(); }
void lock() { pthread_mutex_lock(&_mutex); }
void unlock() { pthread_mutex_unlock(&_mutex); }
public:
BlockQueue(int cap) : _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_product_cond, nullptr);
pthread_cond_init(&_consum_cond, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_product_cond);
pthread_cond_destroy(&_consum_cond);
}
void Enqueue(const T &in)
{
lock();
while (IsFull())
{
_productor_wait_num++;
pthread_cond_wait(&_product_cond, &_mutex);
_productor_wait_num--;
}
_block_queue.push(in);
if (_consumer_wait_num > 0)
pthread_cond_signal(&_consum_cond);
unlock();
}
void Pop(T *out)
{
lock();
while (IsEmpty())
{
_consumer_wait_num++;
pthread_cond_wait(&_consum_cond, &_mutex);
_consumer_wait_num--;
}
*out = _block_queue.front();
_block_queue.pop();
if (_productor_wait_num > 0)
pthread_cond_signal(&_product_cond);
unlock();
}
private:
std::queue<T> _block_queue; //队列
int _cap; //队列的最大容量
pthread_mutex_t _mutex; //互斥锁,用于保护对 _block_queue 的访问
pthread_cond_t _product_cond; //条件变量,用于通知生产者线程
pthread_cond_t _consum_cond; //条件变量,用于通知消费者线程
int _productor_wait_num; //等待生产者的数量
int _consumer_wait_num; //等待消费者的数量
};
阻塞队列 (BlockQueue
) 实现了一个线程安全的队列,用于在生产者-消费者模型中作为缓冲区。
入队列和出队列
下面详细解释阻塞队列中的 Enqueue
和 Pop
两个函数的细节,这两个函数是实现生产者-消费者模型中的核心操作。
void Enqueue(const T &in)
{
lock(); // 加锁,确保线程安全
while (IsFull())
{
_productor_wait_num++; // 增加等待生产者的计数
pthread_cond_wait(&_product_cond, &_mutex); // 队列已满,等待生产者条件变量
_productor_wait_num--; // 减少等待生产者的计数
}
_block_queue.push(in); // 将元素入队
if (_consumer_wait_num > 0)
pthread_cond_signal(&_consum_cond); // 唤醒一个等待消费者的线程
unlock(); // 解锁,释放互斥锁
}
-
加锁 (
lock()
函数):调用pthread_mutex_lock(&_mutex)
,确保在操作共享资源_block_queue
之前,线程独占互斥锁_mutex
。这样可以防止其他线程同时修改队列,确保操作的原子性和线程安全性。 -
循环检查队列是否已满:使用
IsFull()
函数判断队列是否已满。如果队列已满,则当前线程进入等待状态,同时释放互斥锁_mutex
,并等待_product_cond
条件变量的信号。 -
等待条件变量 (
pthread_cond_wait(&_product_cond, &_mutex)
)- 当一个生产者线程将元素入队后,它会调用
pthread_cond_signal(&_consum_cond)
唤醒一个等待的消费者线程。
- 当一个生产者线程将元素入队后,它会调用
-
入队操作:一旦队列有空闲位置,当前生产者线程就会将元素
in
入队_block_queue.push(in)
。 -
唤醒消费者线程:如果有消费者线程在等待消费,通过
pthread_cond_signal(&_consum_cond)
唤醒一个等待的消费者线程,以便消费者可以消费生产的数据。 -
解锁 (
unlock()
函数):最后调用pthread_mutex_unlock(&_mutex)
解锁互斥锁_mutex
,允许其他线程继续访问队列。
void Pop(T *out)
{
lock(); // 加锁,确保线程安全
while (IsEmpty())
{
_consumer_wait_num++; // 增加等待消费者的计数
pthread_cond_wait(&_consum_cond, &_mutex); // 队列为空,等待消费者条件变量
_consumer_wait_num--; // 减少等待消费者的计数
}
*out = _block_queue.front(); // 取出队首元素
_block_queue.pop(); // 出队操作
if (_productor_wait_num > 0)
pthread_cond_signal(&_product_cond); // 唤醒一个等待生产者的线程
unlock(); // 解锁,释放互斥锁
}
-
加锁 (
lock()
函数):同样地,先调用pthread_mutex_lock(&_mutex)
,确保在操作共享资源_block_queue
之前,线程独占互斥锁_mutex
。 -
循环检查队列是否为空:使用
IsEmpty()
函数判断队列是否为空。如果队列为空,则当前线程进入等待状态,同时释放互斥锁_mutex
,并等待_consum_cond
条件变量的信号。 -
等待条件变量 (
pthread_cond_wait(&_consum_cond, &_mutex)
)- 当一个消费者线程从队列取出元素后,它会调用
pthread_cond_signal(&_product_cond)
唤醒一个等待的生产者线程。
- 当一个消费者线程从队列取出元素后,它会调用
-
出队操作:一旦队列非空,当前消费者线程就从队列头部取出元素并赋值给
*out
(*out = _block_queue.front()
),然后将该元素出队_block_queue.pop()
。 -
唤醒生产者线程:如果有生产者线程在等待空闲位置,通过
pthread_cond_signal(&_product_cond)
唤醒一个等待的生产者线程,以便生产者可以继续向队列中添加元素。 -
解锁 (
unlock()
函数):最后调用pthread_mutex_unlock(&_mutex)
解锁互斥锁_mutex
,允许其他线程继续访问队列。
在多线程编程中,特别是在实现阻塞队列等同步数据结构时,使用
while
循环而不是if
语句是非常重要的。这里解释为什么在条件变量的等待和通知机制中推荐使用while
而不是if
。
条件变量的等待和唤醒
在使用条件变量(pthread_cond_t
或 std::condition_variable
)进行线程同步时,通常的模式是:
- 生产者在向队列中插入数据时,如果队列已满,则需要等待消费者从队列中取走数据后再继续插入。
- 消费者在从队列中取数据时,如果队列为空,则需要等待生产者向队列中插入数据后再继续取出。
这种等待和唤醒的模式通常会使用 while
循环来检查条件,而不是简单的 if
语句。
为什么要使用 while 而不是 if?
- 虚假唤醒问题:
- 虚假唤醒是指线程在没有收到明确信号的情况下被唤醒,这在多线程系统中是可能发生的。
- 如果使用
if
语句来检查条件,线程可能会在没有实际满足条件的情况下被唤醒,然后直接执行后续的操作。这样可能导致线程在不合适的时候执行不安全的操作,比如从空队列中取数据或者向满队列中插入数据,从而引发程序错误。
- 条件变量的等待和唤醒机制:
- 条件变量的等待需要与互斥锁结合使用。等待过程中,线程会释放互斥锁,并且在被唤醒后重新获取互斥锁。
- 当使用
if
语句时,每个线程在检查队列状态后,如果发现队列已满或空,则直接执行后续的操作。这时,如果多个生产者或消费者同时进入临界区(即同时操作队列),就会导致竞态条件的出现。例如:- 若多个生产者同时检查到队列不满,都尝试向队列中添加元素,可能会导致队列超过容量。
- 若多个消费者同时检查到队列非空,都尝试从队列中取出元素,可能会导致队列为空。
- 使用
while
循环可以确保线程在被唤醒后重新检查条件。如果条件不满足(例如队列仍然为空或者仍然为满),线程会再次等待。这种重复检查的机制可以有效地避免虚假唤醒带来的问题。 - 使用
if
语句时,线程在检查条件后如果直接执行操作,可能会错过唤醒的机会。例如:- 如果一个生产者向队列中添加了元素,此时队列之前是空的,它应该唤醒等待中的消费者线程,但如果使用
if
语句,它可能会错过发出信号的时机,导致消费者线程长时间阻塞。
- 如果一个生产者向队列中添加了元素,此时队列之前是空的,它应该唤醒等待中的消费者线程,但如果使用
- 竞态条件和程序正确性:
- 在多线程环境中,竞态条件是一个常见的问题,它指的是多个线程同时访问共享资源,且最终结果依赖于线程调度的具体时机。
- 使用
while
循环可以降低竞态条件的风险。通过重新检查条件,可以确保线程在执行关键操作之前,条件依然处于正确的状态,从而提高程序的正确性和稳定性。
因此,为了保证阻塞队列在多线程环境下的正确性和稳定性,始终使用 while
循环结构来等待条件的满足。这样可以有效地避免虚假唤醒带来的潜在问题,确保线程在适当的时候重新检查条件,并在条件满足时进行安全的操作。
使用场景
- 线程安全: 使用互斥锁
_mutex
确保在多线程环境下对队列的操作是安全的。 - 条件变量: 使用条件变量
_product_cond
和_consum_cond
实现生产者和消费者之间的同步:- 当队列已满时,生产者线程等待。
- 当队列为空时,消费者线程等待。
- 当数据入队或出队时,唤醒等待的线程。
- 等待计数:
_productor_wait_num
和_consumer_wait_num
记录等待的生产者和消费者数量,用于条件变量的合理唤醒。
使用两个条件变量和一个互斥锁是为了在阻塞队列的实现中有效地实现生产者和消费者的同步和互斥。
互斥锁
- 保护共享资源:
_mutex
是一个互斥锁,它用于保护对_block_queue
的访问。在队列中插入元素或者取出元素之前,需要先获取这个互斥锁,以确保同一时间只有一个线程可以访问_block_queue
。 - 避免数据竞争: 通过互斥锁的使用,可以避免多个线程同时对队列进行修改,从而防止数据竞争和数据不一致的情况发生。
条件变量
- 实现线程等待和通知: 在生产者和消费者模型中,条件变量
_product_cond
和_consum_cond
负责实现线程的等待和唤醒操作,从而实现生产者和消费者之间的协调。
使用场景解释:
-
生产者等待队列不满: 在
Enqueue
函数中,生产者线程在向队列中添加元素时,如果队列已满,生产者线程需要进入等待状态。这时,生产者线程会释放_mutex
并等待_product_cond
条件变量被唤醒。当消费者线程从队列中取出元素时,会发送信号给_product_cond
,唤醒一个或多个等待在该条件变量上的生产者线程。 -
消费者等待队列不空: 在
Pop
函数中,消费者线程在从队列中取出元素时,如果队列为空,消费者线程需要进入等待状态。消费者线程会释放_mutex
并等待_consum_cond
条件变量被唤醒。当生产者线程向队列中添加元素时,会发送信号给_consum_cond
,唤醒一个或多个等待在该条件变量上的消费者线程。
原因解释:
-
避免忙等待: 使用条件变量的主要目的是避免忙等待。如果仅使用互斥锁来保护队列,并使用
while
循环检查队列的状态,可以确保线程在条件不满足时进入等待状态,从而避免了忙等待,节省了系统资源。 -
线程唤醒: 条件变量允许线程在满足特定条件时被唤醒,这对于生产者消费者模型中的任务协调至关重要。每个条件变量与特定的条件相关联,当条件不满足时,线程可以安全地释放互斥锁并等待条件变量的通知。
下面段代码展示了如何使用实现的阻塞队列 BlockQueue
和线程类 Thread
来实现生产者-消费者模型。
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <vector>
#include <ctime>
#include <unistd.h>
using namespace ThreadMoudle;
using blockqueue_t = BlockQueue<Task>;
void StartComm(std::vector<Thread<blockqueue_t>> &threads, int num, blockqueue_t &bq, func_t<blockqueue_t> func, const std::string &who)
{
for (int i = 0; i < num; i++)
{
std::string name = who + "-" + std::to_string(i + 1);
threads.emplace_back(func, bq, name);
}
}
void Productor(blockqueue_t &bq, const std::string &name)
{
srand(time(nullptr) ^ pthread_self());
while (true)
{
// sleep(5);
int tmp = rand() % 3;
Task task = tasks[tmp];
bq.Enqueue(task);
std::cout << " [" << name << "] : " << tmp << std::endl;
}
}
void Consumer(blockqueue_t &bq, const std::string &name)
{
while (true)
{
sleep(1);
Task task;
bq.Pop(&task);
std::cout << " [" << name << "] : ";
task();
}
std::cout << " [" << name << "] : " << std::endl;
}
void InitProductor(std::vector<Thread<blockqueue_t>> &threads, int num, blockqueue_t &bq)
{
StartComm(threads, num, bq, Productor, "producer");
}
void InitConsumer(std::vector<Thread<blockqueue_t>> &threads, int num, blockqueue_t &bq)
{
StartComm(threads, num, bq, Consumer, "consumer");
}
void WaitAllThread(std::vector<Thread<blockqueue_t>> &threads)
{
for (auto &thread : threads)
{
thread.Join();
}
}
void StartAll(std::vector<Thread<blockqueue_t>> &threads)
{
for (auto &thread : threads)
{
std::cout << "start: " << thread.name() << std::endl;
thread.Start();
}
}
int main()
{
blockqueue_t bq(10);
std::vector<Thread<blockqueue_t>> threads;
InitProductor(threads, 10, bq);
InitConsumer(threads, 3, bq);
StartAll(threads);
WaitAllThread(threads);
}
4、两种实现方式的区别
在两种实现方式中,主要区别体现在如何使用锁(互斥锁)和信号量(或条件变量)来实现生产者消费者模型中的同步和互斥控制。以下是两种方式在入队列(Enqueue
)和出队列(Pop
)操作中锁和信号量使用的顺序的比较:
使用信号量和互斥锁实现(RingQueue)
-
入队列操作:
- 首先,生产者需要获取
_room_sem
(空闲空间信号量),确保队列中还有空间可以放置数据。 - 接着,生产者获取
_productor_mutex
(生产者互斥锁),保证在将数据放入队列的过程中,其他生产者无法同时修改_productor_step
或者读取_ring_queue
。 - 生产者完成数据的放入后,释放
_productor_mutex
。 - 最后,生产者释放
_data_sem
(数据信号量),通知消费者有新的数据可供消费。
- 首先,生产者需要获取
-
出队列操作:
- 首先,消费者需要获取
_data_sem
(数据信号量),确保队列中有数据可以消费。 - 接着,消费者获取
_consumer_mutex
(消费者互斥锁),保证在从队列中取出数据的过程中,其他消费者无法同时修改_consumer_step
或者修改队列。 - 消费者完成数据的取出后,释放
_consumer_mutex
。 - 最后,消费者释放
_room_sem
(空闲空间信号量),通知生产者有新的空间可供生产。
- 首先,消费者需要获取
使用条件变量和互斥锁实现(BlockQueue)
在这种实现中,生产者消费者之间的互斥和同步控制是通过一个互斥锁 _mutex
和两个条件变量 _product_cond
和 _consum_cond
来完成的。
-
入队列操作:
- 生产者先获取
_mutex
,进入临界区。 - 如果队列已满(
IsFull()
返回 true),则生产者调用pthread_cond_wait(&_product_cond, &_mutex)
进入等待状态,等待消费者消费数据后发出信号。 - 当有空闲位置时,生产者将数据放入队列。
- 如果有消费者在等待消费数据,则生产者通过
pthread_cond_signal(&_consum_cond)
通知消费者可以消费。 - 最后,生产者释放
_mutex
,退出临界区。
- 生产者先获取
-
出队列操作:
- 消费者先获取
_mutex
,进入临界区。 - 如果队列为空(
IsEmpty()
返回 true),则消费者调用pthread_cond_wait(&_consum_cond, &_mutex)
进入等待状态,等待生产者放入数据后发出信号。 - 当有数据可供消费时,消费者从队列中取出数据。
- 如果有生产者在等待放入数据,则消费者通过
pthread_cond_signal(&_product_cond)
通知生产者可以继续生产。 - 最后,消费者释放
_mutex
,退出临界区。
- 消费者先获取
区别总结
-
互斥性控制:
- 使用信号量和互斥锁实现时,互斥性控制是通过互斥锁实现的,即生产者和消费者分别获取自己的互斥锁来保证对队列数据结构的互斥访问。
- 使用条件变量和互斥锁实现时,互斥性控制也是通过互斥锁实现的,但是条件的判断和等待/通知是通过条件变量来实现的,条件变量用于等待特定的条件满足后才能继续执行。
-
信号量的作用:
- 使用信号量和互斥锁时,信号量用于控制生产者和消费者的同步操作,确保在正确的时机进行生产和消费。
- 使用条件变量和互斥锁时,条件变量用于实现线程的阻塞和唤醒,以等待特定条件的发生或者通知其他线程条件已经满足。
-
适用场景:
- 使用信号量和互斥锁实现适合于需要精确控制资源数量和互斥访问的场景,例如固定大小的缓冲区。
- 使用条件变量和互斥锁实现适合于需要等待某些条件满足才能进行操作的场景,例如动态变化的队列大小或者需要等待其他线程的信号通知的场景。
完整代码,点击此处。