ThreadPoolExecutor的核心工作原理
当我们在Java中讨论并发和多线程时,ThreadPoolExecutor 是不可或缺的一个类。在 java.util.concurrent 包下,该类负责管理线程池内的线程,包括线程的创建、执行、管理以及线程池的监控等。理解 ThreadPoolExecutor 如何保证线程池正确运作是非常关键的。本章将带您深入源码,解析 ThreadPoolExecutor 的核心工作原理。
线程池状态和控制流程概述
首先,让我们来了解一下 ThreadPoolExecutor 是如何通过内部的状态控制来管理线程池的。ThreadPoolExecutor 类维护着一个 ctl 的原子整型变量,该变量高位存储线程池状态,低位存储线程数量。
ctl的位字段表示方法
在 ThreadPoolExecutor 源码中,您会看到如下定义:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 状态在高位
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
此代码段定义了 ThreadPoolExecutor 状态。它用位操作确保状态存储在高位,而工作线程数量存储在低位。线程池的状态反映了线程池的生命周期,比如运行中、关闭中等。
状态转换及控制
线程池状态通过 ctl 变量控制,并提供了几个辅助函数来管理这些状态:
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
这些函数帮助我们获取当前的状态以及当前正在运行的线程数量,还通过 compareAndSet 方法保证状态更新操作的原子性。
综上所述,ThreadPoolExecutor 类的 ctl 相关属性是线程池正确运行的关键部分,它不仅标示了线程池的状态,还协调着线程的工作流程。
ThreadPoolExecutor中的属性详解
在深度了解 ThreadPoolExecutor 之前,我们需要了解一些它的键属性。这些属性共同决定了线程池的行为和性能。
核心线程与最大线程数
corePoolSize 和 maximumPoolSize 这两个参数分别代表着线程池中核心线程数量和允许的最大线程数量。核心线程会一直存活,即使它们处于空闲状态,而非核心线程如果空闲时间超过了 keepAliveTime 就会被终止。
private volatile int corePoolSize;
private volatile int maximumPoolSize;
任务队列与SynchronousQueue特性
任务队列是存放提交未执行任务的缓冲队列。ThreadPoolExecutor 使用一个 BlockingQueue 来存放任务。
private final BlockingQueue<Runnable> workQueue;
其中 SynchronousQueue 是一种无存储空间的阻塞队列,每个 put 必须等待一个 take,反之亦然。
keepAliveTime与TimeUnit的作用
当线程数量超过核心线程数时,多余的空闲线程将在等待新任务的时候,最多等待 keepAliveTime 所指定的时间长度,超时后将会被终止。
private volatile long keepAliveTime;
这里的 TimeUnit 是一个枚举,用来指定 keepAliveTime 的时间单位。
private final TimeUnit unit;
ThreadFactory的自定义线程创建
用户可以通过实现 ThreadFactory 接口来自定义线程创建方式,比如设置线程名、分组、优先级等。
private volatile ThreadFactory threadFactory;
Handler的拒绝策略
当任务太多,无法及时处理时,我们必须提供一种策略来处理这些额外的任务。ThreadPoolExecutor 支持几种拒绝策略。
private volatile RejectedExecutionHandler handler;
ThreadPoolExecutor类中的重要操作方法解析
ThreadPoolExecutor 中定义了几种核心的方法,用于执行任务、管理线程池的生命周期等。在本章中,我们将通过源码来分析这些关键方法是如何实现的。
execute方法的任务提交流程
execute 方法是线程池的核心之一,用于提交任务供线程池执行。这个方法主要涉及到任务的接收和线程的调度处理。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
在这段代码中,首先检查传入的任务对象是否为 null,然后根据当前线程数量和核心线程数比较,以便决定是否需要添加新的工作线程。如果当前运行状态是 RUNNING 并且任务可以成功添加到队列中,那么会再次检查线程池的状态并尝试添加一个非核心线程。如果这些步骤都失败了,那么将会调用拒绝策略来处理这个任务。
submit方法与FutureTask的协同
除了 execute 方法,ThreadPoolExecutor 同时提供了 submit 方法,这个方法允许提交带返回值的任务。其内部实际上是转化为 FutureTask 来处理。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
submit 方法首先将 Callable 任务转化为 FutureTask,再调用 execute 方法进行执行。返回的 Future 对象可以用来获取异步执行结果。
shutdown和shutdownNow方法的区别
shutdown 方法用于平缓的关闭线程池,它会等待正在执行的任务执行完毕,但不接收新任务。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
}
而 shutdownNow 方法则更加激进,它试图停止所有正在执行的任务,并返回等待执行的任务列表。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
return tasks;
}
ThreadPoolExecutor类中的重要内部类
ThreadPoolExecutor 的功能实现,除了依赖于它的核心属性和方法之外,还依赖于一些关键的内部类。这些内部类扮演着线程池运作中不可或缺的角色。
Worker类的作用和生命周期
ThreadPoolExecutor 使用 Worker 类来封装线程池中的每个工作线程。Worker 类是 ThreadPoolExecutor 的一个私有内部类,并且实现了 Runnable 接口。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// 省略其他方法
}
这段代码展示了 Worker 类的简化结构。Worker 类首先使用给定的 ThreadFactory 来创建新的线程,并在创建时可以选择传入一个首个任务。这个类还利用了AQS来控制线程的状态。每个工作线程的生命周期开始于它对 firstTask 的执行,然后它会持续从任务队列获取并执行任务,直到线程池终止或线程无任务可执行而被终止。
内部阻塞队列的实现与特性
线程池中使用阻塞队列来暂存待执行的任务。这是一种线程安全的队列实现,能够在高并发场景下正确地管理任务。
private final BlockingQueue<Runnable> workQueue;
ThreadPoolExecutor 支持不同类型的阻塞队列,如 ArrayBlockingQueue、LinkedBlockingQueue 和 PriorityBlockingQueue。每种队列都有其特殊的使用场景和性能特点。
LinkedBlockingQueue与ArrayBlockingQueue
LinkedBlockingQueue 和 ArrayBlockingQueue 是两种常见的阻塞队列实现。LinkedBlockingQueue 基于链表结构,理论上具有更高的处理效率,而 ArrayBlockingQueue 基于数组实现,固定容量并且在预分配内存方面性能更优。
DelayedWorkQueue的定时任务处理
如果 ThreadPoolExecutor 配置为 ScheduledThreadPoolExecutor,则使用 DelayedWorkQueue 来处理定时任务。这个队列能够让线程延时或周期性地执行任务。
ThreadPoolExecutor的扩展与自定义
ThreadPoolExecutor 提供了几个可以覆写的钩子方法,使得我们可以根据业务需求来扩展和自定义线程池的行为。同时,我们可通过自定义拒绝策略来处理无法立即执行的任务。本章将介绍如何利用这些功能来增强 ThreadPoolExecutor 的功能。
beforeExecute、afterExecute和terminated方法的扩展
ThreadPoolExecutor 允许我们在任务执行前后以及线程池终止时执行一些自定义逻辑。这是通过覆写 beforeExecute、afterExecute 和 terminated 方法来实现的。
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
这些方法默认是空实现,我们可以根据需要来重写这些方法,比如记录日志、计算任务执行时间、收集线程池统计信息等。
自定义拒绝策略与RejectedExecutionHandler的应用
我们可以实现 RejectedExecutionHandler 接口来创建自己的拒绝策略。例如,我们可以写一个记录日志并且尝试重新提交任务的拒绝策略。
public class LogAndRetryRejectedExecutionHandler implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
System.out.println("Task rejected, retrying...");
// Retry submission of the task
executor.execute(r);
} else {
System.err.println("Task rejected and cannot retry, executor has shut down.");
}
}
}
将上面的自定义拒绝策略应用到 ThreadPoolExecutor 中:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory,
new LogAndRetryRejectedExecutionHandler()
);
实战案例:定制一个监控线程池运行状态的ThreadPoolExecutor
现在让我们来看一个实战案例,其中我们将创建一个监控线程池的 ThreadPoolExecutor 并实施自定义扩展。
public class MonitoringThreadPoolExecutor extends ThreadPoolExecutor {
private final ConcurrentHashMap<String, Long> timing = new ConcurrentHashMap<>();
// 构造方法和其他必要的方法
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
timing.put(String.valueOf(r.hashCode()), System.nanoTime());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
Long startTime = timing.remove(String.valueOf(r.hashCode()));
long taskDuration = System.nanoTime() - startTime;
// Record or log the task duration
}
@Override
protected void terminated() {
super.terminated();
// Do some final logging or resource release
}
}