1个生产者在生产的时候,另个生产者不能生产(生产者之间互斥)
条件变量用于线程同步,线程挂起/被唤醒。
条件变量和互斥锁共同保证生产者之间互斥+生产者和消费者的同步。
思路:
1 定义、初始化共享资源
a 缓冲区:存储物品。初始化为空。
b 计数器:缓冲区中物品数量。初始化为0。
c 互斥锁:缓冲区和计数器的互斥访问。初始化。
4个线程共用一个锁
d 条件变量:生产者和消费者的同步。初始化。
not_full 消费者消费后通知生产者(缓冲区不满);生产者在缓冲区满时等待不满;
not_empty生产者生产后通知消费者(缓冲区不空);消费者在缓冲区空时等待不空。
2 创建两个生产者线程、两个消费者线程
生产者行为
a 锁定互斥锁
b 检查条件:缓冲区是否已满。
是则等待不满条件变量。 pthread_cond_wait(¬_full, &mutex)
c 条件允许,生产者生产物品并放入缓冲区。
d 计数器+1:增加物品。
e 条件变量通知消费者缓冲区非空。pthread_cond_signal(¬_empty)
f 释放互斥锁消费者行为
a 锁定互斥锁
b 检查条件:缓冲区是否为空。
是则等待非空条件变量。pthread_cond_wait(¬_empty,&mutex)
c 条件允许,消费者从缓冲区中取一个物品。
d 计数器-1:减少物品。
e 条件变量通知生产者缓冲区未满。 pthread_cond_signal(¬_full)
f 释放互斥锁
3 清理资源
程序结束前,销毁互斥锁和条件变量。
代码
#define BUFFER_SIZE 10 // 定义缓冲区大小
int buffer[BUFFER_SIZE]; // 缓冲区数组
int in = 0; // 生产者放置物品的位置
int out = 0; // 消费者取物品的位置
int count = 0; // 缓冲区中的物品数量
pthread_mutex_t mutex; // 互斥锁,保护缓冲区
pthread_cond_t not_full; // 条件变量,缓冲区未满时通知生产者
pthread_cond_t not_empty; // 条件变量,缓冲区非空时通知消费者
// 生产者线程函数
void *producer(void *id) {
int item;
int *producer_id = (int *)id;
while (1) {
item = 'a' + rand() % 26; // 生产(小写字母)
pthread_mutex_lock(&mutex);
while (count == BUFFER_SIZE ) {// 缓冲区满,则等待
pthread_cond_wait(¬_full, &mutex);// 等待未满条件变量
}
buffer[in] = item;//放入
in = (in + 1) % BUFFER_SIZE;
count++;
printf("生产者%d号生产%c\n", *producer_id, item);
pthread_cond_signal(¬_empty);// 通知消费者缓冲区非空
pthread_mutex_unlock(&mutex);
sleep(1); // 模拟生产时间
}
return NULL;
}
void *consumer(void *id) {
int item;
int *consumer_id = (int *)id;
while (1) {
pthread_mutex_lock(&mutex);
while (count == 0) {//缓冲区空,则等待
pthread_cond_wait(¬_empty, &mutex);// 等待非空条件变量
}
item = buffer[out];//消费
out = (out + 1) % BUFFER_SIZE;
count--;
printf("消费者%d号消耗%c\n", *consumer_id, item);
pthread_cond_signal(¬_full);// 通知生产者缓冲区未满
pthread_mutex_unlock(&mutex);
sleep(1); // 模拟消费时间
}
return NULL;
}
int main() {
pthread_t producers[2], consumers[2];
int producer_ids[2] = {1, 2};
int consumer_ids[2] = {1, 2};
//初始化
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(¬_full, NULL);
pthread_cond_init(¬_empty, NULL);
// 创建生产者、消费者线程
for (int i = 0; i < 2; i++) {
pthread_create(&producers[i], NULL, producer, (void*)&producer_ids[i]);
pthread_create(&consumers[i], NULL, consumer, (void*)&consumer_ids[i]);
}
// 等待线程结束
for (int i = 0; i < 2; i++) {
pthread_join(producers[i], NULL);
pthread_join(consumers[i], NULL);
}
//销毁
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(¬_full);
pthread_cond_destroy(¬_empty);
return 0;
}
运行结果
生产者和消费者是无限循环的,所以程序需要外部干预来停止。