Java8 CompletableFuture异步编程-进阶篇

🏷️个人主页:牵着猫散步的鼠鼠 

🏷️系列专栏:Java全栈-专栏

🏷️个人学习笔记,若有缺误,欢迎评论区指正 

前言

我们在前面文章讲解了CompletableFuture这个异步编程类的基本用法,这节我们继续学习CompletableFuture相关的进阶知识,上文入口:Java8 CompletableFuture异步编程-入门篇-CSDN博客

1、异步任务的交互

异步任务交互指 将异步任务获取结果的速度相比较,按一定的规则( 先到先用 )进行下一步处理。

1.1 applyToEither

applyToEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步的操作。

CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)

演示案例:使用最先完成的异步任务的结果

public class ApplyToEitherDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 开启异步任务1
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int x = new Random().nextInt(3);
            CommonUtils.sleepSecond(x);
            CommonUtils.printThreadLog("任务1耗时:" + x + "秒");
            return x;
        });
​
        // 开启异步任务2
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int y = new Random().nextInt(3);
            CommonUtils.sleepSecond(y);
            CommonUtils.printThreadLog("任务2耗时:" + y + "秒");
            return y;
        });
​
        // 哪些异步任务的结果先到达,就使用哪个异步任务的结果
        CompletableFuture<Integer> future = future1.applyToEither(future2, (result -> {
            CommonUtils.printThreadLog("最先到达的结果:" + result);
            return result;
        }));
​
        // 主线程休眠4秒,等待所有异步任务完成
        CommonUtils.sleepSecond(4);
        Integer ret = future.get();
        CommonUtils.printThreadLog("ret = " + ret);
    }
}
​

速记心法:任务1、任务2就像两辆公交,哪路公交先到,就乘坐(使用)哪路公交。

以下是applyToEither 和其对应的异步回调版本

CompletableFuture<R> applyToEither(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func)
CompletableFuture<R> applyToEitherAsync(CompletableFuture<T> other, Function<T,R> func,Executor executor)

1.2 acceptEither

acceptEither() 把两个异步任务做比较,异步任务先到结果的,就对先到的结果进行下一步操作 ( 消费使用 )。

CompletableFuture<Void> acceptEither(CompletableFuture<T> other, Consumer<T> action)
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action)  
CompletableFuture<Void> acceptEitherAsync(CompletableFuture<T> other, Consumer<T> action,Executor executor)

演示案例:使用最先完成的异步任务的结果

public class AcceptEitherDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 异步任务交互
        CommonUtils.printThreadLog("main start");
        // 开启异步任务1
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            int x = new Random().nextInt(3);
            CommonUtils.sleepSecond(x);
            CommonUtils.printThreadLog("任务1耗时:" + x + "秒");
            return x;
        });
​
        // 开启异步任务2
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int y = new Random().nextInt(3);
            CommonUtils.sleepSecond(y);
            CommonUtils.printThreadLog("任务2耗时:" + y + "秒");
            return y;
        });
​
        // 哪些异步任务的结果先到达,就使用哪个异步任务的结果
        future1.acceptEither(future2,result -> {
            CommonUtils.printThreadLog("最先到达的结果:" + result);
        });
​
        // 主线程休眠4秒,等待所有异步任务完成
        CommonUtils.sleepSecond(4);
        CommonUtils.printThreadLog("main end");
    }
}

1.3 runAfterEither

如果不关心最先到达的结果,只想在有一个异步任务先完成时得到完成的通知,可以使用 runAfterEither() ,以下是它的相关方法:

CompletableFuture<Void> runAfterEither(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<T> other, Runnable action, Executor executor)

提示

异步任务交互的三个方法和之前学习的异步的回调方法 thenApply、thenAccept、thenRun 有异曲同工之妙。

2、get() 和 join() 区别

get() 和 join() 都是CompletableFuture提供的以阻塞方式获取结果的方法。

那么该如何选用呢?请看如下案例:

public class GetOrJoinDemo {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "hello";
        });
​
        String ret = null;
        // 抛出检查时异常,必须处理
        try {
            ret = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("ret = " + ret);
​
        // 抛出运行时异常,可以不处理
        ret = future.join();
        System.out.println("ret = " + ret);
    }
}

使用时,我们发现,get() 抛出检查时异常 ,需要程序必须处理;而join() 方法抛出运行时异常,程序可以不处理。所以,join() 更适合用在流式编程中。

3、ParallelStream VS CompletableFuture

CompletableFuture 虽然提高了任务并行处理的能力,如果它和 Stream API 结合使用,能否进一步多个任务的并行处理能力呢?

同时,对于 Stream API 本身就提供了并行流ParallelStream,它们有什么不同呢?

我们将通过一个耗时的任务来体现它们的不同,更重要地是,我们能进一步加强 CompletableFuture 和 Stream API 的结合使用,同时搞清楚CompletableFuture 在流式操作的优势

需求:创建10个MyTask耗时的任务,统计它们执行完的总耗时

定义一个MyTask类,来模拟耗时的长任务

public class MyTask {
    private int duration;
​
    public MyTask(int duration) {
        this.duration = duration;
    }
​
    // 模拟耗时的长任务
    public int doWork() {
        CommonUtils.printThreadLog("doWork");
        CommonUtils.sleepSecond(duration);
        return duration;
    }
}

同时,我们创建10个任务,每个持续1秒。

IntStream intStream = IntStream.range(0, 10);
List<MyTask> tasks = intStream.mapToObj(item -> {
    return new MyTask(1);
}).collect(Collectors.toList());

3.1 并行流的局限

我们先使用串行执行,让所有的任务都在主线程 main 中执行。

public class SequenceDemo {
    public static void main(String[] args) {
        // 方案一:在主线程中使用串行执行
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入list集合便于启动Stream操作
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());

        // step 2: 执行tasks集合中的每个任务,统计总耗时
        long start = System.currentTimeMillis();
        List<Integer> result = tasks.stream().map(myTask -> {
            return myTask.doWork();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();
        double costTime = (end - start) / 1000.0;

        System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
    }
}

它花费了10秒, 因为每个任务在主线程一个接一个的执行。

因为涉及 Stream API,而且存在耗时的长任务,所以,我们可以使用 parallelStream()

public class ParallelDemo {
    public static void main(String[] args) {
        // 方案二:使用并行流
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());

        // step 2: 执行10个MyTask,统计总耗时
        long start = System.currentTimeMillis();
        List<Integer> results = tasks.parallelStream().map(myTask -> {
            return myTask.doWork();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();

        double costTime = (end - start) / 1000.0;
        System.out.printf("processed %d tasks %.2f second",tasks.size(),costTime);
    }
}

它花费了2秒多,因为此次并行执行使用了8个线程 (7个是ForkJoinPool线程池中的, 一个是 main 线程),需要注意是:运行结果由自己电脑CPU的核数决定。

3.2 CompletableFuture 在流式操作的优势

让我们看看使用CompletableFuture是否执行的更有效率

public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时
        // 方案三:使用CompletableFuture
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());

        // step 2: 根据MyTask对象构建10个耗时的异步任务
        long start = System.currentTimeMillis();
        List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {
            return CompletableFuture.supplyAsync(() -> {
                return myTask.doWork();
            });
        }).collect(Collectors.toList());

        // step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中
        List<Integer> results = futures.stream().map(future -> {
            return future.join();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();

        double costTime = (end - start) / 1000.0;
        System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
    }
}

运行发现,两者使用的时间大致一样。能否进一步优化呢?

CompletableFutures 比 ParallelStream 优点之一是你可以指定Executor去处理任务。你能选择更合适数量的线程。我们可以选择大于Runtime.getRuntime().availableProcessors() 数量的线程,如下所示:

public class CompletableFutureDemo2 {
    public static void main(String[] args) {
        // 需求:创建10MyTask耗时的任务,统计它们执行完的总耗时
        // 方案三:使用CompletableFuture
        // step 1: 创建10个MyTask对象,每个任务持续1s,存入List集合
        IntStream intStream = IntStream.range(0, 10);
        List<MyTask> tasks = intStream.mapToObj(item -> {
            return new MyTask(1);
        }).collect(Collectors.toList());

        // 准备线程池
        final int N_CPU = Runtime.getRuntime().availableProcessors();
        // 设置线程池的数量最少是10个,最大是16个
        ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), N_CPU * 2));

        // step 2: 根据MyTask对象构建10个耗时的异步任务
        long start = System.currentTimeMillis();
        List<CompletableFuture<Integer>> futures = tasks.stream().map(myTask -> {
            return CompletableFuture.supplyAsync(() -> {
                return myTask.doWork();
            },executor);
        }).collect(Collectors.toList());

        // step 3: 当所有任务完成时,获取每个异步任务的执行结果,存入List集合中
        List<Integer> results = futures.stream().map(future -> {
            return future.join();
        }).collect(Collectors.toList());
        long end = System.currentTimeMillis();

        double costTime = (end - start) / 1000.0;
        System.out.printf("processed %d tasks cost %.2f second",tasks.size(),costTime);
        // 关闭线程池
        executor.shutdown();
    }
}

测试代码时,电脑配置是4核8线程,而我们创建的线程池中线程数最少也是10个,所以,每个线程负责一个任务( 耗时1s ),总体来说,处理10个任务总共需要约1秒。

3.3 合理配置线程池中的线程数

正如我们看到的,CompletableFuture 可以更好地控制线程池中线程的数量,而 ParallelStream 不能

问题1:如何选用 CompletableFuture 和 ParallelStream ?

如果你的任务是IO密集型的,你应该使用CompletableFuture;

如果你的任务是CPU密集型的,使用比处理器更多的线程是没有意义的,所以选择ParallelStream ,因为它不需要创建线程池,更容易使用。

问题2:IO密集型任务和CPU密集型任务的区别?

CPU密集型也叫计算密集型,此时,系统运行时大部分的状况是CPU占用率近乎100%,I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU 使用率很高。比如说要计算1+2+3+…+ 10万亿、天文计算、圆周率后几十位等, 都是属于CPU密集型程序。

CPU密集型任务的特点:大量计算,CPU占用率一般都很高,I/O时间很短

IO密集型指大部分的状况是CPU在等I/O (硬盘/内存) 的读写操作,但CPU的使用率不高。

简单的说,就是需要大量的输入输出,例如读写文件、传输文件、网络请求。

IO密集型任务的特点:大量网络请求,文件操作,CPU运算少,很多时候CPU在等待资源才能进一步操作。

问题3:既然要控制线程池中线程的数量,多少合适呢?

如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 Ncpu+1

如果是IO密集型任务,参考值可以设置为 2 * Ncpu,其中Ncpu 表示 核心数。

总结

通过这两篇文章的讲解,我们基本学习了CompletableFuture这个异步编程类的基础用法和相关进阶玩法,不过总体上还是偏理论,我后续可以可能会开一篇新的专栏,专门讲解和分享Java高并发相关的代码片段,都是比较实用,请多多支持吧~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/443504.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

猫头虎分享已解决Bug || 系统监控故障:MonitoringServiceDown, MetricsCollectionError

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

【敬伟ps教程】文字处理工具

文章目录 文字工具使用方式文字图层文字工具选项字符面板段落面板文字工具使用方式 文字工具(快捷键T),包含横排和直排两种类型 创建文本两种类型:点式文本、段落文本 创建文字方式 1、在画面上单击,出现文字光标,可输入文字,然后需要在工具栏中点击“√”或者 Ctrl+…

存算一体成为突破算力瓶颈的关键技术?

大模型的训练和推理需要高性能的算力支持。以ChatGPT为例&#xff0c;据估算&#xff0c;在训练方面&#xff0c;1746亿参数的GPT-3模型大约需要375-625台8卡DGX A100服务器训练10天左右&#xff0c;对应A100 GPU数量约3000-5000张。 在推理方面&#xff0c;如果以A100 GPU单卡…

UnityShader——09数学知识3

方阵 行与列数量相等的矩阵,n*n阶矩阵 对角矩阵 当对角线以外的矩阵内元素全为0&#xff0c;则称之为对角矩阵&#xff0c;对角矩阵的前提是必须是方阵 单位矩阵 对角线元素全为1&#xff0c;其余元素全为0&#xff0c;属于对角矩阵的一部分 矩阵和向量 把1 * n阶矩阵称…

JavaWeb - 2 - HTML、CSS

什么是HTML、CSS&#xff1f; HTML&#xff08;HyperText Markup Language&#xff09;&#xff1a;超文本标记语言 超文本&#xff1a;超越了文本的限制&#xff0c;比普通文本更强大&#xff0c;除了文字信息&#xff0c;还可以定义图片、音频、视频等内容 标记语言&…

ESP8266程序烧录方法(以ESPFlashDownloadTool为例)

0 工具准备 ESP8266必须包含的目标bin ESPFlashDownloadTool_v3.6.3.exe NodeMCU&#xff08;ESP8266&#xff09; sscom5 1 ESP8266程序烧录方法&#xff08;以ESPFlashDownloadTool为例&#xff09; 1.1 生成ESP8266所需的bin文件 可以参考前面所写的《安信可IDE&#xff0…

被唤醒的“第二十条”深入人心

近来张艺谋执导的电影《第二十条》&#xff0c;因为它与正在召开中的全国两会所发布的《最高人民法院工作报告》联系相当紧密&#xff0c;加之可免费收看&#xff0c;网民便相互转告&#xff0c;于是此信息条目立即冲上了网络热搜榜&#xff0c;观者如潮。因为最高人民法院工作…

STM32 HAL库RTC复位丢失年月日的解决办法

STM32 HAL库RTC复位丢失年月日的解决办法 0.前言一、实现方式1.CubeMX配置&#xff1a;2.MX_RTC_Init()函数修改2.编写手动解析函数 二、总结 参考文章&#xff1a;stm32f1 cubeMX RTC 掉电后日期丢失的问题 0.前言 最近在使用STM32F103做RTC实验时&#xff0c;发现RTC复位后时…

LeetCode-Hot100

哈希 1.两数之和&#xff1a; 给定一个整数数组nums和一个整数目标值target&#xff0c;请你再该数组中找出和为目标值target的那两个整数&#xff0c;并返回它们的数组下标。 思路&#xff1a;暴力解法是使用两层循环来遍历每一个数&#xff0c;然后找出两数之和等于target的…

2024/3/9d打卡整数划分---背包动态规划方式,计数类动态规划

目录 题目 DP分析 第一种方法&#xff0c;背包DP 代码 第二种方法&#xff08;有点难想到&#xff09; 代码 题目 一个正整数 n 可以表示成若干个正整数之和&#xff0c;形如&#xff1a;nn1n2…nk&#xff0c;其中 n1≥n2≥…≥nk,k≥1。 我们将这样的一种表示称为正整数 …

maven项目引入私有jar,并打包到java.jar中

私有jar存放位置 maven依赖 <dependency><groupId>com.hikvision.ga</groupId><artifactId>artemis-http-client</artifactId><version>1.1.10</version><scope>system</scope><systemPath>${project.basedir}/s…

FPGA高端项目:FPGA基于GS2971的SDI视频接收+HLS图像缩放+多路视频拼接,提供4套工程源码和技术支持

目录 1、前言免责声明 2、相关方案推荐本博已有的 SDI 编解码方案本方案的SDI接收转HDMI输出应用本方案的SDI接收图像缩放应用本方案的SDI接收纯verilog图像缩放纯verilog多路视频拼接应用本方案的SDI接收OSD多路视频融合叠加应用本方案的SDI接收HLS多路视频融合叠加应用本方案…

基于YOLOv8深度学习的葡萄病害智能诊断与防治系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:LoadingProgress)

用于显示加载动效的组件。 说明&#xff1a; 该组件从API Version 8开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 子组件 无 接口 LoadingProgress() 创建加载进展组件。 从API version 9开始&#xff0c;该接口支持在ArkTS卡片中使…

Angular基础---HelloWorld---Day2

文章目录 1.循环语句&#xff1a; *ngfor2.循环语句&#xff1a;ngSwitch4.事件的绑定:click5.事件的绑定:input6.模版引用变量7.数据双向绑定ngModel8.动态表单控件9.动态表单空间组 文末附有代码仓库地址&#xff01;&#xff01;&#xff01; 1.循环语句&#xff1a; *ngfor…

大语言模型在科技研发与创新中的角色在快速变化

在技术研发与创新中&#xff0c;比如在软件开发、编程工具、科技论文撰写等方面&#xff0c;大语言模型可以辅助工程师和技术专家进行快速的知识检索、代码生成、技术文档编写等工作。在当今的软件工程和研发领域&#xff0c;尤其是随着大语言模型技术的快速发展&#xff0c;它…

保姆级讲解字符串函数(上篇)

目录 字符分类函数 导图 函数介绍 1.getchar 2. isupper 和 islower 字符转换函数&#xff1a;&#xff08;toupper , tolower&#xff09; 与 putchar 字符串函数 导图 string函数的使用和模拟实现 string的使用 求字符串长度 字符串的比较 string函数的模拟实现…

300分钟吃透分布式缓存-23讲:Redis是如何淘汰key的?

淘汰原理 首先我们来学习 Redis 的淘汰原理。 系统线上运行中&#xff0c;内存总是昂贵且有限的&#xff0c;在数据总量远大于 Redis 可用的内存总量时&#xff0c;为了最大限度的提升访问性能&#xff0c;Redis 中只能存放最新最热的有效数据。 当 key 过期后&#xff0c;或…

一个足球粉丝该怎么建个个人博客?

做一个个人博客第一步该怎么做&#xff1f; 好多零基础的同学们不知道怎么迈出第一步。 那么&#xff0c;就找一个现成的模板学一学呗&#xff0c;毕竟我们是高贵的Ctrl c v 工程师。 但是这样也有个问题&#xff0c;那就是&#xff0c;那些模板都&#xff0c;太&#xff01;…

oracle 获取两个时间相差天数,以及指定一个日期相差天数后的日期

1、获取两个时间相差天数 -- 两个日期相差天数 select (trunc(TO_DATE( 2024-02-28, YYYY-MM-DD ) -TO_DATE( 2024-02-25, YYYY-MM-DD ) )1) from dual2、获取日期减去指定天数后的时间 -- 两个日期相差天数的日期 select (TRUNC(TO_DATE( 2024-02-25, YYYY-MM-DD )- (trunc…