线程池简介
线程的每次创建和销毁都会产生的一定的系统资源和时间的开销。正如几乎所有重资源都使用池化技术(数据库连接池、redis连接池等)进行管理,线程作为操作系统宝贵的资源,对它的使用需要进行控制管理,线程池就是使用了池化的技术对线程进行复用和管理。使用线程池对比单独启动线程有以下好处:
- 快速响应用户请求
线程的启动需要一定时间开销,而使用了池化的线程时,当任务到达,节省了这部分的时间。Tomcat就使用自行扩展的ThreadPoolExecutor处理http请求,减小响应时间。
- 减少资源开销
对操作系统来说,创建一个线程的代价是十分昂贵的, 需要给它分配内存、列入调度,同时在线程切换的时候还要执行内存换页,CPU 的缓存被清空,切换回来的时候还要重新从内存中读取信息,破坏了数据的局部性。
频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
对资源无限申请缺少抑制手段,将引发系统资源耗尽的风险。
系统无法合理管理内部的资源分布,将降低系统的稳定性。
- 提高资源管理性
使用线程池可对线程资源进行统一的命名、创建、销毁、监控和分配任务,还能通过信号量来对线程同时执行任务的数量进行限制。
jdk里最基本的线程池的实现类是ThreadPoolExecutor,它是Java管理线程池的一个重要类,本文正是对这个类的讲解。
线程池运行机制
线程池的总体运行机制如上图(网上找的一个图),内部管理了一批线程和一个缓冲阻塞队列,通过execute()
方法向其提交任务,之后线程池会调度分配任务,根据不同条件时会产生不同的策略:
- 直接新建一个线程运行该任务
- 将任务放进队列,线程池的线程从队列中取任务去执行
- 直接新建一个线程运行该任务直至达到最大线程数(和1中的条件有差异)
- 拒绝任务,执行拒绝策略
这里对概念和机制作简单介绍,下面将作详细讲解。
类图
ThreadPoolExecutor类实现了ExecutorService接口,下面是主要方法的简要说明:
- execute(Runnable command):提交一个任务给线程池执行。
- shutdown():优雅地关闭线程池,不再接受新的任务,等待已经提交的任务执行完成后关闭线程池。
- shutdownNow():立即关闭线程池,并尝试中断所有执行中的任务。
线程池状态
线程池的状态和数量存储在原子类实例ctl
里,高3位为线程池状态,低29位为线程池的工作线程数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
CAPACITY = (1 << COUNT_BITS) - 1; 这段代码得到值转为二进制是
00011111 11111111 11111111 11111111
~CAPACITY得到值转为二进制是
1110000 00000000 00000000 00000000
所以
private static int runStateOf(int c) { return c & ~CAPACITY; } 得到的是c的高3位的值
private static int workerCountOf(int c) { return c & CAPACITY; } 得到的是c的低29位的值
private static int ctlOf(int rs, int wc) { return rs | wc; } 是将rs的高3位和wc的低29位的值组合起来(rs的低29位为0,wc的高3位为0)
线程池包含了五种状态:
状态 | 备注 |
---|---|
RUNNING | 能接受新提交的任务,也能处理阻塞队列中的任务 |
SHUTDOWN | 不再接受新任务,但能继续处理阻塞队列。在调用shutdown()方法中转为此状态 |
STOP | 不再接受新任务,不再处理阻塞队列,中断正在运行的线程。在调用shutdownNow()方法中转为此状态 |
TIDYING | 所有任务已终止,worker数量为0 |
TERMINATED | 当调用terminated()后转为该状态 |
线程池的状态由内部维护,随着生命周期的转变而变化,其状态的流转图如下:
核心参数
线程池的构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造函数的参数详细解释:
参数 | 类型 | 名称 | 备注 |
---|---|---|---|
corePoolSize | int | 核心线程数 | 除非设置了allowCoreThreadTimeOut,不然即使空闲也不会回收 |
maximumPoolSize | int | 最大线程数 | 线程池允许的最大线程数量 |
keepAliveTime | long | 线程存活的时间 | 当完成当次任务后线程存活的时间 |
workQueue | BlockingQueue | 阻塞队列 | 可自行选择实现类 |
threadFactory | ThreadFactory | 线程工厂 | 可设置线程属性 |
handler | RejectedExecutionHandler | 拒绝策略 | 当线程容量溢出时执行的策略 |
任务调度流程
线程池提交任务的入口是execute()
方法,在该方法内部决定了提交任务的分配和调度过程,如下图所示:
主要调度逻辑:
- 当前线程数小于corePoolSize时,每次创建一个新线程执行任务
- 当前线程数大于corePoolSize并且workQueue未满,把任务放进workQueue
- 当前线程数处于corePoolSize和maximumPoolSize之间并且workQueue已满,创建新线程执行任务
- 当前线程数大于等于maximumPoolSize,执行拒绝策略
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //当前线程数小于corePoolSize时
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
//当前线程数大于corePoolSize并且workQueue未满,把任务放进workQueue
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//当前线程数处于corePoolSize和maximumPoolSize之间并且workQueue已满,创建新线程执行任务
else if (!addWorker(command, false))
reject(command); //当前线程数大于等于maximumPoolSize,执行拒绝策略
}
Worker类
Worker是线程池的核心内部类,存储着线程,实现了Runnable
接口,通过继承同步器AbstractQueuedSynchronizer
简单的实现了锁,这个锁用于防止在关闭线程池时正在运行任务的线程被interrupt()
方法中断。
/**
* ThreadPoolExecutor的内部类Worker
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread; //线程池的线程
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//新建线程
this.thread = getThreadFactory().newThread(this);
}
/** 任务执行的核心方法 */
public void run() {
runWorker(this);
}
//省略代码……
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
线程池通过addWorker(Runnable firstTask, boolean core)
方法新建Worker实例,在其构造函数会新建一个线程,创建成功后启动这个线程。线程池对线程的管理就是通过对Worker
的管理实现的。
//ThreadPoolExecutor通过内部变量workers来管理线程
private final HashSet<Worker> workers = new HashSet<Worker>();
private boolean addWorker(Runnable firstTask, boolean core) {
Worker w = null;
try {
//省略代码……
w = new Worker(firstTask); //新建worker实例
final Thread t = w.thread;
if (t != null) {
try {
//省略代码……
workers.add(w);
}
if (workerAdded) {
t.start(); //启动worker线程
}
}
//省略代码……
}
return workerStarted;
}
}
线程运行的核心方法
worker线程启动后执行的run()
就是runWorker(Worker w)
,是线程执行任务的核心方法,内部是一个循环,运行的逻辑如下:
- 当是新建的线程时, 会运行Worker实例初始化时的firstTask任务。
- 当firstTask任务完成时,调用
getTask()
从阻塞队列方法取得下一个任务。 - 当调用
getTask()
方法返回null时会退出循环,执行processWorkerExit()
方法退出worker,关闭线程。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/**
* 循环从队列里取任务,当task=getTask()方法返回null时会退出循环
*/
while (task != null || (task = getTask()) != null) {
//加锁防止shutdown方法会interrupt正在运行的任务
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
/** 执行提交的任务 */
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//若上面的主循环结束,回收线程
processWorkerExit(w, completedAbruptly);
}
}
拒绝策略
使用了有界等待队列的线程池对任务的总容量作了限制,当线程池的任务总容量达到了maximumPoolSize和等待队列容量之和,线程池不会再接收任务,而是执行拒绝策略。java原生有四种拒绝策略,另外可以实现java.util.concurrent.RejectedExecutionHandler
接口进行扩展。
类 | 效果 |
---|---|
AbortPolicy | 抛出异常(默认的拒绝策略) |
CallerRunsPolicy | 在当前线程运行任务 |
DiscardOldestPolicy | 丢弃队列最旧的任务 |
DiscardPolicy | 丢弃当前的任务 |
任务在线程池的执行流程
线程池内部的主要组件和实现在上面已经进了介绍,通过下面的时序图展示了一个任务在线程池中执行的完整流程:
线程池的关闭
线程池的核心线程是通过shutdown()
和shutdownNow()
方法进行关闭的,这两个方法分别会把线程池的状态设置为SHUTDOWN
和STOP
,这时候线程池不会再接受新的任务,通过使用worker的tryLock()方法尝试获得锁能否成功判断是否空闲,能取得锁就是空闲的线程,然后去中断空闲的线程,这样就保证不会中断正在运行的任务的线程。
shutdown()
和shutdownNow()
两个方法的区别是shutdownNow()会把等待队列清空。
以下是shutdown()方法的代码:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); //线程池状态设置为SHUTDOWN
interruptIdleWorkers(); //中断空闲线程,回收资源
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//尝试获取worker的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt(); //对worker工作线程进行中断
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
线程池空闲worker线程的关闭回收
要搞明白空闲线程的关闭回收需要理解线程中断的一些知识,下面就来讲一下。
线程中断知识点回顾开始
——————— 知识点回顾开始 ———————
java有两种方法通知线程结束线程
1、调用线程的stop()方法(已废弃)
直接退出线程,因为太暴力会产生不可知的结果该方法已废弃
2、调用线程的中断方法
线程的中断方法是interrupt()
,首先要明确的一点是调用interrupt()
方法后线程并不是马上关闭、不是马上关闭、不是马上关闭!!
而是根据不同情况出现以下两种不同结果:
- (1)、当使用interrupt()方法去打断处于阻塞状态的线程时,会抛出InterruptedException异常,而不会更新打断标记,因此,虽然被打断,但是打断标记依然为false,使用Thread类的isInterrupted()方法可返回打断标记。
线程阻塞的情况有以下这些:
* @see java.lang.Object#wait()
* @see java.lang.Object#wait(long)
* @see java.lang.Object#wait(long, int)
* @see java.lang.Thread#sleep(long)
* @see java.lang.Thread#sleep(long)
* @see java.util.concurrent.locks.Condition.await
- (2)、当使用interrupt()方法去打断非阻塞线程时,被打断的线程会继续运行,但是该线程的打断标记会更新,更新为true,因此可以根据打断标记来作为判断条件使得线程停止。线程是否打断的方法为isInterrupted()
所以,调用线程的interrupt()方法并不会停止和关闭线程,程序自行根据打断标记或InterruptedException异常自行结束线程的运行
——————— 知识点回顾结束 ———————
下面重新回到线程池空闲worker线程的关闭回收的主题。
当线程池处于SHUTDOWN状态时,不再接受新的任务,意味着只要正在运行的任务和队列里的任务全部运行完毕,线程池里所有任务就完成了。
上面介绍了线程池的关闭的方法,接下来思考一个问题,既然线程的interrupt方法是相当于一种通知机制,为什么通过调用空闲worker线程的interrupt()
方法能关闭回收线程?
其中空闲的worker
线程的锁已释放,正在从队列取数据处于阻塞状态等待任务入列。
当调用线程池的shutdown()
方法时,会遍历worker尝试获得锁,
取得空闲的worker
的锁并去interrupt线程,此时阻塞队列抛出InterruptedException
,在getTask()
方法内捕获异常后结束本次循环,运行下一次循环时由于线程池状态已变为大于等于SHUTDOWN,getTask()
返回null,runWorker()
的作为线程运行任务的核心循环体也结束了,最后调用processWorkerExit
回收worker实例,在processWorkerExit
方法结束后,线程的run()
方法就完成了运行,线程随之结束。
结合下面的时序图和具体代码的流转作深一步理解
空闲worker线程关闭退出的时序图
空闲worker线程被中断后关闭退出的逻辑代码
在代码标红步骤 3.若await
阻塞时被interrupt
抛出异常InterruptedException
的说明:
执行到notEmpty.await()
处的线程为空闲线程,此时被interrupt是因为在线程阻塞时线程池调用了shutdown()
或shutdownNow()
结尾
本文对java线程池的分析就到这里,如有不正,欢迎指出。