一网打尽异步神器CompletableFuture

Future接口以及它的局限性

我们都知道,Java中创建线程的方式主要有两种方式,继承Thread或者实现Runnable接口。但是这两种都是有一个共同的缺点,那就是都无法获取到线程执行的结果,也就是没有返回值。于是在JDK1.5 以后为了解决这种没有返回值的问题,提供了Callable和Future接口以及Future对应的实现类FutureTask,通过FutureTask的就可以获取到异步执行的结果。

于是乎,我们想要开启异步线程,执行任务,获取结果,就可以这么实现。

 FutureTask<String> futureTask = new FutureTask<>(() -> "三友");
 new Thread(futureTask).start();
 System.out.println(futureTask.get());

或者使用线程池的方式

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();

线程池底层也是将提交的Callable的实现先封装成FutureTask,然后通过execute方法来提交任务,执行异步逻辑。

Future接口的局限性

虽然通过Future接口的get方法可以获取任务异步执行的结果,但是get方法会阻塞主线程,也就是异步任务没有完成,主线程会一直阻塞,直到任务结束。

Future也提供了isDone方法来查看异步线程任务执行是否完成,如果完成,就可以获取任务的执行结果,代码如下。

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
while (!future.isDone()) {
  //任务有没有完成,没有就继续循环判断
}
System.out.println(future.get());
executorService.shutdown();

但是这种轮询查看异步线程任务执行状态,也是非常消耗cpu资源。

同时对于一些复杂的异步操作任务的处理,可能需要各种同步组件来一起完成。

所以,通过上面的介绍可以看出,Future在使用的过程中还是有很强的局限性,所以为了解决这种局限性,在JDK1.8的时候,Doug Lea 大神为我们提供了一种更为强大的类CompletableFuture。

什么是CompletableFuture?

CompletableFuture在JDK1.8提供了一种更加强大的异步编程的api。它实现了Future接口,也就是Future的功能特性CompletableFuture也有;除此之外,它也实现了CompletionStage接口,CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。

CompletableFuture相比于Future最大的改进就是提供了类似观察者模式的回调监听的功能,也就是当上一阶段任务执行结束之后,可以回调你指定的下一阶段任务,而不需要阻塞获取结果之后来处理结果。

CompletableFuture常见api详解

CompletableFuture的方法api多,但主要可以分为以下几类。

1、实例化CompletableFuture

构造方法创建
CompletableFuture<String> completableFuture = new CompletableFuture<>();
System.out.println(completableFuture.get());

此时如果有其它线程执行如下代码,就能执行打印出 三友

completableFuture.complete("三友")
静态方法创建

除了使用构造方法构造,CompletableFuture还提供了静态方法来创建

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

supply 和 run 的主要区别就是 supply 可以有返回值,run 没有返回值。至于另一个参数Executor 就是用来执行异步任务的线程池,如果不传Executor 的话,默认是ForkJoinPool这个线程池的实现。

一旦通过静态方法来构造,会立马开启异步线程执行Supplier或者Runnable提交的任务。

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "三友");
System.out.println(completableFuture.get());

一旦任务执行完成,就可以打印返回值,这里的使用方法跟Future是一样的。

所以对比两个两种实例化的方法,使用静态方法的和使用构造方法主要区别就是,使用构造方法需要其它线程主动调用complete来表示任务执行完成,因为很简单,因为在构造的时候没有执行异步的任务,所以需要其它线程主动调用complete来表示任务执行完成。

2、获取任务执行结果

public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();

get()和get(long timeout, TimeUnit unit)是实现了Future接口的功能,两者主要区别就是get()会一直阻塞直到获取到结果,get(long timeout, TimeUnit unit)值可以指定超时时间,当到了指定的时间还未获取到任务,就会抛出TimeoutException异常。

getNow(T valueIfAbsent):就是获取任务的执行结果,但不会产生阻塞。如果任务还没执行完成,那么就会返回你传入的 valueIfAbsent 参数值,如果执行完成了,就会返回任务执行的结果。

join():跟get()的主要区别就是,get()会抛出检查时异常,join()不会。

3、主动触发任务完成

public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);

complete:主动触发当前异步任务的完成。调用此方法时如果你的任务已经完成,那么方法就会返回false;如果任务没完成,就会返回true,并且其它线程获取到的任务的结果就是complete的参数值。

completeExceptionally:跟complete的作用差不多,complete是正常结束任务,返回结果,而completeExceptionally就是触发任务执行的异常。

4、对任务执行结果进行下一步处理

只能接收任务正常执行后的回调
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

这类回调的特点就是,当任务正常执行完成,没有异常的时候就会回调。

thenApply:可以拿到上一步任务执行的结果进行处理,并且返回处理的结果 thenRun:拿不到上一步任务执行的结果,但会执行Runnable接口的实现 thenAccept:可以拿到上一步任务执行的结果进行处理,但不需要返回处理的结果

thenApply示例:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 10)
                .thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());

执行结果:

上一步的执行的结果为:10

thenRun示例:

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10)
      .thenRun(() -> System.out.println("上一步执行完成"));

执行结果:

上一步执行完成

thenAccept示例:

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10)
      .thenAccept(v -> System.out.println("上一步执行完成,结果为:" + v));

执行结果:

上一步执行完成,结果为:10

thenApply有异常示例:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
    //模拟异常
    int i = 1 / 0;
    return 10;
}).thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());

执行结果:

Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

当有异常时是不会回调的

只能接收任务处理异常后的回调
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

当上面的任务执行过程中出现异常的时候,会回调exceptionally方法指定的回调,但是如果没有出现异常,是不会回调的。

exceptionally能够将异常给吞了,并且fn的返回值会返回回去。

其实这个exceptionally方法有点像降级的味道。当出现异常的时候,走到这个回调,可以返回一个默认值回去。

没有异常情况下:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    return 100;
}).exceptionally(e -> {
    System.out.println("出现异常了,返回默认值");
    return 110;
});
System.out.println(completableFuture.join());

执行结果:

100

有异常情况下:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    int i = 1 / 0;
    return 100;
}).exceptionally(e -> {
    System.out.println("出现异常了,返回默认值");
    return 110;
});
System.out.println(completableFuture.join());

执行结果:

出现异常了,返回默认值
110
能同时接收任务执行正常和异常的回调
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> actin);

不论前面的任务执行成功还是失败都会回调的这类方法指定的回调方法。

handle : 跟exceptionally有点像,但是exceptionally是出现异常才会回调,两者都有返回值,都能吞了异常,但是handle正常情况下也能回调。

whenComplete:能接受正常或者异常的回调,并且不影响上个阶段的返回值,也就是主线程能获取到上个阶段的返回值;当出现异常时,whenComplete并不能吞了这个异常,也就是说主线程在获取执行异常任务的结果时,会抛出异常。

这里演示一下whenComplete处理异常示例情况,handle跟exceptionally对异常的处理差不多。

whenComplete处理异常示例:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
      int i = 1 / 0;
      return 10;
}).whenComplete((r, e) -> {
      System.out.println("whenComplete被调用了");
});
System.out.println(completableFuture.join());

执行结果:

whenComplete被调用了
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

5、对任务结果进行合并

public <U,V> CompletionStage<V> thenCombine
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);

这个方法的意思是,当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调 BiFunction ,然后返回新的结果。

thenCombine的例子请往下继续看。

6、以Async结尾的方法

上面说的一些方法,比如说thenAccept方法,他有两个对应的Async结尾的方法,如下:

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

thenAcceptAsync跟thenAccept的主要区别就是thenAcceptAsync会重新开一个线程来执行下一阶段的任务,而thenAccept还是用上一阶段任务执行的线程执行。

两个thenAcceptAsync主要区别就是一个使用默认的线程池来执行任务,也就是ForkJoinPool,一个是使用方法参数传入的线程池来执行任务。

当然除了thenAccept方法之外,上述提到的方法还有很多带有Async结尾的对应的方法,他们的主要区别就是执行任务是否开启异步线程来执行的区别。

当然,还有一些其它的api,可以自行查看

CompletableFuture在RocketMQ中的使用

CompletableFuture在RocketMQ中的使用场景比较多,这里我举一个消息存储的场景。

在RocketMQ中,Broker接收到生产者产生的消息的时候,会将消息持久化到磁盘和同步到从节点中。持久化到磁盘和消息同步到从节点是两个独立的任务,互不干扰,可以相互独立执行。当消息持久化到磁盘和同步到从节点中任务完成之后,需要统计整个存储消息消耗的时间,所以统计整个存储消息消耗的时间是依赖前面两个任务的完成。

图片

实现代码如下

消息存储刷盘任务和主从复制任务:

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 提交刷盘的请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//提交主从复制的请求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

//刷盘 和 主从复制 两个异步任务通过thenCombine联合
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    // 当两个刷盘和主从复制任务都完成的时候,就会回调
    // 如果刷盘没有成功,那么就将消息存储的状态设置为失败
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    // 如果主从复制没有成功,那么就将消息存储的状态设置为失败
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
    }
    // 最终返回消息存储的结果
    return putMessageResult;
});

对上面两个合并的任务执行结果通过thenAccept方法进行监听,统计消息存储的耗时:

//消息存储的开始时间
long beginTime = this.getSystemClock().now();
// 存储消息,然后返回 CompletableFuture,也就是上面一段代码得返回值‍
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

//监听消息存储的结果
putResultFuture.thenAccept((result) -> {
    // 消息存储完成之后会回调
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});

CompletableFuture的优点

1、异步函数式编程,实现优雅,易于维护;

2、它提供了异常管理的机制,让你有机会抛出、管理异步任务执行中发生的异常,监听这些异常的发生;

3、拥有对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/190925.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深入解析Selenium动作链:精通点击、拖拽、切换等操作

背景&#xff1a; 一些交互动作都是针对某个节点执行的。比如&#xff0c;对于输入框&#xff0c;我们就调用它的输入文字和清空文字方法&#xff1b;对于按钮&#xff0c;就调用它的点击方法。其实&#xff0c;还有另外一些操作&#xff0c;它们没有特定的执行对象&#xff0…

关于vs code Debug调试时候出现“找不到任务C/C++: g++.exe build active file” 解决方法

vs code Debug调试时候出现“找不到任务C/C: g.exe build active file” &#xff0c;出现报错&#xff0c;Debug失败 后来经过摸索和上网查找资料解决问题 方法如下 在Vs code的操作页面左侧有几个配置文件 红框里的是需要将要修改的文件 查看tasks.json和launch.json框选&…

跟着chatgpt一起学|1.spark入门之MLLib

chatgpt在这一章表现的不好&#xff0c;所以我主要用它来帮我翻译文章提炼信息 1.前言 首先找到spark官网里关于MLLib的链接 spark内一共有2种支持机器学习的包&#xff0c; 一种是spark.ml,基于DataFrame的&#xff0c;也是目前主流的 另一种则是spark.mllib,是基于RDD的…

Python---函数的数据---拆包的应用案例(两个变量值互换,*args, **kwargs调用时传递参数用法)

案例&#xff1a; 使用至少3种方式交换两个变量的值 第一种方式&#xff1a;引入一个临时变量 c1 10 c2 2# 引入临时变量temp temp c2 c2 c1 c1 tempprint(c1, c2) 第二种方式&#xff1a;使用加法与减法运算交换两个变量的值&#xff08;不需要引入临时变量&#xff09…

【单调栈】子数组的最小值之和

import java.util.Deque; import java.util.LinkedList;/** 参考链接&#xff1a;https://leetcode.cn/problems/sum-of-subarray-minimums/solutions/1930857/gong-xian-fa-dan-diao-zhan-san-chong-shi-gxa5/* https://leetcode.cn/problems/sum-of-subarray-minim…

[NOIP2006]明明的随机数

一、题目 登录—专业IT笔试面试备考平台_牛客网 二、代码 set去重&#xff0c;再利用vector进行排序 std::set是一个自带排序功能的容器&#xff0c;它已经按照一定的规则&#xff08;默认是元素的小于比较&#xff09;对元素进行了排序。因此&#xff0c;你不能直接对std::s…

栈和队列OJ题

目录 【1】括号匹配问题 思路分析 易错总结 Stack.h&Stack.c手撕栈 isValid括号匹配 【2】设计循环队列 今天接着栈&队列OJ题目。 【1】括号匹配问题 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff…

超实用!Spring Boot 常用注解详解与应用场景

目录 一、Web MVC 开发时&#xff0c;对于三层的类注解 1.1 Controller 1.2 Service 1.3 Repository 1.4 Component 二、依赖注入的注解 2.1 Autowired 2.2 Resource 2.3 Resource 与 Autowired 的区别 2.3.1 实例讲解 2.4 Value 2.5 Data 三、Web 常用的注解 3.1…

【研究中】sql server权限用户设置23.11.26

--更新时间2023.11.26 21&#xff1a;30 负责人&#xff1a;jerrysuse DBAliCMSIF EXISTS (select * from sysobjects where namehkcms_user)--判断是否存在此表DROP TABLE hkcms_user CREATE TABLE hkcms_user (id int primary key identity(1, 1),username char(32) NOT N…

【STM32单片机】简易计算器设计

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用STM32F103C8T6单片机控制器&#xff0c;使用动态数码管模块、矩阵按键、蜂鸣器模块等。 主要功能&#xff1a; 系统运行后&#xff0c;数码管默认显示0&#xff0c;输入对应的操作数进行四则运…

(1/2)敏捷实践指南 Agile Practice Guide ([美] Project Management institute 著)

电子工业出版社 Publishing House Of Electronics Industry 北京BeiJing 版次&#xff1a;2018年10月第1版 印次&#xff1a;2023年2月第22次印刷 定价&#xff1a;68元 声明 作为项目管理协会&#xff08;PMI&#xff09;的标准和指南&#xff0c;本指南是通过相关人员的…

MySQL表的操作『增删改查』

✨个人主页&#xff1a; 北 海 &#x1f389;所属专栏&#xff1a; MySQL 学习 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 &#x1f381;软件版本&#xff1a; MySQL 5.7.44 文章目录 1.创建表1.1.创建时指定属性 2.查看表2.1.查看表结构2.2.查看建表信息…

MYSQL 连接的使用

文章目录 前言连接介绍在命令提示符中使用 INNER JOINMySQL LEFT JOINMySQL RIGHT JOIN在PHP脚本中使用JOIN后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;Mysql &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握…

Kafka 如何实现顺序消息

版本说明 本文所有的讨论均在如下版本进行&#xff0c;其他版本可能会有所不同。 Kafka: 3.6.0Pulsar: 2.9.0RabbitMQ 3.7.8RocketMQ 5.0Go1.21github.com/segmentio/kafka-go v0.4.45 结论先行 Kafka 只能保证单一分区内的顺序消息&#xff0c;无法保证多分区间的顺序消息…

【咕咕送书 | 第六期】深入浅出阐述嵌入式虚拟机原理,实现“小而能”嵌入式虚拟机!

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏:《粉丝福利》 《linux深造日志》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 ⛳️ 写在前面参与规则引言一、为什么嵌入式系统需要虚拟化技术&#xff1f;1.1 专家推荐 二、本书适合谁&#x…

二级分类菜单及三级分类菜单的层级结构返回

前言 在开发投诉分类功能模块时&#xff0c;遇到过这样一个业务场景&#xff1a;后端需要按层级结构返回二级分类菜单所需数据&#xff0c;换言之&#xff0c;将具有父子关系的List结果集数据转为树状结构数据来返回 二级分类菜单 前期准备 这里简单复刻下真实场景中 出现的…

一文从Vue2过渡到Vue3

文章目录 Vue3简介创建Vue3.0工程使用 vue-cli 创建使用 vite 创建Vue3工程结构变化 常用 Composition API拉开序幕的setupref函数reactive函数Vue3.0中的响应式原理vue2.x的响应式Vue3.0的响应式 reactive对比refsetup的两个注意点计算属性与监视computed函数watch函数watchEf…

【限时免费】20天拿下华为OD笔试之【DP/贪心】2023B-观看文艺汇演-200分【欧弟算法】全网注释最详细分类最全的华为OD真题题解

【DP/贪心】2023B-观看文艺汇演 题目描述与示例 某公园将举行多场文艺表演&#xff0c;很多演出都是同时进行&#xff0c;一个人只能同时观看一场演出&#xff0c;且不能迟到早退&#xff0c;由于演出分布在不同的演出场地&#xff0c;所以连续观看的演出最少有 15 分钟的时间…

Docker搭建个人网盘NextCloud并接入雨云对象存储的教程

雨云服务器使用Docker搭建私有云盘NextCloud并接入雨云对象存储ROS的教程。 NextCloud简介 NextCloud由原ownCloud联合创始人Frank Karlitschek创建的&#xff0c;继承原ownCloud的核心技术又有不少的创新。在功能上NextCloud和ownCloud差不多&#xff0c;甚至还要丰富一些&a…

WebSocket了解

一.什么是WebSocket WebSocket是HTML5下一种新的协议&#xff08;websocket协议本质上是一个基于tcp的协议&#xff09;它实现了浏览器与服务器全双工通信&#xff0c;能更好的节省服务器资源和带宽并达到实时通讯的目的Websocket是一个持久化的协议 二.websocket的原理 web…