CompletableFuture使用教学

CompletableFuture使用教学

一、开始一个线程异步执行不需要返回值

通过runAsync方式

//1.不加线程池方式
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //停顿几秒
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        System.out.println("hello world");
        System.out.println(completableFuture.get());//null 没有返回值的情况
//2.加线程池方式
		//创建固定线程池(阿里规范建议使用自定义线程池,不能通过Executors来进行创建)
        ExecutorService executors = Executors.newFixedThreadPool(4);//此处偷懒,用此线程池
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            //停顿几秒
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, executors);
        System.out.println(runAsync.get());

二、通过异步方式执行,有返回值

supplyAsync

		//不加线程池方式
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "hello supplyAsync";
        });
        System.out.println(completableFuture.get());
		//加线程池的方式
ExecutorService executors = Executors.newFixedThreadPool(4);
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "hello supplyAsync+Executors";
        }, executors);
        System.out.println(completableFuture.get());

ps:get()方法可以获取异步线程执行完后的结果

三.通过whenComplete减少阻塞和轮询(可加线程池,也可不加)

即当异步线程执行结束会接着执行whenComplete()方法,如果执行期间报错会执行exceptionally()方法
ExecutorService threadPool = Executors.newFixedThreadPool(6);
        CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "--副线程");
            int result = new Random().nextInt(10);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("1秒后出结果");
            return result;
        },threadPool).whenComplete((v,e)->{//没有异常  v是值 e是异常情况
            if (e == null){
                System.out.println("计算完成,UpdateValue:"+v);
            }
        }).exceptionally((e)->{//e是异常情况

            e.printStackTrace();
            System.out.println("异常情况:"+e.getCause()+"\t"+e.getMessage());
            return null;
        });

四、实现通过货品在不同平台进行价格搜索进行数据汇总

普通方式实现

public class Case {
    static List<NetMall> list = Arrays.asList(new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao"));
    public static List<String> getPrice(List<NetMall> list,String productName){
        return list.stream()
                .map(netMall -> String.format(productName+ " in %s price is %s"
                        ,netMall.getNetMallName()
                        ,netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<String> list1 = getPrice(list, "mysql");
        for (String s : list1) {
            System.out.println(s);
        }
        long end = System.currentTimeMillis();
        System.out.println("---当前操作时间--costTime:"+(end - start)+"ms");
    }

}
class NetMall{
    private String netMallName;

    public String getNetMallName() {
        return netMallName;
    }
    public NetMall(){}

    public NetMall(String netMallName){
        this.netMallName = netMallName;
    }

    public double calcPrice(String productName){
        try {
            TimeUnit.SECONDS.sleep(1);//此处表示 业务执行所需耗时时间
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);//模拟价格
    }

}

普通方式实现耗时结果:
在这里插入图片描述

completableFuture实现

public class Case {
    static List<NetMall> list = Arrays.asList(new NetMall("jd"),
            new NetMall("dangdang"),
            new NetMall("taobao"));
    public static List<String> getPricesByCompletableFuture(List<NetMall> list,String productName){
        return list.stream().map(netMall -> CompletableFuture.supplyAsync(()->
            String.format(
                    productName+"in %s price is %.2f",
                    netMall.getNetMallName(),
                    netMall.calcPrice(productName)
            ))).collect(Collectors.toList())
                .stream()
                .map(s->s.join())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
		List<String> list1 = getPricesByCompletableFuture(Case.list, "mysql");
        for (String s : list1) {
            System.out.println(s);
        }
        long end = System.currentTimeMillis();
        System.out.println("---当前操作时间--costTime:"+(end - start)+"ms");
    }

}
class NetMall{
    private String netMallName;

    public String getNetMallName() {
        return netMallName;
    }
    public NetMall(){}

    public NetMall(String netMallName){
        this.netMallName = netMallName;
    }

    public double calcPrice(String productName){
        try {
            TimeUnit.SECONDS.sleep(1);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);//模拟价格
    }

}

结果耗时:
在这里插入图片描述
相比较能够发现,同时开启三个异步线程,时间仅仅为单个平台查询的时间,大大节省效率!

五、CompletableFuture常用API

1.获得结果和触发计算

public T get() 不见不散,容易阻塞
public T get(long timeout,TimeUnit unit) 过时不候,超过时间会爆异常
public T join() 类似于get(),区别在于是否需要抛出异常
public T getNow(T valueIfAbsent)
立即获取结果不阻塞
	计算完,返回计算完成后的结果
	没算完,返回设定的valueAbsent(直接返回了备胎值xxx)
主动触发计算
	public boolean complete(T value) 是否立即打断get()方法返回括号值(下面代码实现)
public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "abc";
        });
        TimeUnit.SECONDS.sleep(1);
        System.out.println(completableFuture.complete("end")+"\t"+completableFuture.get());
    }

执行结果:true end
解释:执行需要2秒,等待1秒;
complete(默认值)方法会打断执行,如果执行完,则返回结果,如果没有执行完则输出默认值;

2.对计算结果进行处理

(1)thenApply 计算结果存在在依赖关系,使得线程串行化。因为依赖关系,所以一旦有异常,直接叫停。
public static void main(String[] args) {
        CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("111");
            return 1024;
        }).thenApply(f->{
            System.out.println("222");
            return f + 1;
        }).thenApply(f->{
            System.out.println("333");
            return f + 1;
//            return f/0; 会报出异常
        }).whenCompleteAsync((v,e)->{
            System.out.println("****v="+v);
        }).exceptionally(e->{
            e.printStackTrace();
            return null;
        });
        System.out.println("----主线程结束--end");
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
    }
(2)handle 类似于thenApply,但是有异常的话仍然可以往下走一步。
CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 1024;
        }).handle((f,e) -> {
            int age = 10/0;//异常语句
            System.out.println("222");
            return f + 1;
        }).handle((f,e) -> {
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("*****v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
        System.out.println("----主线程结束--end");
        //在222方法中报错 会继续执行输出333
(3).对计算结果进行消费
接收任务的处理结果,并消费处理,无返回结果|消费型函数式接口,之前的是Function
CompletableFuture.supplyAsync(()->{
            return 1;
        }).thenApply(f -> {
            return f+1;
        }).thenApply(f ->{
            return f+3;
        }).thenApply(f->{
            return f+4;
        }).thenAccept(r->{
            System.out.println("r==\t"+r);
        });

补充:Code之任务之间的顺序执行

thenRun

  • thenRun(Runnable runnable)
  • 任务A执行完执行B,并且B不需要A的结果

thenAccept

  • thenAccept(Consumer action)
  • 任务A执行完执行B,B需要A的结果,但是任务B无返回值

thenApply

  • thenApply(Function fn)

  • 任务A执行完执行B,B需要A的结果,同时任务B有返回值

    (4).CompleteFuture和线程池说明(非常重要)
    

上面的几个方法都有普通版本和后面加Async的版本
以thenRun和thenRunAsync为例,有什么区别?先看结论
1.没有传入自定义线程池,都用默认线程池ForkJoinPool
2.传入了一个自定义线程池如果你执行第一个任务的时候,传入了一个自定义线程池

  • 调用thenRun方法执行第二个任务的时候,则第二个任务和第一个任务是用同一个线程池
  • 调用thenRunAsync执行第二个任务的时候,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
    3.也有可能处理太快,系统优化切换原则,直接使用main线程处理(把sleep去掉)

(5).对计算速度进行选用

//applyToEither 线程先执行完的输出,输出其中一个
CompletableFuture<String> play1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return "play1 ";
        });
        CompletableFuture<String> play2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            return "play2 ";
        });
        CompletableFuture<String> thenCombineResult  = play1.applyToEither(play2, f -> {
            return f + "is winner";
        });
        System.out.println(Thread.currentThread().getName()+"\t"+thenCombineResult.get());

(6).对计算结果进行合并‘

thenCombine 合并

  • 两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCOmbine来处理
  • 先完成的先等着,等待其它分支任务
        CompletableFuture<Integer> completeFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in");
            return 10;
        });
        CompletableFuture<Integer> completeFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in");
            return 20;
        });
        CompletableFuture<Integer> completableFuture = completeFuture1.thenCombine(completeFuture2,
                (x, y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in");
            return x + y;
        });
        System.out.println(completableFuture.get()); //输出 30

合并版本

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in1");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in2");
            return 20;
        }), (x, y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in3");
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in4");
            return 30;
        }), (x, y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in5");
            return x + y;
        });
        System.out.println(completableFuture.get());
        //输出结果
        //ForkJoinPool.commonPool-worker-9	---come in1
		//ForkJoinPool.commonPool-worker-9	---come in2
		//main	---come in3
		//ForkJoinPool.commonPool-worker-2	---come in4
		//main	---come in5
		//60

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/19214.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

对象应用:C++字符串和vector,对象的new与delete重构

对象应用 C字符串和vector字符串创建方式字符串拼接字符串追加 字符串截断autovector创建方式vector操作 new与delete重构new与delete的工作步骤new与delete重构应用只能生成栈对象只能生成堆对象 C字符串和vector C的字符串是一个对象&#xff0c;存在于std标准库中&#xff0…

[echarts] legend icon 自定义的几种方式

echarts 官方配置项 地址 一、默认 图例项的 icon circle, rect, roundRect, triangle, diamond, pin, arrow, none legend: {top: 5%,left: center,itemWidth: 20,itemHeight: 20,data: [{icon: circle, name: 搜索引擎},{icon: rect, name: 直接访问},{icon: roundRect, n…

什么是FPGA?关于FPGA基础知识 一起来了解FPGA lattice 深力科 MachXO3系列 LCMXO3LF-9400C-5BG256C

什么是FPGA&#xff1f;关于FPGA基础知识 一起来了解FPGA lattice 深力科 MachXO3系列 LCMXO3LF-9400C-5BG256C FPGA基础知识&#xff1a;FPGA是英文Field&#xff0d;Programmable Gate Array的缩写&#xff0c;即现场可编程门阵列&#xff0c;它是在PAL、GAL、CPLD等可编程器…

二叉排序树查找成功和不成功的平均查找长度

理解二叉树的特性: 1)结点:包含一个数据元素及若干指向子树分支的信息。 2)结点的度:一个结点拥有子树的数据成为结点的度。 3)叶子结点:也称为终端结点,没有子树的结点或者度为零的结点。 4)分支结点:也称为非终端结点,度不为零的结点成为非终端结点。 5)结点…

【一起撸个DL框架】5 实现:自适应线性单元

CSDN个人主页&#xff1a;清风莫追欢迎关注本专栏&#xff1a;《一起撸个DL框架》GitHub获取源码&#xff1a;https://github.com/flying-forever/OurDL 文章目录 5 实现&#xff1a;自适应线性单元&#x1f347;1 简介2 损失函数2.1 梯度下降法2.2 补充 3 整理项目结构4 损失函…

DAD-DAS模型

DAD-DAS模型 文章目录 DAD-DAS模型[toc]1 产品服务:需求方程2 实际利率:费雪方程3 通货膨胀:菲利普斯方程4 预期通货膨胀&#xff1a;适应性预期5 货币政策规则&#xff1a;泰勒方程6 动态总供给-总需求方程&#xff08;DAS-DAD&#xff09;7 总供给冲击模拟 1 产品服务:需求方…

Elasticsearch:NLP 和 Elastic:入门

自然语言处理 (Natural Language Processing - NLP) 是人工智能 (AI) 的一个分支&#xff0c;专注于尽可能接近人类解释的理解人类语言&#xff0c;将计算语言学与统计、机器学习和深度学习模型相结合。 AI - Artificial Inteligence 人工智能ML - Machine Learning 机器学习DL…

永远不该忘记!科技才是硬道理,手中没有剑,跟有剑不用,是两回事

今天是全国防灾减灾日&#xff0c;距离2008年汶川大地震也已经过去15年了。但时至今日&#xff0c;看到那些图像视频资料&#xff0c;那种触及灵魂的疼痛仍是存在的&#xff0c;2008年的大地震在每个中国人身上都留下了无法抚平的伤疤。 2008年是所有中国人都无法忘记的一年&am…

Ims跟2/3G会议电话(Conference call)流程差异介绍

2/3G Conference call 合并(Merged)通话前,两路电话只能一路保持(Hold),一路通话(Active)。 主叫Merged操作,Hold的一路会变成Active,进入会议通话。 例如终端A跟C通话,再跟B通话,此时B就是Active状态,C从Active变成Hold状态。Merged进入会议通话后,C又从Hold变…

docker安装elasticsearch

前言 安装es么&#xff0c;也没什么难的&#xff0c;主要网上搜一搜&#xff0c;看看文档&#xff0c;但是走过的坑还是需要记录一下的 主要参考这三份文档&#xff1a; Running the Elastic Stack on Docker docker简易搭建ElasticSearch集群 Running Kibana on Docker …

Python-exe调用-控制台命令行执行-PyCharm刷新文件夹

文章目录 1.控制台命令行执行1.1.subprocess.Popen1.2.os.system()1.3.subprocess.getstatusoutput()1.4.os.popen() 2.PyCharm刷新文件夹3.作者答疑 1.控制台命令行执行 主要四种方式实现。 1.1.subprocess.Popen import os import subprocess cmd "project1.exe&qu…

只下载rpm包而不安装(用于内网虚拟机使用)

这里写目录标题 问题&#xff1a;解决&#xff1a;1. 安装yum-utils2. 下载rpm包3. 将rpm包拷贝到离线的虚拟机并安装 遇到的问题&#xff1a;1. error while loading shared libraries: libXXX.so.X: cannot open shared object file: No such file2. wrong ELF class: ELFCLA…

C++学习day--10 条件判断、分支

1、if语句 if 语句的三种形态 形态1&#xff1a;如果。。。那么。。。 #include <iostream> using namespace std; int main( void ) { int salary; cout << " 你月薪多少 ?" ; cin >> salary; if (salary < 20000) { cout <&…

【博客系统】页面设计(附完整源码)

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点! 欢迎志同道合的朋友一起加油喔&#x1f93a;&#x1f93a;&#x1f93a; 目录 一、页面介绍 二、预期效果 1、博客列表页效…

大项目准备(2)

目录 中国十大最具发展潜力城市 docker是什么&#xff1f;能介绍一下吗&#xff1f; 中国十大最具发展潜力城市 按照人随产业走、产业决定城市兴衰、规模经济和交通成本等区位因素决定产业布局的基本逻辑&#xff0c;我们在《中国城市发展潜力排名&#xff1a;2022》研究报告…

websocket

&#x1f449;websocket_菜鸟教程*…*的博客-CSDN博客 目录 1、什么是Socket&#xff1f;什么是WebSocket&#xff1f; 2、WebSocket的通信原理和机制 3、WebSocket技术出现之前&#xff0c;Web端实现即时通讯的方法有哪些&#xff1f; 4、一个简单的WebSocket聊天小例子 …

prometheus监控数据持久化

前置条件 1.规划两台主机安装prometheus # kubectl get nodes --show-labels | grep prometheus nm-foot-gxc-proms01 Ready worker 62d v1.23.6 beta.kubernetes.io/archamd64,beta.kubernetes.io/oslinux,kubernetes.io/archamd64,kubernetes.io…

5款办公必备的好软件,你值得拥有

随着网络信息技术的发展&#xff0c;越来越多的人在办公时需要用到电脑了。如果你想提高办公效率&#xff0c;那么就少不了工具的帮忙&#xff0c;今天给大家分享5款办公必备的好软件。 1.文件管理工具——TagSpaces TagSpaces 是一款开源的文件管理工具,它可以通过标签来组织…

Linux一学就会——系统文件I/O

Linux一学就会——系统文件I/O 有几种输出信息到显示器的方式 #include <stdio.h> #include <string.h> int main() {const char *msg "hello fwrite\n";fwrite(msg, strlen(msg), 1, stdout);printf("hello printf\n");fprintf(stdout, &q…

体验洞察 | 原来它才是最受欢迎的CX指标?

一直以来&#xff0c;企业都在试图追踪他们能否在整个客户旅程中始终如一地提供卓越的客户体验&#xff08;Customer Experience&#xff0c;简称“CX”&#xff09;&#xff0c;并通过多个CX指标&#xff0c;如NPS&#xff08;净推荐值&#xff09;、CSAT&#xff08;客户满意…