文章目录
- 一、POSIX 信号量
- 二、POSIX 信号量的接口
- 2.1 sem_init——初始化信号量
- 2.2 sem_destroy——销毁信号量
- 2.3 sem_wait——等待信号量
- 2.4 sem_post——发布信号量
- 三、基于环形队列的生产消费者模型
- 3.1 单生产单消费模型
- 3.2 多生产多消费模型
- 3.3 基于任务的多生产多消费模型
- 四、结语
一、POSIX 信号量
共享资源也可以被看成多份,只要规定好每个线程的访问区域即可,此时就可以让多线程去并发的访问临界资源。
POSIX
信号量和 SystemV
信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。但 POSIX
可以用于线程间同步。信号量本质是一把计数器,用来描述可用资源数目的,申请信号量时,其实就已经在间接的做判断,看资源是否就绪了,只要申请到信号量,那么说明资源一定是就绪的。
信号量只能保证,不让多余的线程来访问共享资源,即,当前共享资源有十份,信号量不会允许同时有十一个线程来访问临界资源。但是具体的资源分配是通过程序员编码去实现的。如果出现一个共享资源同时被两个线程访问,就属于程序员的编码 Bug。
二、POSIX 信号量的接口
2.1 sem_init——初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
-
sem
:要初始化的信号量 -
pshared
:0表示线程间共享,非0表示进程间共享。 -
value
:信号量初始值
2.2 sem_destroy——销毁信号量
int sem_destroy(sem_t *sem);
2.3 sem_wait——等待信号量
int sem_wait(sem_t *sem); //P()
- 功能:会将信号量的值减1
2.4 sem_post——发布信号量
int sem_post(sem_t *sem);//V()
- 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量的值加1
三、基于环形队列的生产消费者模型
只要生产和消费不访问同一个格子,那么生产和消费就可以同时进行。那生产和消费什么时候会指向同一个数据呢?答案是队列为空和为满的时候。
基于环形队列的生产消费者模型必须遵守以下三个原则:
-
当生产和消费指向同一个资源的时候,只能一个人访问。为空的时候,由生产者去访问;为满的时候,由消费者去访问
-
消费者不能超过生产者
-
生产者不能把消费者套圈,因为这样会导致数据被覆盖
生产者最关心还剩多少空间(空间数量);消费者最关系还剩多少数据(数据数量)。因为有两种资源,所以需要定义两个信号量。
3.1 单生产单消费模型
// RingQueue.hpp
#pragma once
#include <pthread.h>
#include <vector>
#include <semaphore.h>
template<class T>
class RingQueue
{
private:
static const int defaultcap = 5;
void P(sem_t *sem) // 申请一个信号量
{
sem_wait(sem);
}
void V(sem_t *sem) // 归还一个信号量
{
sem_post(sem);
}
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap), cap_(cap), c_step(0), p_step(0)
{
sem_init(&cdata_sem, 0, 0);
sem_init(&pspace_sem, 0, cap_);
}
void Push(const T &data) // 生产行为
{
P(&pspace_sem);
ringqueue_[p_step] = data;
V(&cdata_sem);
p_step++;
p_step %= cap_;
}
void Pop(T *out) // 消费行为
{
P(&cdata_sem);
*out = ringqueue_[c_step];
V(&pspace_sem);
c_step++;
c_step %= cap_;
}
~RingQueue()
{
sem_destroy(&cdata_sem);
sem_destroy(&pspace_sem);
}
private:
std::vector<T> ringqueue_; // 环形队列
int cap_; // 容量
int c_step; // 消费者下一个要消费的位置
int p_step; // 生产者下一个要生产的位置
sem_t cdata_sem; // 数据资源
sem_t pspace_sem; // 空间资源
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
using namespace std;
void *Consumer(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);
while(true)
{
int data = 0;
rq->Pop(&data);
cout << "Consumer is running... get a data: " << data << endl;
// 模拟处理数据
usleep(1000000);
}
}
void *Productor(void *args)
{
RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);
while(true)
{
// 获取数据
usleep(10000); // 模拟获取数据
int data = rand() % 10;
rq->Push(data);
cout << "Productor is running... produce a data: " << data << endl;
}
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t c, p;
RingQueue<int> *rq = new RingQueue<int>();
pthread_create(&c, nullptr, Consumer, rq);
pthread_create(&p, nullptr, Productor, rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
互斥与同步的体现:当生产下标和消费下标相同的时候,只允许一个来访问,这就是互斥性的体现。当队列为空的时候,让生产者去访问资源,当队列为满的时候,让消费者去访问资源,这就是在指向同一个位置时,让生产和消费具有一定的顺序性,这就是同步性的体现。当队列不为空或不为满的时候,生产下标和消费下标不同,此时两个线程并发执行,并没有体现出很强的互斥特性。
3.2 多生产多消费模型
此时需要对下标资源进行保护。因为生产下标和消费下标各自只有一份,不允许同时有多个生产线程去访问生产下标,消费线程也一样。因此需要通过加锁来实现生产线程之间的互斥和消费线程之间的互斥。
先加锁还是先申请信号量?答案是先申请信号量,以生产线程为例,这样可以让所有生产线程并发的去执行,什么意思呢?如果是先加锁再申请信号量的话,因为始终只有一个生产者线程能够申请到锁,所以也就只有一个生产者线程能去申请信号量,其他生产者线程只能干巴巴的等待锁被释放。这时申请锁和申请信号量的动作是串行的。而先申请信号量的话,可以保证虽然只有一个线程能够申请到锁,但是其他没有锁的线程也可以不用闲着,可以先去申请信号量,因为信号量的申请是原子的,因此也不需要加锁进行保护,只要能申请到信号量,就说明资源还有,此时那些申请到信号量的线程就可能等待锁被释放,拿到锁之后就可以去执行相应的代码了。
// RingQueue.hpp
#pragma once
#include <pthread.h>
#include <vector>
#include <semaphore.h>
#include <string>
template<class T>
class RingQueue
{
private:
static const int defaultcap = 5;
void P(sem_t *sem) // 申请一个信号量
{
sem_wait(sem);
}
void V(sem_t *sem) // 归还一个信号量
{
sem_post(sem);
}
void Lock(pthread_mutex_t *mutex)
{
pthread_mutex_lock(mutex);
}
void Unlock(pthread_mutex_t *mutex)
{
pthread_mutex_unlock(mutex);
}
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap), cap_(cap), c_step(0), p_step(0)
{
sem_init(&cdata_sem, 0, 0);
sem_init(&pspace_sem, 0, cap_);
pthread_mutex_init(&c_mutex, nullptr);
pthread_mutex_init(&p_mutex, nullptr);
}
void Push(const T &data) // 生产行为
{
P(&pspace_sem);
Lock(&p_mutex);
ringqueue_[p_step] = data;
p_step++;
p_step %= cap_;
Unlock(&p_mutex);
V(&cdata_sem);
}
void Pop(T *out) // 消费行为
{
P(&cdata_sem); // 信号量资源是不需要保护的,因为它的操作是原子的,临界区中的代码要尽可能的少,所以不需要把信号量的申请放在加锁之后
Lock(&c_mutex);
*out = ringqueue_[c_step];
c_step++;
c_step %= cap_;
Unlock(&c_mutex);
V(&pspace_sem);
}
~RingQueue()
{
sem_destroy(&cdata_sem);
sem_destroy(&pspace_sem);
pthread_mutex_destroy(&c_mutex);
pthread_mutex_destroy(&p_mutex);
}
private:
std::vector<T> ringqueue_; // 环形队列
int cap_; // 容量
int c_step; // 消费者下一个要消费的位置
int p_step; // 生产者下一个要生产的位置
sem_t cdata_sem; // 数据资源
sem_t pspace_sem; // 空间资源
pthread_mutex_t c_mutex; // 对消费下标的保护
pthread_mutex_t p_mutex; // 对生产下标的保护
};
template <class T>
class Message
{
public:
Message(std::string thread_name, RingQueue<T> *ringqueue)
:thread_name_(thread_name), ringqueue_(ringqueue)
{}
std::string &get_thread_name()
{
return thread_name_;
}
RingQueue<T> *get_ringqueue()
{
return ringqueue_;
}
private:
std::string thread_name_;
RingQueue<T> *ringqueue_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>
using namespace std;
void *Consumer(void *args)
{
Message<int> *message = static_cast<Message<int> *>(args);
RingQueue<int> *rq = message->get_ringqueue();
string name = message->get_thread_name();
while (true)
{
int data = 0;
rq->Pop(&data);
printf("%s is running... get a data: %d\n", name.c_str(), data);
// 模拟处理数据
// usleep(1000000);
}
}
void *Productor(void *args)
{
Message<int> *message = static_cast<Message<int> *>(args);
RingQueue<int> *rq = message->get_ringqueue();
string name = message->get_thread_name();
while (true)
{
// 获取数据
// usleep(1000000); // 模拟获取数据
int data = rand() % 10;
rq->Push(data);
printf("%s is running... produce a data: %d\n", name.c_str(), data);
usleep(1000000);
}
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t c[3], p[5];
RingQueue<int> *rq = new RingQueue<int>();
vector<Message<int>*> messages;
for (int i = 0; i < 5; i++)
{
Message<int> *message = new Message<int>("Produttor Thread "+to_string(i), rq);
pthread_create(p + i, nullptr, Productor, message);
messages.push_back(message);
}
for (int i = 0; i < 3; i++)
{
Message<int> *message = new Message<int>("Consumer Thread "+to_string(i), rq);
pthread_create(c + i, nullptr, Consumer, message);
messages.push_back(message);
}
for (int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 5; i++)
{
pthread_join(p[i], nullptr);
}
for (auto message : messages)
{
delete message;
}
delete rq;
return 0;
}
3.3 基于任务的多生产多消费模型
RingQueue
的内容不变
// Task.h
#include <iostream>
#include <string>
enum
{
DIVERROR = 1,
MODERROR,
UNKNOWERRROR
};
class Task
{
public:
Task(int a = 0, int b = 0, char op = '+')
:data1_(a), data2_(b), op_(op), result_(0), exitcode_(0)
{}
void run()
{
switch(op_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
if(data2_ == 0) exitcode_ = DIVERROR;
else result_ = data1_ / data2_;
break;
case '%':
if(data2_ == 0) exitcode_ = MODERROR;
else result_ = data1_ % data2_;
break;
default:
exitcode_ = UNKNOWERRROR;
break;
}
}
std::string result_to_string()
{
std::string ret = std::to_string(data1_);
ret += ' ';
ret += op_;
ret += ' ';
ret += std::to_string(data2_);
ret += ' ';
ret += '=';
ret += ' ';
ret += std::to_string(result_);
ret += "[exitcode: ";
ret += std::to_string(exitcode_);
ret += ']';
return ret;
}
std::string get_task()
{
std::string ret = std::to_string(data1_);
ret += ' ';
ret += op_;
ret += ' ';
ret += std::to_string(data2_);
ret += ' ';
ret += '=';
ret += ' ';
ret += '?';
return ret;
}
private:
int data1_;
int data2_;
char op_;
int result_;
int exitcode_;
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <vector>
#include "Task.h"
using namespace std;
const std::string opers = "+-*/%";
void *Consumer(void *args)
{
Message<Task> *message = static_cast<Message<Task> *>(args);
RingQueue<Task> *rq = message->get_ringqueue();
string name = message->get_thread_name();
while (true)
{
// 获取任务
// int data = 0;
Task task;
rq->Pop(&task);
// 对任务做处理
task.run();
printf("%s is running... get a data: %s\n", name.c_str(), task.result_to_string().c_str());
// 模拟处理数据
// usleep(1000000);
}
}
void *Productor(void *args)
{
Message<Task> *message = static_cast<Message<Task> *>(args);
RingQueue<Task> *rq = message->get_ringqueue();
string name = message->get_thread_name();
int len = opers.size();
while (true)
{
// 获取数据
// usleep(1000000); // 模拟获取数据
// int data = rand() % 10;
// 模拟获取数据
int data1 = rand() % 10 + 1; // [1, 10]
usleep(10);
int data2 = rand() % 13; // [0, 13]
usleep(10);
char op = opers[rand() % len];
Task task(data1, data2, op);
// 生产数据
rq->Push(task);
// printf("%s is running... produce a data: %d\n", name.c_str(), data);
printf("%s is running... produce a Task: %s\n", name.c_str(), task.get_task().c_str());
usleep(1000000);
}
}
int main()
{
srand((unsigned int)time(nullptr));
pthread_t c[3], p[2];
RingQueue<Task> *rq = new RingQueue<Task>();
vector<Message<Task>*> messages;
for (int i = 0; i < 5; i++)
{
Message<Task> *message = new Message<Task>("Produttor Thread "+to_string(i), rq);
pthread_create(p + i, nullptr, Productor, message);
messages.push_back(message);
}
for (int i = 0; i < 3; i++)
{
Message<Task> *message = new Message<Task>("Consumer Thread "+to_string(i), rq);
pthread_create(c + i, nullptr, Consumer, message);
messages.push_back(message);
}
// 等待子线程
for (int i = 0; i < 3; i++)
{
pthread_join(c[i], nullptr);
}
for (int i = 0; i < 5; i++)
{
pthread_join(p[i], nullptr);
}
// 释放资源
for (auto message : messages)
{
delete message;
}
delete rq;
return 0;
}
四、结语
今天的分享到这里就结束啦!如果觉得文章还不错的话,可以三连支持一下,春人的主页还有很多有趣的文章,欢迎小伙伴们前去点评,您的支持就是春人前进的动力!