文章目录
- 文章导图
- 前言
- Executors、Executor、execute对比剖析
- Executors生成的线程池?
- 线程池中的 execute 方法
- execute 方法的作用
- execute的工作原理
- 拒绝策略
- 源码分析工作原理
- 基本知识
- 线程的状态
- 线程池的状态
- 线程池状态和线程状态总结
- 线程池的状态信息和线程数量信息(ctl)
- execute->addWorker->runWorker->getTask流程
- execute方法
- 步骤详细解释:
- 关键方法:
- 总结:
- addWorker
- Worker线程
- 步骤详细解释:
- 方法的关键点:
- 总结:
- runWorker
- 代码解读:
- getTask
- 代码解读:
- 特别注意:
线程池系列文章可参考下表,目前已更新完毕…
线程池系列: | 文章 |
---|---|
Java基础线程池 | 深入剖析Java线程池的核心概念与源码解析:从Executors、Executor、execute逐一揭秘 |
CompletableFuture线程池 | 从用法到源码再到应用场景:全方位了解CompletableFuture及其线程池 |
SpringBoot默认线程池(@Async和ThreadPoolTaskExecutor) | 探秘SpringBoot默认线程池:了解其运行原理与工作方式(@Async和ThreadPoolTaskExecutor) |
SpringBoot默认线程池和内置Tomcat线程池 | 你是否傻傻分不清SpringBoot默认线程池和内置Tomcat线程池? |
文章导图
前言
在日常编码中,特别是在处理并发编程时,Java 提供了很多便捷工具帮助我们高效运行。不过你是否也曾被 Executors、Executor 和 execute 这些名字搞得一头雾水?
它们长得这么像,究竟有什么区别呢?接下来跟着我一探究竟吧!
Executors、Executor、execute对比剖析
java.util.concurrent.Executors | 是一个工具类,提供了一些静态方法,用于创建各种类型的 ExecutorService 实例。例如,newFixedThreadPool 可以创建一个固定大小的线程池,newCachedThreadPool 可以创建一个可缓存的线程池等。它简化了创建线程池的过程,提供了一些默认的配置选项。 |
---|---|
java.util.concurrent.Executor | 是一个接口,定义了一个单一的方法 execute ,用于提交任务到执行器中。这个接口是线程池的基础接口,它没有提供直接创建线程池的方法。 |
java.util.concurrent.Executor#execute | 是 Executor 接口中唯一的方法,用于将任务提交给执行器执行。这个方法通常用于提交实现了 Runnable 接口的任务。 |
简而言之,Executors
是一个工具类,用于创建各种类型的线程池实例。Executor
是一个接口,定义了提交任务到执行器的方法。execute
方法是 Executor
接口定义的唯一方法,用于提交任务给执行器执行。
如果你需要创建线程池,可以使用 Executors
类提供的方法来创建,然后使用返回的线程池实例来提交任务。具体看我下面的这个例子就理解了!:
//Executors工具类创建线程池默认是ExecutorService,ExecutorService又继承了Executor,它是线程池最底层的基础接口!
Executor executor = Executors.newFixedThreadPool(5);
//然后用executor线程池去执行execute方法,接收一个Runnable任务
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
Executors生成的线程池?
说到了Executors,就必须谈下用它生成的线程池,在阿里Java规约中,对Executors有一个专门的规约:
原因其实也很简单,会有各种OOM风险,也不便于管理,详细的我都帮你们总结成思维导图了!
- Executors.newWorkStealingPool方法之外,其他方法都有OOM风险。
- 我们可以发现Executors创建的线程池底层其实还是基于ThreadPoolExecutor的方式
线程池中的 execute 方法
讲完了Executors生成的线程池,接下来当然是看看这个线程池的核心方法execute了
execute 方法的作用
execute
方法是线程池的核心方法之一,可以用来提交任务并执行,当然也可以用submit
,但是实际核心逻辑都是一样的。
不管是上面的Executors生成的线程池,还是自己定义的ThreadPoolExecutor线程池,实际去执行线程的时候都会调用到我们的execute
方法,
-
它用于向线程池提交一个任务,供线程池调度执行。
-
它接受一个
Runnable
对象作为参数,并将其提交给内部的工作队列。
execute的工作原理
线程池采用的是一种生产者-消费者的模型,如下图:
工作原理如下:
- 任务提交:
- 当外部提交一个任务到线程池时,线程池会根据当前的运行状态和线程数量决定如何处理这个任务。
- 任务队列和线程的管理:
- 如果当前运行的线程少于核心线程数(corePoolSize),线程池会创建一个新的工作线程来执行这个任务。
- 如果当前运行的线程数达到核心线程数,任务会被放入任务队列等待执行。
- 如果任务队列已满,并且当前运行的线程数少于最大线程数(maximumPoolSize),线程池会创建新的工作线程来处理任务。
- 如果任务队列已满,并且线程池中的线程数已经达到最大线程数,线程池会根据拒绝策略处理这个任务。
- 任务执行:
- 工作线程从任务队列中获取任务并执行。
- 执行完一个任务后,工作线程会继续从任务队列中获取下一个任务,直到任务队列为空。
- 线程的回收和销毁:
- 如果一个线程在一定时间内(keepAliveTime)没有获取到新的任务,且当前线程数超过核心线程数,那么该线程将被终止。
- 当所有任务完成后,核心线程会继续等待新任务,而非核心线程会被回收。
拒绝策略
策略名称 | 描述 | 适用场景 |
---|---|---|
AbortPolicy | 默认的拒绝策略。当任务无法提交到线程池时,抛出 RejectedExecutionException 异常。 | 希望在任务无法处理时立即得到通知,并进行相应处理的场景。 |
CallerRunsPolicy | 调用者运行策略。当任务无法提交到线程池时,由提交任务的线程(即调用者线程)执行该任务。 | 希望降低任务提交速度,并且不希望丢弃任务的场景。 |
DiscardPolicy | 丢弃策略。当任务无法提交到线程池时,直接丢弃该任务,不进行任何处理。 | 可以接受任务丢失,并且不希望对系统产生过多负载的场景。 |
DiscardOldestPolicy | 丢弃最旧策略。当任务无法提交到线程池时,丢弃任务队列中最旧的未处理任务,然后尝试重新提交当前任务。 | 希望抛弃旧任务,优先处理新任务的场景。 |
在实际操作里,建议选默认的AbortPolicy
或CallerRunsPolicy
策略。
AbortPolicy
策略:
AbortPolicy
是最保险安全的,简单粗暴,一满就拒绝,无论什么情况都能保证系统不会因线程池出问题。- 缺点也明显,一满就可能丢线程,没执行到你想执行的业务逻辑。
CallerRunsPolicy
策略:
CallerRunsPolicy
策略是,当线程池塞不下新任务时,会让调用线程来跑,既不丢任务,又能适当减缓新任务速度,减轻线程池压力,特别适合需要高执行保证的场景。- 但它的缺点正好相反,不丢线程,但流量大时可能出问题。
想象一下,某天早上,系统访问量突然猛增,线程池马上满了,剩下的都被拒绝策略打回给调用者线程处理,就像高速公路后半段的车都要掉头走别的路,你说后面的高速会不会堵得瘫痪?
系统也是一样,这个拒绝策略在这种情况下可能让系统崩溃,在前端页面上可能出现假死、卡顿、超时未响应等问题。
当然,如果默认的策略不满足你的需求也可以通过实现
RejectedExecutionHandler
接口来定义自己的拒绝策略,以满足特殊的业务需求。
class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定义处理逻辑
}
}
源码分析工作原理
上面讲到了线程池的工作原理或者也是工作流程,那么为什么是这样的呢?
知其然知其所以然,那我们来看看源码不就知道了!
基本知识
线程的状态
注意我们这里涉及到了线程池的状态,注意区别我们常见的线程状态,线程状态如下
线程状态指的是单个线程在其生命周期中所处的状态。Java 中的线程有以下几种状态:
状态 | 描述 |
---|---|
NEW | 线程已创建但尚未启动。 |
RUNNABLE | 线程正在 Java 虚拟机中执行。 |
BLOCKED | 线程被阻塞,等待监视器锁。 |
WAITING | 线程无限期等待另一个线程执行特定操作。 |
TIMED_WAITING | 线程在指定的等待时间内等待另一个线程执行特定操作。 |
TERMINATED | 线程已退出。 |
代码示例:
Thread thread = new Thread(() -> {
// 线程任务
});
System.out.println(thread.getState()); // NEW
thread.start();
System.out.println(thread.getState()); // RUNNABLE 或其他状态
线程池的状态
线程池的状态有5种,他们的状态转换如下图所示,可以看到和线程的状态完全它们不是一回事。
ThreadPoolExecutor类存放线程池的状态信息很特别,是存储在一个int类型原子变量的高3位,而低29位用来存储线程池当前运行的线程数量。通过将线程池的状态和线程数量合二为一,可以做到一次CAS原子操作更新数据。
状态 | 高3位值 | 说明 |
---|---|---|
RUNNING | 111 | 运行状态,线程池被创建后的初始状态,能接受新提交的任务,也能处理阻塞队列中的任务。 |
SHUTDOWN | 000 | 关闭状态,不再接受新提交的任务,但任可以处理阻塞队列中的任务。 |
STOP | 001 | 停止状态,会中断正在处理的线程,不能接受新提交的任务,也不会处理阻塞队列中的任务。 |
TIDYING | 010 | 所有任务都已经终止,有效工作线程为0。 |
TERMINATED | 011 | 终止状态,线程池彻底终止。 |
代码示例:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
// 获取线程池状态
System.out.println(executor.isShutdown()); // false
System.out.println(executor.isTerminating()); // false
System.out.println(executor.isTerminated()); // false
executor.shutdown();
// 获取线程池状态
System.out.println(executor.isShutdown()); // true
System.out.println(executor.isTerminating()); // true 或 false
System.out.println(executor.isTerminated()); // true 或 false
线程池状态和线程状态总结
比较项 | 线程状态 | 线程池状态 |
---|---|---|
描述 | 单个线程在其生命周期中所处的状态。 | 线程池在其生命周期中所处的状态。 |
常见状态 | NEW , RUNNABLE , BLOCKED , WAITING , TIMED_WAITING , TERMINATED | RUNNING , SHUTDOWN , STOP , TIDYING , TERMINATED |
主要用途 | 用于监控和调试单个线程的执行状态。 | 用于管理和控制整个线程池的生命周期和任务处理行为。 |
示例代码 | Thread.getState() | ThreadPoolExecutor.isShutdown() , ThreadPoolExecutor.isTerminating() , ThreadPoolExecutor.isTerminated() |
线程池的状态信息和线程数量信息(ctl)
// 使用原子操作类AtomicInteger的ctl变量,前3位记录线程池的状态,后29位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer的范围为[-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29,用来辅助左移位运算
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位用来存储线程池运行状态,其余位数表示线程池的容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态以常量值被存储在高三位中
private static final int RUNNING = -1 << COUNT_BITS; // 线程池接受新任务并会处理阻塞队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS; // 线程池不接受新任务,但会处理阻塞队列中的任务
private static final int STOP = 1 << COUNT_BITS; // 线程池不接受新的任务且不会处理阻塞队列中的任务,并且会中断正在执行的任务
private static final int TIDYING = 2 << COUNT_BITS; // 所有任务都执行完成,且工作线程数为0,将调用terminated方法
private static final int TERMINATED = 3 << COUNT_BITS; // 最终状态,为执行terminated()方法后的状态
- ctl变量的封箱拆箱相关的方法
// ctl变量的封箱拆箱相关的方法
private static int runStateOf(int c) { return c & ~CAPACITY; } // 从ctl中获取线程池的状态值
private static int workerCountOf(int c) { return c & CAPACITY; } // 从ctl中获取线程池的数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //生成ctl值, rs 表示线程池状态,wc 表示当前线程池中 worker
(线程)数量,相与以后就是合并后的状态
- 线程池状态值比较
比较当前线程池 ctl 所表示的状态:线程池状态值的大小关系:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
// 比较当前线程池 ctl 所表示的状态,是否小于某个状态 s
private static boolean runStateLessThan(int c, int s) { return c < s; }
// 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态s
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
// 小于 SHUTDOWN 的一定是 RUNNING,SHUTDOWN == 0
private static boolean isRunning(int c) { return c < SHUTDOWN; }
- CAS设置ctl的值
// 使用 CAS 方式 让 ctl 值 +1 ,成功返回 true, 失败返回 false
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// 使用 CAS 方式 让 ctl 值 -1 ,成功返回 true, 失败返回 false
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 将 ctl 值减一,do while 循环会一直重试,直到成功为止
private void decrementWorkerCount() {
do {} while (!compareAndDecrementWorkerCount(ctl.get()));
}
execute->addWorker->runWorker->getTask流程
ThreadPoolExecutor 类中的 execute
方法、addWorker
方法、runWorker
方法和 getTask
方法共同协作,以确保线程池的有效执行和任务管理。
execute
方法:- 是线程池接口
Executor
的实现,用户通过这个方法提交任务(Runnable
对象)给线程池。 execute
方法首先判断是否需要添加新的工作线程(Worker
)来执行提交的任务,如果是,则调用addWorker
方法。
- 是线程池接口
addWorker
方法:- 负责创建新的工作线程(
Worker
),并将提交的任务作为线程的首个任务(firstTask
)。 - 该方法同样负责确认线程池的状态和工作线程的数量是否允许添加新的
Worker
。 - 如果成功添加了
Worker
,该Worker
会通过runWorker
方法开始执行。
- 负责创建新的工作线程(
runWorker
方法:- 由每个工作线程(
Worker
)调用,是工作线程的主要执行循环。 - 在这个方法中,线程会循环地从任务队列中获取任务(通过调用
getTask
方法),并执行它们。
- 由每个工作线程(
getTask
方法:- 从线程池的任务队列中获取待执行的任务。
- 如果线程池处于关闭状态或者配置允许线程超时且没有任务可执行,
getTask
可以返回null
,导致runWorker
方法结束执行循环,从而允许工作线程结束。
大致流程如下:
了解了大致流程以后,接下来我们再仔细剖析每个方法!
execute方法
这段代码是Java中的ThreadPoolExecutor
类的核心方法之一:execute
。此方法负责处理接收到的Runnable任务,并根据线程池的状态(包括核心线程、工作队列、最大线程数等)来决定如何处理这个任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取当前线程池的状态
int c = ctl.get();
// 步骤1:如果当前运行的线程数少于 corePoolSize,则创建一个新线程来执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 调用 addWorker 方法尝试增加一个新的工作线程
return;
c = ctl.get(); // 如果 addWorker 失败(添加线程失败或线程池已关闭),则重新获取线程池状态
}
// 步骤2:如果任务可以成功加入工作队列
if (isRunning(c) && workQueue.offer(command)) { // 尝试将任务加入工作队列
int recheck = ctl.get(); // 再次检查线程池状态
if (!isRunning(recheck) && remove(command)) // 如果线程池不再运行,并且成功从队列中移除任务
reject(command); // 拒绝任务
else if (workerCountOf(recheck) == 0) // 如果线程池仍然运行但没有正在执行的线程
addWorker(null, false); // 尝试添加一个新的工作线程
}
// 步骤3:如果工作队列已满或池已满,尝试创建一个新的线程,如果失败则拒绝任务
else if (!addWorker(command, false))
reject(command); // 如果添加新线程失败,拒绝任务
}
步骤详细解释:
-
第一步:如果当前运行的线程数少于核心线程数(corePoolSize),尝试创建一个新线程来执行任务:
if (workerCountOf(c) < corePoolSize)
:检查当前运行的线程数是否少于核心线程数。if (addWorker(command, true))
:调用addWorker
方法尝试创建一个新线程,如果成功,新线程将执行给定的任务command
。addWorker(command, true)
返回true
表示已成功创建并启动了一个新线程来处理任务。- 如果
addWorker
失败(如无法新建线程,或线程池状态已变),则继续步骤2。
-
第二步:如果成功地将任务放入队列:
if (isRunning(c) && workQueue.offer(command))
:检查线程池是否处于运行状态,并尝试将任务添加到工作队列中(非阻塞操作)。if (!isRunning(recheck) && remove(command))
:重新检查线程池的状态,如果线程池不再运行,并且成功从工作队列中移除了任务(即任务被停止),则执行拒绝策略reject(command)
。else if (workerCountOf(recheck) == 0)
: 如果没有任何线程在运行,尝试创建一个新线程以确保至少有一个线程在执行任务。
-
第三步:如果无法将任务加入队列(即队列满了):
else if (!addWorker(command, false))
:尝试新增一个非核心线程来执行任务,如果再次失败(比如线程数超过最大限制 maxPoolSize),则执行拒绝策略reject(command)
。
关键方法:
-
ctl
: 是一个控制变量,它用来控制线程池的状态和线程数量。 -
workerCountOf(c)
: 是一个方法,用来从控制变量c
中提取当前的工作线程数量。 -
addWorker(Runnable firstTask, boolean core)
:该方法用于创建并启动新线程,将新线程添加到线程池中执行任务。 -
isRunning(int c)
:检查线程池是否处于运行状态。 -
workQueue.offer(command)
:尝试将任务加入工作队列。 -
remove(Runnable task)
:从工作队列中移除指定任务。 -
reject(Runnable task)
:拒绝任务,通常是通过调用RejectedExecutionHandler
。
总结:
这段代码通过三个步骤确保线程池以最佳方式处理新提交的任务:
- 优先使用核心线程处理任务。
- 如果核心线程已满,将任务放入队列。
- 如果队列已满,则使用备用策略(如创建新的非核心线程)处理任务,最后如果所有处理策略都失败,则拒绝新任务。
这种设计用于确保线程池的高效利用并提供灵活的任务处理机制,同时保护系统免受过载。
addWorker
Worker线程
Worker是通过继承AQS(
AbstractQueuedSynchronizer
),使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}
Worker
类是线程池中负责实际执行任务的工作线程。它封装了一个线程,并且负责从任务队列中取出任务并执行。Worker
在 ThreadPoolExecutor
中的主要作用包括:
- 执行任务:
Worker
从任务队列中取出任务并执行这些任务。 - 维护线程存活:即使当前没有任务可执行,
Worker
线程也可以保持存活,以便随时准备处理新的任务。 - 生命周期管理:
Worker
管理着线程的生命周期,包括启动、运行和终止。
步骤详细解释:
这段代码来自ThreadPoolExecutor
类的addWorker
方法,该方法用于向线程池中添加新的工作线程。如果线程池处于非关闭状态或者允许新任务进入,则会将传入的Runnable
任务交给新线程来执行。以下是对该代码的逐行解释:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get(); // 获取线程池状态控制变量
int rs = runStateOf(c); // 获取线程池当前运行状态
// 检查线程池状态:
// 1. 线程池状态是否 >= SHUTDOWN(即 STOP、TERMINATE 等),如果是且
// 2. 当前状态是 SHUTDOWN,且 firstTask 为 null,且工作队列不为空,则继续添加。
// 否则,不能添加新线程,直接返回 false。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层循环,尝试增加 worker 数量。
for (;;) {
int wc = workerCountOf(c); // 获取当前工作线程数
// 3. 如果工作线程数已达到上限(CAPACITY)或达到核心线程数(corePoolSize)或最大线程数(maximumPoolSize),则返回 false。
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 4. 尝试通过 CAS(Compare-And-Swap)操作增加工作线程计数。如果成功,则跳出整个 retry 循环。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 5. 如果 CAS 失败,重新获取线程池状态控制变量
// 6. 如果运行状态已改变,则返回到外层 retry 循环重新尝试。
if (runStateOf(c) != rs)
continue retry;
// 否则,重新尝试内循环(CAS 操作失败的情况)。
}
}
// 简单总结上面的CAS过程:
//(1)内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas
//(2)cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了
//(3)如果变了,则重新进入外层循环重新获取线程池状态,否者重新进入内层循环继续进行cas
// 走到这里说明cas成功,线程数+1,但并未被执行
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建新 Worker,并分配 firstTask
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock; // 获取主锁,确保添加操作的同步
mainLock.lock();
try {
// 7. 再次检查池的运行状态,确保在保持锁定的一致性
int rs = runStateOf(ctl.get());
// 8. 如果状态允许添加新线程(即池处于 RUNNING 或 SHUTDOWN 且 firstTask 为 null),则进行进一步操作。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 防止创建已启动的线程
if (t.isAlive())
throw new IllegalThreadStateException();
// 将 Worker 添加到工作集
workers.add(w);
// 更新最大池大小统计
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock(); // 确保在退出时释放锁
}
if (workerAdded) {
t.start(); // 启动工作线程
workerStarted = true;
}
}
} finally {
// 如果线程启动失败,进行适当的清理
if (! workerStarted)
addWorkerFailed(w);
}
// 返回是否成功启动工作线程
return workerStarted;
}
-
初始检查和状态验证:
- 使用一个无限循环 (
retry:
标签) 不断尝试添加新的工作线程。 - 从控制字段 (
ctl
) 中获取线程池状态,并获取当前的运行状态 (runStateOf(c)
). - 检查线程池是否正在关闭 (
rs >= SHUTDOWN
),如果是且任务队列不为空,返回false
退出。
- 使用一个无限循环 (
-
递增工作线程数量:
- 内部循环再一次检查当前的工作线程数量 (
workerCountOf(c)
),并确保其未超过CAPACITY
或给定的大小限制 (corePoolSize
或maximumPoolSize
)。 - 使用CAS操作 (
compareAndIncrementWorkerCount(c)
) 递增工作线程计数,确保线程安全。 - 如果CAS操作失败,重新获取
ctl
用于检查运行状态是否已更改。
- 内部循环再一次检查当前的工作线程数量 (
-
实例化工作线程并添加到池中:
- 如果成功增加了工作线程计数,创建一个新的
Worker
实例,并分配传入的firstTask
。 - 获取Worker关联的线程实例 (
w.thread
) 并使用ReentrantLock (mainLock
) 确保操作的同步。 - 再次检查运行状态,如果状态允许则工作线程已添加入池中 (
workers.add(w)
), 并更新线程池大小统计 (largestPoolSize
)。
- 如果成功增加了工作线程计数,创建一个新的
-
启动新线程:
- 如果
Worker
成功添加,则启动新线程 (t.start()
), 将workerStarted
设置为true
。
- 如果
-
清理和失败处理:
- 最后,如果线程未成功启动,调用
addWorkerFailed
方法进行清理和失败处理。 - 返回
workerStarted
表示工作线程是否成功启动。
- 最后,如果线程未成功启动,调用
方法的关键点:
- 运行状态验证:多次检查和验证线程池的运行状态以确定是否应该添加新的工作线程。
- 线程安全:使用CAS操作和锁来确保多线程环境中的线程安全。
- 资源管理:同步地添加或移除线程,并在失败时进行恰当的资源清理。
总结:
该方法通过复杂且细致的状态检查和同步操作,确保只在适宜的条件下添加新的工作线程以处理任务,从而维护线程池的有效运行和资源的最佳利用。多次的状态检查和CAS操作确保了较高的并发安全性。
runWorker
if (workerAdded) {
t.start(); // 启动工作线程
workerStarted = true;
}
在上面的addWorker方法中,我们看到,如果线程添加成功(workerAdded=true)了以后,会执行t.start()启动线程
// Worker 类的 run 方法,是每个 worker 线程的执行主体
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 获取当前线程的引用
Runnable task = w.firstTask; // 取出传入的第一个任务
w.firstTask = null; // 清空第一个任务,避免重复引用导致内存泄漏
w.unlock(); // 解锁,允许中断
// 初始值为 true,以标记线程是由于未处理的异常突然完成任务
boolean completedAbruptly = true;
try {
// 循环取任务执行,当 task 不为 null 或者能从队列中获取到任务时
while (task != null || (task = getTask()) != null) {
w.lock(); // 对 worker 加锁,开始执行任务
// 如果线程池停止或当前线程被中断,则确保线程被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务之前可以进行的钩子方法
beforeExecute(wt, task);
// 异常变量,用于捕获任务执行中的异常
Throwable thrown = null;
try {
task.run(); // 运行任务
} catch (RuntimeException x) {
thrown = x; throw x; // 捕获运行时异常
} catch (Error x) {
thrown = x; throw x; // 捕获错误
} catch (Throwable x) {
thrown = x; throw new Error(x); // 捕获其他可抛出的异常
} finally {
// 执行任务之后的钩子方法,传递任务和异常信息
afterExecute(task, thrown);
}
} finally {
// 任务执行完毕,清除引用,完成此任务
task = null;
// 完成任务计数增加
w.completedTasks++;
// 解锁 worker
w.unlock();
}
}
// 如果 while 循环完成并没有异常,则设置 completedAbruptly 为 false
completedAbruptly = false;
} finally {
// 线程退出处理,针对异常终止或正常终止的线程做不同的退出处理
processWorkerExit(w, completedAbruptly);
}
}
代码解读:
- 线程开始运行,获取当前线程引用,并尝试执行
Worker
的第一个任务。 w.unlock()
允许在执行任务时可以中断当前线程,比如取消任务或关闭线程池的情况。completedAbruptly
标志位用于跟踪任务是否突然完成(如抛出异常)。- 在
try
块中循环执行以下步骤:- 从任务队列中获取下一个任务 (
getTask()
)。 - 执行
Worker
的加锁操作,以确保任务的执行过程中不会被其他线程干扰。 - 如果线程池正在停止,确保当前线程被中断;如果线程池没有停止,确保线程不被中断。
- 在开始执行任务之前调用
beforeExecute()
。 - 执行任务 (
task.run()
)。 - 使用
try-catch
捕捉执行任务过程中抛出的异常。 - 在任务执行完毕后调用
afterExecute()
。
- 从任务队列中获取下一个任务 (
- 每完成一个任务后,将该任务设置为
null
,完成任务数 (completedTasks
) 加一,然后进行解锁操作。 - 如果
while
循环正常结束,将completedAbruptly
设置为false
,表示工作线程正常完成,没有发生异常。 - 最后,在
finally
块中调用processWorkerExit()
方法处理工作线程的退出。如果completedAbruptly
仍为true
,意味着工作线程异常退出,这个方法将进行相应的清理工作。
以上流程确保了任务能够在工作线程中被正确处理执行,同时确保在任务执行前后能进行相应的额外处理(如资源释放、计数器维护等),并且在工作线程因异常结束时做出适当的清理。
getTask
final void runWorker(Worker w) {
.....
// 循环取任务执行,当 task 不为 null 或者能从队列中获取到任务时
while (task != null || (task = getTask()) != null) {
上面runWorker里面的getTask()
也是 ThreadPoolExecutor
中的一个方法,用于从线程池的工作队列中获取待执行的任务。它处理线程池的状态和工作队列,以决定是否提供一个任务、等待任务或终止线程。
private Runnable getTask() {
boolean timedOut = false; // 上一次调用 poll() 是否超时?
for (;;) { // 无限循环,直到从工作队列获取到任务或者确定线程应该终止
int c = ctl.get(); // 获取当前线程池控制状态
int rs = runStateOf(c); // 从控制状态中提取运行状态
// 如果线程池正在 SHUTDOWN 或更高状态,并且工作队列为空或者线程池正在 STOP 状态
// 那么此线程应该被终止
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 减少工作线程的计数器
return null; // 返回 null,表示没有任务执行,线程应该退出
}
int wc = workerCountOf(c); // 从控制状态中提取当前工作线程数
// 判断是否允许核心线程超时或当前工作线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果当前工作线程数大于最大线程数,或者(允许超时并且已经超时)且(工作线程数大于1或工作队列为空)
// 那么尝试减少工作线程并返回 null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null; // 成功减少工作线程计数,则当前线程应该退出
continue; // 如果减少失败,重新进行循环
}
try {
// 根据队列是否允许线程超时来从工作队列获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r; // 如果成功获取到任务,则返回这个任务
timedOut = true; // 如果 poll() 调用返回了 null,则表示超时
} catch (InterruptedException retry) {
timedOut = false; // 如果在等待任务时被中断,则重置超时标志
}
}
}
代码解读:
- 循环尝试:该方法在一个无限循环中运行,直到获取一个任务或确定线程应当退出。
- 状态检查:检查线程池的当前状态,以及当前线程是否应当因为线程池状态(如关闭)或工作队列状态(如空)而终止。
- 线程数调整:根据线程池配置(
allowCoreThreadTimeOut
、corePoolSize
、maximumPoolSize
)和当前状态(超时、工作队列空),决定是否终止当前线程。 - 任务获取
- 如果不需要终止当前线程,则尝试从工作队列中获取任务。
- 根据是否允许超时,使用
poll
(带超时)或take
(无超时)从队列中获取任务。
- 超时和中断处理:处理在等待任务时发生的超时和中断事件。
特别注意:
- 这个方法负责管理线程是否持续等待新任务或者是从工作队列中获取任务时是否采取超时策略,这对于线程池的效率和资源管理非常关键。
- 方法通过精细的逻辑检查线程池的状态和工作队列的状态,合理地控制线程的生命周期,确保不会有过多线程空闲或占用资源。