Future CompleteFuture

前言

Java8 中的 completeFuture 是对 Future 的扩展实现,主要是为了弥补 Future 没有相应的回调机制的缺陷。

Callable、Runnable、Future、CompletableFuture 之间的关系:

Callable,有结果的同步行为,比如做蛋糕,产生蛋糕。
Runnable,无结果的同步行为,比如喝牛奶,仅仅就是喝。
Future,异步封装 Callable / Runnable,比如委托给师傅(其他线程)去做蛋糕,我去喝牛奶。但是蛋糕做好不会通知我。
CompletableFuture,封装Future,使其拥有回调功能,委托师傅做蛋糕,我去喝牛奶,并且师傅蛋糕做好了还会通知我。

1、Callable、Future、FutureTask

直接继承 Thread 或者实现 Runnable 接口都可以创建线程,但是这两种方法都有一个问题就是:没有返回值,也就是不能获取执行完的结果。因此 java1.5 就提供了 Callable 接口来实现这一场景,而 Future 和 FutureTask 就可以和 Callable 接口配合起来使用。

1.1 Callable 和 Runnable 的区别

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Callable 的 call 方法可以有返回值,可以声明抛出异常。和 Callable 配合的有一个 Future 类,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是 Runnable 做不到的。

new Thread(new Runnable() {
    @Override
    public void run() {
        System.out.println("通过Runnable方式执行任务");
    }
}).start();

FutureTask task = new FutureTask(new Callable() {
    @Override
    public Object call() throws Exception {
        System.out.println("通过Callable方式执行任务");
        Thread.sleep(3000);
        return "返回任务结果";
    }
});
new Thread(task).start();

1.2、使用案例

在维护促销活动时需要查询商品信息(包括商品基本信息、商品价格、商品库存、商品图片、商品销售状态等)。

这些信息分布在不同的业务中心,由不同的系统提供服务。如果采用同步方式,假设一个接口需要50ms,那么一个商品查询下来就需要 200ms-300ms,这对于我们来说是不满意的。如果使用 Future 改造则需要的就是最长耗时服务的接口,也就是50ms左右。
在这里插入图片描述
配合线程池多线程执行

package com.example.canal.CompleteFuture;

import java.util.concurrent.*;

public class Yang {

    private static ExecutorService executorService = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> ft1 = new FutureTask<>(new T1Task());
        FutureTask<String> ft2 = new FutureTask<>(new T2Task());
        FutureTask<String> ft3 = new FutureTask<>(new T3Task());
        FutureTask<String> ft4 = new FutureTask<>(new T4Task());
        FutureTask<String> ft5 = new FutureTask<>(new T5Task());

        executorService.submit(ft1);
        executorService.submit(ft2);
        executorService.submit(ft3);
        executorService.submit(ft4);
        executorService.submit(ft5);

        // 获取执行结果
        System.out.println(ft1.get());
        System.out.println(ft2.get());
        System.out.println(ft3.get());
        System.out.println(ft4.get());
        System.out.println(ft5.get());

        executorService.shutdown();

    }

    static class T1Task implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("T1: 查询商品基本信息...");
            TimeUnit.MILLISECONDS.sleep(50);
            return "商品基本信息查询成功";
        }
    }

    static class T2Task implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("T2: 查询商品价格...");
            TimeUnit.MILLISECONDS.sleep(50);
            return "商品价格查询成功";
        }
    }

    static class T3Task implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("T3: 查询商品库存...");
            TimeUnit.MILLISECONDS.sleep(50);
            return "商品库存查询成功";
        }
    }

    static class T4Task implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("T4: 查询商品图片...");
            TimeUnit.MILLISECONDS.sleep(50);
            return "商品图片查询成功";
        }
    }

    static class T5Task implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("T5: 查询商品销售状态...");
            TimeUnit.MILLISECONDS.sleep(50);
            return "商品销售状态查询成功";
        }
    }
}

1.3、Future的局限性

从本质上说,Future 表示一个异步计算的结果。它提供了 isDone() 来检测计算是否已经完成,并且在计算结束后,可以通过 get() 方法来获取计算结果。在异步计算中,Future 确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

并发执行多任务:Future 只提供了 get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法。
无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但 Future 却没有提供这样的能力。
无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在 Future 中这是无能为力的。
没有异常处理:Future 接口中没有关于异常处理的方法。
所以,我们还需要CompletionService来帮助我们完成这些需求。

1.4、Future 注意事项

当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制。
Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来。
2、CompletionService
2.1、CompletionService 原理
Callable + Future 可以实现多个task并行执行,但是如果遇到前面的 task 执行较慢时需要阻塞等待前面的 task 执行完后面 task 才能取得结果。而 CompletionService 的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。

2.2、使用案例

应用:向不同电商平台询价,并保存价格。

// 创建线程池 
ExecutorService executor = Executors.newFixedThreadPool(3); 
// 异步向电商S1询价 
Future<Integer> f1 = executor.submit(() -> getPriceByS1()); 
// 异步向电商S2询价 
Future<Integer> f2 = executor.submit(() -> getPriceByS2());             
// 获取电商S1报价并异步保存 
executor.execute(() -> save(f1.get()));        
// 获取电商S2报价并异步保存 
executor.execute(() -> save(f2.get()) 

如果获取电商S1报价的耗时很长,那么即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞 在了 f1 的 get() 操作上。

使用 CompletionService 实现先获取的报价先保存到数据库

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(() -> getPriceByS1());
// 异步向电商S2询价
cs.submit(() -> getPriceByS2());
//异步向电商S3询价
cs.submit(() -> getPriceByS3());
// 将询价结果异步保存到数据库
for (int i = 0; i < 3; i++) {
    Integer r = cs.take().get();
    executor.execute(() -> save(r));
}

2.3、应用场景总结

当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。
CompletionService 能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。
线程池隔离。CompletionService 支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

3、CompletableFuture

CompletableFuture 是 Future 接口的扩展和增强。CompletableFuture 实现了 Future 接口,并在此基础上进行了丰富地扩展,完美地弥补了 Future 上述的种种问题。

更为重要的是,CompletableFuture 实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过 CountDownLatch 等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
在这里插入图片描述
CompletionStage: 执行某一阶段,可向下执行后续阶段。异步执行,默认线程池是 ForkJoinPool 。

3.1、创建异步操作

CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

这四个方法区别在于:

  • 没有指定 Executor 的方法会使用 ForkJoinPool 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
  • runAsync 方法以 Runnable函数式接口类型为参数,没有返回结果,supplyAsync 方法以 Supplier函数式接口类型为参数,返回结果类型为U,Supplier 接口的 get() 方法会有返回值的但是会阻塞线程。

使用默认的线程池就会出现一个问题,在主线程任务执行完以后,如果异步线程执行任务还没执行完就会直接把线程清除掉,因为默认线程池中的都是守护线程
forkjoinpool,当没有用户线程以后,会随着 jvm 一起清除,可以在主线程中阻塞几秒来解决,但是这样的编码显得格外的不优雅!

3.1.1、runAsync 无返回值

package com.example.canal.CompleteFuture;

import java.util.concurrent.*;

public class Yang {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {

        CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                System.out.println("[" + Thread.currentThread().getName() + "]" + "   ");
            }
        });

        countDownLatch.await();
    }
}

// 结果
// [ForkJoinPool.commonPool-worker-25]执行无返回结果的异步任务


3.1.2、supplyAsync 有返回值

package com.example.canal.CompleteFuture;

import java.util.concurrent.*;

public class Yang {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("[" + Thread.currentThread().getName() + "]" + "执行有返回值的异步任务");
            return "Hello World";
        });
        System.out.println(future.get());
    }
}

// 结果
// [ForkJoinPool.commonPool-worker-25]执行有返回值的异步任务
// Hello World


3.2、获取结果

public T get() throws InterruptedException, ExecutionException

public T join()
  • join()和 get()方法都是用来获取 CompletableFuture 异步之后的返回值。
  • 两者的主要区别在于,join()方法产生的是 RuntimeException,get()方法抛出的是受检异常(ExecutionException, InterruptedException )需要用户手动处理。

3.2.1、get & join 返回阻塞等待线程结束

package com.example.canal.CompleteFuture;

import java.util.concurrent.*;

public class Yang {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("[" + Thread.currentThread().getName() + "]" + "执行有返回值的异步任务");
            return "Hello World";
        });
        System.out.println(future.get());
        System.out.println(future.join());
    }
}

// 结果
// [ForkJoinPool.commonPool-worker-25]执行有返回值的异步任务
// Hello World
// Hello World


3.3、结果处理

当 CompletableFuture 的计算结果完成或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) 


  • Action 的类型是 BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
  • 方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
  • 这几个方法都会返回 CompletableFuture,当 Action 执行完毕后它的结果返回原始的 CompletableFuture 的计算结果或者返回异常。

3.3.1、whenComplete 无返回值

package com.example.canal.CompleteFuture;

import java.util.concurrent.*;
import java.util.function.BiConsumer;

public class Yang {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行结束!");
            return "test";
        }).whenComplete(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String s, Throwable throwable) {
                System.out.println("执行完成!");
            }
        });
        System.out.println(future.get());
    }
}

// 执行结束!
// 执行完成!
// test


3.3.2、exceptionally 有返回值

package com.example.canal.CompleteFuture;

import java.util.concurrent.*;
import java.util.function.Function;

public class Yang {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行结束!");
            int i = 12 / 0;
            return "test";
        }).exceptionally(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) {
                System.out.println(throwable);
                return "异常";
            }
        });
        countDownLatch.await();
    }
}

// 执行结束!
// java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

3.4、结果转换

所谓结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);

  • thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture。
  • thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个 CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生成一个新的 CompletableFuture。

3.4.1、thenApply 有返回值

thenApply 接收一个函数作为参数,使用该函数处理上一个 CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。

package com.example.canal.CompleteFuture;

import java.util.concurrent.*;
import java.util.function.Function;

public class Yang {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行结束!");
            return "test";
        }).thenApply(new Function<String, String>() {
            @Override
            public String apply(String s) {
                System.out.println("执行完成!");
                return s + " yang";
            }
        });
        System.out.println(future.get());
    }
}

// 执行结束!
// 执行完成!
// test yang

3.4.2、thenCompose 有返回值

thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。

package com.example.canal.CompleteFuture;

import java.util.concurrent.*;
import java.util.function.Function;

public class Yang {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行结束!");
            return "test";
        }).thenCompose(new Function<String, CompletionStage<String>>() {
            @Override
            public CompletionStage<String> apply(String s) {
                System.out.println("执行完成!");
                return CompletableFuture.supplyAsync(() -> s + " yang");
            }
        });
        System.out.println(future.get());
    }
}

// 执行结束!
// 执行完成!
// test yang

3.5、结果消费

与结果处理和结果转换系列函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行Action,而不返回新的计算值。

根据对结果的处理方式,结果消费函数又分为:

  • thenAccept系列:对单个结果进行消费
  • thenAcceptBoth系列:对两个结果进行消费
  • thenRun系列:不关心结果,执行额外逻辑
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);

3.5.1、thenAccept 无返回值

thenAccept 通过观察该系列函数的参数类型可知,它们是函数式接口 Consumer,这个接口只有输入,没有返回值。

package com.example.canal.CompleteFuture;

import java.util.Random;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class Yang {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(() -> {
            int number1 = new Random().nextInt(3) + 1;
            System.out.println("第一阶段:" + number1);
            return number1;
        });

        futrue1.thenAccept(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                System.out.println("最终结果:" + (integer));
            }
        });

        countDownLatch.await();
    }
}

// 第一阶段:1
// 最终结果:1

3.5.2、thenAcceptBoth 无返回值

thenAcceptBoth 函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的action消费两个异步的结果。

package com.example.canal.CompleteFuture;

import java.util.Random;
import java.util.concurrent.*;
import java.util.function.BiConsumer;

public class Yang {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(() -> {
            int number1 = new Random().nextInt(3) + 1;
            System.out.println("第一阶段:" + number1);
            return number1;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int number2 = new Random().nextInt(3) + 1;
            System.out.println("第二阶段:" + number2);
            return number2;
        });

        futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {
            @Override
            public void accept(Integer integer, Integer integer2) {
                System.out.println("最终结果:" + (integer + integer2));
            }
        });

        countDownLatch.await();
    }
}

// 结果
// 第一阶段:1
// 第二阶段:3
// 最终结果:4

3.5.3、thenRun 无返回值

thenRun 也是对线程任务结果的一种消费函数,与 thenAccept 不同的是,thenRun 会在上一阶段 CompletableFuture 计算完成的时候执行一个Runnable,Runnable并不使用该 CompletableFuture 计算的结果。

package com.example.canal.CompleteFuture;

import java.util.Random;
import java.util.concurrent.*;

public class Yang {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            int number = new Random().nextInt(10);
            System.out.println("第一阶段:" + number);
            return number;
        }).thenRun(new Runnable() {
            @Override
            public void run() {
                System.out.println("thenRun()执行...");
            }
        });

        countDownLatch.await();
    }
}

// 结果
// 第一阶段:5
// thenRun()执行...

3.6、结果组合

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);

3.6.1、thenCombine 有返回值

thenCombine 方法,合并两个线程任务的结果,并进一步处理。

package com.example.canal.CompleteFuture;

import java.util.Random;
import java.util.concurrent.*;

public class Yang {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int number1 = new Random().nextInt(10);
            System.out.println("第一阶段:" + number1);
            return number1;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int number2 = new Random().nextInt(10);
            System.out.println("第二阶段:" + number2);
            return number2;
        });

        CompletableFuture<Integer> future3 = future1.thenCombine(future2, (number1, number2) -> number1 + number2);
        System.out.println("最终结果:" + future3.get());

        countDownLatch.await();
    }
}

// 结果
// 第一阶段:1
// 第二阶段:5
// 最终结果:6

3.7、任务交互

3.7.1、applyToEither 有返回值

applyToEither 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。

package com.example.canal.CompleteFuture;

import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Function;

public class Yang {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int number = new Random().nextInt(10);
            System.out.println("第一阶段 start:" + number);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一阶段 end:" + number);
            return number;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int number = new Random().nextInt(10);
            System.out.println("第二阶段 start:" + number);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二阶段 end:" + number);
            return number;
        });

        future1.applyToEither(future2, new Function<Integer, Object>() {
            @Override
            public Object apply(Integer integer) {
                System.out.println("最快结果:" + integer);
                return integer * 2;
            }
        });

        countDownLatch.await();
    }
}

// 结果
// 第一阶段 start:3
// 第二阶段 start:1
// 第二阶段 end:1
// 最快结果:1
// 第一阶段 end:3

3.7.2、acceptEither 无返回值

acceptEither 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。

package com.example.canal.CompleteFuture;

import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;

public class Yang {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int number = new Random().nextInt(10);
            System.out.println("第一阶段 start:" + number);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一阶段 end:" + number);
            return number;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int number = new Random().nextInt(10);
            System.out.println("第二阶段 start:" + number);
            try {
                TimeUnit.SECONDS.sleep(number);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二阶段 end:" + number);
            return number;
        });

        future1.acceptEither(future2, new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                System.out.println("最快结果:" + integer);
            }
        });

        countDownLatch.await();
    }
}

// 结果
// 第一阶段 start:3
// 第二阶段 start:1
// 第二阶段 end:1
// 最快结果:1
// 第一阶段 end:3

3.7.3、runAfterEither 无返回值

runAfterEither 两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。

package com.example.canal.CompleteFuture;

import java.util.concurrent.*;

public class Yang {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一阶段:1");
            return 1;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二阶段:2");
            return 2;
        });

        future1.runAfterEither(future2, new Runnable() {
            @Override
            public void run() {
                System.out.println("已经有一个任务完成了");
            }
        });

        countDownLatch.await();
    }
}

// 第一阶段:1
// 已经有一个任务完成了
// 第二阶段:2

3.7.4、runAfterBoth 无返回值

runAfterBoth 两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。

package com.example.canal.CompleteFuture;

import java.util.concurrent.*;

public class Yang {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一阶段:1");
            return 1;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二阶段:2");
            return 2;
        });

        future1.runAfterBoth(future2, new Runnable() {
            @Override
            public void run() {
                System.out.println("上面两个任务都执行完成了");
            }
        });

        countDownLatch.await();
    }
}

// 第一阶段:1
// 第二阶段:2
// 上面两个任务都执行完成了

3.7.5、anyOf 有返回值

anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回 CompletableFuture。

package com.example.canal.CompleteFuture;

import java.util.Random;
import java.util.concurrent.*;

public class Yang {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Random random = new Random();
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(5));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(1));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "world";
        });
        CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
        System.out.println(result.get());

        countDownLatch.await();
    }
}

// 结果
// world

3.7.6、allOf 无返回值

allOf方法用来实现多 CompletableFuture 的同时返回。

package com.example.canal.CompleteFuture;

import java.util.Random;
import java.util.concurrent.*;

public class Yang {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("future1 完成!");
            return "future1 完成!";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future2 完成!");
            return "future2 完成!";
        });

        CompletableFuture<Void> combindFuture = CompletableFuture.allOf(future1, future2);
        try {
            combindFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        countDownLatch.await();
    }
}

// future2 完成!
// future1 完成!

4、CompletableFuture 常用方法总结

分类方法说明返回值
异步执行一个线程runAsync默认ForkJoinPool 线程池无返回值
异步执行一个线程supplyAsync默认 ForkJoinPool 线程池有返回值
分类方法说明返回值
两个线程依次执行thenApply获取前一个线程的结果,转换结果,会返还给调用端有返回值
两个线程依次执行thenAccept获取前一个线程的结果,消费结果,不会返还给调用端无返回值
两个线程依次执行thenRun 忽略前一个线程的结果,执行额外的逻辑无返回值
两个线程依次执行whenComplete获取前一个线程的结果或异常,消费不影响上一线程返回值
两个线程依次执行exceptionally线程异常执行,配合whenComplete 使用有返回值
两个线程依次执行handle相当于whenComplete + exceptionally有返回值
分类方法说明返回值
-等待2个线程都执行完- thenCombine- 2个线程都要有返回值,等待都结束,结果合并转换- 有返回值
-等待2个线程执都行完- thenAcceptBoth-2个线程都要有返回值,等待都结束,结果合并消费- 无返回值
-等待2个线程执都行完- runAfterBoth-2个线程无需要有返回值,等待都结束,执行其他逻辑- 无返回值
分类方法说明返回值
等待2个线程任一执行完applyToEither2个线程都要有返回值,等待任一结束,转换其结果有返回值
等待2个线程任一执行完acceptEither2个线程都要有返回值,等待任一结束,消费其结果无返回值
等待2个线程任一执行完runAfterEither2个线程无需有返回值,等待任一结束,执行其他逻辑无返回值
分类方法说明返回值
多个线程等待anyOf多个线程任一执行完返回有返回值
多个线程等待allOf多个线程全部执行完返回无返回值

5、Java8 CompleteFuture 简单使用

我们先看看 Java8 之前的 Future 的使用:

package com.example.canal.CompleteFuture;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class Yang {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService cachePool = Executors.newCachedThreadPool();
        Future<String> future = cachePool.submit(() -> {
            Thread.sleep(3000);
            return "异步任务计算结果!";
        });

        // 提交完异步任务后,主线程可以继续干一些其他的事情
        doSomeThingElse();

        // 为了获取异步计算结果,我们可以通过 future.get 和 轮询机制来获取
        String result;

        // Get 方式会导致当前线程阻塞, 这显然违背了异步计算的初衷
        // result = future.get();

        // 轮询方式虽然不会导致当前线程阻塞, 但是会导致高额的 CPU 负载
        long start = System.currentTimeMillis();
        while (true) {
            if (future.isDone()) {
                break;
            }
        }
        System.out.println("轮询耗时:" + (System.currentTimeMillis() - start));

        result = future.get();
        System.out.println("获取到异步计算结果啦: " + result);

        cachePool.shutdown();
    }

    private static void doSomeThingElse() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("主线程正在做事.....");
    }
}

输出:

主线程正在做事… 轮询耗时:1998 获取到异步计算结果啦: 异步任务计算结果!

从上面的 Demo 中我们可以看出,future 在执行异步任务时,,对于结果的获取显的不那么优雅,很多第三方库就针对 Future 提供了回调式的接口以用来获取异步计算结果,如Google的: ListenableFuture,而 Java8 所提供的 CompleteFuture 便是官方为了弥补这方面的不足而提供的 API。

下面简单介绍用法:

package com.example.canal.CompleteFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Yang {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<String> completableFutureOne = new CompletableFuture<>();

        ExecutorService cachePool = Executors.newCachedThreadPool();
        cachePool.execute(() -> {
            try {
                Thread.sleep(3000);
                completableFutureOne.complete("异步任务执行结果");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // WhenComplete 方法返回的 CompletableFuture 仍然是原来的 CompletableFuture 计算结果
        CompletableFuture<String> completableFutureTwo = completableFutureOne.whenComplete((s, throwable) -> {
            System.out.println("当异步任务执行完毕时打印异步任务的执行结果: " + s);
        });

        // ThenApply 方法返回的是一个新的 completeFuture
        CompletableFuture<Integer> completableFutureThree = completableFutureTwo.thenApply(s -> {
            System.out.println("当异步任务执行结束时, 根据上一次的异步任务结果, 继续开始一个新的异步任务!");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s.length();
        });

        System.out.println("阻塞方式获取执行结果:" + completableFutureThree.get());

        cachePool.shutdown();
    }
}

从上面的 Demo 中我们主要需要注意 thenApply 和 whenComplete 这两个方法,这两个方法便是 CompleteFuture 中最具有意义的方法,,他们都会在 completeFuture 调用,complete 方法传入异步计算结果时回调,从而获取到异步任务的结果。

相比之下 future 的阻塞和轮询方式获取异步任务的计算结果,CompleteFuture 获取结果的方式就显的优雅的多。

6、实现最优的 烧水泡茶 程序

著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:

在这里插入图片描述

对于烧水泡茶这个程序,一种最优的分工方案:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶,烧开水,泡茶,T2 负责洗茶壶,洗茶杯,拿茶叶。

6.1、基于Future实现

package com.example.canal.CompleteFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class Yang {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创建任务T2的FutureTask
        FutureTask<String> ft2 = new FutureTask<>(new T2Task());
        // 创建任务T1的FutureTask
        FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));

        // 线程T1执行任务ft1
        Thread T1 = new Thread(ft1);
        T1.start();
        // 线程T2执行任务ft2
        Thread T2 = new Thread(ft2);
        T2.start();
        // 等待线程T1执行结果
        System.out.println(ft1.get());

    }
}

// T1 需要执行的任务:洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {
    FutureTask<String> ft2;

    // T1任务需要T2任务的FutureTask
    T1Task(FutureTask<String> ft2) {
        this.ft2 = ft2;
    }

    @Override
    public String call() throws Exception {
        System.out.println("T1:洗水壶...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("T1:烧开水...");
        TimeUnit.SECONDS.sleep(15);

        // 获取T2线程的茶叶
        String tf = ft2.get();
        System.out.println("T1:拿到茶叶:" + tf);

        System.out.println("T1:泡茶...");
        return "上茶:" + tf;
    }
}

// T2 需要执行的任务:洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("T2:洗茶壶...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("T2:洗茶杯...");
        TimeUnit.SECONDS.sleep(2);

        System.out.println("T2:拿茶叶...");
        TimeUnit.SECONDS.sleep(1);
        return "绿茶";
    }
}

// T1:洗水壶...
// T2:洗茶壶...
// T1:烧开水...
// T2:洗茶杯...
// T2:拿茶叶...
// T1:拿到茶叶:绿茶
// T1:泡茶...
// 上茶:绿茶

6.2、基于CompletableFuture实现

package com.example.canal.CompleteFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Yang {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 任务1:洗水壶、烧开水
        CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
            System.out.println("T1:洗水壶...");
            sleep(1, TimeUnit.SECONDS);

            System.out.println("T1:烧开水...");
            sleep(15, TimeUnit.SECONDS);
        });

        // 任务2:洗茶壶、洗茶杯、拿茶叶
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("T2:洗茶壶...");
            sleep(1, TimeUnit.SECONDS);

            System.out.println("T2:洗茶杯...");
            sleep(2, TimeUnit.SECONDS);

            System.out.println("T2:拿茶叶...");
            sleep(1, TimeUnit.SECONDS);
            return "龙井";
        });

        // 任务3:任务1和任务2完成后执行:泡茶
        CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {
            System.out.println("T1:拿到茶叶:" + tf);
            System.out.println("T1:泡茶...");
            return "上茶:" + tf;
        });
        // 等待任务3执行结果
        System.out.println(f3.get());
    }

    static void sleep(int t, TimeUnit u) {
        try {
            u.sleep(t);
        } catch (InterruptedException e) {
        }
    }
}

// T1:洗水壶...
// T2:洗茶壶...
// T2:洗茶杯...
// T1:烧开水...
// T2:拿茶叶...
// T1:拿到茶叶:龙井
// T1:泡茶...
// 上茶:龙井

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/254540.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python程序打包成exe全流程纪实(windows)

目录 前言准备工作安装python&#xff08;必须&#xff09;安装vs平台或conda&#xff08;非必须&#xff09; 详细步骤Step1.创建python虚拟环境方法一、裸装(windows下)方法二、借助工具(windows下) Step2.安装打包必须的python包Step3.准备好程序logo&#xff08;非必须&…

51单片机定时器

51单片机有两个16位定时器&#xff0c;今天复习了一下使用方法&#xff0c;发现当初刚开始学习51单片机时并没有记录&#xff0c;特此今天补上这篇博客。 下面是定时器的总览示意图&#xff0c;看到这个图就能想到定时器怎么设置&#xff0c;怎么开始工作。 第一步&#xff1a…

刷完这个笔记,18K不能再少了....

大家好&#xff0c;最近有不少小伙伴在后台留言&#xff0c;得准备年后面试了&#xff0c;又不知道从何下手&#xff01;为了帮大家节约时间&#xff0c;特意准备了一份面试相关的资料&#xff0c;内容非常的全面&#xff0c;真的可以好好补一补&#xff0c;希望大家在都能拿到…

EmbedAI:一个可以上传文件训练自己ChatGPT的AI工具,妈妈再也不用担心我的GPT不会回答问题

功能介绍&#xff1a; 个性化定制&#xff1a;提供灵活的训练选项&#xff0c;用户能够通过文件、网站、Notion文档甚至YouTube等多种数据源对ChatGPT进行训练&#xff0c;以满足不同领域和需求的个性化定制。广泛应用场景&#xff1a;ChatGPT支持多种用例&#xff0c;包括智能…

Jmeter吞吐量控制器使用小结

吞吐量控制器(Throughput Controller)场景: 在同一个线程组里, 有10个并发, 7个做A业务, 3个做B业务,要模拟这种场景,可以通过吞吐量模拟器来实现.。 添加吞吐量控制器 用法1: Percent Executions 在一个线程组内分别建立两个吞吐量控制器, 分别放业务A和业务B 吞吐量控制器采…

【算法系列篇】递归、搜索和回溯(三)

文章目录 前言什么是决策树1. 全排列1.1 题目要求1.2 做题思路1.3 代码实现 2. 子集2.1 题目要求2.2 做题思路2.3 代码实现 3. 找出所有子集的异或总和再求和3.1 题目要求3.2 做题思路3.3 代码实现 4. 全排列II4.1 题目要求4.2 做题思路4.3 代码实现 前言 前面我们通过几个题目…

蚂蚁集团5大开源项目获开放原子 “2023快速成长开源项目”

12月16日&#xff0c;在开放原子开源基金会主办的“2023开放原子开发者大会”上&#xff0c;蚂蚁集团主导开源的图数据库TuGraph、时序数据库CeresDB、隐私计算框架隐语SecretFlow、前端框架OpenSumi、数据域大模型开源框架DB-GPT入选“2023快速成长开源项目”。 &#xff08;图…

Kafka中Ack应答级别和数据去重

在Kafka中&#xff0c;保证数据安全可靠的条件是&#xff1a; 数据完全可靠条件 ACK级别设置为-1 分区副本大于等于2 ISR里应答的最小副本数量大于等于2&#xff1b; Ack应答级别 可靠性总结&#xff1a; acks0&#xff0c;生产者发送过来数据就不管了&#xff0c;可靠性差…

2023年国赛高教杯数学建模D题圈养湖羊的空间利用率解题全过程文档及程序

2023年国赛高教杯数学建模 D题 圈养湖羊的空间利用率 原题再现 规模化的圈养养殖场通常根据牲畜的性别和生长阶段分群饲养&#xff0c;适应不同种类、不同阶段的牲畜对空间的不同要求&#xff0c;以保障牲畜安全和健康&#xff1b;与此同时&#xff0c;也要尽量减少空间闲置所…

人工智能深度学习:探索智能的深邃奥秘

导言 人工智能深度学习作为当今科技领域的明星&#xff0c;正引领着智能时代的浪潮。深度学习和机器学习作为人工智能领域的两大支柱&#xff0c;它们之间的关系既有协同合作&#xff0c;又存在着显著的区别。本文将深入研究深度学习在人工智能领域的角色&#xff0c;以及其在各…

Android Termux安装MySQL数据库并通过内网穿透实现公网远程访问

文章目录 前言1.安装MariaDB2.安装cpolar内网穿透工具3. 创建安全隧道映射mysql4. 公网远程连接5. 固定远程连接地址 前言 Android作为移动设备&#xff0c;尽管最初并非设计为服务器&#xff0c;但是随着技术的进步我们可以将Android配置为生产力工具&#xff0c;变成一个随身…

鸿蒙端H5容器化建设——JSB通信机制建设

1. 背景 2023年鸿蒙开发者大会上&#xff0c;华为宣布为了应对国外技术封锁的潜在风险&#xff0c;2024年的HarmonyOS NEXT版本中将不再兼容Android&#xff0c;并推出鸿蒙系统以及其自研的开发框架&#xff0c;形成开发生态闭环。同时&#xff0c;在更高维度上华为希望将鸿蒙…

GPT-4V被超越?SEED-Bench多模态大模型测评基准更新

&#x1f4d6; 技术报告 SEED-Bench-1&#xff1a;https://arxiv.org/abs/2307.16125 SEED-Bench-2&#xff1a;https://arxiv.org/abs/2311.17092 &#x1f917; 测评数据 SEED-Bench-1&#xff1a;https://huggingface.co/datasets/AILab-CVC/SEED-Bench SEED-Bench-2&…

基于主动安全的AIGC数据安全建设

面对AIGC带来的数据安全新问题&#xff0c;是不是就应该一刀切禁止AIGC的研究利用呢&#xff1f;答案是否定的。要发展AIGC&#xff0c;也要主动积极地对AIGC的数据安全进行建设。让AIGC更加安全、可靠的为用户服务。为达到此目的&#xff0c;应该从三个方面来开展AIGC的数据安…

C++中的并发多线程网络通讯

C中的并发多线程网络通讯 一、引言 C作为一种高效且功能强大的编程语言&#xff0c;为开发者提供了多种工具来处理多线程和网络通信。多线程编程允许多个任务同时执行&#xff0c;而网络通信则是现代应用程序的基石。本文将深入探讨如何使用C实现并发多线程网络通信&#xff…

【Netty】Netty核心概念

目录 NIO编程NIO介绍NIO和BIO的比较缓冲区(Buffer)基本介绍常用API缓冲区对象创建添加数据读取数据 通道(Channel)基本介绍Channel常用类ServerSocketChannelSocketChannel Selector (选择器)基本介绍常用API介绍示例代码 NIO 三大核心原理 Netty核心概念Netty 介绍原生 NIO 存…

verilog基础语法-计数器

概述&#xff1a; 计数器是FPGA开发中最常用的电路&#xff0c;列如通讯中记录时钟个数&#xff0c;跑马灯中时间记录&#xff0c;存储器中地址的控制等等。本节给出向上计数器&#xff0c;上下计数器以及双向计数器案例。 内容 1. 向上计数器 2.向下计数器 3.向上向下计数…

Minio文件服务器(上传文件)

官网&#xff1a;https://www.minio.org.cn/ 开源的分布式对象存储服务器 Window安装 用户名和密码相同 创建bucket&#xff0c;并且将策略改成public 一、添加依赖 二、代码 public class FileUploadTest{public static void main(String[] args) throws Exception{//…

RHEL8_Linux_Ansible常用模块的使用

本章主要介绍Ansible中最常见模块的使用 shell模块文件管理模块软件包管理模块服务管理模块磁盘管理模块用户管理模块防火墙管理模块 ansible的基本用法如下。 ansible 机器名 -m 模块x -a "模块的参数" 对被管理机器执行不同的操作&#xff0c;只需要调用不同的模块…

做计算,找天玑算!

天玑算科研服务_DFT计算_MD模拟_FEA_ML_相图计算200余位计算工程师均来自己TOP高校及科研院所&#xff0c;涉及第一性原理&#xff0c;分子动力学&#xff0c;有限元&#xff0c;机器学习&#xff0c;可为催化、电池、能源、化工、生物等重多领域提供技术支持&#xff0c;计算软…