1.创建异步对象
CompletableFuture提供了四个静态方法来创建一个异步操作
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start.....");
// CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// System.out.println("当前线程:" + Thread.currentThread().getName());
// int i = 10 / 2;
// System.out.println("运行结果:" + i);
// }, executor);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor);
Integer i = future.get();
System.out.println("主线程收到值:"+i+" "+Thread.currentThread().getName());
System.out.println("end.....");
}
2.计算完成是回调方法
1.无异常
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start.....");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor).whenComplete((result,exception)->{
//当执行完第一个异步任务的时候用同一个线程执行第二个异步任务
System.out.println("异步任务成功完成 结果是· res="+result+",exception="+exception);
});
System.out.println("end.....");
}
2.有异常
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start.....");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).whenComplete((result,exception)->{
//当执行完第一个异步任务的时候用同一个线程执行第二个异步任务
System.out.println("异步任务成功完成 结果是· res="+result+",exception="+exception);
});
System.out.println("end.....");
}
3.测试两个异常
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start.....");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).whenComplete((result,exception)->{
//当执行完第一个异步任务的时候用同一个线程执行第二个异步任务
System.out.println("异步任务成功完成 结果是· res="+result+",exception="+exception);
throw new RuntimeException("测试异常");
}).exceptionally(ex->{
System.out.println("执行出错:"+ex.getMessage());
return 0;
});
Integer i = future.get();
System.out.println("执行结果 i="+i);
System.out.println("end.....");
}
只会拿到第一个异常
3.Handler方法
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start.....");
//方法执行完成后的处理,无论是成功完成的处理还是失败完成的处理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).handle((res,thr)->{
//回调到主线程执行
System.out.println("当前线程 ===:" + Thread.currentThread().getName());
System.out.println(res+" "+thr);
return 0;
});
Integer i = future.get();
System.out.println("执行结果 i="+i);
System.out.println("end.....");
executor.shutdown();
}
有点像RxJava
4.线程串行化的方法
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start.....");
//不能获取到上一步的执行结果
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 1;
System.out.println("运行结果:" + i);
return i;
}, executor).thenRunAsync(()->{
System.out.println("任务2启动了");
},executor);
// Void unused = future1.get();
// System.out.println("执行结果 i="+i);
System.out.println("end.....");
}
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start.....");
//不能获取到上一步的执行结果
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).thenApplyAsync(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
return 0;
}
}, executor).whenCompleteAsync((o, throwable) -> {
System.out.println("res=" + o + ",throwable=" + throwable);
}, executor).exceptionally(e->{
return 500;
});
Integer i = future.get();
System.out.println("执行结果 i="+i);
System.out.println("end.....");
}
1.测试执行
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start.....");
//不能获取到上一步的执行结果
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 1;
System.out.println("运行结果:" + i);
return i;
}, executor).thenApplyAsync(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
return 0;
}
}, executor).whenCompleteAsync((o, throwable) -> {
System.out.println("res=" + o + ",throwable=" + throwable);
}, executor).thenApplyAsync(integer -> {
System.out.println("res=" + integer );
return 30;
},executor).whenCompleteAsync((integer, throwable) -> {
System.out.println("任务3执行"+integer);
}).whenCompleteAsync((integer, throwable) -> {
System.out.println("任务4执行"+integer);
}).exceptionally(e->{
return 500;
});
Integer i = future.get();
System.out.println("执行结果 i="+i);
System.out.println("end.....");
}
5.两任务组合 - 都要完成
创建future(返回值、无返回值)
future处理(处理结果;处理异常;处理结果异常)
串行future(不接收返回值;接收返回值;产生返回值)
并行future(不接收返回值、接收返回值;产生返回值)
6.两任务组合,任意一个完成
一个抛了异常,那么接下来没有使用接受异常的操作以外,其他的操作全部不执行
线程对象不用回收,因为是统一由线程池管理,不会造成内存泄漏
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("start.....");
//不能获取到上一步的执行结果
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("当前线程:" + Thread.currentThread().getName());
int i = 10 / 1;
System.out.println("运行结果:" + i);
return i;
}, executor);
CompletableFuture<Integer> future1 = future.thenApplyAsync(integer -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("任务2执行完成");
return 300;
});
CompletableFuture<Integer> future2 = future.thenApplyAsync((integer) -> {
try {
Thread.sleep(2000);
} catch (InterruptedException a) {
}
System.out.println("任务3执行完成");
return 2000;
});
System.out.println(future1 == future);
future1.runAfterEitherAsync(future2, () -> {
System.out.println("f1 或 f2 执行完了");
}, executor);
future1.applyToEitherAsync(future2, integer -> {
System.out.println("谁先执行完 int = "+integer);
return 0;
}, executor);
// System.out.println("执行结果 i="+i);
System.out.println("end.....");
}
7.多任务组合
1. allOf
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("查询商品的图片信息");
return "hello.jpg";
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("查询商品的属性");
return "黑色+256G";
}, executor);
CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("查询商品介绍");
return "华为";
}, executor);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(future01, future02, future03);
System.out.println(":1");
voidCompletableFuture.get();//等待所有的都做完
System.out.println("sa");
System.out.println("所有的结果都做完了: "+future01.get()+","+future02.get()+","+future03.get());
}
2.anyOf
public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> {//有返回值异步
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println("查询商品的图片信息");
return "hello.jpg";
}, executor);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {//有返回值异步
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println("查询商品的属性");
return "黑色+256G";
}, executor);
CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {//有返回值异步
System.out.println("查询商品介绍");
return "华为";
}, executor);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03);
Object o = anyOf.get();//等待所有的都做完
System.out.println("result = " + o);
}