一、什么是 POSIX 信号量
信号量本质就是一个统计资源数量的计数器。
1、PV 操作
pv操作就是一种让信号量变化的操作。其中 P 操作可以让信号量减 1(如果信号量大于 0),V 操作可以让信号量加 1.
2、信号量类型——sem_t
3、相关函数
3.1. 初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared: 0表示线程间共享,非零表示进程间共享
value:信号量初始值
3.2. 销毁信号量
int sem_destroy(sem_t *sem);
3.3. P 操作
int sem_wait(sem_t *sem);
3.4. V 操作
int sem_post(sem_t *sem);
二、环形生产者消费者模型
1、原理
对于这个模型有一个规则:
- 只能有一个消费者和一个生产者访问这个环形内存;不能多个消费者和多个生产者同时访问这个环形内存。
- 消费者不能超过生产者,也不能套生产者的圈。
- 生产者不能套消费者的圈。
2、代码
#pragma once
#include <vector>
#include <semaphore.h>
#include <pthread.h>
const int default_cap = 5;
template<class T>
class ring_queue
{
private:
void P(sem_t& sem) { sem_wait(&sem); }
void V(sem_t& sem) { sem_post(&sem); }
void Lock(pthread_mutex_t& mutex) { pthread_mutex_lock(&mutex); }
void UnLock(pthread_mutex_t& mutex) { pthread_mutex_unlock(&mutex); }
public:
ring_queue(int capacity = default_cap) : _capacity(capacity), _arr(std::vector<T>(capacity))
{
sem_init(&_sem_consumer, 0, 0);
sem_init(&_sem_producer, 0, _capacity);
pthread_mutex_init(&_mutex_consumer, nullptr);
pthread_mutex_init(&_mutex_producer, nullptr);
}
~ring_queue()
{
sem_destroy(&_sem_consumer), sem_destroy(&_sem_producer);
pthread_mutex_destroy(&_mutex_consumer), pthread_mutex_destroy(&_mutex_producer);
}
void push(const T& in)
{
P(_sem_producer); // pv 操作一步到位,不会被中断,因此可以保证结果正确
Lock(_mutex_producer); // _index_producer 下标互斥
_arr[_index_producer] = in;
_index_producer = (_index_producer + 1) % _capacity;
UnLock(_mutex_producer);
V(_sem_consumer);
}
T pop()
{
P(_sem_consumer); // pv 操作一步到位,不会被中断,因此可以保证结果正确
Lock(_mutex_consumer); // _index_consumer 下标互斥
T out = _arr[_index_consumer];
_index_consumer = (_index_consumer + 1) % _capacity;
UnLock(_mutex_consumer);
V(_sem_producer);
return out;
}
private:
std::vector<T> _arr;
int _capacity;
int _index_consumer, _index_producer;
sem_t _sem_consumer, _sem_producer;
pthread_mutex_t _mutex_consumer, _mutex_producer;
};
三、线程池
1、结构
因为中间的共享内存是被写任务的线程和拿任务的线程共享,因此我们可以把线程池看作是多生产者和多消费者的 CP 模型。
四、线程安全单例模式
1、什么是单例模式
只能建立一个对象的设计模式。
2、单例模式类型
2.1. 懒汉模式
获取实例的时候才开辟空间并初始化。
2.2. 饿汉模式
获取实例前就已经开辟空间并初始化好了。
3、代码(懒汉版)
#ifndef __THREAD_POOL_HPP__
#define __THREAD_POOL_HPP__
#include <vector>
#include <queue>
#include <pthread.h>
const int default_cap = 5;
class thread_info
{
pthread_t _tid;
std::string name;
};
template<class task>
class thread_pool
{
private:
void lock() { pthread_mutex_lock(&_mutex); }
void unlock() { pthread_mutex_unlock(&_mutex); }
~thread_pool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); }
thread_pool(int capacity = default_cap) : _capacity(capacity), _tasks(std::vector<task>(capacity))
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
thread_pool(const thread_pool<task>& pool) = delete;
thread_pool<task>& operator=(const thread_pool<task>& pool) = delete;
public:
static thread_pool<task>* get_instance()
{
if (pt == nullptr) // 第一次申请单例时才要考虑多线程互斥问题,因为后面的 _pt 都不为空了,因此不用判断 _pt 是否为空
{
pthread_mutex_lock(&mutex);
if (pt != nullptr) return pt;
pthread_mutex_unlock(&mutex);
thread_pool<task>* ret = new thread_pool<task>;
}
return ret;
}
// pthread_create 要求的函数是 void* start_routine(void* args),而如果不加 static,则为 void* start_routine(thread_pool<task>* this, void* args)
static void* start_routine(void* args)
{
thread_pool<task>* obj = static_cast<thread_pool<task>*>(args);
obj->lock();
while ((obj->_tasks).empty()) pthread_cond_wait(&_cond, &_mutex);
task t = obj->pop();
obj->unlock();
// run t
return t;
}
void start()
{
for (int i = 0; i < _capacity; i++)
_threads[i].name = "thread - " + std::to_string(i),
pthread_create(&(_threads[i]._tid), nullptr, start_routine, this);
}
void push(const task& in)
{
lock();
_tasks.push(in);
pthread_cond_signal(&_cond);
unlock();
}
task pop() // 可保证在调 pop 方法时只有一个线程在调
{
task out = _tasks.front();
_tasks.pop();
return out;
}
private:
std::vector<thread_info> _threads;
std::queue<task> _tasks;
int _capacity;
pthread_mutex_t _mutex; // 抢任务执行
pthread_cond_t _cond; // 消费者的阻塞队列
static thread_pool<task>* pt;
static pthread_mutex_t mutex;
};
template<class task>
thread_pool<task>* thread_pool<task>::pt = nullptr;
template<class task>
pthread_mutex_t thread_pool<task>::mutex = PTHREAD_MUTEX_INITIALIZER;
#endif