POSIX信号量
POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX可以用于线程同步
31节说了信号量,信号量的本质是一个计数器。将共享资源从一个整体划分为很多不部分,就和电影院的座位一样,信号量就是资源总共的数量。信号量的获取就是一种对资源的预定机制,申请信号量成功,资源数量–,当数量为0时,申请失败。申请和释放就是PV操作。在线程里,存储数据的部分就是共享资源,用信号量来管理它,描述可以存多少数据,把资源是否就绪放在了临界区之外,申请信号量时,就已经间接的判断了,在PV操作之间访问资源,如果获得了信号量,就表明一定有资源可以适用,不需要判断资源是否还有
相关函数
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem);
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);
环形队列的生产消费模型
环形队列采用数组模拟,模运算模拟特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过计数器或者标记位来判断满或者空,另外也可以预留一个空的位置,作为满的状态
但是我们现在有信号量这个计数器,就很简单的实现多线程同步过程
环形队列本质是一个数组,用圆的方式模拟头尾相接。C和P分别是生产者和消费者的下标,刚开始两者都在同一位置,当环形队列为空的时候,一定是P先生产,放入一个数据后往后挪动一下,然后消费者获取数据,获得数据后也往后挪动一下,这时P和C又处于同一位置。
P关注的是还剩余多少空间,初始值为最大存储N,每生产一个这个值就会–,定义一个空间信号量,SpaceSem:N,
C关注的是还有多少剩余数据,初始值为0,取走一个就–,定义一个数据信号量,DataSem:0
根据上面,生产者生产数据,增加DataSem,减少SpaceSem,消费者消费,增加SpaceSem,减少DataSem
P和C什么时候会出现在同一位置,一定是空或者满,不空或不满的时候,它们两个可以同时访问队列,因为它们只需要维护好属于自己的信号量和下标,就不会产生干扰
满足条件
上产消费整个过程就像一个追逐游戏,生产者先走,消费者在后面追,环形队列的生成消费模型必须满足这三个条件
1.指向同一个位置时,只能有一个人访问。
空:生产者访问,放入数据
满:消费者访问,取出数据
2.消费者不能超过生产者生产的速度,生产者生产5个,不能消费5个以上
3.生产者不能套两个圈,生产者在生产满所有位置的时候,不能继续移动了
实现
类的设计
首先需要一个数组存放数据,生产和消费两个下标,剩余空间和当前数据两个信号量,多生产多消费同一时间只能有一个访问存储空间,所以需要两把锁
push的时候先申请空间信号量,申请到了放入数据移动下标,下标是多生产者的共享资源,所以这些访问需要上锁,信号量的操作是原子的,不需要上锁。消费也同样
#pragma once
#include <pthread.h>
#include <vector>
#include <semaphore.h>
static const int defaultcap = 5;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = defaultcap)
:_que(defaultcap)
{
_pstep = 0;
_cstep = 0;
_maxcap = cap;
sem_init(&_sem_sapace, 0, cap);
sem_init(&_sem_data, 0, 0);
pthread_mutex_init(&_pmutex, nullptr);
pthread_mutex_init(&_cmutex, nullptr);
}
void push(const T& x)
{
//P
sem_wait(&_sem_sapace);
//信号量不需要上锁
pthread_mutex_lock(&_pmutex);
_que[_pstep] = x;
//位置后移,维持环形
_pstep++;
_pstep %= _maxcap;
pthread_mutex_unlock(&_pmutex);
//V操作
sem_post(&_sem_data);
}
//消费
void pop(T* out)
{
//P
sem_wait(&_sem_data);
pthread_mutex_lock(&_cmutex);
*out = _que[_cstep];
//维持环形
_cstep++;
_cstep %= _maxcap;
pthread_mutex_unlock(&_cmutex);
// V操作
sem_post(&_sem_sapace);
}
~RingQueue()
{
sem_destroy(&_sem_sapace);
sem_destroy(&_sem_data);
pthread_mutex_destroy(&_pmutex);
pthread_mutex_destroy(&_cmutex);
}
private:
std::vector<T> _que;
int _maxcap;
int _pstep; //生产者下标
int _cstep; //消费者下标
sem_t _sem_sapace;
sem_t _sem_data;
pthread_mutex_t _pmutex;
pthread_mutex_t _cmutex;
};
线程部分和上一节基本一样
#include <unistd.h>
#include <cstdlib>
#include <iostream>
#include <ctime>
#include "blockqueue.hpp"
#include "task.hpp"
#include "RingQuene.hpp"
#include "Ring.hpp"
void *produce(void *bk)
{
RingQueue<task>* block = static_cast<RingQueue<task>*>(bk);
while (true)
{
int x1 = rand() % 10;
usleep(10);
int x2 = rand() % 10 + 1;
char op = g_op[rand() % 4];
task t(x1, x2, op);
//生产
printf("%p生产任务:%s\n", pthread_self(), t.gettask().c_str());
block->push(t);
sleep(2);
}
}
void* consume(void* bk)
{
RingQueue<task>* block = static_cast<RingQueue<task>*>(bk);
while (true)
{
//消费
task n;
block->pop(&n);
n.run();
printf("%p完成任务:%s\n", pthread_self(), n.getresult().c_str());
//sleep(1);
}
}
int main()
{
srand(time(NULL));
RingQueue<task>* block = new RingQueue<task>();
pthread_t ptid[3], ctid[5];
for (int i = 0; i < 3; i++)
{
pthread_create(&ptid[i], nullptr, produce, block);
}
for (int i = 0; i < 5; i++)
{
pthread_create(&ctid[i], nullptr, consume, block);
}
for (int i = 0; i < 3; i++)
{
pthread_join(ptid[i], nullptr);
}
for (int i = 0; i < 5; i++)
{
pthread_join(ctid[i], nullptr);
}
delete block;
return 0;
}