前言
Java8 中的 completeFuture 是对 Future 的扩展实现,主要是为了弥补 Future 没有相应的回调机制的缺陷。
Callable、Runnable、Future、CompletableFuture 之间的关系:
Callable,有结果的同步行为,比如做蛋糕,产生蛋糕。
Runnable,无结果的同步行为,比如喝牛奶,仅仅就是喝。
Future,异步封装 Callable / Runnable,比如委托给师傅(其他线程)去做蛋糕,我去喝牛奶。但是蛋糕做好不会通知我。
CompletableFuture,封装Future,使其拥有回调功能,委托师傅做蛋糕,我去喝牛奶,并且师傅蛋糕做好了还会通知我。
1、Callable、Future、FutureTask
直接继承 Thread 或者实现 Runnable 接口都可以创建线程,但是这两种方法都有一个问题就是:没有返回值,也就是不能获取执行完的结果。因此 java1.5 就提供了 Callable 接口来实现这一场景,而 Future 和 FutureTask 就可以和 Callable 接口配合起来使用。
1.1 Callable 和 Runnable 的区别
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callable 的 call 方法可以有返回值,可以声明抛出异常。和 Callable 配合的有一个 Future 类,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是 Runnable 做不到的。
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("通过Runnable方式执行任务");
}
}).start();
FutureTask task = new FutureTask(new Callable() {
@Override
public Object call() throws Exception {
System.out.println("通过Callable方式执行任务");
Thread.sleep(3000);
return "返回任务结果";
}
});
new Thread(task).start();
1.2、使用案例
在维护促销活动时需要查询商品信息(包括商品基本信息、商品价格、商品库存、商品图片、商品销售状态等)。
这些信息分布在不同的业务中心,由不同的系统提供服务。如果采用同步方式,假设一个接口需要50ms,那么一个商品查询下来就需要 200ms-300ms,这对于我们来说是不满意的。如果使用 Future 改造则需要的就是最长耗时服务的接口,也就是50ms左右。
配合线程池多线程执行
package com.example.canal.CompleteFuture;
import java.util.concurrent.*;
public class Yang {
private static ExecutorService executorService = Executors.newFixedThreadPool(5);
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> ft1 = new FutureTask<>(new T1Task());
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
FutureTask<String> ft3 = new FutureTask<>(new T3Task());
FutureTask<String> ft4 = new FutureTask<>(new T4Task());
FutureTask<String> ft5 = new FutureTask<>(new T5Task());
executorService.submit(ft1);
executorService.submit(ft2);
executorService.submit(ft3);
executorService.submit(ft4);
executorService.submit(ft5);
// 获取执行结果
System.out.println(ft1.get());
System.out.println(ft2.get());
System.out.println(ft3.get());
System.out.println(ft4.get());
System.out.println(ft5.get());
executorService.shutdown();
}
static class T1Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T1: 查询商品基本信息...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品基本信息查询成功";
}
}
static class T2Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T2: 查询商品价格...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品价格查询成功";
}
}
static class T3Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T3: 查询商品库存...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品库存查询成功";
}
}
static class T4Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T4: 查询商品图片...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品图片查询成功";
}
}
static class T5Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T5: 查询商品销售状态...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品销售状态查询成功";
}
}
}
1.3、Future的局限性
从本质上说,Future 表示一个异步计算的结果。它提供了 isDone() 来检测计算是否已经完成,并且在计算结束后,可以通过 get() 方法来获取计算结果。在异步计算中,Future 确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:
并发执行多任务:Future 只提供了 get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法。
无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但 Future 却没有提供这样的能力。
无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在 Future 中这是无能为力的。
没有异常处理:Future 接口中没有关于异常处理的方法。
所以,我们还需要CompletionService来帮助我们完成这些需求。
1.4、Future 注意事项
当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制。
Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来。
2、CompletionService
2.1、CompletionService 原理
Callable + Future 可以实现多个task并行执行,但是如果遇到前面的 task 执行较慢时需要阻塞等待前面的 task 执行完后面 task 才能取得结果。而 CompletionService 的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。
2.2、使用案例
应用:向不同电商平台询价,并保存价格。
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 异步向电商S1询价
Future<Integer> f1 = executor.submit(() -> getPriceByS1());
// 异步向电商S2询价
Future<Integer> f2 = executor.submit(() -> getPriceByS2());
// 获取电商S1报价并异步保存
executor.execute(() -> save(f1.get()));
// 获取电商S2报价并异步保存
executor.execute(() -> save(f2.get())
如果获取电商S1报价的耗时很长,那么即便获取电商S2报价的耗时很短,也无法让保存S2报价的操作先执行,因为这个主线程都阻塞 在了 f1 的 get() 操作上。
使用 CompletionService 实现先获取的报价先保存到数据库
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(() -> getPriceByS1());
// 异步向电商S2询价
cs.submit(() -> getPriceByS2());
//异步向电商S3询价
cs.submit(() -> getPriceByS3());
// 将询价结果异步保存到数据库
for (int i = 0; i < 3; i++) {
Integer r = cs.take().get();
executor.execute(() -> save(r));
}
2.3、应用场景总结
当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。
CompletionService 能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。
线程池隔离。CompletionService 支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
3、CompletableFuture
CompletableFuture 是 Future 接口的扩展和增强。CompletableFuture 实现了 Future 接口,并在此基础上进行了丰富地扩展,完美地弥补了 Future 上述的种种问题。
更为重要的是,CompletableFuture 实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过 CountDownLatch 等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
CompletionStage: 执行某一阶段,可向下执行后续阶段。异步执行,默认线程池是 ForkJoinPool 。
3.1、创建异步操作
CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
这四个方法区别在于:
- 没有指定 Executor 的方法会使用 ForkJoinPool 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
- runAsync 方法以 Runnable函数式接口类型为参数,没有返回结果,supplyAsync 方法以 Supplier函数式接口类型为参数,返回结果类型为U,Supplier 接口的 get() 方法会有返回值的但是会阻塞线程。
使用默认的线程池就会出现一个问题,在主线程任务执行完以后,如果异步线程执行任务还没执行完就会直接把线程清除掉,因为默认线程池中的都是守护线程
forkjoinpool,当没有用户线程以后,会随着 jvm 一起清除,可以在主线程中阻塞几秒来解决,但是这样的编码显得格外的不优雅!
3.1.1、runAsync 无返回值
package com.example.canal.CompleteFuture;
import java.util.concurrent.*;
public class Yang {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("[" + Thread.currentThread().getName() + "]" + " ");
}
});
countDownLatch.await();
}
}
// 结果
// [ForkJoinPool.commonPool-worker-25]执行无返回结果的异步任务
3.1.2、supplyAsync 有返回值
package com.example.canal.CompleteFuture;
import java.util.concurrent.*;
public class Yang {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "]" + "执行有返回值的异步任务");
return "Hello World";
});
System.out.println(future.get());
}
}
// 结果
// [ForkJoinPool.commonPool-worker-25]执行有返回值的异步任务
// Hello World
3.2、获取结果
public T get() throws InterruptedException, ExecutionException
public T join()
- join()和 get()方法都是用来获取 CompletableFuture 异步之后的返回值。
- 两者的主要区别在于,join()方法产生的是 RuntimeException,get()方法抛出的是受检异常(ExecutionException, InterruptedException )需要用户手动处理。
3.2.1、get & join 返回阻塞等待线程结束
package com.example.canal.CompleteFuture;
import java.util.concurrent.*;
public class Yang {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("[" + Thread.currentThread().getName() + "]" + "执行有返回值的异步任务");
return "Hello World";
});
System.out.println(future.get());
System.out.println(future.join());
}
}
// 结果
// [ForkJoinPool.commonPool-worker-25]执行有返回值的异步任务
// Hello World
// Hello World
3.3、结果处理
当 CompletableFuture 的计算结果完成或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
- Action 的类型是 BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
- 方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
- 这几个方法都会返回 CompletableFuture,当 Action 执行完毕后它的结果返回原始的 CompletableFuture 的计算结果或者返回异常。
3.3.1、whenComplete 无返回值
package com.example.canal.CompleteFuture;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
public class Yang {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行结束!");
return "test";
}).whenComplete(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) {
System.out.println("执行完成!");
}
});
System.out.println(future.get());
}
}
// 执行结束!
// 执行完成!
// test
3.3.2、exceptionally 有返回值
package com.example.canal.CompleteFuture;
import java.util.concurrent.*;
import java.util.function.Function;
public class Yang {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行结束!");
int i = 12 / 0;
return "test";
}).exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
System.out.println(throwable);
return "异常";
}
});
countDownLatch.await();
}
}
// 执行结束!
// java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
3.4、结果转换
所谓结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
- thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture。
- thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个 CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生成一个新的 CompletableFuture。
3.4.1、thenApply 有返回值
thenApply 接收一个函数作为参数,使用该函数处理上一个 CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。
package com.example.canal.CompleteFuture;
import java.util.concurrent.*;
import java.util.function.Function;
public class Yang {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行结束!");
return "test";
}).thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
System.out.println("执行完成!");
return s + " yang";
}
});
System.out.println(future.get());
}
}
// 执行结束!
// 执行完成!
// test yang
3.4.2、thenCompose 有返回值
thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。
package com.example.canal.CompleteFuture;
import java.util.concurrent.*;
import java.util.function.Function;
public class Yang {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行结束!");
return "test";
}).thenCompose(new Function<String, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(String s) {
System.out.println("执行完成!");
return CompletableFuture.supplyAsync(() -> s + " yang");
}
});
System.out.println(future.get());
}
}
// 执行结束!
// 执行完成!
// test yang
3.5、结果消费
与结果处理和结果转换系列函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行Action,而不返回新的计算值。
根据对结果的处理方式,结果消费函数又分为:
- thenAccept系列:对单个结果进行消费
- thenAcceptBoth系列:对两个结果进行消费
- thenRun系列:不关心结果,执行额外逻辑
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
3.5.1、thenAccept 无返回值
thenAccept 通过观察该系列函数的参数类型可知,它们是函数式接口 Consumer,这个接口只有输入,没有返回值。
package com.example.canal.CompleteFuture;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
public class Yang {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(() -> {
int number1 = new Random().nextInt(3) + 1;
System.out.println("第一阶段:" + number1);
return number1;
});
futrue1.thenAccept(new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.println("最终结果:" + (integer));
}
});
countDownLatch.await();
}
}
// 第一阶段:1
// 最终结果:1
3.5.2、thenAcceptBoth 无返回值
thenAcceptBoth 函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的action消费两个异步的结果。
package com.example.canal.CompleteFuture;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
public class Yang {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(() -> {
int number1 = new Random().nextInt(3) + 1;
System.out.println("第一阶段:" + number1);
return number1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number2 = new Random().nextInt(3) + 1;
System.out.println("第二阶段:" + number2);
return number2;
});
futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {
@Override
public void accept(Integer integer, Integer integer2) {
System.out.println("最终结果:" + (integer + integer2));
}
});
countDownLatch.await();
}
}
// 结果
// 第一阶段:1
// 第二阶段:3
// 最终结果:4
3.5.3、thenRun 无返回值
thenRun 也是对线程任务结果的一种消费函数,与 thenAccept 不同的是,thenRun 会在上一阶段 CompletableFuture 计算完成的时候执行一个Runnable,Runnable并不使用该 CompletableFuture 计算的结果。
package com.example.canal.CompleteFuture;
import java.util.Random;
import java.util.concurrent.*;
public class Yang {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("第一阶段:" + number);
return number;
}).thenRun(new Runnable() {
@Override
public void run() {
System.out.println("thenRun()执行...");
}
});
countDownLatch.await();
}
}
// 结果
// 第一阶段:5
// thenRun()执行...
3.6、结果组合
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
3.6.1、thenCombine 有返回值
thenCombine 方法,合并两个线程任务的结果,并进一步处理。
package com.example.canal.CompleteFuture;
import java.util.Random;
import java.util.concurrent.*;
public class Yang {
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number1 = new Random().nextInt(10);
System.out.println("第一阶段:" + number1);
return number1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number2 = new Random().nextInt(10);
System.out.println("第二阶段:" + number2);
return number2;
});
CompletableFuture<Integer> future3 = future1.thenCombine(future2, (number1, number2) -> number1 + number2);
System.out.println("最终结果:" + future3.get());
countDownLatch.await();
}
}
// 结果
// 第一阶段:1
// 第二阶段:5
// 最终结果:6
3.7、任务交互
3.7.1、applyToEither 有返回值
applyToEither 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。
package com.example.canal.CompleteFuture;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Function;
public class Yang {
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("第一阶段 start:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一阶段 end:" + number);
return number;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("第二阶段 start:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第二阶段 end:" + number);
return number;
});
future1.applyToEither(future2, new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) {
System.out.println("最快结果:" + integer);
return integer * 2;
}
});
countDownLatch.await();
}
}
// 结果
// 第一阶段 start:3
// 第二阶段 start:1
// 第二阶段 end:1
// 最快结果:1
// 第一阶段 end:3
3.7.2、acceptEither 无返回值
acceptEither 两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。
package com.example.canal.CompleteFuture;
import java.util.Random;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
public class Yang {
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("第一阶段 start:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一阶段 end:" + number);
return number;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("第二阶段 start:" + number);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第二阶段 end:" + number);
return number;
});
future1.acceptEither(future2, new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
System.out.println("最快结果:" + integer);
}
});
countDownLatch.await();
}
}
// 结果
// 第一阶段 start:3
// 第二阶段 start:1
// 第二阶段 end:1
// 最快结果:1
// 第一阶段 end:3
3.7.3、runAfterEither 无返回值
runAfterEither 两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。
package com.example.canal.CompleteFuture;
import java.util.concurrent.*;
public class Yang {
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一阶段:1");
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第二阶段:2");
return 2;
});
future1.runAfterEither(future2, new Runnable() {
@Override
public void run() {
System.out.println("已经有一个任务完成了");
}
});
countDownLatch.await();
}
}
// 第一阶段:1
// 已经有一个任务完成了
// 第二阶段:2
3.7.4、runAfterBoth 无返回值
runAfterBoth 两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。
package com.example.canal.CompleteFuture;
import java.util.concurrent.*;
public class Yang {
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一阶段:1");
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第二阶段:2");
return 2;
});
future1.runAfterBoth(future2, new Runnable() {
@Override
public void run() {
System.out.println("上面两个任务都执行完成了");
}
});
countDownLatch.await();
}
}
// 第一阶段:1
// 第二阶段:2
// 上面两个任务都执行完成了
3.7.5、anyOf 有返回值
anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回 CompletableFuture。
package com.example.canal.CompleteFuture;
import java.util.Random;
import java.util.concurrent.*;
public class Yang {
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(random.nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(random.nextInt(1));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
});
CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
System.out.println(result.get());
countDownLatch.await();
}
}
// 结果
// world
3.7.6、allOf 无返回值
allOf方法用来实现多 CompletableFuture 的同时返回。
package com.example.canal.CompleteFuture;
import java.util.Random;
import java.util.concurrent.*;
public class Yang {
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("future1 完成!");
return "future1 完成!";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("future2 完成!");
return "future2 完成!";
});
CompletableFuture<Void> combindFuture = CompletableFuture.allOf(future1, future2);
try {
combindFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
countDownLatch.await();
}
}
// future2 完成!
// future1 完成!
4、CompletableFuture 常用方法总结
分类 | 方法 | 说明 | 返回值 |
---|---|---|---|
异步执行一个线程 | runAsync | 默认ForkJoinPool 线程池 | 无返回值 |
异步执行一个线程 | supplyAsync | 默认 ForkJoinPool 线程池 | 有返回值 |
分类 | 方法 | 说明 | 返回值 |
---|---|---|---|
两个线程依次执行 | thenApply | 获取前一个线程的结果,转换结果,会返还给调用端 | 有返回值 |
两个线程依次执行 | thenAccept | 获取前一个线程的结果,消费结果,不会返还给调用端 | 无返回值 |
两个线程依次执行 | thenRun 忽略前一个线程的结果,执行额外的逻辑 | 无返回值 | |
两个线程依次执行 | whenComplete | 获取前一个线程的结果或异常,消费 | 不影响上一线程返回值 |
两个线程依次执行 | exceptionally | 线程异常执行,配合whenComplete 使用 | 有返回值 |
两个线程依次执行 | handle | 相当于whenComplete + exceptionally | 有返回值 |
分类 | 方法 | 说明 | 返回值 |
---|---|---|---|
-等待2个线程都执行完 | - thenCombine | - 2个线程都要有返回值,等待都结束,结果合并转换 | - 有返回值 |
-等待2个线程执都行完 | - thenAcceptBoth | -2个线程都要有返回值,等待都结束,结果合并消费 | - 无返回值 |
-等待2个线程执都行完 | - runAfterBoth | -2个线程无需要有返回值,等待都结束,执行其他逻辑 | - 无返回值 |
分类 | 方法 | 说明 | 返回值 |
– | – | – | – |
等待2个线程任一执行完 | applyToEither | 2个线程都要有返回值,等待任一结束,转换其结果 | 有返回值 |
等待2个线程任一执行完 | acceptEither | 2个线程都要有返回值,等待任一结束,消费其结果 | 无返回值 |
等待2个线程任一执行完 | runAfterEither | 2个线程无需有返回值,等待任一结束,执行其他逻辑 | 无返回值 |
分类 | 方法 | 说明 | 返回值 |
– | – | – | – |
多个线程等待 | anyOf | 多个线程任一执行完返回 | 有返回值 |
多个线程等待 | allOf | 多个线程全部执行完返回 | 无返回值 |
5、Java8 CompleteFuture 简单使用
我们先看看 Java8 之前的 Future 的使用:
package com.example.canal.CompleteFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Yang {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService cachePool = Executors.newCachedThreadPool();
Future<String> future = cachePool.submit(() -> {
Thread.sleep(3000);
return "异步任务计算结果!";
});
// 提交完异步任务后,主线程可以继续干一些其他的事情
doSomeThingElse();
// 为了获取异步计算结果,我们可以通过 future.get 和 轮询机制来获取
String result;
// Get 方式会导致当前线程阻塞, 这显然违背了异步计算的初衷
// result = future.get();
// 轮询方式虽然不会导致当前线程阻塞, 但是会导致高额的 CPU 负载
long start = System.currentTimeMillis();
while (true) {
if (future.isDone()) {
break;
}
}
System.out.println("轮询耗时:" + (System.currentTimeMillis() - start));
result = future.get();
System.out.println("获取到异步计算结果啦: " + result);
cachePool.shutdown();
}
private static void doSomeThingElse() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程正在做事.....");
}
}
输出:
主线程正在做事… 轮询耗时:1998 获取到异步计算结果啦: 异步任务计算结果!
从上面的 Demo 中我们可以看出,future 在执行异步任务时,,对于结果的获取显的不那么优雅,很多第三方库就针对 Future 提供了回调式的接口以用来获取异步计算结果,如Google的: ListenableFuture,而 Java8 所提供的 CompleteFuture 便是官方为了弥补这方面的不足而提供的 API。
下面简单介绍用法:
package com.example.canal.CompleteFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Yang {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFutureOne = new CompletableFuture<>();
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(() -> {
try {
Thread.sleep(3000);
completableFutureOne.complete("异步任务执行结果");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// WhenComplete 方法返回的 CompletableFuture 仍然是原来的 CompletableFuture 计算结果
CompletableFuture<String> completableFutureTwo = completableFutureOne.whenComplete((s, throwable) -> {
System.out.println("当异步任务执行完毕时打印异步任务的执行结果: " + s);
});
// ThenApply 方法返回的是一个新的 completeFuture
CompletableFuture<Integer> completableFutureThree = completableFutureTwo.thenApply(s -> {
System.out.println("当异步任务执行结束时, 根据上一次的异步任务结果, 继续开始一个新的异步任务!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.length();
});
System.out.println("阻塞方式获取执行结果:" + completableFutureThree.get());
cachePool.shutdown();
}
}
从上面的 Demo 中我们主要需要注意 thenApply 和 whenComplete 这两个方法,这两个方法便是 CompleteFuture 中最具有意义的方法,,他们都会在 completeFuture 调用,complete 方法传入异步计算结果时回调,从而获取到异步任务的结果。
相比之下 future 的阻塞和轮询方式获取异步任务的计算结果,CompleteFuture 获取结果的方式就显的优雅的多。
6、实现最优的 烧水泡茶 程序
著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:
对于烧水泡茶这个程序,一种最优的分工方案:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶,烧开水,泡茶,T2 负责洗茶壶,洗茶杯,拿茶叶。
6.1、基于Future实现
package com.example.canal.CompleteFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class Yang {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建任务T2的FutureTask
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
// 创建任务T1的FutureTask
FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));
// 线程T1执行任务ft1
Thread T1 = new Thread(ft1);
T1.start();
// 线程T2执行任务ft2
Thread T2 = new Thread(ft2);
T2.start();
// 等待线程T1执行结果
System.out.println(ft1.get());
}
}
// T1 需要执行的任务:洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {
FutureTask<String> ft2;
// T1任务需要T2任务的FutureTask
T1Task(FutureTask<String> ft2) {
this.ft2 = ft2;
}
@Override
public String call() throws Exception {
System.out.println("T1:洗水壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T1:烧开水...");
TimeUnit.SECONDS.sleep(15);
// 获取T2线程的茶叶
String tf = ft2.get();
System.out.println("T1:拿到茶叶:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
}
}
// T2 需要执行的任务:洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T2:洗茶壶...");
TimeUnit.SECONDS.sleep(1);
System.out.println("T2:洗茶杯...");
TimeUnit.SECONDS.sleep(2);
System.out.println("T2:拿茶叶...");
TimeUnit.SECONDS.sleep(1);
return "绿茶";
}
}
// T1:洗水壶...
// T2:洗茶壶...
// T1:烧开水...
// T2:洗茶杯...
// T2:拿茶叶...
// T1:拿到茶叶:绿茶
// T1:泡茶...
// 上茶:绿茶
6.2、基于CompletableFuture实现
package com.example.canal.CompleteFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class Yang {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 任务1:洗水壶、烧开水
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
System.out.println("T1:洗水壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1:烧开水...");
sleep(15, TimeUnit.SECONDS);
});
// 任务2:洗茶壶、洗茶杯、拿茶叶
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("T2:洗茶壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return "龙井";
});
// 任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {
System.out.println("T1:拿到茶叶:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});
// 等待任务3执行结果
System.out.println(f3.get());
}
static void sleep(int t, TimeUnit u) {
try {
u.sleep(t);
} catch (InterruptedException e) {
}
}
}
// T1:洗水壶...
// T2:洗茶壶...
// T2:洗茶杯...
// T1:烧开水...
// T2:拿茶叶...
// T1:拿到茶叶:龙井
// T1:泡茶...
// 上茶:龙井