深入剖析Java线程池的核心概念与源码解析:从Executors、Executor、execute逐一揭秘

文章目录

  • 文章导图
  • 前言
  • Executors、Executor、execute对比剖析
  • Executors生成的线程池?
  • 线程池中的 execute 方法
    • execute 方法的作用
    • execute的工作原理
      • 拒绝策略
    • 源码分析工作原理
      • 基本知识
        • 线程的状态
        • 线程池的状态
        • 线程池状态和线程状态总结
        • 线程池的状态信息和线程数量信息(ctl)
      • execute->addWorker->runWorker->getTask流程
      • execute方法
        • 步骤详细解释:
        • 关键方法:
        • 总结:
      • addWorker
        • Worker线程
        • 步骤详细解释:
        • 方法的关键点:
        • 总结:
      • runWorker
        • 代码解读:
      • getTask
        • 代码解读:
        • 特别注意:

线程池系列文章可参考下表,目前已更新完毕…

线程池系列:文章
Java基础线程池深入剖析Java线程池的核心概念与源码解析:从Executors、Executor、execute逐一揭秘
CompletableFuture线程池从用法到源码再到应用场景:全方位了解CompletableFuture及其线程池
SpringBoot默认线程池(@Async和ThreadPoolTaskExecutor)探秘SpringBoot默认线程池:了解其运行原理与工作方式(@Async和ThreadPoolTaskExecutor)
SpringBoot默认线程池和内置Tomcat线程池你是否傻傻分不清SpringBoot默认线程池和内置Tomcat线程池?

文章导图

image-20240601182910930

前言

在日常编码中,特别是在处理并发编程时,Java 提供了很多便捷工具帮助我们高效运行。不过你是否也曾被 Executors、Executor 和 execute 这些名字搞得一头雾水?

它们长得这么像,究竟有什么区别呢?接下来跟着我一探究竟吧!
在这里插入图片描述

Executors、Executor、execute对比剖析

java.util.concurrent.Executors是一个工具类,提供了一些静态方法,用于创建各种类型的 ExecutorService 实例。例如,newFixedThreadPool 可以创建一个固定大小的线程池,newCachedThreadPool 可以创建一个可缓存的线程池等。它简化了创建线程池的过程,提供了一些默认的配置选项。
java.util.concurrent.Executor是一个接口,定义了一个单一的方法 execute,用于提交任务到执行器中。这个接口是线程池的基础接口,它没有提供直接创建线程池的方法。
java.util.concurrent.Executor#executeExecutor 接口中唯一的方法,用于将任务提交给执行器执行。这个方法通常用于提交实现了 Runnable 接口的任务。

简而言之,Executors 是一个工具,用于创建各种类型的线程池实例。Executor 是一个接口,定义了提交任务到执行器的方法。execute 方法是 Executor 接口定义的唯一方法,用于提交任务给执行器执行。

如果你需要创建线程池,可以使用 Executors 类提供的方法来创建,然后使用返回的线程池实例来提交任务。具体看我下面的这个例子就理解了!:

//Executors工具类创建线程池默认是ExecutorService,ExecutorService又继承了Executor,它是线程池最底层的基础接口!
Executor executor = Executors.newFixedThreadPool(5);
//然后用executor线程池去执行execute方法,接收一个Runnable任务
executor.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
});

image-20240530222442006

Executors生成的线程池?

说到了Executors,就必须谈下用它生成的线程池,在阿里Java规约中,对Executors有一个专门的规约:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lKKPHsVd-1596103123455)(EE1DC468E4A74F7AB21E05EF9D46AFB0)]

原因其实也很简单,会有各种OOM风险,也不便于管理,详细的我都帮你们总结成思维导图了!

  • Executors.newWorkStealingPool方法之外,其他方法都有OOM风险。
  • 我们可以发现Executors创建的线程池底层其实还是基于ThreadPoolExecutor的方式

image-20240531214546898

线程池中的 execute 方法

讲完了Executors生成的线程池,接下来当然是看看这个线程池的核心方法execute了

execute 方法的作用

execute 方法是线程池的核心方法之一,可以用来提交任务并执行,当然也可以用submit,但是实际核心逻辑都是一样的。

不管是上面的Executors生成的线程池,还是自己定义的ThreadPoolExecutor线程池,实际去执行线程的时候都会调用到我们的execute 方法,

  • 它用于向线程池提交一个任务,供线程池调度执行。

  • 它接受一个 Runnable 对象作为参数,并将其提交给内部的工作队列。

execute的工作原理

线程池采用的是一种生产者-消费者的模型,如下图:
FixedThreadPool的execute()方法运行示意图

工作原理如下:

  1. 任务提交
    • 当外部提交一个任务到线程池时,线程池会根据当前的运行状态和线程数量决定如何处理这个任务。
  2. 任务队列和线程的管理
    • 如果当前运行的线程少于核心线程数(corePoolSize),线程池会创建一个新的工作线程来执行这个任务。
    • 如果当前运行的线程数达到核心线程数,任务会被放入任务队列等待执行。
    • 如果任务队列已满,并且当前运行的线程数少于最大线程数(maximumPoolSize),线程池会创建新的工作线程来处理任务。
    • 如果任务队列已满,并且线程池中的线程数已经达到最大线程数,线程池会根据拒绝策略处理这个任务。
  3. 任务执行
    • 工作线程从任务队列中获取任务并执行。
    • 执行完一个任务后,工作线程会继续从任务队列中获取下一个任务,直到任务队列为空。
  4. 线程的回收和销毁
    • 如果一个线程在一定时间内(keepAliveTime)没有获取到新的任务,且当前线程数超过核心线程数,那么该线程将被终止。
    • 当所有任务完成后,核心线程会继续等待新任务,而非核心线程会被回收。

image-20240601013614020

拒绝策略

策略名称描述适用场景
AbortPolicy默认的拒绝策略。当任务无法提交到线程池时,抛出 RejectedExecutionException 异常。希望在任务无法处理时立即得到通知,并进行相应处理的场景。
CallerRunsPolicy调用者运行策略。当任务无法提交到线程池时,由提交任务的线程(即调用者线程)执行该任务。希望降低任务提交速度,并且不希望丢弃任务的场景。
DiscardPolicy丢弃策略。当任务无法提交到线程池时,直接丢弃该任务,不进行任何处理。可以接受任务丢失,并且不希望对系统产生过多负载的场景。
DiscardOldestPolicy丢弃最旧策略。当任务无法提交到线程池时,丢弃任务队列中最旧的未处理任务,然后尝试重新提交当前任务。希望抛弃旧任务,优先处理新任务的场景。

在实际操作里,建议选默认的AbortPolicyCallerRunsPolicy策略。

AbortPolicy策略:

  • AbortPolicy是最保险安全的,简单粗暴,一满就拒绝,无论什么情况都能保证系统不会因线程池出问题。
  • 缺点也明显,一满就可能丢线程,没执行到你想执行的业务逻辑。

CallerRunsPolicy策略:

  • CallerRunsPolicy策略是,当线程池塞不下新任务时,会让调用线程来跑,既不丢任务,又能适当减缓新任务速度,减轻线程池压力,特别适合需要高执行保证的场景。
  • 但它的缺点正好相反,不丢线程,但流量大时可能出问题。

想象一下,某天早上,系统访问量突然猛增,线程池马上满了,剩下的都被拒绝策略打回给调用者线程处理,就像高速公路后半段的车都要掉头走别的路,你说后面的高速会不会堵得瘫痪?

系统也是一样,这个拒绝策略在这种情况下可能让系统崩溃,在前端页面上可能出现假死、卡顿、超时未响应等问题。

当然,如果默认的策略不满足你的需求也可以通过实现 RejectedExecutionHandler 接口来定义自己的拒绝策略,以满足特殊的业务需求。

class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 自定义处理逻辑
    }
}

源码分析工作原理

上面讲到了线程池的工作原理或者也是工作流程,那么为什么是这样的呢?

知其然知其所以然,那我们来看看源码不就知道了!

基本知识

线程的状态

注意我们这里涉及到了线程池的状态,注意区别我们常见的线程状态,线程状态如下

线程状态指的是单个线程在其生命周期中所处的状态。Java 中的线程有以下几种状态:

img

状态描述
NEW线程已创建但尚未启动。
RUNNABLE线程正在 Java 虚拟机中执行。
BLOCKED线程被阻塞,等待监视器锁。
WAITING线程无限期等待另一个线程执行特定操作。
TIMED_WAITING线程在指定的等待时间内等待另一个线程执行特定操作。
TERMINATED线程已退出。

代码示例:

Thread thread = new Thread(() -> {
    // 线程任务
});

System.out.println(thread.getState()); // NEW
thread.start();
System.out.println(thread.getState()); // RUNNABLE 或其他状态
线程池的状态

线程池的状态有5种,他们的状态转换如下图所示,可以看到和线程的状态完全它们不是一回事。

图3 线程池生命周期

ThreadPoolExecutor类存放线程池的状态信息很特别,是存储在一个int类型原子变量的高3位,而低29位用来存储线程池当前运行的线程数量。通过将线程池的状态和线程数量合二为一,可以做到一次CAS原子操作更新数据。

状态高3位值说明
RUNNING111运行状态,线程池被创建后的初始状态,能接受新提交的任务,也能处理阻塞队列中的任务。
SHUTDOWN000关闭状态,不再接受新提交的任务,但任可以处理阻塞队列中的任务。
STOP001停止状态,会中断正在处理的线程,不能接受新提交的任务,也不会处理阻塞队列中的任务。
TIDYING010所有任务都已经终止,有效工作线程为0。
TERMINATED011终止状态,线程池彻底终止。

代码示例:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

// 获取线程池状态
System.out.println(executor.isShutdown()); // false
System.out.println(executor.isTerminating()); // false
System.out.println(executor.isTerminated()); // false

executor.shutdown();

// 获取线程池状态
System.out.println(executor.isShutdown()); // true
System.out.println(executor.isTerminating()); // true 或 false
System.out.println(executor.isTerminated()); // true 或 false
线程池状态和线程状态总结
比较项线程状态线程池状态
描述单个线程在其生命周期中所处的状态。线程池在其生命周期中所处的状态。
常见状态NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATEDRUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED
主要用途用于监控和调试单个线程的执行状态。用于管理和控制整个线程池的生命周期和任务处理行为。
示例代码Thread.getState()ThreadPoolExecutor.isShutdown(), ThreadPoolExecutor.isTerminating(), ThreadPoolExecutor.isTerminated()
线程池的状态信息和线程数量信息(ctl)

线程池常见属性和方法.png

// 使用原子操作类AtomicInteger的ctl变量,前3位记录线程池的状态,后29位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer的范围为[-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29,用来辅助左移位运算
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位用来存储线程池运行状态,其余位数表示线程池的容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池状态以常量值被存储在高三位中
private static final int RUNNING    = -1 << COUNT_BITS; // 线程池接受新任务并会处理阻塞队列中的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 线程池不接受新任务,但会处理阻塞队列中的任务
private static final int STOP       =  1 << COUNT_BITS; // 线程池不接受新的任务且不会处理阻塞队列中的任务,并且会中断正在执行的任务
private static final int TIDYING    =  2 << COUNT_BITS; // 所有任务都执行完成,且工作线程数为0,将调用terminated方法
private static final int TERMINATED =  3 << COUNT_BITS; // 最终状态,为执行terminated()方法后的状态
  • ctl变量的封箱拆箱相关的方法
// ctl变量的封箱拆箱相关的方法
private static int runStateOf(int c)     { return c & ~CAPACITY; } // 从ctl中获取线程池的状态值
private static int workerCountOf(int c)  { return c & CAPACITY; } // 从ctl中获取线程池的数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //生成ctl值, rs 表示线程池状态,wc 表示当前线程池中 worker
(线程)数量,相与以后就是合并后的状态
  • ​ 线程池状态值比较
比较当前线程池 ctl 所表示的状态:线程池状态值的大小关系:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
// 比较当前线程池 ctl 所表示的状态,是否小于某个状态 s
private static boolean runStateLessThan(int c, int s) { return c < s; }
// 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态s
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
// 小于 SHUTDOWN 的一定是 RUNNING,SHUTDOWN == 0
private static boolean isRunning(int c) { return c < SHUTDOWN; }
  • CAS设置ctl的值
    // 使用 CAS 方式 让 ctl 值 +1 ,成功返回 true, 失败返回 false
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    // 使用 CAS 方式 让 ctl 值 -1 ,成功返回 true, 失败返回 false
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    // 将 ctl 值减一,do while 循环会一直重试,直到成功为止
    private void decrementWorkerCount() {
        do {} while (!compareAndDecrementWorkerCount(ctl.get()));
    }

execute->addWorker->runWorker->getTask流程

ThreadPoolExecutor 类中的 execute 方法、addWorker 方法、runWorker 方法和 getTask 方法共同协作,以确保线程池的有效执行和任务管理。

  1. execute 方法
    • 是线程池接口 Executor 的实现,用户通过这个方法提交任务(Runnable 对象)给线程池。
    • execute 方法首先判断是否需要添加新的工作线程(Worker)来执行提交的任务,如果是,则调用 addWorker 方法。
  2. addWorker 方法
    • 负责创建新的工作线程(Worker),并将提交的任务作为线程的首个任务(firstTask)。
    • 该方法同样负责确认线程池的状态和工作线程的数量是否允许添加新的 Worker
    • 如果成功添加了 Worker,该 Worker 会通过 runWorker 方法开始执行。
  3. runWorker 方法
    • 由每个工作线程(Worker)调用,是工作线程的主要执行循环。
    • 在这个方法中,线程会循环地从任务队列中获取任务(通过调用 getTask 方法),并执行它们。
  4. getTask 方法
    • 从线程池的任务队列中获取待执行的任务。
    • 如果线程池处于关闭状态或者配置允许线程超时且没有任务可执行,getTask 可以返回 null,导致 runWorker 方法结束执行循环,从而允许工作线程结束。

大致流程如下:
image-20240601195340699

了解了大致流程以后,接下来我们再仔细剖析每个方法!

execute方法

这段代码是Java中的ThreadPoolExecutor类的核心方法之一:execute。此方法负责处理接收到的Runnable任务,并根据线程池的状态(包括核心线程、工作队列、最大线程数等)来决定如何处理这个任务。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    // 获取当前线程池的状态
    int c = ctl.get();
    
    // 步骤1:如果当前运行的线程数少于 corePoolSize,则创建一个新线程来执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))  // 调用 addWorker 方法尝试增加一个新的工作线程
            return;
        c = ctl.get();  // 如果 addWorker 失败(添加线程失败或线程池已关闭),则重新获取线程池状态
    }
    
    // 步骤2:如果任务可以成功加入工作队列
    if (isRunning(c) && workQueue.offer(command)) {  // 尝试将任务加入工作队列
        int recheck = ctl.get();  // 再次检查线程池状态
        if (!isRunning(recheck) && remove(command))  // 如果线程池不再运行,并且成功从队列中移除任务
            reject(command);  // 拒绝任务
        else if (workerCountOf(recheck) == 0)  // 如果线程池仍然运行但没有正在执行的线程
            addWorker(null, false);  // 尝试添加一个新的工作线程
    }
    // 步骤3:如果工作队列已满或池已满,尝试创建一个新的线程,如果失败则拒绝任务
    else if (!addWorker(command, false))
        reject(command);  // 如果添加新线程失败,拒绝任务
}
步骤详细解释:
  1. 第一步:如果当前运行的线程数少于核心线程数(corePoolSize),尝试创建一个新线程来执行任务

    • if (workerCountOf(c) < corePoolSize):检查当前运行的线程数是否少于核心线程数。
    • if (addWorker(command, true)):调用 addWorker 方法尝试创建一个新线程,如果成功,新线程将执行给定的任务 command
      • addWorker(command, true)返回 true 表示已成功创建并启动了一个新线程来处理任务。
      • 如果 addWorker 失败(如无法新建线程,或线程池状态已变),则继续步骤2。
  2. 第二步:如果成功地将任务放入队列

    • if (isRunning(c) && workQueue.offer(command)):检查线程池是否处于运行状态,并尝试将任务添加到工作队列中(非阻塞操作)。
    • if (!isRunning(recheck) && remove(command)):重新检查线程池的状态,如果线程池不再运行,并且成功从工作队列中移除了任务(即任务被停止),则执行拒绝策略 reject(command)
    • else if (workerCountOf(recheck) == 0): 如果没有任何线程在运行,尝试创建一个新线程以确保至少有一个线程在执行任务。
  3. 第三步:如果无法将任务加入队列(即队列满了)

    • else if (!addWorker(command, false)):尝试新增一个非核心线程来执行任务,如果再次失败(比如线程数超过最大限制 maxPoolSize),则执行拒绝策略 reject(command)
关键方法:
  • ctl: 是一个控制变量,它用来控制线程池的状态和线程数量。

  • workerCountOf(c): 是一个方法,用来从控制变量c中提取当前的工作线程数量。

  • addWorker(Runnable firstTask, boolean core):该方法用于创建并启动新线程,将新线程添加到线程池中执行任务。

  • isRunning(int c):检查线程池是否处于运行状态。

  • workQueue.offer(command):尝试将任务加入工作队列。

  • remove(Runnable task):从工作队列中移除指定任务。

  • reject(Runnable task):拒绝任务,通常是通过调用RejectedExecutionHandler

总结:

这段代码通过三个步骤确保线程池以最佳方式处理新提交的任务:

  1. 优先使用核心线程处理任务。
  2. 如果核心线程已满,将任务放入队列。
  3. 如果队列已满,则使用备用策略(如创建新的非核心线程)处理任务,最后如果所有处理策略都失败,则拒绝新任务。

这种设计用于确保线程池的高效利用并提供灵活的任务处理机制,同时保护系统免受过载。

addWorker

Worker线程

Worker是通过继承AQS(AbstractQueuedSynchronizer),使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;//Worker持有的线程
    Runnable firstTask;//初始化的任务,可以为null
}

Worker 类是线程池中负责实际执行任务的工作线程。它封装了一个线程,并且负责从任务队列中取出任务并执行。WorkerThreadPoolExecutor 中的主要作用包括:

  1. 执行任务Worker 从任务队列中取出任务并执行这些任务。
  2. 维护线程存活:即使当前没有任务可执行,Worker 线程也可以保持存活,以便随时准备处理新的任务。
  3. 生命周期管理Worker 管理着线程的生命周期,包括启动、运行和终止。
步骤详细解释:

这段代码来自ThreadPoolExecutor类的addWorker方法,该方法用于向线程池中添加新的工作线程。如果线程池处于非关闭状态或者允许新任务进入,则会将传入的Runnable任务交给新线程来执行。以下是对该代码的逐行解释:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();  // 获取线程池状态控制变量
        int rs = runStateOf(c);  // 获取线程池当前运行状态

        // 检查线程池状态:
        // 1. 线程池状态是否 >= SHUTDOWN(即 STOP、TERMINATE 等),如果是且
        // 2. 当前状态是 SHUTDOWN,且 firstTask 为 null,且工作队列不为空,则继续添加。
        // 否则,不能添加新线程,直接返回 false。
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        // 内层循环,尝试增加 worker 数量。
        for (;;) {
            int wc = workerCountOf(c);  // 获取当前工作线程数
            // 3. 如果工作线程数已达到上限(CAPACITY)或达到核心线程数(corePoolSize)或最大线程数(maximumPoolSize),则返回 false。
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 4. 尝试通过 CAS(Compare-And-Swap)操作增加工作线程计数。如果成功,则跳出整个 retry 循环。
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // 5. 如果 CAS 失败,重新获取线程池状态控制变量
            // 6. 如果运行状态已改变,则返回到外层 retry 循环重新尝试。
            if (runStateOf(c) != rs)
                continue retry;
            // 否则,重新尝试内循环(CAS 操作失败的情况)。
        }
    }

	// 简单总结上面的CAS过程:
    //(1)内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas
    //(2)cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了
    //(3)如果变了,则重新进入外层循环重新获取线程池状态,否者重新进入内层循环继续进行cas

	
	// 走到这里说明cas成功,线程数+1,但并未被执行
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建新 Worker,并分配 firstTask
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;  // 获取主锁,确保添加操作的同步
            mainLock.lock();
            try {
                // 7. 再次检查池的运行状态,确保在保持锁定的一致性
                int rs = runStateOf(ctl.get());

                // 8. 如果状态允许添加新线程(即池处于 RUNNING 或 SHUTDOWN 且 firstTask 为 null),则进行进一步操作。
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 防止创建已启动的线程
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 将 Worker 添加到工作集
                    workers.add(w);
                    // 更新最大池大小统计
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();  // 确保在退出时释放锁
            }
            if (workerAdded) {
                t.start();  // 启动工作线程
                workerStarted = true;
            }
        }
    } finally {
        // 如果线程启动失败,进行适当的清理
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回是否成功启动工作线程
    return workerStarted;
}
  1. 初始检查和状态验证

    • 使用一个无限循环 (retry: 标签) 不断尝试添加新的工作线程。
    • 从控制字段 (ctl) 中获取线程池状态,并获取当前的运行状态 (runStateOf(c)).
    • 检查线程池是否正在关闭 (rs >= SHUTDOWN),如果是且任务队列不为空,返回 false 退出。
  2. 递增工作线程数量

    • 内部循环再一次检查当前的工作线程数量 (workerCountOf(c)),并确保其未超过CAPACITY或给定的大小限制 (corePoolSizemaximumPoolSize)。
    • 使用CAS操作 (compareAndIncrementWorkerCount(c)) 递增工作线程计数,确保线程安全。
    • 如果CAS操作失败,重新获取 ctl 用于检查运行状态是否已更改。
  3. 实例化工作线程并添加到池中

    • 如果成功增加了工作线程计数,创建一个新的 Worker 实例,并分配传入的 firstTask
    • 获取Worker关联的线程实例 (w.thread) 并使用ReentrantLock (mainLock) 确保操作的同步。
    • 再次检查运行状态,如果状态允许则工作线程已添加入池中 (workers.add(w)), 并更新线程池大小统计 (largestPoolSize)。
  4. 启动新线程

    • 如果 Worker 成功添加,则启动新线程 (t.start()), 将 workerStarted 设置为 true
  5. 清理和失败处理

    • 最后,如果线程未成功启动,调用 addWorkerFailed 方法进行清理和失败处理。
    • 返回 workerStarted 表示工作线程是否成功启动。
方法的关键点:
  • 运行状态验证:多次检查和验证线程池的运行状态以确定是否应该添加新的工作线程。
  • 线程安全:使用CAS操作和锁来确保多线程环境中的线程安全。
  • 资源管理:同步地添加或移除线程,并在失败时进行恰当的资源清理。
总结:

该方法通过复杂且细致的状态检查和同步操作,确保只在适宜的条件下添加新的工作线程以处理任务,从而维护线程池的有效运行和资源的最佳利用。多次的状态检查和CAS操作确保了较高的并发安全性。

runWorker

if (workerAdded) {
    t.start();  // 启动工作线程
    workerStarted = true;
}

在上面的addWorker方法中,我们看到,如果线程添加成功(workerAdded=true)了以后,会执行t.start()启动线程

// Worker 类的 run 方法,是每个 worker 线程的执行主体
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread(); // 获取当前线程的引用
    Runnable task = w.firstTask; // 取出传入的第一个任务
    w.firstTask = null; // 清空第一个任务,避免重复引用导致内存泄漏
    w.unlock(); // 解锁,允许中断

    // 初始值为 true,以标记线程是由于未处理的异常突然完成任务
    boolean completedAbruptly = true;
    try {
        // 循环取任务执行,当 task 不为 null 或者能从队列中获取到任务时
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 对 worker 加锁,开始执行任务
            // 如果线程池停止或当前线程被中断,则确保线程被中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 执行任务之前可以进行的钩子方法
                beforeExecute(wt, task);
                // 异常变量,用于捕获任务执行中的异常
                Throwable thrown = null;
                try {
                    task.run(); // 运行任务
                } catch (RuntimeException x) {
                    thrown = x; throw x; // 捕获运行时异常
                } catch (Error x) {
                    thrown = x; throw x; // 捕获错误
                } catch (Throwable x) {
                    thrown = x; throw new Error(x); // 捕获其他可抛出的异常
                } finally {
                    // 执行任务之后的钩子方法,传递任务和异常信息
                    afterExecute(task, thrown);
                }
            } finally {
                // 任务执行完毕,清除引用,完成此任务
                task = null;
                // 完成任务计数增加
                w.completedTasks++;
                // 解锁 worker
                w.unlock();
            }
        }
        // 如果 while 循环完成并没有异常,则设置 completedAbruptly 为 false
        completedAbruptly = false;
    } finally {
        // 线程退出处理,针对异常终止或正常终止的线程做不同的退出处理
        processWorkerExit(w, completedAbruptly);
    }
}
代码解读:
  1. 线程开始运行,获取当前线程引用,并尝试执行Worker的第一个任务。
  2. w.unlock()允许在执行任务时可以中断当前线程,比如取消任务或关闭线程池的情况。
  3. completedAbruptly标志位用于跟踪任务是否突然完成(如抛出异常)。
  4. try块中循环执行以下步骤:
    • 从任务队列中获取下一个任务 (getTask())。
    • 执行Worker的加锁操作,以确保任务的执行过程中不会被其他线程干扰。
    • 如果线程池正在停止,确保当前线程被中断;如果线程池没有停止,确保线程不被中断。
    • 在开始执行任务之前调用beforeExecute()
    • 执行任务 (task.run())。
    • 使用try-catch捕捉执行任务过程中抛出的异常。
    • 在任务执行完毕后调用afterExecute()
  5. 每完成一个任务后,将该任务设置为null,完成任务数 (completedTasks) 加一,然后进行解锁操作。
  6. 如果while循环正常结束,将completedAbruptly设置为false,表示工作线程正常完成,没有发生异常。
  7. 最后,在finally块中调用processWorkerExit()方法处理工作线程的退出。如果completedAbruptly仍为true,意味着工作线程异常退出,这个方法将进行相应的清理工作。

以上流程确保了任务能够在工作线程中被正确处理执行,同时确保在任务执行前后能进行相应的额外处理(如资源释放、计数器维护等),并且在工作线程因异常结束时做出适当的清理。

getTask

final void runWorker(Worker w) {
	.....
 	// 循环取任务执行,当 task 不为 null 或者能从队列中获取到任务时
    while (task != null || (task = getTask()) != null) {

上面runWorker里面的getTask() 也是 ThreadPoolExecutor 中的一个方法,用于从线程池的工作队列中获取待执行的任务。它处理线程池的状态和工作队列,以决定是否提供一个任务、等待任务或终止线程。

private Runnable getTask() {
    boolean timedOut = false; // 上一次调用 poll() 是否超时?

    for (;;) { // 无限循环,直到从工作队列获取到任务或者确定线程应该终止
        int c = ctl.get(); // 获取当前线程池控制状态
        int rs = runStateOf(c); // 从控制状态中提取运行状态

        // 如果线程池正在 SHUTDOWN 或更高状态,并且工作队列为空或者线程池正在 STOP 状态
        // 那么此线程应该被终止
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount(); // 减少工作线程的计数器
            return null; // 返回 null,表示没有任务执行,线程应该退出
        }

        int wc = workerCountOf(c); // 从控制状态中提取当前工作线程数

        // 判断是否允许核心线程超时或当前工作线程数大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 如果当前工作线程数大于最大线程数,或者(允许超时并且已经超时)且(工作线程数大于1或工作队列为空)
        // 那么尝试减少工作线程并返回 null
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null; // 成功减少工作线程计数,则当前线程应该退出
            continue; // 如果减少失败,重新进行循环
        }

        try {
            // 根据队列是否允许线程超时来从工作队列获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r; // 如果成功获取到任务,则返回这个任务
            timedOut = true; // 如果 poll() 调用返回了 null,则表示超时
        } catch (InterruptedException retry) {
            timedOut = false; // 如果在等待任务时被中断,则重置超时标志
        }
    }
}
代码解读:
  1. 循环尝试:该方法在一个无限循环中运行,直到获取一个任务或确定线程应当退出。
  2. 状态检查:检查线程池的当前状态,以及当前线程是否应当因为线程池状态(如关闭)或工作队列状态(如空)而终止。
  3. 线程数调整:根据线程池配置(allowCoreThreadTimeOutcorePoolSizemaximumPoolSize)和当前状态(超时、工作队列空),决定是否终止当前线程。
  4. 任务获取
    • 如果不需要终止当前线程,则尝试从工作队列中获取任务。
    • 根据是否允许超时,使用 poll(带超时)或 take(无超时)从队列中获取任务。
  5. 超时和中断处理:处理在等待任务时发生的超时和中断事件。
特别注意:
  • 这个方法负责管理线程是否持续等待新任务或者是从工作队列中获取任务时是否采取超时策略,这对于线程池的效率和资源管理非常关键。
  • 方法通过精细的逻辑检查线程池的状态和工作队列的状态,合理地控制线程的生命周期,确保不会有过多线程空闲或占用资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/669197.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode题练习与总结:路径总和Ⅱ--113

一、题目描述 给你二叉树的根节点 root 和一个整数目标和 targetSum &#xff0c;找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1&#xff1a; 输入&#xff1a;root [5,4,8,11,null,13,4,7,2,null,null,5,1], target…

基于Springboot驾校预约平台小程序的设计与实现(源码+数据库+文档)

一.项目介绍 系统角色&#xff1a;管理员、教练、学员 小程序(仅限于学员注册、登录)&#xff1a; 查看管理员发布的公告信息 查看管理员发布的驾校信息 查看所有教练信息、预约(需教练审核)、评论、收藏喜欢的教练 查看管理员发布的考试信息、预约考试(需管理…

算法题解记录27+++随机链表的复制(百日筑基)

一、题目描述&#xff1a; 题目难度&#xff1a;中等 给你一个长度为 n 的链表&#xff0c;每个节点包含一个额外增加的随机指针 random &#xff0c;该指针可以指向链表中的任何节点或空节点。 构造这个链表的 深拷贝。 深拷贝应该正好由 n 个 全新 节点组成&#xff0c;其中每…

CDH6.3.2安装文档

前置环境&#xff1a; 操作系统&#xff1a; CentOS Linux release 7.7 java JDK &#xff1a; 1.8.0_231 1、准备工作 准备以下安装包&#xff1a; Cloudera Manager: cloudera-manager-agent-6.3.1-1466458.el7.x86_64.rpm cloudera-manager-daemons-6.3.1-1466458.el…

linux安装MYSQL后,利用grep查看MYSQL初始密码

问题描述 linux安装mysql获取初始密码 解决方案&#xff1a; 通过查看日志获取初始密码 grep "password" /var/log/mysqld.loggrep 是一个用于在文本中查找特定字符串的工具。 /var/log/mysqld.log 是要搜索的文件路径&#xff0c;"password" 是要查找的…

树莓集团:构筑全国数字影像生态链

在数字化浪潮席卷全球的今天&#xff0c;数字影像技术正以前所未有的速度改变着我们的生活。成都树莓集团以远见卓识和坚定步伐&#xff0c;专注于全国数字影像生态链的建设&#xff0c;不断推动着文创产业的创新与发展。 树莓集团致力于打造一个完整的数字影像生态链&#xff…

CLIP--Learning Transferable Visual Models From Natural Language Supervision

参考&#xff1a;CLIP论文笔记--《Learning Transferable Visual Models From Natural Language Supervision》_visual n-grams模型-CSDN博客 openAI&#xff0c;2021&#xff0c;将图片和文字联系在一起&#xff0c;----->得到一个能非常好表达图片和文字的模型主题&#…

Java后端代码框架包设计-什么是Domain,BO,VO?我们改如何区分和定义?

我们先来看看一个项目的代码结构,如下图: 1.定义包名用domain这个单词是什么含义 在Java中,domain 这个单词通常用于表示应用程序的“领域模型”(Domain Model)或“领域层”(Domain Layer)。领域模型是描述系统业务逻辑和规则的对象集合,它通常包含实体(Entities)、…

构建一个文字冒险游戏:Python 编程实战

在本文中&#xff0c;我们将探索如何使用 Python 创建一个简单的文字冒险游戏。通过这个项目&#xff0c;你将了解到基础的编程技术&#xff0c;包括条件语句、函数和基本的用户输入处理&#xff0c;同时也能体会到文本游戏的魅力和设计的挑战。 项目概述 文字冒险游戏是一种…

Transformer中的位置编码PE(position encoding)

Transformer中的位置编码PE(position encoding) 1.提出背景 transformer模型的attention机制并没有包含位置信息&#xff0c;即一句话中词语在不同的位置时在transformer中是没有区别的 2.解决背景 给encoder层和decoder层的输入添加了一个额外的向量Positional Encoding&a…

linux进程的加载和启动过程分析

我们的源代码通过预处理,编译,汇编,链接后形成可执行文件,那么当我们在终端敲下指令$ ./a.out argv1 argv2 后,操作系统是怎么将我们的可执行文件加载并运行的呢? 首先知道,计算机的操作系统的启动程序是写死在硬件上的,每次计算机上电时,都将自动加载启动程序,之后…

使用迭代最近点 (ICP) 算法在 Open3D 中对齐点云

一、Open3D 简介及其功能 Open3D 是一个现代库&#xff0c;它提供了用于处理 3D 数据的各种工具。在其功能中&#xff0c;它提供了高效的数据结构和算法来处理点云、网格等&#xff0c;使其成为在计算机视觉、机器人和图形领域工作的研究人员和从业人员的不错选择。Open3D 的特…

运维开发详解之指标收集

一、指标收集 运维开发中的指标收集是指收集、监控和分析系统运行的各种指标数据&#xff0c;用于评估系统的性能、健康状况和可靠性。这些指标可以包括服务器的 CPU 使用率、内存利用率、磁盘空间使用情况、网络流量等等。 指标收集的目的是为了及时发现系统存在的问题&…

Jetpack MVVM - Android架构探索!

一 开发架构 是什么&#xff1f; 我们先来理解开发架构的本质是什么&#xff0c;维基百科对软件架构的描述如下&#xff1a; 软件架构是一个系统的草图。软件架构描述的对象是直接构成系统的抽象组件。各个组件之间的连接则明确和相对细致地描述组件之间的通讯。在实现阶段&a…

选择算法之冒泡排序【图文详解】

P. S.&#xff1a;以下代码均在VS2019环境下测试&#xff0c;不代表所有编译器均可通过。 P. S.&#xff1a;测试代码均未展示头文件stdio.h的声明&#xff0c;使用时请自行添加。 博主主页&#xff1a;LiUEEEEE                        …

Java——变量

一、变量介绍 变量就是申请内存来存储值。也就是说&#xff0c;当创建变量的时候&#xff0c;需要在内存中申请空间。内存管理系统根据变量的类型为变量分配存储空间&#xff0c;分配的空间只能用来储存该类型数据。 1、变量声明和初始化 变量的声明&#xff1a; int a; i…

2021JSP普及组第三题:插入排序

2021JSP普及组第三题 题目&#xff1a; 思路&#xff1a; 题目要求排序后根据操作进行对应操作。 操作一需要显示某位置数据排序后的位置&#xff0c;所以需要定义结构体数组储存原数据的位置和数据本身排序后所得数据要根据原位置输出排序后的位置&#xff0c;所以建立一个新…

字典树,AcWing 5726. 连续子序列

一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 5726. 连续子序列 - AcWing题库 二、解题报告 1、思路分析 字典树存储前缀和 考虑边遍历计算前缀和&#xff0c;边查询字典树 查询流程&#xff1a; 记当前前缀和为s 如果当前位k为1&#xff0c;那么s …

Qt6 mathgl数学函数绘图

1. 程序环境 Qt6.5.1, mingw11.2mathgl 8.0.1: https://sourceforge.net/projects/mathgl/,推荐下载mathgl-8.0.LGPL-mingw.win64.7z,Windows环境尝试自己编译mathgl会缺失一些库,补充完整也可以自己编译,路径"D:\mathgl-8.0.LGPL-mingw.win64\bin"添加至系统环境…

关于Golang中自定义包的简单使用-Go Mod

1. go env 查看 GO111MODULE 是否为 on&#xff0c;不是修改成on go env -w GO111MODULEon 2 .自定义包的目录格式 3. test.go 内容 package calc func Add(x, y int) int { // 首字母大写表示公有方法return x y }func Sub(x, y int) int {return x - y } 4.生成calc目…