👍作者主页:进击的1++
🤩 专栏链接:【1++的Linux】
文章目录
- 一,信号量
- 二,基于环形队列的生产消费者模型
- 三,线程池
一,信号量
1,什么是信号量?
任何时候都有一个执行流进入共享资源中,我们将这个共享资源称为临界资源。
若我们将这块共享资源当作整体使用就叫做互斥;那么我要是不想当整体使用,而是想要将这块空间继续细分,让不同的执行流访问不同的区域,那么我们不就可以实现并发了吗!但是我们怎么知道有用了多少资源,还剩多少资源。这就用到信号量,它可以保证我们只要进入这块共享资源,一定可以有一个对应的区域是你的,但是要保证具体哪一个资源是你的需要程序员编码进行控制。
我们以电影院为例:
电影院就是我们的整块共享资源,我们要是让一个人看完后再让另一个人进入去看,效率是不是太低了,因此我们就有了通过先卖票,我只要保证我把票卖给你,就一定有你的位置,买票的本质是不是就是:座位的预定机制。
那么信号量的本质:相当于一把计数器,在访问资源的时候,先申请信号量(sem–),使用完资源后,归还信号量(sem++)。
2,信号量的使用:
信号量的初始化
pshared:0 表示线程间共享,非零表示进程间共享,我们这里直接设成0即可
value :信号量初始值是多少,你想让信号量是多少,这里填几就可以
返回值:成功就是0,失败就是-1
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量(P操作) 功能:申请信号量,成功会将信号量的值减 1,继续向后执行,失败就挂起等待
int sem_wait(sem_t *sem); //P() 发布信号量(V操作)
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1 。 int sem_post(sem_t *sem);//V()
信号量的P V操作
多个执行流去访问信号量资源,那么我们就得保证信号量的操作得是原子的,保证信号量是原子的我们才能保证其所保护的资源不会出现错误。
二,基于环形队列的生产消费者模型
1,什么是环形队列
我们怎么来实现这样一个环形队列呢?
1,双指针链表
2,数组
显然,数组是更加简明一些,那么怎么用数组去实现一个环形呢?
物理结构《------》逻辑结构 我们将线性的物理结构通过模运算,从而可以模拟出环形结构
index++;
index%=(n+1)
解决了怎么实现环形队列的问题,下面我们再去想想怎么处理其判空判满
- 加入计数器
- 多申请一个小方格。
在放数据时先判断:当拿和放指向同一位置时,则队列为空;当放的位置+1==拿的位置,此时说明队列就放慢了。
2,基于环形队列的生产消费者模型原理
生产者,最关心什么资源呢?
环形队列中,空的位置。
消费者,最关心什么资源呢?
环形队列中,数据。
制定规则
规则1:生产者不能把消费者套一个圈(生产者把队列放满以后,不能在继续放一圈数据)。
规则2:消费者不能超过生产者 。
规则3:当指向同一个位置的时候,要根据空,满的状态,来判定让谁先执行(满了只能让消费者走,空了只能让生产走)。
其他:除此之外,消费和生产可以并发执行。
生产者和消费者所看重的资源是不同的,一个在意空位置,一个看重数据。
因此我们就可以定义两个信号量来进行计数,当资源满时,生产者必须要等消费者,资源为空时,消费者必须等生产者。所以量信号量必须要同步。
下面是该模型的简单实现:
#pragma once
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<semaphore.h>
#include<functional>
#include<ctime>
#include<vector>
typedef std::function<int(int,int)> func_t;
#define Q_size 5
template<class T>
class Ring_queue
{
public:
Ring_queue(int res_num=Q_size):dp_(res_num),data_step(0),space_step(0)
{
sem_init(&space_sem,0,Q_size);
sem_init(&data_sem,0,0);
}
~Ring_queue()
{
sem_destroy(&space_sem);
sem_destroy(&data_sem);
}
void Push(const T in)
{
sem_wait(&space_sem);//P操作申请信号量
dp_[space_step++]=in;
std::cout<<"将资源放入仓库中"<<std::endl;
space_step%=Q_size;
sem_post(&data_sem);//v操作,资源已经放好,等待来拿
}
void Pop(T* out)
{
sem_wait(&data_sem);//将这份资源划给相应的执行流,相当于将票卖出去
std::cout<<"拿到资源"<<std::endl;
*out=dp_[data_step++];
data_step%=Q_size;
sem_post(&space_sem);//拿完后将空间归还
}
private:
std::vector<T> dp_;
int num_;
sem_t space_sem;
sem_t data_sem;
int data_step;
int space_step;
};
#include"Ring_queue.hpp"
int Add(int x,int y)
{
return x+y;
}
class Task
{
public:
Task()//一定要写,vector在进行初始化时要默认构造,不写此类中就没有默认构造
{}
Task(int x,int y,func_t func):x_(x),y_(y),func_(func)
{
}
int operator ()()
{
return func_(x_,y_);
}
private:
int x_,y_;
func_t func_;
};
void* consumer(void* args)
{
Ring_queue<Task>* rq =(Ring_queue<Task>*)args;
while(true)
{
sleep(2);
Task ret;
rq->Pop(&ret);
std::cout<<"结果"<<ret()<<std::endl;
}
}
void* producer(void* args)
{
Ring_queue<Task>* rq =(Ring_queue<Task>*)args;
//制造任务
std::cout<<"开始"<<std::endl;
while(true)
{
int x=rand()%99;
int y=rand()%99;
Task t(x,y,Add);
rq->Push(t);
}
}
int main()
{
srand((unsigned)time(nullptr));
pthread_t tid[2];
Ring_queue<Task>* p_rq=new Ring_queue<Task>();
pthread_create(tid,nullptr,consumer,(void*)p_rq);
pthread_create(tid+1,nullptr,producer,(void*)p_rq);
pthread_join(tid[0],nullptr);
pthread_join(tid[1],nullptr);
return 0;
}
多生产多消费者
我们需要再维护生产者和生产者之间,消费者和消费者之间的互斥关系,现在我们不允许两个及两个以上的生产者同时生产或者消费者同时消费,因为所有的生产者共用所有的生产下标,消费者共享所有的消费下标,我们可以引入两把互斥锁维护生产者和消费者各种内部的互斥关系。
在任何时候,生产者可以进入一个,消费者可以进入一个,但是生产和消费却可以同时进行
那么这把锁应该加到哪里呢?
加到申请信号量的后面,我们可以保证,进入生产区域包括更新下标的时候,只有一个执行流进入,如果是多生产者,所有人有资格竞争锁的前提是你必须得先申请信号量,就相当于我们把所有的信号量可以预先分配给所有的生产者,然后当锁一旦释放了,其他人就立马申请锁,在你进入到生产区域的时候,别人就可以并行的先把信号量准备好。
对生产消费模型的再次理解
生产消费模型可以实现并发:对于基于环形队列的生产消费模型来说,其并发体现在三个点:
1,当队列不为空为满时,生产者和消费者可以进行并发;
2,生产者和生产者,消费者和消费者在申请信号量时可以实现并发。
3,获取数据和处理数据才是最耗时间的,生产消费模型,可以将生产和消费之间进行解耦,通过所谓的仓库,来进行数据的拿和放,这样,当生产者生产的特别快的时候,就可以用多个进程来拿数据,这样也可以实现并发。
三,线程池
#pragma once
#include <iostream>
#include "Thread.hpp"
#include <vector>
#include "Mutex.hpp"
#include <queue>
#include"Task.hpp"
template <class T>
class ThreadPool
{
public:
static ThreadPool<T> *GetThreadPool()
{
if (_tp == nullptr)
{
Guard lock(Mtx);
if (_tp == nullptr)
{
_tp = new ThreadPool;
}
}
return _tp;
}
private:
ThreadPool(int num = 5) : _num(num)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_cond, nullptr);
for (int i = 0; i < _num; i++)
{
_pool.push_back(new Thread(i, routine, this));
}
}
ThreadPool<T>(const ThreadPool<T> &th) = delete;
ThreadPool<T> operator=(ThreadPool th) = delete;
pthread_mutex_t &getmtx()
{
return _mtx;
}
pthread_cond_t &getcond()
{
return _cond;
}
bool TaskPool_Isempty()
{
return taskpool.empty();
}
T gettask()
{
T task = taskpool.front();
taskpool.pop();
return task;
}
public:
static void *routine(void *args) // 线程取任务
{
ThreadData *td = (ThreadData *)args;
ThreadPool *tp = (ThreadPool *)td->_args;
while (true)
{
T task;
{
Guard lockguard(tp->getmtx());
while (tp->TaskPool_Isempty())
pthread_cond_wait(&tp->getcond(), &tp->getmtx());
task = tp->gettask();
pthread_mutex_unlock(&Mtx);
}
logmessage(NORMAL, "处理任务中");
// std::cout << task << std::endl;
task();
// 处理任务
}
}
void PushTask(T task) // 放任务
{
Guard lock(_mtx);
taskpool.push(task);
pthread_cond_signal(&_cond);
logmessage(NORMAL, "放任务中");
}
void Run()
{
for (auto &it : _pool)
{
it->Start();
}
}
~ThreadPool()
{
for (auto &it : _pool)
{
it->join();
delete it;
}
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_cond);
}
private:
std::vector<Thread *> _pool;
pthread_cond_t _cond;
pthread_mutex_t _mtx;
static ThreadPool<T> *_tp;
std::queue<T> taskpool;
static pthread_mutex_t Mtx;
int _num;
};
template <class T>
pthread_mutex_t ThreadPool<T>::Mtx = PTHREAD_MUTEX_INITIALIZER;
template <class T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
#pragma once
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include"log.hpp"
#include<functional>
//using func_t=std::function<void*(void*)> ;
typedef void* (*_func_t)(void*);
class ThreadData
{
public:
void* _args;
std::string _name;
};
class Thread
{
public:
Thread(int num,_func_t func,void* args):_func(func)
{
_th_data._args=args;
char buffer[64];
snprintf(buffer,sizeof buffer,"%s-%d","thread",num);
_th_data._name=buffer;
logmessage(NORMAL,"thread init success");
}
void Start()
{
pthread_create(&_tid,nullptr,_func,(void*)&_th_data);
logmessage(NORMAL,"create thread success");
}
void join()
{
pthread_join(_tid,nullptr);
logmessage(NORMAL,"join thread success");
}
~Thread()
{}
private:
pthread_t _tid;
_func_t _func;
ThreadData _th_data;
};