进程和线程:
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配的基本单位,是操作系统结构的基础。
线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。
,lightweight processes和Java线程之间存在着1对1的关系。
Java线程:java线程既不是用户线程,也不是内核线程。用户线程阻塞会导致整个进程阻塞,但是Java线程不是这样,你可以做个实验。内核中没有Java线程实现的相关代码,因此Java线程也不是纯粹的内核线程。Java线程更像是LWP和用户线程之间的结合。Java代码中通过start方法调用native 的 start0方法创建了线程。
线程之间共享了区域,因此创建和销毁的消耗都会比进程小,多线程带来了更多的执行流,使得可以一个程序执行多个方法,并且能够利用上计算机多个核的特点,让每个线程运行在不同的核心上面。共享了区域同时带来了数据隔离的问题。两个线程会访问到同一个变量,内存等。例如A线程给变量a赋值1,B线程给变量a赋值2.C线程访问变量a不能够确定这个值是多少。
为了控制多个线程访问同一个变量产生脏数据,使用锁进行控制是一个很好的策略。
Java线程
创建线程:
既然线程有这么多优势,那么Java进行创建线程很简单就是通过Thread类的实例方法start()进行创建。线程的任务通过Thread()的构造函数进行执行。执行的逻辑通过一个Runable接口的实例对象进行传递。new Thread主要是设置线程的一些参数信息,调用start再调用native的start0方法才进行了创建线程,并且这个native方法会调用thread线程的run方法。
Thread thread = new Thread(new Runnable(){
@Override
public void run() {
System.out.println("new Thread");
}
});
thread.start();
这样虽然创建了线程,但是功能还不够完善,比如有的线程会有返回值,因此还需要有一种能接受返回值的创建线程方式。
创建线程的几种方式:
- 通过继承Thread类
- 通过实现Runnable接口
- 通过实现Callnable接口然后通过FutureTask调用
- 通过线程池的方式
- …
具体的实现方式有很多,但本质都是相同的,本质都是通过Runable的run方法进行了执行代码。感兴趣可以看看后面的代码片段,通过了FutureTask进行解释,重写了Runnable接口的run方法进行实现。
public static void main(String[] args) throws Exception {
FutureTask<String> stringFutureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "book";
}
});
Thread thread = new Thread(stringFutureTask);
thread.start();
stringFutureTask.get();
}
前面讲了,都是通过native的start0方法然后进行调用run方法,因此FutureTask重写了这个run方法。
public void run() {
...
// 这个就是传递的Callable接口
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用call方法获得返回值
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
...
}
Thread类的相关方法
- void start() 启动线程,执行run方法
- run() 线程被调用时候执行的操作
- String getName() 获取线程名称
- void setName(String name) 设置线程名称
- static Thread currentThread() 返回当前线程
- static void yield() 线程让步,让jvm重新分配
- join() 调用线程将阻塞,被调用线程被优先执行
- 低优先级的线程也能够执行
- static void sleep(long millis):休眠x毫秒,
- stop() 不推荐使用,强制线程生命结束
- boolean isAlive() :线程是否存活
线程生命周期
线程调用start后处于的是新建(NEW)状态,等待线程调度后转换为运行(RUNNABLE)状态,进入同步块会进入阻塞(BLOCKED)状态,调用了线程wait方法会进入阻塞(WAITING)状态,调用线程的wait(time会进入超时等待(TIMED_WAITING)状态,线程执行接入进入终止(TERMINATED)状态。详细流转图如下:
图片来源:挑错 |《Java 并发编程的艺术》中关于线程状态的三处错误
- NEW:尚未启动的线程状态。
- RUNNABLE:可运行线程状态。处于可运行状态的线程正在 Java 虚拟机中执行,但它可能正在等待操作系统的其他资源,如处理器。
- BLOCKED:被阻塞等待监视器锁的线程状态。处于阻塞状态的线程正在等待监视器锁以进入同步块/方法,或者在调用 Object#wait() 方法后重新进入同步块/方法。
- WAITING:等待状态的线程。线程由于调用了以下方法之一而处于等待状态:例如,调用了某个对象的 Object.wait() 的线程正在等待另一个线程调用该对象的 Object.notify() 或 Object.notifyAll()。调用了 Thread.join() 的线程正在等待指定的线程终止。
- TIMED_WAITING:有指定等待时间的等待状态的线程。线程由于调用了以下方法之一并指定了正数等待时间而处于超时等待状态:
- TERMINATED:已终止的线程状态。线程已完成执行。
线程上下文切换:
线程运行再CPU上,但是可能当前线程没有任务执行调用sleep(), wait() 主动让出CPU,因此这时候操作系统就会让其他线程执行,或者是当前线程执行的时间太长,就会进行线程调度。调度的结果就是当前线程会暂停执行,然后其他线程执行。这里就会把之前线程占用的资源切换给另一个线程占有执行,这就是上下文切换。不同于进程上下文切换,线程上下文切换花费时间更少,因此这也是线程的优势之一。
线程池:
https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/
一个功能运行时间太久了,一般通过什么方式进行优化?异步/多线程使用new Thread()方式会有什么问题,1.OOM 2.资源开销和耗时,3.不可管理性。
线程池带来的好处:
- 降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗。
- 提高响应速度:任务到达时不需要等待线程创建就可以立即执行。
- 提高线程的可管理性:线程池可以统一管理、分配、调优和监控
JDK提供的:Executors
ForkJoinPool:又是一个
构造函数参数列表:
- corePoolSize: 核心线程池的大小,核心线程通过懒创建,如果核心线程池有空闲位置,这时新的任务就会被核心线程池新建一个线程执行,执行完毕后不会销毁线程,线程会进入缓存队列等待再次被运行。
- maximunPoolSize: 线程池能创建最大的线程数量。如果核心线程池和有界缓存队列都已经满了,新的任务进来就会创建新的线程来执行。但是数量不能超过maximunPoolSize,否则会采取拒绝接受任务策略,我们下面会具体分析。
- keepAliveTime: 非核心线程的回收周期,非核心线程能够空闲的最长时间,超过时间,线程终止。这个参数默认只有在线程数量超过核心线程池大小时才会起作用。只要线程数量不超过核心线程大小,就不会起作用(当然如果设置了allowCoreThreadTimeOut(true)线程池中的核心线程也受该参数的影响)。
- unit: 时间单位,和keepAliveTime配合使用,可选择各种时间单位天-纳秒。
- workQueue: 任务队列,用来存放等待被执行的任务,一般为阻塞队列(BlockingQueue)三种常用为:(可自定义阻塞队列)。
- ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
- LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
- SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。最大线程数是 Integer.MAX_VALUE
- DelayedWorkQueue:DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。
- threadFactory: 线程工厂,用来创建线程(包括核心线程和非核心线程),一般有三种选择策略(可自定义)。
- handler: 任务拒绝策略,也就是在核心线程满负载、任务队列已满、非核心线程满负载的条件下会触发拒绝策略。ThreadPoolExecutor中为我们提供了四种默认策略可选择(可自定义):
- ThreadPoolExecutor.AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy(不希望丢弃任务时使用):调用者执行策略,也就是当前调用Executor#execute()的线程直接调用任务,一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化为同步调用
创建线程池的方法:
- 通过构造器的方式(推荐)
- 通过Executor框架的工具类Executors来创建(阿里禁用,容易OOM)
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
Executor数据结构:
线程池标识:线程池状态控制参数ctl
要了解线程池,我们首先要了解的线程池里面的状态控制的参数 ctl,这个线程池的状态控制参数是一个原子操作的 AtomicInteger,这个ctl包含两个参数 :
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//最大容量
// jdk 11 是这个参数
// private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl 下面三个运算获取线程状态。
private static int runStateOf(int c) { return c & ~CAPACITY; }//获取rs
private static int workerCountOf(int c) { return c & CAPACITY; } // 获取wc
private static int ctlOf(int rs, int wc) { return rs | wc; }// 获取ctl
- runState:当前线程池的状态
- workerCount:激活(工作)的线程数
ctl它的低29位用于存放当前的线程数, 因此一个线程池在理论上最大的线程数是 536870911; 高 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应如下:
- 111(-1): RUNNING:线程池初始化(创建出来之后)处于此状态,能够接收新任务,以及对已添加的任务进行处理。
- 000(0): SHUTDOWN:当调用shutdown()方法时改为此状态,在此状态时,不接收新任务,但能处理已添加的任务。
- 001(1): STOP:调用shutdownNow()方法时处于此状态,在此状态时,不接收新任务,不处理已添加的任务,并且会尝试中断正在处理的任务。
- 010(2): TIDYING:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。|| 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
- 011(3): TERMINATED:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。线程池彻底终止,就变成TERMINATED状态。
线程池状态变化:
- 单结果转换
方法名称 描述 异步版本 异步版本(带Executor)
thenApply 使用函数处理当前阶段的结果 thenApplyAsync thenApplyAsync(Function, Executor)
thenAccept 使用动作消费当前阶段的结果 thenAcceptAsync thenAcceptAsync(Consumer, Executor) - 组合结果转换
方法名称 描述 异步版本 异步版本(带Executor)
thenCombine 当两个阶段都正常完成时,使用两个结果作为参数执行给定函数 thenCombineAsync thenCombineAsync(CompletionStage, BiFunction, Executor)
thenAcceptBoth 当两个阶段都正常完成时,使用两个结果作为参数执行给定动作 thenAcceptBothAsync thenAcceptBothAsync(CompletionStage, BiConsumer, Executor) - 任意结果转换
方法名称 描述 异步版本 异步版本(带Executor)
applyToEither 当当前阶段或另一个给定阶段正常完成时,使用结果作为参数执行给定函数 applyToEitherAsync applyToEitherAsync(CompletionStage, Function, Executor)
acceptEither 当当前阶段或另一个给定阶段正常完成时,使用结果作为参数执行给定动作 acceptEitherAsync acceptEitherAsync(CompletionStage, Consumer, Executor) - 两者之后执行
方法名称 描述 异步版本 异步版本(带Executor)
runAfterBoth 当当前阶段和另一个给定阶段都正常完成时,执行给定动作 runAfterBothAsync runAfterBothAsync(CompletionStage, Runnable, Executor) - 任意一个完成时执行
方法名称 描述 异步版本 异步版本(带Executor)
runAfterEither 当当前阶段或另一个给定阶段正常完成时,执行给定动作 runAfterEitherAsync runAfterEitherAsync(CompletionStage, Runnable, Executor) - 组合结果映射
方法名称 描述 异步版本 异步版本(带Executor)
thenCompose 使用给定函数计算另一个CompletionStage的结果,并完成新阶段 thenComposeAsync thenComposeAsync(Function, Executor) - 异常处理
方法名称 描述 异步版本 异步版本(带Executor)
handle 当当前阶段正常完成或异常完成时,使用结果和异常作为参数执行给定函数 handleAsync handleAsync(BiFunction, Executor)
exceptionally 当当前阶段异常完成时,使用异常作为参数执行给定函数 exceptionallyAsync exceptionallyAsync(Function, Executor)
exceptionallyCompose 当当前阶段异常完成时,使用异常作为参数执行给定函数,并返回一个新的CompletionStage exceptionallyComposeAsync exceptionallyComposeAsync(Function, Executor) - 完成时执行
方法名称 描述 异步版本 异步版本(带Executor)
whenComplete 当当前阶段正常完成或异常完成时,执行给定动作 whenCompleteAsync whenCompleteAsync(BiConsumer, Executor) - 转换为CompletableFuture
方法名称 描述
toCompletableFuture 将CompletionStage转换为CompletableFuture
这个表格列出了CompletionStage接口的主要方法,并且区分了同步和异步版本,以及是否使用Executor进行异步执行的版本。
execute方法:
源码:
- 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
- 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
- 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
- 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,饱和策略会调用RejectedExecutionHandler.rejectedExecution()方法。
为什么需要二次检查线程池的运行状态,当前工作线程数量为0,尝试创建一个非核心线程并且传入的任务对象为null?这个可以看API注释:
如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结(core线程数为0))或者执行当前方法的时候线程池是否已经shutdown了。所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除或者在没有可用的工作线程(core线程数为0,其他线程被回收了)的前提下新建一个工作线程。
public void execute(Runnable command) {
if (command == null)// 健壮性
throw new NullPointerException();
int c = ctl.get(); // 获取ctl
// 获取工作线程数<核心线程数
if (workerCountOf(c) < corePoolSize) {
// 创建 核心线程数
if (addWorker(command, true))
return;
// 加入核心线程失败(有并发),重新获取ctl。
c = ctl.get();
}
// 判断线程池状态,加入工作队列,
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查状态,
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command); // 拒绝
else if (workerCountOf(recheck) == 0) // 线程池中没有工作线程
// 创建线程处理阻塞队列中的线程。
addWorker(null, false);
}
// 创建非核心线程处理任务。
else if (!addWorker(command, false))
reject(command);//拒绝
}
addworker流程:
在 execute 方法中,多次调用 addWorker 方法。addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。内层for循环跳到外层for循环,通过break和标记。
// 添加工作线程,如果返回false说明没有新创建工作线程,如果返回true说明创建和启动工作线程成功
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 注意这是一个死循环 - 最外层循环
for (int c = ctl.get();;) {
// 这个是十分复杂的条件,这里先拆分多个与(&&)条件:
// 1. 线程池状态至少为SHUTDOWN状态,也就是rs >= SHUTDOWN(0)
// 2. 线程池状态至少为STOP状态,也就是rs >= STOP(1),或者传入的任务实例firstTask不为null,或者任务队列为空
// 其实这个判断的边界是线程池状态为shutdown状态下,不会再接受新的任务,在此前提下如果状态已经到了STOP、或者传入任务不为空、或者任务队列为空(已经没有积压任务)都不需要添加新的线程
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// 注意这也是一个死循环 - 二层循环
for (;;) {
// 这里每一轮循环都会重新获取工作线程数wc
// 1. 如果传入的core为true,表示将要创建核心线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false表示创建核心线程失败
// 1. 如果传入的core为false,表示将要创非建核心线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false表示创建非核心线程失败
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 成功通过CAS更新工作线程数wc,则break到最外层的循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN
c = ctl.get(); // Re-read ctl
// 如果线程池状态已经由RUNNING已经变为SHUTDOWN,则重新跳出到外层循环继续执行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// 如果线程池状态依然是RUNNING,CAS更新工作线程数wc失败说明有可能是并发更新导致的失败,则在内层循环重试即可
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {
// 传入任务实例firstTask创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象,所以下面可以直接操作Thread t = w.thread
// 这一步Worker实例已经创建,但是没有加入工作线程集合或者启动它持有的线程Thread实例
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 这里需要全局加锁,因为会改变一些指标值和非线程安全的集合
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
// 这里主要在加锁的前提下判断ThreadFactory创建的线程是否存活或者判断获取锁成功之后线程池状态是否已经更变为SHUTDOWN
// 1. 如果线程池状态依然为RUNNING,则只需要判断线程实例是否存活,需要添加到工作线程集合和启动新的Worker
// 2. 如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
// 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的任务实例firstTask不为null,则不会添加到工作线程集合和启动新的Worker
// 这一步其实有可能创建了新的Worker实例但是并不启动(临时对象,没有任何强引用),这种Worker有可能成功下一轮GC被收集的垃圾对象
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把创建的工作线程实例添加到工作线程集合
workers.add(w);
int s = workers.size();
// 尝试更新历史峰值工作线程数,也就是线程池峰值容量
if (s > largestPoolSize)
largestPoolSize = s;
// 这里更新工作线程是否启动成功标识为true,后面才会调用Thread#start()方法启动真实的线程实例
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
if (workerAdded) {
t.start();
// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,需要从工作线程集合移除对应的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 添加Worker失败
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 从工作线程集合移除之
if (w != null)
workers.remove(w);
// wc数量减1
decrementWorkerCount();
// 基于状态判断尝试终结线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
上面创建了线程,执行了start方法,线程是执行起来了,那么workQueue中的怎么执行呢?
通过上面我们发现worker类。Worker的构造函数里面的逻辑十分重要,通过ThreadFactory创建的Thread实例同时传入Worker实例,因为Worker本身实现了Runnable,所以可以作为任务提交到线程中执行。只要Worker持有的线程实例w调用Thread#start()方法就能在合适时机执行Worker#run();
worker:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
//
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
// 重写run方法。调用 runWorker。
runWorker(this);
}
}
这里是thread.start()执行run方法转到了runWorker方法。这里有while()自旋。从队列中取任务出来(getTask())执行.
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 任务不为空,执行任务 为空task获取任务。阻塞着等待。
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || // 这里大于等于stop,悲剧
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前的操作。
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 开始执行。
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行之后的任务。
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
进入getTask()获取任务:
private Runnable getTask() {
// 记录上一次从队列中拉取的时候是否超时
boolean timedOut = false; // Did the last poll() time out?
// 注意这是死循环
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
// 第一个if:如果线程池状态至少为SHUTDOWN,也就是rs >= SHUTDOWN(0),则需要判断两种情况(或逻辑):
// 1. 线程池状态至少为STOP(1),也就是线程池正在停止,一般是调用了shutdownNow()方法
// 2. 任务队列为空
// 如果在线程池至少为SHUTDOWN状态并且满足上面两个条件之一,则工作线程数wc减去1,然后直接返回null
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 跑到这里说明线程池还处于RUNNING状态,重新获取一次工作线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed临时变量勇于线程超时控制,决定是否需要通过poll()此带超时的非阻塞方法进行任务队列的任务拉取
// 1.allowCoreThreadTimeOut默认值为false,如果设置为true,则允许核心线程也能通过poll()方法从任务队列中拉取任务
// 2.工作线程数大于核心线程数的时候,说明线程池中创建了额外的非核心线程,这些非核心线程一定是通过poll()方法从任务队列中拉取任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 第二个if:
// 1.wc > maximumPoolSize说明当前的工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量
// 或者 2.timed && timedOut说明了线程命中了超时控制并且上一轮循环通过poll()方法从任务队列中拉取任务为null
// 并且 3. 工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null,
// CAS把线程数减去1失败会进入下一轮循环做重试
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
// 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 这里很重要,只有非null时候才返回,null的情况下会进入下一轮循环
if (r != null)
return r;
// 跑到这里说明上一次从任务队列中获取到的任务为null,一般是workQueue.poll()方法超时返回null
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
请理空闲线程(非核心)的方法:
**processWorkerExit()😗*方法是为将要终结的Worker做一次清理和数据记录工作(因为processWorkerExit()方法也包裹在runWorker()方法finally代码块中,其实工作线程在执行完processWorkerExit()方法才算真正的终结)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 因为抛出用户异常导致线程终结,直接使工作线程数减1即可
// 如果没有任何异常抛出的情况下是通过getTask()返回null引导线程正常跳出runWorker()方法的while死循环从而正常终结,这种情况下,在getTask()中已经把线程数减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 全局的已完成任务记录数加上此将要终结的Worker中的已完成任务数
completedTaskCount += w.completedTasks;
// 工作线程集合中移除此将要终结的Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 见下一小节分析,用于根据当前线程池的状态判断是否需要进行线程池terminate处理
tryTerminate();
int c = ctl.get();
// 如果线程池的状态小于STOP,也就是处于RUNNING或者SHUTDOWN状态的前提下:
// 1.如果线程不是由于抛出用户异常终结,如果允许核心线程超时,则保持线程池中至少存在一个工作线程
// 2.如果线程由于抛出用户异常终结,或者当前工作线程数,那么直接添加一个新的非核心线程
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 如果允许核心线程超时,最小值为0,否则为corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果最小值为0,同时任务队列不空,则更新最小值为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 工作线程数大于等于最小值,直接返回不新增非核心线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
future相关:
Future接口用来表示异步计算结果。
FutureTask接口
FutureTask前面做过简单介绍,这里就跳过,关键就是那几个方法,点进去看一下就可以了。
public static void main(String[] args) throws Exception {
FutureTask<String> stringFutureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
return "book";
}
});
Thread thread = new Thread(stringFutureTask);
thread.start();
stringFutureTask.get();
}
FutureTask就是重写了run方法:
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
ForkJoinPool:
是一个线程池用于执行任务,提供的调度模式与其他线程池不同。
ForkJoinPool之前用于树形的任务执行,采用一种工作窃取(work stealing)机制,与其他ExecutorService不同的它能够查找池子中任务或其他线程创建的任务进行执行。没有任务的时候就会进行阻塞。ForkJoinPool里面的线程池线程数默认与核数相同。
当只有一个大任务的执行非常耗时,这时候适合使用ForkJoinPool进行执行,通过将任务进行拆分(分治思想)然后进行处理,ForkJoinPool线程池为了提高任务的并行度和吞吐量做了非常多而且复杂的设计实现,其中最著名的就是任务窃取机制。
组成:
①ForkJoinWorkerThread:任务的执行者,具体的线程实体
②ForkJoinTask:需要执行的任务实体
③ForkJoinPool:管理执行者线程的管理池
ForkJoinPool提交任务的方式也有三种,分别为:
- execute():可提交Runnbale类型的任务
- submit():可提交Callable类型的任务
- invoke():可提交ForkJoinTask类型的任务,但ForkJoinTask存在三个子类:
①RecursiveAction:无返回值型ForkJoinTask任务
②RecursiveTask:有返回值型ForkJoinTask任务
③CountedCompleter:任务执行完成后可以触发钩子回调函数的任务
执行原理
ForkJoinPool能够创建出多个线程执行任务,每个线程都会有一个WorkQueue用于存储任务,线程优先执行WorkQueue中的任务,任务进行fork操作就将新的任务提交给WorkQueue,并尝试创建一个新的线程。在没有操作定义的并行度级别parallelism的情况下能够创建新的线程。新创建的线程也有有一个WorkQueue存储当前线程的任务,线程没有任务执行的时候就会从其他线程中偷取任务执行执行,默认情况都是偷取队列头部的任务进行执行。没有任务执行,线程就会进行阻塞。任务进行join的时候就就会阻塞当前的任务,就会选择指定的任务进行执行。
CompletableFuture接口
作用:
用于实现异步调用和任务编排,支持在异步方法完成时触发的依赖函数和操作。实现了两个接口,Future和CompletionStage接口。
实现原理是通过,线程池进行执行具体的方法,然后CompletableFuture维护了具体方法执行的一些状态信息,然后通过后续接口进行判断是否进行执行后续操作。
public class CompletableFutureTest {
public static void main(String[] args) {
CompletableFuture.runAsync(() -> {
System.out.println("hello");
});
}
}
进入runAsync代码后可以看到是ForkJoinPool.commonPool()用于默认的线程池实现,并将方法封装为ForkJoinTask的一个对象,然后线程池进行执行ForkJoinTask方法,最后调用到我们的方法,过程中会记录很多的状态变量,用于后续判断方法是否完成等作用。线程池的实现也可以不通过ForkJoinPool.commonPool()进行实现,可以自定义,线程池主要起到一个执行器的作用。
CompletionStage接口提供了很多API,用于异步任务的编排。
常用方法:
列举了一些API。
1. 单结果转换
方法名称 | 描述 | 异步版本 | 异步版本(带Executor) |
---|---|---|---|
thenApply | 使用函数处理当前阶段的结果 | thenApplyAsync | thenApplyAsync(Function, Executor) |
thenAccept | 使用动作消费当前阶段的结果 | thenAcceptAsync | thenAcceptAsync(Consumer, Executor) |
2. 组合结果转换
方法名称 | 描述 | 异步版本 | 异步版本(带Executor) |
---|---|---|---|
thenCombine | 当两个阶段都正常完成时,使用两个结果作为参数执行给定函数 | thenCombineAsync | thenCombineAsync(CompletionStage, BiFunction, Executor) |
thenAcceptBoth | 当两个阶段都正常完成时,使用两个结果作为参数执行给定动作 | thenAcceptBothAsync | thenAcceptBothAsync(CompletionStage, BiConsumer, Executor) |
3. 任意结果转换
方法名称 | 描述 | 异步版本 | 异步版本(带Executor) |
---|---|---|---|
applyToEither | 当当前阶段或另一个给定阶段正常完成时,使用结果作为参数执行给定函数 | applyToEitherAsync | applyToEitherAsync(CompletionStage, Function, Executor) |
acceptEither | 当当前阶段或另一个给定阶段正常完成时,使用结果作为参数执行给定动作 | acceptEitherAsync | acceptEitherAsync(CompletionStage, Consumer, Executor) |
4. 两者之后执行
方法名称 | 描述 | 异步版本 | 异步版本(带Executor) |
---|---|---|---|
runAfterBoth | 当当前阶段和另一个给定阶段都正常完成时,执行给定动作 | runAfterBothAsync | runAfterBothAsync(CompletionStage, Runnable, Executor) |
5. 任意一个完成时执行
方法名称 | 描述 | 异步版本 | 异步版本(带Executor) |
---|---|---|---|
runAfterEither | 当当前阶段或另一个给定阶段正常完成时,执行给定动作 | runAfterEitherAsync | runAfterEitherAsync(CompletionStage, Runnable, Executor) |
6. 组合结果映射
方法名称 | 描述 | 异步版本 | 异步版本(带Executor) |
---|---|---|---|
thenCompose | 使用给定函数计算另一个CompletionStage的结果,并完成新阶段 | thenComposeAsync | thenComposeAsync(Function, Executor) |
7. 异常处理
方法名称 | 描述 | 异步版本 | 异步版本(带Executor) |
---|---|---|---|
handle | 当当前阶段正常完成或异常完成时,使用结果和异常作为参数执行给定函数 | handleAsync | handleAsync(BiFunction, Executor) |
exceptionally | 当当前阶段异常完成时,使用异常作为参数执行给定函数 | exceptionallyAsync | exceptionallyAsync(Function, Executor) |
exceptionallyCompose | 当当前阶段异常完成时,使用异常作为参数执行给定函数,并返回一个新的CompletionStage | exceptionallyComposeAsync | exceptionallyComposeAsync(Function, Executor) |
8. 完成时执行
方法名称 | 描述 | 异步版本 | 异步版本(带Executor) |
---|---|---|---|
whenComplete | 当当前阶段正常完成或异常完成时,执行给定动作 | whenCompleteAsync | whenCompleteAsync(BiConsumer, Executor) |
9. 转换为CompletableFuture
方法名称 | 描述 |
---|---|
toCompletableFuture | 将CompletionStage转换为CompletableFuture |
这个表格列出了CompletionStage接口的主要方法,并且区分了同步和异步版本,以及是否使用Executor进行异步执行的版本。