并发编程之线程池的设计和原理

一、线程池

提前创建一系列的线程,保存在这个线程池中,有任务要执行的时候,从线程池中取出线程来执行。没有任务的时候,线程池放回去。

二、为什么要使用线程池

线程使用上的问题:

  • 线程的频繁创建 和 销毁

  • 线程的数量过多,会造成CPU资源的开销

    • 上下文切换(消耗CPU资源)

那么如何实现线程的复用呢? 池化技术

三、线程池的设计猜想

3.1线程池的设计思考?

需求: 实现线程的重复使用

分解:

  • 如何使用线程的复用?

    • 让线程实现复用的唯一方法,就是让线程不结束

      • 那如何让线程执行新的任务呢?也就是说,任务怎么给他执行?

        • [共享内存]-> List.add()

      • 线程一直处于运行状态,合理吗?

        • 有任务来的时候,执行,没有任务的时候,阻塞

结论: 通过阻塞队列的方式,来实现线程池中线程的复用

思考: 通过阻塞队列的方式,如果队列满了,可以阻塞主线程/生产者线程吗?显然不能,那么怎么办:

  • 1.增加消费者线程的数量(扩容) 也就是增加线程去执行任务,消费者多了,阻塞队列自然被消费的也快了,就不容易阻塞主线程了

  • 2.如果扩容解决不了问题,那只能采用拒绝策略

    • 报错

    • 直接丢弃这个任务

    • 直接普通线程调用task.run(直接重新开启一个线程)

    • 队列中头部的等待最久的任务丢弃,然后把当前任务添加到阻塞队列

    • 存储起来,后续等待空闲之后重试(自定义去完成)

结束的方法:让线程执行结束(run方法执行结束),也就是跳出while循环

3.2 线程池的核心参数

a.线程数量(核心线程数)  [初始化时创建]

b. 最大线程数 [还能够扩容多少个线程 与核心线程数的差]

c. 存活时间 [扩容的线程要有一个存活时间]

d. 存活时间单位

e.阻塞队列(使用哪一种阻塞队列)

f.线程工厂(生产线程的工厂)(有默认值)

h.阻绝策略(有默认值 默认抛出异常)

四、Java中提供的线程池

Exectors

  • newScheduledThreadPool 提供周期的线程池

  • newFixedThreadPool 固定线程数量

  • newSingleThreadExecutor 只有一个线程的线程池

  • newCachedThreadPool 可以缓存的线程池->理论上来说,有多少情况,就构建多少个线程

五、自定义线程池源码分析

线程池中的核心线程是延迟初始化的

  • 先初始化核心线程

  • 调用阻塞队列的方法,把task存进去

    • 如果true,说明当前的请求量不大,核心线程就可以搞定

    • false,增加工作线程(非核心线程)

      • 如果添加失败,说明当前的工作线程数量达到了最大的线程数,直接调用拒绝策略

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 前3位记录运行状态  后29位记录线程数
        int c = ctl.get();
        // 1.判断当前工作线程数是否小于核心线程数(延迟初始化)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))  // 添加工作线程,并执行任务
                return;
            c = ctl.get();
        }
        // 2.添加到阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3. 增加工作线程
        else if (!addWorker(command, false))
            reject(command);   // 4. 增加失败,则执行拒绝策略
    }

5.1 addWorker

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 1. 修改工作线程记录数
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))   //1.1 CAS操作进行修改
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 2. 创建线程并启动
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 3. 添加失败 则回滚
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

分析: addWorker主要做3件事,.a.使用CAS操作修改原子值中的工作线程数  b.将任务放到Worker对象中,并启动线程 c.如果线程启动失败,则回滚。线程启动后,则自动调用Worker对象中的run方法。

5.2 runWorker

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 1. while循环保证当前线程不结束,直到task为空
            while (task != null || (task = getTask()) != null) {
                // 2.这里是因为表示当前线程正在运行一个任务,其它地方要执行shutdown 你要等我执行结束
                w.lock(); // worker继承了AQS  实现了互斥锁
             
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();  // 是否应该中断 在gettask中处理
                try {
                    // 空实现,方便扩展
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 空实现,方便扩展
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

5.3 getTask

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //1. 判断如果线程池已经结束,直接返回Null,需要清理掉所有的工作线程
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 2. 是否允许超时   allowCoreThreadTimeOut 为true 也就是说设置为true 核心线程也 
              // 可以被销毁
              // 或者工作线程数大于核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))   // cas操作减少工作线程
                    return null;
                continue;
            }

            try {
                //* 执行任务
                Runnable r = timed ?
                    // 超时阻塞
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // 如果阻塞队列为空,则会阻塞在这个地方
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

分析: getTask中从队列中取出任务并执行。注意:allowCoreThreadTimeOut 设置为true,则核心线程也可以被销毁

5.4 拒绝策略

a. AbortPolicy(抛出异常)

        // 报错 抛出异常
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

b. CallerRunsPolicy (创建一个普通线程,并执行)

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

c. DiscardOldestPolicy (从队列头部抛弃一个,执行当前的)

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

d. DiscardPolicy (什么也不做)

   public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/589219.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

注册表获取autoCAD安装位置

注意事项 注意&#xff1a;①64位操作系统注册表会重定向&#xff0c;RegOpenKeyEx第4个参数得加上KEY_WOW64_64KEY&#xff1b;②RegOpenKeyEx遍历子项时注意第2和第4参数&#xff0c;参考图&#xff1a; ③RegQueryValueEx同样得注意第6参数 完整代码 std::unordered_map…

Mybatis进阶(映射关系多对一 )

文章目录 1.需求分析2.应用实例&#xff08;xml配置&#xff09;1.数据表设计2.entity设计&#xff08;不要使用toString会栈溢出&#xff09;1.Pet.java2.User.java 3.编写Mapper1.PetMapper.java2.UserMapper.java 4.编写Mapper.xml1.UserMapper.xml2.PetMapper.xml 5.测试Us…

OPPO A72/A55/K7X/A53真我Q3S等手机ROOT刷机后广电卡没信号不读卡解决办法

目前运营商除了移动联通电信以外&#xff0c;还存在1个中国广电&#xff0c;广电属于第四大运营商&#xff0c;由于广电起步较晚&#xff0c;对于手机频段要求也自然不一样&#xff0c;导致目前市面上部分手机出厂没有信号或者不读卡等问题&#xff0c;特别在手机被用户自行刷机…

如何快速的追加文章的内容(在不知道内容的情况下)

首先&#xff0c;需要用到的这个工具&#xff1a; 度娘网盘 提取码&#xff1a;qwu2 蓝奏云 提取码&#xff1a;2r1z 1、打开工具&#xff0c;切换到文章模块下&#xff0c;快捷键&#xff1a;Ctrl1 2、新建一个文章做演示&#xff0c;001 3、添加一个内容&#xff0c;就随…

蚂蚁面试:Springcloud核心组件的底层原理,你知道多少?

尼恩说在前面 在40岁老架构师 尼恩的读者交流群(50)中&#xff0c;最近有小伙伴拿到了一线互联网企业如得物、阿里、滴滴、极兔、有赞、希音、百度、网易、美团、蚂蚁、得物的面试资格&#xff0c;遇到很多很重要的相关面试题&#xff1a; 说说&#xff1a;蚂蚁面试&#xff1…

【Java】HOT100 贪心算法

目录 理论基础 一、简单贪心 LeetCode455&#xff1a;分发饼干 二、中等贪心 2.1 序列问题 LeetCode376&#xff1a;摆动序列 2.2 贪心股票问题 LeetCode121&#xff1a;买卖股票的最佳时机 LeetCode121&#xff1a;买卖股票的最佳时机ii 2.3 两个维度权衡问题 LeetCode135&…

【06016传感器原理与应用】第5章 气敏传感器 期末复习自考复习

第5章 气敏传感器 能够把气体信息变成电信号的装置。 一、学习目的与要求 通过本章的学习&#xff0c;熟悉并掌握气体传感器的工作原理和硬件组成结构。熟练掌握几种电阻式尤其TiO2、SnO2气敏传感器、非电阻式气敏传感器、固体电解质传感器、接触燃烧式气敏传感器的基本原理…

karpathy makemore 3

1 Introduction 这一章&#xff0c;karpathy通过分析参数初始化对模型的收敛的影响作为一个引子&#xff0c;让我们看到神经网络这一个复杂的系统&#xff0c;对grad非常敏感&#xff0c;以及有一些技巧可以解决grad敏感的问题。 2 准备工作 2.1 dataloader import torch b…

DS:顺序表、单链表的相关OJ题训练

欢迎各位来到 Harper.Lee 的学习小世界&#xff01; 博主主页传送门&#xff1a;Harper.Lee的博客主页 想要一起进步的uu可以来后台找我交流哦&#xff01; 在DS&#xff1a;单链表的实现 和 DS&#xff1a;顺序表的实现这两篇文章中&#xff0c;我详细介绍了顺序表和单链表的…

PAT (Advanced Level) - 1047 Student List for Course

模拟 #include <iostream> #include <cstring> #include <algorithm> #include <vector> using namespace std;const int N 40010, M 2510;int n, k; string name[N]; vector<int> list[M];int main() {scanf("%d%d", &n, &…

万万没想到,延缓帕金森病进展的“玄机”竟然就在腿上?【北京仁爱堂】

帕金森病患者的腿部变化&#xff0c;其实可以反应出很多问题。例如行走的变化问题、步态的异常等问题&#xff0c;可以反应病情轻重程度。而通过保持腿部肌肉活动的状态&#xff0c;也可以使帕金森病的症状得到一定的缓解&#xff0c;甚至有助于防止病情恶化。 帕金森病腿部变…

2024年五一数学建模竞赛C题论文首发

基于随机森林的煤矿深部开采冲击地压危险预测 摘要 煤炭作为中国重要的能源和工业原料&#xff0c;其开采活动对国家经济的稳定与发展起着至关重要的作用。本文将使用题目给出的数据探索更为高效的数据分析方法和更先进的监测设备&#xff0c;以提高预警系统的准确性和可靠性…

Photoshop前言

Photoshop前言 分辨率图像格式工具界面组件 分辨率 分辨率是指单位长度内包含的像素点的数量&#xff0c;其单位通常为像素/英寸&#xff08;ppi&#xff09;&#xff0c;300ppi表示每英寸包含300个像素点。对于1英寸1英寸大小的图像&#xff0c;若分辨率为72ppi&#xff0c;则…

基于Pytorch深度学习——多层感知机

本文章来源于对李沐动手深度学习代码以及原理的理解&#xff0c;并且由于李沐老师的代码能力很强&#xff0c;以及视频中讲解代码的部分较少&#xff0c;所以这里将代码进行尽量逐行详细解释 并且由于pytorch的语法有些小伙伴可能并不熟悉&#xff0c;所以我们会采用逐行解释小…

Visio 2021 (64bit)安装教程

Visio 2021 (64bit)安装教程 ​ 通知公告 Visio 2021 (64位) 是一款流程图和图表设计工具,主要用于创建和编辑各种类型的图表和图形。它提供了一个直观的界面和丰富的功能,可以帮助用户快速绘制专业的流程图、组织结构图、网络图、平面图、数据库模型等。 具体来说,Visio 20…

Go 语言数组

Go 语言提供了数组类型的数据结构。 数组是具有相同唯一类型的一组已编号且长度固定的数据项序列&#xff0c;这种类型可以是任意的原始类型例如整型、字符串或者自定义类型。 相对于去声明 number0, number1, ..., number99 的变量&#xff0c;使用数组形式 numbers[0], num…

Spring Cloud——Circuit Breaker上篇

Spring Cloud——Circuit Breaker上篇 一、分布式系统面临的问题1.服务雪崩2.禁止服务雪崩故障 二、Circuit Breaker三、resilience4j——服务熔断和降级1.理论知识2.常用配置3.案例实战&#xff08;1&#xff09;COUNT_BASED&#xff08;计数的滑动窗口&#xff09;&#xff0…

Spring Cloud——OpenFeign

Spring Cloud——OpenFeign 一、OpenFeign能干嘛二、OpenFeign的基本使用三、Feign的高级特性1.OpenFeign超时控制2.OpenFeign重试机制3.OpenFeign之性能优化HttpClient54.OpenFeign请求和响应压缩5.OpenFeign之Feign日志打印 四、参考 OpenFeign是一个声明式的Web服务客户端。…

Android --- 消息机制与异步任务

在Android中&#xff0c;只有在UIThread(主线程)中才能直接更新界面&#xff0c; 在Android中&#xff0c;长时间的工作联网都需要在workThread(分线程)中执行 在分线程中获取服务器数据后&#xff0c;需要立即到主线程中去更新UI来显示数据&#xff0c; 所以&#xff0c;如…

程序员的五大方法论

前言&#xff1a; 最近看了一篇总结程序员学习&#xff0c;晋升方法的文章&#xff0c;颇有感想&#xff0c;决定分享给大家&#xff0c;原文地址&#xff1a;给程序员的5条学习方法论 (qq.com)https://mp.weixin.qq.com/s/xVFlF9qTf9c74Emmdm0DqA 在繁忙的工作中&#xff0c;持…