Linux POSIX信号量 线程池
- 一. 什么是POSIX信号量?
- 二. POSIX信号量实现原理
- 三. POSIX信号量接口函数
- 四. 基于环形队列的生产消费模型
- 五. 线程池
一. 什么是POSIX信号量?
POSIX信号量是一种用于同步和互斥操作的机制,属于POSIX(Portable Operating System Interface) 标准的一部分。这一标准定义了操作系统应该为应用程序提供的接口,而POSIX信号量是在多线程和多进程环境下实现同步的一种方式。
信号量本质上是一个计数器,用于描述临界资源的数量。在多线程或多进程的情况下,当多个执行单元(线程或进程)需要访问共享资源时,使用信号量可以有效地协调它们的行为,避免竞争条件和提高程序的可靠性。
二. POSIX信号量实现原理
POSIX信号量的实现原理基于一个计数器和一个等待队列。关键的操作包括P操作和V操作:
P操作:申请信号量,如果信号量计数器大于零,表示资源可用,计数器减一;如果计数器为零,线程将被阻塞,并加入等待队列。
V操作:释放信号量,计数器加一,并唤醒等待队列中的一个线程。
具体实现原理如下:
- 信号量结构包括计数器和等待队列。
- 当计数器为零时,表示资源不可用,线程申请信号量时将被阻塞,并放入等待队列。
- 当计数器大于零时,表示资源可用,线程申请信号量时计数器减一,线程获得资源。
- 当释放信号量时,计数器加一,如果等待队列不为空,唤醒等待队列中的一个线程。
三. POSIX信号量接口函数
-
sem_t 是一个数据类型,用于表示信号量。作为同步机制的一部分,信号量用于协调共享资源的访问。用户通过提供的接口函数(如 sem_init、sem_wait、sem_post)来操作信号量,
-
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem: 指向要初始化的信号量的指针。
pshared: 0表示信号量在线程间共享,非零表示在进程间共享。
value: 信号量的初始值。
返回值:成功时返回0,失败时返回-1。
- 等待信号量
#include <semaphore.h>
int sem_wait(sem_t *sem); // P()
sem: 指向要等待的信号量的指针。
功能:等待信号量,将信号量的值减1。如果信号量的值为0,线程将被阻塞。
返回值:成功时返回0,失败时返回-1。
- 释放信号量
#include <semaphore.h>
int sem_post(sem_t *sem); // V()
sem: 指向要发布的信号量的指针。
功能:释放信号量,表示资源使用完毕,将信号量的值加1。通常用于释放信号。
返回值:成功时返回0,失败时返回-1。
- 销毁信号量
#include <semaphore.h>
int sem_destroy(sem_t *sem);
sem: 要销毁的信号量的指针。
返回值:成功时返回0,失败时返回-1。
四. 基于环形队列的生产消费模型
- 环形队列简介
环形队列是一种基于数组或链表的数据结构,具有循环特性。其关键特点包括循环性、高效性和固定大小。常用于缓冲区、循环缓存和生产者-消费者模型等场景。由于采用模运算,插入和删除操作的时间复杂度为O(1),使得其在实时系统和有限资源的应用中得以广泛应用。
放数据操作:
等待生产者信号量: 通过 P(prodSemaphore),生产者等待信号量,确保有足够的空间可供数据生产。
将数据放入缓冲区: 数据被放入环形缓冲区的当前生产者索引位置 (buffer_[prodIndex])。 生产者索引 prodIndex 被更新,并通过取模操作确保索引在缓冲区容量内循环。
发送消费者信号量: 通过 V(consSemaphore),生产者通知消费者有新的数据可供消费。
拿数据操作:
等待消费者信号量: 通过 P(consSemaphore),消费者等待信号量,确保有足够的数据可供消费。
从缓冲区取出数据: 数据被从环形缓冲区的当前消费者索引位置取出 (buffer_[consIndex])。 消费者索引 consIndex
被更新,并通过取模操作确保索引在缓冲区容量内循环。
发送生产者信号量: 通过 V(prodSemaphore),消费者通知生产者有空间可供数据生产。
五. 线程池
什么是线程池?
线程池是一种常见的多线程使用模式。它通过维护一组线程,等待监督管理者分配可并发执行的任务。这种设计避免了在处理短时间任务时创建与销毁线程的开销,提高了系统性能。线程池通过保证内核的充分利用,同时防止过度调度,对于某些应用场景尤其有效。
实例:创建一个简单的固定数量线程池
#include <iostream>
#include <queue>
#include <vector>
#include <pthread.h>
#include <unistd.h>
using namespace std;
// 线程数据结构
struct threaddata
{
pthread_t tid; // 线程ID
};
// 任务类
class Task
{
public:
Task(int data) : data_(data)
{}
// 重载运算符,用于执行任务
void operator()()
{
run();
}
private:
// 任务执行函数
void run()
{
cout << "数据:" << data_ << endl;
}
int data_;
};
// 线程池模板类
template<class T>
class ThreadPool
{
public:
// 构造函数,默认线程数为6
ThreadPool(int n = 6) : td(6)
{
pthread_mutex_init(&mutex_, nullptr); // 初始化互斥锁
pthread_cond_init(&cond_, nullptr); // 初始化条件变量
}
// 析构函数
~ThreadPool()
{
pthread_mutex_destroy(&mutex_); // 销毁互斥锁
pthread_cond_destroy(&cond_); // 销毁条件变量
}
// 线程处理函数
static void* handler(void* args)
{
ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
while (1)
{
tp->lock(); // 加锁,保护临界区
while (tp->isQueueEmpty())
{
tp->wait(); // 当任务队列为空时,等待条件变量
}
T data = tp->queueFront(); // 获取任务队列的队首元素
tp->queuePop(); // 弹出任务队列的队首元素
tp->unlock(); // 解锁,释放临界区
data(); // 执行任务
}
}
// 启动线程池
void start()
{
for (int i = 0; i < td.size(); i++)
{
pthread_create(&(td[i].tid), nullptr, handler, static_cast<void*>(this)); // 创建线程
}
}
// 将任务放入任务队列
void push(T& data)
{
lock(); // 加锁,确保线程安全
task.push(data); // 将任务加入队列
wakeup(); // 唤醒等待的线程
unlock(); // 解锁,释放锁,允许其他线程访问任务队列
}
// 判断任务队列是否为空
bool isQueueEmpty()
{
return task.empty();
}
// 获取任务队列的队首元素
T& queueFront()
{
return task.front();
}
// 弹出任务队列的队首元素
void queuePop()
{
task.pop();
}
public:
// 加锁操作
void lock()
{
pthread_mutex_lock(&mutex_);
}
// 解锁操作
void unlock()
{
pthread_mutex_unlock(&mutex_);
}
// 等待条件变量
void wait()
{
pthread_cond_wait(&cond_, &mutex_);
}
// 唤醒等待条件变量的线程
void wakeup()
{
pthread_cond_signal(&cond_);
}
private:
vector<threaddata> td; // 线程数据
queue<T> task; // 任务队列
pthread_mutex_t mutex_; // 互斥锁
pthread_cond_t cond_; // 条件变量
};
int main()
{
srand(time(nullptr));
ThreadPool<Task> thread(6); // 创建线程池,设置线程数为6
thread.start(); // 启动线程池
while (1)
{
Task d(rand() % 100);
thread.push(d); // 将任务放入线程池
sleep(1);
}
return 0;
}
线程池模板类 ThreadPool: 创建了一个线程池类,模板参数为任务类型 T,默认线程数为6。 使用 pthread 库提供的互斥锁和条件变量来实现线程同步。 提供了启动线程池的 start 函数,创建指定数量的线程,并在这些线程中执行 handler
函数。 提供了将任务推送到任务队列的 push 函数,该函数会将任务加入队列,唤醒等待中的线程。
任务类 Task: 任务类用于封装线程池中执行的具体任务,其中包含一个整数类型的数据。 通过重载 () 运算符实现了任务的执行函数,输出任务的数据。
线程处理函数 handler: 作为线程的入口函数,不断从任务队列中取出任务并执行。 使用互斥锁保护任务队列,条件变量用于在任务队列为空时等待新任务。 通过调用线程池的成员函数来实现任务的执行、入队、出队等操作。
主函数 main: 在主函数中,创建了一个 ThreadPool 对象,设置线程数为6,并启动线程池。 进入无限循环,每次循环生成一个随机数,创建一个包含该随机数的 Task 对象,并通过线程池的 push 函数将任务推送到任务队列中。
程序不断创建新的任务,由线程池中的线程执行。