CompletableFuture
和FutureTask
,和它们的API方法的功能介绍
CompletableFuture
CompletableFuture
是Java 8引入的一个强大的类,用于处理异步编程。它实现了Future
接口,并提供了多种方法来处理异步计算的结果。
创建CompletableFuture
对象的方法:
supplyAsync(Supplier<U> supplier)
: 使用默认的ForkJoinPool异步执行一个任务。supplyAsync(Supplier<U> supplier, Executor executor)
: 使用指定的Executor异步执行一个任务。runAsync(Runnable runnable)
: 使用默认的ForkJoinPool异步执行一个Runnable任务。runAsync(Runnable runnable, Executor executor)
: 使用指定的Executor异步执行一个Runnable任务。completedFuture(U value)
: 返回一个已经完成的CompletableFuture
。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureExample {
public static void main(String[] args) {
// 使用默认的ForkJoinPool执行任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
// 创建自定义的Executor执行任务
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello", executor);
// 使用Runnable执行任务
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> System.out.println("Runnable executed"));
// 使用自定义的Executor执行Runnable任务
CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> System.out.println("Runnable executed"), executor);
// 返回已经完成的CompletableFuture
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Already completed");
executor.shutdown();
}
}
组合和处理异步计算结果的方法:
thenApply(Function<? super T,? extends U> fn)
: 在当前CompletableFuture
完成后,执行函数并返回一个新的CompletableFuture
。thenAccept(Consumer<? super T> action)
: 在当前CompletableFuture
完成后,执行消费函数,不返回结果。thenRun(Runnable action)
: 在当前CompletableFuture
完成后,执行一个Runnable。thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
: 在两个CompletableFuture
都完成后,执行一个函数,并返回一个新的CompletableFuture
。thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
: 在当前CompletableFuture
完成后,执行函数返回另一个CompletionStage
。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 异步计算,返回 "Hello"
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
});
// thenApply: 在future完成后执行
CompletableFuture<String> resultFuture = future.thenApply(result -> result + " World");
// thenAccept: 在future完成后执行,无返回值
future.thenAccept(result -> System.out.println("Result: " + result));
// thenRun: 在future完成后执行,无参数和返回值
future.thenRun(() -> System.out.println("Computation finished."));
// thenCombine: 组合两个future的结果
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " World");
CompletableFuture<String> combinedFuture = future.thenCombine(future2, (res1, res2) -> res1 + res2);
System.out.println(combinedFuture.get()); // 输出 "Hello World"
// thenCompose: 组合两个异步任务
CompletableFuture<String> composedFuture = future.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " World"));
System.out.println(composedFuture.get()); // 输出 "Hello World"
}
}
处理异常的方法:
handle(BiFunction<? super T, Throwable, ? extends U> fn)
: 处理正常和异常结果。exceptionally(Function<Throwable, ? extends T> fn)
: 处理异常情况,并返回一个默认值。whenComplete(BiConsumer<? super T, ? super Throwable> action)
: 在CompletableFuture
完成后(正常或异常)执行一个操作。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
// 异步计算,随机抛出异常
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Failed");
}
return "Success";
});
// handle: 处理正常和异常结果
CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return result;
});
System.out.println(handledFuture.join()); // 处理并打印结果
// exceptionally: 处理异常情况
CompletableFuture<String> exceptionalFuture = future.exceptionally(ex -> "Error: " + ex.getMessage());
System.out.println(exceptionalFuture.join()); // 处理并打印异常
// whenComplete: 在完成后执行操作
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("Completed with error: " + ex.getMessage());
} else {
System.out.println("Completed with result: " + result);
}
});
}
}
其他常用方法:
complete(T value)
: 手动完成CompletableFuture
并设置结果。join()
: 与get()
类似,但不抛出受检异常。allOf(CompletableFuture<?>... cfs)
: 等待所有提供的CompletableFuture
完成。anyOf(CompletableFuture<?>... cfs)
: 等待任意一个提供的CompletableFuture
完成。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建一个未完成的CompletableFuture
CompletableFuture<String> future = new CompletableFuture<>();
// 手动完成
future.complete("Manually completed");
System.out.println(future.get()); // 输出 "Manually completed"
// join: 获取结果,无需处理异常
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello");
System.out.println(future2.join()); // 输出 "Hello"
// allOf: 等待所有提供的CompletableFuture完成
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Task 1");
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> "Task 2");
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future3, future4);
allOfFuture.thenRun(() -> System.out.println("All tasks completed")).join(); // 等待所有任务完成并打印
// anyOf: 等待任意一个提供的CompletableFuture完成
CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> "Task 3");
CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> "Task 4");
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future5, future6);
System.out.println("Any task completed: " + anyOfFuture.join()); // 输出任意一个任务完成的结果
}
}
FutureTask
FutureTask
是Future
和Runnable
的一个具体实现,它可以包装一个Callable
或Runnable
,并且可以直接在线程中执行。
创建FutureTask
对象的方法:
- 构造函数
FutureTask(Callable<V> callable)
: 包装一个Callable
。 - 构造函数
FutureTask(Runnable runnable, V result)
: 包装一个Runnable
。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Callable
Callable<String> callable = () -> {
Thread.sleep(1000); // 模拟耗时操作
return "Callable Result";
};
// 创建FutureTask并包装Callable
FutureTask<String> futureTask1 = new FutureTask<>(callable);
// 创建Runnable
Runnable runnable = () -> {
try {
Thread.sleep(1000); // 模拟耗时操作
System.out.println("Runnable executed");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
// 创建FutureTask并包装Runnable
FutureTask<String> futureTask2 = new FutureTask<>(runnable, "Runnable Result");
// 使用线程执行FutureTask
Thread thread1 = new Thread(futureTask1);
Thread thread2 = new Thread(futureTask2);
thread1.start();
thread2.start();
// 获取结果
System.out.println("FutureTask1 result: " + futureTask1.get()); // 输出 "Callable Result"
System.out.println("FutureTask2 result: " + futureTask2.get()); // 输出 "Runnable Result"
}
}
FutureTask 的主要方法
run()
- 执行任务。如果任务已经完成或者被取消,那么这个方法不会有任何效果。
import java.util.concurrent.FutureTask;
public class FutureTaskExample {
public static void main(String[] args) {
// 创建FutureTask并包装Callable
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(1000); // 模拟耗时操作
return "Task completed";
});
// 创建并启动线程
Thread thread = new Thread(futureTask);
thread.start();
}
}
get()
- 阻塞等待任务完成并返回结果。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建FutureTask并包装Callable
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(1000); // 模拟耗时操作
return "Task completed";
});
// 创建并启动线程
Thread thread = new Thread(futureTask);
thread.start();
// 获取结果
String result = futureTask.get();
System.out.println(result); // 输出 "Task completed"
}
}
get(long timeout, TimeUnit unit)
- 在指定时间内等待任务完成并返回结果,如果超时则抛出
TimeoutException
。
import java.util.concurrent.*;
public class FutureTaskExample {
public static void main(String[] args) {
// 创建FutureTask并包装Callable
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(1000); // 模拟耗时操作
return "Task completed";
});
// 创建并启动线程
Thread thread = new Thread(futureTask);
thread.start();
try {
// 获取结果,超时会抛出TimeoutException
String result = futureTask.get(500, TimeUnit.MILLISECONDS);
System.out.println(result);
} catch (TimeoutException e) {
System.out.println("Task timed out");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
cancel(boolean mayInterruptIfRunning)
- 取消任务。如果任务还未开始执行,任务将被取消。如果任务已经开始执行,并且
mayInterruptIfRunning
为true
,则任务会被中断。
import java.util.concurrent.FutureTask;
public class FutureTaskExample {
public static void main(String[] args) {
// 创建FutureTask并包装Callable
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(1000); // 模拟耗时操作
return "Task completed";
});
// 创建并启动线程
Thread thread = new Thread(futureTask);
thread.start();
// 尝试取消任务
boolean cancelled = futureTask.cancel(true);
System.out.println("Cancelled: " + cancelled);
}
}
isCancelled()
- 如果任务被取消,则返回
true
。
import java.util.concurrent.FutureTask;
public class FutureTaskExample {
public static void main(String[] args) {
// 创建FutureTask并包装Callable
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(1000); // 模拟耗时操作
return "Task completed";
});
// 创建并启动线程
Thread thread = new Thread(futureTask);
thread.start();
// 取消任务
futureTask.cancel(true);
// 检查任务是否被取消
boolean isCancelled = futureTask.isCancelled();
System.out.println("Is cancelled: " + isCancelled); // 输出 "Is cancelled: true"
}
}
isDone()
- 如果任务已经完成,无论是正常完成还是被取消,返回
true
。
import java.util.concurrent.FutureTask;
public class FutureTaskExample {
public static void main(String[] args) {
// 创建FutureTask并包装Callable
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(1000); // 模拟耗时操作
return "Task completed";
});
// 创建并启动线程
Thread thread = new Thread(futureTask);
thread.start();
// 检查任务是否完成
boolean isDone = futureTask.isDone();
System.out.println("Is done: " + isDone); // 输出 "Is done: false"
}
}
完整示例
CompletableFuture 示例
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建自定义的Executor执行任务
ExecutorService executor = Executors.newFixedThreadPool(2);
// supplyAsync: 异步执行任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
}, executor);
// thenApply: 在future完成后执行函数并返回新future
CompletableFuture<String> resultFuture = future.thenApply(result -> result + " World");
// thenAccept: 在future完成后执行消费函数
resultFuture.thenAccept(result -> System.out.println("Result: " + result)); // 输出 "Result: Hello World"
// handle: 处理正常和异常结果
CompletableFuture<String> handledFuture = resultFuture.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
}
return result;
});
System.out.println(handledFuture.join()); // 输出 "Hello World"
// complete: 手动完成future
future.complete("Manually completed");
System.out.println(future.get()); // 输出 "Manually completed"
// allOf: 等待所有提供的CompletableFuture完成
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1", executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2", executor);
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(future1, future2);
allOfFuture.thenRun(() -> System.out.println("All tasks completed")).join(); // 等待所有任务完成并打印
// anyOf: 等待任意一个提供的CompletableFuture完成
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Task 3", executor);
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> "Task 4", executor);
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future3, future4);
System.out.println("Any task completed: " + anyOfFuture.join()); // 输出任意一个任务完成的结果
executor.shutdown();
}
}
FutureTask 示例
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Callable
Callable<String> callable = () -> {
Thread.sleep(1000); // 模拟耗时操作
return "Callable Result";
};
// 创建FutureTask并包装Callable
FutureTask<String> futureTask1 = new FutureTask<>(callable);
// 创建Runnable
Runnable runnable = () -> {
try {
Thread.sleep(1000); // 模拟耗时操作
System.out.println("Runnable executed");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
// 创建FutureTask并包装Runnable
FutureTask<String> futureTask2 = new FutureTask<>(runnable, "Runnable Result");
// 使用线程执行FutureTask
Thread thread1 = new Thread(futureTask1);
Thread thread2 = new Thread(futureTask2);
thread1.start();
thread2.start();
// 获取结果
System.out.println("FutureTask1 result: " + futureTask1.get()); // 输出 "Callable Result"
System.out.println("FutureTask2 result: " + futureTask2.get()); // 输出 "Runnable Result"
// 取消任务示例
FutureTask<String> futureTask3 = new FutureTask<>(() -> {
Thread.sleep(2000); // 模拟耗时操作
return "Task completed";
});
Thread thread3 = new Thread(futureTask3);
thread3.start();
// 尝试取消任务
boolean cancelled = futureTask3.cancel(true);
System.out.println("Cancelled: " + cancelled); // 输出 "Cancelled: true"
// 检查任务是否被取消
boolean isCancelled = futureTask3.isCancelled();
System.out.println("Is cancelled: " + isCancelled); // 输出 "Is cancelled: true"
// 检查任务是否完成
boolean isDone = futureTask3.isDone();
System.out.println("Is done: " + isDone); // 输出 "Is done: true"
}
}
综述
CompletableFuture 和 FutureTask 的对比
-
CompletableFuture
- 功能更强大:支持链式调用、组合、处理异常等高级功能。
- 异步支持:非常适合异步编程,提供了丰富的API来处理异步计算结果。
- 易用性:使用方便,支持流式API,非常适合复杂的异步工作流。
-
FutureTask
- 基础性:功能较简单,主要用来包装
Runnable
和Callable
。 - 同步调用:需要手动启动线程来执行任务。
- 控制:提供对任务的取消、完成等基本控制。
- 基础性:功能较简单,主要用来包装
CompletableFuture 的常用 API 总结
- 创建:
supplyAsync
,runAsync
,completedFuture
- 组合:
thenApply
,thenAccept
,thenRun
,thenCombine
,thenCompose
- 异常处理:
handle
,exceptionally
,whenComplete
- 其他:
complete
,join
,allOf
,anyOf
FutureTask 的常用 API 总结
- 创建:
FutureTask(Callable<V> callable)
,FutureTask(Runnable runnable, V result)
- 操作:
run
,get
,cancel
,isCancelled
,isDone
这些API提供了灵活的异步编程和任务控制机制,使得在Java中进行并发编程变得更加简便和高效。通过合理使用这些工具,可以大大提高程序的性能和响应能力。