CompletableFuture异步多任务最佳实践

简介

CompletableFuture相比于Java8的并行流,对于处理并发的IO密集型任务有着得天独厚的优势:

  1. 在流式编程下,支持构建任务流时即可执行任务。
  2. CompletableFuture任务支持提交到自定义线程池,调优方便。

本文所有案例都会基于这样的一个需求,某网站有多个商家,用户会在不同的店铺查看同一件商品,只要用户在提供给商店对应的产品名称,商店就会返回对应产品的最终价格。
该需求有几个注意点:

  1. 每次到一家商店查询时,同一时间只能查询一个商品。
  2. 允许用户同一时间在系统里查询多个商店的一个商品。
  3. 查询的商品售价是需要耗时的,平均500ms-2500ms不等。
  4. 为了更直观了解代码性能,我们的例子用户每次会去家左右的店铺分别查询一件商品。

在这里插入图片描述

这里介绍一下,这些功能所要用到的类,首先是商家类,核心方法getPrice会根据用户传入的产品名称返回售价,代码执行时会休眠一段时间返回用户产品最终价格。

/**
 * 商店
 */
public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    /**
     * 传入产品名称返回对应价格和折扣代码
     * @param product 产品名称
     * @return
     */
    public String getPrice(String product) {
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        return name + ":" + price + ":" + code;
    }


    

    /**
     * 获取商品价格
     * @param product
     * @return
     */
    public double calculatePrice(String product) {
        try {
            TimeUnit.MILLISECONDS.sleep(RandomUtil.randomInt(500,2500));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return (int)(random.nextDouble() * product.charAt(0) + product.charAt(1));
    }

    public String getName() {
        return name;
    }
}

同时我们也给出了18个商家的列表。

private final static List<Shop> shops = Arrays.asList(new Shop("Nike"),
            new Shop("Apple"),
            new Shop("Coca-Cola"),
            new Shop("Amazon"),
            new Shop("Samsung"),
            new Shop("McDonald's"),
            new Shop("Mercedes-Benz"),
            new Shop("Google"),
            new Shop("Louis Vuitton"),
            new Shop("Chanel"),
            new Shop("Gucci"),
            new Shop("Adidas"),
            new Shop("Pepsi"),
            new Shop("Ford"),
            new Shop("Microsoft"),
            new Shop("Rolex"),
            new Shop("Ferrari"),
            new Shop("IKEA")
    );

实现

顺序流

第一版我们先用顺序流实现改需求,代码很简单用stream遍历商家并调用getPrice获得结果,最终存到List中。

public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        List<String> resultList = shops.stream()
                .map(shop -> shop.getName() + " price is " + shop.getPrice("Air Jordan"))
                .collect(Collectors.toList());
        long end = System.currentTimeMillis();
        System.out.println("执行结束,执行结果:" + JSONUtil.toJsonStr(resultList) + " 执行耗时:" + (end - begin)+"ms");
    }

最终输出结果如下,可以看到用了快30s。

执行结束,执行结果:["Nike price is Nike:112.0:GOLD","Apple price is Apple:135.0:DIAMOND","Coca-Cola price is Coca-Cola:111.0:DIAMOND","Amazon price is Amazon:155.0:GOLD","Samsung price is Samsung:165.0:SILVER","McDonald's price is McDonald's:164.0:PLATINUM","Mercedes-Benz price is Mercedes-Benz:111.0:DIAMOND","Google price is Google:121.0:NONE","Louis Vuitton price is Louis Vuitton:168.0:PLATINUM","Chanel price is Chanel:131.0:NONE","Gucci price is Gucci:132.0:SILVER","Adidas price is Adidas:133.0:PLATINUM","Pepsi price is Pepsi:158.0:SILVER","Ford price is Ford:113.0:PLATINUM","Microsoft price is Microsoft:156.0:GOLD","Rolex price is Rolex:126.0:GOLD","Ferrari price is Ferrari:163.0:DIAMOND","IKEA price is IKEA:150.0:NONE"] 执行耗时:27409ms

并行流

对此我们采用并行流尝试以多线程的形式执行任务,所以我们将stream改为parallelStream。

public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        List<String> resultList = shops.parallelStream()
                .map(shop -> shop.getName() + " price is " + shop.getPrice("Air Jordan"))
                .collect(Collectors.toList());
        long end = System.currentTimeMillis();
        System.out.println("执行结束,执行结果:" + JSONUtil.toJsonStr(resultList) + " 执行耗时:" + ((end - begin)/1000)+"s");
    }

可以看到耗时仅仅5s,这里补充一下笔者的机器信息,笔者的CPU是6核的,所以并行流执行时会有6个线程在同时工作。

执行结束,执行结果:["Nike price is Nike:112.0:GOLD","Apple price is Apple:135.0:DIAMOND","Coca-Cola price is Coca-Cola:111.0:DIAMOND","Amazon price is Amazon:155.0:GOLD","Samsung price is Samsung:165.0:SILVER","McDonald's price is McDonald's:164.0:PLATINUM","Mercedes-Benz price is Mercedes-Benz:111.0:DIAMOND","Google price is Google:121.0:NONE","Louis Vuitton price is Louis Vuitton:168.0:PLATINUM","Chanel price is Chanel:131.0:NONE","Gucci price is Gucci:132.0:SILVER","Adidas price is Adidas:133.0:PLATINUM","Pepsi price is Pepsi:158.0:SILVER","Ford price is Ford:113.0:PLATINUM","Microsoft price is Microsoft:156.0:GOLD","Rolex price is Rolex:126.0:GOLD","Ferrari price is Ferrari:163.0:DIAMOND","IKEA price is IKEA:150.0:NONE"] 执行耗时:5077ms

使用CompletableFuture执行异步多查询任务

我们给出了CompletableFuture执行多IO查询任务的代码示例,可以看到代码的执行流程大致为:

  1. 遍历商家。
  2. 提交异步查询任务。
  3. 调用join(),注意这里的join和CompletableFuture的get方法作用是一样的,都是阻塞获取查询结果,唯一的区别就是join方法签名没有抛异常,所以无需try-catch处理。
 public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        List<String> resultList = shops.stream()
                        .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is "
                                + shop.getPrice("Air Jordan")))//CompletableFuture提交价格查询任务
                        .map(CompletableFuture::join) //用join阻塞获取结果
                        .collect(Collectors.toList());//组成列表
        long end = System.currentTimeMillis();
        System.out.println("执行结束,执行结果:" + JSONUtil.toJsonStr(resultList) + " 执行耗时:" + (end - begin) + "ms");
    }

可以看到执行结果为31s,查询效率还不如顺序流。

执行结束,执行结果:["Nike price is Nike:112.0:GOLD","Apple price is Apple:135.0:DIAMOND","Coca-Cola price is Coca-Cola:111.0:DIAMOND","Amazon price is Amazon:155.0:GOLD","Samsung price is Samsung:165.0:SILVER","McDonald's price is McDonald's:164.0:PLATINUM","Mercedes-Benz price is Mercedes-Benz:111.0:DIAMOND","Google price is Google:121.0:NONE","Louis Vuitton price is Louis Vuitton:168.0:PLATINUM","Chanel price is Chanel:131.0:NONE","Gucci price is Gucci:132.0:SILVER","Adidas price is Adidas:133.0:PLATINUM","Pepsi price is Pepsi:158.0:SILVER","Ford price is Ford:113.0:PLATINUM","Microsoft price is Microsoft:156.0:GOLD","Rolex price is Rolex:126.0:GOLD","Ferrari price is Ferrari:163.0:DIAMOND","IKEA price is IKEA:150.0:NONE"] 执行耗时:31097ms

原因很简单,我们本次的流操作执行会构成下面这样一张图,可以看到查询价格的操作是有耗时的,随后我们又调用了join方法使得流的后续步骤被阻塞,最终CompletableFuture用的和顺序流一样。

在这里插入图片描述

分解流优化使用CompletableFuture

上述我们提到过,之所以慢是因为join阻塞了流的操作,所以提升效率的方式就是不要让join阻塞流的操作。所以笔者将流拆成了两个。
如下图,第一个流负责提交任务,即遍历每一个商家并将查询价格的任务提交出去,期间不阻塞,最终会生成一个CompletableFuture的List。

紧接着我们遍历上一个流生成的List<CompletableFuture>,调用join方法阻塞获取结果,因为上一个流操作提交任务时不阻塞,所以每个任务一提交时就可能已经在执行了,所以join方法获取结果的耗时也会相对短一些。

在这里插入图片描述

所以我们的代码最后改造成了这样:

public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        List<CompletableFuture<String>> completableFutureList = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice("Air Jordan")))
                .collect(Collectors.toList());//CompletableFuture提交价格查询任务

        List<String> resultList = completableFutureList.stream()
                .map(CompletableFuture::join) //用join阻塞获取结果
                .collect(Collectors.toList());//组成列表

        long end = System.currentTimeMillis();
        System.out.println("执行结束,执行结果:" + JSONUtil.toJsonStr(resultList) + " 执行耗时:" + (end - begin) + "ms");
    }

执行结果如下,可以看到代码耗时差不多也是5s和并行流差不多,原因很简单,线程池默认用6个,对于IO密集型任务来说显然是不够的。

执行结束,执行结果:["Nike price is Nike:112.0:GOLD","Apple price is Apple:135.0:DIAMOND","Coca-Cola price is Coca-Cola:111.0:DIAMOND","Amazon price is Amazon:155.0:GOLD","Samsung price is Samsung:165.0:SILVER","McDonald's price is McDonald's:164.0:PLATINUM","Mercedes-Benz price is Mercedes-Benz:111.0:DIAMOND","Google price is Google:121.0:NONE","Louis Vuitton price is Louis Vuitton:168.0:PLATINUM","Chanel price is Chanel:131.0:NONE","Gucci price is Gucci:132.0:SILVER","Adidas price is Adidas:133.0:PLATINUM","Pepsi price is Pepsi:158.0:SILVER","Ford price is Ford:113.0:PLATINUM","Microsoft price is Microsoft:156.0:GOLD","Rolex price is Rolex:126.0:GOLD","Ferrari price is Ferrari:163.0:DIAMOND","IKEA price is IKEA:150.0:NONE"] 执行耗时:5633ms

CompletableFuture使用自定义线程池

《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的 情况,处理器的一些核可能就无法充分利用。BrianGoetz建议,线程池大小与处理器的利用率 之比可以使用下面的公式进行估算:
N(threads) = N(CPU)* U(CPU) * (1+ W/C)
其中: N(CPU)是处理器的核的数目,可以通过 Runtime.getRuntime().available Processors() 得到。U(CPU)是期望的 CPU利用率(该值应该介于 0和 1之间) W/C是等待时间与计算时间的比率。

我们的CPU核心数为6,我们希望的CPU利用率为1,而等待时间按照这种的计算应该是1250ms而计算时间可以忽略不计,所以W/C差不多可以换算为1000。
最终我们计算结果为:

N(threads) = N(CPU)* U(CPU) * (1+ W/C)
		   = 6*1*1000
		   =6000

很明显6000个线程非常不合理,所以我们使用了和商店数差不多的线程数,所以我们将线程设置为18(这也是个大概的数字,具体情况还需要经过压测进行增减)。

最终代码写成这样。

 private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20,
            20,
            1,
            TimeUnit.MINUTES,
            new ArrayBlockingQueue<>(100),
            new ThreadFactoryBuilder().setNamePrefix("threadPool-%d").build());


    public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        List<CompletableFuture<String>> completableFutureList = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice("Air Jordan"), threadPool))
                .collect(Collectors.toList());//CompletableFuture提交价格查询任务

        List<String> resultList = completableFutureList.stream()
                .map(CompletableFuture::join) //用join阻塞获取结果
                .collect(Collectors.toList());//组成列表

        long end = System.currentTimeMillis();
        System.out.println("执行结束,执行结果:" + JSONUtil.toJsonStr(resultList) + " 执行耗时:" + (end - begin) + "ms");
        threadPool.shutdownNow();
    }

输出结果如下,可以看到查询效率有了质的飞跃。

执行结束,执行结果:["Nike price is Nike:112.0:GOLD","Apple price is Apple:135.0:DIAMOND","Coca-Cola price is Coca-Cola:111.0:DIAMOND","Amazon price is Amazon:155.0:GOLD","Samsung price is Samsung:165.0:SILVER","McDonald's price is McDonald's:164.0:PLATINUM","Mercedes-Benz price is Mercedes-Benz:111.0:DIAMOND","Google price is Google:121.0:NONE","Louis Vuitton price is Louis Vuitton:168.0:PLATINUM","Chanel price is Chanel:131.0:NONE","Gucci price is Gucci:132.0:SILVER","Adidas price is Adidas:133.0:PLATINUM","Pepsi price is Pepsi:158.0:SILVER","Ford price is Ford:113.0:PLATINUM","Microsoft price is Microsoft:156.0:GOLD","Rolex price is Rolex:126.0:GOLD","Ferrari price is Ferrari:163.0:DIAMOND","IKEA price is IKEA:150.0:NONE"] 执行耗时:1893ms

并行流一定比CompletableFuture烂吗?

如果是计算密集型的任务,使用stream是最佳姿势,因为密集型需要一直计算,加多少个线程都无济于事,使用stream简单使用了。
而对于io密集型的任务,例如上文这种大量查询都需要干等的任务,使用CompletableFuture是最佳姿势了,通过自定义线程创建比cpu核心数更多的线程来提高工作效率才是较好的解决方案

参考文献

Java 8实战:https://book.douban.com/subject/26772632/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/233419.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

向日葵远程控制鼠标异常的问题

​ 在通过向日葵进行远程控制的时候&#xff0c;可能会遇到鼠标位置异常的问题。此时&#xff0c;不管怎么移动鼠标&#xff0c;都会停留在屏幕最上方&#xff0c;而无法点击到正确的位置。如图&#xff1a; 此时&#xff0c;如果启用了“被控端鼠标”功能&#xff0c;可以正…

ChatGLM3-6B和langchain阿里云部署

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、ChatGLM3-6B部署搭建环境部署GLM3 二、Chatglm2-6blangchain部署三、Tips四、总结 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; …

回顾2023 亚马逊云科技 re_Invent,创新AI,一路同行

作为全球云计算龙头企业的亚马逊云科技于2023年11月27日至12月1日在美国拉斯维加斯举办了2023 亚马逊云科技 re:Invent&#xff0c;从2012年开始举办的亚马逊云科技 re:Invent 全球大会,到现如今2023 亚马逊云科技 re:Invent&#xff0c;回顾历届re:Invent大会&#xff0c;亚马…

imutils库介绍及安装学习

目录 本机环境 安装 函数及属性 列举imutils库信息 属性和函数介绍及使用 属性 常用函数 方法使用 图像平移 图像缩放 图像旋转 骨架提取 通道转换 OPenCV版本的检测 综合测试 介绍 imutils 是一个用于图像处理和计算机视觉任务的 Python 工具包。它提供了一系…

Python爬虫-实现批量抓取王者荣耀皮肤图片并保存到本地

前言 本文是该专栏的第12篇,后面会持续分享python爬虫案例干货,记得关注。 本文以王者荣耀的英雄皮肤为例,用python实现批量抓取“全部英雄”的皮肤图片,并将图片“批量保存”到本地。具体实现思路和详细逻辑,笔者将在正文结合完整代码进行详细介绍。注意,这里抓取的图片…

centos7中的计划任务

一次调度执行-----at 安装&#xff1a; [rootzaotounan ~]# yum -y install at ​ 启动&#xff1a; [rootzaotounan ~]# systemctl start atd ​ 开机自启动&#xff1a; [rootzaotounan ~]# systemctl enbale atd ​ 语法&#xff1a; at <时间规格> 时间规格参数&…

Linux下C++动态链接库的生成以及使用

目录 一.前言二.生成动态链接库三.使用动态链接库 一.前言 这篇文章简单讨论一下Linux下如何使用gcc/g生成和使用C动态链接库&#xff08;.so文件&#xff09;。 二.生成动态链接库 先看下目录结构 然后看下代码 //demo.h#ifndef DEMO_H #define DEMO_H#include<string&…

键盘打字盲打练习系列之矫正坐姿——4

一.欢迎来到我的酒馆 盲打&#xff0c;矫正坐姿&#xff01; 目录 一.欢迎来到我的酒馆二.继续练习二.矫正坐姿1.键盘与鼠标2.椅子 三.改善坐姿建议 二.继续练习 前面的章节&#xff0c;我们重点向大家介绍了主键盘区指法和键盘键位。经过一个系列的教程学习&#xff0c;相信大…

C语言数据结构-双向链表

文章目录 1 双向链表的结构2 双向链表的实现2.1 定义双向链表的数据结构2.2 打印链表2.3 初始化链表2.4 销毁链表2.5 尾插,头插2.6 尾删,头删2.7 根据头次出现数据找下标2.8 定点前插入2.9 删除pos位置2.10 定点后插入 3 完整代码3.1 List.h3.2 Lish.c3.3 test.c 1 双向链表的结…

redis中缓存雪崩,缓存穿透,缓存击穿等

缓存雪崩 由于原有缓存失效&#xff08;或者数据未加载到缓存中&#xff09;&#xff0c;新缓存未到期间&#xff08;缓存正常从Redis中获取&#xff0c;如下图&#xff09;所有原本应该访问缓存的请求都去查询数据库了&#xff0c;而对数据库CPU和内存造成巨大压力&#xff0c…

数据结构算法-希尔排序算法

引言 在一个普通的下午&#xff0c;小明和小森决定一起玩“谁是老板”的扑克牌游戏。这次他们玩的可不仅仅是娱乐&#xff0c;更是要用扑克牌来决定谁是真正的“大老板”。 然而&#xff0c;小明的牌就像刚从乱麻中取出来的那样&#xff0c;毫无头绪。小森的牌也像是被小丑掷…

C++ 学习系列 -- 实现简单的 String

1 标准库 std::string c 中的 std::string 是一个重要的字符串的类, 我们在日常工作中常常与之打交道。 string是C标准库的重要部分&#xff0c;主要用于字符串处理。使用string库需要在同文件中包括该库 #include<string> std::string 实际上是 std::basic_string<…

基于ssm应急资源管理系统论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本应急资源管理系统就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息…

jupyter notebook设置代码提示(代码补全)

1.当你打开jupyter notebook时&#xff0c;写代码的时候是默认没有代码提示的。 在base环境依次输入以下四行命令&#xff1a; pip install jupyter_contrib_nbextensions jupyter contrib nbextension install --user pip install jupyter_nbextensions_configurator jupyter …

系统架构设计师教程(二)计算机系统基础知识

系统架构设计师 2.1 计算机系统概述2.2 计算机硬件2.2.1 计算机硬件组成2.2.2 处理器2.2.3 存储器2.2.4 总线2.2.5 接口2.2.6 外部设备 2.3 计算机软件2.3.1 计算机软件概述2.3.2 操作系统2.3.3 数据库关系数据库关系数据库设计的特点及方法关系数据库设计的基本步骤 分布式数据…

python3使用pandas备份mysql数据表

操作系统 &#xff1a;CentOS 7.6_x64 Python版本&#xff1a;3.9.12 MySQL版本&#xff1a;5.7.38 日常开发过程中&#xff0c;会遇到mysql数据表的备份需求&#xff0c;需要针对单独的数据表进行备份并定时清理数据。 今天记录下python3如何使用pandas进行mysql数据表的备…

ubuntu20 安装docker

一.官网安装文档 &#xff08;基本按官方文档安装&#xff09; Install Docker Engine on Ubuntu | Docker Docs 二.安装步骤 1.docker 需要64位操作系统、linux内核要在3.1以上 #uname -r 2.卸载可能存在的旧版本 #sudo apt-get remove docker docker-engine docker-ce …

整数分析 C语言xdoj43

问题描述 给出一个整数n&#xff08;0<n<100000000&#xff09;。求出该整数的位数&#xff0c;以及组成该整数的所有数字中的最大数字和最小数字。 输入说明 输入一个整数n&#xff08;0<n<100000000&#xff09; 输出说明 在一行上依次输出整数n的位…

【无标题】安装环境

这里写目录标题 清华镜像加速 安装cuda11.3 PyTorch 1.10.1https://pytorch.org/get-started/previous-versions/[如果没有可以点Previous pyTorch Versions&#xff0c;这里面有更多的更早的版本](https://pytorch.org/get-started/locally/) 复制非空文件夹cp: -r not specif…

Linux下通过find找文件---通过修改时间查找(-mtime)

通过man手册查找和-mtime选项相关的内容 man find | grep -A 3 mtime # 这里简单介绍了 -mtime &#xff0c;还有一个简单的示例-mtime n Files data was last modified n*24 hours ago. See the comments for -atime to understand how rounding affects the interpretati…