CompletableFuture使用小结

为什么需要CompletableFuture

CompletableFuture继承了CompletionStage接口和Future接口,在原有Future的基础上增加了异步回调、流式处理以及任务组合,成为JDK8多任务协同场景下一个有效利器。

CompletableFuture使用示例

提交有返回值的异步任务

通过supplyAsync提交我们的异步任务,然后通过get方法等待异步任务完成并获取返回结果。

public static void main(String[] args) throws Exception {
        //提交一个CompletableFuture任务
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            long start = System.currentTimeMillis();

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("work complete! cost:" +(System.currentTimeMillis() - start)  + " ms");
            return 1;
        });


        System.out.println("main thread working");

        //通过get方法阻塞获取任务执行结果
        System.out.println("supplyAsync result: " + task.get());

        System.out.println("main thread finish");
    }

输出结果如下,可以看出CompletableFuture的get方法会阻塞主线程工作,直到得到返回值为止。

main thread working
work complete! cost:1001 ms
supplyAsync result: 1
main thread finish

对此我们不妨来看看get方法是如何做到阻塞主线程并等待异步线程任务执行完成的。从下面这段源码我们可以看到get方法的执行步骤:

  1. 调用reportGet查看异步任务是否将结果赋值给result。
  2. 如果不为null直接返回。
  3. 若为null则调用waitingGet等待任务返回。
public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }

查看reportGet方法可以看到逻辑也很简单,如果r为空则直接抛中断异常,如果r存在异常则直接将异常抛出,如果有结果则将结果返回。

  private static <T> T reportGet(Object r)
        throws InterruptedException, ExecutionException {
        //如果结果为null直接抛出终端异常
        if (r == null) // by convention below, null means interrupted
            throw new InterruptedException();
         //如果结果有异常则将异常抛出
        if (r instanceof AltResult) {
            Throwable x, cause;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if ((x instanceof CompletionException) &&
                (cause = x.getCause()) != null)
                x = cause;
            throw new ExecutionException(x);
        }
        //如果r正常则直接将结果返回出去
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }

waitingGet源码相对复杂一些,整体步骤我们可以拆解为while循环内部和while循环外部,我们先来看看while循环内部的执行流程:

  1. while循环从任务中获取result,如果result为空,则进入循环。
  2. 如果spins小于0,说明刚刚进入循环内部,可以自旋等待一下任务的获取,设置好spins(spins的值从SPINS来,如果多核的情况下值为256),进入下一次循环。
  3. 进入循环发现spins大于0,则随机生成一个数,如果这个数大于等于0则–spins,进入下次循环。
  4. 不断执行步骤3的操作,知道spins等于0。
  5. 此时判断来到q==null,说明任务自旋等待了一段时间还是没有结果,我们需要将其挂起,首先将线程封装成一个Signaller,进入下一次循环。
  6. 循环会判断if (!queued),将要阻塞的任务放到栈中,进入下一次循环。
  7. 循环下一次会来到if (q.thread != null && result == null),说明q线程不为空且没有结果,我们需要将其打断,调用ForkJoinPool.managedBlock(q)将其打断,直至有结果后才结束循环。

while循环外操作就简单了,来到循环尾部时,result已经有值了,代码执行postComplete完成任务,并将结果返回。

private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        //如果result为空则进入循环
        while ((r = result) == null) {
        //如果spins小于0,说明刚刚进入循环内部,可以自旋等待一下任务的获取,设置好spins(spins的值从SPINS来,如果多核的情况下值为256),自此,第一次循环步骤结束
            if (spins < 0)
                spins = SPINS;

			//这一步的操作是自旋等待任务结果,所以代码进入循环发现spins大于0,则随机生成一个数,如果这个数大于等于0则--spins,进入下次循环,直到循环spins变为0
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            //此时判断来到q==null,说明任务自旋等待了一段时间还是没有结果,我们需要将其挂起,首先将线程封装成一个Signaller,结束本次循环
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
			//上一步我们将任务封装成Signaller,这里就将其存入栈中,然后结束循环
            else if (!queued)
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            //循环来到这说明q线程不为空且没有结果,我们需要将其打断,调用`ForkJoinPool.managedBlock(q)`将其打断,直至有结果后才结束循环
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        //结束循环,调用postComplete结束任务并返回结果r
        postComplete();
        return r;
    }

提交无返回值的异步任务

通过runAsync提交一个无返回值的异步任务,这里我们为了实现任务执行完成再关闭主线程用了个get阻塞等待任务完成。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> supplyAsync = CompletableFuture.runAsync(() -> {
            long start = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + "开始工作了,执行时间:" + start);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "结束工作了,总执行时间:" + (System.currentTimeMillis() - start));
        });

        System.out.println("主线程开始运行");
        //get阻塞主线程等待任务结束
        supplyAsync.get();
        System.out.println("主线程运行结束");
    }

输出结果

主线程开始运行
ForkJoinPool.commonPool-worker-1开始工作了,执行时间:1651251489755
ForkJoinPool.commonPool-worker-1结束工作了,总执行时间:1010
主线程运行结束

将异步任务提交给自己的线程池处理

查看supplyAsync方法的源码我们发现,我们提交的任务默认情况下会交给asyncPool这个线程池处理。

 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

查看asyncPool 我们可以看到如果服务器是多核的情况下返回的是一个commonPool,commonPool默认线程池数为CPU核心数。

 private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

所以如果某些情况下我们希望将任务提交到我们自己的线程池中,就建议通过supplyAsync的第二个参数告知CompletableFuture自己要用自定义线程池。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        //使用第二个参数告知CompletableFuture使用的线程池
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
            long start = System.currentTimeMillis();
            System.out.println(Thread.currentThread() + "开始工作了,执行时间:" + start);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //打印当前执行任务的线程
            System.out.println(Thread.currentThread() + "结束工作了,总执行时间:" + (System.currentTimeMillis() - start));
            return 1;
        }, executorService);

        System.out.println("主线程开始运行");
        System.out.println("输出结果 " + supplyAsync.get());
        System.out.println("主线程运行结束");

        executorService.shutdown();
        while (executorService.isTerminated()) {

        }
    }

从输出结果也可以看出这里使用的线程池是我们自定义的线程池

主线程开始运行
Thread[pool-1-thread-1,5,main]开始工作了,执行时间:1651251851358
Thread[pool-1-thread-1,5,main]结束工作了,总执行时间:2005
输出结果 1
主线程运行结束

thenApply和thenApplyAsync

thenApply 适用那些需要顺序执行的异步任务,例如我们希望将第一个任务的返回值交给第二个异步任务,就可以使用thenApply将两个任务组合起来。

在这里插入图片描述

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + "开始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread() + "结束工作了");
            return 100;
        }, executorService);

        //将两个任务组合起来
        CompletableFuture<String> task2 = task1.thenApply((data) -> {
            System.out.println("第二个线程:" + Thread.currentThread() + "开始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "第一个线程的结果为 " + data;
        });



        System.out.println("获取组合任务结果");
        System.out.println("组合任务处理结果为: " + task2.get());
        System.out.println("获取组合任务结果结束");

        executorService.shutdown();
        while (executorService.isTerminated()) {

        }
    }

输出结果可以看到,任务1执行完成后任务2接着执行了。

Thread[pool-1-thread-1,5,main]开始工作了
获取组合任务结果
Thread[pool-1-thread-1,5,main]结束工作了
第二个线程:Thread[pool-1-thread-1,5,main]开始工作了
组合任务处理结果为: 第一个线程的结果为 100
获取组合任务结果结束

thenApplyAsync与thenApply不同的是,在第一个异步任务有指定线程池的情况下,第二个异步任务会被提交到其他线程池中,所以这里我们可以说明一个规律,带有Async关键字的方法支持组合任务时,将任务提交到不同的线程池中。

在这里插入图片描述

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread()+"开始工作了");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread()+"结束工作了");
            return 100;
        },executorService);

        CompletableFuture<String> task2 = task1.thenApplyAsync((data) -> {
            System.out.println("第二个线程:" + Thread.currentThread() + "开始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "第一个线程的结果为 " + data;
        });



        System.out.println("获取任务结果开始");
        System.out.println("任务的结果 "+task2.get());
        System.out.println("获取任务结果结束");

        executorService.shutdown();
        while (executorService.isTerminated()){

        }
    }

输出结果

Thread[pool-1-thread-1,5,main]开始工作了
获取任务结果开始
Thread[pool-1-thread-1,5,main]结束工作了
第二个线程:Thread[ForkJoinPool.commonPool-worker-9,5,main]开始工作了
任务的结果 第一个线程的结果为 100
获取任务结果结束

thenAccept和thenRun

thenAccept和thenRun都会在上一个任务执行结束后才会继续执行。两者唯一区别时:

  1. thenAccept在上一个任务执行结束后,将上一个任务返回结果作为入参,但无返回值。

在这里插入图片描述

  1. thenRun会在上一个任务执行结束后才开始处理,既没有入参也没有返回值。

在这里插入图片描述

以下便是笔者的使用示例:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);


        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("task线程:" + Thread.currentThread().getName() + "开始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task线程:" + Thread.currentThread().getName() + "结束工作了");
            return 200;
        }, executorService);

        CompletableFuture<Integer> task2 = task.thenApply((data) -> {
            System.out.println("task2线程:" + Thread.currentThread().getName() + "开始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2线程:" + Thread.currentThread().getName() + "执行结束");
            return data;
        });

        //thenAccept 收上一个任务的入参,但无返回值
        CompletableFuture<Void> task3 = task2.thenAccept((data) -> {
            System.out.println("task3线程:" + Thread.currentThread().getName() + ",该任务接收上一个任务的结果,但无返回值,收到上一个任务的结果值为 " + data);
        });

        //thenRun在上一个任务结束后执行,既无入参也无出参
        CompletableFuture<Void> task4 = task3.thenRun(() -> {
            System.out.println("task4在上一个任务结束后继续执行,无入参,也无返回值");
        });


        System.out.println("尝试获取最终执行结果");
        task4.get();
        System.out.println("执行任务直至task4 ");
        System.out.println("任务全部执行结束");

        executorService.shutdown();
        while (executorService.isTerminated()) {

        }
    }

输出结果

task线程:pool-1-thread-1开始工作了
尝试获取最终执行结果
task线程:pool-1-thread-1结束工作了
task2线程:pool-1-thread-1开始工作了
task2线程:pool-1-thread-1执行结束
task3线程:pool-1-thread-1,该任务接收上一个任务的结果,但无返回值,收到上一个任务的结果值为 200
task4在上一个任务结束后继续执行,无入参,也无返回值
执行任务直至task4 
任务全部执行结束

exceptionally

假如我们的任务1执行过程中可能报错,我们希望能够从逻辑的角度处理掉,那么我们就可以在任务1后面接一个exceptionally方法,然后再接上任务2。这样一来,任务1执行报错就会走到exceptionally,反之就会走到任务2的代码段。

在这里插入图片描述

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task1 开始工作了");
            //随机生成被除数,为0会抛出算术异常
            int num = RandomUtil.randomInt(0, 2);
            int result = 10 / num;
            System.out.println("task1 结束工作");
            return 200;
        });

        //假如task1报错,任务会走到这个任务上
        CompletableFuture<Integer> exceptionally = task1.exceptionally((e) -> {
            System.out.println("上一个任务报错了,错误信息" + e.getMessage());
            return -1;
        });

        CompletableFuture task2 = task1.thenAccept((param) -> {
            System.out.println("走到正常的结束分支了,task1执行结果:" + param);
        });

        System.out.println("主线程开始运行");
//        调用错误捕获的任务执行结束也会自动走到正常结束的分支
        System.out.println("输出结果 " + exceptionally.get());
        System.out.println("主线程运行结束");
    }

执行正常的输出结果:

task1 开始工作了
主线程开始运行
task1 结束工作
走到正常的结束分支了:200
输出结果 200
主线程运行结束

执行异常的输出结果:

task1 开始工作了
主线程开始运行
上一个任务报错了,错误信息java.lang.ArithmeticException: / by zero
输出结果 -1
主线程运行结束

whenComplete

对于上面的例子,我们完全可以用whenComplete来简化,whenComplete会接收两个入参:

  1. 入参1为上一个任务的返回值。
  2. 入参2比较特殊,如果上一个任务抛出异常,则第2个入参不为空。

在这里插入图片描述

所以上一个例子的代码我们可以简化成这样,需要注意的是whenComplete返回结果是上一个任务的执行结果,我们无法返回任务2的执行结果。

 public static void main(String[] args) {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("任务1开始工作");
            int num = RandomUtil.randomInt(0, 2);
            int result = 10 / num;
            System.out.println("任务1执行结束,执行结果:" + result);
            return result;
        });

        CompletableFuture<Integer> task2 = task.whenComplete((result, err) -> {
            System.out.println("任务2开始工作");

            if (err != null) {
                System.out.println("任务1执行报错,报错原因:" + err.getMessage());
                return;
            }

            System.out.println("任务1正常结束,执行结果:" + result);

        });


        try {
            System.out.println("task2拿到最终执行结果 " + task2.get());
        } catch (Exception e) {

        }
        System.out.println("全流程结束");


    }

错误的输出结果

任务1开始工作
任务2开始工作
任务1执行报错,报错原因:java.lang.ArithmeticException: / by zero
全流程结束

正确执行的输出结果:

任务1开始工作
任务1执行结束,执行结果:10
任务2开始工作
任务1正常结束,执行结果:10
task2拿到最终执行结果 10
全流程结束

handle

handle使用和whenComplete差不多,唯一的区别就是whenComplete返回的是上一个任务的结果,而handle可以返回自己的结果。

在这里插入图片描述

代码如下所示

public static void execute1() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + "开始工作了");
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Random random = new java.util.Random();
            int num = random.nextInt(10);
            if (num < 5) {
                throw new RuntimeException("报错了 num:" + num);
            }
            System.out.println(Thread.currentThread() + "结束工作了");
            return num;
        });

        CompletableFuture<String> future2 = future.handle((result, err) -> {
            System.out.println("第二个线程:" + Thread.currentThread() + "开始工作了");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if (err != null) {
                System.out.println(err.getMessage());
                ;return "fail";
            }
            return "sucdess";
        });


        System.out.println("拿第1个任务的结果");
        System.out.println("第1个任务的结果 " + future2.get());
        System.out.println("第1个任务结果结束");



        /**
         * 输出结果
         * Thread[pool-1-thread-1,5,main]开始工作了
         * 拿第一个任务的结果
         * Thread[pool-1-thread-1,5,main]结束工作了
         * 第二个线程:Thread[pool-1-thread-1,5,main]开始工作了
         * 100
         * 第一个任务结果结束
         * 拿第2个任务的结果
         * 第二个任务的结果 第一个线程的结果为 100
         * 第2个任务结果结束
         */

    }

thenCombine / thenAcceptBoth / runAfterBoth

这几个方法都是将两个任务组合起来执行的,只有两个任务都顺利完成了,才会执行之后的方法,唯一的区别是:

  1. thenCombine 接收两个任务的返回值,并返回自己的返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task开始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task结束工作");
            return num;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task2开始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task2结束工作");
            return num;
        });

        //通过thenCombine将两个任务组合起来
        CompletableFuture<Integer> completableFuture = task1.thenCombine(task2, (result1, result2) -> {
            System.out.println("task1返回结果:" + result1 + "  task2返回结果:" + result2);
            return result1 + result2;
        });


        System.out.println(completableFuture.get());


    }

输出结果如下:

task开始工作
task2开始工作
task结束工作
task2结束工作
task1返回结果:30  task2返回结果:1
31
  1. thenAcceptBoth 接收两个参数返回值,但没有返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task开始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task结束工作");
            return num;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task2开始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task2结束工作");
            return num;
        });

        //通过 thenAcceptBoth 将两个任务组合起来,获取前两个任务处理结果,但自己不返回结果
        CompletableFuture<Void> completableFuture = task1.thenAcceptBoth(task2, (result1, result2) -> {
            System.out.println("task1返回结果:" + result1 + "  task2返回结果:" + result2);

        });


        completableFuture.get();


    }

输出结果:

task开始工作
task2开始工作
task结束工作
task2结束工作
task1返回结果:66  task2返回结果:10
  1. runAfterBoth 既不能接收入参,也无返回值,待前两个任务执行完成后才能执行。
 public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task开始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task结束工作");
            return num;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("task2开始工作");
            int num = RandomUtil.randomInt(0, 100);
            System.out.println("task2结束工作");
            return num;
        });

        //通过 runAfterBoth 将两个任务组合起来,待前两个组合任务完成后执行,无入参、无出参
        CompletableFuture<Void> completableFuture = task1.runAfterBoth(task2,()-> {
            System.out.println("task1、task2处理完成" );

        });


        completableFuture.get();


    }

输出结果:

task开始工作
task2开始工作
task结束工作
task2结束工作
task1、task2处理完成

applyToEither / acceptEither / runAfterEither

这种组合模式只要有一个异步任务成功,就会触发后续的方法,比如我们组合任务1和任务2,如果任务1执行完成就直接执行任务3,无视任务2。反之任务2先完成直接执行任务3,无视任务1。

在这里插入图片描述

和上一个组合模式一样,依次规律也是:

  1. 接收入参,含返回值。
 public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1);


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 2);

        CompletableFuture<String> completableFuture = task.applyToEither(task2, (result) -> {
            if (result == 1) {
                System.out.println("task1先完成任务");
                return "task1";
            }
            System.out.println("task2先完成任务");
            return "task2";
        });


        System.out.println("最先完成任务的是:" + completableFuture.get());


    }
  1. 接收入参,无返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> 1);


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 2);

        CompletableFuture<Void> completableFuture = task.acceptEither(task2, (result) -> {
            System.out.println("result:" + result);
            if (result == 1) {
                System.out.println("task1先完成任务");
                return;
            }
            System.out.println("task2先完成任务");
        });


        completableFuture.get();


    }
  1. 无入参,无返回值。
public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("task1开始工作");
            try {
                TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,2));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task1结束工作");
            return 1;
        });


        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync( () -> {
            System.out.println("task2 开始工作");
            try {
                TimeUnit.SECONDS.sleep(RandomUtil.randomInt(0,2));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2 结束工作");
            return 2;
        });

        CompletableFuture<Void> completableFuture = task.runAfterEither(task2, () -> {
            System.out.println("有一个任务完成了");
        });


        completableFuture.get();


    }

输出结果

task1开始工作
task2 开始工作
task1结束工作
有一个任务完成了

thenCompose

thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,例如我们希望任务1执行完成后执行任务2,任务2执行完成后返回执行任务3,最终结果是从任务3中获取。

在这里插入图片描述

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建异步执行任务:
        CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
            System.out.println("task1开始工作");
            int num=RandomUtil.randomInt(0,5);
            try {
                TimeUnit.SECONDS.sleep(num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task1结束工作,处理结果:"+num);
            return num;
        });


        CompletableFuture<String> task2= task1.thenCompose((r)->{

            System.out.println("task2 开始工作");
            int num=RandomUtil.randomInt(0,5);
            try {
                TimeUnit.SECONDS.sleep(num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task2 结束工作");


            return CompletableFuture.supplyAsync(()->{
                System.out.println("task3 开始工作,收到任务1的执行结果:"+r);
                return "task3 finished";
            });
        });

        System.out.println("执行结果->"+task2.get());


    }

输出结果:

task1开始工作
task1结束工作,处理结果:1
task2 开始工作
task2 结束工作
task3 开始工作,收到任务1的执行结果:1
执行结果->task3 finished

allOf / anyOf

allOf返回的CompletableFuture是所有任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常。

在这里插入图片描述

public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            // 模拟异步任务1
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            // 模拟异步任务2
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "World";
        });

        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

        allFutures.thenRun(() -> {
            // 所有异步任务完成后打印它们的结果
            String result1 = future1.join();
            String result2 = future2.join();
            System.out.println(result1 + " " + result2);
        });

        // 等待所有异步任务完成
        allFutures.join();
    }

输出结果:

Hello World

而anyOf则是只要有一个任务完成就可以触发后续方法,并且可以返回先完成任务的返回值,这一点和上述applyToEither 例子差不多。

在这里插入图片描述

public class Main {

    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            // 模拟异步任务1
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            // 模拟异步任务2
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "World";
        });

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);

        anyFuture.thenAccept(result -> {
            // 任何一个异步任务完成后打印它的结果
            System.out.println(result);
        });

        // 等待任何一个异步任务完成
        anyFuture.join();
    }
}

参考文献

Java8 CompletableFuture 用法全解:https://blog.csdn.net/qq_31865983/article/details/106137777

源码解析 Java 的 compareAndSwapObject 到底比较的是什么?:https://blog.csdn.net/qq_40697071/article/details/103374783

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/239010.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【HTML】基于jsQR实现的HTML单页面扫码功能

前言 最近做了一个扫码签到的功能涉及到获取浏览器摄像头并扫码识别的功能。 选择jsQR的原因&#xff1a; html5-qrcode&#xff1a;使用简单&#xff0c;识别率低&#xff0c;二维码小不可解析 zxing/library&#xff1a; 识别率优于html5-qrcode&#xff0c;部分安卓模糊…

Linux系统编程:高级IO总结

非阻塞IO基本概念 高级IO核心就一个概念&#xff1a;非阻塞IO。 与该概念相对的&#xff0c;就是我们之前学习过的阻塞IO。 非阻塞IO&#xff08;Non-blocking I/O&#xff09;是一种IO模型&#xff0c;用于实现异步IO操作&#xff0c;使应用程序能够在等待IO操作完成的同时…

Ubuntu部署EMQX开源版MQTT服务器-Orange Pi部署-服务器部署

一、前言 作为全球最具扩展性的 MQTT 消息服务器&#xff0c;EMQX 提供了高效可靠海量物联网设备连接&#xff0c;能够高性能实时移动与处理消息和事件流数据&#xff0c;本文将介绍如何在Ubuntu 22.04上部署MQTT服务器。我们本次选择开源版&#xff0c;使用离线安装方式部署。…

d2l绘图不显示的问题

之前试了各种方法都不行 在pycharm中还是不行&#xff0c;但是在anaconda中的命令行是可以的 anaconda prompt conda activaye py39 #进入f盘 F: #运行文件 python F:\python_code\softmax.py

Linux Ubuntu 手动搭建webDav

1、安装 因为需要跟 zotero 进行交互&#xff0c;因此需要在服务器搭建一个webDav 以下是搭建步骤&#xff1a; sudo apt-get update sudo apt-get install apache2 Ubuntu 安装apache2来实现 不同于Centos 安装好了之后&#xff0c;运行 a2enmod dav_fs a2enmod dav 激…

Linux shell编程学习笔记34:eval 命令

0 前言 在JavaScript语言中&#xff0c;有一个很特别的函数eval&#xff0c;eval函数可以将字符串当做 JavaScript 代码执行&#xff0c;返回表达式或值。 在Linux Shell 中也提供了内建命令eval&#xff0c;它是否具有JavaScript语言中eval函数的功能呢&#xff1f; 1 eval命…

【flink番外篇】3、fflink的source(内置、mysql、kafka、redis、clickhouse)介绍及示例(2)- 自定义、mysql

Flink 系列文章 一、Flink 专栏 Flink 专栏系统介绍某一知识点&#xff0c;并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分&#xff0c;比如术语、架构、编程模型、编程指南、基本的…

LeetCode 1631. 最小体力消耗路径:广度优先搜索BFS

【LetMeFly】1631.最小体力消耗路径&#xff1a;广度优先搜索BFS 力扣题目链接&#xff1a;https://leetcode.cn/problems/path-with-minimum-effort/ 你准备参加一场远足活动。给你一个二维 rows x columns 的地图 heights &#xff0c;其中 heights[row][col] 表示格子 (ro…

Leetcode—2961.双模幂运算【中等】

2023每日刷题&#xff08;五十六&#xff09; Leetcode—2961.双模幂运算 实现代码 class Solution { public:int func(int a, int b) {int ans 1;for(int i 0; i < b; i) {ans * a;ans % 10;}return ans;}int func2(int a, int b, int m) {int ans 1;for(int i 0; i …

使用Kali Linux端口扫描

端口扫描 【实训目的】 掌握端口扫描的基本概念和端口扫描的原理&#xff0c;掌握各种类型端口扫描的方法及其区别。 【场景描述】 在虚拟机环境下配置4个虚拟系统“Win XP1” “Win XP2” “Kali Linux”和“Metasploitable2”&#xff0c;使得4个系统之间能够相互通信。实…

深度学习(生成式模型)——ADM:Diffusion Models Beat GANs on Image Synthesis

文章目录 前言基础模型结构UNet结构Timestep Embedding关于为什么需要timestep embedding global attention layer 如何提升diffusion model生成图像的质量Classifier guidance实验结果 前言 在前几篇博文中&#xff0c;我们已经介绍了DDPM、DDIM、Classifier guidance等相关的…

EasyV易知微助力智慧城市未来趋势发展——数字孪生城市

“智慧城市的未来趋势就是数字孪生”——《基于数字孪生的智慧城市》 城市数字化管理、智慧城市和数字孪生城市的发展是相互促进、逐步深化的过程。 城市数字化管理作为起点&#xff0c;奠定了信息化、数据化的基础&#xff1b;而智慧城市则将数字城市管理进一步升级&#xff…

Could not resolve all dependencies for configuration ‘:app:androidApis‘.

android studio出现Could not resolve all dependencies for configuration ‘:app:androidApis’. 试过很多种方法&#xff0c;但是都不好使&#xff0c;不管怎么样都是提示如下报错&#xff1a; Using insecure protocols with repositories, without explicit opt-in, is un…

nginx配置正向代理支持https

操作系统版本&#xff1a; Alibaba Cloud Linux 3.2104 LTS 64位 nginx版本&#xff1a; nginx-1.25.3 1. 下载软件 切换目录 cd /server wget http://nginx.org/download/nginx-1.25.3.tar.gz 1.1解压 tar -zxvf nginx-1.25.3.tar.gz 1.2切换到源码所在目录…

Wireshark中的http协议包分析

Wireshark可以跟踪网络协议的通讯过程&#xff0c;本节通过http协议&#xff0c;在了解Wireshark使用的基础上&#xff0c;重温http协议的通讯过程。 TCP&#xff08;Transmission Control Protocol&#xff0c;传输控制协议&#xff09;是一种面向连接的、可靠的、基于 字节流…

lwIP 细节之三:errf 回调函数是何时调用的

使用 lwIP 协议栈进行 TCP 裸机编程&#xff0c;其本质就是编写协议栈指定的各种回调函数。将你的应用逻辑封装成函数&#xff0c;注册到协议栈&#xff0c;在适当的时候&#xff0c;由协议栈自动调用&#xff0c;所以称为回调。 注&#xff1a;除非特别说明&#xff0c;以下内…

评论送书:以企业架构为中心的SABOE数字化转型五环法

01 传统企业数字化转型面临诸多挑战 即将过去的2023年&#xff0c;chatGPT大模型、数据资产入表等事件的发生&#xff0c;标志着数字经济正在加速发展。数字经济是人类社会继农业经济、工业经济之后的第三种经济形态&#xff0c;将推动生产方式、生活方式和治理方式深刻变革&a…

Goby 漏洞发布| 亿赛通电子文档安全管理系统 LinkFilterService 接口权限绕过漏洞

漏洞名称&#xff1a;亿赛通电子文档安全管理系统 LinkFilterService 接口权限绕过漏洞 English Name&#xff1a;Esafenet Electronic Document Security Management System LinkFilterService API Permission Bypass Vulnerability CVSS core: 9.3 影响资产数&#xff1a;…

玩转大数据12:大数据安全与隐私保护策略

1. 引言 大数据的快速发展&#xff0c;为各行各业带来了巨大的变革&#xff0c;也带来了新的安全和隐私挑战。大数据系统通常处理大量敏感数据&#xff0c;包括个人身份信息、财务信息、健康信息等。如果这些数据被泄露或滥用&#xff0c;可能会对个人、企业和社会造成严重的损…

《opencv实用探索·十八》Camshift进行目标追踪流程

CamShift&#xff08;Continuously Adaptive Mean Shift&#xff09;是一种用于目标跟踪的方法&#xff0c;它是均值漂移&#xff08;Mean Shift&#xff09;的扩展&#xff0c;支持对目标的旋转跟踪&#xff0c;能够对目标的大小和形状进行自适应调整。 cv::CamShift和cv::me…