Java线程池ThreadPoolExecutor运行机制和源码解析

线程池简介

线程的每次创建和销毁都会产生的一定的系统资源和时间的开销。正如几乎所有重资源都使用池化技术(数据库连接池、redis连接池等)进行管理,线程作为操作系统宝贵的资源,对它的使用需要进行控制管理,线程池就是使用了池化的技术对线程进行复用和管理。使用线程池对比单独启动线程有以下好处:

  1. 快速响应用户请求

线程的启动需要一定时间开销,而使用了池化的线程时,当任务到达,节省了这部分的时间。Tomcat就使用自行扩展的ThreadPoolExecutor处理http请求,减小响应时间。

  1. 减少资源开销

对操作系统来说,创建一个线程的代价是十分昂贵的, 需要给它分配内存、列入调度,同时在线程切换的时候还要执行内存换页,CPU 的缓存被清空,切换回来的时候还要重新从内存中读取信息,破坏了数据的局部性。
频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
对资源无限申请缺少抑制手段,将引发系统资源耗尽的风险。
系统无法合理管理内部的资源分布,将降低系统的稳定性。

  1. 提高资源管理性

使用线程池可对线程资源进行统一的命名、创建、销毁、监控和分配任务,还能通过信号量来对线程同时执行任务的数量进行限制。

jdk里最基本的线程池的实现类是ThreadPoolExecutor,它是Java管理线程池的一个重要类,本文正是对这个类的讲解。

线程池运行机制

在这里插入图片描述

线程池的总体运行机制如上图(网上找的一个图),内部管理了一批线程和一个缓冲阻塞队列,通过execute()方法向其提交任务,之后线程池会调度分配任务,根据不同条件时会产生不同的策略:

  1. 直接新建一个线程运行该任务
  2. 将任务放进队列,线程池的线程从队列中取任务去执行
  3. 直接新建一个线程运行该任务直至达到最大线程数(和1中的条件有差异)
  4. 拒绝任务,执行拒绝策略

这里对概念和机制作简单介绍,下面将作详细讲解。

类图

image

ThreadPoolExecutor类实现了ExecutorService接口,下面是主要方法的简要说明:

  • execute(Runnable command):提交一个任务给线程池执行。
  • shutdown():优雅地关闭线程池,不再接受新的任务,等待已经提交的任务执行完成后关闭线程池。
  • shutdownNow():立即关闭线程池,并尝试中断所有执行中的任务。

线程池状态

线程池的状态和数量存储在原子类实例ctl里,高3位为线程池状态,低29位为线程池的工作线程数。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

CAPACITY = (1 << COUNT_BITS) - 1; 这段代码得到值转为二进制是
00011111 11111111 11111111 11111111

~CAPACITY得到值转为二进制是
1110000 00000000 00000000 00000000

所以
private static int runStateOf(int c) { return c & ~CAPACITY; } 得到的是c的高3位的值
private static int workerCountOf(int c) { return c & CAPACITY; } 得到的是c的低29位的值
private static int ctlOf(int rs, int wc) { return rs | wc; } 是将rs的高3位和wc的低29位的值组合起来(rs的低29位为0,wc的高3位为0)

线程池包含了五种状态:

状态备注
RUNNING能接受新提交的任务,也能处理阻塞队列中的任务
SHUTDOWN不再接受新任务,但能继续处理阻塞队列。在调用shutdown()方法中转为此状态
STOP不再接受新任务,不再处理阻塞队列,中断正在运行的线程。在调用shutdownNow()方法中转为此状态
TIDYING所有任务已终止,worker数量为0
TERMINATED当调用terminated()后转为该状态

线程池的状态由内部维护,随着生命周期的转变而变化,其状态的流转图如下:

在这里插入图片描述

核心参数

线程池的构造函数:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造函数的参数详细解释:

参数类型名称备注
corePoolSizeint核心线程数除非设置了allowCoreThreadTimeOut,不然即使空闲也不会回收
maximumPoolSizeint最大线程数线程池允许的最大线程数量
keepAliveTimelong线程存活的时间当完成当次任务后线程存活的时间
workQueueBlockingQueue阻塞队列可自行选择实现类
threadFactoryThreadFactory线程工厂可设置线程属性
handlerRejectedExecutionHandler拒绝策略当线程容量溢出时执行的策略

任务调度流程

线程池提交任务的入口是execute()方法,在该方法内部决定了提交任务的分配和调度过程,如下图所示:

在这里插入图片描述

主要调度逻辑:

  1. 当前线程数小于corePoolSize时,每次创建一个新线程执行任务
  2. 当前线程数大于corePoolSize并且workQueue未满,把任务放进workQueue
  3. 当前线程数处于corePoolSize和maximumPoolSize之间并且workQueue已满,创建新线程执行任务
  4. 当前线程数大于等于maximumPoolSize,执行拒绝策略
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) //当前线程数小于corePoolSize时
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        //当前线程数大于corePoolSize并且workQueue未满,把任务放进workQueue
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //当前线程数处于corePoolSize和maximumPoolSize之间并且workQueue已满,创建新线程执行任务
    else if (!addWorker(command, false)) 
        reject(command);  //当前线程数大于等于maximumPoolSize,执行拒绝策略
}

Worker类

Worker是线程池的核心内部类,存储着线程,实现了Runnable接口,通过继承同步器AbstractQueuedSynchronizer简单的实现了锁,这个锁用于防止在关闭线程池时正在运行任务的线程被interrupt()方法中断。

/**
 * ThreadPoolExecutor的内部类Worker
 */
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {

        final Thread thread;  //线程池的线程

        Runnable firstTask;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //新建线程
            this.thread = getThreadFactory().newThread(this);
        }

        /** 任务执行的核心方法 */
        public void run() {
            runWorker(this);
        }
        
        //省略代码……

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

    }

线程池通过addWorker(Runnable firstTask, boolean core)方法新建Worker实例,在其构造函数会新建一个线程,创建成功后启动这个线程。线程池对线程的管理就是通过对Worker的管理实现的。

//ThreadPoolExecutor通过内部变量workers来管理线程
private final HashSet<Worker> workers = new HashSet<Worker>();

private boolean addWorker(Runnable firstTask, boolean core) {
        Worker w = null;
        try {
            //省略代码……
            w = new Worker(firstTask); //新建worker实例
            final Thread t = w.thread;
            if (t != null) {
                try {
                    //省略代码……
                    workers.add(w);
                }
                if (workerAdded) {
                    t.start(); //启动worker线程
                }
            }
            //省略代码……
        }
        return workerStarted;
    }
}

线程运行的核心方法

worker线程启动后执行的run()就是runWorker(Worker w),是线程执行任务的核心方法,内部是一个循环,运行的逻辑如下:

  1. 当是新建的线程时, 会运行Worker实例初始化时的firstTask任务。
  2. firstTask任务完成时,调用getTask()从阻塞队列方法取得下一个任务。
  3. 当调用getTask()方法返回null时会退出循环,执行processWorkerExit()方法退出worker,关闭线程。
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            /**
            * 循环从队列里取任务,当task=getTask()方法返回null时会退出循环
            */
            while (task != null || (task = getTask()) != null) {
                //加锁防止shutdown方法会interrupt正在运行的任务
                w.lock();  
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        /** 执行提交的任务 */
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //若上面的主循环结束,回收线程
            processWorkerExit(w, completedAbruptly); 
        }
    }

拒绝策略

使用了有界等待队列的线程池对任务的总容量作了限制,当线程池的任务总容量达到了maximumPoolSize和等待队列容量之和,线程池不会再接收任务,而是执行拒绝策略。java原生有四种拒绝策略,另外可以实现java.util.concurrent.RejectedExecutionHandler接口进行扩展。

效果
AbortPolicy抛出异常(默认的拒绝策略)
CallerRunsPolicy在当前线程运行任务
DiscardOldestPolicy丢弃队列最旧的任务
DiscardPolicy丢弃当前的任务

任务在线程池的执行流程

线程池内部的主要组件和实现在上面已经进了介绍,通过下面的时序图展示了一个任务在线程池中执行的完整流程:

在这里插入图片描述

线程池的关闭

线程池的核心线程是通过shutdown()shutdownNow()方法进行关闭的,这两个方法分别会把线程池的状态设置为SHUTDOWNSTOP,这时候线程池不会再接受新的任务,通过使用worker的tryLock()方法尝试获得锁能否成功判断是否空闲,能取得锁就是空闲的线程,然后去中断空闲的线程,这样就保证不会中断正在运行的任务的线程。

shutdown()shutdownNow()两个方法的区别是shutdownNow()会把等待队列清空。

以下是shutdown()方法的代码:

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN); //线程池状态设置为SHUTDOWN
            interruptIdleWorkers();    //中断空闲线程,回收资源
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }


    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //尝试获取worker的锁
                if (!t.isInterrupted() && w.tryLock()) {  
                    try {
                        t.interrupt();  //对worker工作线程进行中断
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    

线程池空闲worker线程的关闭回收

要搞明白空闲线程的关闭回收需要理解线程中断的一些知识,下面就来讲一下。

线程中断知识点回顾开始

——————— 知识点回顾开始 ———————

java有两种方法通知线程结束线程

1、调用线程的stop()方法(已废弃)

直接退出线程,因为太暴力会产生不可知的结果该方法已废弃

2、调用线程的中断方法

线程的中断方法是interrupt(),首先要明确的一点是调用interrupt()方法后线程并不是马上关闭不是马上关闭不是马上关闭!!

而是根据不同情况出现以下两种不同结果:

  • (1)、当使用interrupt()方法去打断处于阻塞状态的线程时,会抛出InterruptedException异常,而不会更新打断标记,因此,虽然被打断,但是打断标记依然为false,使用Thread类的isInterrupted()方法可返回打断标记。

线程阻塞的情况有以下这些:

 * @see     java.lang.Object#wait()
 * @see     java.lang.Object#wait(long)
 * @see     java.lang.Object#wait(long, int)
 * @see     java.lang.Thread#sleep(long)
 * @see     java.lang.Thread#sleep(long)
 * @see     java.util.concurrent.locks.Condition.await
  • (2)、当使用interrupt()方法去打断非阻塞线程时,被打断的线程会继续运行,但是该线程的打断标记会更新,更新为true,因此可以根据打断标记来作为判断条件使得线程停止。线程是否打断的方法为isInterrupted()

所以,调用线程的interrupt()方法并不会停止和关闭线程,程序自行根据打断标记或InterruptedException异常自行结束线程的运行

——————— 知识点回顾结束 ———————

下面重新回到线程池空闲worker线程的关闭回收的主题。

当线程池处于SHUTDOWN状态时,不再接受新的任务,意味着只要正在运行的任务和队列里的任务全部运行完毕,线程池里所有任务就完成了。

上面介绍了线程池的关闭的方法,接下来思考一个问题,既然线程的interrupt方法是相当于一种通知机制,为什么通过调用空闲worker线程的interrupt()方法能关闭回收线程?

其中空闲的worker线程的锁已释放,正在从队列取数据处于阻塞状态等待任务入列。

当调用线程池的shutdown()方法时,会遍历worker尝试获得锁,

取得空闲的worker的锁并去interrupt线程,此时阻塞队列抛出InterruptedException,在getTask()方法内捕获异常后结束本次循环,运行下一次循环时由于线程池状态已变为大于等于SHUTDOWNgetTask()返回null,runWorker()的作为线程运行任务的核心循环体也结束了,最后调用processWorkerExit回收worker实例,在processWorkerExit方法结束后,线程的run()方法就完成了运行,线程随之结束。

结合下面的时序图和具体代码的流转作深一步理解

空闲worker线程关闭退出的时序图

在这里插入图片描述

空闲worker线程被中断后关闭退出的逻辑代码

在这里插入图片描述
在代码标红步骤 3.若await阻塞时被interrupt抛出异常InterruptedException的说明:

执行到notEmpty.await()处的线程为空闲线程,此时被interrupt是因为在线程阻塞时线程池调用了shutdown()shutdownNow()

结尾

本文对java线程池的分析就到这里,如有不正,欢迎指出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/398225.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【前沿】头戴式光场显示技术研究进展

摘要&#xff1a;光场显示器旨在通过重建三维场景在不同方向发出的几何光线来渲染三维场景的视觉感知&#xff0c;从而为人的视觉系统提供自然舒适的视觉体验&#xff0c;解决传统平面立体三维显示器中的聚散调节冲突问题。近年来&#xff0c;多种光场显示方法被尝试应用到头戴…

特征选择、特征降维和特征提取到底有什么区别和联系?这篇文章一次性给你讲清楚!

目录 一、特征选择&#xff1a; 1.最大互信息系数(MIC)&#xff1a; 2.互信息(MI)&#xff1a; 3.最大相关最小冗余算法(mRMR)&#xff1a; 4.支持向量机递归特征消除(SVM_RFE)&#xff1a; 二、特征降维&#xff1a; 1.主成分分析(PCA)&#xff1a; 2.核主成分分析(KP…

【数据结构/c++】求解有向无环图DAG的关键路径

#include<cstring>//memset头文件 #include<algorithm>//fill头文件 #include<vector> #include<stdio.h> #include<stack> #include<queue> using namespace std; const int MAXV510; struct Node{int v,w;Node(int _v,int _w):v(_v),…

【.NET Core】常见C#代码约定

【.NET Core】常见C#代码约定 文章目录 【.NET Core】常见C#代码约定一、概述二、代码预定的目标三、代码约束工具和分析器四、C#语言准则五、字符串约定5.1 使用字符串内插来连接短字符串5.2 插入大文本时&#xff0c;使用System.Text.StringBuilder对象 六、数组约定七、委托…

提升认知水平和防止偏见浅谈

提升认知水平和防止偏见浅谈 《庄子外物》&#xff1a;井蛙不可语海&#xff0c;夏虫不可语冰。 不要跟井底的青蛙谈论大海&#xff0c;因为它的认知只有井底那么大&#xff0c;大海对于它来说是认知盲区&#xff1b;不要与夏虫去谈论冰雪&#xff0c;因为夏虫一生很短没有经历…

springboot203医疗挂号管理系统

医疗挂号管理系统设计与实现 摘 要 在如今社会上&#xff0c;关于信息上面的处理&#xff0c;没有任何一个企业或者个人会忽视&#xff0c;如何让信息急速传递&#xff0c;并且归档储存查询&#xff0c;采用之前的纸张记录模式已经不符合当前使用要求了。所以&#xff0c;对医…

摄像设备+nginx+rtmp服务器

前言 由于html中的video现在不支持rtmp协议(需要重写播放器框架&#xff0c;flash被一刀切&#xff0c;360浏览器还在支持flash),遂用rtmp作为桥梁,实际是hls协议在html中起作用. 在此推荐一款前端播放器,.ckplayer 简直了,写点页面,一直循环&#xff0c;洗脑神曲 dream it po…

spring boot3参数校验基本用法

⛰️个人主页: 蒾酒 &#x1f525;系列专栏&#xff1a;《spring boot实战》 &#x1f30a;山高路远&#xff0c;行路漫漫&#xff0c;终有归途。 目录 前置条件 前言 导入依赖 使用介绍 配置检验规则 开启校验 使用注意 全局异常捕获返回友好提示信息 常用的校…

Sparse MLP

上图展示了本文网络的整体架构。与ViT、MLP-Mixer和Swin Transformer类似&#xff0c;空间分辨率为HW的输入图像被分割为不重叠的patch。作者在网络中采用了44的patch大小&#xff0c;每个patch被reshape成一个48维的向量&#xff0c;然后由一个线性层映射到一个c维embedding i…

可解决95%以上问题的Linux命令!能用到退休

对于我们程序员来说&#xff0c;我们始终绕不过去要与 Linux 系统打交道。很多人&#xff0c;特别是新手程序员&#xff0c;一看到 Linux 系统那个小黑框&#xff0c;就发怵&#xff0c;其实&#xff0c;如果你真正去深入了解了&#xff0c;然后再学会一些常用的命令&#xff0…

网络入山太困难?看格行随身WiFi如何助力大山教育!

近日&#xff0c;一则关于偏远大山的上网问题冲上了热搜&#xff0c;引发了社会关注。虽然很多山区都已经通了电、通了网&#xff0c;但是在一些贫困的地区&#xff0c;网络基础设施依旧薄弱&#xff0c;村民想要使用固定宽带&#xff0c;仍然十分困难。 而在山区的学生们&…

每日OJ题_二叉树dfs②_力扣129. 求根节点到叶节点数字之和

目录 力扣129. 求根节点到叶节点数字之和 解析代码 力扣129. 求根节点到叶节点数字之和 129. 求根节点到叶节点数字之和 难度 中等 给你一个二叉树的根节点 root &#xff0c;树中每个节点都存放有一个 0 到 9 之间的数字。 每条从根节点到叶节点的路径都代表一个数字&am…

Unable to make field private JavacProcessingEnvironment$DiscoveredPro报错解决办法

maven项目打包报错 报错信息 Unable to make field private com.sun.tools.javac.processing.JavacProcessingEnvironment$DiscoveredProcessors com.sun.tools.javac.processing.JavacProcessingEnvironment.discoveredProcs accessible: module jdk.compiler does not &q…

【最新Dubbo3深入理解】Dubbo特性、工作原理以及负载均衡策略

欢迎关注公众号&#xff08;通过文章导读关注&#xff1a;【11来了】&#xff09;&#xff0c;及时收到 AI 前沿项目工具及新技术的推送&#xff01; 在我后台回复 「资料」 可领取编程高频电子书&#xff01; 在我后台回复「面试」可领取硬核面试笔记&#xff01; 文章导读地址…

SpringAop是什么?

简单介绍&#xff1a; AOP&#xff1a;Aspect Oriented Programming (面向切面编程、面向方面编程)&#xff0c;其实就是面向特定方法编程。 场景&#xff1a; 比如现在有一个需求&#xff0c;我要统计每一个业务方法的耗时时长&#xff0c; 我们只需在业务方法的前面获取一个…

ThreadLocal(5):ThreadLocalMap源码分析

在分析ThreadLocal方法的时候&#xff0c;我们了解到ThreadLocal的操作实际上是围绕ThreadLocalMap展开的。ThreadLocalMap的源码相对比较复杂, 我们从以下三个方面进行讨论。 1 基本结构 ​ ThreadLocalMap是ThreadLocal的内部类&#xff0c;没有实现Map接口&#xff0c;用独…

Python三级考试笔记

Python三级考试笔记【源源老师】 三级标准 一、 理解编码、数制的基本概念&#xff0c;并且会应用。 1. 能够进行二进制、十进制以及十六进制之间的转换&#xff1b; 2. 理解Python中的数制转换函数。 二、 掌握一维数据的表示和读写方法&#xff0c;能够编写程序处理一维数据…

QT 文本编辑框textBrowser接收数据保持光标在底部的方法

目录 1.实现效果2.代码 1.实现效果 2.代码 右键textBrowser加入触发信号textChanged&#xff1a; 双击&#xff0c;跳转到槽函数&#xff1a;(文本更改时执行该函数) void Widget::updata_textBrowser() void Widget::on_textBrowser_textChanged() {//光标移动至底部ui->…

npm install 安装依赖如何加速

在使用npm安装依赖时&#xff0c;有几种方法可以加速这一过程&#xff0c;尤其是在面临网络限制或npm官方源速度慢的情况下。以下是一些常用的加速技巧&#xff1a; 1. 使用国内镜像源 国内有几个镜像源可以提供更快的下载速度&#xff0c;例如淘宝npm镜像。你可以通过以下命…

Android挖取原图中心区域RectF(并框线标记)放大到ImageView宽高,Kotlin

Android挖取原图中心区域RectF(并框线标记)放大到ImageView宽高&#xff0c;Kotlin 红色线框区域即为选中的原图中心区域&#xff0c;放大后放到等宽高的ImageView里面。 import android.content.Context import android.graphics.Bitmap import android.graphics.BitmapFactor…