异步回调模式

异步回调

所谓异步回调,本质上就是多线程中线程的通信,如今很多业务系统中,某个业务或者功能调用多个外部接口,通常这种调用就是异步的调用。如何得到这些异步调用的结果自然也就很重要了。
Callable、Future、FutureTask

public class test implements Callable<Boolean>{
    public static void main(String[] args) {
        test a=new test();
        FutureTask futureTask=new FutureTask<>(a);
        new Thread(futureTask).start();
        Object su=null;
        try {
            su=futureTask.get();
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println(su);
    }
    @Override
    public Boolean call() throws Exception {
        return null;
    }
}

FutureTask和Callable都是泛型类,泛型参数表示返回结果的类型。通过FutureTask获取异步线程的执行结果,但是其调用get()方法获取异步结果时,主线程也会被阻塞。属于异步阻塞模式。异步阻塞模式属于主动模式的异步调用,异步回调属于被动模式的异步调用。Java中回调模式的标准实现类为CompletableFuture。由于此类出现时间比较晚,期间Guava和Netty等都提出了自己的异步回调模式API来使用。这里主要介绍CompletableFuture,其他的有时间后面再学习。

CompletableFuture

在这里插入图片描述
CompletableFuture实现Future和CompletionStage两个接口。此类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。主要方法如下所示:

runAsync和supplyAsync创建子任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
}

可以看出runAsync没有返回值,supplyAsync有返回值,此处用supplyAsync举例:

ExecutorService executor= Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
        return "你好,周先生";
},executor);
System.out.println(completableFuture.get());//输出你好,周先生
executor.shutdown();

上例中的线程池可以自己构造,如若不指定使用CompletableFuture中默认的线程池ForkJoinPool。
handle()方法统一处理异常和结果

//在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}
//可能不在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(asyncPool, fn);
}
//在指定线程池executor中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}

案例:

CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
     throw new RuntimeException("你好");
});
completableFuture.handle(new BiFunction<String,Throwable,String>(){
     @Override
     public String apply(String s, Throwable throwable) {
          if(throwable==null){
               System.out.println("mei");;
          }else {
               System.out.println("出错了");
          }
          return "ok";
     }
});

异步任务的串行执行

主要方法为以下几种:thenApply()、thenAccept()、thenRun()和 thenCompose()。

thenApply()
此方法实现异步任务的串行化执行,前一个任务结果作为下一个任务的入参。

	后一个任务与前一个任务在同一个线程中执行
	public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }
	//后一个任务与前一个任务不在同一个线程中执行
    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }
	//后一个任务在指定的executor线程池中执行
    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

其中泛型参数T:上一个任务所返回结果的类型。泛型参数U:当前任务的返回类型。
案例:

		ExecutorService executor= Executors.newFixedThreadPool(10);
        CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,周先生";
        },executor).thenApplyAsync(new Function<String,String>() {
            @Override
            public String apply(String s) {
                System.out.println(Thread.currentThread().getId());//13
                return "你好,毛先生";
            }
        });

        System.out.println(completableFuture.get());//输出你好,毛先生
        executor.shutdown();

thenRun()
此方法不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。而且没有返回值。

	//后一个任务与前一个任务在同一个线程中执行
	public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }
	//后一个任务与前一个任务可以不在同一个线程中执行
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }
	//后一个任务在executor线程池中执行
    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }

thenAccept()
使用此方法时一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出。

	//后一个任务与前一个任务在同一个线程中执行
	public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }
	//后一个任务与前一个任务不在同一个线程中执行
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }
	//后一个任务在指定的executor线程池中执行
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor) {
        return biAcceptStage(screenExecutor(executor), other, action);
    }

thenCompose()
对两个任务进行串行的调度操作,第一个任务操作完成时,将其结果作为参数传递给第二个任务。

	//后一个任务与前一个任务在同一个线程中执行
	public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }
	//后一个任务与前一个任务不在同一个线程中执行
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }
	//后一个任务在指定的executor线程池中执行
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor) {
        return uniComposeStage(screenExecutor(executor), fn);
    }

thenCompose()方法第二个任务的返回值是一个CompletionStage异步实例。

		ExecutorService executor= Executors.newFixedThreadPool(10);
        CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,周先生";
        },executor).thenComposeAsync(new Function<String,CompletableFuture<String>>(){
            @Override
            public CompletableFuture<String> apply(String s) {
                return CompletableFuture.supplyAsync(()->{
                    System.out.println(Thread.currentThread().getId());//12
                    return "你好,毛先生";
                });
            }
        });
        System.out.println(completableFuture.get());//输出你好,毛先生
        executor.shutdown();

异步任务的合并执行

主要实现为以下几个方法:thenCombine()、runAfterBoth()、
thenAcceptBoth()。

thenCombine()
thenCombine()会在两个CompletionStage任务都执行完成后,一块来处理两个任务的执行结果。如果要合并多个任务,可以使用allOf()。

	//合并第二步任务的CompletionStage实例,返回第三步任务的CompletionStage
	public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }
	//不一定在同一个线程中执行第三步任务的CompletionStage实例
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }
	//第三步任务的CompletionStage实例在指定的executor线程池中执行
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }

其中泛型参数T:表示第一个任务所返回结果的类型。泛型参数U:表示第二个任务所返回结果的类型。泛型参数V:表示第三个任务所返回结果的类型。
案例:

		CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,周先生";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,毛先生";
        });
        CompletableFuture<String> future3 = future1.thenCombine(future2, new BiFunction<String, String, String>(){
            @Override
            public String apply(String s, String s2) {
                return s+"-----"+s2;
            }
        });
        String s = future3.get();
        System.out.println(s);//你好,周先生-----你好,毛先生

而runAfterBoth()方法不关注每一步任务的输入参数和输出参数,thenAcceptBoth()中第三个任务接收第一个和第二个任务的结果,但是不返回结果。

异步任务的选择执行

若异步任务的选择执行不是按照某种条件进行选择的,而按照执行速度进行选择的:前面两并行任务,谁的结果返回速度快,其结果将作为第三步任务的输入。对两个异步任务的选择可以通过CompletionStage接口的applyToEither()、acceptEither()等方法来实现。
applyToEither()

	//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数
	public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }
	//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数,不一定在同一个线程中执行fn回调函数
    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }
	//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数,在指定线程池执行fn回调函数
    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn,
        Executor executor) {
        return orApplyStage(screenExecutor(executor), other, fn);
    }

案例:

		CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "你好,周先生";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,毛先生";
        });
        CompletableFuture<String> future3 = future1.applyToEither(future2, new Function<String, String>(){
            @Override
            public String apply(String s) {
                return s;
            }
        });
        String s = future3.get();
        System.out.println(s);//你好,毛先生

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/231815.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C/C++之输入输出

文章目录 一.C语言的输入输出1.printfi. 输出整数ii. 浮点数iii.字符 & 字符串 2.scanfi.整数ii.浮点数iii. 字符 & 字符串 3.特殊用法i. * 的应用ii. %n 的应用iii. %[] 的应用 二.C中的输入输出1.couti. 缓冲区&#xff08;buffer&#xff09;ii. cout之格式化输出 2…

python爬虫学习-批量爬取图片

python爬虫学习-批量爬取图片 爬虫步骤爬取前十页图片到本地根据页码获取网络源码使用xpath解析网页解析网页并下载图片主函数如下 爬取的网站为站长素材&#xff08;仅做学习使用&#xff09; 爬取的目标网站为 https://sc.chinaz.com/tupian/qinglvtupian.html如果爬取多页&…

有什么进销存软件能对接微信小程序?

有什么进销存软件能对接微信小程序&#xff1f; 据我所知&#xff0c;很多进销存软件都有配套的微信小程序吧。 以我们现在用的这个为例&#xff0c;这也是同行推荐过来的&#xff0c;很好用&#xff0c;而且性价比很高—— 在线平台&#xff0c;无需下载APP&#xff0c;搭载…

Python Cupy 模块:加速大规模数值计算

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com Cupy是一个基于NumPy的库&#xff0c;专门设计用于在GPU上进行高性能计算。它提供了与NumPy相似的API&#xff0c;因此用户可以很容易地将现有的NumPy代码迁移到Cupy上&#xff0c;从而充分利用GPU的并行计算能力…

Java数据结构06——树

1.why: 数组&链表&树 2. 大纲 2.1前中后序 public class HeroNode {private int no;private String name;private HeroNode left;//默认为nullprivate HeroNode right;//默认为nullpublic HeroNode(int no, String name) {this.no no;this.name name;}public int …

基于Python的前程无忧、51job、智联招聘等招聘网站数据获取及数据分析可视化大全【代码+演示】

需要本项目的可以私信博主&#xff0c;获取&#xff0c;或者文末卡片获取 import pandas as pd import glob import warnings warnings.filterwarnings("ignore")# 指定目录 directory ./data/# 使用glob来获取所有.xlsx文件 excel_files glob.glob(directory *.x…

软件科技成果鉴定测试需提供哪些材料?

为了有效评估科技成果的质量&#xff0c;促进科技理论向实际应用转化&#xff0c;所以需要进行科技成果鉴定测试。申请鉴定的科技成果范围是指列入国家和省、自治区、直辖市以及国务院有关部门科技计划内的应用技术成果&#xff0c;以及少数科技计划外的重大应用技术成果。   …

高项备考葵花宝典-项目进度管理输入、输出、工具和技术(下,很详细考试必过)

项目进度管理的目标是使项目按时完成。有效的进度管理是项目管理成功的关键之一&#xff0c;进度问题在项目生命周期内引起的冲突最多。 小型项目中&#xff0c;定义活动、排列活动顺序、估算活动持续时间及制定进度模型形成进度计划等过程的联系非常密切&#xff0c;可以视为一…

Bash脚本处理ogg、flac格式到mp3格式的批量转换

现在下载的许多音乐文件是flac和ogg格式的&#xff0c;QQ音乐上下载的就是这样的&#xff0c;这些文件尺寸比较大&#xff0c;在某些场合使用不便&#xff0c;比如在车机上播放还是mp3格式合适&#xff0c;音质这些在车机上播放足够了&#xff0c;要求不高。比如本人就喜欢下载…

unity 2d 入门 飞翔小鸟 Cinemachine 镜头跟随小鸟 多边形碰撞器 解决镜头不会穿模问题(十二)

1、安装 window->package manager 2、创建Cinemachine 右键->Cinemachine->2D Carmera 3、创建空对象和多边形控制器如图 记得勾选 is Trigger 空对象位置记得要和小鸟保持一致&#xff0c;不然等下写完脚本后&#xff0c;镜头一开始会移动一下 4、将多边形触…

课堂练习3.3:进程的调度

3-6 课堂练习3.3&#xff1a;进程的调度 在内存中一般存放着数目远大于计算机 CPU 个数的进程&#xff0c;进程调度的作用是选择合适的进程来使用CPU&#xff0c;进程调度器对系统性能有重要影响。本实训分析Linux 0.11的进程调度算法&#xff0c;该操作系统采用了一种时间片与…

文件重命名:轻松高效,批量重命名文件只需掌握一点技巧

在日常工作和生活中&#xff0c;经常要对文件进行重命名。有时候可能要对一批文件进行重命名&#xff0c;如果一个个手动重命名&#xff0c;不仅费时费力&#xff0c;还容易出错。如何掌握一些文件重命名的技巧&#xff0c;那就能轻松高效地完成这项任务。接下来就讲解云炫文件…

华为ensp实验——基于全局地址池的DHCP组网实验

目录 前言实验目的实验内容实验结果 前言 该实验基于华为ensp&#xff0c;版本号是1.3.00.100 V100R003C00SPC100&#xff0c;只供学习和参考&#xff0c;不作任何商业用途。 具体的DHCP命令可以看系列文章链接&#xff0c;计算机网络实验&#xff08;华为eNSP模拟器&#xff…

win11+RTX4070Ti 安装 CUDA + cuDNN(图文教程)

win11RTX4070TI 安装 CUDA cuDNN&#xff08;图文教程&#xff09; 教程基本信息介绍查看电脑是否有最新显卡驱动并确定已安装下载CUDA安装CUDA查看CUDA是否安装成功安装cuDNN验证cuDNN是否安装成功 教程基本信息介绍 此教程为本人安装记录&#xff0c;仅供参考 本教程时间&am…

BI技巧丨RowNumber应用介绍

白茶在之前的文章中&#xff0c;给大家介绍过Rank函数的应用场景&#xff0c;其实与Rank函数同时推出的还有RowNumber函数&#xff0c;二者之间有一些差异&#xff0c;但是总体应用的场景基本类似。 RowNumber函数基本语法 ROWNUMBER ( [<relation>][, <orderBy>…

CSPNet: A New Backbone that can Enhance Learning Capability of CNN(2019)

文章目录 -Abstract1 Introduction2 Related workformer work 3 Method3.1 Cross Stage Partial Network3.2 Exact Fusion Model 4 Experiments5 Conclusion 原文链接 源代码 - 梯度信息重用&#xff08;有别于冗余的梯度信息&#xff09;可以减少计算量和内存占用提高效率&am…

算法:合并两个有序数组(双指针)

时间复杂度 O(m n)&#xff0c;空间复杂度 O(1) /*** param {number[]} nums1* param {number} m* param {number[]} nums2* param {number} n* return {void} Do not return anything, modify nums1 in-place instead.*/ var merge function(nums1,m,nums2,n) {let p1 m-1…

redis之缓存穿透,击透,雪崩~

以下为一个我们正常的缓存流程&#xff1a; 缓存雪崩&#xff1a; 在双十一的时候&#xff0c;淘宝的首页访问量是非常大的&#xff0c;所以它的很多数据是放在redis缓存里面&#xff0c;对应redis中的key&#xff0c;假设设置了缓存失效的时间为3小时&#xff0c;超过这三个小…

【Hadoop_02】Hadoop运行模式

1、Hadoop的scp与rsync命令&#xff08;1&#xff09;本地运行模式&#xff08;2&#xff09;完全分布式搭建【1】利用102将102的文件推到103【2】利用103将102的文件拉到103【3】利用103将102的文件拉到104 &#xff08;3&#xff09;rsync命令&#xff08;4&#xff09;xsync…

smarty模版 [BJDCTF2020]The mystery of ip 1

打开题目 点击flag给了我们一个ip 点击hint&#xff0c;查看源代码处告诉了我们要利用这个ip bp抓包&#xff0c;并添加X-Forward-For头 所以这道题是XFF可控 本来联想到XFF漏洞引起的sql注入&#xff0c;但是我们无论输入什么都会正常回显&#xff0c;就联想到ssti注入 我们…