文章目录
- Future
- Future介绍
- Future原理
- Future代码示例
- CompletableFuture
- CompletableFuture特点
- 应用场景
- 方法特性
- 方法刨析
- supplyAsync/runAsync
- thenApply / thenApplyAsync异步回调
- thenAccept / thenRun
- exceptionally
- whenComplete
- handle
- 实现场景
更多相关内容可查看
Future
Future介绍
一般我们在项目开发过程中如果想要提高程序的执行效率 , 可以使用多线程来让任务异步执行, 从而提高程序执行的效率 ,在SpringBoot项目中实现异步最简单的方式就是使用@Async
注解 虽然@Async
已经能够帮助我们实现异步 ,但是@Async
注解无法获取异步操作的执行结果 如果我们想要获取线程的执行结果, 就需要使用其他的方案
从Java 1.5开始,就提供了Callable和Future
,通过它们可以在任务执行完毕之后得到任务执行结果。
Java中的Future接口是Java并发编程中的一个重要概念,它代表了异步计算的结果
。当你提交一个任务给某个执行器(如ExecutorService)去执行,并且这个任务需要一些时间去完成时,Future就可以用来表示这个尚未完成的计算。通过Future,你可以在不阻塞当前线程的情况下,检查计算是否完成,并获取计算的结果。
Future原理
提交任务:
你使用ExecutorService的
submit
方法提交一个Callable任务或Runnable任务(配合FutureTask)。submit方法会立即返回一个Future对象,这个对象代表了异步计算的结果。
异步计算:
提交的任务会在某个线程(可能是线程池中的线程)上异步执行。这意味着提交任务的线程可以继续执行其他任务,而不必等待计算完成。
查询结果:
通过返回的Future对象,你可以调用
isDone()
方法来检查计算是否已经完成。如果计算已经完成,你可以调用get()
方法来获取计算的结果。如果计算尚未完成,get()方法会阻塞,直到计算完成并返回结果。
取消任务:
如果任务尚未开始或正在进行中,你可以调用
Future.cancel(booleanmayInterruptIfRunning)
来尝试取消任务的执行。如果任务已经完成或无法取消,这个方法将返回false。
异常处理:
如果计算过程中出现异常,该异常会被封装在
ExecutionException
中,并在调用get()
方法时抛出。你可以通过捕获这个异常来处理计算过程中出现的问题
FutureTask:
FutureTask是Future接口的一个实现类,它实现了Runnable接口,因此可以直接提交给ExecutorService执行。FutureTask将Callable的计算结果或Runnable的执行状态封装在内部,并提供了get()方法以获取结果。
Java的Future模式使得我们可以编写非阻塞的并发代码,提高了程序的响应性和吞吐量。然而,需要注意的是,Future只提供了基本的异步计算功能,对于更复杂的并发场景(如多个异步任务的组合、依赖关系等),可能需要使用更高级的并发工具,如CompletableFuture。
Future代码示例
首先,我们定义一个Callable
任务,它模拟一个耗时的操作:
import java.util.concurrent.Callable;
public class MyTask implements Callable<Integer> {
private final int taskId;
public MyTask(int taskId) {
this.taskId = taskId;
}
@Override
public Integer call() throws Exception {
// 模拟耗时的操作
System.out.println("Task " + taskId + " is running.");
Thread.sleep((long) (Math.random() * 1000)); // 随机等待一段时间
int result = taskId * 2; // 假设这是任务的处理结果
System.out.println("Task " + taskId + " finished with result " + result);
return result;
}
}
然后,我们使用ExecutorService
来提交这个任务,并获取Future对象
以便稍后获取结果:
我们可以看到任务执行属于并行执行,此时我们可以同时做更多任务,但是在最后获取结果的时候却阻塞了。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class ConcurrencyExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
// 创建一个列表来保存Future对象,以便稍后获取结果
List<Future<Integer>> futures = new ArrayList<>();
// 提交多个任务到线程池
for (int i = 0; i < 10; i++) {
Future<Integer> future = executorService.submit(new MyTask(i));
futures.add(future);
}
// 关闭线程池(不再接受新任务,但会等待已提交的任务完成)
executorService.shutdown();
// 等待所有任务完成并收集结果
List<Integer> results = new ArrayList<>();
for (Future<Integer> future : futures) {
while (!future.isDone()) {
// 等待任务完成(非阻塞,只是轮询检查)
// 在实际应用中,你可能会使用其他同步机制(如CountDownLatch)
}
try {
// 获取结果(如果任务尚未完成,这里会阻塞)
Integer result = future.get();
results.add(result);
} catch (InterruptedException e) {
// 如果当前线程在等待时被中断,设置中断状态并重新抛出异常
Thread.currentThread().interrupt();
throw e;
} catch (ExecutionException e) {
// 如果任务执行过程中出现异常,这里会抛出ExecutionException
// 你可以通过e.getCause()获取实际的异常
e.printStackTrace();
}
}
// 打印结果
System.out.println("Results: " + results);
// 检查线程池是否已终止
if (!executorService.isTerminated()) {
// 如果线程池还没有终止,可以调用shutdownNow()来尝试停止所有任务
// 这里我们只是打印一个消息,因为我们在上面的循环中已经等待了所有任务完成
System.out.println("Executor is not yet terminated.");
}
}
}
详解看注释即可,通俗来说就是可以根据Futrure提供的方法来获取返回的结果做一些特殊的操作
CompletableFuture
CompletableFuture特点
- 异步执行:CompletableFuture允许任务在后台线程中异步执行,不会阻塞主线程,提高了应用程序的响应性和性能。
- 结果处理:提供了多种方法(如
thenApply、thenAccept、thenRun
等)用于处理异步操作的结果,这些方法都是非阻塞的,并且支持链式调用。- 任务编排:CompletableFuture具有强大的任务编排能力,可以轻松地组织不同任务的运行顺序、规则以及方式。
- 异常处理:支持对异步操作中的异常进行处理,可以使用
exceptionally
方法指定一个异常处理函数
应用场景
并行处理多个独立任务:当任务可以被分解为多个独立的子任务时,可以使用CompletableFuture来并行执行这些子任务,以提高系统的性能和响应速度
异步执行耗时操作:对于耗时的操作,如远程调用、数据库查询等,可以使用CompletableFuture来异步执行这些操作,避免阻塞主线程
方法特性
supply
:表示执行任务带返回值
run
:表示执行任务不带返回值
涉及异步回调的方法也有多个,如下图:
方法刨析
supplyAsync/runAsync
supplyAsync
表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable task) 方法,runAsync
表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法,这两方法的效果跟submit是一样的,测试用例如下:
@Test
void test02() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log.info("1. future任务开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "2. future任务执行结果";
});
log.info("3. 开始执行主线程任务");
String result = future.get(2, TimeUnit.SECONDS);
log.info(result);
log.info("5. 程序执行结束-----------");
}
执行结果如下:
runAsync案例如下:
@Test
void test03() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
log.info("1. future任务开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
log.info("3. 开始执行主线程任务");
log.info("5. 程序执行结束-----------");
}
执行结果如下:
thenApply / thenApplyAsync异步回调
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,案例如下:
@Test
void test04() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log.info("1. future任务开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "2. future任务执行结果";
}).thenApply(s -> {
log.info(s);
return "3. thenApply返回结果";
}).thenApplyAsync(s -> {
log.info(s);
return "4. thenApply返回结果";
});
log.info("5. 开始执行主线程任务");
String result = future.get(2, TimeUnit.SECONDS);
log.info(result);
log.info("6. 程序执行结束-----------");
}
测试结果如下:
thenApplyAsync和thenApply实现的功能是一样的,但thenApply执行2次处理都在同一个线程完成,但thenApplyAsync将处理交给线程池,执行处理的有可能是不同线程。
thenAccept / thenRun
thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是无返回值;thenRun 的方法没有入参,也没有返回值,案例如下:
@Test
void test05() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
log.info("1. future任务开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "2. future任务执行结果";
}).thenAccept(s -> {
System.out.println(s); // 这里打印结果,但thenAccept本身不处理返回值
log.info("3. thenAccept执行了");
}).thenRun(() -> {
log.info("4. thenRun执行了");
});
log.info("5. 开始执行主线程任务");
// 这里调用get()实际上会阻塞,直到异步任务完成
// 但由于future是String类型,v将被赋值为null
String result = String.valueOf(future.get()); // 获取的是supplyAsync的返回结果,不是thenAccept或thenRun的
System.out.println(result); // 打印"2. future任务执行结果"
log.info("6. 程序执行结束-----------");
}
执行效果如下:
exceptionally
exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果,案例如下:
@Test
void test06() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log.info("1. future任务开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int i = 10 / 0;
return "2. future任务执行结果";
}).exceptionally(ex -> {
String message = ex.getMessage();
log.info("3. 程序执行出现异常, 异常原因:{}", message);
return "excetion";
});
log.info("4. 开始执行主线程任务");
String s = future.get();
log.info(s);
log.info("5. 程序执行结束-----------");
}
程序测试效果如下:
whenComplete
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。案例如下:
@Test
void test07() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log.info("1. future任务开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "2. future任务执行结果";
}).whenComplete((result,ex)->{
log.info("程序执行结果:{}",result);
log.info("程序执行异常:{}",ex);
});
log.info("4. 开始执行主线程任务");
String s = future.get();
log.info(s);
log.info("5. 程序执行结束-----------");
}
执行结果如下:
handle
跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了。案例如下:
@Test
void test07() throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log.info("1. future任务开始执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//int i = 10 / 0;
return "2. future任务执行结果";
}).handle((result,ex)->{
log.info("3. 程序执行结果:{}",result);
log.info("4. 程序执行异常:{}",ex);
return "handle执行结果" ;
});
log.info("4. 开始执行主线程任务");
String s = future.get();
log.info(s);
log.info("5. 程序执行结束-----------");
}
这些案例,直接copy运行即可,断点自己调试更有利于理解
实现场景
场景:在一个电商系统中,你可能需要同时查询用户信息、订单信息和购物车信息。这些查询可以并行执行,以提高响应速度。
CompletableFuture<UserInfo> userInfoFuture = CompletableFuture.supplyAsync(() -> queryUserInfo());
CompletableFuture<OrderInfo> orderInfoFuture = CompletableFuture.supplyAsync(() -> queryOrderInfo());
CompletableFuture<CartInfo> cartInfoFuture = CompletableFuture.supplyAsync(() -> queryCartInfo());
CompletableFuture.allOf(userInfoFuture, orderInfoFuture, cartInfoFuture).join();
UserInfo userInfo = userInfoFuture.get();
OrderInfo orderInfo = orderInfoFuture.get();
CartInfo cartInfo = cartInfoFuture.get();
// 后续处理...
场景:假设你有一个远程调用或数据库查询操作,这些操作可能需要较长时间才能完成,你不想让它们阻塞主线程。
CompletableFuture<String> remoteCallFuture = CompletableFuture.supplyAsync(() -> makeRemoteCall());
// 在这里可以继续执行其他任务,而不需要等待远程调用完成
// 稍后在需要远程调用的结果时获取它
String remoteCallResult = remoteCallFuture.get();
// 后续处理...
场景:你可能需要等待多个异步任务都完成后才能进行下一步处理。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2);
combinedFuture.thenRun(() -> {
String result1 = future1.join();
String result2 = future2.join();
System.out.println(result1 + " " + result2); // 输出: Hello World
});
场景:异步任务执行过程中可能会出现异常或超时,你需要对这些情况进行处理。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 这里可能抛出异常或运行时间过长
});
future.exceptionally(throwable -> {
// 处理异常,如返回默认值、记录日志等
return "Default Value";
});
future.orTimeout(5, TimeUnit.SECONDS) // 设置超时时间为5秒
.exceptionally(throwable -> {
// 处理超时异常
return "Timeout Occurred";
});
// 获取结果时,如果发生异常或超时,将返回上述处理后的值
String result = future.get();