1. 计划线程池ScheduledThreadPoolExecutor简介
ScheduledThreadPoolExecutor
继承自线程池ThreadPoolExecutor
,并在其基础上增加了按时间调度执行任务的功能,如果对ThreadPoolExecutor
还不是很熟悉,可以阅读一下这篇文章:
Java线程池ThreadPoolExecutor运行机制和源码解析。
首先来翻译一下这个类的前面的单词scheduled: 预先安排的,按时刻表的;定期的。那么ScheduledThreadPoolExecutor可翻译为计划线程池、定时线程池。
大多数时候,线程池已经使用了池化技术很好的满足了线程的重复使用需要,为什么还要另外搞一个ScheduledThreadPoolExecutor呢?
普通线程池无法满足以下痛点:
- 无法设置任务在指定时间点执行。
- 无法周期性的执行任务。想要完成这点需要在自己实现,在任务里加一个while循环。
可能有人会问java里已经有了Timer类,能实现定时运行任务。但Timer类有个缺点就是内部只有一个线程,如果有多个任务,在同一时间只有一个任务在运行,无法做到同时运行,虽然可以使用多个Timer解决,但比较难以进行统一的管理。
2. 计划线程池ScheduledThreadPoolExecutor运行机制
ScheduledThreadPoolExecutor
在调用提交任务的方法后(比如scheduleAtFixedRate
和scheduleWithFixedDelay
)会将任务放入一个按执行时间升序排行的缓冲队列,工作线程会从任务缓冲队列取任务然后将它执行,在任务完成时会计算下次执行时间然后放回任务缓冲队列(这点是和ThreadPoolExecutor
的区别),从而实现周期性执行。
3. ScheduledThreadPoolExecutor类图
ScheduledThreadPoolExecutor
继承自线程池ThreadPoolExecutor
,是从ThreadPoolExecutor
扩展而来。
4. ScheduledThreadPoolExecutor构造函数
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
参数 | 类型 | 名称 | 备注 |
---|---|---|---|
corePoolSize | int | 核心线程数 | 除非设置了allowCoreThreadTimeOut,不然即使空闲也不会回收 |
threadFactory | ThreadFactory | 线程工厂 | 可设置线程属性 |
handler | RejectedExecutionHandler | 拒绝策略 | 当线程容量溢出时执行的策略 |
ScheduledThreadPoolExecutor
的构造函数是调用父类ThreadPoolExecutor
的构造函数,区别就是ThreadPoolExecutor
可以自行根据功能需要使用不同类型的缓冲队列,而ScheduledThreadPoolExecutor
的缓冲队列只能是DelayedWorkQueue
。
5. 提交计划任务
我们知道,ThreadPoolExecutor
提交任务的唯一入口就是execute()
方法,任务在execute()
方法里进行分配和调度。与之不同的是ScheduledThreadPoolExecutor
根据对任务处理需求不同,提交任务有几个方法。
/**
* 安排一个任务在延迟一段时间后执行
*
* @param command 任务
* @param delay 要延迟多少时间单位执行(从当前时间开始计时)
* @param unit delay参数的时间单位
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit);
/**
* 和上面方法的区别是入参任务一个是Runnable,一个Callable
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit);
/**
* 创建并执行一个周期性任务,该任务在指定的延迟时间后首次执行,
* 然后以给定的周期重复执行;即任务将在 initialDelay 后开始执行,
* 然后在 initialDelay+period,再然后是 initialDelay + 2 * period,以此类推。
* 如果任务的任何执行遇到异常,则将禁止后续的执行。
* 否则,任务只能通过取消或执行器的终止来终止。
* 如果任务的任何执行时间超过其周期,则后续执行可能会延迟开始,但不会并发执行。
*
* @param command 要执行的任务
* @param initialDelay 首次执行前的延迟时间
* @param period 多次执行的时间周期间隔
* @param unit 初始延迟和周期的时间单位
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 创建并执行一个周期性任务,该任务在指定的延迟时间后首次执行,
* 然后在本次执行的终止与下一次执行的开始之间以给定的延迟重复执行。
* 如果任务的任何执行遇到异常,则将禁止后续的执行。
* 否则,任务只能通过取消或执行器的终止来终止。
*
* @param command 要执行的任务
* @param initialDelay 首次执行的延迟时间
* @param delay 一个执行的终止与下一个执行的开始之间的延迟
* @param unit initialDelay 和 delay 参数的时间单位
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
提交计划的方法汇总至表格如下:
方法 | 参数 | 参数备注 | 效果 |
schedule | Runnable command | 要执行的任务 | 安排一个任务在延迟一段时间后执行 |
long delay | 要延迟多少时间单位执行(从当前时间开始计时) | ||
TimeUnit unit | delay参数的时间单位 | ||
schedule | Callable<V> callable | 要执行的callable任务 | 和上面方法的区别是入参任务一个是Runnable,一个Callable |
long delay | 要延迟多少时间单位执行(从当前时间开始计时) | ||
TimeUnit unit | delay参数的时间单位 | ||
scheduleAtFixedRate | Runnable command | 要执行的任务 | 创建并执行一个周期性任务,该任务在指定的延迟时间后首次执行;然后以给定的周期重复执行(有可能会连续不间隔的执行) |
long initialDelay | 首次执行前的延迟时间 | ||
long period | 多次执行的时间周期间隔 | ||
TimeUnit unit | 初始延迟和周期的时间单位 | ||
scheduleWithFixedDelay | Runnable command | 要执行的任务 | 创建并执行一个周期性任务,该任务在指定的延迟时间后首次执行;然后在本次执行的终止与下一次执行的开始之间 固定以给定的延迟 重复执行。 |
long initialDelay | 首次执行前的延迟时间 | ||
long period | 多次执行的时间周期间隔 | ||
TimeUnit unit | 初始延迟和周期的时间单位 |
scheduleAtFixedRate和scheduleWithFixedDelay的区别
- scheduleAtFixedRate
以本次执行的开始时间beginTime为基准,下一次的执行的时间为beginTime + period。如果执行任务的时间超过了period,会导致任务完成后无缝再执行一次。
- scheduleWithFixedDelay
以本次执行的结束时间endTime为基准,下一次的执行的时间为endTime + delay。这个方法无论该次执行使用多少时间,都会在任务执行完成后会等待delay的时间再执行。
6. 阻塞队列DelayedWorkQueue
DelayedWorkQueue提供了一种方便的方式来管理延迟执行的任务,并且能够在高并发情况下安全地进行操作,是实现定时任务调度功能的重要组成部分,以下是DelayedWorkQueue的一些特性:
- 延迟执行任务:DelayedWorkQueue用于存储需要延迟执行的任务,这些任务会在指定的延迟时间之后被执行。这使得程序可以安排任务在将来的某个时间点执行,例如定时任务。
- 基于优先级队列:DelayedWorkQueue通常是基于优先级队列(PriorityQueue)实现的,其中任务根据其延迟时间被排序。因此,队列中的任务按照延迟的时间进行排列,最先到期的任务将被最先执行。
- 数据结构:DelayedWorkQueue通常是一个有序队列,内部使用了二叉堆数据结构来实现。这样可以快速找到下一个要执行的任务,并且在添加新任务时保持队列的有序性。
- 并发性:DelayedWorkQueue是线程安全的,内部有一个可重入锁ReentrantLock,通过锁来保证并发安全,它需要被多个线程同时访问,以便添加新任务或者移除已经过期的任务。这涉及到对队列的操作的同步或者使用线程安全的数据结构。
- 使用场景:DelayedWorkQueue适用于需要按照一定延迟顺序执行任务的场景,比如定时任务、周期性任务等。它可以作为ScheduledThreadPoolExecutor的内部组件使用,ScheduledThreadPoolExecutor通过DelayedWorkQueue实现了定时任务的调度功能。
DelayedWorkQueue从数据结构来说是一个用数组实现的二叉堆,排序方式为从小到大,每个父节点都比子节点要小的堆也称为最小堆, 堆顶元素是最小的,因此堆顶存储的是执行时间缀最小的任务。
堆(Heap)是一种特殊的树形数据结构,通常实现为数组,其具有以下性质:
-
堆序性质:在堆中,对于任意节点 i,如果 j 是 i 的子节点,那么堆中的节点 i 的值必须满足某种特定的顺序关系(比如最小堆中,父节点的值小于或等于子节点的值;最大堆中,父节点的值大于或等于子节点的值)。
-
完全二叉树性质:堆通常被实现为完全二叉树,即除了最后一层外,每一层都被完全填满,且最后一层的节点都尽可能地靠左排列。这样的特性使得堆可以利用数组来表示,而不需要使用额外的指针。
由于
DelayedWorkQueue
是最小堆结构,下面文章着重讲解的是最小堆
最小堆介绍
堆可以分为两种常见的类型:
最小堆(Min Heap):在最小堆中,父节点的值小于或等于其子节点的值,因此堆顶元素是最小的。
最大堆(Max Heap):在最大堆中,父节点的值大于或等于其子节点的值,因此堆顶元素是最大的。
最小堆通常可以用数组来表示,下面是一个列子
最小堆二叉树
最小堆数组
对于最小堆数组,有以下的下标对应关系:
节点 | 效果 |
---|---|
当前节点 | arr[i] |
父节点 | arr[(i-1)/2] |
左子节点 | arr[(2*i) + 1] |
右子节点 | arr[(2*i )+ 2] |
DelayedWorkQueue的定义
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
private Thread leader = null;
private final Condition available = lock.newCondition();
}
类图:
DelayedWorkQueue实现了阻塞队列BlockingQueue,通过可重入锁ReentrantLock保证多线程并发安全,它的最小堆通过数组实现,这个数组初始容量为16,每次扩容增加一半。
核心方法:
- offer(Runnable x) 向队列添加元素
- take() 从队列取任务
- poll(long timeout, TimeUnit unit) 从队列取元素,若在指定时间内未能取到则返回null
offer 向队列增加元素
offer方法其实就是向最小堆添加元素的过程,将元素从最小堆最底部开始上浮,直到比它的parent节点的值要大。上浮完毕后如果该元素在堆顶,则该元素为最先要执行的任务,唤醒等待的一个线程。
offer方法的流程图
offer方法的代码如下:
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow(); //队列已满的时候进行扩容
size = i + 1;
if (i == 0) { //队列为空的时候
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e); //元素从底部上浮
}
if (queue[0] == e) {
leader = null;
available.signal(); //唤醒一个等待的线程
}
} finally {
lock.unlock();
}
return true;
}
siftUp 堆排序中的上浮操作
在offer的过程中因为堆添加了新的元素,假设它为e,为了维持最小堆的数据结构,需要将元e素调整到它数值的排序位置上,步骤如下:
- 首先将元素e放到堆数据的尾部
- 将元素e与父节点比较,若小于父节点则与其交换
- 重复步骤2直至元素e大于父子点或元素e已到堆顶
/**
* 堆排序中的上浮操作
* 仅当持有锁的时候才会调用.
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
//在二叉小堆中,把元素从底向上比较排序
while (k > 0) {
int parent = (k - 1) >>> 1; //在二叉堆中元素的父节点位置
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
/**
* 上浮直到找到比它的parent节点的值要大
*/
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
take 从队列取元素
take()
方法是从堆顶取最小的任务,
- 如果队列为空,等待元素入列
- 取第一个堆顶最小的元素,计算该元素是否已经到执行时间
- 如果已到执行时间,返回元素
- 如果未到执行时间,
take()方法流程图:
源码:
/**
* 取堆顶元素
*/
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
siftDown 堆排序中的向下堆操作
由于堆顶元素由于在take()
和poll()
时已经出列而不再存在,堆的结构已经不完整,需调整堆结构。
- 把堆顶作为当前节点向下堆化
- 取当前节点的较小子节点child与队列最后元素(最后元素为入参的参数key,调用
siftDown()
时传入)比较 - 如果子节点child小于key元素,子节点child与父节点交换,当前节点变为child,然后回到步骤2直至子节点child大于key元素
- key元素放在child节点处
/**
* 堆排序中的下沉操作
* 仅当持有锁的时候才会调用。
*/
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1; //相当于size/2
while (k < half) { //half之后都是叶子节点,不需要遍历小堆的叶子
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
//如果右节点小于左节点,取右节点作为比较和交换的节点
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
思考一个问题:为什么offer()使用的锁方法是lock()
,而take()方法使用的锁是lockInterruptibly()
?
首先来理清一下这两者的区别:
-
lock(): 这是最基本的获取锁的方法。如果调用 lock() 时锁已经被其他线程持有,则当前线程会一直阻塞,直到获取到锁为止。如果在阻塞的过程中当前线程被中断(通过 Thread.interrupt() 方法),它将继续等待锁,不会响应中断,直到获取到锁为止。
-
lockInterruptibly:这个方法也是用来获取锁的,但是它允许在等待锁的过程中响应中断。如果调用
lockInterruptibly()
时锁已经被其他线程持有,则当前线程会被阻塞,但是如果在等待过程中被中断,则会抛出InterruptedException
异常,此时可以根据实际需求进行相应的处理。
使用offer
方法时为了保证提交的任务入列,肯定是要一直阻塞直到获得锁。
使用take()
方法取数据时若线程处于竞争锁的时候(此时没有在执行任务,是空闲的工作线程),若调用线程池的shutdown()
方法,线程池中断空闲线程时也能抛出 InterruptedException
;但如果用的是lock()
方法,会导致取得锁后一直在available.await()
,除了使用kill -9强杀进程外永远无法回收线程。
leader线程用于
7. ScheduledFutureTask
计划线程池会把Runnable
或Callable
的类型的任务包装成ScheduledFutureTask
再放进任务队列,这个包装类是实现周期性重复运行任务的关键。
ScheduledFutureTask类图
定义
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber;
private long time; //设定的执行时间nanoTime
//0:不重复执行
//正值: 对应FixedRate
//负值: 对应FixedDelay
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
}
//省略代码……
}
run方法
要实现周期性重复运行任务的功能只需把任务重复放进等待队列, worker线程会从队列里面取任务来运行。
由以下代码可知计算任务线程池ScheduledThreadPoolExecutor
能实现定时重复运行任务的核心就是当次任务运行完毕时把任务重新加入等待队列。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//省略代码……
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run(); //如果不是定期任务,运行run方法后结束
else if (ScheduledFutureTask.super.runAndReset()) { //如果是定期任务,运行run方法,重设置状态
setNextRunTime();
//把任务重新加入等待队列
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task); //把任务重新加入等待队列
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
compareTo方法
compareTo()
方法比较两个任务的私有变量time
的大小,用于在队列中进行有序排序。
- 当前任务执行时间 先于(小于) 要比较的任务的执行时间,返回-1
- 当前任务执行时间 后于(大于) 要比较的任务的执行时间,返回1
从上面可以得出结论:
- a.compareTo(b) < 0:a小于b,即a的执行时间在前
- a.compareTo(b) > 0:a大于b,即a的执行时间在后
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1; //当前任务执行时间 先于(小于) 要比较的任务的执行时间
else if (diff > 0)
return 1; //当前任务执行时间 后于大于) 要比较的任务的执行时间
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
cancel方法
cancel方法中的参数mayInterruptIfRunning
表示是否中断正在运行的线程,
在计划线程池ScheduledThreadPoolExecutor
内部的使用中,mayInterruptIfRunning
都为false,少于中断线程这个操作。
cancel(false)
方法里,尝试将新任务的状态转为CANCELLED,转为CANCELLED成功后,如果removeOnCancel
为true则把任务从队列中移除,如果不是新任务(状态为NEW)则表示任务已经开始运行,无法取消,在FutureTask
类的run()
方法里如果状态为非NEW的时候会停止执行任务。
removeOnCancel
默认为false,也就是不会从队列出移除已取消的任务,但当已取消的任务在线程中运行时,由于状态为CANCELLED会马上返回,不会继续执行。
cancel方法的流程图如下:
源码:
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
//继承自FutureTask
public boolean cancel(boolean mayInterruptIfRunning) {
//尝试将状态为NEW的任务的转为CANCELLED状态
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//能执行到这里的都是将任务状态成功转为CANCELLED的,
//逐个通知等待结果的线程任务已经被取消
finishCompletion();
}
return true;
}
8. 任务在计划线程线执行过程
任务在提交后最终会在工作线程中执行,worker工作线程run()方法的调用栈比基础线程池的简单Runnable任务要嵌套了几层,理清后对整体流程有更好的理解。
worker工作线程启动后的调用栈:
调用栈执行顺序:
- Thread.run()
- ThreadPoolExecutor$Worker#run();
- ThreadPoolExecutor$Worker#runWorker()
- ScheduledThreadPoolExecutor$ScheduledFutureTask#run()
- FutureTask#runAndReset()
- FutureTask#call()
- Executors$RunnableAdapter#call()
- 提交的执行任务run()方法
可以看到工作线程的run()方法被包裹了几层,每个被包裹层增加了特定功能:
调用栈 | 备注 |
---|---|
Worker#run() | 工作线程 |
ScheduledFutureTask#run() | 实现 周期性重复 运行 |
FutureTask#runAndReset() | 重置FutureTask的结果状态以便不影响后续重复执行 |
FutureTask#call() | 实现了Callable |
Executors$RunnableAdapter#call() | 适配器,将Runnable转为Callable |
提交的任务run()方法 | 真正提交的任务 |
任务从提交开始在计划线程线执行过程的时序图如下:
任务从提交开始在计划线程线执行过程的简要代码如下:
public class ScheduledThreadPoolExecutor {
//工作线程类,初始化的时候会创建线程,线程的运行的Runnable为this(当前worker实例)
private final class Worker implements Runnable {
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
//创建线程, 线程的运行的Runnable为this(当前worker实例)
this.thread = getThreadFactory().newThread(this);
}
//线程运行的run方法
public void run() {
runWorker(this);
}
}
//1. 提交任务,由计划线程池进行任务调度
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
//省略代码……
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t); //调用2
return t;
}
//2. 把任务放入队列
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (//省略代码……)
//省略代码……
else
ensurePrestart(); //调用步骤3确保线程已预热
}
}
//3. 预热,确保工作线程已启动
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
//4. 增加并启动worker工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
Worker w = null;
try {
//省略代码……
w = new Worker(firstTask); //2.1 新建worker实例
final Thread t = w.thread;
if (t != null) {
try {
//省略代码……
workers.add(w); //2.2 放入集合用于管理
}
if (workerAdded) {
//2.3 启动worker线程,
// 线程启动后运行worker的run()方法,
// worker的run()会运行runWorker(this)方法,
// 所以线程启动后实际运行的是runWorker(this)方法;
t.start();
}
}
//省略代码……
}
return workerStarted;
}
//5. 线程运行循环体
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
//省略代码……
try {
//核心循环体: 执行第一个任务,完成后循环地从队列取任务执行
while (task != null || (task = getTask()) != null) {//5.1 通过getTask()取任务
//省略代码……
try {
try {
task.run(); //5.2 运行任务,任务的类型为ScheduledFutureTask
} catch (RuntimeException x) {
thrown = x; throw x;
}
//省略代码……
}
//省略代码……
}
}
}
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//6. 运行任务,计算任务下个周期运行的时间缀,任务重新放回队列
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) { //6.1 runAndReset()
setNextRunTime(); //6.2 计算任务下个周期运行的时间缀
reExecutePeriodic(outerTask); //6.3 任务重新放回队列
}
}
//6.1 该方法继承自父类FutureTask, 重置状态的是任务是否已完成
protected boolean runAndReset() {
//省略代码……
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
//省略代码……
}
return ran && s == NEW;
}
}
//6.2 计算下个周期运行的时间缀
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
//6.3 任务重新放回队列, 线程池的线程会从队列取任执行(步骤5.1)
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
//省略代码……
}
}
}
9. 计划线程池ScheduledThreadPoolExecutor的关闭
计划线程池的关闭继承自线程池ThreadPoolExecutor的shutdown()
和shutdownNow()
,保留了这两个方法的全部功能。(如果对ThreadPoolExecutor还不是很熟悉,可以阅读一下这篇文章:
Java线程池ThreadPoolExecutor运行机制和源码解析。)
另外,ThreadPoolExecutor
给计划线程池ScheduledThreadPoolExecutor
预留的调用钩子onShutdown()
,用于根据executeExistingDelayedTasksAfterShutdown
和continueExistingPeriodicTasksAfterShutdown
这两个策略判断是否继续运行延迟和周期性任务。
和线程池不同的是计划线程池在
onShutdown()
根据条件对延迟任务和周期性任务进行取消操作。
executeExistingDelayedTasksAfterShutdown
:是否在shutdown后继续运行延迟任务continueExistingPeriodicTasksAfterShutdown
:是否在shutdown后继续运行周期性任务
onShutdown()源码:
@Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!keepDelayed && !keepPeriodic) {
//如果保留延迟任务和保留周期性任务都为false, 清空队列
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
//满足以下条件的一个则把任务从队列移除并取消任务
//1.是周期性任务但设置了不保留周期性任务
//2.一次性延迟任务但设置了不保留延迟任务
//3.任务已取消
t.isCancelled()) { // also remove if already cancelled
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();
}
}
结语
本文对java计划线程池ScheduledThreadPoolExecutor的分析就到这里。
如有不正,欢迎指出。