ThreadPoolExecutor 是开发中最常用的线程池,今天来简单学习一下它的用法以及内部构造。
1、线程池存在的意义?
一般在jvm上,用户线程和操作系统内核线程是1:1的关系,也就是说,每次创建、销毁线程的时候,都会进行内核调用,产生比较大的开销,有了线程池,就可以重复的利用线程资源,大幅度降低创建和回收的效率。
此外线程池还可以帮助我们维护线程ID,线程状态等信息。
2、ThreadPoolExecutor的作用?
开发者将任务提交给ThreadPoolExecutor,ThreadPoolExecutor负责分配工作线程(Worker)来执行任务,任务完成后,工作线程不进行回收,而是继续等待后续任务。
3、如何使用ThreadPoolExecutor?
直接new一个ThreadPoolExecutor,使用的时候,调用其excute()方法,传入一个任务即可。
ThreadPoolExecutor包含了7个入参:
- * corePoolSize : 核心线程数
- * maximumPoolSize :线程池中允许的最大线程数
- * keepAliveTime:线程数目大于核心线程数时,多余空闲线程在终止前等待新任务的最长时间
- * unit:保存时间的单位
- * workQueue :用于任务执行前保存任务的队列
- * threadFactory :执行器创建新线程的工厂 Executors.defaultThreadFactory() 默认工厂
- * handler :拒绝策略,由于达到线程边界和队列容量而阻止执行时使用的处理程序 new ThreadPoolExecutor.AbortPolicy()抛出异常
public class TestThreadPool {
public static void main(String[] args) {
/**
* 共有七个参数
* corePoolSize : 核心线程数
* maximumPoolSize :线程池中允许的最大线程数
* keepAliveTime:线程数目大于核心线程数时,多余空闲线程在终止前等待新任务的最长时间
* unit:保存时间的单位
* workQueue :用于任务执行前保存任务的队列
* threadFactory :执行器创建新线程的工厂 Executors.defaultThreadFactory() 默认工厂
* handler :由于达到线程边界和队列容量而阻止执行时使用的处理程序 new ThreadPoolExecutor.AbortPolicy()抛出异常
*/
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 1L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 7; i++) {
int finalI = i;
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "===>办理业务" + finalI);
});
}
executorService.shutdown();
}
}
学会了如何使用,接下来看看ThreadPoolExecutor的源码吧
1、ThreadPoolExecutor定义的一些属性
除了构造器中需要传入的7个参数外,还要重点关注一下以下属性
- ctl:原子类int, ThreadPoolExecutor用这样一个32位的int类型维护了两个核心属性
- 线程池状态:高三位表示
- 工作线程的个数:低29位表示,因此一个ThreadPoolExecutor最多运行2^29个工作线程
- workers:存放工作线程的集合,是一个HashSet,因此在添加工作线程到workers里面的时候,要加锁保证线程安全
- mainLock:一个ReentrantLock可重入锁
/**
* ctl基于一个int类型,维护了线程池的两个核心属性:
* 1 线程池的状态: 高三位
* 2 工作线程的个数:低二十九位 --> 因此,线程池中最多允许 2^29 个工作线程个数
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 下面的参数,就是为了方便运算定义的常量
*/
// Integer.SIZE=32, 因此COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 左移29位减一,也就是2^29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 默认状态,可以正常接收执行处理任务
private static final int RUNNING = -1 << COUNT_BITS;
// 执行shutdown()方法可以变成SHUTDOWN状态 ,优雅的关闭线程池
// 不接受新任务,但是可以处理完已经提交的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 执行shutdownNow()方法变成stop状态
// 不接受新任务,也不会处理阻塞队列中未执行的任务,并设置正在执行的线程 中断标志位
private static final int STOP = 1 << COUNT_BITS;
// 所有任务执行完毕,池子中的工作线程数为0,等待执行 terminated()钩子方法
private static final int TIDYING = 2 << COUNT_BITS;
// terminated()钩子方法,继承的时候可以实现一些自己的业务,执行完毕
private static final int TERMINATED = 3 << COUNT_BITS;
线程池状态之间的转换关系图如下:
2、内部类Worker
这里只介绍一个主要的内部类Worker,这个就是工作线程
- 继承AQS:说明内部存在同步的需求(为了使用AQS中的state状态?)
- 实现了Runnable接口:说明worker本身就是一个异步的任务调度者
当某个Worker获取到一个任务时,便持有锁,直到将任务在当前线程内执行完成后,再释放锁,然后在获取新的任务。
其实这里加锁其实分成了两个部分
- Worker初始化的时候,进行了一次加锁。state状态初始化为-1,然后在初始化成功后,去执行任务之前,将state置为0,
- 真正开始执行任务时,先进行一次加锁,执行完任务的时候,进行解锁。
// Worker构造器
Worker(Runnable firstTask) {
// AQS的方法,将state设置为-1
setState(-1); // inhibit interrupts until runWorker
// 设置任务,并使用 用户传入的线程工厂 创建线程
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
2.1:为什么要将state的状态设置为-1?
答:就是为了保证worker在“从初始化到开始执行任务”这个期间不接受中断信号,以保证当前的worker能够正常初始化完成。
详细解释:
1、首先,看一下 Worker 内部类中定义了一个interruptIfStarted方法:如果state为-1,就不允许接收中断信号
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
2、worker开始执行任务的时候,也就是调用run方法的时候,会把state重新设置为0,此时,就可以接收中断信号了,看代码。
2.2 线程池如何执行任务,如何拉取工作队列中的任务?
主要代码逻辑在runWorker方法中,工作线程worker开始执行任务的时候,会调用自己的run方法,而runWorker的入参,就是worker自身。
- 执行初始化的任务:worker对象初始化的时候,是携带了任务的,那么就优先执行自身携带的任务 (每次执行任务前先判断线程池状态)
- 执行从工作队列中拉取的任务:从工作队列中拉取任务是通过getTask方法,这个方法里面是一个死循环
- 首先还是判断线程池状态
- 如果当前是核心线程,那就执行take方法,这个方法就是一直等待从工作队列中获取任务,直到成功或者当前线程中断
- 如果不是核心线程,那就执行poll方法,在指定时间内从工作队列中拉取任务,超时就退出 --- 初始化线程池传递的keepAliveTime
- 如果当前没有任务,就要将工作线程关闭了,getTask方法里面通过CAS修改了线程池的工作线程数目。---- 一个线程的关闭就是在run方法结束之后,结合2中介绍,如果是核心线程,就会一直等待下去,非核心线程在超时之后就会执行完runWorker方法,然后关闭该工作线程。
// runWorker方法,传入的是一个Worker对象
final void runWorker(Worker w) {
// 获取当前线程,并取出第一个任务
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// Worker对象启动的时候,是携带了任务的,优先执行携带的任务
// 第一个循环结束,task = null, 然后就通过getTask(),从工作队列中拉取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 判断当前线程池状态是否是stop,如果是,就强制中断当前线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子函数
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 从工作队列中拉取任务,是一个死循环
// 如果该线程是 核心线程,基于take方法从工作队列中拉取任务:死等下去,直到线程中断
// 如果非核心线程,基于poll方法,拉取指定时间的任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 还是先判断线程池状态,如果是stop 或者SHUTDOWN且工作队列中 没有任务了,就可以干掉当前任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// CAS工作线程个数减一
// 结束一个线程就是run方法结束。
// 外层是在while循环里面调用的run方法,这里取不到就不能继续进入循环, 上次循环的run方法执行完成,也就是说明当前工作线程结束了。
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 省略一堆判断
try {
// 这里的逻辑就是,如果是核心线程,就用take方法,如果不是核心线程,就用poll方法
// poll方法:拉取阻塞队列中的任务,指定等待时间
// take方法:死等下去,直到线程中断
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null) {
return r;
}
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3、线程池的主要执行方法excute():
3.1 excute方法,主要是当用户提交一个任务给线程池之后,线程池的处理逻辑:
- 如果当前线程数小于核心线程数,就新创建一个worker执行任务
- 因为并发的原因,可能会创建失败,这时就尝试将任务放到任务队列中
- 如果此时工作队列也满了,加入失败,就尝试创建非核心工作线程处理任务
- 如果此时线程个数也已经达到了允许的最大线程数的限制,那么就执行拒绝策略。
/**
* 任务交给线程池处理的时候,执行excute方法
* 1、首先判断当前工作线程的个数是否小于定义的核心线程个数,如果小于,就创建核心工作线程执行当前任务,返回
* 2、如果不小于核心工作线程个数,那么就尝试把任务放到工作队列中,如放入成功,就返回了
* 3、如果工作队列已经满了,就判断当前工作线程数是否超过允许的最大线程数,如果小于,就创建非核心工作线程,执行该任务,返回
* 4. 如果当前工作线程数目也等于允许的最大线程数了,就要执行拒绝策略了。
*
*
* @param command 要传递的任务
*/
public void execute(Runnable command) {
// 非空校验
if (command == null)
throw new NullPointerException();
// 拿到线程池的状态,以及工作线程的个数
int c = ctl.get();
// workerCountOf(c)就是从int类型的熟悉中,拿到工作线程的个数
// 如果工作线程的个数小于核心线程数,那就创建工作线程
if (workerCountOf(c) < corePoolSize) {
// 创建工作线程,创建成功就返回
if (addWorker(command, true))
return;
// 创建失败,可能是并发环境下有其他线程创建成功了,重新取一下ctl
c = ctl.get();
}
// 核心线程数已经达到了预期的数量,就先尝试把任务放到工作队列中
// 首先判断线程池状态是running,然后将任务放进队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 任务入队前后线程池的状态可能会改变,如果此时不是running状态,就要对新加入的任务从队列中删除,并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 核心线程数可能设置为0,如果为0,就要创建非核心工作线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 任务放到队列中失败,尝试构建非核心线程去处理当前任务
// 创建成功,就结束了
else if (!addWorker(command, false))
// 非核心线程创建失败,就执行拒绝策略。
reject(command);
}
3.2 线程池中,创建worker的方法, addWorker:
addWorker方法就是用来创建线程,然后启动线程执行任务的。下面来看看他的具体逻辑:
- 首先是两层的for循环
- 外层for循环用于判断线程池的状态
- 如果不是running状态,那么就不能添加新任务了
- 此外经过一系列判断,确保在shutdown状态下,有工作线程能够处理阻塞队列中的任务(因此核心工作线程数目是可以为0的)
- 内层for循环用于判断线程池中工作线程的个数,
- 如果超过了要求的数目就返回
- 否则通过cas来使当前线程个数加1
- 外层for循环用于判断线程池的状态
- 然后就是创建worker,并执行任务
- 创建出来的worker要加入 HashSet类型的 worker中,因此要加锁处理
- 添加成功就执行该任务,并返回true,否则返回false
// 创建工作线程,包括核心线程和非核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
// 外层for循环判断线程池的状态
// 内存for循环判断线程池的个数
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c); // rs就是高三位的线程池状态
// rs >= SHUTDOWN 说明线程池不是running状态,不能接收新任务 -- 添加失败,返回false
// SHUTDOWN可以正常处理工作队列的任务,后面的判断为了解决在SHUTDOWN状态下,没有工作线程处理工作队列中的任务的情况
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
return false;
}
for (; ; ) {
int wc = workerCountOf(c);
// 工作线程数目大于等于最大数目2^29,或者大于等于 核心线程数/最大线程数 返回false,添加失败--- 根据是否创建核心线程确定的
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
// cas修改工作线程数目,成功就跳出循环
if (compareAndIncrementWorkerCount(c)) {
break retry;
}
// CAS修改失败了,重新获取新的ctl
c = ctl.get();
// 重新判断线程池状态,如果和原来不一样,就重新外层循环,因为外层循环是判断线程池状态的
// 如果一样,说明线程池状态没变,继续内存循环就行了
if (runStateOf(c) != rs) {
continue retry;
}
}
}
// 定义两个标记,工作线程是否启动?工作线程是否创建?
boolean workerStarted = false;
boolean workerAdded = false;
// w就是要创建的工作线程
Worker w = null;
try {
/**
* 创建工作线程,并且把任务交给这个线程new里面调用了线程工厂
* Worker(Runnable firstTask) {
* setState(-1); // inhibit interrupts until runWorker
* this.firstTask = firstTask;
* this.thread = getThreadFactory().newThread(this);
* }
*/
w = new Worker(firstTask);
// 获取new worker时得到的线程对象
final Thread t = w.thread;
// 因为线程工厂是用户传进来的,所以thread可能为null,这里判断一下,增加代码的健壮性
if (t != null) {
// 这里用锁,对workers进行操作,将创建出来的工作线程加到workers 中
// 这个workers是一个hashSet
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
{
throw new IllegalThreadStateException();
}
// 在此处添加
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 工作线程添加成功了,就启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) {
addWorkerFailed(w);
}
}
return workerStarted;
}
4 线程池的关闭流程
4.1 shutdown() 方法
这个方法被称为温柔的终止线程池,不接受新任务,但是会处理完正在运行的任务和阻塞队列中的任务。--- 这个方法会将线程池状态修改为 SHUTDOWN
怎么判断哪些任务是空闲的,哪些任务是正在运行的呢?其实还是根据worker中的state的值来判断的,在循环workers的过程中,会尝试通过CAS将当前worker的状态从0修改到1,只有空闲状态状态的工作线程的state为0,修改成功然后执行interrupt命令,工作状态则不会。
// 温柔的终止线程池,不接受新任务,但是会处理完正在运行的和阻塞队列中的任务
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 可重入锁加锁,因为要操作workers了,这是一个hashset
mainLock.lock();
try {
// 权限校验
checkShutdownAccess();
// 将线程池状态设置为shutdown
advanceRunState(SHUTDOWN);
// 这个方法里面中断所有的空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
}
// 中断所有的空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (ThreadPoolExecutor.Worker w : workers) {
Thread t = w.thread;
// w.tryLock() --- 这里最终会调用CAS尝试把state从0修改到1
// 但是我们知道工作中的线程是加了锁的,state的值不为0,因此工作线程CAS失败,不会进入判断
// 只有空闲线程才会修改成功,然后执行interrupt方法
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// tryLock里面cas成功了,state要减一
w.unlock();
}
}
// onlyOne如果为true,最多interrupt一个worker
// 只有当终止流程已经开始,但线程池还有worker线程时,tryTerminate()方法会做调用onlyOne为true的调用
// 在这种情况下,最多有一个worker被中断,为了传播shutdown信号,以免所有的线程都在等待
// 为保证线程池最终能终止,这个操作总是中断一个空闲worker
if (onlyOne) {
break;
}
}
} finally {
mainLock.unlock();
}
}
4.2 shutdownNow()方法
shutdownNow方法就很暴力,直接中断所有线程,即便当前线程正在执行任务,也会执行interrupt方法;然后将工作队列中的任务返回。
// 中断所有线程
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限校验
checkShutdownAccess();
// 修改状态为stop
advanceRunState(STOP);
// 简单粗暴,终止所有线程
interruptWorkers();
// 将工作线程中的任务都放到一个list中返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
// 简单粗暴,除了初始化状态的线程,全部中断
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (ThreadPoolExecutor.Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
4.3 tryTerminate() 方法
我们看到在shutdown和shutdownNow方法中都调用了tryTerminate() 方法(事实上,所有可能导致线程池产终止的地方都调用了tryTerminate() 方法),这个方法中,在工作线程不为0的时候,会去中断线程池中的一个空闲的线程,这样做的目的是:当满足终结线程池的条件但是工作线程数不为0, 这个时候需要中断一个空闲的工作线程去确保线程池关闭的信号得以传播。
想象这样一种场景:调用shutdown时,多个worker正在运行,且此时工作队列也不为空,当所有的任务都执行完毕时,核心线程会被 queue.take()阻塞,无法终止线程,但是因为调用了showdown,后续也无法接收新任务了,这是不合理的,因此需要在showdown之后还可以发出中断信号。事实上,所有可能导致线程池产终止的地方都调用了tryTerminate() 方法,如果线程池进入了终止流程,但是还有空闲线程,就中断一个空闲线程。
// 每个工作线程结束的时候都会调用tryTerminate方法
final void tryTerminate() {
for (; ; ) {
int c = ctl.get();
// 还是判断线程池的状态
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) {
return;
}
// 工作线程不为0,则会中断工作线程集合中的第一个空闲的线程
// ONLY_ONE为 true表示只中断一个线程,会在遍历的时候跳出循环
// 当满足终结线程池的条件但是工作线程数不为0,这个时候需要中断一个空闲的工作线程去确保线程池关闭的信号得以传播。
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// CAS设置线程池状态为TIDYING,
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 最后更新线程池状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
介绍一下Thread中的几个方法。
- stop()方法:立即停止当前线程,并释放当前线程的锁,已经弃用了。
- interrupt()方法:给当前线程设置一个中断标志位,当前任务还会继续执行,
- isInterrupted()方法:返回当前线程的中断标志位。
- interrupted()方法:这个是一个静态方法,也是查询当前线程的中断标志位,但是查询之后会把当前线程的中断标记位清除。