一、Java 线程池 实现原理与源码深度解析
架构
总揽线程池设计,其实可以发现都是符合顶层的接口设计,中间抽象类,最终是实际工作类
使用示例
public class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println("我是要执行的任务,正在处理中");
}
public static void main(String[] args) {
Runnable runnable = new MyRunnable();
Runnable runnable2 = new MyRunnable();
ExecutorService threadPool = Executors.newFixedThreadPool(1);
threadPool.execute(runnable);
threadPool.execute(runnable2);
try {
threadPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Executor接口
public interface Executor {
void execute(Runnable command);
}
只定义了一个接口方法,核心就是处理具体的工作任务。
ExecutorService
public interface ExecutorService extends Executor {
// 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
void shutdown();
// 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
// 它和前面的方法相比,加了一个单词“now”,区别在于它会去停止当前正在进行的任务
List<Runnable> shutdownNow();
// 线程池是否已关闭
boolean isShutdown();
// 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true
// 这个方法必须在调用shutdown或shutdownNow方法之后调用才会返回true
boolean isTerminated();
// 等待所有任务完成,并设置超时时间
// 我们这么理解,实际应用中是,先调用 shutdown 或 shutdownNow,
// 然后再调这个方法等待所有的线程真正地完成,返回值意味着有没有超时
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交一个 Runnable 任务,第二个参数将会放到 Future 中,作为返回值,
// 因为 Runnable 的 run 方法本身并不返回任何东西
<T> Future<T> submit(Runnable task, T result);
// 提交一个 Runnable 任务
Future<?> submit(Runnable task);
......
}
AbstractExecutorService
其实java中框架的特点都是设计一个顶层接口,然后有基于顶层接口设计相关的抽象基础类,实现一些基础的功能。AbstractExecutorService最核心的方法就是newTaskFor() 将任务包装成一个FutureTask,FutureTask间接实现了Runnable类。但是最核心的方法,execute AbstractExecutorService是没有实现的。而是需要子类进行实现。
public abstract class AbstractExecutorService implements ExecutorService {
/**
* 将任务包装成FutureTask任务。
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
//1.将任务包装成RunableFuture对象,由于RunnableFuture是实现Runable类,所以execute的参数是一个可拓展的类型
RunnableFuture<Void> ftask = newTaskFor(task, null);
//2,交给具体的执行器进行实现
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
ThreadPoolExecutor
我们知道通过Executors.newXXX 可以创建固定线程池、可缓存线程池等,但是最终指向的都是ThreadPoolExecutor类
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//参数异常值处理,直接返回异常
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
//核心线程数
this.corePoolSize = corePoolSize;
//最大线程数
//默认使用核心线程,超过直接,将任务存储到任务队列中,如果任务队列中也满了,查看是否
//超过最大线程数,如果没有超过就new一个线程执行任务。
//异常情况,
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
//线程工厂,一般推荐自定义的
this.threadFactory = threadFactory;
//拒绝策略
this.handler = handler;
}
- corePoolSize
线程池中的核心线程数。
- maximumPoolSize
最大线程数,线程池允许创建的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;当阻塞队列是无界队列, 则maximumPoolSize则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入workQueue。
无界队列会一直存储任务,所以不会创建新的线程执行任务。
- workQueue
用来保存等待被执行的任务的阻塞队列. 在JDK中提供了如下阻塞队列:
(1) ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
(2) LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
(3) SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
(4) priorityBlockingQuene:具有优先级的无界阻塞队列;
- keepAliveTime
空闲线程的保活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么可以被关闭了。注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因为空闲太长时间而被关闭,当然,也可以通过调用 allowCoreThreadTimeOut(true)
使核心线程数内的线程也可以被回收;默认情况下,该参数只在线程数大于corePoolSize
时才有用, 超过这个时间的空闲线程将被终止。
- unit
keepAliveTime的单位
- threadFactory
用于生成线程,一般我们可以用默认的就可以了。通常,我们可以通过它将我们的线程的名字设置得比较可读一些,如 Message-Thread-1, Message-Thread-2 类似这样。
- handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
AbortPolicy:直接抛出异常,默认策略;
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
拒绝策略
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 这里 COUNT_BITS 设置为 29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000 11111111111111111111111111111
// 这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1=536870911
// 以我们现在计算机的实际情况,这个数量还是够用的
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 我们说了,线程池的状态存放在高 3 位中
// 运算结果为 111跟29个0:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 将整数 c 的低 29 位修改为 0,就得到了线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 将整数 c 的高 3 为修改为 0,就得到了线程池中的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
属性主要是通过描述线程池的状态,
- RUNNING:这个没什么好说的,这是最正常的状态:接受新的任务,处理等待队列中的任务
- SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务
- STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
- TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
- TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个
Worker
将执行的任务封装成一个Woker,内部通过引用真正执行任务的线程和实际的任务。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
// 真正执行任务的线程
final Thread thread;
/** Initial task to run. Possibly null. */
// 实际的任务,
Runnable firstTask;
/** Per-thread task counter */
// 记录此线程完成的任务数
volatile long completedTasks;
/**
* 将运行任务封装成worker
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//调用线程工作创建一个新的线程,创建的线程到时候用来执行任务
this.thread = getThreadFactory().newThread(this);
}
//任务实际执行的方法
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
execute
判断是否创建线程
public void execute(Runnable command) {
// 若任务为空,则抛 NPE,不能执行空任务
if (command == null) {
throw new NullPointerException();
}
// 表示线程数的状态和线程数的整数
int c = ctl.get();
// 若工作线程数小于核心线程数,则创建新的线程,并把当前任务 command 作为这个线程的第一个任务
//一个新创建的线程池,里面没有线程
if (workerCountOf(c) < corePoolSize) {
//如果当前线程小于核心线程数,则创建核心线程数
if (addWorker(command, true)) {
return;
}
//添加核心线程数失败了,重新获取。可能存在同时多个线程任务创建任务,A,B,C
c = ctl.get();
}
/**
* 至此,有以下两种情况:
* 1.当前工作线程数大于等于核心线程数
* 2.新建核心线程失败
* 此时会尝试将任务添加到阻塞队列 workQueue
*/
// 若线程池处于 RUNNING 状态,将任务添加到阻塞队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查线程池标记
int recheck = ctl.get();
// 如果线程池已不处于 RUNNING 状态,那么移除已入队的任务,并且执行拒绝策略
if (!isRunning(recheck) && remove(command)) {
// 任务添加到阻塞队列失败,执行拒绝策略
reject(command);
}
// 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
else if (workerCountOf(recheck) == 0) {
addWorker(null, false);
}
}
/**
* 至此,有以下两种情况:
* 1.线程池处于非运行状态,线程池不再接受新的线程
* 2.线程处于运行状态,但是阻塞队列已满,无法加入到阻塞队列
* 此时会尝试以最大线程数为限制创建新的工作线程
*/
else if (!addWorker(command, false)) {
// 任务进入线程池失败,执行拒绝策略
reject(command);
}
}
执行任务是通过threadPool.execute(runnable); 进行执行的,这里其实就是核心的流程。
addWorker
主要的作用就是将要执行的任务添加到set中
//第一个参数是要执行的任务,第二个参数是是否是核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//经过判断处理,给工作线程数+1
for (;;) {
int c = ctl.get(); //线程个数
int rs = runStateOf(c);//线程池运行状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
//线程池状态stop或者其他状态
return false;
}
for (;;) {
int wc = workerCountOf(c);
//1.目前线程个数超过总容量
//2.大于核心线程或者大于最大线程数 返回true
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//使用CAS 添加线程数
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//重新获取状态
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//将任务封装成一个Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//加锁 获取线程池的全局锁,避免添加任务时,避免线程之间竞争,
//避免中断当前执行的任务。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将工作线程保存在集合中 使用hashSet保存
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
//记录最大值
largestPoolSize = s;
workerAdded = true; //添加工作线程成功
}
} finally {
mainLock.unlock();
}
// 如果成功添加了 Worker,就可以启动 Worker 了
if (workerAdded) {
t.start(); //启动工作线程
workerStarted = true; //启动工作线程成功
}
}
} finally {
if (! workerStarted) //启动失败
addWorkerFailed(w);
}
return workerStarted;
}
Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// 真正执行任务的线程
final Thread thread;
/** Initial task to run. Possibly null. */
// 实际的任务,
Runnable firstTask;
/** Per-thread task counter */
// 记录此线程完成的任务数
volatile long completedTasks;
/**
* 将运行任务封装成worker
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//调用线程工作创建一个新的线程,创建的线程到时候用来执行任务
this.thread = getThreadFactory().newThread(this);
}
//任务实际执行的方法
//因为上层是通过firstTask将要执行的任务传递进来的,所以具体执行就是run
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//执行任务的地方 如果获取不到任务就阻塞
//getTask()方法
while (task != null || (task = getTask()) != null) {
w.lock(); //加锁
//判断当前线程是否大于stop 被拒
//
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
wt.interrupt();
}
try {
//执行任务前的操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
//这里其实就是执行Worker的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务后的操作
afterExecute(task, thrown);
}
} finally {
//任务执行完毕,接着从队列中拉去任务执行
task = null;
//计算完成的任务
w.completedTasks++;
//释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//getTask返回了null,非核心的指定时间没有获取到任务超过设置的时候,woker结束
processWorkerExit(w, completedAbruptly);
}
}
getTask
//1,核心线程不会销毁,所以一直会等待获取任务
//2.非核心超过了设置的等待时间
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}