目录
引入
介绍
概念
使用场景
引入
介绍
注意
本质
计数器的本质
[判断资源是否就绪]
和互斥锁的关联
接口函数
初始化和销毁信号量
sem_init
函数原型
sem
value
sem_destroy
pv操作
sem_wait
编辑
sem_post
其他接口
sem_getvalue
基于环形队列的生产消费者模型
引入
介绍
生产消费过程
三个原则
解释
并发性
单生产单消费
思路
代码
ring_queue.hpp
task.hpp
main.cpp
示例
编辑
多生产多消费
思路
代码
ring_queue.hpp
main.cpp
示例
引入
在之前我们就已经简单介绍了信号量这一概念和用途:
临界资源,临界区,通信的干扰问题(互斥),信号量(本质,上下文切换问题,原子性,自身的安全性,操作)_临界区内不允许发生上下文切换-CSDN博客
本篇文章将继续讨论信号量,让我们来更好地了解和使用它
介绍
概念
- 信号量是一种用于多线程或多进程同步和互斥的机制,用于协调并发执行的程序, 确保在任何给定时刻只有一个线程或进程能够访问共享资源,从而避免竞态条件和数据损坏
- 信号量是一种计数器,用于控制对共享资源的访问
使用场景
引入
- 前面我们已经介绍了互斥锁,他可以用于保证互斥访问临界资源
- 这里的信号量也有同样的作用
既然已经有了互斥锁,又为什么会有信号量呢
- 那自然是有它的独特用处
- 互斥锁适用于保证只有一个线程进入一份资源
- 那如果该资源可以被看作多份,若仍然只有一个线程进入,未免效率太低
- 因为可以被看作多份的资源,可以让同样数量的线程并发访问
- 所以,为了更高的效率,便提出了信号量这一概念,来保证在该情况下并发的安全性
介绍
- 当一份共享资源被划分成n份时,需要信号量来保证只有n个线程并发
- 否则就会出现多个线程同时进入一份临界资源,也就会出现一系列问题(比如数据紊乱等等)
注意
- 信号量只是保证了进入资源的线程数是合理的
- 它并没有保证这些线程的进入不冲突 (这些是我们的工作捏,由代码来合理分配线程)
本质
信号量本质就是原子性的计数器
- 该计数器有p操作(也就是--)
- v操作(也就是++)
计数器的本质
计数器用于描述资源的数量
[判断资源是否就绪]
信号量将判断资源是否就绪这一操作,放在了访问临界资源之外,也就是放在了自己的操作中
- 之前写的用互斥锁保证互斥性的cp代码中,我们是在加锁区域中判断资源是否就绪
- 也就是先拿到锁,才能去访问临界资源,判断其是否就绪
- 但这里不同,只要拿到了信号量,其中一份资源就已经分配给你了,也就不需要判断就绪了
- 因为信号量的核心操作就是pv
- 只有当资源就绪时,才能完成p操作,而完成了p操作,就申请到了信号量
- 反过来论证, 申请到信号量,就代表此时资源就绪
- 所以,我们是先确定了资源就绪,才去访问临界资源的
和互斥锁的关联
我们带着信号量的概念回过头去看之前写的cp代码:
- 当时是将资源看成一个整体的,那么信号量就是1
- 所以,只会有一个线程进入资源 == 信号量要么为0,要么为1
这是否也可以对应申请锁和释放锁的过程呢?
- 申请到锁=信号量变为0=资源就绪
- 释放锁=信号量变为1
- 所以,我们可以将互斥锁看作初始值为1的信号量
接口函数
初始化和销毁信号量
sem_init
函数原型
sem
- 其类型为sem_t
- 和之前pthread库中用于标识互斥锁的mutex参数一样(类型为pthread_mutex_t),都是标识符
pshared
- 指定信号量的类型
- 0表示线程间共享,非零表示进程间共享
value
信号量的初始值
sem_destroy
用于销毁信号量,并释放相关的资源
pv操作
sem_wait
对信号量执行p操作(也就是等待信号响应)
- 如果信号量的值大于0,p操作成功(也就是成功获取相关资源,且信号量计数-1)
- 如果信号量的值为0,线程将被阻塞,等待资源的释放(也就是信号量的计数增加)
sem_post
对信号量执行v操作(也就是发送信号)
- 信号量计数+1
- 如果有等待的线程,唤醒其中一个(或多个)线程
其他接口
sem_getvalue
获取信号量的当前值,将其存储到sval变量中
基于环形队列的生产消费者模型
引入
之前我们已经学习过基于阻塞队列的生产消费者模型:
而下面要介绍的,是基于环形队列的cp模型(大差不差嘟,只要了解他生产消费的过程,就不难写代码)
介绍
- 环形队列是一种数据结构,常用于实现缓冲区或队列的循环存储
- 在环形队列中,队列的尾部与头部相连,形成一个环状结构
- 当队列满时,新的元素会覆盖队列头部的元素,实现了循环利用的目的
- --图源网络
- 和之前学习过的环形队列的操作类似,但这里被赋予了生产者和消费者的含义
- 因此我们来重新梳理一下push和pop的操作
生产消费过程
我们将头指针看作消费者,尾指针看作生产者(因为消费的基础是生产,必然是生产者先跑, 消费者追着生产者走)
- 生产者当前所指位置,是即将生产资源的位置
- 消费者当前所指位置,是即将消费资源的位置
- 也就是说,两者都是先在当前位置完成任务,再移动位置
起始时,生产者和消费者指向同一位置(标识此时无资源):
如果当前有资源且不满时,生产者和消费者必然在不同位置:
如果资源满了,两者将会再次指向同一位置:
三个原则
根据这个环形队列的实际意义,我们可以总结出三个原则:
- 消费者不可以超过生产者
- 生产中不可以领先消费者一圈
- 二者在同一位置时,必须互斥
解释
前两条很容易理解 -- 消费的前提是已经有资源,资源满了自然无法生产
而第三条:
- 从前面的图中我们可以知道,当两者在同一位置时,也就意味着资源要么满,要么空
- 满时只有消费者能动,空时只有生产者能动
- 所以要在这两种情况下,让二者互斥(也就是保证只有一方能动)
并发性
注意,因为只有当在同一位置时,才需要互斥,也就是说 -- 不在同一位置的话,两者就可以并发进行了
单生产单消费
思路
实际和之前写的基于阻塞队列的代码类似
生产消费者模型(引入--超市),321原则,阻塞队列实现+优点(代码,伪唤醒问题,条件变量接口wait中锁的作用),进阶版实现(生产任务,RAII风格),多生产多消费实现+优点-CSDN博客
- 定义一个环形队列模型的类,里面封装出需要的操作(生产和消费)
我们从之前的介绍中,可以知道需要的变量:
- 一个环形队列(用数组模拟)
- 定义队列大小
- 两个当前位置(生产者和消费者)
- 两个信号量
信号量我们着重说一下:
- 信号量实际是计数器,所以是用于定义有数量的东西
- 生产需要空间资源,所以它看中的是目前还剩多少空间
- 消费需要数据资源,所以它看中的是目前有多少数据
- 而空间数量+数据数量=队列容量
- 所以,一旦其中一方达到极大值(也就是容量),另一方就是0
- 那么,另一方必然就无法完成p操作,自然也就形成互斥了
- 所以,我们就可以定义相应的两个信号量,两者互相制约,上面总结的三个原则自然就被保证了
代码
ring_queue.hpp
#include <pthread.h> #include <vector> #include <stdlib.h> #include <semaphore.h> using namespace std; static const int def = 5; template <class T> class ring_queue { private: void P(sem_t *sem) { sem_wait(sem); } void V(sem_t *sem) { sem_post(sem); } public: ring_queue(int num = def) : c_i(0), p_i(0), capacity_(num), rq_(num) { sem_init(&data_num_, 0, 0); sem_init(&cap_num_, 0, num); } ~ring_queue() { sem_destroy(&data_num_); sem_destroy(&cap_num_); } void push(const T data) { P(&cap_num_); rq_[p_i] = data; p_i = (p_i + 1) % capacity_; V(&data_num_); } void pop(T &data) { P(&data_num_); data = rq_[c_i]; c_i = (c_i + 1) % capacity_; V(&cap_num_); } private: vector<T> rq_; int c_i; int p_i; int capacity_; sem_t data_num_; sem_t cap_num_; };
除此之外,我们还需要构建出任务:
task.hpp
#pragma once // 生成二元整数运算任务(加减乘除),有错误码提示 // 1为/0操作,2为%0操作,3为非法错误 #include <iostream> #include <string> using namespace std; string symbol = "+-*/%"; class Task { public: Task() {} // 方便只是为了接收传参而定义一个对象 Task(int x, int y, char c) : x_(x), y_(y), code_(0), op_(c), res_(0) { } int get_result() { return res_; } int get_code() { return code_; } string get_task() { string task = to_string(x_) + op_ + to_string(y_) + " = ?"; return task; } void operator()() { switch (op_) { case '+': res_ = x_ + y_; break; case '-': res_ = x_ - y_; break; case '*': res_ = x_ * y_; break; case '/': if (y_ == 0) { code_ = 1; break; } res_ = x_ / y_; break; case '%': if (y_ == 0) { code_ = 2; break; } res_ = x_ % y_; break; default: code_ = 3; break; } } private: int x_; int y_; int res_; int code_; char op_; };
main.cpp
主线程的任务就是构建出两个线程,分别去生产和消费
注意,不要忘记如何获取任务,以及获取到任务需要处理它哟~
#include "ring_queue.hpp" #include "Task.hpp" #include <random> #include <time.h> #include <unistd.h> int sym_size = symbol.size(); void *consume(void *args) { ring_queue<Task> *rq = static_cast<ring_queue<Task> *>(args); while (true) { Task t; //消费 rq->pop(t); //处理任务 t(); cout << "im consumer,task is " << t.get_task() << " ,result is " << t.get_result() << " ,code is " << t.get_code() << endl; } } void *product(void *args) { ring_queue<Task> *rq = static_cast<ring_queue<Task> *>(args); while (true) { //获取任务 int x = rand() % 10 + 1; int y = rand() % 5; char op = symbol[rand() % (sym_size - 1)]; Task t(x, y, op); //生产 rq->push(t); cout << "im productor,task is " << t.get_task() << endl; sleep(1); } } int main() { srand(time(nullptr) ^ getpid()); ring_queue<Task> rq(5); pthread_t c, p; pthread_create(&c, nullptr, consume, &rq); pthread_create(&p, nullptr, product, &rq); pthread_join(c, nullptr); pthread_join(p, nullptr); return 0; }
示例
如果生产比较慢,消费线程就会等待生产线程的生产
- 和我们预想的一样,生产一个任务,随后就消费一个任务:
如果消费比较慢,生产线程就会立即生产任务直至上限
- 而消费者会慢慢一个一个消费
- 消费一个就挪出一个空位,生产线程就会生产一个任务:
多生产多消费
思路
和单线程不同的是,我们需要保证生产者/消费者之间的互斥性(我们不能让多个生产者对同一位置进行生产,消费也同理)
所以我们需要加锁(生产者和消费者各自有一把锁)
代码
ring_queue.hpp
增加了解锁加锁操作
#include <pthread.h> #include <vector> #include <stdlib.h> #include <semaphore.h> using namespace std; static const int def = 5; template <class T> class ring_queue { private: void P(sem_t *sem) { sem_wait(sem); } void V(sem_t *sem) { sem_post(sem); } void lock(pthread_mutex_t *mutex) { pthread_mutex_lock(mutex); } void unlock(pthread_mutex_t *mutex) { pthread_mutex_unlock(mutex); } public: ring_queue(int num = def) : c_i(0), p_i(0), capacity_(num), rq_(num) { sem_init(&data_num_, 0, 0); sem_init(&cap_num_, 0, num); pthread_mutex_init(&c_mutex_, nullptr); pthread_mutex_init(&p_mutex_, nullptr); } ~ring_queue() { sem_destroy(&data_num_); sem_destroy(&cap_num_); pthread_mutex_destroy(&c_mutex_); pthread_mutex_destroy(&p_mutex_); } void push(const T data) { P(&cap_num_); lock(&p_mutex_); rq_[p_i] = data; p_i = (p_i + 1) % capacity_; unlock(&p_mutex_); V(&data_num_); } void pop(T &data) { P(&data_num_); lock(&c_mutex_); data = rq_[c_i]; c_i = (c_i + 1) % capacity_; unlock(&c_mutex_); V(&cap_num_); } private: vector<T> rq_; int c_i; int p_i; int capacity_; sem_t data_num_; sem_t cap_num_; pthread_mutex_t c_mutex_; pthread_mutex_t p_mutex_; };
main.cpp
多线程的结果比较不清晰,所以在代码中添加了不少sleep
以及,把多线程和单线程的代码分开了:
#include "ring_queue.hpp" #include "Task.hpp" #include <random> #include <time.h> #include <unistd.h> int sym_size = symbol.size(); struct thread { thread(ring_queue<Task> *rq, string name) : rq_(rq), name_(name) { } ring_queue<Task> *rq_; string name_; }; void *consume_single(void *args) { ring_queue<Task> *rq = static_cast<ring_queue<Task> *>(args); while (true) { usleep(20); Task t; rq->pop(t); // 处理任务 t(); cout << "im consumer,task is " << t.get_task() << " ,result is " << t.get_result() << " ,code is " << t.get_code() << endl; // sleep(1); } } void *product_single(void *args) { ring_queue<Task> *rq = static_cast<ring_queue<Task> *>(args); while (true) { // 生产任务 int x = rand() % 10 + 1; int y = rand() % 5; char op = symbol[rand() % (sym_size - 1)]; Task t(x, y, op); rq->push(t); cout << "im producer,task is " << t.get_task() << endl; sleep(1); } } void *consume_multiple(void *args) { thread *t = static_cast<thread *>(args); ring_queue<Task> *rq = t->rq_; while (true) { Task task; rq->pop(task); // 处理任务 task(); cout << "im " << t->name_ << ",task is " << task.get_task() << " ,result is " << task.get_result() << " ,code is " << task.get_code() << endl; // sleep(1); } } void *product_multiple(void *args) { thread *t = static_cast<thread *>(args); ring_queue<Task> *rq = t->rq_; while (true) { // 生产任务 int x = rand() % 10 + 1; usleep(30); int y = rand() % 5; usleep(30); char op = symbol[rand() % (sym_size - 1)]; Task task(x, y, op); rq->push(task); cout << "im " << t->name_ << ",task is " << task.get_task() << endl; sleep(1); } } void single() { ring_queue<Task> *rq = new ring_queue<Task>(10); pthread_t c, p; pthread_create(&c, nullptr, consume_single, &rq); pthread_create(&p, nullptr, product_single, &rq); pthread_join(c, nullptr); pthread_join(p, nullptr); } const int p_num = 4, c_num = 2; void multiple() { ring_queue<Task> *rq = new ring_queue<Task>; pthread_t c[c_num], p[p_num]; for (int i = 0; i < p_num; ++i) { string name = "producer-" + to_string(i + 1); thread *t = new thread(rq, name); pthread_create(&p[i], nullptr, product_multiple, t); } for (int i = 0; i < c_num; ++i) { string name = "consumer-" + to_string(i + 1); thread *t = new thread(rq, name); pthread_create(&c[i], nullptr, consume_multiple, t); } for (size_t i = 0; i < p_num; ++i) { pthread_join(p[i], nullptr); } for (size_t i = 0; i < c_num; ++i) { pthread_join(c[i], nullptr); } } int main() { srand(time(nullptr) ^ getpid()); // single(); multiple(); return 0; }
示例
仔细看消费线程,他们消费的顺序都是按照生产的顺序来的: