文章目录
- 背景
- CompletableFuture简介
- 使用场景
- 如何编排任务步骤
- 场景一 多个任务串行执行
- 场景二 多个步骤并行执行
- 场景三 一个串行步骤后两个并行步骤
- 场景四 一个步骤依赖两个并行步骤
- 场景五 一个步骤依赖多个并行步骤同时完成
- 场景六 一个任务依赖多个任务的任意一个完成结果
- 其他功能
- 异常处理
- 自定义线程池
- 取消 CompletableFuture
- 延迟执行
- 超时处理
背景
大家好,我是大表哥laker。今天,我想和大家分享一篇关于如何使用CompletableFuture对业务流程进行编排,以降低依赖之间的阻塞,提高程序的效率。
在现代软件开发中,异步编程已经变得越来越重要。使用异步编程可以帮助我们更好地利用CPU资源,提高程序的效率。然而,在处理复杂的任务时,我们可能需要执行多个异步任务,并将它们编排在一起,以便在所有任务都完成后执行某些操作。当我们面对需要同时处理多个任务的情况时,一种常见的问题是如何协调多个任务的执行,特别是当它们之间有依赖关系时,常常需要进行任务编排。Java 8 引入了 CompletableFuture,为我们提供了一种强大而灵活的任务编排方式,可以轻松实现并行处理、等待多个任务的完成、以及异常处理等功能。
CompletableFuture简介
CompletableFuture是Java8提供的一个强大的异步编程工具,可以用于并发执行多个异步任务。与传统的线程池和Future接口相比,CompletableFuture提供了更强大的任务编排和组合的能力,使得异步编程变得更加简单、高效、易读。
在CompletableFuture中,任务之间可以串行执行,也可以并行执行,甚至可以嵌套执行。而且,我们还可以使用各种组合操作来实现任务的链式编排,这使得我们可以方便地创建复杂的任务流程,而不需要编写复杂的控制逻辑。
在 JDK 8 之前的版本中,编写异步任务常使用 Future 实现,使用 Future 执行异步任务并且获得异步执行结果时,我们会通过两种方式,要么调用阻塞的 get()
方法,或者轮询调用 isDone()
方法获取任务状态,判断是否为 true
。不过这两种方法在执行时都会使主线程被迫等待,对性能会产生一定影响。因此,Java 团队在 JDK 8 版本中新增了 CompletableFuture 工具,通过使用观察者模式进行设计,实现异步回调进行异步编程,一定程度上降低了编写异步任务的代码量和难度。
CompletableFuture的优势:
- 非阻塞式调用
- 基于回调式编程
- 支持函数式编程
- 支持流式编程
- 完善的异常处理机制
基本示例
让我们看看CompletableFuture的基本用法,以一个简单的示例为例。假设我们需要从某个Web API获取一些数据,并对其进行加工和展示。我们可以使用以下代码:
CompletableFuture.supplyAsync(() -> fetchData())
.thenApply(data -> processData(data))
.thenAccept(result -> displayResult(result))
.join();
在这个例子中,我们使用CompletableFuture.supplyAsync
方法启动一个异步任务,该任务将调用fetchData
方法来获取数据。然后,我们使用thenApply
方法将数据传递给processData
方法进行加工处理。最后,我们使用thenAccept
方法将结果传递给displayResult
方法进行展示。最后,我们使用join
方法来等待所有任务完成。
使用场景
- 处理I/O密集型任务:如从Web API获取数据或从数据库读取数据。
- 批处理任务:需要同时处理大量数据的任务,可以使用 CompletableFuture 实现并行处理,提高程序的效率。
- 并发任务:多个任务之间没有明显的依赖关系,可以使用 CompletableFuture 实现并行执行,提高程序的效率。
- 负载均衡:使用 CompletableFuture 实现任务的负载均衡,将任务分配到不同的节点或线程池中执行,从而提高系统的并发能力和稳定性。
- 异步操作:CompletableFuture 提供了一系列的异步操作方法,如异步执行、异步等待、异步处理结果等,可以帮助我们更方便地进行异步编程。
- 非阻塞操作:使用 CompletableFuture 可以实现非阻塞操作,避免线程被阻塞,提高系统的响应能力。
如何编排任务步骤
-
每个任务步骤可以是一次 RPC 调 用、一次数据库操作或者是一次本地方法调用等。
-
在使用 CompletableFuture 进行异步化编程时,每个步骤都会产生一个 CompletableFuture 对象,最终结果也会用一个 CompletableFuture来进行表示。
场景一 多个任务串行执行
假设有三个操作 step1、step2、step3 存在依赖关系,其中 step2 的执行依赖 step1 的结果,step3的执行依赖step2的结果。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 1");
return "step1 result";
});
// 重点是 thenApply
CompletableFuture<String> cf2 = cf1.thenApply(result1 -> {
//result1 为 CF1 的结果
log.info("执行 step 2");
return "result2";
});
CompletableFuture<String> cf3 = cf2.thenApply(result2 -> {
//result2 为 CF2 的结果
log.info("执行 step 3");
return "result3";
});
log.info("main 1");
String step3Result = cf3.get(3, TimeUnit.SECONDS);
log.info("main end");
结果日志
10:08:31.641 [main] INFO com.laker.demo.test2 - main 1
10:08:31.642 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test2 - 执行 step 1
10:08:31.647 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test2 - 执行 step 2
10:08:31.647 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test2 - 执行 step 3
10:08:31.647 [main] INFO com.laker.demo.test2 - main end
场景二 多个步骤并行执行
假设有三个操作 step1、step2、step3 不存在依赖关系,多个步骤可以并行执行。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 1");
return "step1 result";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 2");
return "step2 result";
});
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 3");
return "step3 result";
});
log.info("main 1");
String step3Result = cf3.get(3, TimeUnit.SECONDS);
log.info("main end");
}
结果日志
13:54:22.493 [main] INFO com.laker.demo.test6 - main 1
13:54:22.493 [ForkJoinPool.commonPool-worker-7] INFO com.laker.demo.test6 - 执行 step 3
13:54:22.493 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test6 - 执行 step 1
13:54:22.493 [ForkJoinPool.commonPool-worker-5] INFO com.laker.demo.test6 - 执行 step 2
13:54:22.498 [main] INFO com.laker.demo.test6 - main end
其他示例
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
List<CompletableFuture<Integer>> futures = numbers.stream()
.map(num -> CompletableFuture.supplyAsync(() -> num * 2))
.collect(Collectors.toList());
List<Integer> result = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
在上面的示例中,我们首先定义了一个整数列表numbers
,然后将其转换为CompletableFuture
列表。每个CompletableFuture
都会异步计算对应的整数的两倍,并将结果返回。最后,我们使用CompletableFuture.join()
方法等待所有任务完成,并将它们的结果收集到一个整数列表中。
场景三 一个串行步骤后两个并行步骤
假设有三个操作 step1、step2、step3 存在依赖关系,其中 step2 和step3的执行依赖 step1 的结果。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 1");
return "step1 result";
// 重点是这里
}).whenComplete((result1, throwable) -> {
log.info("result1 is {}" , result1);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 2");
return "step2 result";
});
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 3");
return "step3 result";
});
});
String result = cf1.get(3, TimeUnit.SECONDS);
结果日志
14:04:01.733 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test7 - 执行 step 1
14:04:01.735 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test7 - result1 is step1 result
14:04:01.737 [ForkJoinPool.commonPool-worker-5] INFO com.laker.demo.test7 - 执行 step 2
14:04:01.738 [ForkJoinPool.commonPool-worker-7] INFO com.laker.demo.test7 - 执行 step 3
场景四 一个步骤依赖两个并行步骤
假设有三个操作 step1、step2、step3 存在依赖关系,其中 step3 的执行依赖 step1 和 step2 的结果。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 1");
return"step1 result";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 2");
return"step2 result";
});
// 重点 cf1.thenCombine(cf2...
CompletableFuture<String> cf3 = cf1.thenCombine(cf2, (result1, result2) -> {
log.info(result1 + " , " + result2);
log.info("执行 step 3");
return"step3 result";
});
log.info("main 1");
String step3Result = cf3.get(3, TimeUnit.SECONDS);
log.info("main end");
结果日志
09:49:14.255 [main] INFO com.laker.demo.test - main 1
09:49:14.255 [ForkJoinPool.commonPool-worker-5] INFO com.laker.demo.test - 执行 step 2
09:49:14.255 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test - 执行 step 1
09:49:14.260 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test - step1 result , step2 result
09:49:14.260 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test - 执行 step 3
09:49:14.260 [main] INFO com.laker.demo.test - main end
场景五 一个步骤依赖多个并行步骤同时完成
假设有四个操作 step1、step2、step3、step4 存在依赖关系,其中 step4 的执行依赖 step1 、step2 和 step3 三个步骤的结果。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 1");
return "step1 result";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 2");
return "result2";
});
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 3");
return "result3";
});
// 重点CompletableFuture.allOf(cf1, cf2, cf3);
CompletableFuture<Void> cf4 = CompletableFuture.allOf(cf1, cf2, cf3);
CompletableFuture<String> result = cf4.thenApply(v -> {
// 这里的 join 并不会阻塞,因为传给 thenApply 的函数是在 cf1、cf2、cf3 全部完成时,才会执行 。
String result3 = cf1.join();
String result4 = cf2.join();
String result5 = cf3.join();
log.info("执行 step 4");
// 根据 result3、result4、result5 组装最终 result;
return "result";
});
log.info("main 1");
String step4Result = result.get(3, TimeUnit.SECONDS);
log.info("main end");
结果日志
10:09:35.167 [main] INFO com.laker.demo.test3 - main 1
10:09:35.166 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test3 - 执行 step 1
10:09:35.166 [ForkJoinPool.commonPool-worker-5] INFO com.laker.demo.test3 - 执行 step 2
10:09:35.166 [ForkJoinPool.commonPool-worker-7] INFO com.laker.demo.test3 - 执行 step 3
10:09:35.170 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test3 - 执行 step 4
10:09:35.170 [main] INFO com.laker.demo.test3 - main end
场景六 一个任务依赖多个任务的任意一个完成结果
假设有四个操作 step1、step2、step3、step4 存在依赖关系,其中 step4 的执行依赖 step1 、step2 和 step3 的任意一个完成结果。
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 1");
return "step1 result";
});
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 2");
return "result2";
});
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 3");
return "result3";
});
// 重点 CompletableFuture.anyOf(cf1, cf2, cf3);
CompletableFuture<Object> cf4 = CompletableFuture.anyOf(cf1, cf2, cf3);
CompletableFuture<String> result = cf4.thenApply(v -> {
// 这里的 join 并不会阻塞,因为传给 thenApply 的函数是在 cf1、cf2、cf3 全部完成时,才会执行 。
String result3 = cf1.join();
String result4 = cf2.join();
String result5 = cf3.join();
log.info("执行 step 4");
// 根据 result3、result4、result5 组装最终 result;
return "result";
});
log.info("main 1");
String step4Result = result.get(3, TimeUnit.SECONDS);
log.info("main end");
结果日志
10:12:40.454 [main] INFO com.laker.demo.test4 - main 1
10:12:40.454 [ForkJoinPool.commonPool-worker-5] INFO com.laker.demo.test4 - 执行 step 2
10:12:40.454 [ForkJoinPool.commonPool-worker-7] INFO com.laker.demo.test4 - 执行 step 3
10:12:40.454 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test4 - 执行 step 1
10:12:40.458 [ForkJoinPool.commonPool-worker-5] INFO com.laker.demo.test4 - 执行 step 4
10:12:40.458 [main] INFO com.laker.demo.test4 - main end
其他功能
异常处理
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
// 如果发生异常 内部会通过 new CompletionException(throwable) 对异常进行包装
log.info("执行 step 1");
int i = 1 / 0;
return "step1 result";
}).exceptionally(err -> {// 通过 exceptionally 捕获异常,这里的 err 已经被 包装过,因此需要通过 Throwable.getCause() 提取异常
log.error("Exception !!!!!!!!!!!!!!!!!!!!" , ExceptionUtils.extractRealException(err));
// 异常后的默认值
return "default value";
});
String result = cf1.get();
log.info("结果为:" + result);
原始异常获取工具类
public class ExceptionUtils {
public static Throwable extractRealException(Throwable throwable) {
// 这里判断异常类型是否为 CompletionException、ExecutionException,如果是则进行提取,否则直接返回。
if (throwable instanceof CompletionException || throwable
instanceof ExecutionException) {
if (throwable.getCause() != null) {
return throwable.getCause();
}
}
return throwable;
}
}
结果日志
13:27:48.594 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test5 - 执行 step 1
13:27:48.608 [ForkJoinPool.commonPool-worker-3] ERROR com.laker.demo.test5 - Exception !!!!!!!!!!!!!!!!!!!!
java.lang.ArithmeticException: / by zero
at com.laker.demo.test5.lambda$main$0(test5.java:20)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
13:27:48.608 [main] INFO com.laker.demo.test5 - 结果为:default value
CompletableFuture
提供了多种方式来处理异步任务的异常,比如:
exceptionally
方法:用于捕获异常并返回默认值;handle
方法:用于捕获异常并进行处理;whenComplete
方法:无论异步任务执行成功或失败,都会执行该方法,并对结果进行处理。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 异常
int i = 1 / 0;
return "Hello World";
});
// exceptionally方法
CompletableFuture<String> handleFuture = future.exceptionally(ex -> {
System.out.println(ex.getMessage());
return "Default Value";
});
// handle方法
CompletableFuture<String> handleFuture2 = future.handle((result, ex) -> {
if (ex != null) {
System.out.println(ex.getMessage());
return "Default Value";
}
return result;
});
// whenComplete方法
CompletableFuture<Void> whenCompleteFuture = future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println(ex.getMessage());
} else {
System.out.println(result);
}
});
自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 1");
return"step1 result";
}, executor);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
log.info("执行 step 2");
return"step2 result";
});
// 重点 cf1.thenCombine(cf2...
CompletableFuture<String> cf3 = cf1.thenCombine(cf2, (result1, result2) -> {
log.info(result1 + " , " + result2);
log.info("执行 step 3");
return"step3 result";
});
log.info("main 1");
String step3Result = cf3.get(3, TimeUnit.SECONDS);
log.info("main end");
结果日志
09:47:51.361 [main] INFO com.laker.demo.test - main 1
09:47:51.361 [pool-1-thread-1] INFO com.laker.demo.test - 执行 step 1
09:47:51.361 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test - 执行 step 2
09:47:51.364 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test - step1 result , step2 result
09:47:51.364 [ForkJoinPool.commonPool-worker-3] INFO com.laker.demo.test - 执行 step 3
09:47:51.364 [main] INFO com.laker.demo.test - main end
取消 CompletableFuture
cancel(boolean mayInterruptIfRunning)
:尝试取消正在执行的任务,参数mayInterruptIfRunning
表示是否中断正在执行的任务。如果任务已经完成或已经被取消,则该方法返回false
,否则返回true
。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模拟耗时任务
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result";
});
// 取消任务
boolean cancelResult = future.cancel(true);
System.out.println("任务是否取消成功:" + cancelResult);
// 判断任务是否被取消
System.out.println("任务是否被取消:" + future.isCancelled());
// 判断任务是否完成
System.out.println("任务是否完成:" + future.isDone());
// 获取任务结果(如果任务被取消,会抛出CancellationException异常)
try {
String result = future.get();
System.out.println("任务结果:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (CancellationException e) {
e.printStackTrace();
}
结果日志
任务是否取消成功:true
任务是否被取消:true
任务是否完成:true
java.util.concurrent.CancellationException
at java.base/java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2396)
at com.laker.demo.CompletableFutureCancelExample.main(CompletableFutureCancelExample.java:20)
延迟执行
JDK9才支持的API
下面是一个示例代码,创建一个CompletableFuture,延迟2秒后返回一个字符串:
log.info("start");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello, CompletableFuture!";
}, CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS));
String result = future.join();
log.info("result:{}", result); // Hello, CompletableFuture!
结果日志
20:07:03.442 [main] INFO com.laker.demo.test11 - start
20:07:05.454 [main] INFO com.laker.demo.test11 - result:Hello, CompletableFuture!
超时处理
JDK9才支持的API
我们可以使用 completeOnTimeout
和 orTimeout
方法来处理超时。例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result";
});
String result = future.completeOnTimeout("timeout", 1000, TimeUnit.MILLISECONDS).get();
System.out.println(result); // "timeout"
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result";
});
future2.orTimeout(1000, TimeUnit.MILLISECONDS).whenComplete((r, e) -> {
if (e instanceof TimeoutException) {
System.out.println("Timeout");
} else {
System.out.println(r);
}
});
上面的代码中,我们使用 completeOnTimeout
方法和 orTimeout
方法来处理超时。如果在指定时间内没有获取到结果,就返回超时的默认值。