深度解析CompletableFuture:Java 异步世界的奇迹

目录

概述

介绍

上文我们可知:CompletableFuture 是 Java 8 引入用于支持异步编程和非阻塞操作的类。对于没有使用过CompletableFuture通过它这么长的名字就感觉到一头雾水,那么现在我们来一起解读一下它的名字。

  • Completable:可完成
  • Future:未来/将来

这两个单词体现了它设计的目的:提供一种可完成的异步计算。

身世

接下来我将详细介绍CompletableFuture的实现。

Future接口

CompletableFuture实现自JDK 5出现的Future接口,该接口属于java.util.concurrent包,这个包提供了用于并发编程的一些基础设施,其中就包括 Future 接口。Future接口的目的是表示异步计算的结果,它允许你提交一个任务给一个 Executor(执行器),并在稍后获取任务的结果。尽管 Future 提供了一种机制来检查任务是否完成、等待任务完成,并获取其结果,但它的设计也有一些局限性,比如无法取消任务、无法组合多个任务的结果等。

Future接口为CompletableFuture提供了以下功能:

  1. 异步任务的提交:通过Future的接口,可以提交异步任务,并在稍后获取任务的结果,这是 Future 接口最基本的功能之一。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
  1. 检查任务完成状态: 使用 isDone 方法可以检查任务是否已经完成。
boolean isDone = future.isDone();
  1. 等待任 务完成: 通过get方法,阻塞当前线程,直到异步任务完成并获取其结果。
System.out.println("main Thread");
//开启异步线程
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
//阻塞异步线程执行完成
String result = future.get();
  1. 取消任务: 通过 cancel 方法,你可以尝试取消异步任务的执行。这是 Future 接口的一项功能,但在实际使用中,由于限制和不确定性,这个方法并不总是能够成功取消任务。
boolean canceled = future.cancel(true);

CompletionStage接口

CompletableFuture同时也实现自CompletionStage接口,CompletionStage 接口是 Java 8 中引入的,在CompletableFuture中用于表示一个步骤,这个步骤可能是由另外一个CompletionStage触发的,随当前步骤的完成,可以触发其他CompletionStage的执行。CompletableFuture 类实现了 CompletionStage 接口,因此继承了这些功能。以下是 CompletionStageCompletableFuture 提供的一些关键功能:

  1. 链式操作:CompletionStage 定义了一系列方法,如 thenApply, thenAccept, thenRun,允许你在一个异步操作完成后,基于其结果进行进一步的操作。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");

CompletableFuture<Integer> lengthFuture = future.thenApply(String::length);
  1. 组合多个阶段CompletionStage 提供了 thenCombine, thenCompose, thenAcceptBoth 等方法,用于组合多个阶段的结果,形成新的 CompletionStage
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
  1. 异常处理CompletionStage 提供了一系列处理异常的方法,如 exceptionally, handle,用于在异步计算过程中处理异常情况。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 抛出异常
    throw new RuntimeException("Some error");
});

CompletableFuture<String> resultFuture = future.exceptionally(ex -> "Handled Exception: " + ex.getMessage());
  1. 顺序执行thenApply, thenAccept, thenRun 等方法可以用于在上一个阶段完成后执行下一个阶段,形成顺序执行的链式操作。

图片来源于美团技术

CompletableFuture原理与实践-外卖商家端API的异步化


CompletableFuture-tryFire

tryFire 方法是 CompletableFuture 内部的一个关键方法,用于尝试触发异步操作链中的下一个阶段。这个方法的主要作用是在合适的时机执行异步操作链中的后续阶段,将计算结果传递给下一个阶段。

为什么先介绍这个方法呢?因为这个方法的大部分API都是基于该方法的基础上实现的。

abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
    volatile Completion next;

    final void tryFire(int mode) {
        // ... (其他逻辑)

        // 触发下一个阶段
        Completion n;
        if ((n = next) != null)
            n.tryFire(SYNC);
    }

    // ... (其他方法)
}
  1. 触发方式( mode ):
    • tryFire 方法接收一个 mode 参数,表示触发的方式。常见的触发方式包括同步触发(SYNC)、异步触发(ASYNC)以及嵌套触发(NESTED)。
  1. 触发下一个阶段:
    • tryFire 方法中,通过 next 字段获取下一个阶段的引用,然后调用下一个阶段的 tryFire 方法,将当前阶段的计算结果传递给下一个阶段。
  1. 递归触发:
    • tryFire 方法可能会递归调用下一个阶段的 tryFire 方法,以确保整个异步操作链中的阶段能够依次触发。这个递归调用保证了异步操作链的串联执行。
  1. 触发逻辑的条件判断:
    • tryFire 方法中通常还包含一些条件判断,用于确定是否应该触发后续的操作。例如,可能会检查当前阶段的状态,如果满足触发条件,则继续触发。

总体而言,tryFire 方法是 CompletableFuture 异步操作链中触发后续阶段的核心方法。通过递归调用,它实现了异步操作链的顺序执行,确保了各个阶段按照期望的顺序执行,并将计算结果传递给下一个阶段。

CompletableFuture结构

字段和常量定义

字段定义
  • result:存储异步计算的结果
  • stack:存储观察者链
  • NEXT:异步调用链中观察者链的管理
常量定义
// Modes for Completion.tryFire. Signedness matters.
static final int SYNC   =  0;
static final int ASYNC  =  1;
static final int NESTED = -1;

这三个变量用于Completion类中tryFire方法的标志,表示不同的触发模式。

  • SYNC:表示同步触发(默认触发方式),即当前计算完成后直接执行后续的操作。适用于当前计算的结果已经准备好并且可以直接进行下一步操作的情况。
  • AYSNC:表示异步触发,当前计算完成后将后续的操作提交到异步线程池中执行。即当前计算完成后将后续的操作提交到异步线程池中执行。适用于需要在不同线程上执行后续操作的情况。
  • NESTED:嵌套触发,通常表示当前阶段的触发是由另一个阶段触发的,因此无需再次触发后续操作。在某些情况下,可能会避免重复触发。

内部类定义

CompletableFuture 类包含多个内部类,这些内部类用于为CompletableFuture提供不同的API而设计的,用于异步编程中的不同阶段和操作。

常用内部类列举:

  1. UniCompletionBiCompletion
    • UniCompletionBiCompletion 是用于表示异步操作链中的单一阶段和二元阶段的基础抽象类。它们提供了一些通用的方法和字段,用于处理阶段之间的关系,尤其是观察者链的构建和触发。
  1. UniApplyUniAcceptUniRun
    • UniApplyUniAcceptUniRunUniCompletion 的具体子类,分别用于表示异步操作链中的 thenApplythenAcceptthenRun 阶段。它们实现了具体的 tryFire 方法,用于触发阶段的执行。
  1. BiApplyBiAcceptBiRun
    • BiApplyBiAcceptBiRunBiCompletion 的具体子类,分别用于表示异步操作链中的 thenCombinethenAcceptBothrunAfterBoth 阶段。它们同样实现了具体的 tryFire 方法。
  1. OrApplyOrAcceptOrRun
    • OrApplyOrAcceptOrRunBiCompletion 的另一组具体子类,用于表示异步操作链中的 applyToEitheracceptEitherrunAfterEither 阶段。同样,它们实现了具体的 tryFire 方法。
  1. Async
    • AsyncCompletableFuture 内部用于表示异步操作的标志类,用于表示某个阶段需要异步执行。例如,在调用 supplyAsyncrunAsync 等方法时,会生成一个带有 Async 标志的阶段。

异步编程模型

状态转换

volatile Object result;       // Either the result or boxed AltResult
volatile Completion stack;    // Top of Treiber stack of dependent actions

CompletableFuture中定义了两个属性:result、stack,result用于表示执行的结果或异常,stack用于表示执行完当前任务后触发的其他步骤。

CompletableFuture中包含两个字段:resultstack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。

图10 CF基本结构

这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。

  • UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
  • BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。

引用自美团技术。

CompletableFuture 中,Completion 对象表示当前的异步操作,它是被观察者。stack 中存储的是后续的步骤对象,这些对象充当观察者的角色。当当前的异步操作执行完成后,会通知 stack 中的观察者获取执行结果。

这种设计允许异步操作的串联,每个步骤都对应一个 Completion 对象,形成了观察者链。当一个异步操作完成时,它会逐一触发 stack 中的观察者对象执行相应的回调函数,实现了链式的异步操作。这个机制是 CompletableFuture 强大异步编程模型的核心之一。

为印证以上结论,我们来看个例子,追踪下源码:

例子:

CompletableFuture<String> originalFuture = CompletableFuture.supplyAsync(() -> "Hello");

//thenAccept方法构造Completable
CompletableFuture<Void> thenAcceptFuture = originalFuture.thenAccept(result -> {
    System.out.println("Result: " + result);
});

以JDK 11为例

源码:

CompletableFuturethenAccept方法中直接调用了uniAcceptStage方法,该方法入参是线程池对象和JDK 8出现的函数式接口Consumer,即上文中的result -> {System.out.println("Result: " + result);}),这段代码的作用是获取到上一阶段的计算结果后,将计算结果传递给消费者操作f,在thenAccept方法中将f转换成一个新的CompletableFuture,将uniAccept推入观察者链中,来表示一个新的thenAccept阶段。

private CompletableFuture<Void> uniAcceptStage(Executor e,
                                               Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    Object r;
    if ((r = result) != null)
        return uniAcceptNow(r, e, f);
    CompletableFuture<Void> d = newIncompleteFuture();
    unipush(new UniAccept<T>(e, d, this, f));
    return d;
}

以下代码是将给定的Completion对象推入观察者链:

/**
 * Pushes the given completion unless it completes while trying.
 * Caller should first check that result is null.
 */
final void unipush(Completion c) {
    if (c != null) {
        //尝试将Completion对象c推入观察者链,如果返回false,
        //说明推入的过程中观察者链发生了变化,可能有其他线程正在修改观察者链,
        //这种情况下,通过循环尝试
        while (!tryPushStack(c)) {
            //result对象不为空,表示当前CompletableFuture对象已完成,计算结果已存在
            if (result != null) {
                NEXT.set(c, null);
                break;
            }
        }
        if (result != null)
            c.tryFire(SYNC);
    }
}


/** Returns true if successfully pushed c onto stack. */
final boolean tryPushStack(Completion c) {
    Completion h = stack;
    NEXT.set(c, h);         // CAS piggyback
    return STACK.compareAndSet(this, h, c);
}

前提:判断观察者链是否被其他线程修改是通过被保持线程可见性的类、关键字修饰的。JDK 8使用的是volatile关键字实现简单的变量的原子性和线程可见性。在JDK 11中的CompletableFuture使用的是VarHandle类型定义。

// VarHandle mechanics
private static final VarHandle RESULT;
private static final VarHandle STACK;
private static final VarHandle NEXT;

CompletableFuture线程池

CompletableFuture 类在执行异步操作时,默认使用 ForkJoinPool.commonPool() 作为线程池。这是一个共享的线程池,通常是一个守护线程池,适用于执行异步任务。该线程池的特性包括自动管理线程数量、支持工作窃取(work-stealing)等。

如果你想要使用自定义的线程池,可以通过传递 Executor 对象作为参数来创建 CompletableFuture 实例。

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(defaultExecutor(), action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action,
                                            Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}

默认线程池

Executor executor = Executors.newFixedThreadPool(10); // 创建一个固定大小为10的线程池
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // 异步任务的执行逻辑
}, executor);

以上是使用默认线程池的相关代码逻辑,我们来看一下源码:

public Executor defaultExecutor() {
    return ASYNC_POOL;
}
    /**
     * Default executor -- ForkJoinPool.commonPool() unless it cannot
     * support parallelism.
     */
    private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

ASYNC_POOL中使用了默认的ForkJoinPool去开启一个线程池。

自定义线程池

CompletableFuture中提供了使用自定义线程池的方法,方法中需要传入一个线程池的接口对象,那么我们就可以传入任何一个实现自Executor接口的线程池。

以下是基于Spring Framework中的线程池实现的异步操作:

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(30);
        executor.setThreadNamePrefix("Async-");
        executor.initialize();
        return executor;
    }
}
@Service
public class MyAsyncService {

    @Autowired
    private TaskExecutor taskExecutor;

    @Async
    public CompletableFuture<String> performAsyncTask() {
        // 异步任务的执行逻辑
        return CompletableFuture.supplyAsync(() -> {
            // 模拟耗时操作
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Async Task Completed";
        }, taskExecutor);
    }
}

并发控制

CompletableFuture 默认使用共享线程池: ForkJoinPool.commonPool() 作为线程池,通过工作窃取算法提高了任务的并行度,同时使用VarHandlevolatile来保证线程间的可见性和原子操作,以上保证了线程安全和高可用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/137753.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

工厂设计模式

1、简单工厂模式 package com.jmj.pattern.factory.simple_factory;public class SimpleCoffeeFactory {public Coffee createCoffee(String type) throws Exception {//声明Coffee类型的变量&#xff0c;根据不同类型创建不同的coffee子类对象Coffee coffee null;if ("a…

Netty入门指南之NIO Selector写操作

作者简介&#xff1a;☕️大家好&#xff0c;我是Aomsir&#xff0c;一个爱折腾的开发者&#xff01; 个人主页&#xff1a;Aomsir_Spring5应用专栏,Netty应用专栏,RPC应用专栏-CSDN博客 当前专栏&#xff1a;Netty应用专栏_Aomsir的博客-CSDN博客 文章目录 参考文献前言操作演…

06-解决Spirng中的循环依赖问题

Bean的循环依赖问题 循环依赖: A对象中有B属性 , B对象中有A属性(丈夫类Husband中有Wife的引用, 妻子类Wife中有Husband的引用) toString()方法重写时直接输出wife/husband会出现递归导致的栈内存溢出错误 直接输出wife/husband会调用它们的toString()方法, 在toString()方法…

小黑子—springMVC:第二章

springMVC入门2.0 4、小黑子的springMVC拦截器4.1 Interceptor简介4.2 拦截器快速入门4.3 拦截器执行顺序4.4 拦截器执行原理 5、小黑子的springMVC全注解开发5.1 spring-mvc.xml中组件转化为注解形式5.1.1 消除spring-mvc.xml一二三 5.1.2 消除web.xml 6、小黑子的springMVC组…

新的开始吧

项目答辩终于结束了&#xff1a; 学习规划 下面先对自己的目前的情况来说&#xff1a; 学长学姐让我先把vue和boot学完&#xff0c;所以我打算先把vue3和boot学一下&#xff0c;但是每天还要花一点时间在六级的听力和阅读上面&#xff0c;还有就是算法&#xff1b; 下面进行…

数据结构----链式栈的操作

链式栈的定义其实和链表的定义是一样的&#xff0c;只不过在进行链式栈的操作时要遵循栈的规则----即“先进后出”。 1.链式栈的定义 typedef struct StackNode {SElemType data;struct StackNode *next; }StackNode,*LinkStack; 2.链式栈的初始化 Status InitStack(LinkSta…

Python---字典的增、删、改、查操作

字典的增操作 基本语法&#xff1a; 字典名称[key] value 注&#xff1a;如果key存在则修改这个key对应的值&#xff1b;如果key不存在则新增此键值对。 案例&#xff1a;定义一个空字典&#xff0c;然后添加name、age以及address这样的3个key # 1、定义一个空字典 person {…

RT-DETR算法改进:更换损失函数DIoU损失函数,提升RT-DETR检测精度

💡本篇内容:RT-DETR算法改进:更换损失函数DIoU损失函数 💡本博客 改进源代码改进 适用于 RT-DETR目标检测算法(ultralytics项目版本) 按步骤操作运行改进后的代码即可🚀🚀🚀 💡改进 RT-DETR 目标检测算法专属 文章目录 一、DIoU理论部分 + 最新 RT-DETR算法…

实验一 Anaconda安装和使用(上机Python程序设计实验指导书)

实验一 Anaconda安装和使用 一、实验目的和要求 &#xff08;一&#xff09;掌握Windows下Anaconda的安装和配置。 &#xff08;二&#xff09;掌握Windows下Anaconda的简单使用&#xff0c;包括IDLE、Jupyter Notebook、Spyder工具的使用。 &#xff08;三&#xff09;掌…

基于蚁狮算法优化概率神经网络PNN的分类预测 - 附代码

基于蚁狮算法优化概率神经网络PNN的分类预测 - 附代码 文章目录 基于蚁狮算法优化概率神经网络PNN的分类预测 - 附代码1.PNN网络概述2.变压器故障诊街系统相关背景2.1 模型建立 3.基于蚁狮优化的PNN网络5.测试结果6.参考文献7.Matlab代码 摘要&#xff1a;针对PNN神经网络的光滑…

数据结构 栈(C语言实现)

目录 1.栈的概念及结构2.栈的代码实现 1.栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端 称为栈顶&#xff0c;另一端称为栈底。栈中的数据元素遵守后进先出LIFO&#xff08;Last In F…

【Qt】撤销/恢复的快捷键

使用Qt的时候&#xff0c;有时需要撤销修改的代码&#xff0c;但可能回撤过头了。 下面提供2个快捷键&#xff0c;当撤销过头时&#xff0c;可恢复撤销内容。 撤销的快捷键是 CtrlZ 恢复/向前的快捷键是 CtrlShiftZ 我们可以自定义快捷键。 点击【工具】->【选项】 点击…

并发安全问题之--事物失效问题

并发安全问题之–事物失效问题 事物失效常见的6种原因&#xff1a; 1、事物方法非public修饰 2、非事物方法调用事物方法 3、事物方法抛出的异常被捕获了 4、事物方法抛出的异常类型不对 5、事物传播行为不对&#xff08;事物发生嵌套时有事物传播&#xff09; 6、事物锁属类没…

【Java】定时任务 - Timer/TimerTask 源码原理解析

一、背景及使用 日常实现各种服务端系统时&#xff0c;我们一定会有一些定时任务的需求。比如会议提前半小时自动提醒&#xff0c;异步任务定时/周期执行等。那么如何去实现这样的一个定时任务系统呢&#xff1f; Java JDK提供的Timer类就是一个很好的工具&#xff0c;通过简单…

C++二分查找算法:132 模式

说明 本篇是视频课程的讲义&#xff0c;可以看直接查看视频。也可以下载源码&#xff0c;包括空源码。 题目 给你一个整数数组 nums &#xff0c;数组中共有 n 个整数。132 模式的子序列 由三个整数 nums[i]、nums[j] 和 nums[k] 组成&#xff0c;并同时满足&#xff1a;i &l…

基于springboot实现沁园健身房预约管理系统【项目源码】

基于springboot实现沁园健身房预约管理系统演示 B/S架构 B/S结构是目前使用最多的结构模式&#xff0c;它可以使得系统的开发更加的简单&#xff0c;好操作&#xff0c;而且还可以对其进行维护。使用该结构时只需要在计算机中安装数据库&#xff0c;和一些很常用的浏览器就可以…

【算法】繁忙的都市(Kruskal算法)

题目 城市C是一个非常繁忙的大都市&#xff0c;城市中的道路十分的拥挤&#xff0c;于是市长决定对其中的道路进行改造。 城市C的道路是这样分布的&#xff1a; 城市中有 n 个交叉路口&#xff0c;编号是 1∼n &#xff0c;有些交叉路口之间有道路相连&#xff0c;两个交叉…

配置开启Docker2375远程连接与解决Docker未授权访问漏洞

一、配置开启Docker远程连接 首先需要安装docker,参考我这篇文章&#xff1a;基于CentOS7安装配置docker与docker-compose 配置开启Docker远程连接的步骤&#xff1a; //1-编辑/usr/lib/systemd/system/docker.service 文件 vim /usr/lib/systemd/system/docker.service //2…

合并集合(并查集)

一共有 n个数&#xff0c;编号是 1∼n&#xff0c;最开始每个数各自在一个集合中。 现在要进行 m 个操作&#xff0c;操作共有两种&#xff1a; M a b&#xff0c;将编号为 a 和 b 的两个数所在的集合合并&#xff0c;如果两个数已经在同一个集合中&#xff0c;则忽略这个操作…

解析JSON字符串:属性值为null的时候不被序列化

如果希望属性值为null及不序列化&#xff0c;只序列化不为null的值。 1、测试代码 配置代码&#xff1a; mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); 或者通过注解JsonInclude(JsonInclude.Include.NON_NULL) //常见问题2&#xff1a;属性为null&a…