信号量
- 一、POSIX信号量
- 1、信号量的原理
- 2、信号量的概念
- (1)PV操作必须是原子操作
- (2)申请信号量失败被挂起等待
- 3、信号量函数
- 4、销毁信号量
- 5、等待信号量(申请信号量)
- 6、发布信号量(释放信号量)
- 二、二元信号量模拟实现互斥功能
- 1、负数情况
- 2、引入二元信号量
- 三、基于环形队列的生产消费模型
- 1、空间资源和数据资源
- (1)生产者和消费者关注的不同资源
- (2)blank_sem和data_sem的初始值设置
- (3)生产者和消费者申请和释放资源
- i、生产者申请空间资源,释放数据资源
- ii、消费者申请数据资源,释放空间资源
- 2、两个规则
- (1)生产者和消费者不能对同一个位置进行访问
- (2)无论是生产者还是消费者,都不应该将对方套一个圈以上
- 3、利用代码进行讲解
- (1)消费者和生产者步调一致
- (2)生产者生产的快,消费者消费的慢
- (3)生产者生产的慢,消费者消费的快
- 四、信号量保护环形队列的原理
一、POSIX信号量
1、信号量的原理
- 我们将可能会被多个执行流同时访问的资源叫做临界资源,临界资源需要进行保护否则会出现数据不一致等问题。
- 当我们仅用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
- 但实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。
2、信号量的概念
信号量(信号灯)本质是一个计数器,是描述临界资源中资源数目的计数器,信号量能够更细粒度的对临界资源进行管理。
每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特点的临界资源的权限,当操作完毕后就应该释放信号量。
信号量的PV操作:
- P操作:我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让计数器减一。
- V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让计数器加一。
(1)PV操作必须是原子操作
多个执行流为了访问临界资源会竞争式的申请信号量,因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。
但信号量本质就是用于保护临界资源的,我们不可能再用信号量去保护信号量,所以信号量的PV操作必须是原子操作。
(2)申请信号量失败被挂起等待
当执行流在申请信号量时,可能此时信号量的值为0,也就是说信号量描述的临界资源已经全部被申请了,此时该执行流就应该在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。
3、信号量函数
初始化信号量的函数叫做sem_init,该函数的函数原型如下:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数说明:
- sem:需要初始化的信号量。
- pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
- value:信号量的初始值(计数器的初始值)。
返回值:
初始化信号量成功返回0,失败返回-1。
注意: POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可以用于线程间同步。
4、销毁信号量
销毁信号量的函数叫做sem_destroy,该函数的函数原型如下:
int sem_destroy(sem_t *sem);
参数说明:
- sem:需要销毁的信号量。
返回值说明:
销毁信号量成功返回0,失败返回-1。
5、等待信号量(申请信号量)
等待信号量的函数叫做sem_wait,该函数的函数原型如下:
int sem_wait(sem_t *sem);
参数说明:
- sem:需要等待的信号量。
返回值说明:
- 等待信号量成功返回0,信号量的值减一。
- 等待信号量失败返回-1,信号量的值保持不变。
6、发布信号量(释放信号量)
发布信号量的函数叫做sem_post,该函数的函数原型如下:
int sem_post(sem_t *sem);
参数说明:
- sem:需要发布的信号量。
返回值说明:
- 发布信号量成功返回0,信号量的值加一。
- 发布信号量失败返回-1,信号量的值保持不变。
二、二元信号量模拟实现互斥功能
1、负数情况
信号量本质是一个计数器,如果将信号量的初始值设置为1,那么此时该信号量叫做二元信号量。
信号量的初始值为1,说明信号量所描述的临界资源只有一份,此时信号量的作用基本等价于互斥锁。
我们不知道还记得之间写的最初的抢票逻辑吗?我们创建四个新线程,四个新线程实现抢票,但最终是出现负数的情况,我们再看一眼下面的代码:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
// 临界资源1000张票
int tickets = 1000;
void* Routine(void* arg)
{
const char* name = (char*)arg;
// 抢票
while(1)
{
if(tickets > 0)
{
// 抢票
usleep(10000);
printf("[%s] get a ticket...left tickets#%d\n", name, --tickets);
}
else
{
break;
}
}
cout << name << "exit..." << endl;
pthread_exit((void*)0);
}
int main()
{
pthread_t tid1, tid2, tid3, tid4;
// 创建四个新线程
pthread_create(&tid1, NULL, Routine, (void*)"thread 1");
pthread_create(&tid2, NULL, Routine, (void*)"thread 2");
pthread_create(&tid3, NULL, Routine, (void*)"thread 3");
pthread_create(&tid4, NULL, Routine, (void*)"thread 4");
// 退出
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_join(tid3, NULL);
pthread_join(tid4, NULL);
return 0;
}
我们发现是有负数的,这显然不符合我们预期的。
2、引入二元信号量
下面我们在抢票逻辑当中加入二元信号量,让每个线程在访问全局变量tickets之前先申请信号量,访问完毕后再释放信号量,此时二元信号量就达到了互斥的效果。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
#include <string>
// 实现一个Sem类函数
class Sem
{
private:
sem_t _sem;
public:
// 构造函数
Sem(int num)
{
sem_init(&_sem, 0, num);
}
// 析构函数
~Sem()
{
sem_destroy(&_sem);
}
// P操作
void P()
{
sem_wait(&_sem);
}
// V操作
void V()
{
sem_post(&_sem);
}
};
// 二原信号量
Sem sem(1);
int tickets = 1000;
void* TicketGrabbge(void* arg)
{
// 抢票
std::string Name = (char*)arg;
while(1)
{
sem.P(); // 等待信号量
if(tickets > 0)
{
// 出票
usleep(10000);
std::cout << "[" << Name << "]" << "get a tickets#" << --tickets << std::endl;
sem.V(); // 发布信号量
}
else
{
sem.V(); // 发布信号量
break;
}
}
std::cout << Name << "quit..." << std::endl;
pthread_exit((void*)0);
}
int main()
{
pthread_t tid1, tid2, tid3, tid4;
// 创建线程
pthread_create(&tid1, nullptr, TicketGrabbge, (void*)"thread 1");
pthread_create(&tid2, nullptr, TicketGrabbge, (void*)"thread 2");
pthread_create(&tid3, nullptr, TicketGrabbge, (void*)"thread 3");
pthread_create(&tid4, nullptr, TicketGrabbge, (void*)"thread 4");
// 线程等待
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
return 0;
}
运行代码后就不会出现剩余票数为负的情况了,因为此时同一时刻只会有一个执行流对全局变量tickets进行访问,不会出现数据不一致的问题。
三、基于环形队列的生产消费模型
1、空间资源和数据资源
(1)生产者和消费者关注的不同资源
生产者关注的是空间资源,消费者关注的是数据资源
对于生产者和消费者来说,它们关注的资源是不同的:
- 生产者关注的是环形队列当中是否有空间(blank),只要有空间生产者就可以进行生产。
- 消费者关注的是环形队列当中是否有数据(data),只要有数据消费者就可以进行消费。
(2)blank_sem和data_sem的初始值设置
现在我们用信号量来描述环形队列当中的空间资源(blank_sem)和数据资源(data_sem),在我们初始信号量时给它们设置的初始值是不同的:
- blank_sem的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间。
- data_sem的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。
(3)生产者和消费者申请和释放资源
i、生产者申请空间资源,释放数据资源
对于生产者来说,生产者每次生产数据前都需要先申请blank_sem:
- 如果blank_sem的值不为0,则信号量申请成功,此时生产者可以进行生产操作。
- 如果blank_sem的值为0,则信号量申请失败,此时生产者需要在blank_sem的等待队列下进行阻塞等待,直到环形队列当中有新的空间后再被唤醒。
当生产者生产完数据后,应该释放data_sem:
- 虽然生产者在进行生产前是对blank_sem进行的P操作,但是当生产者生产完数据,应该对data_sem进行V操作而不是blank_sem。
- 生产者在生产数据前申请到的是blank位置,当生产者生产完数据后,该位置当中存储的是生产者生产的数据,在该数据被消费者消费之前,该位置不再是blank位置,而应该是data位置。
- 当生产者生产完数据后,意味着环形队列当中多了一个data位置,因此我们应该对data_sem进行V操作。
ii、消费者申请数据资源,释放空间资源
对于消费者来说,消费者每次消费数据前都需要先申请data_sem:
- 如果data_sem的值不为0,则信号量申请成功,此时消费者可以进行消费操作。
- 如果data_sem的值为0,则信号量申请失败,此时消费者需要在data_sem的等待队列下进行阻塞等待,直到环形队列当中有新的数据后再被唤醒。
当消费者消费完数据后,应该释放blank_sem:
- 虽然消费者在进行消费前是对data_sem进行的P操作,但是当消费者消费完数据,应该对blank_sem进行V操作而不是data_sem。
- 消费者在消费数据前申请到的是data位置,当消费者消费完数据后,该位置当中的数据已经被消费过了,再次被消费就没有意义了,为了让生产者后续可以在该位置生产新的数据,我们应该将该位置算作blank位置,而不是data位置。
- 当消费者消费完数据后,意味着环形队列当中多了一个blank位置,因此我们应该对blank_sem进行V操作。
2、两个规则
(1)生产者和消费者不能对同一个位置进行访问
- 如果生产者和消费者访问的是环形队列当中的同一个位置,那么此时生产者和消费者就相当于同时对这一块临界资源进行了访问,这当然是不允许的。
- 而如果生产者和消费者访问的是环形队列当中的不同位置,那么此时生产者和消费者是可以同时进行生产和消费的,此时不会出现数据不一致等问题。
(2)无论是生产者还是消费者,都不应该将对方套一个圈以上
- 生产者从消费者的位置开始一直按顺时针方向进行生产,如果生产者生产的速度比消费者消费的速度快,那么当生产者绕着消费者生产了一圈数据后再次遇到消费者,此时生产者就不应该再继续生产了,因为再生产就会覆盖还未被消费者消费的数据。
- 同理,消费者从生产者的位置开始一直按顺时针方向进行消费,如果消费者消费的速度比生产者生产的速度快,那么当消费者绕着生产者消费了一圈数据后再次遇到生产者,此时消费者就不应该再继续消费了,因为再消费就会消费到缓冲区中保存的废弃数据。
3、利用代码进行讲解
signal.hpp:
#pragma once
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <vector>
#define NUM 10
template<class T>
class RingQueue
{
private:
std::vector<T> _q; // 循环队列
int _capacity; // 循环队列的容量
int _p_pos; // 生产位置
int _c_pos; // 消费位置
sem_t _blank_sem; // 描述空间资源
sem_t _data_sem; // 描述数据资源
private:
// P操作 -- 等待
void P(sem_t& s)
{
sem_wait(&s);
}
// V操作 -- 传递
void V(sem_t& s)
{
sem_post(&s);
}
public:
// 构造函数
RingQueue(int cap = NUM)
:_capacity(cap)
,_p_pos(0)
,_c_pos(0)
{
_q.resize(_capacity);
sem_init(&_blank_sem, 0, _capacity); // blank_sem初始值设置为环形队列的容量
sem_init(&_data_sem, 0, 0); // _data_sem初始值设置为0
}
// 析构函数
~RingQueue()
{
sem_destroy(&_blank_sem);
sem_destroy(&_data_sem);
}
// 生产者 -- 向环形队列中插入元素
void Push(const T& data)
{
P(_blank_sem); // 向空间资源加锁!
_q[_p_pos] = data; // 环形队列中的相对应位置用data填入
V(_data_sem); // 向数据资源解锁!即生产资源
// 更新下一次插入的位置
_p_pos++;
_p_pos %= _capacity;
}
// 消费者
void Pop(T& data)
{
P(_data_sem); // 向数据资源加锁!
data = _q[_c_pos]; // 拿到消费位置的数据
V(_blank_sem); // 空间资源的解锁,即开始将空间资源进行增加空间
// 更新下一次弹出的位置
_c_pos++;
_c_pos %= _capacity;
}
};
- 当不设置环形队列的大小时,我们默认将环形队列的容量上限设置为10。
- 代码中的RingQueue是用vector实现的,生产者每次生产的数据放到vector下标为p_pos的位置,消费者每次消费的数据来源于vector下标为c_pos的位置。
- 生产者每次生产数据后p_pos都会进行++,标记下一次生产数据的存放位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果。
- 消费者每次消费数据后c_pos都会进行++,标记下一次消费数据的来源位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果。
- p_pos只会由生产者线程进行更新,c_pos只会由消费者线程进行更新,对这两个变量访问时不需要进行保护,因此代码中将p_pos和c_pos的更新放到了V操作之后,就是为了尽量减少临界区的代码。
我们这里设置两个线程,一个是消费者线程另一个是生产者线程,生产者负责生产数据到环形队列中,消费者负责在环形队列中获取数据:
main.cc:
#include "signal.hpp"
void* Productor(void* arg)
{
RingQueue<int>* Name = (RingQueue<int>*)arg;
while(1)
{
sleep(1);
int data = rand() % 100 + 1;
Name->Push(data);
std::cout << "Productor###" << data << std::endl;
}
}
void* Consumer(void* arg)
{
RingQueue<int>* Name = (RingQueue<int>*)arg;
while(1)
{
sleep(1);
int data = 0;
Name->Pop(data);
std::cout << "Consumer###" << data << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t tid1, tid2;
// 创造两个新线程
RingQueue<int>* rq = new RingQueue<int>;
pthread_create(&tid1, nullptr, Productor, rq);
pthread_create(&tid2, nullptr, Consumer, rq);
// 线程等待
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
delete rq; // 防止内存泄漏
return 0;
}
说明:
- 环形队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个环形队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将环形队列作为线程执行例程的参数进行传入。
- 代码中生产者生产数据就是将获取到的随机数Push到环形队列,而消费者就是从环形队列Pop数据,为了便于观察,我们可以将生产者生产的数据和消费者消费的数据进行打印输出。
(1)消费者和生产者步调一致
我们上面的代码就是消费者和消费者步调一致,我们进行运行代码并进行打印一看:
(2)生产者生产的快,消费者消费的慢
生产者不停的进行生产,而消费者每隔一秒进行消费。
void* Productor(void* arg)
{
RingQueue<int>* Name = (RingQueue<int>*)arg;
while(1)
{
int data = rand() % 100 + 1;
Name->Push(data);
std::cout << "Productor###" << data << std::endl;
}
}
void* Consumer(void* arg)
{
RingQueue<int>* Name = (RingQueue<int>*)arg;
while(1)
{
sleep(1);
int data = 0;
Name->Pop(data);
std::cout << "Consumer###" << data << std::endl;
}
}
此时由于生产者生产的很快,运行代码后一瞬间生产者就将环形队列打满了,此时生产者想要再进行生产,但空间资源已经满了,于是生产者只能在blank_sem的等待队列下进行阻塞等待,直到由消费者消费完一个数据后对blank_sem进行了V操作,生产者才会被唤醒进而继续进行生产。
但由于生产者的生产速度很快,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
(3)生产者生产的慢,消费者消费的快
生产者每隔一秒进行生产,而消费者不停的进行消费。
void* Productor(void* arg)
{
RingQueue<int>* Name = (RingQueue<int>*)arg;
while(1)
{
sleep(1);
int data = rand() % 100 + 1;
Name->Push(data);
std::cout << "Productor###" << data << std::endl;
}
}
void* Consumer(void* arg)
{
RingQueue<int>* Name = (RingQueue<int>*)arg;
while(1)
{
int data = 0;
Name->Pop(data);
std::cout << "Consumer###" << data << std::endl;
}
}
虽然消费者消费的很快,但一开始环形队列当中的数据资源为0,因此消费者只能在data_sem的等待队列下进行阻塞等待,直到生产者生产完一个数据后对data_sem进行了V操作,消费者才会被唤醒进而进行消费。
但由于消费者的消费速度很快,消费者消费完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
四、信号量保护环形队列的原理
在blank_sem和data_sem两个信号量的保护后,该环形队列中不可能会出现数据不一致的问题。
因为只有当生产者和消费者指向同一个位置并访问时,才会导致数据不一致的问题,而此时生产者和消费者在对环形队列进行写入或读取数据时,只有两种情况会指向同一个位置:
- 环形队列为空时。
- 环形队列为满时。
但是在这两种情况下,生产者和消费者不会同时对环形队列进行访问:
- 当环形队列为空的时,消费者一定不能进行消费,因为此时数据资源为0。
- 当环形队列为满的时,生产者一定不能进行生产,因为此时空间资源为0。
当环形队列为空和满时,我们已经通过信号量保证了生产者和消费者的串行化过程。而除了这两种情况之外,生产者和消费者指向的都不是同一个位置,因此该环形队列当中不可能会出现数据不一致的问题。并且大部分情况下生产者和消费者指向并不是同一个位置,因此大部分情况下该环形队列可以让生产者和消费者并发的执行。