1. 前言
线程池这块,作为高频面试题,并且实际使用场景巨多,所以出了这篇文章,一块来研究一下线程池的实现原理,运行机制,从底层深挖,不再局限于面试题。
2. 线程池概览
2.1. 构造器
线程池总共有四个构造器。
java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue<java.lang.Runnable>)
java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue<java.lang.Runnable>, java.util.concurrent.ThreadFactory)
java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue<java.lang.Runnable>, java.util.concurrent.RejectedExecutionHandler)
java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue<java.lang.Runnable>, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)
参数名 | 参数解释 | 构造器1必填 | 构造器2必填 | 构造器3必填 | 构造器4必填 |
corePoolSize | 核心线程数。该线程数创造出来就不会回收 | √ | √ | √ | √ |
maximumPoolSize | 最大线程数。当核心线程数不够用来处理任务时,会创建新的临时线程数量,临时线程数量+核心线程数量最大可以到多少 | √ | √ | √ | √ |
keepAliveTime | 当临时线程空闲时,持续存活多久 | √ | √ | √ | √ |
unit | 基于存活时间的一个时间单位 | √ | √ | √ | √ |
workQueue | 保存任务的队列类型 | √ | √ | √ | √ |
threadFactory | 创建线程的工厂 | × | √ | × | √ |
handler | 拒绝策略 | × | × | √ | √ |
根据构造器可以看出,共有七个参数,其中五个是必填的,另外两个选填。
2.2. 线程池中的方法
2.2.1. 操作类
2.2.1.1. execute
方法描述:
核心最常用的方法,该方法是让线程池来执行一个任务。
入参是一个java.lang.Runnable,没有返回结果
2.2.1.2. remove
方法描述:
该方法是移除一个任务。入参是一个java.lang.Runnable,返回boolean值。判断是否移除成功。
2.2.1.3. awaitTermination
等待线程池执行完毕。参数是等待时间。最长等待这么久,如果没执行完,就返回。如果执行完了,也会返回。
2.2.1.4. allowCoreThreadTimeOut
设置线程池中的线程存活时间。这个值适用于非核心线程数,也适用于核心线程数。
默认是核心线程永远存活,非核心线程存在存活时间。
如果调用该方法,并传入true值,则核心线程也将有生命周期,到达一定的时间后,就会被销毁。
2.2.1.5. purge
删除当前在队列中没有执行的任务。
2.2.1.6. prestartCoreThread
创建一个核心线程,并让它在原地等待。如果当前核心线程还没有满,则创建一个核心线程。如果已经满了,就不再创建。
此方法会返回true 或 false来标识核心线程有没有创建成功
2.2.1.7. prestartCoreThread
创建所有核心线程,并让他们在原地等待。
此方法会返回本次操作总共创建了多少个核心线程。
2.2.1.8. shutdown
该方法会进行有序的让线程池进行关停。不会终止现有任务,但不再接受新的任务。
该方法重复调用没有效果
2.2.1.9. shutdownNow
该方法会尝试终止现在正在执行的任务。尽最大努力来把线程池给关停掉。
2.2.1.10. AbstractExecutorService#submit
该方法有三个重载,分别对应的参数为
- java.lang.Runnable
- java.lang.Runnable,T result
- Callable
第一个为单任务,可以等待执行,调用get方法,得到的结果是null。、
第二个为任务+一个固定的返回值。调用get方法,最终会拿到设置的这个result。
第三个为传入一个有返回值的任务,即callback,调用get,可以拿到任务执行结果返回值。
2.2.1.11. AbstractExecutorService#invokeAll
提交一批任务,全部执行,全部执行完之后,返回结果。
该方法有两个重载,分别参数为:
- Collection<? extends Callable<T>> tasks
- Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit
对应一个一直等待,第二个等待一定时间之后,就不再等待。
2.2.1.12. AbstractExecutorService#invokeAny
提交一批任务,执行任意一个,有结果了就返回。
该方法也有两个重载,分别参数为:
该方法有两个重载,分别参数为:
- Collection<? extends Callable<T>> tasks
- Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit
同样的,第一个会一直等待,直到有一个执行有结果。另外一个等待一定时间后,就不再等待。
2.2.2. 线程池属性配置类
2.2.2.1. setCorePoolSize
配置核心线程数,覆盖启动时默认的配置
2.2.2.2. setMaximumPoolSize
配置最大线程数,覆盖默认配置
2.2.2.3. setKeepAliveTime
配置空闲存活时间
2.2.2.4. setRejectedExecutionHandler
配置拒绝策略执行器
2.2.2.5. setThreadFactory
配置线程创建工厂
2.2.3. 线程池状态类
2.2.3.1. getCorePoolSize
获取当前线程池配置的核心线程数大小
2.2.3.2. getPoolSize
获取当前线程池中的实际线程数量
2.2.3.3. getActiveCount
获取当前线程池中活跃的线程数量
2.2.3.4. getLargestPoolSize
获取当前线程池,从创建到现在为止,存在过的最大的线程数量。
2.2.3.5. getMaximumPoolSize
获取当前线程池中最大的线程数量
2.2.3.6. getCompletedTaskCount
获取已经执行完成的任务数量。
2.2.3.7. getKeepAliveTime
获取当前配置的空闲时间。需要传入一个单位,会将空闲时间自动转为所需要的时间单位。
2.2.3.8. getQueue
获取当前线程池中的队列对象。可以对该对列做监控,里面会存放目前需要执行的任务。
2.2.3.9. getTaskCount
获取任务数量总和。这个数量是近似值,不是精准值。
该任务数量包含目前所有计划执行的任务数量,包括已经执行过的任务也会计算在内。
2.2.3.10. getRejectedExecutionHandler
获取拒绝策略执行器
2.2.3.11. getThreadFactory
获取线程创建工厂
2.2.3.12. allowsCoreThreadTimeOut
获取当前线程池中,是否允许核心线程销毁
2.2.3.13. isShutdown
判断当前线程池是否关闭
2.2.3.14. isTerminated
判断当前线程池是否已经终止
2.2.3.15. isTerminating
判断当前线程池是否已经终止或者正在终止中
2.2.3.16. toString
线程池重写了toString方法,该方法会打印线程池中的几个属性。
状态,存活线程数量,活跃线程数,待执行任务数量,已执行任务数量。
2.3. 核心逻辑源码分析
将所有的能力点分析完之后,分为三大类
- 实操类,真正涉及到逻辑的处理
- 属性配置类,一些口子,可以改变初始化的参数
- 状态类,只读,查看目前的线程池状态
基于此,只需要将操作类的完全搞懂,那么配置类和状态类的,自然就清晰了,所以这两类我们不做深入分析,本次的核心目的在于,对操作类的逻辑进行分析。
2.3.1. 构造器逻辑分析
构造器也没什么逻辑,只是将对应的参数进行赋值,并没有逻辑处理。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2.3.2. execute方法分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
该方法是高频方法,是我们对于线程池使用中,最常用的方法。
了解了这个方法的执行原理,再使用线程池时,就是得心应手,丝毫不慌。
调用execute方法,直接执行逻辑是非常简单的。通过代码,可以看出。
- 获取一个数字,然后对这个数字做处理,判断是否小于核心线程数。
-
- 如果小于,就添加一个任务,并返回。
- 如果添加任务失败,则再获取一个数字
- 判断这个数字是否在运行中。同时将该任务放入队列中。
-
- 再获取一个数字,判断状态,并移除任务。如果不是运行中并且移除成功,则执行拒绝策略。
- 如果将这个数字处理后,得到的是0,则添加任务。
- 如果添加任务失败,则执行拒绝策略。
看来逻辑简单,真的很难读。好像读懂了,又好像什么都没读懂。
这里面涉及到一些核心的逻辑点
- ctl是什么?get出来的数字代表什么?
- 对这个数字处理的几个方法执行了什么逻辑,分别代表什么意思?
- 添加任务的逻辑是什么?结果的true和false分别代表什么?
把这几块搞懂,那么就基本懂了所有的逻辑了。
2.3.2.1. ctl是什么?
官方解释
The main pool control state, ctl, is an atomic integer packing two conceptual fields workerCount, indicating the effective number of threads runState, indicating whether running, shutting down etc In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise representable. If this is ever an issue in the future, the variable can be changed to be an AtomicLong, and the shift/mask constants below adjusted. But until the need arises, this code is a bit faster and simpler using an int. The workerCount is the number of workers that have been permitted to start and not permitted to stop. The value may be transiently different from the actual number of live threads, for example when a ThreadFactory fails to create a thread when asked, and when exiting threads are still performing bookkeeping before terminating. The user-visible pool size is reported as the current size of the workers set. The runState provides the main lifecycle control, taking on values: RUNNING: Accept new tasks and process queued tasks SHUTDOWN: Don't accept new tasks, but process queued tasks STOP: Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method TERMINATED: terminated() has completed The numerical order among these values matters, to allow ordered comparisons. The runState monotonically increases over time, but need not hit each state. The transitions are: RUNNING -> SHUTDOWN On invocation of shutdown() (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow() SHUTDOWN -> TIDYING When both queue and pool are empty STOP -> TIDYING When pool is empty TIDYING -> TERMINATED When the terminated() hook method has completed Threads waiting in awaitTermination() will return when the state reaches TERMINATED. Detecting the transition from SHUTDOWN to TIDYING is less straightforward than you'd like because the queue may become empty after non-empty and vice versa during SHUTDOWN state, but we can only terminate if, after seeing that it is empty, we see that workerCount is 0 (which sometimes entails a recheck -- see below).
谷歌翻一下
主池控制状态ctl是一个原子整数,包装了两个概念字段
workerCount,表示线程的有效数量
runState,表示是否正在运行,关闭等
为了将它们包装成一个int,我们将workerCount限制为(2^29 )-1(约 5 亿)个线程,而不是 (2^31)-1(20 亿)个线程。
如果将来出现这个问题,可以将该变量更改为 AtomicLong,并调整下面的移位/掩码常量。但在需要之前,使用 int 的代码会更快、更简单。
workerCount 是已允许启动且不允许停止的工作线程数。该值可能会暂时不同于活动线程的实际数量,例如,当 ThreadFactory 在被请求时无法创建线程时,以及退出线程在终止之前仍在执行簿记时。用户可见的池大小被报告为工作集的当前大小。
runState 提供主要的生命周期控制,取值如下:
RUNNING:接受新任务并处理排队任务
SHUTDOWN:不接受新任务,但处理排队任务
STOP:不接受新任务,不处理排队任务,并中断正在进行的任务
TIDYING:所有任务都已终止,workerCount 为零,线程转换到状态 TIDYING 将运行终止()钩子方法
TERMINATED:终止()已完成
这些值之间的数字顺序很重要,以允许有序比较。
runState 随着时间的推移单调增加,但不需要达到每个状态。转换为:
RUNNING -> SHUTDOWN 调用 shutdown() 时
(RUNNING 或 SHUTDOWN) -> STOP 调用 shutdownNow() 时
SHUTDOWN -> TIDYING 当队列和池都为空时
STOP -> TIDYING 当池为空时
TIDYING -> TERMINATED 当终止()钩子方法完成时,在awaitTermination()中等待的线程将在状态达到TERMINATED时返回。
检测从 SHUTDOWN 到 TIDYING 的转换并不像您想象的那么简单,因为队列在非空后可能会变空,在 SHUTDOWN 状态期间反之亦然,但只有在看到队列为空后,我们看到了workerCount,我们才能终止是 0(有时需要重新检查——见下文)。
提炼
ctl 就是 workerCount 和 runState 的整合计算值。从ctl中,可以反映出目前的线程有效数量和运行状态。
简化来说,通过某种计算逻辑,可以从ctl中,获得当前的核心线程数,线程的运行状态。
想要深入了解,就需要继续来解读和ctl相关的几个数分别是什么,代表什么意思。
字段名 | 说明 | 十进制值 | 二进制值 |
COUNT_BITS | 直译就是二进制数量。 | 29(Integer.SIZE - 3) | 11101 |
COUNT_MASK | 直译就是计数掩码 | 536870911((1 << COUNT_BITS) - 1) | 11111111111111111111111111111 |
RUNNING | 运行状态 | -536870912(-1 << COUNT_BITS) | -100000000000000000000000000000 |
SHUTDOWN | 停止工作 | 0(0 << COUNT_BITS) | 0 |
STOP | 结束 | 536870912(1 << COUNT_BITS) | 100000000000000000000000000000 |
TIDYING | 整理中 | 1073741824(2 << COUNT_BITS) | 1000000000000000000000000000000 |
TERMINATED | 已终止 | 1610612736(3 << COUNT_BITS) | 1100000000000000000000000000000 |
和ctl相关的几个值基本已经搞清楚了,那么再来分析ctl的值与他的变化逻辑。
状态 | 值 | 计算逻辑 | 说明 |
初始化 | -536870912 | RUNNING | 0 | 通过或计算,通过RUNNING状态值和0来计算ctl的初始值 |
计算核心线程数 | 0 | ctl & COUNT_MASK | 当前ctl值和计数掩码进行与运算,可以得到当前的核心线程数。 |
是否运行中 | true | ctl < SHUTDOWN | SHUTDOWN固定为0,只要ctl小于0,就是在运行中。这里判断的是线程池的状态。 |
通过这里,大概了解了ctl的意思和计算逻辑,此时再次分析下这套方法逻辑。
2.3.2.2. 二次分析execute
- 获取当前的线程池状态值,通过该值计算当前的核心线程数。
-
- 如果当前核心线程数小于配置的核心线程数,则添加任务,并标记为核心线程。
- 如果任务添加成功,则返回。【结束】
- 如果添加任务失败,则获取当前最新的线程池状态值,通过计算获得当前线程池状态。
- 如果线程池在运行中,并成功将任务放入当前任务队列中
-
- 再次获取当前线程池状态、
- 如果此时当前线程池没有在运行,并且成功将该任务移除,则执行拒绝策略。【结束】
- 如果当前核心线程数等于0,则添加一个非核心线程数,创建一个空任务。【结束】
- 添加非核心线程来执行这个任务。
-
- 如果执行失败了,则执行拒绝策略。【结束】
解析逻辑
使用线程池执行任务时,如果当前核心线程池没有满,则使用核心线程池进行执行。
如果核心线程池满了,就往队列中放这个任务。
- 如果线程池状态不在运行中,就尝试移除任务,并执行拒绝策略。
- 如果核心线程数是0,就创建一个新的非核心线程空任务,来去执行这个任务。
如果队列中放失败了,就创建非核心线程来执行这个任务。
- 如果核心线程执行失败了,就执行拒绝策略
到此,execute执行逻辑基本就清晰了。
2.3.2.3. addWorker方法分析
了解了execute的基本运行原理,但实际的执行逻辑依然不清楚,到底是怎么运行的也不知道。
整个execute中,最核心的方法就是addWorker方法了,我们就再来分析一下addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这个方法中,有两个参数,一个 要执行的任务,一个 是否为核心的标识。
2.3.2.3.1. 逻辑梳理
- 一个复杂的判断
-
- 当前线程池的状态大于等于
SHUTDOWN
(停止/结束/整理中/已终止状态) 并且 满足另一个复杂条件
- 当前线程池的状态大于等于
-
-
- 当前线程池状态 大于等于 STOP (结束/整理中/已终止状态) 或者
- 当前传入的任务不是空 或者
- 待执行的任务队列是空
-
-
- 满足两个条件的话,就直接返回false。
说明,这里说,如果当前线程池状态非运行状态,并且已经没有要执行的任务了,当前任务就不再执行了,直接返回false,标识执行失败了。
下面又一个死循环
- 这里有一个计算逻辑,
(core ? corePoolSize : maximumPoolSize) & COUNT_MASK)
-
- 当前是否为核心线程。三目运算,如果是核心线程,则使用核心线程池数量,如果不是,则使用最大线程数量。
- 上一步计算出的数值和计数掩码进行与运算。这里的值,不管是几,和掩码做计算,都会得到原值。
- 如果计算出的当前正在工作的线程数量,大于或等于这个数量,就直接返回false。
-
- 这里就是用来判断,是否可以创建更多的线程来执行任务。
- 将当前正在执行的线程数量+1,如果自增成功,则结束当前的循环。【retry循环】
- 更新当前线程池状态值
- 如果当前线程池状态非运行状态,就重新进入这个循环。【retry循环】
接着就是addWorker的核心逻辑了
- 首先做一个包装,将任务,包装成一个
Woker
类。
这个worker类待会儿详细说。
- 从woker中获取一个线程
- 对当前核心流程进行加锁
- 加锁后,再次判断线程池的状态
- 判断当前是否为可执行状态
-
- 当前线程池为运行中。 或者
- 当前线程处于执行或者SHUTDOWN状态,同时当前要执行的任务为空。
- 将当前的worker添加到队列中,并标记工作线程已添加
- 判断当前的队列数量,如果是最大值,就记录当前的数量
- 这些都做完之后,进行解锁
说明,到此为止,加锁的逻辑就完成了,锁中间的逻辑,核心是
- 确保线程池的状态是可运行状态
- 记录当前线程数量的最大值
- 保存工作线程
- 判断在上一步加锁逻辑中,是否成功添加了工作线程。
-
- 如果添加了,就启动该工作线程,并标记成功执行该任务。
- 如果当前任务没有成功执行
-
- 尝试从工作线程池中,将该线程给移除掉
- 将ctl的数量减去
- 更新线程池状态
- 最后返回当前对于任务的执行结果
2.3.2.3.2. 逻辑概述
- 通过死循环+atomicInteger来保障当前线程池是可执行状态,并将线程池ctl数量加一
- 包装任务为Woker
- 经过加锁,然后对Woker做基础验证,没问题后将woker加入本地的缓存中,并记录woker的最大值。
- 如果对于woker的处理没问题,就让woker启动。
- 如果woker处理异常,就执行错误逻辑,将前面加入的woker移除掉,并将ctl加的1减去,同步更新线程池状态
- 最后将执行结果返回
至此addWorker中的逻辑已经基本清晰明了,知道了添加任务的工作原理。
但这里面还有一些疑惑点
- Woker是什么?
- 线程从哪来的?
- 线程池就是woker吗?
2.3.2.3.3. Woker分析
首先看Woker类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
@SuppressWarnings("serial") // Unlikely to be serializable
final Thread thread;
/** Initial task to run. Possibly null. */
@SuppressWarnings("serial") // Not statically typed as Serializable
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// TODO: switch to AbstractQueuedLongSynchronizer and move
// completedTasks into the lock word.
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Woker类,实际就是一个Runnable类,即任务类。
这个类中有三个比较重要的属性
@SuppressWarnings("serial") // Unlikely to be serializable
final Thread thread;
/** Initial task to run. Possibly null. */
@SuppressWarnings("serial") // Not statically typed as Serializable
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
- thread:线程,这也是线程池中的线程。实际上,线程池中的线程,就是这个东西。
- firstTask:直译第一个任务,就是创建woker时,执行的是什么任务。(既然是第一个任务,那就说明一个woker执行不止一个任务)
- completedTasks:执行完成的任务数量。这里也再次说明,一个woker不会只执行一个任务。
看下构造器
java.util.concurrent.ThreadPoolExecutor.Worker#Worker
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
构造器很简单,就是把创建woker时传入的任务,当做第一个任务来执行。
调用工厂,来创建一个新的线程。通过当前对象来创建线程。那么线程中,就有了任务。
这里比较细节的点,在于run方法。run方法并非是直接执行任务的run,而是进行了一次包装
public void run() {
runWorker(this);
}
这里就是整个线程池中,对于任务的执行的核心逻辑了。
至此对于整个woker就有了一个比较全面的了解了。
2.3.2.3.4. runWoker逻辑解析
对于woker有了深入了解,还需要深入解析下,runwoker到底做了什么动作。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
逻辑梳理
- 拿到第一个任务,将第一个任务指向一个临时变量。
- 将worker中的first任务指向null。(所以,woker执行后,在woker对象中,就不再保存woker任务了,就会被回收掉)
- 这里会有一个死循环,有条件的死循环
-
- 死循环的结束条件:任务为空,并且获取任务的方法没有拿到任务,则结束死循环。
- 获取任务的方法,拿不到任务的情况有这些
-
-
- 当前线程池不是运行状态
- 当前非核心线程,获取任务超时了,就会结束。
-
-
- 基于此,只要是核心线程,并且线程池没有问题,那么死循环就永远不会结束,他会一直阻塞在这里。
- 死循环中的逻辑是执行任务
-
- 执行任务之前,执行一段逻辑,这是一个钩子,如果有个性需求,继承线程池,重写beforeExecute方法,就可以在调用任务之前执行。
- 执行任务
- 执行任务后,调用后置处理器,一样也是钩子,afterExecute。这个方法会在执行结束,或者执行异常时调用。结束时调用,异常为null。异常时调用,异常为真实异常信息。
- 如果执行异常,则会抛出异常,终止死循环
- 执行完成后,一定会执行逻辑,将该woker执行完成的任务数量自增。
- 如果由于异常,中断了死循环,会执行退出逻辑,将woker移除,并修正状态和ctl数量。
2.3.2.3.5. 小结
到这,已经完成了线程池中execute的核心所有逻辑。
几乎已经知道了线程池中的核心执行原理。
2.3.3. 总结
到这里,线程池的核心工作原理其实已经讲清楚了,还有一些其他操作类的,由于文章篇幅太长了,我们放在下篇统一来讲。
感谢观看,万望三连!