ThreadPoolExecutor线程池内部处理浅析

我们知道如果程序中并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束时,会因为频繁创建线程而大大降低系统的效率,因此出现了线程池的使用方式,它可以提前创建好线程来执行任务。本文主要通过java的ThreadPoolExecutor来查看线程池的内部处理过程。

1 ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,下面我们来看一下ThreadPoolExecutor类的部分实现源码。

1.1 构造方法

ThreadPoolExecutor类提供了如下4个构造方法

// 设置线程池时指定核心线程数、最大线程数、线程存活时间及等待队列。
// 线程创建工厂和拒绝策略使用默认的(AbortPolicy)
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

// 设置线程池时指定核心线程数、最大线程数、线程存活时间、等待队列及线程创建工厂 
// 拒绝策略使用默认的(AbortPolicy)
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

// 设置线程池时指定核心线程数、最大线程数、线程存活时间、等待队列及拒绝策略
// 线程创建工厂使用默认的
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
   }
// 设置线程池时指定核心线程数、最大线程数、线程存活时间、等待队列、线程创建工厂及拒绝策略
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

通过观察上述每个构造器的源码实现,我们可以发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

下面解释一下构造器中各个参数的含义:

  • corePoolSize:核心池的线程个数上线,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。

  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

  • unit:参数keepAliveTime的时间单位。

  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响;

  • threadFactory:线程工厂,主要用来创建线程;

  • handler:表示当拒绝处理任务时的策略。有以下四种取值:ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

1.2 核心方法

在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法。

 public void execute(Runnable command) {
        // 判断提交的任务command是否为null,若是null,则抛出空指针异常;
        if (command == null)
            throw new NullPointerException();
        // 获取线程池中当前线程数
        int c = ctl.get();
        // 如果线程池中当前线程数小于核心池大小,进入if语句块
        if (workerCountOf(c) < corePoolSize) {
            // 如果以给定的命令启动一个核心线程执行任务成功,直接返回
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果线程池不处于运行状态并且移除刚加入的任务成功则执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果当前线程数为0,则在线程池里增加一个线程,保证队列里的任务不会没有线程执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } 
        // 尝试启动核心线程之外的线程,如果不满足,则执行对应的拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

主要方法addWorker。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果线程池状态大于SHUTDOWN或者线程池状态等于SHUTDOWN,firstTask不等于null
            // 或者线程池状态等于SHUTDOWN,任务队列等于空时,直接返回false结束。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 如果线程数量大于等于最大数量或者大于等于上限
                //(入参core传true,取核心线程数,否则取最大线程数),直接返回false结束。
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false
                // CAS操作给工作线程数加1,成功则跳到retry处,不再进入循环。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 如果线程池状态与刚进入时不一致,则跳到retry处,再次进入循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 新建一个线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {

                    int rs = runStateOf(ctl.get());
                    // 如果线程池状态在SHUTDOWN之前或者
                    // 线程池状态等于SHUTDOWN并且firstTask等于null时,进入处理。
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果要执行的线程正在运行,则抛异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果线程添加失败,则将新增的对应信息删除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

1.3 任务执行run方法

在上述addWorker中,当调用线程的start方法启动线程后,会执行其中的run方法。

public void run() {
            runWorker(this);
        }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 如果任务不为空或者新获取到的任务不为空
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 当线程池状态,大于等于 STOP 时,保证工作线程都有中断标志。
                // 当线程池状态,小于STOP时,保证工作线程都没有中断标志。
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

2 整体处理过程

通过上述源码分析,我们可以得出线程池处理任务的过程如下:

3 总结

本文从源码层面主要分析了线程池的创建、运行过程,通过上述的分析,可以看出当线程池中的线程数量超过核心线程数后,会先将任务放入等待队列,队列放满后当最大线程数大于核心线程数时,才会创建新的线程执行。

文章转载自:京东云开发者

原文链接:https://www.cnblogs.com/Jcloud/p/17866942.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/207522.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python自动化办公:PDF文件的分割与合并

我们平时办公中&#xff0c;可能需要对pdf进行合并或者分割&#xff0c;但奈何没有可以白嫖的工具&#xff0c;此时python就是一个万能工具库。 其中PyPDF2是一个用于处理PDF文件的Python库&#xff0c;它提供了分割和合并PDF文件的功能。 在本篇博客中&#xff0c;我们将详细…

Sass 语法详细介绍

文章目录 前言SASS缩进语法SASS的语法差异多线选择器注释import Mixin指令已弃用的语法后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;Sass和Less &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正…

VR全景对旅游业有什么帮助,如何助力旅游业实现新的旅游形式

引言&#xff1a; 旅游业是一个充满机遇的行业&#xff0c;而虚拟现实&#xff08;VR&#xff09;全景技术正逐渐改变着旅游业的面貌&#xff0c;通过提供身临其境的体验&#xff0c;VR全景成为了旅游业的新宠&#xff0c;将旅游带入了一个全新的数字化时代。 一、打破地域限制…

C陷阱与缺陷——第6章 预处理器

在严格意义上的编译过程开始之前&#xff0c;C语言预处理器首先对程序代码做了必要的转换处理。预处理器的主要作用是&#xff1a; 我们有时需要将某个特定数量在程序中出现的所有实例统统加以修改大多数C语言实现在函数调用时都会带来重大的系统开销 1. 不能忽视宏定义中的空…

Elasticsearch:为现代搜索工作流程和生成式人工智能应用程序铺平道路

作者&#xff1a;Matt Riley Elastic 的创新投资支持开放的生态系统和更简单的开发者体验。 在本博客中&#xff0c;我们希望分享 Elastic 为简化你构建 AI 应用程序的体验而进行的投资。 我们知道&#xff0c;开发人员必须在当今快速发展的人工智能环境中保持灵活性。 然而&a…

xilinx系列FPGA基于VIVADO的pin delay列表生成说明

目录 1 概述2 示例平台3 操作说明4 注意事项 xilinx系列FPGA基于VIVADO的pin delay列表生成说明 1 概述 本文用于讲诉xilinx系列FPGA基于VIVADO的pin delay列表生成说明&#xff0c;以及一些注意事项&#xff0c;为FPGA设计人员探明道路。 Pin delay 即FPGA内部die到pin的延时…

爱芯元智AX650N部署yolov8s 自定义模型

爱芯元智AX650N部署yolov8s 自定义模型 本博客将向你展示零基础一步步的部署好自己的yolov8s模型&#xff08;博主展示的是自己训练的手写数字识别模型&#xff09;&#xff0c;本博客教你从训练模型到转化成利于Pulsar2 工具量化部署到开发板上 训练自己的YOLOv8s模型 准备自…

机器视觉新功能上线:同步训练多个模型,智造的脚步又加快了!

“AI视觉”的应用&#xff0c;为当下诸多企业的生产智能化打开了新的想象空间。其中&#xff0c;深度学习作为AI视觉的核心技术&#xff0c;在实际应用中往往需要经历一个耗时较长的阶段——深度学习神经网络模型训练。其目的是通过使用已标注的数据集来训练模型&#xff0c;使…

Inference with C# BERT NLP Deep Learning and ONNX Runtime

目录 效果 测试一 测试二 测试三 模型信息 项目 代码 下载 Inference with C# BERT NLP Deep Learning and ONNX Runtime 效果 测试一 Context &#xff1a;Bob is walking through the woods collecting blueberries and strawberries to make a pie. Question …

传统算法:使用 Pygame 实现插入排序

使用 Pygame 模块实现了插入排序的动画演示。首先,它生成一个包含随机整数的数组,并通过 Pygame 在屏幕上绘制这个数组的条形图。接着,通过插入排序算法对数组进行排序,动画效果可视化每一步的排序过程。在排序的过程中,程序将当前元素插入到已排序的部分,通过适度的延迟…

每日一练2023.12.1——输出GPLT【PTA】

题目链接&#xff1a;L1-023 输出GPLT 题目要求&#xff1a; 给定一个长度不超过10000的、仅由英文字母构成的字符串。请将字符重新调整顺序&#xff0c;按GPLTGPLT....这样的顺序输出&#xff0c;并忽略其它字符。当然&#xff0c;四种字符&#xff08;不区分大小写&#x…

《opencv实用探索·七》一文看懂图像卷积运算

1、图像卷积使用场景 图像卷积是图像处理中的一种常用的算法&#xff0c;它是一种基本的滤波技术&#xff0c;通过卷积核&#xff08;也称为滤波器&#xff09;对图像进行操作&#xff0c;使用场景如下&#xff1a; 模糊&#xff08;Blur&#xff09;&#xff1a; 使用加权平…

C++入门篇(零) C++入门篇概述

目录 一、C概述 1. 什么是C 2. C的发展史 3. C的工作领域 4. C关键字(C98) 二、C入门篇导论 一、C概述 1. 什么是C C是基于C语言而产生的计算机程序设计语言&#xff0c;支持多重编程模式&#xff0c;包括过程化程序设计、数据抽象、面向对象程序设计、泛型程序设计和设计模式…

Maven无法拉取依赖/构建失败操作步骤(基本都能解决)

首先检查配置文件&#xff0c;确认配置文件没有问题(也可以直接用同事的配置文件(记得修改文件里的本地仓库地址)) 1.file->Invalidate Caches清除缓存重启(简单粗暴&#xff0c;但最有效) 2.刷新maven以及mvn clean&#xff0c;多刷几次&#xff0c;看看还有没有报红的依赖…

Python 中 AttributeError: Int object Has No Attribute 错误

int 数据类型是最基本和最原始的数据类型之一&#xff0c;它不仅在 Python 中&#xff0c;而且在其他几种编程语言中都用于存储和表示整数。 只要没有小数点&#xff0c;int 数据类型就可以存储任何正整数或负整数。 本篇文章重点介绍并提供了一种解决方案&#xff0c;以应对我…

基于Netty的网络调用实现

作为一个分布式消息队列&#xff0c;通信的质量至关重要。基于TCP协议和Socket实现一个高效、稳定的通信程序并不容易&#xff0c;有很多大大小小的“坑”等待着经验不足的开发者。RocketMQ选择不重复发明轮子&#xff0c;基于Netty库来实现底层的通信功能。 1 Netty介绍 Net…

TCP报文解析

1.端口号 标记同一台计算机上的不同进程 源端口&#xff1a;占2个字节&#xff0c;源端口和IP的作用是标记报文的返回地址。 目的端口&#xff1a;占2个字节&#xff0c;指明接收方计算机上的应用程序接口。 TCP报头中的源端口号和目的端口号同IP报头中的源IP和目的IP唯一确定一…

马蹄集第34周

1.战神的对称谜题 不知道为什么超时&#xff01; def main():s input()result 0for i in range(len(s)):l i - 1r i 1while l > 0 and r < len(s) and s[l] s[r]:result max(result, r - l 1)l - 1r 1l ir i 1while l > 0 and r < len(s) and s[l] s…

二分查找与搜索树高频问题

关卡名 逢试必考的二分查找 我会了✔️ 内容 1.山脉数组的峰顶索引 ✔️ 2.旋转数字的最小数字 ✔️ 3.寻找缺失数字 ✔️ 4.优化求平方根 ✔️ 5.中序与搜索树原理 ✔️ 6.二叉搜索树中搜索特定值 ✔️ 7.验证二叉搜索树 ✔️ 基于二分查找思想&#xff0c;可以拓展出很…

【PUSDN】WebStorm中报错Switch language version to React JSX

简述 WebStorm中报错Switch language version to React JSX 可能本页面的写法是其他语法。所以可以不用管。 测试项目&#xff1a;ant design vue pro 前情提示 系统&#xff1a; 一说 同步更新最新版、完整版请移步PUSDN Powered By PUSDN - 平行宇宙软件开发者网www.pusdn…