JUC并发编程进阶2:CompletableFuture

1 Future接口理论知识复习

Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消异步任务的执行、判断任务是否被取消、判断任务执行是否完毕等

举例:比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,忙完其他事情或者先执行完,过了一会再才去获取子任务的执行结果或变更的任务状态(老师上课时间想喝水,他继续讲课不结束上课这个主线程,让学生去小卖部帮老师买水完成这个耗时和费力的任务)

一句话:Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂任务
在这里插入图片描述

2 Future接口常用实现类FutureTask异步任务

2.1 Future接口能干什么

  • Future是Java5新加的一个接口,它提供一种异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,我们会就可以通过Future把这个任务放进异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果

2.2 Future接口相关架构

  • 目的:异步多线程任务执行且返回有结果,三个特点:多线程、有返回、异步任务(班长为老师去买水作为新启动的异步多线程任务且买到水有结果返回)
  • 代码实现:Runnable接口+Callable接口+Future接口和FutureTask实现类。

在这里插入图片描述

/**
 * @package: com.yunyang.bilibili.juc.completablefeture
 * @description: FutureTask开启异步任务
 * @author: Yunyang
 * @date: 2024/10/12  23:15
 * @version:1.0
 **/
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new MyThread());

        Thread t1 = new Thread(futureTask,"t1");

        //开启一个异步线程
        t1.start();

        //有返回hello Callable
        System.out.println(futureTask.get());
    }
}



class MyThread implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println("----come in call() ");
        return "hello Callable";
    }
}

2.3 Future编码实战和优缺点分析

  • 优点:Future+线程池异步多线程任务配合,能显著提高程序的运行效率

  • 缺点

    • get()阻塞:一旦调用get()方法求结果,一旦调用不见不散,非要等到结果才会离开,不管你是否计算完成,如果没有计算完成容易程序堵塞
    • isDone()轮询:轮询的方式会耗费无谓的cpu资源,而且也不见得能及时得到计算结果,如果想要异步获取结果,通常会以轮询的方式去获取结果,尽量不要阻塞
  • 结论
    Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果

/**
 * @package: com.yunyang.bilibili.juc.completablefeture
 * @description:Future获取结果get和轮询
 * @author: Yunyang
 * @date: 2024/10/15  21:28
 * @version:1.0
 **/
public class FutureAPIDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println(Thread.currentThread().getName() + "\t-----come in");

            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return "task over";
        });
        Thread t1 = new Thread(futureTask, "t1");
        t1.start();

        System.out.println(Thread.currentThread().getName() + "\t----忙其他任务了");

        //这样会有阻塞的可能,在程序没有计算完毕的情况下
        // System.out.println(futureTask.get());

        //只愿意等待三秒,计算未完成直接抛出异常
        // System.out.println(futureTask.get(3, TimeUnit.SECONDS));

        // 轮询
        while (true){
            if(futureTask.isDone()){
                System.out.println(futureTask.get());
                break;
            } else {
                TimeUnit.MICROSECONDS.sleep(500);
                System.out.println("正在处理中,不要催了,越催越慢");
            }
        }

    }
}

2.4 完成一些复杂的任务

  • 对于简单的业务场景使用Future完全ok

  • 回调通知

    • 应对Future的完成时间,完成了可以告诉我,也就是我们的回调通知
    • 通过轮询的方式去判断任务是否完成这样非常占cpu并且代码也不优雅
  • 创建异步任务:Future+线程池组合

  • 多个任务前后依赖可以组合处理(水煮鱼—>买鱼—>调料—>下锅):

    • 想将多个异步任务的结果组合起来,后一个异步任务的计算结果需要钱一个异步任务的值
    • 想将两个或多个异步计算font color=red>合并成为一个异步计算,这几个异步计算互相独立,同时后面这个又依赖前一个处理的结果
  • 对计算速度选最快的:

    • 当Future集合中某个任务最快结束时,返回结果,返回第一名处理结果
  • 结论:

    • 使用Future之前提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以声明式的方式优雅的处理这些需求
    • 从i到i++
    • Future能干的,CompletableFuture都能干

3 CompletableFutureFuture的改进

3.1 CompletableFuture为什么会出现

  • get()方法在Future计算完成之前会一直处在阻塞状态下,阻塞的方式和异步编程的设计理念相违背
  • isDene()方法容易耗费cpu资源(cpu空转),
  • 对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样,我们就不用等待结果

JDK8设计出CompletableFuture,CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

3.2 CompletableFutureCompletionStage介绍

3.2.1 类架构说明

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

在这里插入图片描述

3.2.2 接口CompletionStage

  • 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
  • 一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:
stage.thenApply(x -> square(x).thenAccept(x -> System.out.print(x)).thenRun(()->system.out.println())

3.2.3 类CompletableFuture

  • 提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法
  • 它可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作
  • 实现了Future和CompletionStage接口

3.3 核心的四个静态方法,来创建一个异步任务

CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,提供了多种方法来创建和管理异步任务。以下是四个核心的静态方法,用于创建不同类型的异步任务:

3.3.1 runAsync - 无返回值

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  • 用途:执行一个没有返回值的异步任务。
  • 参数
    • runnable:要执行的任务。
    • executor:可选的线程池,如果不指定,默认使用 ForkJoinPool.commonPool()

3.3.2 supplyAsync - 有返回值

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • 用途:执行一个有返回值的异步任务。
  • 参数
    • supplier:提供返回值的任务。
    • executor:可选的线程池,如果不指定,默认使用 ForkJoinPool.commonPool()

3.3.3 Executor 参数说明

  • 默认线程池:如果没有指定 Executor,默认使用 ForkJoinPool.commonPool() 作为线程池执行异步代码。
  • 自定义线程池:如果指定了线程池,则使用自定义的线程池执行异步代码。

3.3.4 CompletableFuture 减少阻塞和轮询

  • 回调机制CompletableFuture 支持传入回调对象,当异步任务完成或发生异常时,自动调用回调对象的方法,减少阻塞和轮询。

3.3.5 CompletableFuture 的优点

  • 自动回调:异步任务结束时,自动回调某个对象的方法。
  • 顺序执行:主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行。
  • 异常处理:异步任务出错时,自动回调某个对象的方法。

3.3.6 示例代码

  • 四个静态方法的演示
/**
 * @package: com.yunyang.bilibili.juc.completablefeture
 * @description: 四个静态方法的演示
 * @author: Yunyang
 * @date: 2024/10/16  21:41
 * @version:1.0
 **/
public class CompletableFutureBuildDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        },threadPool);

        System.out.println(completableFuture.get()); // 输出:null

        CompletableFuture<String> supplyAsyncCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "hello supplyAsync";
        },threadPool);

        System.out.println(supplyAsyncCompletableFuture.get()); // 输出:hello supplyAsync

        threadPool.shutdown();
    }
}

分析:

  1. 创建线程池:使用 Executors.newFixedThreadPool(3) 创建一个固定大小的线程池。
  2. 无返回值的异步任务:使用 runAsync 方法执行一个无返回值的异步任务。
  3. 有返回值的异步任务:使用 supplyAsync 方法执行一个有返回值的异步任务。
  4. 处理异步任务的结果:使用 get() 方法处理异步任务的返回值。
  5. 关闭线程池:在任务完成后关闭线程池。
  • CompletableFuture减少阻塞和轮询
/**
 * @package: com.yunyang.bilibili.juc.completablefeture
 * @description: CompletableFuture减少阻塞和轮询
 * @author: Yunyang
 * @date: 2024/10/16  21:53
 * @version:1.0
 **/
public class CompletableFutureUserDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService threadPool = Executors.newFixedThreadPool(3);


        try {
            CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread().getName() + "----come in");
                final int result =ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("-----1秒钟后出结果:" + result);
                if(result > 5){
                    int i = 1/0; //模拟产生异常情况
                }
                return result;
            },threadPool).whenComplete((v,e)->{
                if(e == null){
                    System.out.println("-------计算完成,更新系统UpdateValue:" + v);
                }
            }).exceptionally(e->{
                e.printStackTrace();
                System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
                return null;
            });

            System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            threadPool.shutdown();
        }

        // 主线程不要立刻结束,否则completableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
/*        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }*/
    }

    private static void future1() throws InterruptedException, ExecutionException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "----come in");
            int result = ThreadLocalRandom.current().nextInt(10);

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println("----1秒钟后出结果: " + result);

            return result;
        });

        System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");

        System.out.println(completableFuture.get());
    }
}

/**
 * 有异常情况:
 * pool-1-thread-1----come in
 * main线程先去忙其他任务
 * -----1秒钟后出结果:7
 * java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
 * 异常情况:java.lang.ArithmeticException: / by zero	java.lang.ArithmeticException: / by zero
 */

/**
 * 无异常情况:
 * pool-1-thread-1----come in
 * main线程先去忙其他任务
 * -----1秒钟后出结果:0
 * -------计算完成,更新系统UpdateValue:0
 *
 * */

3.4 案例精讲-从电商网站的比价需求展开

3.4.1 函数式编程已成为主流

  • Lambda表达式+Stream流式调用+Chain链式调用+Java8函数式编程

  • 函数式接口

    • Runnable:无参数、无返回值
    • Function:接受一个参数,并且有返回值
    • Consumer:接受一个参数,没有返回值
    • BiConsumer:接受两个参数,没有返回值
    • Supplier:没有参数,有返回值
  • 小结
    在这里插入图片描述

  • chain链式调用

/**
 * @package: com.yunyang.bilibili.juc.completablefeture
 * @description: Chain链式调用演示
 * @author: Yunyang
 * @date: 2024/10/17  22:06
 * @version:1.0
 **/
public class CompletableFutureMallDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        final Student student = new Student();
        student.setId(1).setStudentName("zhangsan").setMajor("english"); //链式调用
    }
}

@AllArgsConstructor
@NoArgsConstructor
@Data
@Accessors(chain = true) //开启链式调用
class Student{
    private Integer id;
    private String studentName;
    private String major;
}

3.4.2 join和get对比

  • join和get对比
/**
 * @package: com.yunyang.bilibili.juc.completablefeture
 * @description:join和get对比
 * @author: Yunyang
 * @date: 2024/10/17  22:06
 * @version:1.0
 **/
public class CompletableFutureMallDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "hello 1234";
        });

        System.out.println(completableFuture.get());
        System.out.println(completableFuture.join());
        // join和get的作用大致相同,其区别是在编译时是否报出检查时异常
    }
}
  • join和get的作用大致相同,其区别是在编译时是否报出检查时异常:
    • get在编译时会报出异常;
    • join则不会报出异常。

3.4.3 大厂业务需求说明

切记:功能—>性能(完成—>完美)
电商网站比价需求分析:

  1. 需求说明:
    a. 同一款产品,同时搜索出同款产品在各大电商平台的售价
    b. 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
  2. 输出返回:
    a. 出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List
    例如:《Mysql》 in jd price is 88.05 《Mysql》 in taobao price is 90.43
  3. 解决方案,对比同一个产品在各个平台上的价格,要求获得一个清单列表
    a. step by step,按部就班,查完淘宝查京东,查完京东查天猫…
    b. all in,万箭齐发,一口气多线程异步任务同时查询

3.4.4 一波流Java8函数式编程带走-比价案例实战Case


/**
 * 这里面需要注意一下Stream流方法的使用
 * 这种异步查询的方法大大节省了时间消耗,可以融入简历项目中,和面试官有所探讨
 */
public class CompletableFutureMallDemo2 {

    static List<NetMall> list = Arrays.asList(
            new NetMall("jd"),
            new  NetMall("dangdang"),
            new NetMall("taobao")
    );

    /**
     * step by step
     * @param list
     * @param produceName
     * @return
     */
    public static List<String> getPrice(List<NetMall> list, String produceName){
        //《Mysql》 in jd price is 88.05
        return list.stream()
            .map(netMall ->
                    String.format("《" +produceName + "》 in %s price is %.2f",
                            netMall.getNewMallName(),
                            netMall.calcPrice(produceName)))
            .collect(Collectors.toList());
    }

    /**
     *all in
     * 把list里面的内容映射给CompletableFuture()
     * @param list
     * @param productName
     * @return
     */
    public static List<String> getPriceByCompletableFuture(List<NetMall> list,String productName){
        return list.stream()
            .map(netMall -> CompletableFuture.supplyAsync(()->
                    String.format("《" + productName + "》 in %s price is %.2f",
                            netMall.getNewMallName(),
                            netMall.calcPrice(productName)))) //Stream<CompletableFuture<String>>
                .collect(Collectors.toList()) //List<CompletableFuture<String>>
                .stream() //Stream<String>
                .map(stringCompletableFuture -> stringCompletableFuture.join()) //List<String>
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        /**
         * 采用step by setp方式查询
         * 《mysql》 in jd price is 110.72
         * 《mysql》 in dangdang price is 109.34
         * 《mysql》 in taobao price is 109.66
         * ---costTime: 3079毫秒
         */
        long startTime = System.currentTimeMillis();
        final List<String> list1 = getPrice(list, "mysql");
        for (String element : list1) {
            System.out.println(element);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("---costTime: " + (endTime - startTime) + "毫秒");

        System.out.println("**************************************");

        /**
         * 采用 all in 三个异步线程方式查询
         * 《mysql》 in jd price is 110.79
         * 《mysql》 in dangdang price is 110.61
         * 《mysql》 in taobao price is 110.30
         * ---costTime: 1029毫秒
         */
        long startTime2 = System.currentTimeMillis();
        final List<String> list2 = getPriceByCompletableFuture(list, "mysql");
        for (String element : list2) {
            System.out.println(element);
        }
        long endTime2 = System.currentTimeMillis();
        System.out.println("---costTime: " + (endTime2 - startTime2) + "毫秒");
    }
}

class NetMall{
    @Getter
    private String newMallName;

    public NetMall(String newMallName) {
        this.newMallName = newMallName;
    }

    public double calcPrice(String productName){
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
    }
}

4 CompletableFuture常用方法

4.1 获得结果和触发计算

4.1.1 获取结果

  1. public T get()

    • 描述: 这是一个阻塞方法,它会一直等待直到计算完成并返回结果。如果计算过程中抛出异常,get() 方法会将异常重新抛出。
    • 特点: 不见不散,即必须等到计算完成才能获取结果。
  2. public T get(long timeout, TimeUnit unit)

    • 描述: 这也是一个阻塞方法,但它会在指定的时间内等待计算完成。如果在指定时间内计算未完成,它会抛出 TimeoutException
    • 特点: 过时不候,即如果在指定时间内未完成,则抛出异常。
  3. public T join()

    • 描述: 类似于 get() 方法,但它不会抛出检查型异常(如 InterruptedExceptionExecutionException),而是直接抛出非检查型异常(如 RuntimeException)。
    • 特点: 不抛出异常,适合在不需要处理检查型异常的场景中使用。
  4. public T getNow(T valueIfAbsent)

    • 描述: 这是一个非阻塞方法,它会立即返回结果。如果计算已经完成,则返回计算结果;如果计算尚未完成,则返回传入的 valueIfAbsent 值。
    • 特点: 立即获取结果,不阻塞。如果计算未完成,返回设定的默认值。

4.1.2 主动触发计算

  1. public boolean complete(T value)
    • 描述: 这个方法用于主动完成计算,并设置计算结果为传入的 value。如果计算尚未完成,调用此方法后,所有等待结果的线程将立即获得这个 value。如果计算已经完成,则此方法不会产生任何影响,并返回 false
    • 特点: 是否打断 get() 方法立即返回括号值。如果计算未完成,调用 complete(value) 后,所有 get() 方法调用将立即返回 value

4.1.3 示例代码

/**
 * @package: yunyang.bilibili.juc.completablefeture
 * @description: 获得结果和触发计算
 * @author: Yunyang
 * @date: 2024/10/18  20:59
 * @version:1.0
 **/
public class CompletableFutureAPIDemo {
    public static void main(String[] args) //throws ExecutionException, InterruptedException, TimeoutException
    {
        final CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "abc";
        });

        // System.out.println(completableFuture.get());  // 不见不散
        // System.out.println(completableFuture.get(2L, TimeUnit.SECONDS)); // 过时不候
        // System.out.println(completableFuture.join()); // 不抛出异常
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // System.out.println(completableFuture.getNow("xxxx")); // 立即获取结果
        System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.join()); // 主动触发计算
    }
}

4.1.4 小结

  • 获取结果: 你可以使用 get(), get(timeout), join(), 和 getNow() 方法来获取异步计算的结果。

  • 主动触发计算: 使用 complete(value) 方法可以主动完成计算并设置结果,打断所有等待结果的线程。

4.2 对计算结果进行处理

4.2.1 thenApply

  • 描述: thenApply 方法用于在 CompletableFuture 完成后对结果进行转换或处理。它接收一个 Function 作为参数,该函数将前一个 CompletableFuture 的结果作为输入,并返回一个新的结果。
  • 特点:
    • 串行化: thenApply 方法会等待前一个 CompletableFuture 完成后再执行。
    • 异常处理: 如果前一个 CompletableFuture 抛出异常,thenApply 不会执行,整个链路会被中断。

4.2.2 handle

  • 描述: handle 方法也用于在 CompletableFuture 完成后对结果进行处理,但它可以处理异常。它接收一个 BiFunction 作为参数,该函数接收两个参数:前一个 CompletableFuture 的结果(或异常)和可能的异常。
  • 特点:
    • 串行化: handle 方法会等待前一个 CompletableFuture 完成后再执行。
    • 异常处理: 即使前一个 CompletableFuture 抛出异常,handle 方法仍然会执行,并且可以对异常进行处理。

4.2.3 示例代码

/**
 * @package: yunyang.bilibili.juc.completablefeture
 * @description:对计算结果进行处理
 * @author: Yunyang
 * @date: 2024/10/18  21:16
 * @version:1.0
 **/
public class CompletableFutureAPI2Demo {
    public static void main(String[] args) {

        // testThenApply();

        testHandle();
    }

    /**
     * 使用 handle
     */
    private static void testHandle() {
        final ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println("1111");
            return 1;
        },threadPool).handle((f,e)->{
            int i = 1/0;
            System.out.println("2222"); // 有异常也可以往下一步走,根据带的异常参数可以进一步处理
            return f + 2;
        }).handle((f,e)->{
            System.out.println("3333");
            return f + 3;
        }).whenComplete((v,e)->{
            if(e==null){
                System.out.println("----计算结果:"+v);
            }
        }).exceptionally(e->{
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });

        System.out.println(Thread.currentThread().getName() + "----主线程去忙其他任务了");

        threadPool.shutdown();
    }

    /**
     * 使用 thenApply
     */
    private static void testThenApply() {
        final ExecutorService threadPool = Executors.newFixedThreadPool(3);

        CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("111");
            return 1;
        },threadPool).thenApply(f->{
            int i = 1/0;
            System.out.println("222"); // 由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
            return f + 2;
        }).thenApply(f->{
            System.out.println("333");
            return f + 3;
        }).whenComplete((v,e)->{
            if(e == null){
                System.out.println("----计算结果:" +v);
            }
        }).exceptionally(e->{
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });

        System.out.println(Thread.currentThread().getName() + "-----主线程去忙其他任务了");

        threadPool.shutdown();
    }
}

4.2.4 总结对比

特性thenApplyhandle
串行化
异常处理中断链路,不执行后续步骤继续执行,可以处理异常
适用场景结果依赖,且不需要处理异常结果依赖,且需要处理异常
函数签名Function<T, U>BiFunction<T, Throwable, U>
异常传递不传递异常,中断链路传递异常,可以进一步处理

4.2.5 小结

  • thenApply: 适用于结果依赖且不需要处理异常的场景。如果前一个步骤抛出异常,整个链路会被中断。
  • handle: 适用于结果依赖且需要处理异常的场景。即使前一个步骤抛出异常,链路仍然可以继续执行,并且可以对异常进行处理。

4.3 对计算结果进行消费

4.3.1 thenAccept

  • 描述: thenAccept 方法用于在 CompletableFuture 完成后对结果进行消费,它接收一个 Consumer 作为参数,该函数将前一个 CompletableFuture 的结果作为输入,并执行一些操作,但不会返回任何结果。
  • 特点:
    • 串行化: thenAccept 方法会等待前一个 CompletableFuture 完成后再执行。
    • 无返回值: thenAccept 方法不会返回任何结果,它只是消费前一个任务的结果。

4.3.2 thenRun

  • 描述: thenRun 方法用于在 CompletableFuture 完成后执行一个 Runnable 任务,它不接收前一个任务的结果,也不返回任何结果。
  • 特点:
    • 串行化: thenRun 方法会等待前一个 CompletableFuture 完成后再执行。
    • 无参数,无返回值: thenRun 方法不接收前一个任务的结果,也不返回任何结果。

4.3.3 示例代码

/**
 * @package: yunyang.bilibili.juc.completablefeture
 * @description:对计算结果进行消费
 * @author: Yunyang
 * @date: 2024/10/18  21:35
 * @version:1.0
 **/
public class CompletableFutureAPI3Demo {
    public static void main(String[] args) {
        // testThenAccept();

        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> { }).join()); // 任务A执行完执行B,B不需要A的结果
        System.out.println(CompletableFuture.supplyAsync(() -> "acceptA").thenAccept(r -> System.out.println(r)).join()); // 任务A执行完执行B,B需要A的结果,但是任务B无返回值
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "reslutB").join()); // 任务A执行完执行B,B需要A的结果,同时任务B有返回值
    }

    private static void testThenAccept() {
        CompletableFuture.supplyAsync(()->{
            return 1;
        }).thenApply(f->{
            return f + 2;
        }).thenApply(f->{
            return f + 3;
        }).thenAccept(System.out::println);
    }
}

4.3.4 对比补充

方法参数类型返回值类型描述
thenRunRunnablevoid任务A执行完执行B,B不需要A的结果
thenAcceptConsumervoid任务A执行完执行B,B需要A的结果,但是任务B无返回值
thenApplyFunctionU任务A执行完执行B,B需要A的结果,同时任务B有返回值

4.3.5 CompletableFuture 和线程池说明

CompletableFuture 提供了同步和异步版本的这些方法,它们在使用线程池的方式上有所不同。

4.3.5.1 thenRunthenRunAsync 的区别
  • thenRun(Runnable runnable): 同步执行,使用与前一个任务相同的线程池。
  • thenRunAsync(Runnable runnable): 异步执行,使用默认的 ForkJoinPool 线程池,除非你传入了自定义线程池。
4.3.5.2 示例代码
/**
 * @package: yunyang.bilibili.juc.completablefeture
 * @description:CompletableFuture和线程池说明
 * @author: Yunyang
 * @date: 2024/10/18  21:51
 * @version:1.0
 **/
public class CompletableFututeThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);

        try {
            CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.MICROSECONDS.sleep(20);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("1号任务:" + "\t" + Thread.currentThread().getName());
                return "abcd";
            },threadPool).thenRunAsync(() -> {
                try {
                    TimeUnit.MICROSECONDS.sleep(20);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("2号任务:" + "\t" + Thread.currentThread().getName());
            }).thenRun(() -> {
                try {
                    TimeUnit.MICROSECONDS.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("3号任务:" + "\t" + Thread.currentThread().getName());
            }).thenRun(() -> {
                try {
                    TimeUnit.MICROSECONDS.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("4号任务:" + "\t" + Thread.currentThread().getName());
            });
            completableFuture.get(2L,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        } finally {
            threadPool.shutdown();
        }
    }
}

4.3.6 小结

  • 没有传入自定义线程池: 都使用默认的 ForkJoinPool 线程池。
  • 传入自定义线程池:
    • 如果你执行第一个任务时传入了自定义线程池:
      • thenRun 方法执行第二个任务时,第二个任务与第一个任务共用同一个线程池。
      • thenRunAsync 方法执行第二个任务时,第一个任务使用自定义线程池,第二个任务使用 ForkJoinPool 线程池。

4.3.7 备注

  • 处理速度过快时,系统可能会优化并直接使用 main 线程处理任务。
  • 其他方法如 thenAcceptthenAcceptAsyncthenApplythenApplyAsync 等,它们之间的区别也是类似的。

4.3.8 源码分析

CompletableFuture 的源码中,thenRunthenRunAsync 方法的实现如下:

public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) {
    return uniRunStage(screenExecutor(executor), action);
}
  • thenRun 方法使用 null 作为线程池参数,表示使用前一个任务的线程池。
  • thenRunAsync 方法使用 asyncPool,即默认的 ForkJoinPool 线程池,除非你传入了自定义线程池。

4.4 对计算速度进行选用

CompletableFuture 中,applyToEither 方法用于处理两个 CompletableFuture 中的任意一个先完成的任务。这个方法非常适合在需要快速响应的场景中使用,即“谁快用谁”的策略。

4.4.1 applyToEither

  • 描述: applyToEither 方法接收两个 CompletableFuture 和一个 Function,当其中一个 CompletableFuture 完成时,applyToEither 会立即执行 Function,并将完成的结果作为输入,返回一个新的 CompletableFuture
  • 特点:
    • 非阻塞: applyToEither 是非阻塞的,它会立即返回一个新的 CompletableFuture
    • 快速响应: 它会使用先完成的 CompletableFuture 的结果。

4.4.2 示例代码

/**
 * @package: yunyang.bilibili.juc.completablefeture
 * @description:对计算速度进行选用
 * @author: Yunyang
 * @date: 2024/10/18  22:17
 * @version:1.0
 **/
public class CompletableFutureFastDemo {
    public static void main(String[] args) {
        final CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("A is come in");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "playA";
        });

        final CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("B is come in");

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return "playB";
        });

        final CompletableFuture<String> result = playA.applyToEither(playB, f -> {
            return f + " is winner";
        });

        System.out.println(Thread.currentThread().getName() + " : " + result.join());
    }
}

4.4.3 解释

  1. 创建两个 CompletableFuture:

    • playA 模拟一个耗时 2 秒的任务。
    • playB 模拟一个耗时 1 秒的任务。
  2. 使用 applyToEither:

    • applyToEither 方法会等待 playAplayB 中的任意一个先完成。
    • 一旦其中一个 CompletableFuture 完成,applyToEither 会立即执行传入的 Function,并将完成的结果作为输入。
    • 在这个例子中, playB 会先完成,因此 applyToEither 会使用 playB 的结果。
  3. 获取最终结果:

    • result.join() 会返回处理后的结果。

4.4.4 小结

  • applyToEither: 用于处理两个 CompletableFuture 中的任意一个先完成的任务,非常适合需要快速响应的场景。
  • 非阻塞: applyToEither 是非阻塞的,它会立即返回一个新的 CompletableFuture
  • 快速响应: 它会使用先完成的 CompletableFuture 的结果。

4.5 对计算结果进行合并

CompletableFuture 中,thenCombine 方法用于在两个 CompletableFuture 都完成后,将它们的结果合并在一起进行处理。这个方法非常适合在需要等待多个异步任务都完成后,再进行进一步处理的场景。

4.5.1 thenCombine

  • 描述: thenCombine 方法接收两个 CompletableFuture 和一个 BiFunction,当这两个 CompletableFuture 都完成后,thenCombine 会立即执行 BiFunction,并将两个 CompletableFuture 的结果作为输入,返回一个新的 CompletableFuture
  • 特点:
    • 非阻塞: thenCombine 是非阻塞的,它会立即返回一个新的 CompletableFuture
    • 等待所有任务完成: 它会等待两个 CompletableFuture 都完成后,再进行合并处理。

4.5.2 示例代码

/**
 * @package: yunyang.bilibili.juc.completablefeture
 * @description:对计算结果进行合并
 * @author: Yunyang
 * @date: 2024/10/18  22:25
 * @version:1.0
 **/
public class CompletableFutureCombineDemo {
    public static void main(String[] args) {
        // testCombine1();

        testCombine2();
    }

    private static void testCombine2() {
        final CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + ":" + " come in 1");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + ":" + "come in 2");
            return 20;
        }), (x, y) -> {
            System.out.println(Thread.currentThread().getName() + ":" + "come in 3");
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + ":" + "come in 4");
            return 30;
        }), (a, b) -> {
            System.out.println(Thread.currentThread().getName() + ":" + "come in 5");
            return a + b;
        });

        System.out.println("----主线程结束,end");
        System.out.println(completableFuture.join());
    }

    private static void testCombine1() {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t---启动");

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return 10;
        });
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t---启动");

            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return 20;
        });

        CompletableFuture<Integer> result = completableFuture1.thenCombine(completableFuture2, (x, y) -> {
            System.out.println("----开始两个结果合并");
            return x + y;
        });

        System.out.println(result.join());
    }
}

4.5.3 小结

  • thenCombine: 用于在两个 CompletableFuture 都完成后,将它们的结果合并在一起进行处理。
  • 非阻塞: thenCombine 是非阻塞的,它会立即返回一个新的 CompletableFuture
  • 等待所有任务完成: 它会等待两个 CompletableFuture 都完成后,再进行合并处理。

5 思维导图

在这里插入图片描述

6 参考链接

  1. 【尚硅谷JUC并发编程(对标阿里P6-P7)】
  2. CompletableFuture

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/894855.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot+Vue+uniapp微信小程序的澡堂预订的微信小程序的详细设计和实现

项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念&#xff0c;提供了一套默认的配置&#xff0c;让开发者可以更专注于业务逻辑而不是配置文件。Spring Boot 通过自动化配置和约…

Java项目实战II基于Spring Boot的毕业就业信息管理系统设计与实现(源码+数据库+文档)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 随着高校扩…

Python酷玩之旅_数据分析入门(matplotlib)

导览 前言matplotlib入门1. 简介1.1 Pairwise data1.2 Statistical distributions1.3 Gridded data1.4 Irregularly gridded data1.5 3D and volumetric data 2. 实践2.1 安装2.2 示例 结语系列回顾 前言 翻看日历&#xff0c;今年的日子已划到了2024年10月19日&#xff0c;今天…

【重学 MySQL】七十二、轻松掌握视图的创建与高效查看技巧

【重学 MySQL】七十二、轻松掌握视图的创建与高效查看技巧 创建视图查看视图注意事项 在MySQL数据库中&#xff0c;视图&#xff08;View&#xff09;是一种虚拟表&#xff0c;它基于一个或多个表的数据计算生成结果集&#xff0c;但不存储实际的数据。视图可以简化复杂的查询、…

【OD】【E卷】【真题】【100分】光伏场地建设规划(PythonJavajavaScriptC++C)

题目描述 祖国西北部有一片大片荒地&#xff0c;其中零星的分布着一些湖泊&#xff0c;保护区&#xff0c;矿区; 整体上常年光照良好&#xff0c;但是也有一些地区光照不太好。 某电力公司希望在这里建设多个光伏电站&#xff0c;生产清洁能源对每平方公里的土地进行了发电评…

打印机出现线条和残影情况的主要原因和解决办法

本篇文章主要讲解&#xff0c;打印机出现打印文本&#xff0c;出现线条和残影情况时的解决办法和主要原因的详细解答和处理方法。 作者&#xff1a;任聪聪 日期&#xff1a;2024年10月19日 博客地址&#xff1a;https://rccblogs.com/604.html 打印现象&#xff1a; 说明&…

【virtuoso】sp测电阻

电路测量原理&#xff1a; 1. 电路原理图 2. 仿真设置 点击select&#xff0c;在原理图选择port设置sp扫频范围 3. plot图像 3.1 plot电阻图像 由公式可得&#xff0c;电阻值为阻抗的实部&#xff0c;所以 1. 选择 ZP 2. 绘制real 3. 点击Z11 4. 即可看到电阻值 3.2 plot电容图…

MySQL数据的导出

【图书推荐】《MySQL 9从入门到性能优化&#xff08;视频教学版&#xff09;》-CSDN博客 《MySQL 9从入门到性能优化&#xff08;视频教学版&#xff09;&#xff08;数据库技术丛书&#xff09;》(王英英)【摘要 书评 试读】- 京东图书 (jd.com) MySQL9数据库技术_夏天又到了…

AI写作助手系统盈利模式分析:打造盈利的AI网站

引言 有数据显示&#xff0c;截至2024年初&#xff0c;全球自媒体从业人员数量已超过1.5亿人&#xff0c;其中中国自媒体从业人员数量超过1亿人。这一数字表明&#xff0c;中国自媒体行业拥有庞大的从业者群体。 另一方面&#xff0c;从自媒体行业的发展趋势来看&#xff0c;…

Axure重要元件三——中继器时间排序

亲爱的小伙伴&#xff0c;在您浏览之前&#xff0c;烦请关注一下&#xff0c;在此深表感谢&#xff01; 本节课&#xff1a;中继器数据时间排序 课程内容&#xff1a;数据的升序、降序、重置排序 应用场景&#xff1a;表单数据的排序 案例展示&#xff1a; 步骤一&#xff…

JVM(HotSpot):GC之垃圾回收阶段

文章目录 前言一、标记清除算法(Mark Sweep)二、标记整理算法(Mark Compact)三、复制算法(Copy) 前言 标记出垃圾对象之后&#xff0c;就要进行清理。 那么&#xff0c;如何清理&#xff1f; 这里也有相应的算法。 主要有三种。 一、标记清除算法(Mark Sweep) 原理说明&…

网络学习笔记

一、网络的结构与功能 网络的鲁棒性与抗毁性 如果在移走少量节点后网络中的绝大部分节点仍然是连通的&#xff0c;那么就该网络的连通性对节点故障具有鲁棒性 网络上的动力学 动力系统&#xff1a;自旋、振子或混沌的同步、可激发系统 传播过程&#xff1a;信息传播与拥堵…

【MySQL】mysql导出数据WPS科学计数法解决方法

导出的长串数字 id 会导致科学计数法&#xff0c;修改 WPS 单元格格式可以解决 数字太长还是有问题&#xff0c;最后有个数字会变成 0 可以 直接用 python脚本转换一下 vim convert_txt_xlsx.py #!/usr/bin/env python3# 使用方法# 安装库 # pip3 install pandas openpyxl…

YOLO11改进|注意力机制篇|引入SEAM注意力机制

目录 一、【SEAM】注意力机制1.1【SEAM】注意力介绍1.2【SEAM】核心代码二、添加【SEAM】注意力机制2.1STEP12.2STEP22.3STEP32.4STEP4三、yaml文件与运行3.1yaml文件3.2运行成功截图一、【SEAM】注意力机制 1.1【SEAM】注意力介绍 下图是【SEAM】的结构图,让我们简单分析一下…

2-127基于matlab的非圆齿轮啮合动画设计

基于matlab的非圆齿轮啮合动画设计&#xff0c;可根据需求设置齿数&#xff0c;齿高、平滑系数等&#xff0c;最后输出啮合动画。程序已调通&#xff0c;可直接运行。 下载源程序请点链接&#xff1a;2-127基于matlab的非圆齿轮啮合动画设计

从Naive RAG到Agentic RAG:基于Milvus构建Agentic RAG

检索增强生成&#xff08;Retrieval-Augmented Generation, RAG&#xff09;作为应用大模型落地的方案之一&#xff0c;通过让 LLM 获取上下文最新数据来解决 LLM 的局限性。典型的应用案例是基于公司特定的文档和知识库开发的聊天机器人&#xff0c;为公司内部人员快速检索内部…

萤石云服务支持云端视频AI自动剪辑生成

萤石视频云存储及媒体处理服务是围绕IoT设备云端存储场景下的音视频采集、媒体管理、视频剪辑和分发能力的一站式、专业云服务&#xff0c;并可面向广大开发者提供复杂设备存储场景下的完整技术方案。目前该服务新增了视频剪辑功能&#xff0c;支持将视频片段在云端进行裁剪并拼…

nacos的使用

nacos的使用 本专栏的上一篇文章已经部署好了nacos&#xff0c;我们就可以使用nacos做配置中心和注册中心了。 一、配置中心 有了nacos&#xff0c;我们在微服务项目的配置文件里只需要做一些简单的配置就行了&#xff1a;服务名、服务端口、nacos的地址。其余的配置都可以用…

python 作业1

任务1: python为主的工作是很少的 学习的python的优势在于制作工具&#xff0c;制作合适的工具可以提高我们在工作中的工作效率的工具 提高我们的竞争优势。 任务2: 不换行 换行 任务3: 安装pycharm 进入相应网站Download PyCharm: The Python IDE for data science and we…

基于springboot的4S店车辆管理系统

作者&#xff1a;计算机学长阿伟 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、ElementUI等&#xff0c;“文末源码”。 系统展示 【2024最新】基于JavaSpringBootVueMySQL的&#xff0c;前后端分离。 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;…