为什么使用线程池
- 降低资源消耗 ,可以重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度,当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性 ,线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
阿里巴巴开发手册对线程池使用的建议
【推荐】ThreadPoolExecutor 设置线程存活时间(setKeepAliveTime),确保空闲时线程能被释放。
【强制】线程池不允许使用Executors 去创建,而是通过ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 返回的线程池对象的弊端如下:
FixedThreadPool 和SingleThreadPool :允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM;
CachedThreadPool 和ScheduledThreadPool :允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
ThreadPoolExecutor线程池状态
ThreadPoolExecutor.java
/**
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
(接收新任务并且会处理队列中的任务)
* SHUTDOWN: Don't accept new tasks, but process queued tasks
(不接受新任务,但是会处理队列中的任务)
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
(不接受新任务并且不会处理队列中的任务,中断正在执行中的任务)
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
(所有任务执行完成,workerCount为0,线程转到了状态TIDYING,会执行terminated()钩 子方法)
* TERMINATED: terminated() has completed
(terminated()方法已经执行完成)
* 状态之间的转换如下
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
(调用了shutdown()方法)
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
(调用了shutdownNow()方法)
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
(当队列和线程池为空)
* STOP -> TIDYING
* When pool is empty
(当线程池为空)
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
(当terminated()钩子方法执行完成)
*/
- 线程池的
shutdown()
方法,将线程池由 RUNNING(运行状态)转换为 SHUTDOWN状态 - 线程池的
shutdownNow()
方法,将线程池由RUNNING 或 SHUTDOWN 状态转换为 STOP 状态。
注:
SHUTDOWN
状态 和 STOP 状态 先会转变为TIDYING
状态,最终都会变为TERMINATED
状态定义
//ctl维护两个概念上的参数:workCount和runState。workCount表示有效的线程数量,runState表示线程池的运行状态。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 低29位表示线程数量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits 线程池运行状态存储在32位的高3位
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
接下来我们来看看 ThreadPoolExecutor 是如何获取状态和线程数的。
runStateOf方法
//runStateOf()方法是用于获取线程池状态的方法,其中形参c一般是ctl变量,包含了状态和线程数
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
CAPACITY 取反后高三位置1,低29位置0。取反后的值与 ctl 进行 ‘与’ 操作。由于任何值 ‘与’ 1等于原值,‘与’ 0等于0。因此 ‘与’ 操作过后,ctl 的高3位保留原值,低29位置0。这样就将状态值从 ctl 中分离出来。
workerCountOf方法
private static int workerCountOf(int c) {
return c & CAPACITY;
}
workerCountOf 方法的分析思路与上述类似,就是把后29位从ctl中分离出来,获得活跃线程数
ctlOf方法
//ctlOf(rs, wc)通过状态值和线程数值计算出ctl值。rs是runState的缩写,wc是workerCount的缩写
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
rs的后29位为0,wc的前三位为0,两者通过 ‘或’ 操作计算出来的最终值同时保留了rs的前3位和wc的后29位,即 ctl 值。
ThreadPoolExecutor构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize
:线程池中核心线程数的最大值maximumPoolSize
:线程池中能拥有最多线程数keepAliveTime
:表示空闲线程的存活时间TimeUnit unit
:表示keepAliveTime的单位
当一个线程空闲,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize
,那么这个线程就被回收。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize
的大小。
注:如果线程池设置了
allowCoreThreadTimeout
参数为true(默认false),那么当空闲线程超过keepaliveTime
后直接回收。(不会判断线程数是否大于corePoolSize
)即:最终线程数会变为0。
workQueue
:用于存放任务的阻塞队列
ThreadPoolExecutor
线程池推荐了三种等待队列,它们是:SynchronousQueue
、LinkedBlockingQueue
和 ArrayBlockingQueue
。
1)有界队列:
SynchronousQueue
:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于 阻塞状态,吞吐量通常要高于LinkedBlockingQueue
,静态工厂方法Executors.newCachedThreadPool
使用了这个队列。
ArrayBlockingQueue
:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
2)无界队列:
LinkedBlockingQueue
:基于链表结构的无界阻塞队列,它可以指定容量也可以不指定容量(实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE)
PriorityBlockingQueue
:是一个按照优先级进行内部元素排序的无界阻塞队列。队列中的元素必须实现 Comparable 接口,这样才能通过实现compareTo()
方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue
不会保证优先级一样的元素的排序。
注意:
keepAliveTime
和maximumPoolSize
及BlockingQueue
的类型均有关系。如果BlockingQueue
是无界的,那么永远不会触发maximumPoolSize
,自然keepAliveTime
也就没有了意义。
threadFactory
:指定创建线程的工厂。(可以不指定)
如果不指定线程工厂时,ThreadPoolExecutor
会使用ThreadPoolExecutor.defaultThreadFactory
创建线程。默认工厂创建的线程:同属于相同的线程组,具有同为 Thread.NORM_PRIORITY
的优先级,以及名为 “pool-XXX-thread-
” 的线程名(XXX为创建线程时顺序序号),且创建的线程都是非守护进程。
handler
:表示当workQueue
已满,且池中的线程数达到maximumPoolSize
时,线程池拒绝添加新任务时采取的策略。(可以不指定)
execute()方法执行流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果workerCount(工作线程数) < 核心线程数
if (workerCountOf(c) < corePoolSize) {
//执行addWorker方法。addWorker()方法会在下面进行详细分析,这里可以简单理解为添加工作线程处理任务。这里的true表示:在小于核心线程数时添加worker线程,即添加核心线程。
if (addWorker(command, true))
return;
//添加失败,重新获取ctl的值,防止在添加worker时状态改变
c = ctl.get();
}
//运行到这里表示核心线程数已满,因此下面addWorker中第二个参数为false。
//判断线程池是否是运行状态,如果是则尝试将任务添加至任务队列中
if (isRunning(c) && workQueue.offer(command)) {
//再次获取ctl的值,进行double-check
int recheck = ctl.get();
//如果线程池为非运行状态,则尝试从任务队列中移除任务
if (! isRunning(recheck) && remove(command))
//移除成功后执行拒绝策略
reject(command);
//如果线程池是运行状态,或者从workQueue中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务)
else if (workerCountOf(recheck) == 0)
//创建一个非核心线程,null表示新建线程,但不会启动它
addWorker(null, false);
}
//如果线程池是非运行状态,或者任务队列添加任务失败,再次尝试addWorker()方法
else if (!addWorker(command, false))
//失败,执行拒绝策略
reject(command);
}
double-check主要目的是判断刚加入workQueue阻塞队列的task是否能被执行
addWorker()方法执行流程
private boolean addWorker(Runnable firstTask, boolean core) {
//外层循环,负责判断线程池状态
retry:
for (;;) {
int c = ctl.get();
//线程池运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果 rs >= SHUTDOWN,线程池state已经至少是shutdown状态,此时不允许接收新任务
// 如果 rs == SHUTDOWN,firstTask为null,并且工作队列不为空,其中一个条件不满足则 无法继续执行addWorker()
// 这里设置firstTask == null是因为:线程池在SHUTDOWN状态下,不允许添加新任务,但允许执行工作队列中剩余的任务。
//总结:继续执行addWorker()需要满足要么线程池状态为RUNNING,要么线程池状态为SHUTDOWN,并且要满足队列中有任务以及firstTask==null,即只能添加新线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
内层循环,负责worker数量+1
for (;;) {
//获取活跃线程数
int wc = workerCountOf(c);
// 如果活跃线程数>=容量,不允许添加新任务
// 如果core为true,表示创建核心线程,如果线程数>=核心线程数,则不允许创建线程
// 如果core为false,表示创建空闲线程,如果线程数>=最大线程数,则不允许创建线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS操作,使得worker数量+1,成功则跳出retry循环
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS操作worker数量+1失败,再次读取ctl
c = ctl.get(); // Re-read ctl
// 如果运行状态已经改变,则从重新执行外层死循环
// 如果运行状态未改变,继续执行内层死循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 用于记录worker线程的状态
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//new一个新的worker线程,每一个Worker内持有真正执行任务的线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁,防止多线程并发环境下, 向workers中添加数据以及获取workers数量这两个过程出现并发的问题。
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//如果线程池在运行状态或者线程池已经shutdown,且firstTask==null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//线程已经启动,抛非法线程状态异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//workers是一个HashSet<Worker>
workers.add(w);
//设置最大的池大小largestPoolSize,workerAdded设置为true
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果往HashSet中添加worker成功,启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果启动线程失败,则处理失败逻辑
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker()方法执行流程
addWorker()
方法执行完成时会调用t.start()
方法,这里的t是内部类Worker
中成员变量
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
//new Worker()对象时会新创建线程newThread(),参数传的则是this,即worker对象
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//在调用t.start()方法时,实际上会调用该worker对象中的run方法
public void run() {
//执行runWorker方法
runWorker(this);
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 拷贝提交的任务,并将Worker中的firstTask置为 null,便于下一次重新赋值
Runnable task = w.firstTask;
w.firstTask = null;
//new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0,而interruptIfStarted()中只有state>=0才允许调用中断
w.unlock(); // allow interrupts
//是否'突然完成',如果是由于异常导致的进入finally,那么completedAbruptly==true就是突然完成的
boolean completedAbruptly = true;
try {
//如果当前有任务或者通过getTask()从任务队列中获取的任务不为null
while (task != null || (task = getTask()) != null) {
//上锁,不是为了防止并发执行任务,为了在shutdown()时不终止正在运行的worker
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//确保只有在线程状态>STOP时,才会被设置中断标示,否则清除中断标示
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//当前线程调用interrupt()中断
wt.interrupt();
try {
// ThreadPoolExecutor的钩子函数,用户可以继承ThreadPoolExecutor,并重写beforeExecute()方法,从而在任务执行前完成用户定制的操作逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行提交任务的run()方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//同beforeExecute,只不过在任务执行完后执行
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//执行到这里表示任务队列中没了任务,或者线程池关闭了,此时需要将worker从缓存冲清除
processWorkerExit(w, completedAbruptly);
}
}
getTask()方法执行流程
private Runnable getTask() {
//如果上一次调用poll方法超时,下一次进入循环时,会清除一个Worker
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//只有以下两种情况下会继续往下执行,否则返回null
//1.线程状态为RUNNING 2.线程状态为SHUTDOWN并且workQueue不为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//循环的CAS减少worker数量,直到成功
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//如果allowCoreThreadTimeOut为true,说明corePoolSize也需要超时检测
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果worker数量大于maximumPoolSize或者允许超时,则尝试worker数量-1,失败则continue继续
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果有超时检测则调用poll方法,否则调用take方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//没有返回说明超时,那么在下一次内层循环时会进入worker count减一的步骤
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
shutdown()方法执行流程
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//加全局锁
mainLock.lock();
try {
// 校验是否有关闭线程池的权限
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 关闭所有空闲线程
interruptIdleWorkers();
//钩子函数,用户可以继承ThreadPoolExecutor并实现自定义钩子
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试关闭线程池
tryTerminate();
}
private void advanceRunState(int targetState) {
//自旋
for (;;) {
int c = ctl.get();
// 判断当前线程池状态 >= SHUTDOWN是否成立,如果不成立的话,通过CAS进行修改
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历workers集合
for (Worker w : workers) {
Thread t = w.thread
// 判断worker线程是否已经被标记中断了,如果没有则尝试获取worker线程的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
//中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果onlyOne为true的话最多中断一个线程
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow()方法执行流程
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
//加全局锁
mainLock.lock();
try {
// 校验关闭线程池权限
checkShutdownAccess();
// 修改线程池状态为STOP
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 获取队列中所有正在等待处理的任务列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试关闭线程池
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历workers集合
for (Worker w : workers)
//只要Worker启动了就将其中断
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
参考文章
Java线程池实现原理及其在美团业务中的实践
ThreadPoolExecutor源码分析
Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理