JUC线程

进程和线程:

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配的基本单位,是操作系统结构的基础。
线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System V及SunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。
,lightweight processes和Java线程之间存在着1对1的关系。

Java线程:java线程既不是用户线程,也不是内核线程。用户线程阻塞会导致整个进程阻塞,但是Java线程不是这样,你可以做个实验。内核中没有Java线程实现的相关代码,因此Java线程也不是纯粹的内核线程。Java线程更像是LWP和用户线程之间的结合。Java代码中通过start方法调用native 的 start0方法创建了线程。
线程之间共享了区域,因此创建和销毁的消耗都会比进程小,多线程带来了更多的执行流,使得可以一个程序执行多个方法,并且能够利用上计算机多个核的特点,让每个线程运行在不同的核心上面。共享了区域同时带来了数据隔离的问题。两个线程会访问到同一个变量,内存等。例如A线程给变量a赋值1,B线程给变量a赋值2.C线程访问变量a不能够确定这个值是多少。
为了控制多个线程访问同一个变量产生脏数据,使用锁进行控制是一个很好的策略。

Java线程

创建线程:

既然线程有这么多优势,那么Java进行创建线程很简单就是通过Thread类的实例方法start()进行创建。线程的任务通过Thread()的构造函数进行执行。执行的逻辑通过一个Runable接口的实例对象进行传递。new Thread主要是设置线程的一些参数信息,调用start再调用native的start0方法才进行了创建线程,并且这个native方法会调用thread线程的run方法。

        Thread thread = new Thread(new Runnable(){
            @Override
            public void run() {
                System.out.println("new Thread");
            }
        });
        thread.start();

这样虽然创建了线程,但是功能还不够完善,比如有的线程会有返回值,因此还需要有一种能接受返回值的创建线程方式。
创建线程的几种方式:

  1. 通过继承Thread类
  2. 通过实现Runnable接口
  3. 通过实现Callnable接口然后通过FutureTask调用
  4. 通过线程池的方式

具体的实现方式有很多,但本质都是相同的,本质都是通过Runable的run方法进行了执行代码。感兴趣可以看看后面的代码片段,通过了FutureTask进行解释,重写了Runnable接口的run方法进行实现。

public static void main(String[] args) throws Exception {
    FutureTask<String> stringFutureTask = new FutureTask<>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "book";
        }
    });
    Thread thread = new Thread(stringFutureTask);
    thread.start();
    stringFutureTask.get();
}

前面讲了,都是通过native的start0方法然后进行调用run方法,因此FutureTask重写了这个run方法。

public void run() {
...
// 这个就是传递的Callable接口
    Callable<V> c = callable;
    if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
            //调用call方法获得返回值
            result = c.call();
            ran = true;
        } catch (Throwable ex) {
            result = null;
            ran = false;
            setException(ex);
        }
        if (ran)
            set(result);
    }
...
}

Thread类的相关方法

  • void start() 启动线程,执行run方法
  • run() 线程被调用时候执行的操作
  • String getName() 获取线程名称
  • void setName(String name) 设置线程名称
  • static Thread currentThread() 返回当前线程
  • static void yield() 线程让步,让jvm重新分配
  • join() 调用线程将阻塞,被调用线程被优先执行
    • 低优先级的线程也能够执行
  • static void sleep(long millis):休眠x毫秒,
  • stop() 不推荐使用,强制线程生命结束
  • boolean isAlive() :线程是否存活

线程生命周期

线程调用start后处于的是新建(NEW)状态,等待线程调度后转换为运行(RUNNABLE)状态,进入同步块会进入阻塞(BLOCKED)状态,调用了线程wait方法会进入阻塞(WAITING)状态,调用线程的wait(time会进入超时等待(TIMED_WAITING)状态,线程执行接入进入终止(TERMINATED)状态。详细流转图如下:
图片来源:挑错 |《Java 并发编程的艺术》中关于线程状态的三处错误
640.png

  • NEW:尚未启动的线程状态。
  • RUNNABLE:可运行线程状态。处于可运行状态的线程正在 Java 虚拟机中执行,但它可能正在等待操作系统的其他资源,如处理器。
  • BLOCKED:被阻塞等待监视器锁的线程状态。处于阻塞状态的线程正在等待监视器锁以进入同步块/方法,或者在调用 Object#wait() 方法后重新进入同步块/方法。
  • WAITING:等待状态的线程。线程由于调用了以下方法之一而处于等待状态:例如,调用了某个对象的 Object.wait() 的线程正在等待另一个线程调用该对象的 Object.notify() 或 Object.notifyAll()。调用了 Thread.join() 的线程正在等待指定的线程终止。
  • TIMED_WAITING:有指定等待时间的等待状态的线程。线程由于调用了以下方法之一并指定了正数等待时间而处于超时等待状态:
  • TERMINATED:已终止的线程状态。线程已完成执行。

线程上下文切换:

线程运行再CPU上,但是可能当前线程没有任务执行调用sleep(), wait() 主动让出CPU,因此这时候操作系统就会让其他线程执行,或者是当前线程执行的时间太长,就会进行线程调度。调度的结果就是当前线程会暂停执行,然后其他线程执行。这里就会把之前线程占用的资源切换给另一个线程占有执行,这就是上下文切换。不同于进程上下文切换,线程上下文切换花费时间更少,因此这也是线程的优势之一。

线程池:

https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/
一个功能运行时间太久了,一般通过什么方式进行优化?异步/多线程使用new Thread()方式会有什么问题,1.OOM 2.资源开销和耗时,3.不可管理性。
线程池带来的好处:

  1. 降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗。
  2. 提高响应速度:任务到达时不需要等待线程创建就可以立即执行。
  3. 提高线程的可管理性:线程池可以统一管理、分配、调优和监控

JDK提供的:Executorsj-u-c-t-p-e-logo.png

ForkJoinPool:又是一个

构造函数参数列表:

  • corePoolSize: 核心线程池的大小,核心线程通过懒创建,如果核心线程池有空闲位置,这时新的任务就会被核心线程池新建一个线程执行,执行完毕后不会销毁线程,线程会进入缓存队列等待再次被运行。
  • maximunPoolSize: 线程池能创建最大的线程数量。如果核心线程池和有界缓存队列都已经满了,新的任务进来就会创建新的线程来执行。但是数量不能超过maximunPoolSize,否则会采取拒绝接受任务策略,我们下面会具体分析。
  • keepAliveTime: 非核心线程的回收周期,非核心线程能够空闲的最长时间,超过时间,线程终止。这个参数默认只有在线程数量超过核心线程池大小时才会起作用。只要线程数量不超过核心线程大小,就不会起作用(当然如果设置了allowCoreThreadTimeOut(true)线程池中的核心线程也受该参数的影响)。
  • unit: 时间单位,和keepAliveTime配合使用,可选择各种时间单位天-纳秒。
  • workQueue: 任务队列,用来存放等待被执行的任务,一般为阻塞队列(BlockingQueue)三种常用为:(可自定义阻塞队列)。
    • ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
    • LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE
    • SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。最大线程数是 Integer.MAX_VALUE
    • DelayedWorkQueue:DelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。
  • threadFactory: 线程工厂,用来创建线程(包括核心线程和非核心线程),一般有三种选择策略(可自定义)。
  • handler: 任务拒绝策略,也就是在核心线程满负载、任务队列已满、非核心线程满负载的条件下会触发拒绝策略。ThreadPoolExecutor中为我们提供了四种默认策略可选择(可自定义):
    • ThreadPoolExecutor.AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • ThreadPoolExecutor.CallerRunsPolicy(不希望丢弃任务时使用):调用者执行策略,也就是当前调用Executor#execute()的线程直接调用任务,一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化为同步调用

创建线程池的方法:

  1. 通过构造器的方式(推荐)
  2. 通过Executor框架的工具类Executors来创建(阿里禁用,容易OOM)

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

Executor数据结构:

线程池标识:线程池状态控制参数ctl

要了解线程池,我们首先要了解的线程池里面的状态控制的参数 ctl,这个线程池的状态控制参数是一个原子操作的 AtomicInteger,这个ctl包含两个参数 :

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//最大容量
	// jdk 11 是这个参数
    // private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl 下面三个运算获取线程状态。
    private static int runStateOf(int c)     { return c & ~CAPACITY; }//获取rs
    private static int workerCountOf(int c)  { return c & CAPACITY; } // 获取wc
    private static int ctlOf(int rs, int wc) { return rs | wc; }// 获取ctl
  • runState:当前线程池的状态
  • workerCount:激活(工作)的线程数

ctl它的低29位用于存放当前的线程数, 因此一个线程池在理论上最大的线程数是 536870911; 高 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应如下:

  • 111(-1): RUNNING:线程池初始化(创建出来之后)处于此状态,能够接收新任务,以及对已添加的任务进行处理。
  • 000(0): SHUTDOWN:当调用shutdown()方法时改为此状态,在此状态时,不接收新任务,但能处理已添加的任务。
  • 001(1): STOP:调用shutdownNow()方法时处于此状态,在此状态时,不接收新任务,不处理已添加的任务,并且会尝试中断正在处理的任务。
  • 010(2): TIDYING:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。|| 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
  • 011(3): TERMINATED:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。线程池彻底终止,就变成TERMINATED状态。

image.png

线程池状态变化:
  1. 单结果转换
    方法名称 描述 异步版本 异步版本(带Executor)
    thenApply 使用函数处理当前阶段的结果 thenApplyAsync thenApplyAsync(Function, Executor)
    thenAccept 使用动作消费当前阶段的结果 thenAcceptAsync thenAcceptAsync(Consumer, Executor)
  2. 组合结果转换
    方法名称 描述 异步版本 异步版本(带Executor)
    thenCombine 当两个阶段都正常完成时,使用两个结果作为参数执行给定函数 thenCombineAsync thenCombineAsync(CompletionStage, BiFunction, Executor)
    thenAcceptBoth 当两个阶段都正常完成时,使用两个结果作为参数执行给定动作 thenAcceptBothAsync thenAcceptBothAsync(CompletionStage, BiConsumer, Executor)
  3. 任意结果转换
    方法名称 描述 异步版本 异步版本(带Executor)
    applyToEither 当当前阶段或另一个给定阶段正常完成时,使用结果作为参数执行给定函数 applyToEitherAsync applyToEitherAsync(CompletionStage, Function, Executor)
    acceptEither 当当前阶段或另一个给定阶段正常完成时,使用结果作为参数执行给定动作 acceptEitherAsync acceptEitherAsync(CompletionStage, Consumer, Executor)
  4. 两者之后执行
    方法名称 描述 异步版本 异步版本(带Executor)
    runAfterBoth 当当前阶段和另一个给定阶段都正常完成时,执行给定动作 runAfterBothAsync runAfterBothAsync(CompletionStage, Runnable, Executor)
  5. 任意一个完成时执行
    方法名称 描述 异步版本 异步版本(带Executor)
    runAfterEither 当当前阶段或另一个给定阶段正常完成时,执行给定动作 runAfterEitherAsync runAfterEitherAsync(CompletionStage, Runnable, Executor)
  6. 组合结果映射
    方法名称 描述 异步版本 异步版本(带Executor)
    thenCompose 使用给定函数计算另一个CompletionStage的结果,并完成新阶段 thenComposeAsync thenComposeAsync(Function, Executor)
  7. 异常处理
    方法名称 描述 异步版本 异步版本(带Executor)
    handle 当当前阶段正常完成或异常完成时,使用结果和异常作为参数执行给定函数 handleAsync handleAsync(BiFunction, Executor)
    exceptionally 当当前阶段异常完成时,使用异常作为参数执行给定函数 exceptionallyAsync exceptionallyAsync(Function, Executor)
    exceptionallyCompose 当当前阶段异常完成时,使用异常作为参数执行给定函数,并返回一个新的CompletionStage exceptionallyComposeAsync exceptionallyComposeAsync(Function, Executor)
  8. 完成时执行
    方法名称 描述 异步版本 异步版本(带Executor)
    whenComplete 当当前阶段正常完成或异常完成时,执行给定动作 whenCompleteAsync whenCompleteAsync(BiConsumer, Executor)
  9. 转换为CompletableFuture
    方法名称 描述
    toCompletableFuture 将CompletionStage转换为CompletableFuture
    这个表格列出了CompletionStage接口的主要方法,并且区分了同步和异步版本,以及是否使用Executor进行异步执行的版本。

execute方法:

源码:

  • 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
  • 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
  • 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
  • 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,饱和策略会调用RejectedExecutionHandler.rejectedExecution()方法。

为什么需要二次检查线程池的运行状态,当前工作线程数量为0,尝试创建一个非核心线程并且传入的任务对象为null?这个可以看API注释:
如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结(core线程数为0))或者执行当前方法的时候线程池是否已经shutdown了。所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除或者在没有可用的工作线程(core线程数为0,其他线程被回收了)的前提下新建一个工作线程

public void execute(Runnable command) {
    if (command == null)// 健壮性
        throw new NullPointerException();
    int c = ctl.get(); // 获取ctl
    // 获取工作线程数<核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 创建 核心线程数
        if (addWorker(command, true))
            return;
        // 加入核心线程失败(有并发),重新获取ctl。
        c = ctl.get();
    }
	// 判断线程池状态,加入工作队列,
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次检查状态,
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command); // 拒绝
        else if (workerCountOf(recheck) == 0) // 线程池中没有工作线程
            // 创建线程处理阻塞队列中的线程。
            addWorker(null, false);
    }
        // 创建非核心线程处理任务。
    else if (!addWorker(command, false))
        reject(command);//拒绝
}
addworker流程:

在 execute 方法中,多次调用 addWorker 方法。addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。内层for循环跳到外层for循环,通过break和标记。

// 添加工作线程,如果返回false说明没有新创建工作线程,如果返回true说明创建和启动工作线程成功
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:  
    // 注意这是一个死循环 - 最外层循环
    for (int c = ctl.get();;) {
        // 这个是十分复杂的条件,这里先拆分多个与(&&)条件:
        // 1. 线程池状态至少为SHUTDOWN状态,也就是rs >= SHUTDOWN(0)
        // 2. 线程池状态至少为STOP状态,也就是rs >= STOP(1),或者传入的任务实例firstTask不为null,或者任务队列为空
        // 其实这个判断的边界是线程池状态为shutdown状态下,不会再接受新的任务,在此前提下如果状态已经到了STOP、或者传入任务不为空、或者任务队列为空(已经没有积压任务)都不需要添加新的线程
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        // 注意这也是一个死循环 - 二层循环
        for (;;) {
            // 这里每一轮循环都会重新获取工作线程数wc
            // 1. 如果传入的core为true,表示将要创建核心线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false表示创建核心线程失败
            // 1. 如果传入的core为false,表示将要创非建核心线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false表示创建非核心线程失败
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // 成功通过CAS更新工作线程数wc,则break到最外层的循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN
            c = ctl.get();  // Re-read ctl
            // 如果线程池状态已经由RUNNING已经变为SHUTDOWN,则重新跳出到外层循环继续执行
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // 如果线程池状态依然是RUNNING,CAS更新工作线程数wc失败说明有可能是并发更新导致的失败,则在内层循环重试即可 
            // else CAS failed due to workerCount change; retry inner loop 
        }
    }
    // 标记工作线程是否启动成功
    boolean workerStarted = false;
    // 标记工作线程是否创建成功
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 传入任务实例firstTask创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象,所以下面可以直接操作Thread t = w.thread
        // 这一步Worker实例已经创建,但是没有加入工作线程集合或者启动它持有的线程Thread实例
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 这里需要全局加锁,因为会改变一些指标值和非线程安全的集合
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // 这里主要在加锁的前提下判断ThreadFactory创建的线程是否存活或者判断获取锁成功之后线程池状态是否已经更变为SHUTDOWN
                // 1. 如果线程池状态依然为RUNNING,则只需要判断线程实例是否存活,需要添加到工作线程集合和启动新的Worker
                // 2. 如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
                // 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的任务实例firstTask不为null,则不会添加到工作线程集合和启动新的Worker
                // 这一步其实有可能创建了新的Worker实例但是并不启动(临时对象,没有任何强引用),这种Worker有可能成功下一轮GC被收集的垃圾对象
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 把创建的工作线程实例添加到工作线程集合
                    workers.add(w);
                    int s = workers.size();
                    // 尝试更新历史峰值工作线程数,也就是线程池峰值容量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 这里更新工作线程是否启动成功标识为true,后面才会调用Thread#start()方法启动真实的线程实例
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
            if (workerAdded) {
                t.start();
                // 标记线程启动成功
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,需要从工作线程集合移除对应的Worker
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// 添加Worker失败
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 从工作线程集合移除之
        if (w != null)
            workers.remove(w);
        // wc数量减1    
        decrementWorkerCount();
        // 基于状态判断尝试终结线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

上面创建了线程,执行了start方法,线程是执行起来了,那么workQueue中的怎么执行呢?
通过上面我们发现worker类。Worker的构造函数里面的逻辑十分重要,通过ThreadFactory创建的Thread实例同时传入Worker实例,因为Worker本身实现了Runnable,所以可以作为任务提交到线程中执行。只要Worker持有的线程实例w调用Thread#start()方法就能在合适时机执行Worker#run();

worker:
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

  // 
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            // 重写run方法。调用  runWorker。
            runWorker(this);
        }
    }   

这里是thread.start()执行run方法转到了runWorker方法。这里有while()自旋。从队列中取任务出来(getTask())执行.

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 任务不为空,执行任务 为空task获取任务。阻塞着等待。
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) || // 这里大于等于stop,悲剧
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 执行任务前的操作。
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); // 开始执行。
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 执行之后的任务。
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

进入getTask()获取任务:

private Runnable getTask() {
    // 记录上一次从队列中拉取的时候是否超时
    boolean timedOut = false; // Did the last poll() time out?
    // 注意这是死循环
    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        // 第一个if:如果线程池状态至少为SHUTDOWN,也就是rs >= SHUTDOWN(0),则需要判断两种情况(或逻辑):
        // 1. 线程池状态至少为STOP(1),也就是线程池正在停止,一般是调用了shutdownNow()方法
        // 2. 任务队列为空
        // 如果在线程池至少为SHUTDOWN状态并且满足上面两个条件之一,则工作线程数wc减去1,然后直接返回null
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        // 跑到这里说明线程池还处于RUNNING状态,重新获取一次工作线程数
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed临时变量勇于线程超时控制,决定是否需要通过poll()此带超时的非阻塞方法进行任务队列的任务拉取
        // 1.allowCoreThreadTimeOut默认值为false,如果设置为true,则允许核心线程也能通过poll()方法从任务队列中拉取任务
        // 2.工作线程数大于核心线程数的时候,说明线程池中创建了额外的非核心线程,这些非核心线程一定是通过poll()方法从任务队列中拉取任务
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 第二个if:
        // 1.wc > maximumPoolSize说明当前的工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量
        // 或者 2.timed && timedOut说明了线程命中了超时控制并且上一轮循环通过poll()方法从任务队列中拉取任务为null
        // 并且 3. 工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null,
        // CAS把线程数减去1失败会进入下一轮循环做重试
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
            // 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 这里很重要,只有非null时候才返回,null的情况下会进入下一轮循环
            if (r != null)
                return r;
            // 跑到这里说明上一次从任务队列中获取到的任务为null,一般是workQueue.poll()方法超时返回null
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

请理空闲线程(非核心)的方法:
**processWorkerExit()😗*方法是为将要终结的Worker做一次清理和数据记录工作(因为processWorkerExit()方法也包裹在runWorker()方法finally代码块中,其实工作线程在执行完processWorkerExit()方法才算真正的终结)

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 因为抛出用户异常导致线程终结,直接使工作线程数减1即可
    // 如果没有任何异常抛出的情况下是通过getTask()返回null引导线程正常跳出runWorker()方法的while死循环从而正常终结,这种情况下,在getTask()中已经把线程数减1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 全局的已完成任务记录数加上此将要终结的Worker中的已完成任务数
        completedTaskCount += w.completedTasks;
        // 工作线程集合中移除此将要终结的Worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
     
    // 见下一小节分析,用于根据当前线程池的状态判断是否需要进行线程池terminate处理
    tryTerminate();

    int c = ctl.get();
    // 如果线程池的状态小于STOP,也就是处于RUNNING或者SHUTDOWN状态的前提下:
    // 1.如果线程不是由于抛出用户异常终结,如果允许核心线程超时,则保持线程池中至少存在一个工作线程
    // 2.如果线程由于抛出用户异常终结,或者当前工作线程数,那么直接添加一个新的非核心线程
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 如果允许核心线程超时,最小值为0,否则为corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果最小值为0,同时任务队列不空,则更新最小值为1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 工作线程数大于等于最小值,直接返回不新增非核心线程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

future相关:

Future接口用来表示异步计算结果。

FutureTask接口

FutureTask前面做过简单介绍,这里就跳过,关键就是那几个方法,点进去看一下就可以了。

public static void main(String[] args) throws Exception {
    FutureTask<String> stringFutureTask = new FutureTask<>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "book";
        }
    });
    Thread thread = new Thread(stringFutureTask);
    thread.start();
    stringFutureTask.get();
}

FutureTask就是重写了run方法:

    public void run() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

ForkJoinPool:

是一个线程池用于执行任务,提供的调度模式与其他线程池不同。
ForkJoinPool之前用于树形的任务执行,采用一种工作窃取(work stealing)机制,与其他ExecutorService不同的它能够查找池子中任务或其他线程创建的任务进行执行。没有任务的时候就会进行阻塞。ForkJoinPool里面的线程池线程数默认与核数相同。
当只有一个大任务的执行非常耗时,这时候适合使用ForkJoinPool进行执行,通过将任务进行拆分(分治思想)然后进行处理,ForkJoinPool线程池为了提高任务的并行度和吞吐量做了非常多而且复杂的设计实现,其中最著名的就是任务窃取机制

组成:

①ForkJoinWorkerThread:任务的执行者,具体的线程实体
②ForkJoinTask:需要执行的任务实体
③ForkJoinPool:管理执行者线程的管理池
ForkJoinPool提交任务的方式也有三种,分别为:

  • execute():可提交Runnbale类型的任务
  • submit():可提交Callable类型的任务
  • invoke():可提交ForkJoinTask类型的任务,但ForkJoinTask存在三个子类:

①RecursiveAction:无返回值型ForkJoinTask任务
②RecursiveTask:有返回值型ForkJoinTask任务
③CountedCompleter:任务执行完成后可以触发钩子回调函数的任务

执行原理

ForkJoinPool能够创建出多个线程执行任务,每个线程都会有一个WorkQueue用于存储任务,线程优先执行WorkQueue中的任务,任务进行fork操作就将新的任务提交给WorkQueue,并尝试创建一个新的线程。在没有操作定义的并行度级别parallelism的情况下能够创建新的线程。新创建的线程也有有一个WorkQueue存储当前线程的任务,线程没有任务执行的时候就会从其他线程中偷取任务执行执行,默认情况都是偷取队列头部的任务进行执行。没有任务执行,线程就会进行阻塞。任务进行join的时候就就会阻塞当前的任务,就会选择指定的任务进行执行。

CompletableFuture接口

作用:

用于实现异步调用和任务编排,支持在异步方法完成时触发的依赖函数和操作。实现了两个接口,Future和CompletionStage接口。
实现原理是通过,线程池进行执行具体的方法,然后CompletableFuture维护了具体方法执行的一些状态信息,然后通过后续接口进行判断是否进行执行后续操作。

public class CompletableFutureTest {
    public static void main(String[] args) {
        CompletableFuture.runAsync(() -> {
            System.out.println("hello");
        });
    }
}

进入runAsync代码后可以看到是ForkJoinPool.commonPool()用于默认的线程池实现,并将方法封装为ForkJoinTask的一个对象,然后线程池进行执行ForkJoinTask方法,最后调用到我们的方法,过程中会记录很多的状态变量,用于后续判断方法是否完成等作用。线程池的实现也可以不通过ForkJoinPool.commonPool()进行实现,可以自定义,线程池主要起到一个执行器的作用。
CompletionStage接口提供了很多API,用于异步任务的编排。

常用方法:

列举了一些API。

1. 单结果转换

方法名称描述异步版本异步版本(带Executor)
thenApply使用函数处理当前阶段的结果thenApplyAsyncthenApplyAsync(Function, Executor)
thenAccept使用动作消费当前阶段的结果thenAcceptAsyncthenAcceptAsync(Consumer, Executor)

2. 组合结果转换

方法名称描述异步版本异步版本(带Executor)
thenCombine当两个阶段都正常完成时,使用两个结果作为参数执行给定函数thenCombineAsyncthenCombineAsync(CompletionStage, BiFunction, Executor)
thenAcceptBoth当两个阶段都正常完成时,使用两个结果作为参数执行给定动作thenAcceptBothAsyncthenAcceptBothAsync(CompletionStage, BiConsumer, Executor)

3. 任意结果转换

方法名称描述异步版本异步版本(带Executor)
applyToEither当当前阶段或另一个给定阶段正常完成时,使用结果作为参数执行给定函数applyToEitherAsyncapplyToEitherAsync(CompletionStage, Function, Executor)
acceptEither当当前阶段或另一个给定阶段正常完成时,使用结果作为参数执行给定动作acceptEitherAsyncacceptEitherAsync(CompletionStage, Consumer, Executor)

4. 两者之后执行

方法名称描述异步版本异步版本(带Executor)
runAfterBoth当当前阶段和另一个给定阶段都正常完成时,执行给定动作runAfterBothAsyncrunAfterBothAsync(CompletionStage, Runnable, Executor)

5. 任意一个完成时执行

方法名称描述异步版本异步版本(带Executor)
runAfterEither当当前阶段或另一个给定阶段正常完成时,执行给定动作runAfterEitherAsyncrunAfterEitherAsync(CompletionStage, Runnable, Executor)

6. 组合结果映射

方法名称描述异步版本异步版本(带Executor)
thenCompose使用给定函数计算另一个CompletionStage的结果,并完成新阶段thenComposeAsyncthenComposeAsync(Function, Executor)

7. 异常处理

方法名称描述异步版本异步版本(带Executor)
handle当当前阶段正常完成或异常完成时,使用结果和异常作为参数执行给定函数handleAsynchandleAsync(BiFunction, Executor)
exceptionally当当前阶段异常完成时,使用异常作为参数执行给定函数exceptionallyAsyncexceptionallyAsync(Function, Executor)
exceptionallyCompose当当前阶段异常完成时,使用异常作为参数执行给定函数,并返回一个新的CompletionStageexceptionallyComposeAsyncexceptionallyComposeAsync(Function, Executor)

8. 完成时执行

方法名称描述异步版本异步版本(带Executor)
whenComplete当当前阶段正常完成或异常完成时,执行给定动作whenCompleteAsyncwhenCompleteAsync(BiConsumer, Executor)

9. 转换为CompletableFuture

方法名称描述
toCompletableFuture将CompletionStage转换为CompletableFuture

这个表格列出了CompletionStage接口的主要方法,并且区分了同步和异步版本,以及是否使用Executor进行异步执行的版本。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/588719.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python基础语法--函数

一、函数概述 函数就是执行特定任务完成特定功能的一段代码。可以在程序中将某一段代码定义成函数&#xff0c;并指定一个函数名和接收的输入&#xff08;参数&#xff09;&#xff0c;这样就可以在程序的其他地方通过函数名多次调用并执行该段代码了。 每次调用执行后&#…

Ubuntu如何安装Calicoctl

在 Ubuntu 上安装 Calico 通常涉及几个步骤。以下是一般的安装过程&#xff1a; 安装 etcd 或使用 Kubernetes 集群的现有 etcd&#xff1a; 如果你使用的是独立的 etcd&#xff0c;请确保 etcd 在可访问的地方运行。如果你使用的是 Kubernetes 集群&#xff0c;通常会有一个 e…

用户中心(终)

文章目录 Ant Design Pro&#xff08;Umi 框架&#xff09;ProComponents 高级表单待优化点 todo注册逻辑增加注册页面页面重定向问题注册页面 **获取用户的登录态****前端用户管理功能** Ant Design Pro&#xff08;Umi 框架&#xff09; app.tsx 项目全局入口文件&#xff0c…

【车载开发系列】MCAL基本概念

【车载开发系列】MCAL基本概念 【车载开发系列】MCAL基本概念 【车载开发系列】MCAL基本概念一. BSW与MCAL1&#xff09;BSW-服务层2&#xff09;BSW-ECU抽象层3&#xff09;MCAL驱动层 二. MCAL基本概念三. MCAL组成1&#xff09;PORT2&#xff09;DIO3&#xff09;ADC4&#…

排序算法——直接插入排序

直接插入排序与希尔排序是插入排序的两个分支&#xff0c;直接插入排序是较为简单的一种排序算法&#xff0c;同时也是众多算法实现或优化的基石。 前提&#xff1a; 插入排序&#xff1a;有一个已经有序的数据序列&#xff0c;要求在这个已经排好的数据序列中插入一个数&…

BigKey的危害

1.2.1、BigKey的危害 网络阻塞 对BigKey执行读请求时&#xff0c;少量的QPS就可能导致带宽使用率被占满&#xff0c;导致Redis实例&#xff0c;乃至所在物理机变慢 数据倾斜 BigKey所在的Redis实例内存使用率远超其他实例&#xff0c;无法使数据分片的内存资源达到均衡 Redis阻…

nginx--自定义日志跳转长连接文件缓存状态页

自定义日志服务 [rootlocalhost ~]# cat /apps/nginx/conf/conf.d/pc.conf server {listen 80;server_name www.fxq.com;error_log /data/nginx/logs/fxq-error.log info;access_log /data/nginx/logs/fxq-access.log main;location / {root /data/nginx/html/pc;index index…

C/C++ BM33 二叉树的镜像

文章目录 前言题目解决方案一1.1 思路阐述1.2 源码 总结 前言 镜像说的好听&#xff0c;无非就是换下节点。 题目 操作给定的二叉树&#xff0c;将其变换为源二叉树的镜像。 数据范围&#xff1a;二叉树的节点数 0 ≤ n ≤ 1000 0≤n≤1000 0≤n≤1000&#xff0c; 二叉树每…

ThreeJS:本地部署官网文档与案例

部署方式 部署之前请确保已经配置好node.js环境。 1. 下载ThreeJS源码 ThreeJS的GitHub地址&#xff1a;GitHub - mrdoob/three.js: JavaScript 3D Library.&#xff0c;可以简单查看ThreeJS当前版本&#xff1a;r164&#xff0c; 我们可以选择对应的版本&#xff08;此处为r1…

打印机-STM32版本 硬件部分

最终PCB EDA工程: 一、确定芯片型号 根据项目需求&#xff0c;梳理需要用到的功能&#xff0c; 电量检测&#xff1a;ADC 按键&#xff1a;IO input外部中断 LED&#xff1a;IO output 温度检测&#xff1a;ADC 电机控制&#xff1a;IO output 打印通讯&#xff1a;SPI …

淘宝/天猫商品评论API接口:用户反馈实时追踪与商家决策优化

一、引言 在电子商务迅猛发展的今天&#xff0c;淘宝/天猫作为中国最大的电子商务平台之一&#xff0c;为众多商家提供了广阔的舞台。然而&#xff0c;面对日益激烈的市场竞争&#xff0c;如何精准把握用户需求、优化产品策略、提升服务质量&#xff0c;成为摆在众多商家面前的…

Linux环境与历史和Xshell安装与使用

各位大佬好 &#xff0c;这里是阿川的博客 &#xff0c; 祝您变得更强 个人主页&#xff1a;在线OJ的阿川 大佬的支持和鼓励&#xff0c;将是我成长路上最大的动力 阿川水平有限&#xff0c;如有错误&#xff0c;欢迎大佬指正 Linux是一个操作系统.全称叫Linux is not UNIX。由…

【C++】哈希的应用---位图

目录 1、引入 2、位图的概念 3、位图的实现 ①框架的搭建 ②设置存在 ③设置不存在 ④检查存在 ​4、位图计算出现的次数 5、完整代码 1、引入 我们可以看一道面试题 给40亿个不重复的无符号整数&#xff0c;没排过序。给一个无符号整数&#xff0c;如何快速判断一个数…

通过iMock学习Jvmsandbox

Jvm-sandbox Jvm-sandbox基于Jvm-sandbox的Mock平台iMockiMock的工程学习iMock怎么写的&#xff08;sandbox的module应该怎么写&#xff09; Jvm-sandbox Jvm-sandbox是阿里开源的一款java的沙箱&#xff0c;看网上的介绍在沙箱里你可以做你能想到的奇妙的事情。 基于Jvm-san…

智慧旅游开启智慧生活,科技让旅行更轻松:通过智慧旅游,旅行者可以享受到更加便捷、高效的旅行服务,让旅行成为生活的一部分

一、引言 随着科技的飞速发展&#xff0c;我们生活的方方面面都在经历着前所未有的变革。旅游业作为服务业的重要组成部分&#xff0c;也在这场变革中迎来了前所未有的发展机遇。智慧旅游&#xff0c;作为科技与旅游深度融合的产物&#xff0c;正以其独特的魅力&#xff0c;引…

瑞_23种设计模式_解释器模式

文章目录 1 解释器模式&#xff08;Interpreter Pattern&#xff09;1.1 介绍1.2 概述1.2.1 文法&#xff08;语法&#xff09;规则1.2.2 抽象语法树 1.3 解释器模式的结构1.4 解释器模式的优缺点1.5 解释器模式的使用场景 2 案例一2.1 需求2.2 代码实现 3 案例二3.1 需求3.2 代…

信息时代的智慧导航:高效搜索、信息筛选与信任构建的全面指南!

文章目录 一、高效搜索&#xff1a;快速定位目标信息的秘诀二、信息筛选&#xff1a;去伪存真&#xff0c;找到有价值的信息三、信任构建&#xff1a;深入了解与直接沟通《搜索之道&#xff1a;信息素养与终身学习的新引擎》亮点内容简介目录获取方式 随着科技的飞速发展&#…

小白总结uniapp微信小程序跨域问题的解决(前端)

前言&#xff1a;本人前端小白一枚&#xff0c;在B站听了一个很不错的视频&#xff0c;关于uniapp Vue3超详细教程&#xff0c;有需要的小伙伴可以去听&#xff0c;受益匪浅&#xff0c;下面是该博主的链接&#xff1a; gitee源码地址&#xff1a;https://gitee.com/qingnian8/…

windows 驱动开发-DMA技术(三)

在早期&#xff0c;是按照基于包或者基于流的方式来描述DMA的&#xff0c;不过这个描述可能不准确&#xff0c;故在Vista之后修改为使用数据包/使用公共缓冲区的系统DMA。 简单的解释一下基于包和基于流的说法的原因&#xff0c;数据包是指一个个基于一定大小的数据块&#xf…

Tensorflow2.0笔记 - ResNet实践

本笔记记录使用ResNet18网络结构&#xff0c;进行CIFAR100数据集的训练和验证。由于参数较多&#xff0c;训练时间会比较长&#xff0c;因此只跑了10个epoch&#xff0c;准确率还没有提升上去。 import os import time import tensorflow as tf from tensorflow import keras …