并发编程系列-CompletableFuture

利用多线程来提升性能,实质上是将顺序执行的操作转化为并行执行。仔细观察后,你还会发现在顺序转并行的过程中,一定会牵扯到异步化。举个例子,现在下面这段示例代码是按顺序执行的,为了优化性能,我们需要将其改为并行执行。那具体的实施方法是什么呢?

//以下两个方法都是耗时操作
doBizA();
doBizB();

确实,实现并行化的方法很简单,就像下面的代码一样,我们创建两个子线程来执行这些操作。你会发现在下面的并行方案中,主线程无需等待doBizA()和doBizB()的执行结果,也就是说doBizA()和doBizB()这两个操作已经被异步化了。

new Thread(()->doBizA())
  .start();
new Thread(()->doBizB())
  .start();

异步化是实施并行方案的基础,更具体地说,它是实现利用多线程优化性能这一核心方案的基础。明白这一点后,你可能会理解为什么异步编程最近几年变得如此流行了,因为优化性能是互联网大厂的核心需求之一。Java在1.8版本引入了CompletableFuture来支持异步编程,这可能是你见过的最复杂的工具类了,但它的功能确实令人惊叹。

CompletableFuture的核心优势

为了体验CompletableFuture异步编程的优势,我们将使用CompletableFuture来实现一个烧水泡茶的程序。首先,我们需要制定分工方案。在下面的程序中,我们将任务分为三个部分:任务1负责洗水壶和烧开水,任务2负责洗茶壶、茶杯和取茶叶,任务3负责泡茶。任务3必须等待任务1和任务2都完成之后才能开始。下图展示了这个分工方案。

alt

烧水泡茶分工方案

下面是代码实现,你先略过runAsync()、supplyAsync()、thenCombine()这些不太熟悉的方法,从整体来看,你会发现:

  1. 无需手动维护线程,没有繁琐的手动线程管理工作,任务的线程分配也无需关注;
  2. 语义更明确,例如 f3 = f1.thenCombine(f2, ()->{}) 能够明确表达“任务3必须等待任务1和任务2都完成之后才能开始”;
  3. 代码更简洁且专注于业务逻辑,几乎所有的代码都是与业务逻辑相关的。
//任务1:洗水壶->烧开水
CompletableFuture<Void> f1 =
  CompletableFuture.runAsync(()->{
  System.out.println("T1:洗水壶...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T1:烧开水...");
  sleep(15, TimeUnit.SECONDS);
});
//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<String> f2 =
  CompletableFuture.supplyAsync(()->{
  System.out.println("T2:洗茶壶...");
  sleep(1, TimeUnit.SECONDS);

  System.out.println("T2:洗茶杯...");
  sleep(2, TimeUnit.SECONDS);

  System.out.println("T2:拿茶叶...");
  sleep(1, TimeUnit.SECONDS);
  return "龙井";
});
//任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 =
  f1.thenCombine(f2, (__, tf)->{
    System.out.println("T1:拿到茶叶:" + tf);
    System.out.println("T1:泡茶...");
    return "上茶:" + tf;
  });
//等待任务3执行结果
System.out.println(f3.join());

void sleep(int t, TimeUnit u) {
  try {
    u.sleep(t);
  }catch(InterruptedException e){}
}
// 一次执行结果:
T1:洗水壶...
T2:洗茶壶...
T1:烧开水...
T2:洗茶杯...
T2:拿茶叶...
T1:拿到茶叶:龙井
T1:泡茶...
上茶:龙井

领略CompletableFuture异步编程的优势之后,下面我们详细介绍CompletableFuture的使用,首先是如何创建CompletableFuture对象。

创建CompletableFuture对象

创建CompletableFuture对象主要依靠下列四个静态方法,我们首先来看前两个。在烧水泡茶的例子中,我们已经使用了 runAsync(Runnable runnable)supplyAsync(Supplier<U> supplier),它们之间的区别是:Runnable 接口的run()方法没有返回值,而Supplier接口的get()方法有返回值。

前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。

默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核心数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些耗时的I/O操作,就会导致线程池中的所有线程都被阻塞在I/O操作上,从而引发线程饥饿问题,进而影响整个系统的性能。因此,强烈建议你根据不同的业务类型创建不同的线程池,以避免彼此之间的干扰。

//使用默认线程池
static CompletableFuture<Void>
  runAsync(Runnable runnable)
static <U> CompletableFuture<U>
  supplyAsync(Supplier<U> supplier)
//可以指定线程池
static CompletableFuture<Void>
  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U>
  supplyAsync(Supplier<U> supplier, Executor executor)

创建完CompletableFuture对象之后,会自动地以异步方式执行runnable.run()方法或者supplier.get()方法。对于一个异步操作,你需要关注两个问题:一个是异步操作何时完成,另一个是如何获取异步操作的执行结果。因为CompletableFuture类实现了Future接口,所以这两个问题都可以通过Future接口来解决。此外,CompletableFuture类还实现了CompletionStage接口,该接口包含了丰富的方法,仅在1.8版本中就有40个。对于这些方法,我们该如何理解呢?

如何理解CompletionStage接口

你可以从责任分工的角度来类比工作流程。任务之间存在时序关系,包括串行关系、并行关系和汇聚关系等。这样说可能有些抽象,为了更好地理解,我举一个前面烧水泡茶的例子。其中洗水壶和烧开水之间是串行关系,洗水壶、烧开水以及洗茶壶、洗茶杯这两组任务之间是并行关系,而烧开水、拿茶叶和泡茶则是汇聚关系。

alt

串行关系

alt

并行关系

alt

汇聚关系

CompletionStage接口可以清晰地描述任务之间的这种时序关系,例如前面提到的 f3 = f1.thenCombine(f2, ()->{}) 描述的就是一种汇聚关系。烧水泡茶程序中的汇聚关系是一种 AND 聚合关系,这里的AND指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。既然有AND聚合关系,那就一定还有OR聚合关系,所谓OR指的是依赖的任务只要有一个完成就可以执行当前任务。

在编程领域,还有一个绕不过去的山头,那就是异常处理,CompletionStage接口也可以方便地描述异常处理。

下面我们就来一一介绍,CompletionStage接口如何描述串行关系、AND聚合关系、OR聚合关系以及异常处理。

1. 描述串行关系

CompletionStage接口里面描述串行关系,主要是thenApply、thenAccept、thenRun和thenCompose这四个系列的接口。

thenApply系列函数里参数fn的类型是接口Function<T, R>,这个接口里与CompletionStage相关的方法是 R apply(T t),这个方法既能接收参数也支持返回值,所以thenApply系列方法返回的是 CompletionStage<R>

而thenAccept系列方法里参数consumer的类型是接口 Consumer<T>,这个接口里与CompletionStage相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以thenAccept系列方法返回的是 CompletionStage<Void>

thenRun系列方法里action的参数是Runnable,所以action既不能接收参数也不支持返回值,所以thenRun系列方法返回的也是 CompletionStage<Void>

这些方法里面Async代表的是异步执行fn、consumer或者action。其中,需要你注意的是thenCompose系列方法,这个系列的方法会新创建出一个子流程,最终结果和thenApply系列是相同的。

CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

通过下面的示例代码,你可以看一下thenApply()方法是如何使用的。首先通过supplyAsync()启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

CompletableFuture<String> f0 =
  CompletableFuture.supplyAsync(
    () -> "Hello World")      //①
  .thenApply(s -> s + " QQ")  //②
  .thenApply(String::toUpperCase);//③

System.out.println(f0.join());
//输出结果
HELLO WORLD QQ

2. 描述AND汇聚关系

CompletionStage接口里面描述AND汇聚关系,主要是thenCombine、thenAcceptBoth和runAfterBoth系列的接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。它们的使用你可以参考上面烧水泡茶的实现程序,这里就不赘述了。

CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

3. 描述OR汇聚关系

CompletionStage接口里面描述OR汇聚关系,主要是applyToEither、acceptEither和runAfterEither系列的接口,这些接口的区别也是源自fn、consumer、action这三个核心参数不同。

CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

下面的示例代码展示了如何使用applyToEither()方法来描述一个OR汇聚关系。

CompletableFuture<String> f1 =
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f2 =
  CompletableFuture.supplyAsync(()->{
    int t = getRandom(5, 10);
    sleep(t, TimeUnit.SECONDS);
    return String.valueOf(t);
});

CompletableFuture<String> f3 =
  f1.applyToEither(f2,s -> s);

System.out.println(f3.join());

4. 异常处理

虽然上面我们提到的fn、consumer、action它们的核心方法都 不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0 就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?

CompletableFuture<Integer>
  f0 = CompletableFuture.
    .supplyAsync(()->(7/0))
    .thenApply(r->r*10);
System.out.println(f0.join());

CompletionStage接口给我们提供的方案非常简单,比try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

下面的示例代码展示了如何使用exceptionally()方法来处理异常,exceptionally()的使用非常类似于try{}catch{}中的catch{},但是由于支持链式编程方式,所以相对更简单。既然存在try{}catch{},那么必然还有try{}finally{},whenComplete()和handle()系列方法就类似于try{}finally{}中的finally{},无论是否发生异常都会执行whenComplete()中的回调函数consumer和handle()中的回调函数fn。whenComplete()和handle()的差异在于whenComplete()不支持返回结果,而handle()则支持返回结果的。

CompletableFuture<Integer>
  f0 = CompletableFuture
    .supplyAsync(()->(7/0))
    .thenApply(r->r*10)
    .exceptionally(e->0);
System.out.println(f0.join());

总结

曾经一提到异步编程,人们常会联想到回调函数,在JavaScript中,几乎所有的异步问题都依赖于回调函数来解决。然而,当处理异常和复杂的异步任务关系时,回调函数往往显得力不从心,这也导致了「回调地狱」(Callback Hell)的出现。在过去的几年里,异步编程备受诟病。

为了更好地支持异步编程,Java语言在1.8版本引入了CompletableFuture,并在Java 9版本中提供了更加完善的Flow API。

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/75416.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

哈工大开源“活字”对话大模型

一、介绍 大规模语言模型&#xff08;LLM&#xff09;在自然语言处理的通用领域已取得了令人瞩目的成功。对于广泛的应用场景&#xff0c;这种技术展示了强大的潜力&#xff0c;学术界和工业界的兴趣也持续升温。哈工大自然语言处理研究所30余位老师和学生参与开发了通用对话大…

数据库概述、部署MySQL服务、必备命令、密码管理、安装图形软件、SELECT语法 、筛选条件

Top NSD DBA DAY01 案例1&#xff1a;构建MySQL服务器案例2&#xff1a;密码管理案例3&#xff1a;安装图形软件案例4&#xff1a;筛选条件 1 案例1&#xff1a;构建MySQL服务器 1.1 问题 在IP地址192.168.88.50主机和192.168.88.51主机上部署mysql服务练习必备命令的使用 …

性能测试压力曲线模型分析

性能测试模压力曲线&#xff1a; 曲线图关键点介绍&#xff1a; 横轴&#xff1a;从左到右表现了Number of Concurrent Users&#xff08;并发用户数&#xff09;的不断增长。 纵轴&#xff1a;分别表示Utilization&#xff08;资源的利用情况&#xff0c;包括硬件资源和软件…

Android Framework 动态更新插拔设备节点执行权限

TF卡设备节点是插上之后动态添加&#xff0c;所以不能通过初始化设备节点权限来解决&#xff0c;需要监听TF插入事件&#xff0c;在init.rc 监听插入后动态更新设备节点执行权限 添加插拔TF卡监听 frameworks/base/services/core/java/com/android/server/StorageManagerServic…

Vue3组件库

Vue组件库 ViteVue3TypescriptTSX 1、项目搭建 1.1、创建项目&#xff08;yarn&#xff09; D:\WebstromProject>yarn create vite yarn create v1.22.19 [1/4] Resolving packages... [2/4] Fetching packages... [3/4] Linking dependencies... [4/4] Building fresh pa…

MySQL8安装和删除教程 下载源码 保姆级(Windows)

删除 停止Mysql服务 管理员的权限来运行cmd&#xff0c;输入 net stop MySQL80 注意你电脑上的MySQL服务不一定是MySQL80,MySQL80是默认的&#xff0c;不是怎么办?在services.msc中找即可 下载一个小工具 geek:Geek下载打开软件&#xff0c;在列表中找到图片中的两项 sc…

【微服务技术一】Eureka、Nacos、Ribbon(配置管理、注册中心、负载均衡)

微服务技术一 技术栈图一、注册中心Eureka概念&#xff1a;搭建EurekaServer服务注册服务发现&#xff08;消费者对提供者的远程调用&#xff09; 二、Ribbon负载均衡负载均衡的原理&#xff1a;LoadBalanced负载均衡的策略&#xff1a;IRule懒加载 三、Nacos注册中心Nacos的安…

excel中定位条件,excel中有哪些数据类型、excel常见错误值、查找与替换

一、如何定位条件 操作步骤&#xff1a;开始 - 查找和选择 - 定位条件&#xff08;ctrl G 或 F5&#xff09; 注&#xff1a;如果F5不可用&#xff0c;可能是这个快捷键被占用了 案例&#xff1a;使用定位条件选择取余中空单元格&#xff0c;填入100&#xff0c;按组合键ct…

【MySQL】MySQL不走索引的情况分析

未建立索引 当数据表没有设计相关索引时&#xff0c;查询会扫描全表。 create table test_temp (test_id int auto_incrementprimary key,field_1 varchar(20) null,field_2 varchar(20) null,field_3 bigint null,create_date date null );expl…

Python源码05:使用Pyecharts画词云图图

**Pyecharts是一个用于生成 Echarts 图表的 Python 库。Echarts 是一个基于 JavaScript 的数据可视化库&#xff0c;提供了丰富的图表类型和交互功能。**通过 Pyecharts&#xff0c;你可以使用 Python 代码生成各种类型的 Echarts 图表&#xff0c;例如折线图、柱状图、饼图、散…

通过 Amazon SageMaker JumpStart 部署 Llama 2 快速构建专属 LLM 应用

来自 Meta 的 Llama 2 基础模型现已在 Amazon SageMaker JumpStart 中提供。我们可以通过使用 Amazon SageMaker JumpStart 快速部署 Llama 2 模型&#xff0c;并且结合开源 UI 工具 Gradio 打造专属 LLM 应用。 Llama 2 简介 Llama 2 是使用优化的 Transformer 架构的自回归语…

el-table实现懒加载(el-table-infinite-scroll)

2023.8.15今天我学习了用el-table对大量的数据进行懒加载。 效果如下&#xff1a; 1.首先安装&#xff1a; npm install --save el-table-infinite-scroll2 2.全局引入&#xff1a; import ElTableInfiniteScroll from "el-table-infinite-scroll";// 懒加载 V…

通过网关访问微服务,一次正常,一次不正常 (nacos配置的永久实例却未启动导致)

微服务直接访问没问题&#xff0c;通过网关访问&#xff0c;就一次正常访问&#xff0c;一次401错误&#xff0c;交替正常和出错 负载均衡试了 路由配置检查了 最后发现nacos下竟然有2个order服务实例&#xff0c;我明明只开启了一个呀 原来之前的8080端口微服务还残留&…

开工大吉|华润鞋业二期自动化改造项目开工典礼圆满举行

2023年8月10日上午&#xff0c;山东百华鞋业有限公司择良辰吉时隆重举行了华润鞋业二期厂房动工仪式&#xff0c;公司总经理郭兴梅女士携公司管理层代表和施工单位代表参加了动工仪式。 根据公司发展规划&#xff0c;对未来发展的美好期许&#xff0c;以及公司生产与研发保持的…

ApacheCon - 云原生大数据上的 Apache 项目实践

Apache 软件基金会的官方全球系列大会 CommunityOverCode Asia&#xff08;原 ApacheCon Asia&#xff09;首次中国线下峰会将于 2023 年 8 月 18-20 日在北京丽亭华苑酒店举办&#xff0c;大会含 17 个论坛方向、上百个前沿议题。 字节跳动云原生计算团队在此次 CommunityOve…

手机里视频太大怎么压缩?压缩教程分享

现在视频文件的体积越来越大了&#xff0c;动不动就是几个GB起步&#xff0c;如果后期再剪辑处理一下&#xff0c;更是会占据更多的设备空间了&#xff0c;还会导致我们传输受到限制&#xff0c;这时候就需要我们对视频进行压缩处理&#xff0c;下面给大家分享几个简单的方法&a…

Python爬虫——scrapy_基本使用

安装scrapy pip install scrapy创建scrapy项目&#xff0c;需要在终端里创建 注意&#xff1a;项目的名字开头不能是数字&#xff0c;也不能包含中文 scrapy startproject 项目名称 示例&#xff1a; scrapy startproject scra_baidu_36创建好后的文件 3. 创建爬虫文件&…

go的gin和gorm框架实现切换身份的接口

使用go的gin和gorm框架实现切换身份的接口&#xff0c;接收前端发送的JSON对象&#xff0c;查询数据库并更新&#xff0c;返回前端信息 接收前端发来的JSON对象&#xff0c;包含由openid和登陆状态组成的一个string和要切换的身份码int型 后端接收后判断要切换的身份是否低于该…

vue3+vite配置vantUI主题

❓在项目中统一配置UI主题色&#xff0c;各个组件配色统一修改 vantUI按需安装 参考vantUI文档 创建vantVar.less文件夹进行样式编写 vantVar.less :root:root{//导航--van-nav-bar-height: 44px;//按钮--van-button-primary-color: #ffffff;--van-button-primary-backgr…

CentOS系统环境搭建(三)——Centos7安装DockerDocker Compose

centos系统环境搭建专栏&#x1f517;点击跳转 Centos7安装Docker&Docker Compose 使用 yum 安装Docker 内核 [rootVM-4-17-centos ~]# uname -r 3.10.0-1160.88.1.el7.x86_64Docker 要求 CentOS 系统的内核版本高于 3.10 更新 yum yum update安装需要的软件包&#x…