文章目录
- 1. 简介:
- 2. 比较
- 1、传统方式
- 2、使用CompletableFuture:
- 异步执行
- 返回值
- 3、组合处理:
- anyOf
- allof :
- 4. 异步回调:
- thenAccept
- thenApply
- whenComplete等同于 thenAccept
- handel()等同于thenApply
- 5. 常用方法:
- 1、supplyAsync
- 2、runAsync
- 3、then*
- 4、then*Async
- 5、thenApply
- 6、thenAccept
- 7、thenRun
- 8、thenCompose
- 9、thenCombine
- 10、runAfterEither
- 11、runAfterBoth
- 12、get()
- 13、V get(long timeout, TimeUnit unit)
- 14、T getNow(T valueIfAbsent)
- 15、whenComplete
- 16、exceptionally
- 17、complete
- 18、cancel
- 19、allOf
- 20、anyOf
1. 简介:
CompletableFuture,它其实是JDK8提供出来的,它是一种基于future异步编程的实现:
1、以前我们执行一个异步任务,想要获得这个任务的返回值的时候,就得通过这个future加callable,这种传统方式,来进行实现,当我们要获得返回值的时候,就去调用这个future task 的get方法,当我们去调用get方法呢,会阻塞,所以像以前这种传统的异步编程,并且要获的返回结果进行处理,这种方式实现起来非常的麻烦,并且要获的返回值,会阻塞后面的代码。
2、而当CompletableFuture来进行异步任务的话,并且我们要拿到这个返回值,进行处理,也可以异步的方式,而不会阻塞后面的代码
3、CompletableFuture:提供了组合处理:比如我们有多个异步任务,那么这多个异步任务中,我想拿到里面最快的这一个,在我们以前传统的异步编程当中,实现起来非常麻烦,但是通过 CompletableFuture是非常的简单的,如果说我们想在这多个异步任务当中,全部执行完成之后,想去执行一些其他的,在之前实现也非常麻烦,此时就可以通过CompletableFuture来实现是非常的简单的,
4.还支持异步回调,我们想在一个异步任务执行完成之后,去执行另一个异步任务,在以前我们必须对第一个任务进行“JOIN"等待,等第一个任务完成了之后,再去执行第二任务,或者把第二个任务,嵌套在第一个任务内,当第一个任务执行完成之后再去执行第二个任务,但是这种嵌套方式会出现我们所谓的”回调地狱“,也就是说如果你的异步任务非常的多,并且都希望按照顺序一个一个去执行的话,你需要每一个嵌套在上一个当中,这样代码的可阅读性,维护性都是非常差,那我们通过CompletableFuture,它所提供的这种异步回调,实现起来是非常简洁的
2. 比较
1、传统方式
package org.example;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* @ClassName CompletableFutureTest
* @Author JS
* @Date 2024-05-15 23:30
* @Version 1.0
**/
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "futureTask1执行完成";
}
});
new Thread(futureTask).start();
//get() 方法的作用是,以阻塞的方式,获取任务执行结果
System.out.println(futureTask.get());
System.out.println("TODO....");
}
}
2、使用CompletableFuture:
异步执行
- runAsync()方法执行任务是没有返回值的
- supplyAsync() 方法执行任务则支持返回值
package org.example;
import java.util.concurrent.CompletableFuture;
/**
* @ClassName CompletableFutureDemo
* @Author lenovo
* @Date 2024-05-15 23:45
* @Version 1.0
**/
public class CompletableFutureDemo {
public static void main(String[] args) {
CompletableFuture future1 = CompletableFuture.runAsync(() -> {
System.out.println("执行没有返回值的任务");
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "执行有返回的任务";
});
System.out.println(future2.join());
}
}
返回值
- get()方法抛出 (InterruptedException、ExecutionException) 检查时异常,程序必须处理)
- join() 方法只抛出运行时异常,程序可不做处理
3、组合处理:
- anyOf 返回跑的最快的那个future。最快的如果异常都玩完
- allof 全部并行执行,执行完之后再去执行我们所需要的业务逻辑,如果需要获得返回值,需要配合thenApply,异常会抛出不影响其他任务
anyOf
package org.example;
import java.util.concurrent.CompletableFuture;
/**
* @ClassName CompletableFutureDemo2
* @Author lenovo
* @Date 2024-05-15 23:52
* @Version 1.0
**/
public class CompletableFutureDemo2 {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "朋友小王去你家送药");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "朋友小去你家送药");
CompletableFuture<Object> anyFutures = CompletableFuture.anyOf(future1, future2);
System.out.println(anyFutures.join());
}
}
- 其实这种就可以运用在我们异步任务当中,有多个备选方案,比如我们请求第三方接口,那么这第三方接口呢,他提供l多个线路,那这几个线路在某个时间段之内不确定谁更快,那么我们就可以将这几个线路,都通过 CompletableFuture 来进行异步请求,然后再通过anyOf 拿到请求最快的那一个
allof :
package org.example;
import java.util.concurrent.CompletableFuture;
/**
* @ClassName CompletableFutureDemo3
* @Author lenovo
* @Date 2024-05-15 23:52
* @Version 1.0
**/
public class CompletableFutureDemo3 {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "朋友小王去你家送药 ");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " 朋友小去你家送药");
CompletableFuture<Object> anyFutures = CompletableFuture.allof(future1, future2).thenApply(
res->{
return future1.join()+future2.join();
//todo...执行我们所需要的业务逻辑代码
}
);
System.out.println(anyFutures.join());
}
}
- 如果你想在多个异步任务执行完成之后,来去执行一些业务逻辑的话,那么就可以通过allof,同样的传入多个future,allof的参数是一个可变长度的数组,然后再结合 thenApply 这个方法来进行处理,那在这个方法体当中,我们就可以将 future1 和 future2 进行 join,等待他们处理完成之后,我们就可在后面,执行我们所需要的业务逻辑代码,这种逻辑功能比较适用于多个任务,都需要并行的去执行,但是呢要拿到他们每个任务的结果,比方说我们在注册 验证的时候,首先判断,它的用户名是否被注册、手机号码是否被注册、手机验证码是否正确,如果这几项都没有问题的话,我们再去执行注册相关的业务代码。注意 allof出现异常,不影响其他任务,所以说可以针对每一个future,进行try-catch 捕捉他们的异常,如果非核心的future 他出现了异常,我们也可以考虑继续执行我们的核心业务
4. 异步回调:
-
whenComplete() 没有返回值,且返回的 CompletableFuture 为任务结果,而非回调结果
-
handle() 有返回值,且返回的CompletableFuture 为回调结果
他们两个出现异常不会中断 ,throwable 参数会接受前面的任务的异常,异常会通过get抛出到主线程
-
链式处理
-
thenRun(Runnable runnable): 对异步任务的结果进行操作,不能传入参,也没有返回值
-
thenAccept(Consumer consumer):可传入参数
-
thenApply(Function function):可传入参数,并返回结果
-
他们三个出现异常后面的任务会中断,处理任务中感知不到异常,异常会通过get抛出到主线程
thenAccept
package org.example;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @ClassName CompletableFutureTest2
* @Author lenovo
* @Date 2024-05-16 0:25
* @Version 1.0
**/
public class CompletableFutureTest2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> 5)
.thenAccept(result -> {
System.out.println("任务执行结果 =" + result * 3);
}).thenAccept(result -> {
System.out.println("任务执行完成");
});
}
}
thenApply
package org.example;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @ClassName CompletableFutureTest2
* @Author lenovo
* @Date 2024-05-16 0:35
* @Version 1.0
**/
public class CompletableFutureTest3 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture future3 = CompletableFuture.supplyAsync(() -> 5)
.thenApply(result -> result * 3)
.thenApply(result -> result + 3);
System.out.println("future3:"+future3.join());
}
}
whenComplete等同于 thenAccept
package org.example;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @ClassName CompletableFutureTest2
* @Author lenovo
* @Date 2024-05-16 0:25
* @Version 1.0
**/
public class CompletableFutureTest4 {
public static void main(String[] args) {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(111);
return 444;
});
CompletableFuture<Integer> future1Callback = future1.whenComplete((result, throwable)->{
System.out.println(222);}).whenComplete((result,throwable) -> {
System.out.println(333);
});
System.out.println(future1Callback.join());
}
}
handel()等同于thenApply
package org.example;
import java.util.concurrent.CompletableFuture;
/**
* @ClassName CompletableFutureTest2
* @Author lenovo
* @Date 2024-05-16 0:44
* @Version 1.0
**/
public class CompletableFutureTest5 {
public static void main(String[] args) {
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()-> {
return"返回任务结果";
});
CompletableFuture<String> future2Callback = future2.handle((result, throwable) ->{
return"返回任务结果";
});
System.out.println(future2Callback.join());
}
}
5. 常用方法:
1、supplyAsync
- 异步执行任务,任务有返回值
package com.Js;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return "Hello";
}, executorService);
}
private static void sleep(int i) {
try {
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
System.out.println(e);
e.printStackTrace();
}
}
}
2、runAsync
- 异步执行任务,任务没有返回值
package com.Js;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
}, executorService);
}
private static void sleep(int i) {
try {
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
System.out.println(e);
e.printStackTrace();
}
}
}
3、then*
-
功能:前一个异步任务执行完,然后执行本任务
-
当前执行thenApply()方法的线程来负责执行本任务,比如main线程,但是如果前一个异步任务还没执行完,那么main线程就不能执行本任务了,得等前一个任务执行完后才能执行本任务,这个时候就会在执行前一个任务的线程上执行本任务,这样才能保证执行顺序
package com.Js;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Supplier<String> taskA = () -> {
System.out.println("1:" + Thread.currentThread().getName());
return "Hello";
};
Function<String, String> taskB = s -> {
System.out.println("2:" + Thread.currentThread().getName());
return s + " World";
};
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(taskA, executorService);
sleep(1);
future1.thenApply(taskB);
System.out.println("haha");
// ...
}
private static void sleep(int i) {
try {
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4、then*Async
- 会利用CompletableFuture中公共的ForkJoinPool来执行任务
5、thenApply
- 任务类型:Function<? super T,? extends U> fn
- 有入参,有返回值
6、thenAccept
- 任务类型:Consumer<? super T> action
- 有入参,无返回值
7、thenRun
-
任务类型:Runnable action
-
无入参,无返回值
8、thenCompose
- 任务类型:Function<? super T, ? extends CompletionStage> fn
- 有入参,有返回值,返回值类型只能是CompletionStage
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return "Hello";
}, executorService).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return s + " World";
})).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return s + " !!!";
}));
System.out.println(future.get());
- 按顺序执行两个并行任务
9、thenCombine
-
任务类型:CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn
-
有两个入参
-
第一个参数为CompletionStage
-
-
第二个参数为具体要执行的任务,认为类型为BiFunction,有两个入参,一个返回值
-
ExecutorService executorService = Executors.newFixedThreadPool(10); CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); sleep(3); return "Hello"; }, executorService); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); sleep(3); return " World"; }, executorService); CompletableFuture<String> result = futureA.thenCombine(futureB, (resultA, resultB) -> { System.out.println(Thread.currentThread().getName()); return resultA + resultB; }); System.out.println(result.get());
-
-
-
整合两个并行执行的任务结果
-
10、runAfterEither
-
两个任务中任意一个完成了,就执行回调
-
package com.Js; import java.util.concurrent.*; import java.util.function.Function; import java.util.function.Supplier; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); sleep(5); return "Hello"; }, executorService); CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); sleep(5); return "World"; }, executorService); task2.runAfterEither(task1, () -> System.out.println("!!!")); } private static void sleep(int i) { try { TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); } } }
-
11、runAfterBoth
-
两个任务都完成了,才执行回调
package com.Js; import java.util.concurrent.*; import java.util.function.Function; import java.util.function.Supplier; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); sleep(5); return "Hello"; }, executorService); CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); // sleep(5); return "World"; }, executorService); task2.runAfterBoth(task1, () -> System.out.println("!!!")); } private static void sleep(int i) { try { TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); } } }
12、get()
- 阻塞等待结果
13、V get(long timeout, TimeUnit unit)
- 超时等待结果
14、T getNow(T valueIfAbsent)
- 立即获取结果,如果任务还没完成,则返回valueIfAbsent
15、whenComplete
- 入参为BiConsumer<? super T, ? super Throwable> action
- 任务正常结束第一个参数传入的是结果
- 任务异常结束第二个参数传入的是异常
- 异常类型是CompletionException
- 没有返回值
16、exceptionally
- 入参为Function<Throwable, ? extends T> fn
- 入参为异常
- 异常类型是CompletionException
- 入参为异常
- 可以返回其他值
- 如果任务没有出现异常则不会执行
17、complete
- 直接让任务完成
package com.Js;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
sleep(5);
return "Hello";
}, executorService);
task1.complete("haha");
System.out.println(task1.get());
executorService.shutdown();
}
private static void sleep(int i) {
try {
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
18、cancel
-
如果任务还没开始,或正在执行,则能取消,设置取消标记为true
-
package com.Js; import java.util.concurrent.*; import java.util.function.Function; import java.util.function.Supplier; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> { sleep(3); System.out.println(Thread.currentThread().getName()); return "Hello"; }, executorService); sleep(1); task.cancel(false); System.out.println(task.get()); } private static void sleep(int i) { try { TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { System.out.println(e); e.printStackTrace(); } } }
-
-
如果任务已经完成,则设置取消标记为false
19、allOf
-
所有任务都执行完后才执行下一个任务
package com.Js; import java.util.concurrent.*; import java.util.function.Function; import java.util.function.Supplier; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); sleep(5); return "Hello"; }, executorService); CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return "World"; }, executorService); CompletableFuture.allOf(task1, task2).join(); System.out.println("!!!"); } private static void sleep(int i) { try { TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); } } }
20、anyOf
-
任意一个任务执行完就可以执行下一个任务了
package com.Js; import java.util.concurrent.*; import java.util.function.Function; import java.util.function.Supplier; public class Main { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); sleep(5); return "Hello"; }, executorService); CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName()); return "World"; }, executorService); CompletableFuture.anyOf(task1, task2).join(); System.out.println("!!!"); } private static void sleep(int i) { try { TimeUnit.SECONDS.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); } } }