ExecutorCompletionService详解

本文已收录至Github,推荐阅读 👉 Java随想录

微信公众号:Java随想录

文章目录

    • 摘要
    • ExecutorCompletionService适用场景
    • ExecutorCompletionService使用
    • ExecutorCompletionService原理解析
    • 注意事项
    • 总结

摘要

ExecutorCompletionService 是Java并发编程中的一个有用的工具类,它实现了 CompletionService 接口。ExecutorCompletionService 将 Executor 和BlockingQueue 功能融合在一起,使用它可以提交我们的任务。这个任务委托给 Executor 执行,可以使用 ExecutorCompletionService 对象的 take() 和 poll() 方法获取结果。

本文将深入讲解 ExecutorCompletionService 的使用以及源码解析。

ExecutorCompletionService适用场景

ExecutorCompletionService在以下场景中特别有用:

  • 并行任务处理:当需要同时执行多个任务,并按照完成的顺序获取它们的结果时,可以使用ExecutorCompletionService来简化任务提交和结果获取的流程。
  • 高性能计算:在需要进行大规模计算或复杂计算的场景中,可以将任务拆分成多个子任务,并使用ExecutorCompletionService来管理和获取子任务的结果。

假设现在有一批需要进行计算的任务,为了提高整批任务的执行效率,我们可以使用线程池来异步计算这些任务。通过向线程池中不断提交任务并保留与每个任务关联的Future对象。最后,我们可以遍历这些Future对象,并通过调用 get() 方法获取每个任务的计算结果。

Future的不足

Future 没有办法回调,只能手动去调用,当通过 get() 方法获取线程的返回值时,会导致阻塞,也就是和当前这个 Future 关联的计算任务执行完成的时候才返回结果,新任务必须等待已完成任务的结果才能继续进行处理。

这样会浪费很多时间,因为我们不知道哪个线程先执行完了,只能挨个去获取结果,这样已经完成的线程会因为前面未完成的线程的耗时而无法提前进行汇总,最好是谁先执行完成,谁先返回。

而 ExecutorCompletionService 可以实现这样的效果,节省获取完成结果的时间,它的内部有一个先进先出的阻塞队列,用于保存已经执行完成的 Future,通过调用它的 take() 方法或 poll() 方法可以获取到一个已经执行完成的 Future,进而通过调用 Future 接口实现类的 get() 方法获取最终的结果。

CompletionService的目标是任务谁先完成谁先获取,即结果按照完成先后顺序排序

ExecutorCompletionService使用

ExecutorCompletionService 提供了一种方便的方式来处理一组异步任务,并按照完成的顺序获取它们的结果。它内部使用了Executor框架来执行任务,并且内部管理着一个已完成任务的阻塞队列,在结果获取上提供了更加灵活和高效的机制。

下面是一个简单的例子来演示ExecutorCompletionService的基本使用:

public class ExecutorCompletionServiceExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

        // 提交任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            completionService.submit(() -> {
                double sleepTime = Math.random() * 1000;
                Thread.sleep((long) sleepTime); // 模拟耗时操作
                return "Task " + taskId + " completed,cost time: " + sleepTime;
            });
        }

        // 获取结果
        for (int i = 0; i < 10; i++) {
            Future<String> future = completionService.take();
            String result = future.get();
            System.out.println(result);
        }

        executor.shutdown();
    }
}

输出:

Task 2 completed,cost time: 170.01927312611775
Task 3 completed,cost time: 460.9622858036789
Task 1 completed,cost time: 563.24738180643
Task 0 completed,cost time: 595.938819219159
Task 5 completed,cost time: 480.4473056068137
Task 4 completed,cost time: 748.2343208613524
Task 6 completed,cost time: 370.4679098376097
Task 7 completed,cost time: 270.45945981324905
Task 9 completed,cost time: 336.5536570760892
Task 8 completed,cost time: 577.5774464801026

在上述代码中,我们创建了一个固定大小的线程池,并使用 ExecutorCompletionService 来提交和获取任务的结果。通过调用completionService.submit()方法来提交任务,并随机指定睡眠时间,来模拟任务执行的耗时,然后通过completionService.take()方法来获取已完成的任务结果。

可以看到是按照任务的执行耗时顺序去获取结果的。

ExecutorCompletionService原理解析

ExecutorCompletionService 提供了两个构造函数,一个可以指定阻塞队列,另一个使用内部默认的阻塞队列,两个构造函数都需要传进线程池参数。

提供了三个获取方法,可以看到都是从队列中获取。

  • take()/poll() 方法的工作都委托给内部的已完成任务队列 completionQueue。
  • 如果队列中有已完成的任务, take() 方法就返回任务的结果,否则阻塞等待任务完成。
  • poll() 与 take() 方法不同,poll() 有两个版本:
    • 无参的 poll() 方法:如果完成队列中有数据就返回,否则返回null。
    • 有参数的 poll() 方法:如果完成队列中有数据就直接返回,否则等待指定的时间,到时间后如果还是没有数据就返回null。

两个提交任务方法,可以看到 submit() 方法最终会委托给内部的 executor 去执行任务,提交任务的时候会将任务封装成 QueueingFuture 对象。

ExecutorCompletionService内部维护了 QueueingFuture 类,QueueingFuture 继承了 FutureTask,并重写了 done() 方法,

可以看到 done() 方法在任务完成的时候会将结果存进 已完成任务队列 completionQueue 中。

Futuretask 的 done() 方法是用来标记一个任务已经完成的方法。当一个 Futuretask 中的任务完成后,就会调用 done() 方法通知。

默认是空方法,不会执行任何动作。

执行流程

当我们使用ExecutorCompletionService类时,它能够按照任务完成的顺序获取它们的结果,这是因为ExecutorCompletionService类内部结合了QueueingFuture类和done()方法的机制。以下是源码流程步骤解释:

  1. 提交任务:
    • 我们通过submit方法将任务提交给ExecutorCompletionService。在提交任务时,ExecutorCompletionService会使用自定义的QueueingFuture类来包装任务,并将其交给底层线程池执行。
  2. QueueingFuture类:
    • QueueingFuture类是ExecutorCompletionService的内部类,继承自FutureTask。它的构造方法接收一个Callable对象作为参数。
    • 在QueueingFuture类中,它重写了done()方法。done()方法会在任务执行完成后被调用。
  3. 任务执行完成时的处理:
    • 当任务执行完成后,在底层线程池的Worker线程中,会调用QueueingFuture的done()方法。
    • 在done()方法中,QueueingFuture会首先调用父类FutureTask的done()方法,以触发对计算结果的获取。然后,它会将任务的结果存储到一个内部的BlockingQueue队列中(即completionQueue)。
  4. 获取任务结果:
    • 当我们调用take方法获取任务结果时,它会从completionQueue队列中取出已完成的任务结果,并返回该结果。如果队列为空,则会阻塞等待,直到有任务完成并返回结果。
    • take方法内部会调用QueueingFuture的get()方法,从而触发对应任务的计算结果的获取。
  5. 保证按顺序获取结果:
    • 由于completionQueue是一个阻塞队列,并且在done()方法中将任务结果按照完成的顺序放入队列中,因此我们可以通过按顺序获取队列中的任务结果,来保证按照任务完成的顺序获取它们的结果。

通过以上源码流程步骤,ExecutorCompletionService类能够按照任务完成的顺序获取结果。它利用QueueingFuture类包装任务并存储结果到阻塞队列中,在任务执行完成后,按照完成的顺序将结果放入队列,从而实现了按顺序获取结果的功能。

注意事项

在使用ExecutorCompletionService时,需要注意以下事项:

  • 合理选择线程池大小:根据任务的数量和复杂性,合理选择线程池的大小,以充分利用系统资源并避免资源浪费。
  • 及时处理异常:在任务执行过程中,如果发生异常,需要及时处理和记录异常信息,以保证程序的稳定性和可靠性。
  • 使用Future对象进行任务取消和超时控制:通过使用Future对象的cancel方法,可以取消正在执行的任务。同时,可以通过调整 poll 方法的参数来设置超时时间,避免长时间等待任务结果而导致阻塞。

总结

ExecutorCompletionService是一个强大且灵活的工具类,能够简化异步任务的处理和结果获取过程。通过使用ExecutorCompletionService,我们可以更加高效地处理一组异步任务,并按照完成的顺序获取它们的结果。

本文介绍了ExecutorCompletionService的基本使用方法,并对其源码进行了解析。希望通过这篇博客能够帮助读者更好地理解和应用ExecutorCompletionService。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/288277.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

花为缘享奢体验中心,轻创业最佳选择

公开数据显示&#xff0c;中国市场上近10年奢侈品存量近4万亿&#xff0c;但二奢流转率仅为5%&#xff0c;相比于日本、美国接近30%的流转率&#xff0c;差距巨大。二奢行业下隐藏着一个万亿级市场&#xff0c;个人如何实现奢侈品创业&#xff0c;花为缘享奢体验中心为你服务。…

Python中的垃圾回收机制是什么

一、写在前面&#xff1a; 我们都知道Python一种面向对象的脚本语言&#xff0c;对象是Python中非常重要的一个概念。在Python中数字是对象&#xff0c;字符串是对象&#xff0c;任何事物都是对象&#xff0c;而它们的核心就是一个结构体--PyObject。 typedef struct_object{i…

瞬态抑制二极管(TVS)的注意事项与布局布线?|深圳比创达电子

一、瞬态抑制二极管(TVS)的注意事项 工作电压/反向截止电压&#xff08;VRVRWM&#xff09;此参数不需要降额&#xff0c;保证大于等于工作电路最大工作电压即可&#xff0c;越接近越好。该参数为TVS的固有参数。VBR是TVS固有参数&#xff0c;与外界冲击波形无关。 被保护器件…

分布式(8)

目录 36.什么是TCC&#xff1f; 37.分布式系统中常用的缓存方案有哪些&#xff1f; 38.分布式系统缓存的更新模式&#xff1f; 39.分布式缓存的淘汰策略&#xff1f; 40.Java中定时任务有哪些&#xff1f;如何演化的&#xff1f; 36.什么是TCC&#xff1f; TCC&#xff08…

低代码技术:颠覆数据孤岛的技术利器

在当今数据驱动的世界中&#xff0c;数据的价值无可忽视。然而&#xff0c;很多组织面临一个普遍的问题&#xff0c;即数据孤岛。数据孤岛指的是不同部门或系统之间无法有效共享和集成数据的情况。这限制了组织在数据驱动的决策和创新方面的能力。然而&#xff0c;低代码平台的…

雍禾植发成毛发行业标杆!雍禾医疗获“年度医疗大健康消费企业”

近期&#xff0c;以“新视野 新链接”为主题的2023 EDGE AWARDS全球创新评选榜单正式发布。该评选由钛媒体发起&#xff0c;聚焦大健康产业&#xff0c;由权威行业专家、王牌分析师、专业投资机构、用户代表共同评审&#xff0c;兼顾综合专业性、影响力、创新性三大维度评选而出…

2024 年加密货币领域需要注意的 5 大网络安全威胁

加密货币世界主要存在于数字领域&#xff0c;面临着众多不断变化的网络威胁&#xff0c;这些威胁所带来的风险&#xff0c;给个人和企业组织造成了重大损失。 本文将研究2023年年加密货币领域的一些关键网络安全趋势&#xff0c;这些趋势预计将持续到 2024 年&#xff0c;并对…

为什么大学c语言课不顺便教一下Linux,Makefile

为什么大学c语言课不顺便教一下Linux&#xff0c;Makefile&#xff0c;git&#xff0c;gdb等配套工具链呢? 在开始前我有一些资料&#xff0c;是我根据自己从业十年经验&#xff0c;熬夜搞了几个通宵&#xff0c;精心整理了一份「Linux的资料从专业入门到高级教程工具包」&…

kubeadm来快速搭建一个K8S集群

二进制搭建适合大集群&#xff0c;50台以下的主机 kubeadm更适合中下企业的业务集群 我们采用了二进制包搭建出的k8s集群&#xff0c;本次我们采用更为简单的kubeadm的方式来搭建k8s集群。 二进制的搭建更适合50台主机以上的大集群&#xff0c;kubeadm更适合中小型企业的集群…

Matplotlib基础

目录&#xff1a; 一、绘制函数图像&#xff1a;二、创建图形对象&#xff1a;三、绘制多子图&#xff1a; 一、绘制函数图像&#xff1a; from matplotlib import pyplot as plt import numpy as np #生成&#xff08;-50,50&#xff09;的数组 x np.arange(-50,50) #计算因…

SSM的校园二手交易平台----计算机毕业设计

项目介绍 本次设计的是一个校园二手交易平台&#xff08;C2C&#xff09;&#xff0c;C2C指个人与个人之间的电子商务&#xff0c;买家可以查看所有卖家发布的商品&#xff0c;并且根据分类进行商品过滤&#xff0c;也可以根据站内搜索引擎进行商品的查询&#xff0c;并且与卖…

山海鲸可视化软件的优势:数据整合、可视化与个性化定制

随着科技的快速发展&#xff0c;企业数字化转型已成为必然趋势。而对于一些本身没有开发优势或非技术型企业&#xff0c;数字化产品的选择就成为重中之重。作为山海鲸可视化软件的开发者&#xff0c;我们深知这一点&#xff0c;对于企业来说&#xff0c;能选择一个产品一定要有…

【代数学作业5】理想的分解:高斯整数环中理想的结构,并根据其范数和素数的性质进行分解

【代数学作业5】理想的分解 写在最前面题目1相关概念题解分析1. ( 1 3 ) ( 1 − 3 ) (1 \sqrt{3}) (1 - \sqrt{3}) (13 ​)(1−3 ​)2. ( 4 3 ) ≠ ( 4 − 3 ) (4 \sqrt{3}) \neq (4 - \sqrt{3}) (43 ​)​(4−3 ​)3. ( 33 , 7 − 3 3 ) ( 4 3 3 ) (33, 7 - 3\sq…

开源大模型应用开发

1.大语言模型初探 ChatGLM3简介 ChatGLM3-6B 是一个基于 Transformer 的预训练语言模型&#xff0c;由清华大学 KEG 实验室和智谱 AI 公司于 2023 年共同训练发布。该模型的基本原理是将大量无标签文本数据进行预训练&#xff0c;然后将其用于各种下游任务&#xff0c;例如文…

边坡安全监测预警系统——高效率

安装边坡安全监测预警系统的原因是多方面的&#xff0c;涉及到社会效益、经济效益和环境效益。随着国家基础设施建设的快速发展&#xff0c;边坡安全监测预警系统的需求越来越迫切。 边坡安全监测预警系统对于保障人民生命财产安全具有重要意义。在山区、丘陵地带&#xff0c;边…

1.C++语言的编译器及编译流程

1.C编译器 编译器就是将“高级语言”翻译为“机器语言&#xff08;低级语言&#xff09;”的程序。以下是一些主流的C编译器及其简要用法&#xff1a; GNU Compiler Collection (GCC):’ GCC原名GNU C Compiler&#xff0c;后来逐渐支持更多的语言编译&#xff08;C、Fortran、…

js文件上传 分片上传/断点续传/极速秒传

(极速秒传)利用md5判断上传的文件是否存在 MD5信息摘要算法&#xff0c;一种被广泛使用的密码散列函数&#xff0c;可以产生出一个128位&#xff08;16字节&#xff09;的散列值&#xff08;hash value&#xff09;&#xff0c;用于确保信息传输完整一致。 每一个文件都会生成…

电磁波的信号加载说明

电磁波的信号加载电磁波(Electromagnetic wave)是由同相振荡 且互相垂直的电场与磁场在空间中衍生发射的振荡粒子波&#xff0c;是以波动的形式传播的电磁场&#xff0c;具有波粒二象性&#xff0c;其粒子形态称为光子&#xff0c;电磁波与光子不是非黑即白的关系&#xff0c;而…

Kubernetes 核心实战之一(精华篇 1/2)

文章目录 1&#xff0c;资源创建方式1.1 yaml1.2 命令行 2&#xff0c;NameSpace命名空间2.1 命令行创建ns2.2 yaml 创建ns 3&#xff0c;Pod3.1 命令行 创建pod3.2 yaml 创建pod3.3 可视化界面 创建3.3.1 Pod nginx3.3.2 Pod nginx tomcat3.3.3 Pod 2ngnix 1&#xff0c;资源…

日文游戏翻译 ,如何做好本地化翻译?

相关调查显示&#xff0c;日本游戏占据全球游戏市场约20%的份额&#xff0c;其销量一直都不错。市场上对于日语游戏翻译的需求也很大。那么&#xff0c;针对日文游戏翻译&#xff0c;如何做好本地化翻译&#xff1f; 首先、做好语言和文化上的本地化。这要求译员从翻译的文本到…