文章目录
- Pre
- 为什么要用线程池
- 线程池的优点
- (1)重复利用线程
- (2)控制线程的数量
- 线程池实现原理
- 线程池ThreadPoolExecutor
- 类关系
- 线程池的工作流程
- 任务队列
- 空闲线程的存活时间
- 参数ThreadFactory
- 拒绝策略
- 被拒绝后的任务如何再次执行
- 向线程池提交任务
- execute
- submit
- 关闭线程池
- shutdown
- shutdownNow
- isShutdown
- isTerminated
- ThreadPoolExecutor源码解析
- 线程池的5种状态
- 工作线程workers
- execute提交任务
- addWorker创建并执行工作线程
- Worker类
- runWorker方法
- getTask() 从任务队列中获取下一个任务
- processWorkerExit
- 关闭线程池
- shutdown
- shutdownNow
Pre
每日一博 - Review线程池
每日一博 - Review线程池_02
异步编程 - 02 显式使用线程和线程池实现异步编程
并发编程-23J.U.C组件拓展之阻塞队列BlockingQueue 和 线程池
Java-Java中的线程池原理分析及使用
J.U.C Review - 线程池原理/源码解析
为什么要用线程池
当我们需要执行一个任务时,可以直接使用new Thread创建一个线程来运行任务。
线程从创建到销毁大概经历了以下步骤:
- 1)创建Java线程实例,线程是一个对象实例,会在堆中分配内存。创建线程需要时间和内存。
- 2)JVM为线程创建其私有资源:虚拟机栈和程序计数器。
- 3)执行start方法启动Java线程,操作系统为Java线程创建对应的内核线程,线程处于就绪状态。内核线程属于操作系统资源,创建也需要时间和内存
- 4)线程被操作系统CPU调度器选中后,线程开始运行任务。
- 5)线程在运行过程中,会被CPU不断切换运行。
- 6)线程运行完毕,Java线程被垃圾回收器回收
从线程的执行流程来看,可以得知:
- 1)线程不仅是Java对象,更是操作系统资源,创建线程和销毁线程,都需要时间。频繁的创建、销毁线程,会很大程度上影响处理效率。例如:创建线程花费时间T1,执行任务花费时间T2,销毁线程花费时间T3。如果T1+T3>T2,就很不划算
- 2)Java线程的创建和运行需要占用内存空间,线程数量一大,会消耗很多内存。线程不仅需要在堆中开辟空间,还需要为每个线程分配虚拟机栈和程序计数器。根据JVM规范,一个线程默认最大栈大小是1M,这个栈空间需要从内存中分配的。
- 3)CPU切换上下文时需要时间,线程数量多时,CPU会频繁切换线程上下文,会影响系统性能。
单CPU上同时只能运行一个线程,CPU通过切换上下文运行线程,实现多线程的并发。
所以说,线程并不是越多越好,线程数量和系统性能是一种抛物线的关系,当线程数量达到某个数值的时候,性能反而会降低很多,因此对线程的管理,尤其是数量的控制能直接决定程序的性能。对线程的重复利用是非常有必要的。
为了解决这些问题,所以产生了线程池。线程池的主要目的就是为了控制线程的数量,重复利用线程,提高执行效率。
线程池的优点
线程池的优点具体如下:
(1)重复利用线程
- 1)可以复用线程,降低了创建和销毁的性能开销。当线程数量一大,线程的创建和销毁的开销是巨大的。使用线程池,每当有新任务要执行时,可以复用已有的工作线程,大大减少了不必要的开销,这些开销包括内存开销和时间开销。
- 2)提升任务的响应速度,当有新任务需要执行时不需创建线程可以立即执行。当有新任务需要执行时不必创建新线程,可以使用线程池中的工作线程立即执行。这样就减少了创建线程的时间消耗,减少了任务执行时间,提升了任务的响应速度。
(2)控制线程的数量
-
1)可以根据系统承受能力,通过合理的控制线程数,防止线程数过多导致服务崩溃。
内存:当线程数量一大,线程的创建和运行消耗内存是巨大的,甚至有可能超过服务器的承受范围,导致的内存溢出问题。根据系统承受能力,合理的控制线程数,就可以防止这种情况发生。
CPU:CPU切换线程也是需要时间的,当线程数量过多时,CPU会频繁切换线程上下文,这个时间消耗也是不容忽视的。可以通过控制线程池的最大线程数,避免大量的线程池争夺CPU资源而造成的性能消耗。
-
2)线程池可以对线程进行统一的管理,支持更多的功能. 比如,线程池可以根据任务执行情况,动态的调整线程池中的工作线程的数量。当任务比较少时自动回收线程,当线程不够用时则新建。使用线程池可以进行统一分配、调优和监控。
线程池实现原理
所谓线程池,通俗的理解即一个容器,里面存放着提前创建好的若干个线程,当有任务提交给线程池执行时,任务会被分配给容器中的某个线程来执行。
任务执行完毕后,这个线程不会被销毁而是重新等待分配任务。
线程池有一个任务队列,缓存着异步提交待处理的任务。当任务过多超过了任务队列的容量时,线程池会自动扩充新的线程到池子中,但是最大线程数量是有上限的。同时当任务比较少的时候,池子中的线程还能够自动回收和释放资源。
由此可知,线程池应该具备如下要素:
- 1)线程池管理器:用于创建并控制线程数量,包括创建线程和销毁线程;
- 2)工作线程:线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
- 3)任务队列:用于缓存提交的任务。线程池提供了一种缓冲机制,用于处理用户提交的,还没有来得及处理的任务,一般是有数量限制的。
- 4)任务拒绝策略:如果任务队列已满且线程数已达到上限,则需要有相应的拒绝策略来处理后续任务
线程池ThreadPoolExecutor
类关系
Java线程池中标准的线程池是ThreadPoolExecutor
。该线程池的接口是Executor
和ExecutorService
。
-
Executor
是最上层接口,只有一个方法execute,用于执行提交任务Runnable实例。 -
ExecutorService
继承自Executor
,定义了线程池的主要接口,拓展了任务执行的Callable方式,以及生命周期相关的方法,如关闭线程池shutdown。ExecutorService的生命周期有三种状态:运行、关闭、终止。其中生命周期关闭和终止相关的主要方法如下
ThreadPoolExecutor
继承了AbstractExecutorService
,AbstractExecutorService
实现了任务执行的Callable
方式。
ThreadPoolExecutor
线程池有4个构造函数,下面基于它最完整的构造函数来讲解下每个参数的作用,构造函数代码如下所示
参数说明如下:
- 1)corePoolSize:核心线程数。
- 2)maximumPoolSize:最大线程数。
- 3)workQueue:任务队列,缓存已经提交但尚未被执行的任务。
- 4)keepAliveTime:空闲线程的存活时间。
- 5)unit:keepAliveTime的单位。
- 6)threadFactory:线程工厂(用于指定如何创建一个线程)。
- 7)handler:拒绝策略(工作队列已满且线程池中线程已达上限时的处理策略)。
线程池的工作流程
线程池刚被创建时,线程池中的线程数默认为0。当向线程池提交一个任务时,线程池的工作流程如下
- 1)如果当前线程数<corePoolSize,则创建新的线程并执行该任务。 当有新任务需要执行时,如果当前线程数<corePoolSize,即使这时候有空闲线程,也会创建新线程执行任务。
- 2)如果当前线程数>=corePoolSize且任务队列未满,则将任务存入任务队列中。等待线池中有空闲线程时,就会执行任务队列中的任务。
- 3)如果任务队列已满,且当前线程数<maximumPoolSize,则新建线程执行该任务。
- 4)如果阻塞队列已满,且当前线程数=maximumPoolSize,则执行拒绝策略,通知任务调用者线程池不再接受任务了。
线程池刚被创建时,是不会创建线程的,当有新任务需要执行时,线程池才会创建线程。线程在完成任务以后,线程是不会立即销毁的,线程会先查看任务队列,如果任务队列有等待执行的任务,线程会继续执行队列中的任务。如果这时没有任务需要执行,线程会主动挂起自己,当有新任务需要执行时,线程会被唤醒开始执行任务。
这样当有大量任务需要执行时,既节省了创建线程的性能损耗,也可以反复重用同一线程,节约大量性能开销.
任务队列
其中参数workQueue,是一个阻塞队列BlockingQueue,用于缓存待执行的任务。可以选择以下几种阻塞队列
- 1)
ArrayBlockingQueue
:一个数组结构的有界阻塞队列,使用时需指定数量,先进先出的 - 2)
LinkedBlockingQueue
:一个链表结构的阻塞队列,可指定最大容量,如果不指定容量默认为Integer.MAX_VALUE,该队列为先进先出的。线程池工厂Executors中的newFixedThreadPool线程池就使用了这种队列。 - 3)
SynchronousQueue
:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作会一直处于阻塞状态。若在线程池中使用此队列,若有新任务时会直接新建一个线程来执行新任务。线程池工厂Executors中的newCachedThreadPool线程池就使用了这种队列。 - 4)
PriorityBlockingQueue
:一个具有优先级的无限阻塞队列,真正的无界队列,按照元素权重出队。优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。但是注意:如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行
空闲线程的存活时间
空闲线程的存活时间的参数有两个:
- 参数keepAliveTime:空闲线程的存活时间。
- 参数unit:keepAliveTime的单位。这两个参数的单位一般使用秒或者毫秒就够了。在TimeUnit类中,可选的单位有
天(DAYS)、小时(HOURS)、分钟(MINUTES)、秒(SECONDS)、毫秒(MILLISECONDS)、微秒(MICROSECONDS ,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)
。
这两个参数表示线程池的工作线程空闲后,其保持存活的时间。在默认情况下,只会回收非核心线程,核心线程是不回收的,但在设置了核心线程可回收后,核心线程空闲时间达到回收条件时也会被回收
allowCoreThreadTimeOut(true)
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<Runnable>(), // workQueue
Executors.defaultThreadFactory(), // threadFactory
new ThreadPoolExecutor.AbortPolicy() // handler
);
// 允许核心线程超时并终止
executor.allowCoreThreadTimeOut(true);
如果任务很多,并且每个任务执行的时间比较短,可以调长时间,提高线程的利用率 .
参数ThreadFactory
用于创建线程池中线程的工厂,线程池默认是使用的是Executors中的DefaultThreadFactory,线程格式为pool-{线程池id}-thread-{线程id}。
可以通过自定义线程工厂类给创建出来的线程设置更有意义的名字,方便出错时回溯,对故障定位。
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadFactory implements ThreadFactory {
private final String poolName;
private final AtomicInteger threadId = new AtomicInteger(1);
public CustomThreadFactory(String poolName) {
this.poolName = poolName;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, poolName + "-thread-" + threadId.getAndIncrement());
t.setDaemon(false); // 设置为非守护线程
t.setPriority(Thread.NORM_PRIORITY); // 设置线程优先级
return t;
}
}
如果觉得实现麻烦,还可以使用开源框架guava提供的ThreadFactoryBuilder,它可以给线程池里的线程设置有意义的名字
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class GuavaThreadFactoryExample {
public static void main(String[] args) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("my-pool-%d")
.setDaemon(true) // 设置为守护线程
.setPriority(Thread.MAX_PRIORITY) // 设置线程优先级
.build();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<Runnable>(), // workQueue
threadFactory, // threadFactory
new ThreadPoolExecutor.AbortPolicy() // handler
);
// 提交任务
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("任务执行中...");
}
});
}
}
拒绝策略
当线程池中任务队列和线程都满时,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。
在JDK中Java提供了以下4种拒绝策略。
AbortPolicy
:新任务直接被拒绝,并抛出异常RejectedExecutionException。DiscardPolicy
:新任务忽略不执行,丢弃。DiscardOldestPolicy
:抛弃任务队列中等待最久的任务,将新任务添加到等待队列中。CallerRunPolicy
:新任务使用调用者所在线程来执行任务。
当然,也可以根据应用场景需要,来实现RejectedExecutionHandler
接口自定义策略。比如记录日志或持久化存储不能处理的任务。可以使用两个方法向线程池提交任务,分别为execute和submit方法。
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志
System.err.println("Task " + r.toString() + " is rejected");
// 持久化存储任务
persistTask(r);
}
private void persistTask(Runnable r) {
// 实现任务持久化逻辑
System.out.println("Persisting task: " + r.toString());
}
}
被拒绝后的任务如何再次执行
- 自定义拒绝策略
首先,实现 RejectedExecutionHandler 接口,定义自定义的拒绝策略。
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志
System.err.println("Task " + r.toString() + " is rejected");
// 持久化存储任务
persistTask(r);
}
private void persistTask(Runnable r) {
// 实现任务持久化逻辑
System.out.println("Persisting task: " + r.toString());
// 假设任务被持久化到一个文件或数据库中
// 这里只是一个简单的示例,实际应用中需要实现具体的持久化逻辑
TaskStorage.storeTask(r);
}
}
- 任务存储类
定义一个任务存储类 TaskStorage,用于模拟任务的持久化和读取。
import java.util.ArrayList;
import java.util.List;
public class TaskStorage {
private static List<Runnable> storedTasks = new ArrayList<>();
public static void storeTask(Runnable task) {
storedTasks.add(task);
}
public static List<Runnable> getStoredTasks() {
return new ArrayList<>(storedTasks);
}
public static void clearTasks() {
storedTasks.clear();
}
}
- 重新提交任务
在适当的时候,从任务存储中读取任务并重新提交到线程池中。
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize
10, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<Runnable>(10), // workQueue
Executors.defaultThreadFactory(), // threadFactory
new CustomRejectedExecutionHandler() // handler
);
// 提交一些任务
for (int i = 0; i < 20; i++) {
int taskId = i;
executor.execute(() -> {
System.out.println("Task " + taskId + " is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 等待一段时间,确保所有任务都处理完毕
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 从任务存储中读取任务并重新提交
List<Runnable> storedTasks = TaskStorage.getStoredTasks();
if (!storedTasks.isEmpty()) {
System.out.println("Re-submitting stored tasks...");
for (Runnable task : storedTasks) {
executor.execute(task);
}
}
// 关闭线程池
executor.shutdown();
}
}
向线程池提交任务
可以使用两个方法向线程池提交任务,分别为execute和submit方法。
execute
(1)execute
方法ThreadPoolExecutor中的方法execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute传入的任务是一个Runnable类的实例
submit
(2)submit方法submit相关的方法有三种,用于提交需要返回值的任务。任务提交后,线程池会立即返回一个Future类型的对象,通过这个Future对象可以判断任务是否执行成功,并且可以通过它的get方法来获取任务的计算结果。执行get
方法会阻塞当前线程直到任务完成,而使用get(longtimeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后返回,这时候有可能任务没有执行完。
submit 方法:
- 提交需要返回值的任务:submit 方法用于提交需要返回值的任务。
submit(Runnable task)
:提交一个 Runnable 任务,返回 Future<?> 对象。submit(Runnable task, T result)
:提交一个 Runnable 任务,并指定任务完成后的结果,返回Future<T>
对象。submit(Callable<T> task)
:提交一个 Callable 任务,返回Future<T>
对象。
Future 对象:
-
通过
get()
方法获取结果:调用 get() 方法获取任务的计算结果。 -
阻塞当前线程直到任务完成:调用 get() 方法会阻塞当前线程,直到任务完成。
-
get(long timeout, TimeUnit unit)
方法:调用 get(long timeout, TimeUnit unit) 方法会阻塞当前线程一段时间后返回,如果在这段时间内任务没有完成,可能会抛出 TimeoutException。 -
阻塞当前线程一段时间后返回:调用
get(long timeout, TimeUnit unit)
方法会阻塞当前线程一段时间后返回,如果任务在这段时间内没有完成,可能会抛出TimeoutException。
关闭线程池
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池
shutdown
- 线程池不再接收新的任务:调用 shutdown() 方法后,线程池不再接收新的任务。
- 所有任务(包括正在执行的和等待队列中的)完成后,线程池关闭:线程池会等待所有任务(包括正在执行的和等待队列中的)完成后,才会关闭。
shutdownNow
- 线程池不再接收新的任务:调用 shutdownNow() 方法后,线程池不再接收新的任务。
- 等待队列中的任务将不会被执行:等待队列中的任务将不会被执行。
- 尝试中断正在运行的线程:线程池会尝试中断正在运行的线程。
- 无法响应中断的任务无法终止:如果任务无法响应中断,它们将无法终止。
- 只能等待任务执行完毕后,才能关闭线程池:对于无法响应中断的任务,只能等待它们执行完毕后,线程池才能关闭。
isShutdown
返回 true:只要调用了 shutdown() 或 shutdownNow() 方法,isShutdown 方法就会返回 true。
isTerminated
返回 true:当所有的任务都已关闭后,isTerminated 方法会返回 true。
使用建议
- 通常调用 shutdown 方法来关闭线程池:如果任务重要且需要执行完,建议使用 shutdown 方法。
- 如果任务不重要不一定要执行完,则可以调用 shutdownNow 方法:如果任务不重要或可以中断,可以使用 shutdownNow 方法。
ThreadPoolExecutor源码解析
线程池的5种状态
工作线程workers
execute提交任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 执行过程分为三个步骤:
*
* 1. 如果当前运行的线程数少于核心线程数,尝试启动一个新的线程,
* 并将给定的命令作为新线程的第一个任务。调用 addWorker 方法会原子地检查 runState 和 workerCount,
* 从而防止在不应该添加线程时错误地添加线程,通过返回 false 来避免这种情况。
*
* 2. 如果任务可以成功入队,我们仍然需要双重检查是否应该添加线程(因为自上次检查以来现有线程可能已经死亡)
* 或者线程池是否在进入此方法后已关闭。因此,我们需要重新检查状态,如果线程池已停止,则回滚入队操作;
* 如果没有线程在运行,则启动一个新线程。
*
* 3. 如果任务无法入队,我们尝试添加一个新线程。如果失败,我们知道线程池已关闭或已饱和,
* 因此拒绝该任务。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 当前工作线程数少于核心线程数
if (addWorker(command, true))
return; // 成功添加工作线程,直接返回
c = ctl.get(); // 重新获取控制状态
}
if (isRunning(c) && workQueue.offer(command)) {
// 线程池正在运行且任务成功入队
int recheck = ctl.get(); // 重新获取控制状态
if (!isRunning(recheck) && remove(command)) {
// 线程池已停止且任务已从队列中移除
reject(command); // 拒绝任务
} else if (workerCountOf(recheck) == 0) {
// 重新检查工作线程数是否为0
addWorker(null, false); // 添加一个空闲工作线程
}
} else if (!addWorker(command, false)) {
// 任务无法入队且无法添加新线程
reject(command); // 拒绝任务
}
}
addWorker创建并执行工作线程
/**
* 检查根据当前线程池状态和给定的边界(核心或最大),是否可以添加新的工作线程。如果可以,相应地调整工作线程的数量,
* 并且如果可能,创建并启动一个新的工作线程,运行 `firstTask` 作为其第一个任务。如果线程池已停止或符合关闭条件,
* 或者线程工厂无法创建线程时,此方法返回 false。如果线程创建失败(通常是由于线程工厂返回 null 或者在 `Thread.start()` 时抛出异常,如 `OutOfMemoryError`),
* 我们会干净地回滚。
*
* @param firstTask 新线程应首先运行的任务(如果没有则为 null)。当线程池中的线程少于核心线程数时(在这种情况下我们总是启动一个新线程),
* 或者当队列已满时(在这种情况下我们必须绕过队列),工作线程会在 `execute()` 方法中创建并带有初始的第一个任务。
* 初始空闲线程通常通过 `prestartCoreThread` 创建,或者用于替换其他即将死亡的工作线程。
*
* @param core 如果为 true,则使用核心线程数作为边界,否则使用最大线程数。这里使用布尔指示器而不是值,以确保在检查其他线程池状态后读取最新的值。
* @return 如果成功则返回 true
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 必要时检查队列是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 重新读取 ctl
if (runStateOf(c) != rs)
continue retry;
// 否则,CAS 失败是因为工作线程数量发生变化;重试内部循环
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 持有锁时重新检查
// 如果线程工厂失败或在获取锁之前已关闭,则回退
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 预检查 t 是否可启动
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker类
addWorker方法只是构造了一个Worker,并且把firstTask封装到Worker中。
每个Worker都是一条线程,包含了一个firstTask初始化时要被首先执行的任务。最终执行任务的是runWorker()方法。
属性
-
Worker类继承了AQS,并实现了Runnable接口。
-
firstTask字段用来保存传入的任务;表示 Worker 被创建时需要运行的第一个任务。如果为 null,则表示 Worker 是空闲的
-
thread字段,是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。表示实际的工作线程。每个 Worker 实例都对应一个 Thread 实例
-
volatile long completedTasks: 记录该 Worker 已完成的任务数量。这是一个 volatile 变量,确保多线程环境下的可见性和有序性。
构造方法
在调用构造方法时,需要传入任务,这里通过getThreadFactory(). newThread(this)来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
runWorker方法
在 run 方法中,调用 runWorker(this) 方法来执行任务
public void run() {
runWorker(this);
}
/**
* 主工作线程循环。重复从队列中获取任务并执行它们,同时处理多个问题:
*
* 1. 我们可能从一个初始任务开始,在这种情况下,我们不需要获取第一个任务。否则,只要线程池在运行,我们就从 getTask 获取任务。
* 如果 getTask 返回 null,则由于线程池状态或配置参数的变化,工作线程退出。其他退出情况是由于外部代码抛出异常,此时 completedAbruptly 为 true,
* 通常会导致 processWorkerExit 替换这个线程。
*
* 2. 在运行任何任务之前,获取锁以防止任务执行期间其他线程中断,然后确保除非线程池正在停止,否则该线程不会被中断。
*
* 3. 每个任务运行之前都会调用 beforeExecute,可能会抛出异常,这种情况下我们会使线程终止(通过设置 completedAbruptly 为 true 退出循环)而不处理任务。
*
* 4. 假设 beforeExecute 正常完成,我们运行任务,并收集其抛出的所有异常以传递给 afterExecute。
* 我们分别处理 RuntimeException、Error(规范保证我们捕获这两种异常)和任意的 Throwable。
* 由于我们不能在 Runnable.run 中重新抛出 Throwable,我们在退出时将其包装在 Error 中(传递给线程的 UncaughtExceptionHandler)。
* 任何抛出的异常也会保守地导致线程终止。
*
* 5. 在 task.run 完成后,我们调用 afterExecute,它也可能抛出异常,这同样会导致线程终止。
* 根据 JLS 第 14.20 节,即使 task.run 抛出异常,这个异常也是有效的。
*
* 异常机制的总体效果是,afterExecute 和线程的 UncaughtExceptionHandler 可以获得关于用户代码遇到的问题的最准确信息。
*
* @param w 工作线程
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池正在停止,确保线程被中断;
// 否则,确保线程不被中断。这需要在第二种情况下重新检查,以处理清除中断时的 shutdownNow 竞争条件
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
初始化:
- 使用 retry 标签开始一个无限循环。
- 获取当前控制状态 c 和运行状态 rs。
检查线程池状态:
- 如果线程池状态大于等于 SHUTDOWN,并且不符合特定条件(即线程池处于 SHUTDOWN 状态且没有初始任务且队列不为空),则返回 false。
检查工作线程数量:
- 获取当前工作线程数量 wc。
- 如果工作线程数量达到容量上限或超过核心/最大线程数,则返回 false。
- 使用 compareAndIncrementWorkerCount 尝试增加工作线程数量,如果成功则跳出 retry 循环。
- 如果 compareAndIncrementWorkerCount 失败,重新读取控制状态 c 并检查运行状态 rs,如果运行状态发生变化则继续 retry 循环。
创建和启动工作线程:
- 尝试创建一个新的 Worker 对象 w。
- 获取 Worker 的线程 t。
- 如果线程 t 不为 null,则持有主锁 mainLock。
- 重新检查线程池状态,如果线程池未停止或处于 SHUTDOWN 状态且没有初始任务,则检查线程是否已启动。
- 将 Worker 添加到工作线程列表 workers,更新最大线程数 largestPoolSize,并设置 workerAdded 为 true。
- 释放主锁 mainLock。
- 如果 workerAdded 为 true,启动线程 t 并设置 workerStarted 为 true。
异常处理:
- 如果 workerStarted 为 false,调用 addWorkerFailed 方法进行回滚处理。
返回结果:
- 返回 workerStarted,表示是否成功启动了新的工作线程
getTask() 从任务队列中获取下一个任务
/**
* 根据当前配置设置执行阻塞或定时等待任务,或者在以下情况下返回 null:
* 1. 工作线程数超过 maximumPoolSize(由于调用了 setMaximumPoolSize)。
* 2. 线程池已停止。
* 3. 线程池已关闭且任务队列为空。
* 4. 此工作线程在等待任务时超时,并且超时的工作线程将被终止(即,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* 在定时等待前后,如果队列不为空,则此工作线程不是池中的最后一个线程。
*
* @return 任务,或者如果工作线程必须退出,则返回 null,在这种情况下 workerCount 将递减
*/
private Runnable getTask() {
boolean timedOut = false; // 上一次 poll() 是否超时?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果必要,检查队列是否为空。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 工作线程是否可以被裁剪?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit
/**
* 执行即将死亡的工作线程的清理和账簿记录。仅从工作线程调用。除非设置了 completedAbruptly,
* 否则假设 workerCount 已经调整以反映退出。此方法从工作线程集合中移除线程,
* 并可能终止线程池或替换工作线程,如果它因用户任务异常退出,或者运行中的工作线程少于
* corePoolSize,或者队列不为空但没有工作线程。
*
* @param w 即将退出的工作线程
* @param completedAbruptly 如果工作线程因用户异常而死亡
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果突然退出,则 workerCount 没有调整
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 增加已完成任务计数
workers.remove(w); // 从工作线程集合中移除该工作线程
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试终止线程池
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 如果线程池状态小于 STOP
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 计算最小工作线程数
if (min == 0 && !workQueue.isEmpty())
min = 1; // 如果最小工作线程数为 0 且队列不为空,则最小工作线程数设为 1
if (workerCountOf(c) >= min)
return; // 如果当前工作线程数大于等于最小工作线程数,则不需要替换
}
addWorker(null, false); // 添加新的工作线程
}
}
关闭线程池
shutdown
/**
* 开始有序关闭,在此过程中之前提交的任务将继续执行,但不再接受新任务。
* 如果已经关闭,则调用此方法不会产生额外效果。
*
* <p>此方法不会等待之前提交的任务完成执行。使用 {@link #awaitTermination awaitTermination} 来实现这一点。
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查关闭权限
advanceRunState(SHUTDOWN); // 将运行状态推进到 SHUTDOWN
interruptIdleWorkers(); // 中断空闲的工作线程
onShutdown(); // 为 ScheduledThreadPoolExecutor 提供的钩子方法
} finally {
mainLock.unlock(); // 释放主锁
}
tryTerminate(); // 尝试终止线程池
}
shutdownNow
/**
* 尝试停止所有正在执行的任务,停止处理等待中的任务,并返回一个包含等待执行任务的列表。
* 这些任务在方法返回时将从任务队列中被移除。
*
* <p>此方法不会等待正在执行的任务终止。使用 {@link #awaitTermination awaitTermination} 来实现这一点。
*
* <p>除了尽力尝试停止正在执行的任务外,没有其他保证。此实现通过 {@link Thread#interrupt} 取消任务,
* 因此任何不响应中断的任务可能永远不会终止。
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查关闭权限
advanceRunState(STOP); // 将运行状态推进到 STOP
interruptWorkers(); // 中断所有工作线程
tasks = drainQueue(); // 从任务队列中移除并返回等待的任务
} finally {
mainLock.unlock(); // 释放主锁
}
tryTerminate(); // 尝试终止线程池
return tasks; // 返回等待的任务列表
}