Java8实战-总结49
- CompletableFuture:组合式异步编程
- 对多个异步任务进行流水线操作
- 构造同步和异步操作
- 将两个 CompletableFuture 对象整合起来,无论它们是否存在依赖
CompletableFuture:组合式异步编程
对多个异步任务进行流水线操作
构造同步和异步操作
使用CompletableFuture
提供的特性,以异步方式重新实现findPrices方法。详细代码如下所示(使用CompletableFuture
实现findPrices
方法):
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync( //以异步方式取得每个shop中指定产品的原始价格
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse)) //Quote对象存在时,对其返回的值进行转换
.map(future -> future.thenCompose(quote -> //使用另一个异步任务构造期望的Future,申请折扣
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)//等待流中的所有Future执行完毕,并提取各自的返回值
.collect(toList());
}
这一次,事情看起来变得更加复杂了这三次转换的流程如下图所示:
进行的这三次map
操作和前面代码中的同步方案没有太大的区别,不过使用CompletableFuture
类提供的特性,在需要的地方把它们变成了异步操作。
- 获取价格
这三个操作中的第一个已经在各个例子中见过很多次,只需要将Lambda
表达式作为参数传递给supplyAsync
工厂方法就可以以异步方式对shop
进行查询。第一个转换的结果是一个Stream<CompletableFuture<String>>
,一旦运行结束,每个CompletableFuture
对象中都会包含对应shop
返回的字符串。注意,你对CompletableFuture
进行了设置,用前面代码中的方法向其传递了一个订制的执行器Executor
。 - 解析报价
现在需要进行第二次转换将字符串转变为订单。由于一般情况下解析操作不涉及任何远程服务,也不会进行任何I/O
操作,它几乎可以在第一时间进行,所以能够采用同步操作,不会带来太多的延迟。由于这个原因,你可以对第一步中生成的CompletableFuture
对象调用它的thenApply
,将一个由字符串转换Quote
的方法作为参数传递给它。注意到了吗?直到调用的CompletableFuture
执行结束,使用的thenApply
方法都不会阻塞代码的执行。这意味着CompletableFuture
最终结束运行时,你希望传递Lambda
表达式给thenApply
方法,将Stream
中的每个CompletableFuture<String>
对象转换为对应的CompletableFuture<Quote>
对象。你可以把这看成是为处理CompletableFuture
的结果建立了一个菜单,就像你曾经为Stream
的流水线所做的事儿一样。 - 为计算折扣价格构造Future
第三个map
操作涉及联系远程的Discount
服务,为从商店中得到的原始价格申请折扣率。这一转换与前一个转换又不大一样,因为这一转换需要远程执行(或者,就这个例子而言,它需要模拟远程调用带来的延迟),出于这一原因,你也希望它能够异步执行。为了实现这一目标,你像第一个调用传递getPrice
给supplyAsync
那样,将这一操作以Lambda
表达式的方式传递给了supplyAsync
工厂方法,该方法最终会返回另一个CompletableFuture
对象。到目前为止,你已经进行了两次异步操作,用了两个不同的CompletableFutures
对象进行建模,你希望能把它们以级联的方式串接起来进行工作。 - 从
shop
对象中获取价格,接着把价格转换为Quote
。 - 拿到返回的
Quote
对象,将其作为参数传递给Discount
服务,取得最终的折扣价格。
Java 8
的 CompletableFuture AP
I提供了名为thenCompose
的方法,它就是专门为这一目的而设计的,thenCompose
方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。换句话说,你可以创建两个CompletableFutures
对象,对第一个CompletableFuture
对象调用 thenCompose
,并向其传递一个函数。当第一个CompletableFuture
执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture
的返回做输入计算出的第二个CompletableFuture
对象。使用这种方式,即使Future
在向不同的商店收集报价,主线程还是能继续执行其他重要的操作,比如响应
事件。
将这三次map
操作的返回的Stream
元素收集到一个列表,你就得到了一个List<CompletableFuture<String>>
,等这些CompletableFuture
对象最终执行完毕,就可以像之前代码中那样利用join
取得它们的返回值。代码实现的新版findPrices
方法产生的输出如下:
[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop price
is 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]
Done in 2035 msecs
上面代码中使用的thenCompose
方法像CompletableFuture
类中的其他方法一样,也提供了一个以Async
后缀结尾的版本thenComposeAsync
。通常而言,名称中不带Async
的方法和它的前一个任务一样,在同一个线程中运行;而名称以Async
结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。就这个例子而言,第二个CompletableFuture
对象的结果取决于第一个CompletableFuture
,所以无论你使用哪个版本的方法来处理CompletableFuture
对象,对于最终的结果,或者大致的时间而言都没有多少差别。选择thenCompose
方法的原因是因为它更高效一些,因为少了很多线程切换的开销。
将两个 CompletableFuture 对象整合起来,无论它们是否存在依赖
上面的代码中,你对一个CompletableFuture
对象调用了thenCompose
方法,并向其传递了第二个 CompletableFuture
,而第二个CompletableFuture
又需要使用第一个CompletableFuture
的执行结果作为输入。但是,另一种比较常见的情况是,你需要将两个完全不相干的CompletableFuture
对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二项任务。
这种情况,你应该使用thenCombine
方法,它接收名为BiFunction
的第二参数,这个参数定义了当两个CompletableFuture
对象完成计算后,结果如何合并。同thenCompose
方法一样,thenCombine
方法也提供有一个Async
的版本。这里,如果使用thenCombineAsync
会导致BiFunction
中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。回到我们正在运行的这个例子,有一家商店提供的价格是以欧元(EUR)计价的,但是你希望以美元的方式提供给你的客户。你可以用异步的方式向商店查询指定商品的价格,同时从远程的汇率服务那里查到欧元和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以美元计价的商品价格。用这种方式,你需要使用第三个CompletableFuture
对象,当前两个 CompletableFuture
计算出结果,并由BiFunction
方法完成合并后,由它来最终结束这一任务,代码清单如下所示:
Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) //创建第一个任务查询商店取得商品的价格
.thenCombine(
CompletableFuture.supplyAsync(
() -> exchangeService.getRate(Money.EUR, Money.USD)), //创建第二个独立任务,查询美元和欧元之间的转换汇率
(price, rate) -> price * rate);通过乘法 整合得到的商品价格和汇率
);
这里整合的操作只是简单的乘法操作,用另一个单独的任务对其进行操作有些浪费资源,所以你只要使用thenCombine
方法,无需特别求助于异步版本的thenCombineAsync
方法。下图展示了上面代码中创建的多个任务是如何在线程池中选择不同的线程执行的,以及它们最终的运行结果又是如何整合的。