<Linux> 线程池

目录

前言:

一、线程池概念

(一)池化技术

(二)优点

(三)应用场景

二、线程池的实现

(一)线程池_V1(朴素版)

(二)线程池_V2(封装版)

(三)线程池_V3(优化版)

三、单例模式

(一)什么是单例模式

(二)单例模式特点

(三)单例模式的简单实现

1. 饿汉模式

2. 懒汉模式

3. 懒汉模式(线程安全版)

(四)线程池_V4(最终版)

四、线程周边问题

(一)STL线程安全问题

(二)智能指针线程安全问题

(三)其他常见锁概念

五、读者写者模型


前言:

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

一、线程池概念

(一)池化技术

所谓的 线程池 就是 提前创建一批线程,当任务来临时,线程直接从任务队列中获取任务执行,可以提高整体效率;同时一批线程会被合理维护,从而避免频繁地创建和销毁对象所带来的开销。

像这种把未来会高频使用到,并且创建较为麻烦的资源提前申请好的技术称为 池化技术池化技术 可以极大地提高性能,最典型的就是 线程池,常用于各种涉及网络连接相关的服务中,比如 MySQL 连接池、HTTP 连接池、Redis 连接池 等。

除了线程池外还有内存池,比如 STL 中的容器在进行空间申请时,都是直接从 空间配置器 allocator 中获取的,并非直接使用系统调用来申请空间。

池化技术 的本质:空间换时间

池化技术 就好比你把钱从银行提前取出一部分放在支付宝中,可以随时使用,十分方便和高效,总不至于需要用钱时还得跑到银行排队取钱。

(二)优点

线程池 的优点在于 高效、方便

  1. 线程在使用前就已经创建好了,使用时直接将任务交给线程完成
  2. 线程会被合理调度,确保 任务与线程 间能做到负载均衡

线程池 中的线程数量不是越多越好,因为线程增多会导致调度变复杂,具体创建多少线程取决于具体业务场景,比如 处理器内核、剩余内存、网络中的 socket 数量等。

线程池 还可以配合 「生产者消费者模型」 一起使用,做到 解耦与提高效率。

  • 可以把 任务队列 换成 「生产者消费者模型」

(三)应用场景

线程池 有以下几种应用场景:

  1. 存在大量且短小的任务请求:比如 Web 服务器中的网页请求,使用 线程池 就非常合适,因为网页点击量众多,并且大多都没有长时间连接访问。
  2. 对性能要求苛刻,力求快速响应需求:比如游戏服务器,要求对玩家的操作做出快速响应。
  3. 突发大量请求,但不至于使服务器产生过多的线程:短时间内,在服务器创建大量线程会使得内存达到极限,造成出错,可以使用 线程池 规避问题。
  4. 定时任务调度:对于需要定时执行的任务,线程池也能发挥重要作用。比如,定期发送邮件、更新数据等任务可以通过线程池来调度和执行。

二、线程池的实现

(一)线程池_V1(朴素版)

朴素版:实现最基本的线程池功能,直接使用系统提供的接口。

所谓朴素版就是不加任何优化设计,只实现 线程池 最基础的功能,便于理解 线程池

创建 ThreadPool_v1.hpp 头文件

将 线程池 实现为一个类,提供接口供外部调用

首先要明白 线程池 的两大核心:一批线程 与 任务队列,客户端发出请求,新增任务,线程获取任务,执行任务,因此 ThreadPool_v1.hpp 的大体框架如下:

  • 一批线程,通过容器管理
  • 任务队列,存储就绪的任务
  • 互斥锁
  • 条件变量

互斥锁 的作用是 保证多个线程并发访问任务队列时的线程安全,而 条件变量 可以在 任务队列 为空时,让一批线程进入等待状态,也就是线程同步。

#pragma once

#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>

#define THREAD_NUM 5

template<class T>
class MyThreadPool
{
public:
    MyThreadPool(int num = THREAD_NUM)
        :_threads(num), _num(num)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }
    
    ~MyThreadPool()
    {
        // 销毁互斥锁和条件变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
    }

    void init()
    {
        // 其他信息初始化...
    }

    void start()
    {
        // 启动线程池...
    }

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 任务处理...
    }
private:
    std::vector<pthread_t> _threads;
    int _num; // 线程数量
    std::queue<T> _tasks; // 利用STL自动扩容的特性,无须担心容量
    pthread_mutex_t _mtx;
    pthread_cond_t _cond;
};

注意:

  • 需要提前给 vector 扩容,避免后面使用时发生越界访问。
  • 提供给线程的回调函数需要设置为静态,否则线程调不动(参数不匹配)。

填补函数体

初始化线程池 init()

当前场景只需要初始化 互斥锁 和 条件变量,在 构造函数 中完成就行了,所以这里的 init() 函数不需要补充

启动线程池 start()

启动 线程池 需要先创建出一批线程,这里直接循环创建即可

    void start()
    {
        // 启动线程池
        for(int i = 0; i < _num; i++)
            pthread_create(&_threads[i], nullptr, threadRoutine, this);
        
    }

这里进行简单打印,打印当前线程的线程 ID 就行了,并且直接 detach,主线程无需等待次线程运行结束

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        pthread_detach(pthread_self());

        while(true)
        {
            sleep(1);

            // std::cout << " thread running..." << pthread_self()  << std::endl;
            printf("thread running... %lu\n", (unsigned long)pthread_self());
            printf("================================\n");
        }
    }

创建 main.cc 源文件,测试线程池的代码

#include "ThreadPool.hpp"
#include <memory>

int main()
{
    std::unique_ptr<MyThreadPool<int>> ptr(new MyThreadPool<int>());

    ptr->init();
    int cnt = 3;
    while(cnt--)
    {
        sleep(1);
        ptr->start();
    }

    return 0;
}

编译并运行代码,可以看到 确实创建了一批线程,当主线程退出后,其他次线程也就跟着终止了 


线程池 还需要提供一个重要的接口 pushTask(),将用户需要执行的业务装载至 任务队列 中,等待线程执行

    // 装载任务
    void pushTask(const T &task)
    {
        // 本质上就是在生产商品,需要加锁保护
        pthread_mutex_lock(&_mtx);
        _tasks.push(task);

        // 唤醒消费者消费
        pthread_cond_signal(&_cond);
        pthread_mutex_unlock(&_mtx);
    }

装载任务的本质就是在生产任务,相当于用户充当生产者,通过这个接口将任务生产至任务队列中,而线程充当消费者,从任务队列中获取任务并消费。

所以线程的回调函数需要从 任务队列 中获取任务,进行消费

  1. 检测是否有任务
  2. 有 -> 消费
  3. 没有 -> 等待

修改回调函数

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        pthread_detach(pthread_self());

        while(true)
        {
            // sleep(1);

            // 任务队列是临界资源,需要保护
            pthread_mutex_lock(&_mtx);

            // 等待条件满足
            while(_tasks.empty())
                pthread_cond_wait(&_cond, &_mtx);

            T task = _tasks.front();
            _tasks.pop();
            // task(); // 进行消费,稍后实现

            pthread_mutex_unlock(&_mtx);
        }
    }

注意: 判断任务队列是否为空需要使用 while,确保在多线程环境中不会出现问题。

因为 任务队列、互斥锁、条件变量 是类内成员,而这里的 threadRoutine() 函数是一个静态函数,并没有 this 指针以访问类内成员,可以采取传递 this 指针的方式解决问题

    void start()
    {  
        // 启动线程池
        for(int i = 0; i < _num; i++)
            pthread_create(&_threads[i], nullptr, threadRoutine, this);
    }

threadRoutine() 函数需要将参数 void* 转化为所在类对象的指针,并通过该指针访问类内成员

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        pthread_detach(pthread_self());

        auto ptr = static_cast<MyThreadPool<T>*>(args);
        
        while(true)
        {
            sleep(1);

            // 任务队列是临界资源,需要保护
            pthread_mutex_lock(&ptr->_mtx);

            // 等待条件满足
            while(ptr->_tasks.empty())
                pthread_cond_wait(&ptr->_cond, &ptr->_mtx);

            T task = ptr->_tasks.front();
            ptr->_tasks.pop();
            // task(); // 进行消费

            pthread_mutex_unlock(&ptr->_mtx);
        }
    }

为了使得提高代码的可阅读性及可拓展性,这里将会封装一批接口,供函数调用

加解锁

    void lockQueue()
    {
        pthread_mutex_lock(&_mtx);
    }
    void unlockQueue()
    {
        pthread_mutex_unlock(&_mtx);
    }

等待和唤醒

    void threadWait()
    {
        pthread_cond_wait(&_cond, &_mtx);
    }
    void threadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

判空、获取任务

    void isEmpty()
    {
        return _tasks.empty();
    }

    T popTask()
    {
        T task = _tasks.front();
        _tasks.pop();
        return task;
    }

修改装载任务 pushTask()

    // 装载任务
    void pushTask(const T &task)
    {
        // 本质上就是在生产商品,需要加锁保护
        lockQueue();
        _tasks.push(task);

        // 唤醒消费者消费
        threadWakeup();
        threadWait();
    }

以及 消费者 threadRountine()

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        pthread_detach(pthread_self());

        auto ptr = static_cast<MyThreadPool<T>*>(args);
        
        while(true)
        {
            sleep(1);

            // 任务队列是临界资源,需要保护
            ptr->lockQueue();

            // 等待条件满足
            while(ptr->isEmpty())
                ptr->threadWait();

            T task = ptr->popTask();

            pthread_mutex_unlock(&ptr->_mtx);

            // 进行消费,可以不用加锁,因为一个商品只会被一个线程消费
            // task(); 
        }
    }

细节: 轮到线程执行任务时,不需要加锁,这就好比你买桶泡面回家,是不必担心别人会和你争抢,可以慢慢消费;同样的,你也不应该占用锁资源,主动让出锁资源以提高整体效率。

task() 表示执行任务,这里实际是一个 operator()() 的重载,详见 <Linux> 生产者消费者模型 中关于 Task.hpp 的设计,因为我们这里也需要使用任务,所以可以直接把之前写的代码拷贝过来

#pragma once

#include <string>

template<class T>
class Task
{
public:
    Task(T x = 0, T y = 0, char op = '+')
        :_x(x), _y(y), _op(op), _result(0), _err(0)
    {}

    // 重载运算操作
    void operator()()
    {
        // 加减乘除
        switch(_op)
        {
            case '+': 
                _result = _x + _y;
            break;
            case '-': 
                _result = _x - _y;
            break;
            case '*': 
                _result = _x * _y;
            break;
            case '/': 
                _result = _x / _y;
            break;
            case '%':
                if(_y == 0) _err = -2;
                else _result = _x % _y;
            break;
            default:
                _err = -3;
            break;
        }
    }

    // 获取计算结果
    std::string getResult()
    {
        std::string ret = std::to_string(_x) + " " + _op
                        + " " + std::to_string(_y);

        if(_err)
        {
            ret += " error ";

            // 判断是 / 错误还是 % 错误
            if(_err == -1) ret += " [-1]  / 0 引发了错误";
            if(_err == -2) ret += " [-2]  % 0 引发了错误";
            else ret += " [-3]  不合法的操作符,只能为 [+-*/%]";
        }
        else
        {
            ret += " = " + std::to_string(_result);
        }
        return ret;
    }
private:
    T _x;
    T _y;
    char _op; // 运算符
    T _result; // 结果
    int _err; // 错误标识
};

轮到 Main.cc 进行操作了,逻辑很简单:创建线程池对象,初始化线程池,启动线程池,装载任务,等待运行结果

#include "ThreadPool.hpp"
#include <memory>

int main()
{
    std::unique_ptr<MyThreadPool<int>> ptr(new MyThreadPool<int>());

    ptr->init();
    ptr->start();

    while(true)
    {
        // 输入 操作数 操作数 操作符
        int x = 0, y = 0;
        char op = '+';
        std::cout << "输入x: ";
        std::cin >> x;
        std::cout << "输入y: ";
        std::cin >> y;
        std::cout << "输入op: ";
        std::cin >> op;

        // 构建任务对象
        Task task(x, y, op);
        
        // 装载任务
        ptr->pushTask(task);
    }
    return 0;
}

现在还有最后一个问题:如何获取计算结果?可以在 线程 执行完任务后,直接显示计算结果,也可以通过传入回调函数的方式,获取计算结果,前者非常简单,只需要在 threadRoutine() 中加入这行代码即可:

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
            // ...
            
            // 显示计算结果
            std::cout << task.getResult() << std::endl;
    }

除此之外,我们也可以通过 回调函数 的方式获取计算结果

目标:给线程传入一个回调函数,线程执行完 任务后,将任务传给回调函数,回调函数结合业务逻辑,灵活处理结果。

这里我们选择使用回调函数打印结果,在主函数中很容易就可以写出这个回调函数

// 回调函数
void CallBack(Task<int> &task)
{
    // 获取计算结果后打印
    std::string ret = task.getResult();
    std::cout << "计算结果为:" << ret << std::endl;;
}

为了能让 线程 在执行任务后能回调,需要将这个函数对象作为参数,传递给 ThreadPool 对象

int main()
{
    std::unique_ptr<MyThreadPool<Task<int>>> ptr(new MyThreadPool<Task<int>>(CallBack));

    // ...
}

当然,这边传递了一个对象,那边就得接收此对象,为了存储该函数对象,ThreadPool 新增一个类成员:_func,函数对象类型为 void (T&)

#include <functional>

#define THREAD_NUM 5

template<class T>
class MyThreadPool
{
    using func_t = std::function<void(T&)>;

public:
    MyThreadPool(func_t func, int num = THREAD_NUM)
        :_threads(num), _num(num), _func(func)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

private:
    func_t _func;
};

修改完成后,创建 ThreadPool 对象时,支持传入一个类型为 void(T&) 的函数对象

获取函数对象后,需要让 线程 在执行完任务后进行回调,但又因为这玩意是一个类内成员,同样需要借助外部传入的 this 指针进行访问,这里直接封装成一个接口,顺便进行调用

private: 
   func_t callBack(T &task)
    {
        _func(task);
    }

最后补充任务处理与 threadRoutine

    // 给线程的回调函数
    static void *threadRoutine(void *args)
    {
            // ...

            // 进行消费,可以不用加锁,因为一个商品只会被一个线程消费
            task();
            ptr->callBack(task);
        }
    }

程序结果正常,不必在意打印问题,因为屏幕也是被多线程并发访问的资源,没加锁保护,导致出现问题。

(二)线程池_V2(封装版)

引入自己封装实现的线程库 Thread.hpp,支持对线程做出更多操作

#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include <pthread.h>

using namespace std;

class Thread
{
public:
    // 状态表
    typedef enum
    {
        NEW = 0,
        RUNNING,
        EXITED
    }ThreadStatus;
    typedef void (*func_t)(void *);
public:
    Thread(int num, func_t func, void *args)
        :_tid(0), _func(func), _status(NEW), _args(args)
    {
        // 根据ID写入名字
        char name[128];
        snprintf(name, sizeof(name), "thread-%d", num);
        _name = name;
    }
    
    ~Thread()
    {}
    
    // 获取线程ID
    pthread_t getID() 
    { 
        if (_status == RUNNING)
            return _tid;
        else
            return 0;
     }

    // 获取线程名
    string getName() { return _name; }

    // 获取线程状态
    int getStatus() { return _status; }

    // 启动线程
    void run()
    {
        int n = pthread_create(&_tid, nullptr, runHelper, this/*需考虑*/);
        if(n != 0)
        {
            cerr << "create thread fail" << endl;
            exit(1);
        }
        _status = RUNNING;// 线程跑起来状态为运行中
    }

    // // 回调函数
    // static void *runHelper(void *args)
    // {
    //     Thread *ts = static_cast<Thread*>(args);

    //     ts->_func(ts->_args); 
    //     // return nullptr;
    // }

    static void *runHelper(void *args)
    {
        Thread *ts = (Thread*)args; // 就拿到了当前对象
        // _func(_args);
        (*ts)();
        return nullptr;
    }
    void operator ()() //仿函数
    {
        if(_func != nullptr) _func(_args);
    }

    // 线程等待
    void join()
    {
        int n = pthread_join(_tid, nullptr);
        if(n != 0)
        {
            cerr << "join thread fail" << endl;
            exit(1);
        }
        _status = EXITED;// 线程等待成功后状态为退出
    }
private:
    pthread_t _tid; // 线程ID
    func_t _func; // 线程回调函数
    ThreadStatus _status; // 线程状态
    void *_args; // 回调函数的参数,可以设置成模板
    string _name; // 线程名
};

不再直接使用原生线程库,转而使用自己封装的线程库

#pragma once
#include "Task.hpp"
#include "Thread.hpp"

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <functional>

#define THREAD_NUM 5

template<class T>
class ThreadPool
{
    using func_t = std::function<void(T&)>; // 包装器
public:
    void lockQueue()
    {
        pthread_mutex_lock(&_mtx);
    }
    void unlockQueue()
    {
        pthread_mutex_unlock(&_mtx);
    }

    void threadWait()
    {
        pthread_cond_wait(&_cond, &_mtx);
    }
    void threadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    bool isEmpty()
    {
        return _tasks.empty();
    }

    T popTask()
    {
        T task = _tasks.front();
        _tasks.pop();
        return task;
    }

    func_t callBack(T &task)
    {
        _func(task);
    }
public:
    ThreadPool(func_t func, int num = THREAD_NUM)
        : _num(num), _func(func)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        // 等待线程退出
        for(auto &t : _threads)
            t.join();

        // 销毁互斥锁和条件变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
    }

    void init()
    {
        for(int i = 0; i < _num; i++)
            _threads.push_back(Thread(i, threadRoutine, this));
    }

    void start()
    {
        // 启动线程
        for(int i = 0; i < _num; i++)
            _threads[i].run();
    }

    // 装载任务
    void pushTask(const T &task)
    {
        // 本质上就是在生产商品,需要加锁保护
        lockQueue();
        _tasks.push(task);

        // 唤醒消费者消费
        threadWakeup();
        unlockQueue();
    }

    // 给线程的回调函数
    static void threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        pthread_detach(pthread_self());

        ThreadPool<T> *ptr = static_cast<ThreadPool<T> *>(args);

        while(true)
        {
            // 任务队列是临界资源,需要保护
            ptr->lockQueue();

            // 等待条件满足
            while(ptr->isEmpty())
                ptr->threadWait();

            T task = ptr->popTask();

            ptr->unlockQueue();

            // 进行消费,可以不用加锁,因为一个商品只会被一个线程消费
            task();
            ptr->callBack(task);
        }
    }
private:
    func_t _func;
    std::vector<Thread> _threads;
    int _num; // 线程数量
    std::queue<T> _tasks; // 利用STL自动扩容的特性,无须担心容量
    pthread_mutex_t _mtx;
    pthread_cond_t _cond;
};

涉及修改的内容:

  • _threads 类型由 vector<pthread_t> 变为 vector<Thread>
  • init() 函数用于创建线程,注册线程信息
  • start() 函数用于启动线程
  • ~ThreadPool() 中新增等待线程退出
  • 线程回调函数 threadRoutinue() 返回值改为 void
  • 新增函数对象 _func


 (三)线程池_V3(优化版)

 优化版:引入 RAII 风格的锁,实现自动化加锁与解锁。

手动 加锁、解锁 显得不够专业,并且容易出问题,比如忘记释放锁资源而造成死锁,因此我们可以设计一个小组件 LockGuard,实现 RAII 风格的锁:初始化创建,析构时销毁

#pragma once

#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *pmtx)
        :_pmtx(pmtx)
    {
        // 加锁
        pthread_mutex_lock(_pmtx);
    }

    ~LockGuard()
    {
        // 解锁
        pthread_mutex_unlock(_pmtx);
    }
private:
    pthread_mutex_t *_pmtx;
};

将这个锁类加入 ThreadPool_V3.hpp 中,可以得到以下代码:

#pragma once
#include "Task.hpp"
#include "Thread.hpp"
#include "Mutex.hpp"

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <functional>

#define THREAD_NUM 5

template<class T>
class ThreadPool
{
    using func_t = std::function<void(T&)>; // 包装器
public:
    pthread_mutex_t* getlock()
    {
        return &_mtx;
    }

    void threadWait()
    {
        pthread_cond_wait(&_cond, &_mtx);
    }

    void threadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    bool isEmpty()
    {
        return _tasks.empty();
    }

    T popTask()
    {
        T task = _tasks.front();
        _tasks.pop();
        return task;
    }

    func_t callBack(T &task)
    {
        _func(task);
    }
public:
    ThreadPool(func_t func, int num = THREAD_NUM)
        : _num(num), _func(func)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        // 等待线程退出
        for(auto &t : _threads)
            t.join();

        // 销毁互斥锁和条件变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
    }

    void init()
    {
        for(int i = 0; i < _num; i++)
            _threads.push_back(Thread(i, threadRoutine, this));
    }

    void start()
    {
        // 启动线程
        for(int i = 0; i < _num; i++)
            _threads[i].run();
    }

    // 装载任务
    void pushTask(const T &task)
    {
        // 本质上就是在生产商品,需要加锁保护,自动加锁解锁
        LockGuard lockgrard(&_mtx);
        _tasks.push(task);

        // 唤醒消费者消费
        threadWakeup();
    }

    // 给线程的回调函数
    static void threadRoutine(void *args)
    {
        // 任务处理
        // 避免等待线程,直接分离
        // pthread_detach(pthread_self());

        ThreadPool<T> *ptr = static_cast<ThreadPool<T> *>(args);

        while(true)
        {
            T task;
            {
                //  自动解锁加锁
                LockGuard lockguard(ptr->getlock());
                // 等待条件满足
                while(ptr->isEmpty())
                    ptr->threadWait();

                task = ptr->popTask();
            }
            // 进行消费,可以不用加锁,因为一个商品只会被一个线程消费
            task();
            ptr->callBack(task);
        }
    }
private:
    func_t _func;
    std::vector<Thread> _threads;
    int _num; // 线程数量
    std::queue<T> _tasks; // 利用STL自动扩容的特性,无须担心容量
    pthread_mutex_t _mtx;
    pthread_cond_t _cond;
};

如何证明现在有一批线程在运行呢?

通过指令查看,当程序运行后,再新开一个终端,并输入以下命令

ps -aL | grep threadPool

可以看到:除了主线程 20251外,其他次线程都在等待任务就绪,当大量并发任务来临时,线程池是能大大提高效率的 :

三、单例模式

(一)什么是单例模式

代码构建类,类实例化出对象,这个实例化出的对象也可以称为 实例,比如常见的 STL 容器,在使用时,都是先根据库中的类,形成一个 实例 以供使用;正常情况下,一个类可以实例化出很多很多个对象,但对于某些场景来说,是不适合创建出多个对象的。

比如本文中提到的 线程池,当程序运行后,仅需一个 线程池对象 来进行高效任务计算,因为多个 线程池对象 无疑会大大增加调度成本,因此需要对 线程池类 进行特殊设计,使其只能创建一个 对象,换句话说就是不能让别人再创建对象。

正如 一山不容二虎 一样,线程池 对象在一个程序中是不推荐出现多个的,在一个程序中只允许实例化出一个对象,可以通过 单例模式 来实现,单例模式 是非常 经典、常用、常考 的设计模式。

什么是设计模式?
设计模式就是计算机大佬们在长时间项目实战中总结出来的解决方案,是帮助菜鸡编写高质量代码的利器,常见的设计模式有 单例模式、建造者模式、工厂模式、代理模式等

(二)单例模式特点

单例模式 最大的特点就是 只允许存在一个对象(实例),这就好比现在的 一夫一妻制 一样,要是在古代,单例模式 肯定不被推崇

在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百 GB) 到内存中,此时往往要用一个 单例 的类来管理这些数据;在我们今天的场景中,也需要一个 单例线程池 来协同生产者与消费者。

(三)单例模式的简单实现

单例模式 有两种实现方向:饿汉 与 懒汉,它们避免类被再次创建出对象的手段是一样的:构造函数私有化、删除拷贝构造。

只要外部无法访问 构造函数,那么也就无法构建对象了,比如下面这个类 Signal

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
};

当外界试图创建对象时

#include <iostream>
#include "Signal.hpp"

int main()
{
    Signal s;

    return 0;
}

当然这只实现了一半,还有另一半是 创建一个单例对象,既然外部受权限约束无法创建对象,那么类内是肯定可以创建对象的,只需要创建一个指向该类对象的 静态指针 或者一个 静态对象,再初始化就好了;因为外部无法访问该指针,所以还需要提供一个静态函数 getInstance() 以获取单例对象句柄,至于具体怎么实现,需要分不同方向(饿汉 or 懒汉)

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
public:
    // 获取单例对象的句柄
    static Signal *getInstance()
    {
        return _sigptr;
    }

    void print()
    {
        std::cout << "good moring" << std::endl;
    }
private:
    static Signal *_sigptr;
};

注意: 构造函数不能只声明,需要实现,即使什么都不写。

为什么要删除拷贝构造?
如果不删除拷贝构造,那么外部可以借助拷贝构造函数,拷贝构造出一个与 单例对象 一致的 “对象”,此时就出现两个对象,这是不符合 单例模式 特点的。

为什么要创建一个静态函数?
单例对象也需要被初始化,并且要能被外部使用
调用链逻辑:通过静态函数获取句柄(静态单例对象地址)-> 通过地址调用该对象的其他函数

1. 饿汉模式

张三总是很饿,尽管饭菜还没准备好,他就已经早早的把碗洗好了,等到开饭时,直接开干。

饿汉模式 也是如此,在程序加载到内存时,就已经早早的把 单例对象 创建好了(此时程序服务还没有完全启动),也就是在外部直接通过 new 实例化一个对象,具体实现如下

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
public:
    // 获取单例对象的句柄
    static Signal *getInstance()
    {
        return _sigptr;
    }

    void print()
    {
        std::cout << "good moring" << std::endl;
    }
private:
    static Signal *_sigptr;
};

Signal *_sigptr = new Signal();

注:在程序加载时,该对象会被创建

这里的 单例对象 本质就有点像 全局变量,在程序加载时就已经创建好了。外部可以直接通过 getInstance() 获取 单例对象 的操作句柄,来调用类中的其他函数。

#include <iostream>
#include "Signal.hpp"

int main()
{
    Signal::getInstance()->print();

    return 0;
}

这就实现了一个简单的 饿汉版单例类,除了创建 static Signal* 静态单例对象指针 外,也可以直接定义一个 静态单例对象,生命周期随进程,不过要注意的是:getInstance() 需要返回的也是该静态单例对象的地址,不能返回值,因为拷贝构造被删除了;并且需要在类的外部初始化该静态单例对象:

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
public:
    // 获取单例对象的地址
    static Signal *getInstance()
    {
        return &_sigptr;
    }

    void print()
    {
        std::cout << "good moring" << std::endl;
    }
private:    
    // 静态单例对象
    static Signal _sig;
};
// 初始化
Signal Signal::_sig;

饿汉模式 是一个相对简单的单例实现方向,只需要在类中声明,在类外初始化就行了,但它也会带来一定的弊端:延缓服务启动速度。

完全启动服务是需要时间的,创建单例对象也是需要时间的,饿汉模式 在服务正式启动前会先创建对象,但凡这个单例类很大,服务启动时间势必会受到影响,大型项目启动,时间就是金钱。

并且由于饿汉模式每次都会先创建单例对象,再启动服务,如果后续使用单例对象还好说,但如果后续没有使用 单例对象,那么这个对象就是白创建了,在延缓服务启动的同时造成了一定的资源浪费。

综上所述,饿汉模式不是很推荐使用,除非图实现简单,并且服务规模较小;既然 饿汉模式 有缺点,就需要改进,于是就出现了懒汉模式。

2. 懒汉模式

李四也是个很饿的人,他也有一个自己的碗,吃完饭后碗会脏,但他不像张三那样极端,李四比较懒,只有等他吃饭的时候,他才会去洗碗,李四这种做法让他感到无比轻松。

在 懒汉模式 中,单例对象 并不会在程序加载时创建,而是在第一次调用时创建,第一次调用创建后,后续无需再创建,直接使用即可

#pragma once

#include <iostream>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
public:
    static Signal *getInstance()
    {
        // 第一次调用才创建
        if(_sigptr == nullptr)
        {
            _sigptr = new Signal();
        }
        return _sigptr;
    }

    void print()
    {
        std::cout << "good moring" << std::endl;
    }
private:
    // 指向单例对象的静态指针
    static Signal *_sigptr;
};

// 初始化静态指针
Signal* Signal::_sigptr = nullptr;

注意: 此时的静态指针需要初始化为 nullptr,方便第一次判断 

饿汉模式 中出现的问题这里全都避免了

  • 创建耗时 -> 只在第一次使用时创建
  • 占用资源 -> 如果不使用,就不会被创建

懒汉模式 的核心在于 延时加载,可以优化服务器的速度及资源占用

延时加载这种机制就有点像 写时拷贝,就赌你不会使用,从而节省资源开销,类似的还有 动态库、进程地址空间 等。

当然,懒汉模式 下也是可以正常使用 单例对象 的

这样看来,懒汉模式 确实优秀,实现起来也不麻烦,为什么会说 饿汉模式 更简单呢?

这是因为当前只是单线程场景,程序暂时没啥问题,如果当前是多线程场景,问题就大了,如果一批线程同时调用 getInstance(),同时认定 _sigptr 为空,就会创建多个 单例对象,这是不合理的,也就是说当前实现的 懒汉模式 存在严重的线程安全问题。

如何证明?
简单改一下代码,每创建一个单例对象,就打印一条语句,将代码放入多线程环境中测试

    static Signal *getInstance()
    {
        // 第一次调用才创建
        if(_sigptr == nullptr)
        {
            std::cout << "已创建一个单例对象" << std::endl;
            _sigptr = new Signal();
        }
        return _sigptr;
    }

其中使用了 lambda 表达式来作为线程的回调函数,重点在于查看现象 

#include <iostream>
#include <pthread.h>
#include "Signal.hpp"

int main()
{
    // Signal::getInstance()->print();

    // 多线程场景
    pthread_t pd[5];
    for(int i = 0; i < 5; i++)
    {
        pthread_create(pd+i, nullptr, [](void*)->void*
            {
                auto ptr = Signal::getInstance();
                ptr->print();
                return nullptr;
            }, nullptr);
    }

    for(int i = 0; i < 5; i++)
        pthread_join(pd[i], nullptr);

    return 0;
}

当前代码在多线程环境中,同时创建了多个 单例对象,因此是存在线程安全问题的

饿汉模式没有线程安全问题吗?
没有,因为饿汉模式下,单例对象一开始就被创建了,即便是多线程场景中,也不会创建多个对象,它们也做不到

3. 懒汉模式(线程安全版)

解决多线程并发访问的利器是 互斥锁,那就创建 互斥锁 保护单例对象的创建

#pragma once

#include <iostream>
#include <mutex>

class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造
    Signal(const Signal&) = delete;
public:
    static Signal *getInstance()
    {
        // 加锁解锁
        pthread_mutex_lock(&_mtx);

        // 第一次调用才创建
        if(_sigptr == nullptr)
        {
            std::cout << "已创建一个单例对象" << std::endl;
            _sigptr = new Signal();
        }
        pthread_mutex_unlock(&_mtx);

        return _sigptr;
    }

    void print()
    {
        std::cout << "good moring" << std::endl;
    }
private:
    // 指向单例对象的静态指针
    static Signal *_sigptr;   

    static pthread_mutex_t _mtx;
};

// 初始化静态指针
Signal* Signal::_sigptr = nullptr;

// 初始化互斥锁
pthread_mutex_t Signal::_mtx = PTHREAD_MUTEX_INITIALIZER;

注意: getInstance() 是静态函数,互斥锁也要定义为静态的,可以初始化为全局静态锁

结果是没有问题,单例对象也只会创建一个,现在还面临最后一个问题:效率问题

当前代码确实能保证只会创建一个单例对象,但即使后续不会创建单例对象,也需要进行加锁、判断、解锁这个流程,要知道加锁也是有资源消耗的,所以这种写法不妥。

解决方案是:DoubleCheck 双检查加锁

在 加锁 前再增加一层判断,如此一来,N 个线程,顶多只会进行 N 次 加锁与解锁,这是非常优雅的解决方案:

    static Signal *getInstance()
    {
        // 双检查
        if(_sigptr == nullptr)
        {
            // 加锁解锁
            pthread_mutex_lock(&_mtx);

            // 第一次调用才创建
            if(_sigptr == nullptr)
            {
                std::cout << "已创建一个单例对象" << std::endl;
                _sigptr = new Signal();
            }
            pthread_mutex_unlock(&_mtx);
        }
        return _sigptr;
    }

单纯的 if 判断并不会消耗很多资源,但 加锁 行为会消耗资源,延缓程序运行速度,双检查加锁 可以有效避免这个问题

这是个精妙绝伦的代码设计,值得学习。


所以 懒汉模式 麻烦吗?
相比于 饿汉模式,确实挺麻烦的,不仅要判断后创建 单例对象,还需要考虑线程安全问题

值得一提的是,懒汉模式 还有一种非常简单的写法:调用 getInstance() 时创建一个静态单例对象并返回,因为静态单例对象只会初始化一次,所以是可行的,并且在 C++11 之后,可以保证静态变量初始化时的线程安全问题,也就不需要 双检查加锁 了,实现起来非常简单

#pragma once

#include <iostream>
#include <mutex>

// 懒汉模式
class Signal
{
private:
    // 构造函数私有化
    Signal()
    {}

    // 删除拷贝构造     
    Signal(const Signal&) = delete;
public:
    static Signal *getInstance()
    {
        // 静态单例对象,只会初始化一次,并且生命周期随进程
        static Signal _sig;
        return &_sig;
    }

    void print()
    {
        std::cout << "Hello Signal!" << std::endl;
    }
};

所以如果当前的生产环境所支持的 C++ 版本为 C++11 及以后,在实现 懒汉模式 时可以选择这种简便的方式,是非常不错的;如果为了兼容性,也可以选择传统写法。

注意: 静态变量创建时的线程安全问题,在 C++11 之前是不被保障的。

关于 单例模式 的其他问题

new 出来的单例对象不需要销毁吗?
这个单例对象生成周期随进程,进程结束了,资源也就都被销毁了,如果想手动销毁,可以设计一个垃圾回收内部类 GC,主动去销毁单例对象。

(四)线程池_V4(最终版)

最终版:将线程池改为 单例模式,只允许存在一个线程池对象。这里选择 懒汉模式,因为比较优秀,并且为了确保兼容性,选择 经典写法

#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <functional>

#define THREAD_NUM 5

template<class T>
class ThreadPool
{
    using func_t = std::function<void(T&)>; // 包装器

private: // 私有化
    ThreadPool(func_t func, int num = THREAD_NUM)
        : _num(num), _func(func)
    {
    }

    ~ThreadPool()
    {
        // 等待线程退出
        for(auto &t : _threads)
            t.join();
    }

    // 删除拷贝构造
    ThreadPool(const ThreadPool<T> &) = delete;

    // ...
private:
    // ...

    // 创建静态单例对象指针以及互斥锁
    static ThreadPool<T> *_inst;
    static pthread_mutex_t _mtx;
};

// 初始化指针
template<class T>
ThreadPool<T>* ThreadPool<T>::_inst = nullptr;

// 初始化指针
template<class T>
pthread_mutex_t ThreadPool<T>::_mtx = PTHREAD_MUTEX_INITIALIZER;

然后提供一个获取 单例对象 句柄的函数,如果是第一次创建 单例对象,就需要在创建完对象后,顺便进行 init() 和 start()

获取句柄 getInstance()

    static ThreadPool<T>* getInstance()
    {
        // 双检查
        if(_inst == nullptr)
        {
            // 加锁
            LockGuard lock(&_mtx);
            if(_inst == nullptr)
            {
                // 创建对象
                _inst = new ThreadPool<T>();

                // 初始化及启动服务
                _inst->init();
                _inst->start();
            }
        }
    }

单例模式 改完了,但现在面临一个尴尬的问题:main.cc 无法直接将回调函数 callBack() 进行传递,因为它根本无法创建对象

可以试试曲线救国:将函数对象传递给 getInstance() 函数,如果用户不传,那就使用缺省参数,也就是直接打印结果

修修改改后的线程池长这样:

#pragma once
#include "Task.hpp"
#include "Thread.hpp"
#include "Mutex.hpp"

#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <memory>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <functional>

#define THREAD_NUM 5

template<class T>
class ThreadPool
{
    using func_t = std::function<void(T&)>; // 包装器
private:
    ThreadPool(int num = THREAD_NUM)
        : _num(num)
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()
    {
        // 等待线程退出
        for(auto &t : _threads)
            t.join();

        // 销毁互斥锁和条件变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_cond);
    }

    // 删除拷贝构造
    ThreadPool(const ThreadPool<T> &) = delete;
public:
    pthread_mutex_t* getlock()
    {
        return &_mtx;
    }

    void threadWait()
    {
        pthread_cond_wait(&_cond, &_mtx);
    }

    void threadWakeup()
    {
        pthread_cond_signal(&_cond);
    }

    bool isEmpty()
    {
        return _tasks.empty();
    }

    T popTask()
    {
        T task = _tasks.front();
        _tasks.pop();
        return task;
    }

    // 装载任务
    void pushTask(const T &task)
    {
        // 本质上就是在生产商品,需要加锁保护,自动加锁解锁
        LockGuard lockgrard(&_mtx);
        _tasks.push(task);

        // 唤醒消费者消费
        threadWakeup();
    }

    func_t callBack(T &task)
    {
        _func(task);
    }
public:
    static ThreadPool<T>* getInstance(const func_t &func = [](T& task){ std::cout << task.getResult() << std::endl; })
    {
        // 双检查
        if(_inst == nullptr)
        {
            // 加锁
            LockGuard lock(&_instance_mtx);
            if(_inst == nullptr)
            {
                // 创建对象
                _inst = new ThreadPool<T>();

                // 初始化及启动服务
                _inst->init();
                _inst->start();
            }
        }
        // 支持随时更改 main.cc 传过来的回调函数
        _inst->_func = func;
        return _inst;
    }

    void init()
    {
        for(int i = 0; i < _num; i++)
            _threads.push_back(Thread(i, threadRoutine, this));
    }

    void start()
    {
        // 启动线程
        for(int i = 0; i < _num; i++)
            _threads[i].run();
    }

    // 给线程的回调函数
    static void threadRoutine(void *args)
    {
        ThreadPool<T> *ptr = static_cast<ThreadPool<T> *>(args);

        while(true)
        {
            //  自动解锁加锁
            LockGuard lockguard(ptr->getlock());

            // 等待条件满足
            while(ptr->isEmpty())
                ptr->threadWait();

            // 获取任务
            T task = ptr->popTask();

            // 进行消费,可以不用加锁,因为一个商品只会被一个线程消费
            task();
            ptr->callBack(task);
        }
    }
private:
    func_t _func;
    std::vector<Thread> _threads;
    int _num; // 线程数量
    std::queue<T> _tasks; // 利用STL自动扩容的特性,无须担心容量
    pthread_mutex_t _mtx;
    pthread_cond_t _cond;

    // 创建静态单例对象指针及互斥锁
    static ThreadPool<T> *_inst;
    static pthread_mutex_t _instance_mtx;
};

// 初始化指针
template<class T>
ThreadPool<T>* ThreadPool<T>::_inst = nullptr;

// 初始化互斥锁
template<class T>
pthread_mutex_t ThreadPool<T>::_instance_mtx = PTHREAD_MUTEX_INITIALIZER;

此时 Main.cc 想要使用线程池对象时,就得通过 getInstance() 获取句柄,然后才能进行操作 

#include "ThreadPool_V4.hpp"
#include <memory>

// 回调函数
void CallBack(Task &task)
{
    // 获取计算结果后打印
    std::string ret = task.getResult();
    std::cout << "计算结果为:" << ret << std::endl;;
}

int main()
{
    while(true)
    {
        // 输入 操作数 操作数 操作符
        int x, y;
        char op;

        std::cout << "输入x: ";  
        std::cin >> x;
        std::cout << "输入y: ";
        std::cin >> y;
        std::cout << "输入op: ";
        std::cin >> op;

        // 构建任务对象
        Task task(x, y, op);
        
        // 装载任务
        ThreadPool<Task>::getInstance(CallBack)->pushTask(task);
    }
    return 0;
}

 此时是可以获取结果的,也可以看到一批线程正在候等任务到达

如何证明当前的 单例模式 生效了?

在调用 getInstance() 之前查看正在运行中的线程数量,调用完后再次查看,如果线程数量从 1 个变成多个,就证明 单例模式 是生效的(延迟加载)。可以看到调用前只有一个线程,调用后多个线程在运行

还可以通过其他方式证明,比如多行打印 单例对象句柄,查看地址是否为同一个,就可以知道 单例模式 是否生效了

    printf("0X%x\n", ThreadPool<Task>::getInstance());
    printf("0X%x\n", ThreadPool<Task>::getInstance());
    printf("0X%x\n", ThreadPool<Task>::getInstance());
    printf("0X%x\n", ThreadPool<Task>::getInstance());
    printf("0X%x\n", ThreadPool<Task>::getInstance());
    printf("0X%x\n", ThreadPool<Task>::getInstance());

 至此我们的 线程池_V4 最终版 代码算是完善了,以下是一些注意事项及建议

  1. 注意加锁解锁的位置,尽可能提高效率
  2. 使用双检查加锁,避免不必要的竞争
  3. 可以使用 volatile 修饰静态单例对象指针,避免被编译器优化覆盖

四、线程周边问题

(一)STL线程安全问题

STL 库中的容器是否是 线程安全 的?

答案是 不是

因为 STL 设计的初衷就是打造出极致性能容器,而加锁、解锁操作势必会影响效率,因此 STL 中的容器并未考虑线程安全,在之前编写的 生产者消费者模型、线程池中,使用了部分 STL 容器,如 vector、queue、string 等,这些都是需要我们自己去加锁、解锁,以确保多线程并发访问时的线程安全问题。

从另一方面来说,STL 容器种类繁多,容器间实现方式各不相同,无法以统一的方式进行加锁、解锁操作,比如哈希表中就有 锁表、锁桶 两种方式。

所以在多线程场景中使用 STL 库时,需要自己确保线程安全。

(二)智能指针线程安全问题

C++ 标准提供的智能指针有几种:unique_ptr、shared_ptr、weak_ptr、 make_unique

首先来说 unique_ptr,这是个功能单纯的智能指针,只具备基本的 RAII 风格,不支持拷贝,因此无法作为参数传递,也就不涉及线程安全问题

其次是 shared_ptr,得益于 引用计数,这个智能。指针支持拷贝,可能被多线程并发访问,但标准库在设计时考虑到了这个问题,索性将 shared_ptr 对于引用计数的操作设计成了 原子操作 CAS,这就确保了它的 线程安全,至于 weak_ptr,这个就是 shared_ptr 的小弟,名为弱引用智能指针,具体实现与 shared_ptr 一脉相承,因此它也是线程安全的。make_unique为c++14版本,有兴趣的可以自查。

(三)其他常见锁概念

悲观锁:总是认为数据会被其他线程修改,于是在自己访问数据前,会先加锁,其他线程想访问时只能等待,之前使用的锁都属于悲观锁。

乐观锁:并不认为其他线程会来修改数据,因此在访问数据前,并不会加锁,但是在更新数据前,会判断其他数据在更新前有没有被修改过,主要通过 版本号机制 和 CAS操作实现。

CAS 操作:当需要更新数据时,会先判断内存中的值与之前获取的值是否相等,如果相等就用新值覆盖旧值,失败就不断重试。

自旋锁:申请锁失败时,线程不会被挂起,而且不断尝试申请锁。

自旋 本质上就是一个不断 轮询 的过程,即不断尝试申请锁,这种操作是十分消耗 CPU 时间的因此推荐临界区中的操作时间较短时,使用 自旋锁 以提高效率;操作时间较长时,自旋锁 会严重占用 CPU 时间。

自旋锁 的优点:可以减少线程切换的消耗。

自旋锁相关接口:

#include <pthread.h>

pthread_spinlock_t lock; // 自旋锁类型

int pthread_spin_init(pthread_spinlock_t *lock, int pshared); // 初始化自旋锁

int pthread_spin_destroy(pthread_spinlock_t *lock); // 销毁自旋锁

// 自旋锁加锁
int pthread_spin_lock(pthread_spinlock_t *lock); // 失败就不断重试(阻塞式)
int pthread_spin_trylock(pthread_spinlock_t *lock); // 失败就继续向后运行(非阻塞式)

// 自旋锁解锁
int pthread_spin_unlock(pthread_spinlock_t *lock);

就这接口风格,跟 mutex 互斥锁 是一脉相承,可以轻易上手,将线程池中的互斥锁轻易改为自旋锁。

公平锁:一种用于同步多线程或多进程之间访问共享资源的机制,它通过使用互斥锁和相关的调度策略来确保资源的公平分配,以提高系统的性能和稳定性。

非公平锁:通常使用信号量(Semaphore)或自旋锁(Spinlock)等机制。这些锁机制没有严格的按照请求的顺序来分配锁,而是以更高的性能为目标,允许一些线程或进程在较短时间内多次获取锁资源,从而减少了竞争开销。

五、读者写者模型

除了 生产者消费者模型 外,还有一个 读者写者模型,用来解决 读者写者 问题,核心思想是 读者共享,写者互斥。

这就好比博客发布了,允许很多人同时读,但如果作者想要进行修改,那么其他人自然也就无法查看了,这就是一个很典型的 读者写者 问题。

读者写者模型 也遵循 321 原则

3 种关系:

  • 读者<->读者 无关系
  • 写者<->写者 互斥
  • 读者<->写者 互斥、同步

2 种角色:读者写者

1 个交易场所:阻塞队列或其他缓冲区

为什么读者与读者间甚至不存在互斥关系?
因为读者读取数据时,并不会对数据做出修改,因此不需要维持互斥关系

pthread 库中提供了 读写锁 相关接口:

#include <pthread.h>

pthread_rwlock_t; // 读写锁类型

// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *__restrict__ __rwlock, 
                const pthread_rwlockattr_t *__restrict__ __attr); 

// 销毁读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *__rwlock) 

 // 读者,加锁
int pthread_rwlock_rdlock(pthread_rwlock_t *__rwlock); // 阻塞式
int pthread_rwlock_tryrdlock(pthread_rwlock_t *__rwlock); // 非阻塞式

// 写者,加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *__rwlock); // 阻塞式 
int pthread_rwlock_trywrlock(pthread_rwlock_t *__rwlock); // 非阻塞式

// 解锁(读者锁、写者锁都可以解)
int pthread_rwlock_unlock(pthread_rwlock_t *__rwlock); 

注意: 读者和写者使用的加锁接口并不是同一个

关于 读者写者模型 的实现

  • 读者读数据时,允许其他读者一起读取数据,但不允许写者修改数据
  • 写者写数据时,不允许读者进入
  • 读者读取完数据后,通知写者进行写入
  • 写者写完数据后,通知读者进行读取

以下是伪代码:

int reader_cnt = 0; // 统计读者的数量
pthread_mutex_t lock; // 互斥锁
sem_t w(1); // 二元信号量

读者
	// 加锁
	{
		pthread_mutex_lock(&lock);
		if(reader_cnt == 0)
			P(w); // 第一个读者进入,申请信号量

		reader_cnt++; // 进入了一个读者
		pthread_mutex_unlock(&lock);	
	}
	
	// 读取数据

	// 解锁
	{
		pthread_mutex_lock(&lock);
		reader_cnt--; // 走了一个读者
		if(reader_cnt == 0)
			V(w); // 最后一个读者走了,归还信号量
		pthread_mmutex_unlock();
	}



写者
	// 加锁
	{
		P(w); // 申请信号量
		if(reader_cnt > 0)
		{
			V(w); // 归还信号量
			// 挂起等待
		}
	}
	
	// 写入数据

	// 解锁
	{
		V(w); // 归还信号量
	}

因为现实中,读者数量大多数情况下都是多于写者的,所以势必会存在很多很多读者不断读取,导致写者根本申请不到信号量,写者陷入 死锁 状态。

这是读者写者模型的特性,也是 读者优先 策略的体现,如果想要避免死锁,可以选择 写者优先 策略,优先让写者先写,读者先等一等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/483424.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

GaussDB WDR分析之集群报告篇

AWR报告目前已经成为Oracle DBA分析问题&#xff0c;定位故障最为重要的报告&#xff0c;阅读与分析AWR报告的技能也是Oracle DBA必备的技能。国产数据库为了提高运维便捷性&#xff0c;都在做类似Oracle AWR报告的模仿&#xff0c;只不过由于指标体系不够完善&#xff0c;因此…

列强,瓜分台积电!

特朗普曾说&#xff0c;台积电是他能想到的&#xff0c;唯一一家被迫停产会导致全球经济萧条的企业。 如今&#xff0c;美国、日本、欧洲争相发出巨额补贴&#xff0c;“邀请”台积电到当地设厂&#xff0c;对这家唯一重要的企业发起了攻势。 2022年12月6日&#xff0c;美国…

【LeetCode: 120. 三角形最小路径和 + 动态规划】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…

WEB DDOS的安全策略

近年来网络攻击的数量和频率急剧上升&#xff0c;针对Web应用程序的DDoS海啸攻击就是其中增长非常迅速的一个种类。过去常见的HTTP/S洪水攻击正在大范围的转变为更难对付的Web DDoS海啸攻击&#xff0c;网络安全空间攻防对抗越演越烈&#xff0c;企业用户面临更加严峻的网络安全…

思通舆情 是一款开源免费的舆情系统 介绍

思通舆情 是一款开源免费的舆情系统。 支持本地化部署&#xff0c;支持在线体验。 支持对海量舆情数据分析和挖掘。 无论你是使用者还是共同完善的开发者&#xff0c;欢迎 pull request 或者 留言对我们提出建议。 您的支持和参与就是我们坚持开源的动力&#xff01;请 sta…

[java基础揉碎]理解main方法语法

目录 深入理解main方法 解释main方法的形式: main方法的特别说明: main方法的动态传值: 深入理解main方法 解释main方法的形式: public static void main(String[] args){} 1.mian方法是虚拟机调用 2. java虚拟机需要调用类的main()方法&#xff0c;所以该方法的访问权…

每日一题 --- 移除链表元素[力扣][Go]

移除链表元素 题目&#xff1a;203. 移除链表元素 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,6,3,4,5,6], val 6 输出&#xf…

【源码】I.MX6ULL移植OpenCV

编译完成的源码&#xff1a; git clone https://gitee.com/wangyoujie11/atkboard_-linux_-driver.git 1.下载源码放在自己的opecv源码目录下 2.QTOpenCV工程代码放置的位置 3.更改.pro工程文件的opencv地址 4.使用命令行编译 前提是自己环境中已经配置好arm-qt的交叉编译…

【Redis知识点总结】(六)——主从同步、哨兵模式、集群

Redis知识点总结&#xff08;六&#xff09;——主从同步、哨兵模式、集群 主从同步哨兵集群 主从同步 redis的主从同步&#xff0c;一般是一个主节点&#xff0c;加上多个从节点。只有主节点可以接收写命令&#xff0c;主节点接收到的写命令&#xff0c;会同步给从节点&#…

Github 2024-03-24php开源项目日报 Top10

根据Github Trendings的统计,今日(2024-03-24统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量PHP项目10JavaScript项目1Nextcloud服务器:安全的数据之家 创建周期:2796 天开发语言:PHP, JavaScript协议类型:GNU Affero General Public…

【数据结构】考研真题攻克与重点知识点剖析 - 第 1 篇:绪论

前言 本文基础知识部分来自于b站&#xff1a;分享笔记的好人儿的思维导图与王道考研课程&#xff0c;感谢大佬的开源精神&#xff0c;习题来自老师划的重点以及考研真题。此前我尝试了完全使用Python或是结合大语言模型对考研真题进行数据清洗与可视化分析&#xff0c;本人技术…

IDDR、ODDR、IDEALY2和ODELAY2详解

文章目录 前言一、IDDR原语二、ODDR原语三、IDELAYCTRL原语四、IDELAY原语4.1、参数配置 &#xff1a;4.2、端口说明 &#xff1a;4.3、延时控制时序图 五、ODELAY原语 前言 本文参考XILINX手册UG471 一、IDDR原语 参考xilinx手册UG471 IDDR #(.DDR_CLK_EDGE ("SAME_…

【C++11】:统一的列表初始化|声明|STL变化

​ &#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;マイノリティ脈絡—ずっと真夜中でいいのに。 0:24━━━━━━️&#x1f49f;──────── 4:02 &#x1f504; …

已知屏幕分辨率和屏幕尺寸,JavaScript如何计算屏幕PPI像素密度

返回主目录:OpenLayers扩展组件系列汇总目录:常用OpenLayers地图扩展组件ol-ext、ol-cesium、ol-layerswitcher、ol-geocoder和ol-wind等扩展库 前言 本章作为补充章,用于讲解使用ol-ext组件的前置知识。 要想知道 PPI 是什么,我们需要先理解“像素”这个概念,那么什么是…

【C++算法】洛谷P1102:A-B数对,思路,lower_bound,upper_bound,二分答案,代码详解

文章目录 1&#xff09;解题思路2&#xff09;三种情况3&#xff09;代码 题目链接&#xff1a;P1102 A-B 数对 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 1&#xff09;解题思路 这道题要求我们在序列中找到 A − B C A-BC A−BC 的数对的个数&#xff0c;下标不同的数…

注解总结,Java中的注解,springboot中的注解

注解总结 1、Junit 开始执行的方法&#xff1a;初始化资源&#xff0c;执行完之后的方法&#xff1a;释放资源 测试方法&#xff0c;必须是&#xff1a;公有、非静态、无参无返回值的 在一个类中&#xff0c;可以定义多个测试方法&#xff0c;每个测试方法可以单独运行&#…

XUbuntu22.04之安装Plantuml(二百二十三)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a;多媒…

llvm后端

SelectionDAGBuilder是LLVM&#xff08;Low Level Virtual Machine&#xff09;编译器中的一个重要组件&#xff0c;它负责将LLVM中间表示&#xff08;Intermediate Representation&#xff0c;IR&#xff09;转换为SelectionDAG&#xff08;选择有向无环图&#xff09;的形式。…

Nacos部署(四)Docker部署Nacos2.3.x集群环境

&#x1f60a; 作者&#xff1a; 一恍过去 &#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390 &#x1f38a; 社区&#xff1a; Java技术栈交流 &#x1f389; 主题&#xff1a; Nacos部署&#xff08;四&#xff09;Docker部署Nacos2.3.x集群环境 ⏱…

adams卸载与安装

adams 卸载后重新安装lnstaller could not read the log directory of the existing installerto backup and restore.Installer will now exit. ADAMS软件卸载安装【adams吧】_百度贴吧 (baidu.com)