引入
以电影院买票为例
去电影院看电影需要先买票,如果买过票了,哪怕我们没有去看电影,在电影票的有效期内,电影院对应的座位就是属于你的。
买票的本质:对资源(座位)的预订。
信号量
本质就是一把“计数器”,通常用来表示临界资源中资源数目的多少。
申请信号量实际上就是对临界资源的预定机制。
信号量的PV操作:
P操作:我们将申请信号量称为P操作。申请信号量的本质就是申请获得临界资源中某块资源的访问权限,当申请成功时临界资源中资源的数目应该减一,因此P操作的本质就是让信号量减一。
V操作:我们将释放信号量称为V操作。释放信号量的本质就是归还临界资源中某块资源的访问权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让信号量加一。
信号量有关函数
sem_init (初始化信号量)
#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);参数:pshared:0表示线程间共享,非零表示进程间共享value:信号量初始值返回值:初始化信号量成功返回0,失败返回-1。
sem_destroy (销毁信号量)
int sem_destroy(sem_t *sem);
sem_wait (初始化信号量)
功能:等待信号量,会将信号量的值减 1int sem_wait(sem_t *sem);
sem_post (发布信号量)
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1 。int sem_post(sem_t *sem);
基于环形队列的生产者消费者模型
上边的环形结构是抽象出来的,实际上就是用定长的数组模拟实现的。
当数组为空或者数组满了的时候,生产者和消费者最终都会指向同一个位置
解决方法是:空出来一个位置 or 采用信号量。
下边将采用信号量来解决。
规则:
为空时,生产者先运行; 为满时,消费者先运行。
不为空也不为满时,二者可以同时运行,将大大提高效率。
生产者最关心的就是 “空间资源”,消费者最关心的就是“数据资源”。
当生产者生产一个数据,空间--,数据++;
当消费者消费一个数据,数据--,空间++。
代码实现
ringQueue.hpp
#ifndef _Ring_QUEUE_HPP_
#define _Ring_QUEUE_HPP_
#include <iostream>
#include <vector>
#include <pthread.h>
#include "sem.hpp"
const int g_default_num = 5;
template<class T>
class RingQueue
{
public:
RingQueue(int default_num = g_default_num)
: ring_queue_(default_num),
num_(default_num),
c_step(0),
p_step(0),
space_sem_(default_num),
data_sem_(0)
{
pthread_mutex_init(&clock, nullptr);
pthread_mutex_init(&plock, nullptr);
}
~RingQueue()
{
pthread_mutex_destroy(&clock);
pthread_mutex_destroy(&plock);
}
void push(const T &in)
{
// 先申请信号量(0)
space_sem_.p();
pthread_mutex_lock(&plock);
ring_queue_[p_step++] = in;
p_step %= num_;
pthread_mutex_unlock(&plock);
data_sem_.v();
}
void pop(T *out)
{
data_sem_.p();
pthread_mutex_lock(&clock);
*out = ring_queue_[c_step++];
c_step %= num_;
pthread_mutex_unlock(&clock);
space_sem_.v();
}
private:
std::vector<T> ring_queue_;
int num_;
int c_step; // 消费下标
int p_step; // 生产下标
Sem space_sem_;
Sem data_sem_;
pthread_mutex_t clock;
pthread_mutex_t plock;
};
#endif
sem.hpp
#ifndef _SEM_HPP_
#define _SEM_HPP_
#include <iostream>
#include <semaphore.h>
class Sem
{
public:
Sem(int value)
{
sem_init(&sem_,0,value);
}
void p()
{
sem_wait(&sem_);
}
void v()
{
sem_post(&sem_);
}
~Sem()
{
sem_destroy(&sem_);
}
private:
sem_t sem_;
};
#endif
test.cc
#include "ringQueue.hpp"
#include <cstdlib>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
void *consumer(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while(true)
{
sleep(1);
int x;
rq->pop(&x);
std::cout << "消费: " << x << " [" << pthread_self() << "]" << std::endl;
}
}
void *productor(void *args)
{
RingQueue<int> *rq = (RingQueue<int> *)args;
while(true)
{
// sleep(1);
int x = rand() % 100 + 1;
std::cout << "生产: " << x << " [" << pthread_self() << "]" << std::endl;
rq->push(x);
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid());
RingQueue<int> *rq = new RingQueue<int>();
// rq->debug();
pthread_t c[3],p[2];
pthread_create(c, nullptr, consumer, (void*)rq);
pthread_create(c+1, nullptr, consumer, (void*)rq);
pthread_create(c+2, nullptr, consumer, (void*)rq);
pthread_create(p, nullptr, productor, (void*)rq);
pthread_create(p+1, nullptr, productor, (void*)rq);
for(int i = 0; i < 3; i++) pthread_join(c[i], nullptr);
for(int i = 0; i < 2; i++) pthread_join(p[i], nullptr);
return 0;
}
输出结果: