再论生产消费者问题
问题
如果缓冲池为空,如何定义消费者的行为?
一种可能的解决方案
这种方案是可行的,但是如果生产者一直不生产,那么消费者会反复查看产品的数量为 0 并休眠,这样会浪费 cpu 的资源,并且生产者生产后,消费者并不能马上从休眠状态中被唤醒去取走产品,这种方案的效率并不高
需求:
消费者发现缓冲池为空的时候,主动让出互斥量,并进入等待状态
当缓冲池非空,即: 生产者更新缓冲池状态, 消费者再次竞争锁
关键:
消费者不需要反复多次竞争锁 (缓冲池为空, 直接等待)
生产者可通知消费者再次竞争锁 (生产者与消费者协同机制)
Linux 中的条件变量
条件变量的注意事项
条件变量之间不能相互初始化,也不能相互赋值 (行为未定义)
使用 pthread_cond_init() 初始化的条件变量,必须被销毁,且可以重新初始化
使用 PTHREAD_COND_INITIALIZER 初始化的条件变量不需要销毁
不能使用 未初始化 / 已销毁 的条件变量 (行为未定义)
pthread_cond_wait() 只能由拥有互斥量的线程调用
条件变量的使用
条件变量初体验
test1.c
#define _GNU_SOURCE /* To get pthread_getattr_np() declaration */
#define _XOPEN_SOURCE >= 500 || _POSIX_C_SOURCE >= 200809L
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <time.h>
int g_count = 0;
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
void* customer_thread(void* arg)
{
while( 1 )
{
pthread_mutex_lock(&g_mutex);
if( g_count > 0 )
printf("%s : %d\n", __FUNCTION__, --g_count);
else
pthread_cond_wait(&g_cond, &g_mutex); // g_cond points to the queue, g_mutex needs to be unlocked
pthread_mutex_unlock(&g_mutex);
usleep(500 * 1000);
}
return NULL;
}
int main()
{
pthread_t t = 0;
int pre = 0;
pthread_create(&t, NULL, customer_thread, NULL);
printf("Hello World!\n");
while( 1 )
{
pthread_mutex_lock(&g_mutex);
pre = g_count;
printf("%s : %d\n", __FUNCTION__, ++g_count);
if( pre == 0 )
{
printf("%s : %s\n", __FUNCTION__, "signal customer to get product...");
pthread_cond_signal(&g_cond);
}
pthread_mutex_unlock(&g_mutex);
usleep(2000 * 1000);
}
printf("End!\n");
return 0;
}
主线程为生产者,负责生产产品;子线程 customer_thread 为消费者,负责取走产品
第 24 行,如果消费者发现没有产品,则调用 pthread_cond_wait(&g_cond, &g_mutex) 阻塞在 g_cond 这个条件变量上,并释放已获取到的互斥锁 g_mutex
第 54 行,如果生产者生产了产品后发现上一次的产品没了,则调用 pthread_cond_signal(&g_cond),来唤醒阻塞在 g_cond 这个条件变量上的单个消费者线程来取产品,这样原先阻塞在 g_cond 这个条件变量上的某个线程就会被唤醒,重新抢夺互斥锁,取走产品,然后释放互斥锁
程序运行结果如下图所示:
进阶条件变量
int pthread_cond_signal(pthread_cond_t* cond);
- 唤起一个等待目标条件变量的线程
int pthread_cond_broadcast(pthread_cond* cond);
- 唤起所有等待目标条件变量的线程
一个生产者 vs 多个消费者
存在的问题
解决方案
条件变量深度实验
test2.c
#define _GNU_SOURCE /* To get pthread_getattr_np() declaration */
#define _XOPEN_SOURCE >= 500 || _POSIX_C_SOURCE >= 200809L
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <time.h>
int g_count = 0;
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
void* customer_thread(void* arg)
{
int i = (long)arg;
// prepare to get product
{
pthread_mutex_lock(&g_mutex);
if( g_count == 0 )
// while( g_count == 0 )
pthread_cond_wait(&g_cond, &g_mutex);
// get product from buffer
printf("customer %d : %d\n", i, g_count);
g_count--;
pthread_mutex_unlock(&g_mutex);
}
return NULL;
}
int main()
{
pthread_t t[10] = {0};
int len = sizeof(t)/sizeof(*t);
long i = 0;
for(i=0; i<len; i++)
{
pthread_create(&t[i], NULL, customer_thread, (void*)i);
}
printf("Hello World!\n");
sleep(1);
{
// prepare to make product
pthread_mutex_lock(&g_mutex);
int v = g_count;
g_count += 10;
if( v == 0 )
pthread_cond_signal(&g_cond);
// pthread_cond_broadcast(&g_cond);
pthread_mutex_unlock(&g_mutex);
}
for(i=0; i<len; i++)
{
pthread_join(t[i], NULL);
}
printf("End!\n");
return 0;
}
第 46 行,主线程创建 10 个消费者线程 customer_thread,随后主线程休眠 1s,由于产品数量为 0,10 个消费者线程会阻塞在 g_cond 这个条件变量上
第 63 行,主线程生产了 10 个产品后,通过 pthread_cond_signal 来通知阻塞在 g_cond 这个条件变量上的单个线程,这样就会导致 10 个被阻塞的线程只有 1 个线程被唤醒,取走产品,其他线程依旧被阻塞
程序运行结果如下图所示:
pthread_cond_signal(cond) 函数只会唤醒单个阻塞在 cond 条件变量上的单个线程,我们将这个函数替换为 pthread_cond_broadcast(cond),而这个函数会唤醒所有阻塞在 cond 条件变量上的所有线程,这样 10 个消费者线程都会被唤醒,然后取走产品
替换后的程序
test2.c
#define _GNU_SOURCE /* To get pthread_getattr_np() declaration */
#define _XOPEN_SOURCE >= 500 || _POSIX_C_SOURCE >= 200809L
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <time.h>
int g_count = 0;
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
void* customer_thread(void* arg)
{
int i = (long)arg;
// prepare to get product
{
pthread_mutex_lock(&g_mutex);
if( g_count == 0 )
// while( g_count == 0 )
pthread_cond_wait(&g_cond, &g_mutex);
// get product from buffer
printf("customer %d : %d\n", i, g_count);
g_count--;
pthread_mutex_unlock(&g_mutex);
}
return NULL;
}
int main()
{
pthread_t t[10] = {0};
int len = sizeof(t)/sizeof(*t);
long i = 0;
for(i=0; i<len; i++)
{
pthread_create(&t[i], NULL, customer_thread, (void*)i);
}
printf("Hello World!\n");
sleep(1);
{
// prepare to make product
pthread_mutex_lock(&g_mutex);
int v = g_count;
g_count += 10;
if( v == 0 )
// pthread_cond_signal(&g_cond);
pthread_cond_broadcast(&g_cond);
pthread_mutex_unlock(&g_mutex);
}
for(i=0; i<len; i++)
{
pthread_join(t[i], NULL);
}
printf("End!\n");
return 0;
}
程序运行结果如下图所示:
pthread_cond_broadcast() 使得 10 个消费者线程都被唤醒
test3.c
#define _GNU_SOURCE /* To get pthread_getattr_np() declaration */
#define _XOPEN_SOURCE >= 500 || _POSIX_C_SOURCE >= 200809L
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <time.h>
int g_count = 0;
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
void* customer_thread(void* arg)
{
int i = (long)arg;
// prepare to get product
{
pthread_mutex_lock(&g_mutex);
if( g_count == 0 )
// while( g_count == 0 )
pthread_cond_wait(&g_cond, &g_mutex);
// get product from buffer
printf("customer %d : %d\n", i, g_count);
g_count--;
pthread_mutex_unlock(&g_mutex);
}
return NULL;
}
int main()
{
pthread_t t[10] = {0};
int len = sizeof(t)/sizeof(*t);
long i = 0;
for(i=0; i<len; i++)
{
pthread_create(&t[i], NULL, customer_thread, (void*)i);
}
printf("Hello World!\n");
sleep(1);
{
// prepare to make product
pthread_mutex_lock(&g_mutex);
int v = g_count;
g_count += 5;
if( v == 0 )
pthread_cond_broadcast(&g_cond);
pthread_mutex_unlock(&g_mutex);
}
for(i=0; i<len; i++)
{
pthread_join(t[i], NULL);
}
printf("End!\n");
return 0;
}
生产者为 1 个,消费者为 10 个,生产者一次生产 5 个产品
首先,10 个生产者会阻塞在 g_cond 这个条件变量上,生产者生产了 5 个产品后,调用 pthread_cond_broadcast(),唤醒了阻塞在 g_cond 这个条件变量上的 10 个消费者线程,但是消费者线程被唤醒后没有再次进行产品数量判断,而是直接取走了产品,这样就会出现问题
程序运行结果如下图所示:
产品数量出现了负数,我们将 23 行的 if 改为 while 即可解决这个问题
替换后的程序
test3.c
#define _GNU_SOURCE /* To get pthread_getattr_np() declaration */
#define _XOPEN_SOURCE >= 500 || _POSIX_C_SOURCE >= 200809L
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <memory.h>
#include <time.h>
int g_count = 0;
pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
void* customer_thread(void* arg)
{
int i = (long)arg;
// prepare to get product
{
pthread_mutex_lock(&g_mutex);
while( g_count == 0 )
pthread_cond_wait(&g_cond, &g_mutex);
// get product from buffer
printf("customer %d : %d\n", i, g_count);
g_count--;
pthread_mutex_unlock(&g_mutex);
}
return NULL;
}
int main()
{
pthread_t t[10] = {0};
int len = sizeof(t)/sizeof(*t);
long i = 0;
for(i=0; i<len; i++)
{
pthread_create(&t[i], NULL, customer_thread, (void*)i);
}
printf("Hello World!\n");
sleep(1);
{
// prepare to make product
pthread_mutex_lock(&g_mutex);
int v = g_count;
g_count += 5;
if( v == 0 )
pthread_cond_broadcast(&g_cond);
pthread_mutex_unlock(&g_mutex);
}
for(i=0; i<len; i++)
{
pthread_join(t[i], NULL);
}
printf("End!\n");
return 0;
}
程序运行结果如下图所示:
只有 5 个消费者线程取走了产品