POSIX信号量
POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。但POSIX可以用于线程间同步
1.快速认识信号量接口
POSIX信号量分为两种类型:
-
命名信号量(Named Semaphores):命名信号量可以在多个进程之间共享,因为它们通过一个名字来标识,这个名字在文件系统中可见。
-
未命名信号量(Unnamed Semaphores):未命名信号量通常用于同一进程内的线程同步,因为它们没有名字,只能通过传递它们在内存中的地址来进行共享。
以下是一些常用的POSIX信号量接口函数:
初始化信号量
-
sem_t sem;
:声明一个信号量变量。 -
int sem_init(sem_t *sem, int pshared, unsigned int value);
:初始化一个未命名的信号量。pshared
参数如果为0,则信号量在进程内线程间共享;如果非0,则信号量在进程间共享。value
参数是信号量的初始值。
销毁信号量
-
int sem_destroy(sem_t *sem);
:销毁一个之前由sem_init
初始化的未命名信号量。
操作信号量
-
int sem_wait(sem_t *sem);
:等待信号量变为正值,然后将其减一。如果信号量的值为0,则调用线程会被阻塞,直到信号量变为正值。 -
int sem_trywait(sem_t *sem);
:尝试等待信号量,如果信号量的值为0,则立即返回EAGAIN
错误,不会阻塞。 -
int sem_post(sem_t *sem);
:增加信号量的值。如果其他线程正在等待该信号量,它们中的一个可能会被唤醒。 -
int sem_getvalue(sem_t *sem, int *sval);
:获取信号量的当前值。
命名信号量的操作
-
sem_t *sem_open(const char *name, int oflag, ...);
:打开一个命名信号量。name
是信号量的名字,oflag
是打开标志,可以是O_CREAT
等。 -
int sem_close(sem_t *sem);
:关闭之前打开的命名信号量。 -
int sem_unlink(const char *name);
:删除命名信号量的名字,允许它在不再被任何进程引用时释放资源。
2.基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标识位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
问题:多线程如何在环形队列中进行生产和消费 ?
1.如果队列为空,生产者先生产
2.如果队列满了,消费者来消费
3.如果队列不为空,也不为满,让生产和消费同时进行
注意:
a.不让生产者把消费者套一个圈
b.不能让消费者,超过生产者
用信号量就能做到如果队列为空,生产者先生产;如果队列满了,消费者来消费
消费者需要管理数据资源,生产者需要管理空间资源
所以数据资源+空间资源 = N
初始化的时候 空间资源space_sem = 0;数据资源 data_sem = N
在生产者这里:需要P(space_sem)(--空间资源) V(data_sem)(++数据资源)
在消费者这里:需要P(space_sem) (--数据资源) V(space_sem)(++空间资源)
Task.hpp
#pragma once
#include<iostream>
#include<functional>
// typedef std::function<void()> task_t;
// using task_t = std::function<void()>;
// void Download()
// {
// std::cout << "我是一个下载的任务" << std::endl;
// }
// 要做加法
class Task
{
public:
Task()
{
}
Task(int x, int y) : _x(x), _y(y)
{
}
void Excute()
{
_result = _x + _y;
}
void operator ()()
{
Excute();
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <pthread.h>
#include <semaphore.h>
template <class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
public:
RingQueue(int max_cap)
: _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, max_cap);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void Push(const T &in) // 生产者
{
// 信号量:是一个计数器,是资源的预订机制。
// 预订:在外部,可以不判断资源是否满足,就可以知道内部资源的情况!
P(_space_sem); // 信号量这里,对资源进行使用,申请,为什么不判断一下条件是否满足???信号量本身就是判断条件!
pthread_mutex_lock(&_p_mutex);
_ringqueue[_p_step++] = in;
_p_step %= _max_cap;
pthread_mutex_unlock(&_p_mutex);
V(_data_sem);
}
void Pop(T *out) // 消费
{
P(_data_sem);
pthread_mutex_lock(&_c_mutex);
*out = _ringqueue[_c_step++];
_c_step %= _max_cap;
pthread_mutex_unlock(&_c_mutex);
V(_space_sem);
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
std::vector<T> _ringqueue;
int _max_cap;
int _c_step;
int _p_step;
sem_t _data_sem; // 消费者关心
sem_t _space_sem; // 生产者关心
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
void *Consumer(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
Task t;
// 1. 消费
rq->Pop(&t);
// 2. 处理数据
t();
std::cout << "Consumer-> " << t.result() << std::endl;
}
}
void *Productor(void *args)
{
RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
while (true)
{
sleep(1);
// 1. 构造数据
int x = rand() % 10 + 1; //[1, 10]
usleep(x * 1000);
int y = rand() % 10 + 1;
Task t(x, y);
// 2. 生产
rq->Push(t);
std::cout << "Productor -> " << t.debug() << std::endl;
}
}
int main()
{
RingQueue<Task> *rq = new RingQueue<Task>(5);
// 单单
pthread_t c1, c2, p1, p2, p3;
pthread_create(&c1, nullptr, Consumer, rq);
pthread_create(&c2, nullptr, Consumer, rq);
pthread_create(&p1, nullptr, Productor, rq);
pthread_create(&p2, nullptr, Productor, rq);
pthread_create(&p3, nullptr, Productor, rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}