CompletableFuture编排异步线程

CompletableFuture 是 Java 8 引入的一种新的 Future,设计目的是为了编写非阻塞的异步代码。

传统异步编程方式

传统异步编程方式获得异步任务值,首先我们得通过future task ,然后创建一个实现callable内部类,或者通过lambda的表达式,然后再结合thread,或者线程池的方式去执行它,具体代码如下。

import java.util.concurrent.*;

public class CompletableFutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "futureTask 执行完成";
            }
        });

        new Thread(futureTask).start();
        //get()方法作用:以阻塞的方式获取任务执行结果
        System.out.println("new Thread的方式获取结果:" + futureTask.get());

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(futureTask);
        //get()方法作用:以阻塞的方式获取任务执行结果
        System.out.println("线程池的方式获取结果:" + futureTask.get());
        executorService.shutdown(); // 关闭线程池

        System.out.println("TODO...");
    }

}

运行结果:
在这里插入图片描述
可以看出整个实现过程比较麻烦,想要获得返回值会调用它的get()方法,会阻塞后面的代码,如果后面的代码并不依赖future task的返回值的话,其实我们更希望以并行的方式去执行,性能肯定是更高的,那么我们结合CompletableFuture来进行改造。

CompletableFuture实现异步编程方式

1.异步执行

supplyAsync

supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法。

// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
 
// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

具体代码:

import java.util.concurrent.*;

public class CompletableFutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            return "默认线程池执行有返回值的任务";
        });
        System.out.println(completableFuture.get());//get()方法抛出ExecutionException, InterruptedException检查时异常,程序必须做处理

        // 自定义线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<String> completableFutureWithThreadExecutor = CompletableFuture.supplyAsync(() -> {
            return "自定义线程池执行有返回值的任务";
        },executorService);
        System.out.println(completableFutureWithThreadExecutor.join());//join()方法只抛出运行时异常,程序可不做处理
        
    }

}

运行结果:

在这里插入图片描述

runAsync

runAsync是创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法,具体代码如下。

import java.util.concurrent.*;

public class CompletableFutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            System.out.println("默认线程池执行没有返回值的任务");
        });
        System.out.println("result:" + completableFuture.get());

        // 自定义线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> completableFutureWithThreadExecutor = CompletableFuture.runAsync(() -> {
            System.out.println("自定义线程池执行没有返回值的任务");

        },executorService);
        System.out.println("result:" + completableFutureWithThreadExecutor.get());

    }

}

运行结果:
在这里插入图片描述

2.获取任务结果的方法

// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException 
 
// 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
 
// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()
 
// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)
 
// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)
 
// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex) 
 

3.多任务组合处理

allOf / anyOf

allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null,具体代码如下。

import java.util.concurrent.*;

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf1 任务完成");
            return "cf1 任务完成";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                int a = 1/0;
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf2 任务完成");
            return "cf2 任务完成";
        });
        CompletableFuture<Void> cfAll = CompletableFuture.allOf(cf1, cf2);
        System.out.println("cfAll结果->" + cfAll.get());
    }

}

运行结果:
在这里插入图片描述

anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果,具体代码如下。

import java.util.concurrent.*;

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf1 do something....");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf1 任务完成");
            return "cf1 任务完成";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(Thread.currentThread() + " cf2 do something....");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cf2 任务完成");
            return "cf2 任务完成";
        });
        CompletableFuture<Object> cfAll = CompletableFuture.anyOf(cf1, cf2);
        System.out.println("cfAll结果->" + cfAll.get());
    }

}

运行结果:
在这里插入图片描述

4.异步回调处理

thenRun和thenRunAsync

thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值,具体代码如下。

import java.util.concurrent.*;

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });

        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }

}

运行结果:
在这里插入图片描述

thenRunAsync具体代码如下。

import java.util.concurrent.*;

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });

        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }

}

运行结果:
在这里插入图片描述
从上面代码和测试结果我们发现thenRun和thenRunAsync区别在于,使用thenRun方法时子任务与父任务使用的是同一个线程,而thenRunAsync在子任务中可能是另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

thenAccept和thenAcceptAsync

thenAccep表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值,具体代码如下。

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Void> cf2 = cf1.thenAccept((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....,入参:" + result);
        });

        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }

}

运行结果:
在这里插入图片描述
测试结果我们发现thenAccep和thenAccepAsync区别在于,使用thenAccep方法时子任务与父任务使用的是同一个线程,而thenAccepAsync在子任务中可能是另起一个线程执行任务,并且thenAccepAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

thenApply和thenApplyAsync

thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值,具体代码如下。

import java.util.concurrent.*;

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            return 1;
        });

        CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            result += 2;
            return result;
        });
        //等待任务1执行完成
        System.out.println("cf1结果->" + cf1.get());
        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }

}

运行结果:
在这里插入图片描述
从上面代码和测试结果我们发现thenApply和thenApplyAsync区别在于,使用thenApply方法时子任务与父任务使用的是同一个线程,而thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

whenComplete和whenCompleteAsync

whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常,具体代码如下。

import java.util.concurrent.*;

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            int a = 1/0;
            return 1;
        });

        CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {
            System.out.println("上个任务结果:" + result);
            System.out.println("上个任务抛出异常:" + e);
            System.out.println(Thread.currentThread() + " cf2 do something....");
        });

//        //等待任务1执行完成
//        System.out.println("cf1结果->" + cf1.get());
//        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }

}

运行结果:
在这里插入图片描述

handle和handleAsync

跟whenComplete基本一致,区别在于handle的回调方法有返回值,具体代码如下。

import java.util.concurrent.*;

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
            //int a = 1/0;
            return 1;
        });

        CompletableFuture<Integer> cf2 = cf1.handle((result, e) -> {
            System.out.println(Thread.currentThread() + " cf2 do something....");
            System.out.println("上个任务结果:" + result);
            System.out.println("上个任务抛出异常:" + e);
            return result+2;
        });

        //等待任务2执行完成
        System.out.println("cf2结果->" + cf2.get());
    }

}

运行结果:
在这里插入图片描述

总结

以上例子展示了 CompletableFuture 的基本使用方法,包括创建异步任务、结果处理、异常处理和结果组合。在实际开发中,你可以根据需要组合使用这些方法来实现复杂的异步逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/563616.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Stable Diffusion教程:提示词(模型、插件、安装包可分享)

什么是提示词 文章提到的模型、插件、安装包都可分享&#xff0c;需要的小伙伴文末领取&#xff01; 你可能没写过提示词&#xff0c;但是一定听说过“提示词”这几个字&#xff0c;也大概能知道它的重要性。 没听说过也没关系&#xff0c;下面我就带你认识认识。 提示词就…

ARM_day6:实现字符串数据收发函数的封装

程序代码&#xff1a; uart4.h&#xff1a; #ifndef __UART4_H__ #define __UART4_H__ #include"stm32mp1xx_gpio.h" #include"stm32mp1xx_rcc.h" #include"stm32mp1xx_uart.h" void uart4_config(); void putchar(char dat); char getchar();…

GeoServer 样式指南

毫无疑问,Geoserver 是遵循 OGC 标准共享空间数据的最佳方式之一。 Geoserver 允许我们以 WMS 格式公开数据,通过以栅格(png、jpeg 等)格式显示大量矢量数据,使开发人员和用户的工作变得非常轻松。 本博客介绍了 GeoServer 功能的又一领域,即动态设置数据样式。首先,矢量…

AGM AG32 MCU在汽车UWB应用方案

AG32的汽车UWB应用方案 汽车电子产品的日益成熟&#xff0c;包括ADAS和车载信息娱乐&#xff0c;正在推动对CPLD的需求。例如&#xff0c;利用安装在车上的各种传感器&#xff08;如雷达、摄像头和激光雷达等&#xff09;来感知周围环境&#xff0c;实现实时监测和数据处理。这…

2023年网络安全行业:机遇与挑战并存

2023年全球网络安全人才概况 根据ISC2的《2023年全球网络安全人才调查报告》&#xff0c;全球的网络安全专业人才数量达到了550万&#xff0c;同比增长了8.7%。然而&#xff0c;这一年也见证了网络安全人才短缺达到了历史新高&#xff0c;缺口数量接近400万。尤其是亚太地区&am…

Centos7 的 Open Stack T 版搭建流程 --- (三)配置消息队列

配置消息队列 文章目录 配置消息队列&#xff08;1&#xff09;安装 RabbitMQ 服务并配置新用户权限controller &#xff08;2&#xff09;如何开启图形化&#xff08;拓展&#xff09; &#xff08;1&#xff09;安装 RabbitMQ 服务并配置新用户权限 controller yum install…

【支持CPU机器】一个Python文件搭建你本地图片生成编辑会话系统——TaskMatrix架构解读

一. 先上效果 TaskMatrix通过ChatGPT 和一系列 Visual Foundation 模型&#xff0c;通过聊天实现图片的绘制、问答和编辑。 二. 流程概览 1. 使用者流程 多模型会话上下文用户指令输入到本系统 -> 多模式会话基础模型 -> 理解用户指令上下文&#xff0c;调用API选择器&…

使用ssh无显示器连接树莓派并配置vnc

配置网络连接并开启ssh 使用树莓派官方的烧录工具&#xff1a; Windows下载链接 Linux下载连接 Mac下载链接 Linux还可以使用命令下载 sudo apt install rpi-imager 在烧录前点击小齿轮配置好网络连接和ssh 如果你不知道树莓派的ip地址&#xff0c;可以使用网线连接电脑和…

新品上线!这个真核宿主-宏病毒组分析流程,太合我意了吧!

病毒被称为地球的“暗物质”&#xff0c;亟待研究。近年来&#xff0c;宏病毒组&#xff08;富集/不富集&#xff09;的研究呈爆炸式增长&#xff0c;人们越来越意识到病毒在调节微生态平衡上发挥着重要作用。 对于宏病毒组项目而言&#xff0c;在新病毒的发现、病毒溯源和进化…

FastJson2中FastJsonHttpMessageConverter找不到类问题

问题描述 如果你最近也在升级FastJson到FastJson2版本&#xff0c;而跟我一样也遇到了FastJsonHttpMessageConverter找不到类问题以及FastJsonConfig找不到问题&#xff0c;那么恭喜你&#xff0c;看完本文&#xff0c;安装完fastjson2、fastjson2-extension、fastjson2-exte…

【Linux学习】Linux调试器-gdb使用

这里写目录标题 &#x1f302;背景&#x1f302;gdb使用&#x1f302;指令总结&#xff1a; &#x1f302;背景 程序的发布方式有两种&#xff0c;debug模式和 release模式 其中&#xff0c;debug模式是可以被调试的&#xff0c;到那时release模式是不能被调试的&#xff1b; …

【Python】使用Python计算简单数值积分

题外话&#xff0c;Python语言命名的来源&#xff1a;&#xff08;见下图&#xff09;Monty Python巨蟒剧团 1、积分题目&#xff08;3&#xff09; 2、解析解答 3、Python计算代码 import math import scipy.integrate as integrate# 积分区间 # x_min 0.0 # 1 # x_min …

Android14 - WindowManagerService之客户端Activity布局

Android14 - WindowManagerService之客户端Activity布局 一、主要角色 WMS作为一个服务端&#xff0c;有多种客户端与其交互的场景。我们以常见的Activity为例&#xff1a; Activity&#xff1a;在ActivityThread构建一个Activity后&#xff0c;会调用其attach方法&#xff0c;…

elementui单个输入框回车刷新整个页面

<!-- 搜索 --> <el-form :model"queryParams" ref"queryForm" :inline"true"><el-form-item label"名称" prop"nameLike"><el-input v-model"queryParams.nameLike" placeholder"请输入…

docker的安装以及docker中nginx配置

机器 test3 192.168.23.103 1机器初始化配置 1.1关闭防火墙&#xff0c;清空防火墙规则 systemctl stop firewalld iptables -F setenforce 01.2部署时间同步 yum install ntp ntpdate -y1.3安装基础软件包 yum install -y wget net-tools nfs-utils lrzsz gcc gcc-c make…

nvm管理多个node版本,快速来回切换node版本

前言 文章基于 windows环境 使用nvm安装多版本nodejs。 最近公司有的项目比较老需要降低node版本才能运行&#xff0c;由于来回进行卸载不同版本的node比较麻烦&#xff1b;所以需要使用node工程多版本管理&#xff0c;后面自己就简单捯饬了一下nvm来管理node&#xff0c;顺便…

将城市名称替换成简写

图片左边是城市全称&#xff0c;右边是城市简写。 现在有一句话“this is Republic of Korea,that is United States of America”&#xff0c;要将其中的城市全称替换成城市简写。 #"Republic of Korea"替换成 South Korea s"this is Republic of Korea,that …

C语言数据结构之链表

目录 前言 \color{maroon}{前言} 前言1.链表的概念及结构2.链表的分类3.无头单向非循环链表的实现4.带头双向循环链表的实现5.顺序表和链表的对比 前言 \color{maroon}{前言} 前言 在上一篇博客中我们提到&#xff0c;线性表包括顺序表和链表&#xff0c;顺序表在上篇博客中已…

【Java基础】23.接口

文章目录 一、接口的概念1.接口介绍2.接口与类相似点3.接口与类的区别4.接口特性5.抽象类和接口的区别 二、接口的声明三、接口的实现四、接口的继承五、接口的多继承六、标记接口 一、接口的概念 1.接口介绍 接口&#xff08;英文&#xff1a;Interface&#xff09;&#xf…

运维小技能:nacos部署(外接mysql)

文章目录 I 安装nacos(m1版本)1.1 镜像启动1.2 查看docker容器日志1.3 开启鉴权II 外接mysql的docker部署方式2.1 复制mysql-schema.sql2.2 导入mysql-schema.sqlIII 配置远程用户3.1 创建数据库远程用户3.2 查看远程用户是否有密码I 安装nacos(m1版本) docker search nacos:查…