本篇文章我们使用C++探讨一下生产者与消费者问题.
1. 多线程的引入
我们学习了操作系统, 知道了进程和线程的概念, 但是如果不进行代码实战的话, 会很难理解它们. 特别是编程的初学者(比如我), 在了解了进程和线程后通常会感到疑惑: 多线程怎么用? 为啥我平时写代码没有使用到多线程呢?
首先我们先看多进程, 这个比较好理解, 比如我们新建一个C++项目, 这个项目在运行的时候就是一个进程, 它和计算机上面运行的其它程序一起工作, 这就是多进程. 至于为什么我们在写代码的时候, 不需要考虑多进程之间的资源分配情况, 我们可以认为是操作系统帮我们做了这些事情, 所以我们写代码时可以只考虑我们自己写的代码逻辑.
我们再看多线程, 先回顾一下C++的最基本的一段代码:
#include <iostream>
int main()
{
std::cout << "Hello World!\n";
}
代码1: 最基本的C++代码
这段代码是多线程吗, 很明显它不是的, 它是一段单线程的代码. 写代码时我们如果没有开启新的线程, 那这段代码就只会是单线程的代码. 所以如果我们想要进行多线程编程, 就需要用代码开启新的线程. 怎么用代码开启新的线程呢? 如下所示.
#include <iostream>
#include <thread>
void function_name() {
// 线程函数的代码
std::cout << "Hello World!\n";
}
int main() {
// 创建新线程并启动
std::thread t(function_name);
// 等待新线程结束
t.join();
return 0;
}
代码2: 开启一个线程
如上所示, 我们开启了一个新的线程, 将"Hello World!"的输出放在了这个线程内执行, 但这也只是开启了一个线程, 还不算是严格意义上的多线程. 至少开启两个线程才算得上是多线程.
2. 生产者与消费者问题引入
现在我们知道了如何开启新的线程, 就可以介绍一下生产者与消费者问题了. 问题很简单: 有一个容纳产品的池子, 生产者不停地生产产品放入池子, 消费者不停地从池子拿走产品进行消费. 我们先考虑最简单的情况: 先不考虑消费者, 只考虑有两个生产者, 而且我们目前只关心产品的数量, 不考虑池子的容量. 用itemCount来表示产品的数量, 代码该如何写呢?
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int itemCount = 0;
/// <summary>
/// 生产者函数1
/// </summary>
void producer_fun1() {
while (true)
{
printf("生产者1生产产品, 产品数量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
}
}
/// <summary>
/// 生产者函数2
/// </summary>
void producer_fun2() {
while (true)
{
printf("生产者2生产产品, 产品数量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
}
}
int main()
{
std::thread producer_thread1(producer_fun1); //生产者线程1
std::thread producer_thread2(producer_fun2); //生产者线程2
producer_thread1.join(); //等待线程结束
producer_thread1.join();
return 0;
}
代码3: 两个生产者(没有对线程操作进行任何限制)
我们看上面的代码, 似乎没有什么问题, 开启了两个生产者的线程, 让它们不停地生产产品, 而且我们还让它们在生产完产品之后"休息"一秒钟, 很人性化, 但这段代码的运行结果是我们想要的吗? 运行一下看看, 结果如下:
运行结果明显不尽人意, 我们希望的是每个生产者生产产品之后, 产品数量能够有序递增, 但现在情况明显不是这样, 甚至读者把这段代码自己运行一下, 运行结果也很可能与我的运行结果不一样, 怎么回事呢? 读者可以把这段代码的两个打印输出的地方打个断点, 会发现代码不会老老实实地执行完一个线程函数再去执行另外一个线程函数, 很诡异的是, 它在一个线程函数还没有执行完的时候, 转而去执行另外一个线程函数了, 这就是多线程(进程)编程有意思的地方, 多线程(进程)的存在, 或者说程序的并发执行, 使得程序有了间断性, 失去封闭性, 失去可再现性.
好了, 我们还是先别考虑更复杂的情况了, 仅仅是两个生产者的线程, 就出现了这么大的bug, 还是先想想怎么解决这个bug才能继续.
3. 互斥问题引入
问题的根源在于, 代码的执行顺序被打乱了, 导致了itemCount没有依次递增, 你可能会有疑问: 不对呀, 尽管代码的执行顺序被打乱了, 但是我们也只是打印输出了itemCount的值而已, 就算打乱了代码每一行的执行顺序, 你总得把这一行代码执行完再去执行别得行的代码吧, 现在却出现了乱序, 难道说printf这一行代码还没执行完就跑去执行其它行的代码了吗? 说得没错. 如果仅仅是打印输出itemCount的值, 那是不会出现问题的, 问题就出现在这个++itemCount. 它看上去是一句代码, 其实CPU在执行的时候, 是分成多个步骤执行的, 也即++itemCount并不是原子操作, 原子操作就是不可细分的操作, 只有原子操作的执行在多线程(进程)中不会被打乱, 这个读者可以另行了解. 而打乱导致的结果就是两个生产者线程争相对itemCount进行自增操作, 从而出现了乱序的情况. 很可惜, 既然++itemCount不是原子操作, 我们就只能想办法, 让它像原子操作那样不被打乱. 要想这个操作不被打乱 就需要对线程进行一些限制, 不能让它们在两个线程直接随意切换执行, 至少要完成地把printf语句执行完才能切换线程.
这是第一个难点, 怎么才能保证完整执行完++itemCount这个操作呢? 我们可以想象这样一个场景: 把itemCount锁在一个屋子里, 屋子的锁只有一把钥匙, 最初锁与钥匙都在门外, 每当有人要进入屋子对itemCount进行操作, 就用这把钥匙开门, 然后锁和钥匙自己保管带入屋子, 然后在屋子里将门反锁, 对itemCount进行操作之后, 就可以出门了, 然后归还锁和钥匙. 这样就可以保证自己在操作itemCount时没有其它人进入屋子, 也即++itemCount这个操作不会被打断. 这就是一个互斥操作.
怎么用C++去创建这个锁和钥匙呢? 我们可以用mutex来创建. 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int itemCount = 0;
std::mutex mtx; //互斥量, 即锁和钥匙
/// <summary>
/// 生产者函数1
/// </summary>
void producer_fun1() {
while (true)
{
mtx.lock(); //进入屋子, 上锁
printf("生产者1生产产品, 产品数量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mtx.unlock(); //从屋子出去, 解锁, 归还锁和钥匙
}
}
/// <summary>
/// 生产者函数2
/// </summary>
void producer_fun2() {
while (true)
{
mtx.lock(); //进入屋子, 上锁
printf("生产者2生产产品, 产品数量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1)); //当前线程阻塞1秒
mtx.unlock(); //从屋子出去, 解锁, 归还锁和钥匙
}
}
int main()
{
std::thread producer_thread1(producer_fun1); //生产者线程1
std::thread producer_thread2(producer_fun2); //生产者线程2
producer_thread1.join(); //等待线程结束
producer_thread2.join();
return 0;
}
代码4: 两个生产者(加了锁, 以解决互斥问题)
运行一下看看:
可以看出, 我们的产品数量是按序递增的, 因此互斥问题得到了解决. 你可能会问, 为啥生产者2"进入屋子"这么多次, 而生产者1"进入屋子"的次数这么少? 这说明生产者1迟迟得不到对itemCount的操作权限, 产生了饥饿现象, 这个不是我们目前研究的重点, 后面再讨论. 我们最主要的目的--解决互斥问题--已经实现了.
再看一下一个生产者和一个消费者的情况:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int itemCount = 0;
std::mutex mtx; //互斥量, 即锁和钥匙
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun() {
while (true)
{
mtx.lock(); //进屋, 上锁
printf("生产者生产产品: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock(); //出去, 解锁
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun() {
while (true)
{
mtx.lock(); //进屋, 上锁
if (itemCount > 0)
{
printf("消费者消费产品: %d\n", --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
mtx.unlock(); //出去, 解锁
}
}
int main()
{
std::thread producer_thread(producer_fun); // 创建一个新线程,传入函数和参数
std::thread consumer_thread(consumer_fun);
producer_thread.join(); //等待线程结束
producer_thread.join();
return 0;
}
代码5: 一个生产者, 一个消费者(加了锁, 以解决互斥问题)
这里我们限制itemCount大于0时才能消费产品, 否则不合逻辑, 运行结果如下:
可以看出, 加锁的方法同样可以解决一个生产者和一个消费者的互斥问题.
4. 同步问题引入
现在我们看看一个生产者, 两个消费者的情况, 我们规定产品池最多可以容纳10个产品:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int itemCount = 0;
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //互斥量, 即锁和钥匙
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun() {
while (true)
{
mtx.lock();
if (itemCount < MAX_VALUE)
{
printf("生产者 生产产品, 产品数量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
mtx.unlock();
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun(int index) {
while (true)
{
mtx.lock();
if (itemCount > 0)
{
printf("消费者%d消费产品, 产品数量: %d\n", index, --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
mtx.unlock();
}
}
int main()
{
std::thread producer_thread(producer_fun); //生产者线程
std::thread consumer_thread1(consumer_fun, 1); //消费者线程1
std::thread consumer_thread2(consumer_fun, 2); //消费者线程2
producer_thread.join(); //等待线程结束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代码6: 一个生产者, 两个消费者(只加了锁, 以解决互斥问题)
这里我们仍使用mutex来限制每个线程对itemCount的访问, 运行一下看看:
可以看出, 我们的产品数量仍然可以按照生产者和消费者的操作进行数量的增加或者减少, 但是又有新的问题出现了: 消费者中只有消费者1在消费产品, 消费者2一个产品也没有消费. 即消费者2产生了饥饿现象. (注意, 这里的饥饿现象不一定百分之百发生, 具体要看操作系统的资源调度, 如下图是同一段代码的运行结果)
这里我们仍然暂不考虑饥饿问题, 根据输出结果来看, 我们似乎很好地解决了生产者和消费者问题, 虽然代码6只展示了一个生产者和两个消费者的问题, 但多个生产者和多个消费者只需要根据代码6的格式多开启几个线程即可, 我们对产品生产和消费操作进行了很好的保护, 使得它们不会被打断. 那生产者和消费者问题我们是不是就已经解决了呢? 其实不然. 代码6还存在优化的空间.
哪个地方需要优化? 以生产者函数为例, 在生产者进入屋子以后, 它会判断当前产品的数量, 如果产品池已经满了, 不能容纳更多的产品, 则生产者进屋子以后就不再生产产品了; 同理, 消费者进入屋子以后, 如果屋子里面没有产品, 则消费者也不能消费. 问题就出现在这里. 生产者和消费者都是进了屋子以后才知道自己能否生产或者消费, 这可能导致生产者和消费者进了屋子以后什么也没干, 这就导致了计算机性能的浪费. 此外, 事实上, 对itemCount的操作的代码我们称之为临界区, 临界区的代码应该尽可能少. 怎么解决? 我们还是先回到一个生产者和一个消费者的情况, 写出代码:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int itemCount = 0;
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //互斥量, 即锁和钥匙
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun() {
while (true)
{
if (itemCount < MAX_VALUE)
{
mtx.lock();
printf("生产者 生产产品, 产品数量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun() {
while (true)
{
if (itemCount > 0)
{
mtx.lock();
printf("消费者 消费产品, 产品数量: %d\n", --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
int main()
{
std::thread producer_thread(producer_fun); //生产者线程
std::thread consumer_thread(consumer_fun); //消费者线程
producer_thread.join(); //等待线程结束
consumer_thread.join();
return 0;
}
代码7: 一个生产者, 一个消费者(加锁之前先判断产品数量)
运行结果如下:
看上去没什么问题, 因为只有一个生产者和一个消费者. 那一个生产者两个消费者的情况呢? 先写一下代码:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int itemCount = 0;
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //互斥量, 即锁和钥匙
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun() {
while (true)
{
if (itemCount < MAX_VALUE)
{
mtx.lock();
printf("生产者 生产产品, 产品数量: %d\n", ++itemCount);
//std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun(int index) {
while (true)
{
if (itemCount > 0)
{ //花括号1
mtx.lock();
printf("消费者%d消费产品, 产品数量: %d\n", index, --itemCount);
//std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
int main()
{
std::thread producer_thread(producer_fun); //生产者线程
std::thread consumer_thread1(consumer_fun, 1); //消费者线程1
std::thread consumer_thread2(consumer_fun, 2); //消费者线程2
producer_thread.join(); //等待线程结束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代码8: 一个生产者, 两个消费者(加锁之前先判断数量)
这里为了运行得快一点, 我们取消了阻塞线程的操作, 然后看一下部分的运行结果:
结果出现了问题: 消费者2在消费产品时, 产品数量由0变为了-1, 这显然不符合逻辑. 为什么会出现这种情况? 思考这样一种情况: 在代码8里面, 若此时itemCount为1, 消费者线程2执行到了花括号1的位置, 然后发生调度, 转而跳转到了消费者1的线程, 它也执行到了花括号1的位置, 然后消费者1顺利地进入屋子消费产品, 此时itemCount为0; 然后再次发生了调度, 转而去执行消费者2, 它继续从花括号1的位置继续执行, 它也顺利进入屋子消费产品, 则itemCount继续自减1, 变为了-1. 问题就出现在这里.
怎么办, 要不我们退回到代码6, 然后假装无事发生? 不行, 我们探讨过, 代码6有优化的空间. 因此我们需要解决当前的问题. 问题出在哪? 问题在于对itemCount的判断并没有受到限制, itemCount > 0和itemCount < MAX_VALUE是进入屋子的条件, 但消费者1和消费者2在进屋子之前是没有受到限制的, 当满足进屋条件时, 它俩会争相进入屋子, 一点素质也没有, 结果就出现了问题. 我们不希望它俩争抢, 我们要对同一类线程进行管理, 要让它们"排队", 即搞一个线程管理的队列. 只有在生产者生产完产品之后, 才会通知这个队列, 让这个队列中的一个线程进入屋子. 在C++中, 可以用condition_variable来管理这个线程队列. 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int itemCount = 0;
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //用于访问itemCount的互斥量, 即锁和钥匙
std::mutex consumer_mtx; //用于管理消费者线程队列的互斥量, 即锁和钥匙
std::condition_variable consumer_cv; //条件变量, 可以视为消费者线程管理队列
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun() {
while (true)
{
if (itemCount < MAX_VALUE)
{
mtx.lock();
printf("生产者 生产产品, 产品数量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
consumer_cv.notify_one(); //唤醒一个消费者线程
}
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun(int index) {
while (true)
{
if (itemCount <= 0)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck); //如果产品数量<=0, 则阻塞当前线程, 等待生产者发出唤醒的信号
}
if (itemCount > 0)
{ //花括号1
mtx.lock();
printf("消费者%d消费产品, 产品数量: %d\n", index, --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
int main()
{
std::thread producer_thread(producer_fun); //生产者线程
std::thread consumer_thread1(consumer_fun, 1); //消费者线程1
std::thread consumer_thread2(consumer_fun, 2); //消费者线程2
producer_thread.join(); //等待线程结束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代码9: 一个生产者, 两个消费者(引入条件变量)
如上所示, 先看消费者的函数, 在消费者进入屋子之前, 先判断产品数量, 小于等于0则将消费者线程阻塞. 等待生产者生产产品之后发出信号, 才能唤醒一个消费者线程继续执行. 看上去似乎没啥问题, 跑一下看看:
还是有问题, 怎么回事呢? 思考这样一种情况: 还是看花括号1的位置, 最开始的时候, 消费者线程一个也没有被阻塞, 然后生产者生产了一个产品, 然后发生调度, 线程消费者1开始执行, 由于itemCount == 1, 因此消费者1不会被阻塞, 执行到了花括号1的位置; 此时再次发生调度, 转而执行消费者2, 由于itemCount的值仍为1, 因此消费者2也不会被阻塞, 它也执行到了花括号1的位置. 此时继续执行消费者1, 它进屋了, itmeCount自减1值为0, 然后出来了; 再次调度执行消费者2, 它也可以进屋, itmeCount继续自减1值为-1. 这就是问题所在.
解决办法也很容易想到, 问题在于阻塞消费者线程的时候, 我们是有条件的阻塞, 这样看来不行, 这个阻塞应该是无条件的, 也即只要消费者想进屋子, 就必须先阻塞自己, 也即先排队, 等生产者发信号过来, 你才能考虑进屋子的事情. 代码修改如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int itemCount = 0;
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //用于访问itemCount的互斥量, 即锁和钥匙
std::mutex consumer_mtx; //用于管理消费者线程队列的互斥量, 即锁和钥匙
std::condition_variable consumer_cv; //条件变量, 可以视为消费者线程管理队列
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun() {
while (true)
{
if (itemCount < MAX_VALUE)
{
mtx.lock();
printf("生产者 生产产品, 产品数量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
consumer_cv.notify_one(); //唤醒一个消费者线程
}
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck); //阻塞当前线程, 等待生产者发出唤醒的信号
if (itemCount > 0)
{ //花括号1
mtx.lock();
printf("消费者%d消费产品, 产品数量: %d\n", index, --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
}
int main()
{
std::thread producer_thread(producer_fun); //生产者线程
std::thread consumer_thread1(consumer_fun, 1); //消费者线程1
std::thread consumer_thread2(consumer_fun, 2); //消费者线程2
producer_thread.join(); //等待线程结束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代码10: 一个生产者, 两个消费者(消费者直接阻塞)
运行结果如下:
完美运行, 没有问题. 代码10还可以怎么优化? 我们可以将itemCount > 0这个条件作为唤醒消费者线程的条件, 这样就不用在唤醒消费者线程之后再进程一次判断了, 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int itemCount = 0;
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //用于访问itemCount的互斥量, 即锁和钥匙
std::mutex consumer_mtx; //用于管理消费者线程队列的互斥量, 即锁和钥匙
std::condition_variable consumer_cv; //条件变量, 可以视为消费者线程管理队列
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun() {
while (true)
{
if (itemCount < MAX_VALUE)
{
mtx.lock();
printf("生产者 生产产品, 产品数量: %d\n", ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
consumer_cv.notify_one(); //唤醒一个消费者线程
}
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck, [] {return itemCount > 0; }); //阻塞当前线程, 等待生产者发出唤醒的信号
mtx.lock();
printf("消费者%d消费产品, 产品数量: %d\n", index, --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
}
}
int main()
{
std::thread producer_thread(producer_fun); //生产者线程
std::thread consumer_thread1(consumer_fun, 1); //消费者线程1
std::thread consumer_thread2(consumer_fun, 2); //消费者线程2
producer_thread.join(); //等待线程结束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代码11: 一个生产者, 两个消费者(消费者先阻塞, itemCount > 0时继续执行)
这段代码也时没有问题的, 读者可以自行尝试. 需要注意代码9与代码11的wait语句的区别, 看上似乎差不多, 但是其实是不一样的, 代码9可能不会阻塞消费者线程, 但代码11会先将消费者线程阻塞掉, itemCount > 0时继续执行.
5. 生产者和消费者问题
至此, 解决生产者和消费者问题, 想必应该是水到渠成的事了无非就是多个生产者和多个消费者, 这里我们假设有两个生产者和两个消费者. 我们已经写出了一个生产者和两个消费者的代码, 两个生产者和两个消费者的代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int itemCount = 0;
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
std::mutex mtx; //用于访问itemCount的互斥量, 即锁和钥匙
std::mutex consumer_mtx; //用于管理消费者线程队列的互斥量, 即锁和钥匙
std::mutex producer_mtx; //用于管理生产者线程队列的互斥量, 即锁和钥匙
std::condition_variable consumer_cv; //条件变量, 可以视为消费者线程管理队列
std::condition_variable producer_cv; //条件变量, 可以视为生产者线程管理队列
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(producer_mtx);
producer_cv.wait(lck, [] {return itemCount < MAX_VALUE; }); //阻塞当前线程, 等待消费者发出唤醒的信号
mtx.lock();
printf("生产者%d生产产品, 产品数量: %d\n", index, ++itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
consumer_cv.notify_one(); //唤醒一个消费者线程
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck, [] {return itemCount > 0; }); //阻塞当前线程, 等待生产者发出唤醒的信号
mtx.lock();
printf("消费者%d消费产品, 产品数量: %d\n", index, --itemCount);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
producer_cv.notify_one(); //唤醒一个生产者线程
}
}
int main()
{
std::thread producer_thread1(producer_fun, 1); //生产者线程1
std::thread producer_thread2(producer_fun, 2); //生产者线程2
std::thread consumer_thread1(consumer_fun, 1); //消费者线程1
std::thread consumer_thread2(consumer_fun, 2); //消费者线程2
producer_thread1.join(); //等待线程结束
producer_thread2.join(); //等待线程结束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代码12: 多个生产者, 多个消费者
这段代码也可以正常运行, 读者可以自行尝试. 至此, 我们已经解决了多个生产者和多个消费者的同步与互斥问题. 如果你觉得这跟你在书上看到的P(), V()操作不太一样的话, 别急, 我们可以对代码稍作修改, 使它变成我们熟悉的样子.
首先我们需要用别的值来代替itemCount, 可以这样思考: 消费者消耗产品, 生产者消耗产品池中的产品位置. 用empty记录空位置的个数, 则empty初值为10; 用full记录产品个数, 则full初值为0. 代码改写如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int full = 0;
/// <summary>
/// 空位置的数量, 初始值为MAX_VALUE
/// </summary>
int empty = MAX_VALUE;
std::mutex mtx; //用于访问itemCount的互斥量, 即锁和钥匙
std::mutex consumer_mtx; //用于管理消费者线程队列的互斥量, 即锁和钥匙
std::mutex producer_mtx; //用于管理生产者线程队列的互斥量, 即锁和钥匙
std::condition_variable consumer_cv; //条件变量, 可以视为消费者线程管理队列
std::condition_variable producer_cv; //条件变量, 可以视为生产者线程管理队列
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(producer_mtx);
producer_cv.wait(lck, [] {return empty > 0; }); //阻塞当前线程, 等待消费者发出唤醒的信号, 只要空位置个数 > 0, 就唤醒一个生产者线程
mtx.lock();
--empty;
++full; //注意这里要同时改变full的值, 否则生产者生产完10个产品之后, 消费者的唤醒条件full > 0始终不满足, 程序就无法继续执行了
printf("生产者%d生产产品, 产品数量: %d\n", index, MAX_VALUE - empty);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
consumer_cv.notify_one(); //唤醒一个消费者线程
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck, [] {return full > 0; }); //阻塞当前线程, 等待生产者发出唤醒的信号, 只要产品个数 > 0, 就唤醒一个消费者线程
mtx.lock();
--full;
++empty; //同理, 这里也要同时改变empty的值
printf("消费者%d消费产品, 产品数量: %d\n", index, full);
std::this_thread::sleep_for(std::chrono::seconds(1));
mtx.unlock();
producer_cv.notify_one(); //唤醒一个生产者线程
}
}
int main()
{
std::thread producer_thread1(producer_fun, 1); //生产者线程1
std::thread producer_thread2(producer_fun, 2); //生产者线程2
std::thread consumer_thread1(consumer_fun, 1); //消费者线程1
std::thread consumer_thread2(consumer_fun, 2); //消费者线程2
producer_thread1.join(); //等待线程结束
producer_thread2.join(); //等待线程结束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代码13: 多个生产者, 多个消费者(empty记录空位置个数, full记录产品个数)
如上所示, 产品是消费者消耗的资源, 空位置是生产者消耗的资源. 生产者和消费者的代码执行之前先将自己阻塞, 等条件满足才唤醒一个对应的进程: empty > 0时唤醒一个生产者, full > 0时唤醒一个消费者. 我们再思考一下, 能不能把mtx.lock();这句代码也统一写成wait的形式呢? 答案是可以. 怎么改写呢? 我们可以把锁与钥匙也当作一种资源, 不过这个资源生产者和消费者都可以消耗, 而且出屋子以后就要归还这个资源. 这个资源的初始值是多少? 显然是1, 因为只有一把锁与钥匙. 代码改写如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int full = 0;
/// <summary>
/// 空位置的数量, 初始值为MAX_VALUE
/// </summary>
int empty = MAX_VALUE;
/// <summary>
/// 互斥信号量, 初值为1
/// </summary>
int mutex = 1;
std::mutex mtx; //用于访问itemCount的互斥量, 即锁和钥匙
std::mutex consumer_mtx; //用于管理消费者线程队列的互斥量, 即锁和钥匙
std::mutex producer_mtx; //用于管理生产者线程队列的互斥量, 即锁和钥匙
std::condition_variable consumer_cv; //条件变量, 可以视为消费者线程管理队列
std::condition_variable producer_cv; //条件变量, 可以视为生产者线程管理队列
std::condition_variable producer_and_consumer_cv; //条件变量, 可以视为生产者和消费者线程管理队列
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(producer_mtx);
producer_cv.wait(lck, [] {return empty > 0; }); //阻塞当前线程, 等待消费者发出唤醒的信号, 只要空位置个数 > 0, 就唤醒一个生产者线程
std::unique_lock<std::mutex> lck1(mtx);
producer_and_consumer_cv.wait(lck1, [] {return mutex > 0; }); //阻塞当前线程, 等待其它线程发出唤醒的信号, 只要锁与钥匙个数 > 0, 就唤醒该线程
--mutex;
--empty;
++full; //注意这里要同时改变full的值, 否则生产者生产完10个产品之后, 消费者的唤醒条件full > 0始终不满足, 程序就无法继续执行了
printf("生产者%d生产产品, 产品数量: %d\n", index, MAX_VALUE - empty);
++mutex;
std::this_thread::sleep_for(std::chrono::seconds(1));
producer_and_consumer_cv.notify_all(); //归还锁和钥匙, 唤醒所有的生产者和消费者线程
consumer_cv.notify_one(); //唤醒一个消费者线程
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun(int index) {
while (true)
{
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck, [] {return full > 0; }); //阻塞当前线程, 等待生产者发出唤醒的信号, 只要产品个数 > 0, 就唤醒一个消费者线程
std::unique_lock<std::mutex> lck1(mtx);
producer_and_consumer_cv.wait(lck1, [] {return mutex > 0; }); //阻塞当前线程, 等待其它线程发出唤醒的信号, 只要锁与钥匙个数 > 0, 就唤醒该线程
--mutex;
--full;
++empty; //同理, 这里也要同时改变empty的值
printf("消费者%d消费产品, 产品数量: %d\n", index, full);
++mutex;
std::this_thread::sleep_for(std::chrono::seconds(1));
producer_and_consumer_cv.notify_all(); //归还锁和钥匙, 唤醒所有的生产者和消费者线程
producer_cv.notify_one(); //唤醒一个生产者线程
}
}
int main()
{
std::thread producer_thread1(producer_fun, 1); //生产者线程1
std::thread producer_thread2(producer_fun, 2); //生产者线程2
std::thread consumer_thread1(consumer_fun, 1); //消费者线程1
std::thread consumer_thread2(consumer_fun, 2); //消费者线程2
producer_thread1.join(); //等待线程结束
producer_thread2.join(); //等待线程结束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代码14: 多个生产者, 多个消费者(mutex记录锁与钥匙的数量)
至此, 我们对empty, full, mutex的操作完成了形式上的统一. 接下来就可以提取我们需要的P(), V()操作了. 代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int full = 0;
/// <summary>
/// 空位置的数量, 初始值为MAX_VALUE
/// </summary>
int empty = MAX_VALUE;
/// <summary>
/// 互斥信号量, 初值为1
/// </summary>
int mutex = 1;
std::mutex mtx; //用于访问itemCount的互斥量, 即锁和钥匙
std::mutex consumer_mtx; //用于管理消费者线程队列的互斥量, 即锁和钥匙
std::mutex producer_mtx; //用于管理生产者线程队列的互斥量, 即锁和钥匙
std::condition_variable consumer_cv; //条件变量, 可以视为消费者线程管理队列
std::condition_variable producer_cv; //条件变量, 可以视为生产者线程管理队列
std::condition_variable producer_and_consumer_cv; //条件变量, 可以视为生产者和消费者线程管理队列
/// <summary>
/// P操作
/// </summary>
void P(std::mutex& mtx, std::condition_variable& cv, int& num) {
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&] {return num > 0; }); //阻塞当前线程, 等待其它线程发出唤醒的信号, 只要num > 0, 就唤醒一个线程
}
/// <summary>
/// V操作
/// </summary>
void V(std::condition_variable& cv) {
cv.notify_all();
}
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun(int index) {
while (true)
{
P(producer_mtx, producer_cv, empty);
P(mtx, producer_and_consumer_cv, mutex);
--mutex;
--empty;
++full; //注意这里要同时改变full的值, 否则生产者生产完10个产品之后, 消费者的唤醒条件full > 0始终不满足, 程序就无法继续执行了
printf("生产者%d生产产品, 产品数量: %d\n", index, MAX_VALUE - empty);
++mutex;
//std::this_thread::sleep_for(std::chrono::seconds(1));
V(producer_and_consumer_cv);
V(consumer_cv);
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun(int index) {
while (true)
{
P(consumer_mtx, consumer_cv, full);
P(mtx, producer_and_consumer_cv, mutex);
--mutex;
--full;
++empty; //同理, 这里也要同时改变empty的值
printf("消费者%d消费产品, 产品数量: %d\n", index, full);
++mutex;
//std::this_thread::sleep_for(std::chrono::seconds(1));
V(producer_and_consumer_cv);
V(producer_cv);
}
}
int main()
{
std::thread producer_thread1(producer_fun, 1); //生产者线程1
std::thread producer_thread2(producer_fun, 2); //生产者线程2
std::thread consumer_thread1(consumer_fun, 1); //消费者线程1
std::thread consumer_thread2(consumer_fun, 2); //消费者线程2
producer_thread1.join(); //等待线程结束
producer_thread2.join(); //等待线程结束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代码15: 多个生产者, 多个消费者(提取出P, V操作, 运行结果有误)
如上所示, 我们提取出了P, V操作, 运行会发现运行结果是错误的, 又回到了最初的乱序状态, 怎么回事呢? 我们打个断点就会发现, 在提取了P, V操作之后, 线程的执行顺序又被打乱了, 原来如此, 因为我们的P, V操作不是原子操作, 它是会被打乱了, 有可能P里面的代码还没运行就转而执行别的代码了. 你可能会问, 那为什么资料上面可以写P, V操作呢? 那是因为书上面写的是伪代码, 默认了P, 和V是原子操作. 那我们现在怎么解决目前的P, V操作会被打断的问题呢? 再加一个锁? 加了锁之后再提取P和V, 然后又会被打断执行, 然后再加一个锁......无限套娃肯定是不行的, 所以我们这里不追求形式上的一致了, 代码14就可以作为生产者和消费者问题的最终代码. 至于我们如何在不使用mutex的情况下实现互斥访问, 这个以后再分析. 至此, 我们已经解决了生产者和消费者问题. 最终代码如下:
#include <iostream>
#include <thread>
#include <windows.h>
#include <condition_variable>
#include <mutex>
#include <chrono>
/// <summary>
/// 产品池可以容纳的产品数量的最大值
/// </summary>
const int MAX_VALUE = 10;
/// <summary>
/// 产品的数量, 初始值为0
/// </summary>
int full = 0;
/// <summary>
/// 空位置的数量, 初始值为MAX_VALUE
/// </summary>
int empty = MAX_VALUE;
/// <summary>
/// 互斥信号量, 初值为1
/// </summary>
int mutex = 1;
std::mutex mtx; //用于访问itemCount的互斥量, 即锁和钥匙
std::mutex consumer_mtx; //用于管理消费者线程队列的互斥量, 即锁和钥匙
std::mutex producer_mtx; //用于管理生产者线程队列的互斥量, 即锁和钥匙
std::condition_variable consumer_cv; //条件变量, 可以视为消费者线程管理队列
std::condition_variable producer_cv; //条件变量, 可以视为生产者线程管理队列
std::condition_variable producer_and_consumer_cv; //条件变量, 可以视为生产者和消费者线程管理队列
/// <summary>
/// 生产者函数
/// </summary>
void producer_fun(int index) {
while (true)
{
//P(empty)
std::unique_lock<std::mutex> lck(producer_mtx);
producer_cv.wait(lck, [] {return empty > 0; }); //阻塞当前线程, 等待消费者发出唤醒的信号, 只要空位置个数 > 0, 就唤醒一个生产者线程
//P(mutex)
std::unique_lock<std::mutex> lck1(mtx);
producer_and_consumer_cv.wait(lck1, [] {return mutex > 0; }); //阻塞当前线程, 等待其它线程发出唤醒的信号, 只要锁与钥匙个数 > 0, 就唤醒该线程
//临界区代码
--mutex;
--empty;
++full; //注意这里要同时改变full的值, 否则生产者生产完10个产品之后, 消费者的唤醒条件full > 0始终不满足, 程序就无法继续执行了
printf("生产者%d生产产品, 产品数量: %d\n", index, MAX_VALUE - empty);
++mutex;
std::this_thread::sleep_for(std::chrono::seconds(1));
//V(mutex)
producer_and_consumer_cv.notify_all(); //归还锁和钥匙, 唤醒所有的生产者和消费者线程
//V(full)
consumer_cv.notify_one(); //唤醒一个消费者线程
}
}
/// <summary>
/// 消费者函数
/// </summary>
void consumer_fun(int index) {
while (true)
{
//P(full)
std::unique_lock<std::mutex> lck(consumer_mtx);
consumer_cv.wait(lck, [] {return full > 0; }); //阻塞当前线程, 等待生产者发出唤醒的信号, 只要产品个数 > 0, 就唤醒一个消费者线程
//P(mutex)
std::unique_lock<std::mutex> lck1(mtx);
producer_and_consumer_cv.wait(lck1, [] {return mutex > 0; }); //阻塞当前线程, 等待其它线程发出唤醒的信号, 只要锁与钥匙个数 > 0, 就唤醒该线程
//临界区代码
--mutex;
--full;
++empty; //同理, 这里也要同时改变empty的值
printf("消费者%d消费产品, 产品数量: %d\n", index, full);
++mutex;
std::this_thread::sleep_for(std::chrono::seconds(1));
//V(mutex)
producer_and_consumer_cv.notify_all(); //归还锁和钥匙, 唤醒所有的生产者和消费者线程
//V(empty)
producer_cv.notify_one(); //唤醒一个生产者线程
}
}
int main()
{
std::thread producer_thread1(producer_fun, 1); //生产者线程1
std::thread producer_thread2(producer_fun, 2); //生产者线程2
std::thread consumer_thread1(consumer_fun, 1); //消费者线程1
std::thread consumer_thread2(consumer_fun, 2); //消费者线程2
producer_thread1.join(); //等待线程结束
producer_thread2.join(); //等待线程结束
consumer_thread1.join();
consumer_thread2.join();
return 0;
}
代码16: 多个生产者, 多个消费者(生产者消费者问题最终代码)