源码|批量执行invokeAll()多选一invokeAny()

图片

ExecutorService中定义了两个批量执行任务的方法,invokeAll()和invokeAny(),在批量执行或多选一的业务场景中非常方便。invokeAll()在所有任务都完成(包括成功/被中断/超时)后才会返回,invokeAny()在任意一个任务成功(或ExecutorService被中断/超时)后就会返回。

AbstractExecutorService实现了这两个方法,本文将先后分析invokeAll()和invokeAny()两个方法的源码实现。

invokeAll()

invokeAll()在所有任务都完成(包括成功/被中断/超时)后才会返回。有不限时和限时版本,从更简单的不限时版入手。

不限时版

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    f.get(); // 无所谓先执行哪个任务的get()方法
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

8-12行,先将所有任务都提交到线程池(当然,任何ExecutorService均可)中。

严格来说,不是“提交”,而是“执行”。执行可能是同步或异步的,取决于线程池的策略。不过由于我们仅讨论异步情况(同步同理),用“提交”一词更容易理解。下同。

13-22行,for循环的目的是阻塞调用invokeAll的线程,直到所有任务都执行完毕。当然我们也可以使用其他方式实现阻塞,不过这种方式是最简单的:

  • 15行如果f.isDone()返回true,则当前任务已结束,继续检查下一个任务;否则,调用f.get()让线程阻塞,直到当前任务结束。

  • 17行无所谓先执行哪一个FutureTask实例的get()方法。由于所有任务并发执行,总体阻塞时间取决于于是耗时最长的任务,从而实现了invodeAll的阻塞调用。

  • 18-20行没有捕获InterruptedException。如果有任务被中断,主线程将抛出InterruptedException,以响应中断。

最后,为防止在全部任务结束之前过早退出,23行、25-29行相配合,如果done不为true(未执行到40行就退出了)则取消全部任务。

限时版

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        // Interleave time checks and calls to execute in case
        // executor doesn't have any/much parallelism.
        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) // 及时检查是否超时
                return futures;
        }

        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                if (nanos <= 0L) // 及时检查是否超时
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

10-11行,先将所有任务封装为FutureTask,添加到futures列表中。

18-23行,每提交一个任务,就立刻判断是否超时。这样的话,如果在任务全部提交到线程池中之前,就已经达到了超时时间,则能够尽快检查出超时,结束提交并退出。

对于限时版,将封装任务与提交任务拆开是必要的。

28-29行,每次在调用限时版f.get()进入阻塞状态之前,先检查是否超时。这里也是希望超时后,能够尽快发现并退出。

其他同不限时版。

invokeAny()

invokeAny()在任意一个任务成功(或ExecutorService被中断/超时)后就会返回。也分为不限时和限时版本,但为了进一步保障性能,invokeAny()的实现思路与invokeAll()略有不同。

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

内部调用了doInvokeAny()。

学习5-8行的写法,代码自解释。

doInvokeAny()

简化如下:

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
...
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);
...
    try {
        ExecutionException ee = null;
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        futures.add(ecs.submit(it.next()));
        --ntasks;
        int active = 1;

        for (;;) {
            Future<T> f = ecs.poll();
            if (f == null) {
                if (ntasks > 0) {
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                else if (active == 0)
                    break;
                else if (timed) {
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                else
                    f = ecs.take();
            }
            if (f != null) {
                --active;
                try {
                    return f.get();
                } catch (...) {
                    ee = ...;
                }
            }
        }
...
        throw ee;
    } finally {
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
}

要点:

  • ntasks维护未提交的任务数,active维护已提交未结束的任务数。

  • 内部使用ExecutorCompletionService维护已完成的任务。

  • 如果没有任务成功结束,则返回捕获的最后一个异常

  • 第一个任务是必将被执行的,其他任务按照迭代器顺序增量提交

增量提交有什么好处呢?节省资源,如果在提交过程中就有任务完成了,那么没必要继续提交任务耗费时间和空间;降低延迟,如果有任务完成,与全量提交相比,能更早被发现。

14行先向线程池提交一个任务(迭代器第一个),ntasks–,active=1:

futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;

这里是真“提交”了,不是“执行”。

然后18-45行循环检查是否有任务成功结束。

首先,19行通过及时返回的poll()方法,尝试取出一个已完成的任务:

Future<T> f = ecs.poll();

根据f的结果,分成两种情况讨论。

ExecutorCompletionService默认使用LinkedBlockingQueue作为任务队列。对LinkedBlockingQueue不熟悉的可参照源码|并发一枝花之BlockingQueue。

case1:如果有任务完成

如果有任务完成,则f不为null,进入40-49行,active–,并尝试取出任务结果:

if (f != null) {
    --active;
    try {
        return f.get();
    } catch (...) {
        ee = ...;
    }
}
  • 如果能够成功取出,即当前任务已成功结束,直接返回。

  • 如果抛出异常,则当前任务异常结束,使用ee记录异常。

显然,如果已完成的任务是异常结束的,invokeAny()不会退出,而是继续查看其它任务。

FutureTask#get()的用法参照源码|使用FutureTask的正确姿势。

case2:如果没有任务完成

如果没有任务完成,则f为null,进入23-39行,判断是继续提交任务、退出还是等待任务结果:

if (f == null) {
    if (ntasks > 0) { // check1
        --ntasks;
        futures.add(ecs.submit(it.next()));
        ++active;
    }
    else if (active == 0) // check2
        break;
    else if (timed) { // check3
        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
        if (f == null)
            throw new TimeoutException();
        nanos = deadline - System.nanoTime();
    }
    else // check4
        f = ecs.take();
}
  • check1:如果还有剩余任务(ntasks > 0),那就继续提交,同时ntasks–,active++。

  • check2:如果没有剩余任务了,且也没有已提交未结束的任务(active == 0),则表示全部任务均已执行结束,但没有一个任务是成功的,可以退出循环。退出循环后,将在47行抛出ee记录的最后一个异常。

  • check3:如果没有剩余任务,但还有已提交未结束的任务,且开启了超时机制,则尝试使用超时版poll()等待任务完成。但是,如果这种情况下超时了,就表示整个invokeAny()方法超时了,所以poll()返回null的时候,要主动抛出TimeoutException。

  • check4:如果可以没有剩余任务,但还有已提交未结束的任务,且未开启超时机制,则使用无限阻塞的take()方法,等待任务完成。

这种一堆if-else的代码很丑。可修改如下:

if (f == null) { // check1
    if (ntasks  0) {
         --ntasks;
         futures.add(ecs.submit(it.next()));
         ++active;
         continue;
     }
     if (active == 0) { // check2
         assert ntasks == 0; // 防止自己改吧改吧把它这句判断挪到了前面
         break;
     }
    if (timed) { // check3
        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
        if (f == null) {
            throw new TimeoutException();
        }
        nanos = deadline - System.nanoTime();
    } else { // check4
        f = ecs.take();
    }
}

>

修改依据:

  • check1、check2、check3/check4没有并列的判断关系

  • check3、check4有并列的判断关系,非此即彼

  • 结构更清爽

原文地址:源码|批量执行invokeAll()&&多选一invokeAny() 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/463071.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

校园博客系统 |基于springboot框架+ Mysql+Java的校园博客系统设计与实现(可运行源码+数据库+设计文档)

推荐阅读100套最新项目 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 目录 前台功能效果图 管理员功能登录前台功能效果图 系统功能设计 数据库E-R图设计 lunwen参考 摘要 研究…

MySQL语法分类 DDL(1)

DDL&#xff08;1&#xff09;(操作数据库、表) 数据库操作(CRUD) C(Create):创建 //指定字符集创建 create database db_1 character set utf8;//避免重复创建数据库报错可以用一下命令 create database if not exists db_1 character set utf8;R(Retrieve):查询 //查询所…

电源适配器

电源适配器 1. 选购指南2. 接口测量方法3. 电源接口4. 抗干扰磁环&#xff0c;稳定输出References 1. 选购指南 插头尺度相同&#xff0c;供电电压 (V) 相同&#xff0c;电流 (A) > 原来的电流 (A) INPUT (输入)&#xff0c;OUTPUT (输出) 2. 接口测量方法 3. 电源接口 外…

ARM和AMD介绍

一、介绍 ARM 和 AMD 都是计算机领域中的知名公司&#xff0c;它们在不同方面具有重要的影响和地位。 ARM&#xff08;Advanced RISC Machine&#xff09;&#xff1a;ARM 公司是一家总部位于英国的公司&#xff0c;专注于设计低功耗、高性能的处理器架构。ARM 架构以其精简指…

HCIP—BGP邻居关系建立实验

BGP的邻居称为&#xff1a;IBGP对等体 EBGP对等体 1.EBGP对等体关系&#xff1a; 位于 不同自治系统 的BGP路由器之间的BGP对等体关系 EBGP对等体一般使用 直连建立 对等体关系&#xff0c;EBGP邻居之间的报文 TTL中值设置为1 两台路由器之间建立EBGP对等体关系&#xff0…

Python Web开发记录 Day12:Django part6 用户登录

名人说&#xff1a;东边日出西边雨&#xff0c;道是无晴却有晴。——刘禹锡《竹枝词》 创作者&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 1、登录界面2、用户名密码校验3、cookie与session配置①cookie与session②配置…

【机器学习-02】矩阵基础运算---numpy操作

在机器学习-01中&#xff0c;我们介绍了关于机器学习的一般建模流程&#xff0c;并且在基本没有数学公式和代码的情况下&#xff0c;简单介绍了关于线性回归的一般实现形式。不过这只是在初学阶段、为了不增加基础概念理解难度所采取的方法&#xff0c;但所有的技术最终都是为了…

【01】htmlcssgit

01-前端干货-html&css 防脱发神器 一图胜千言 使用border-box控制尺寸更加直观,因此,很多网站都会加入下面的代码 * {margin: 0;padding: 0;box-sizing: border-box; }颜色的 alpha 通道 颜色的 alpha 通道标识了色彩的透明度,它是一个 0~1 之间的取值,0 标识完全…

C语言之快速排序

目录 一 简介 二 代码实现 快速排序基本原理&#xff1a; C语言实现快速排序的核心函数&#xff1a; 三 时空复杂度 A.时间复杂度 B.空间复杂度 C.总结&#xff1a; 一 简介 快速排序是一种高效的、基于分治策略的比较排序算法&#xff0c;由英国计算机科学家C.A.R. H…

【Machine Learning】Suitable Learning Rate in Machine Learning

一、The cases of different learning rates: In the gradient descent algorithm model: is the learning rate of the demand, how to determine the learning rate, and what impact does it have if it is too large or too small? We will analyze it through the follow…

HCIP—OSPF课后练习一

本实验模拟了一个企业网络场景&#xff0c;R1、R2、R3为公司总部网络的路由器&#xff0c;R4、R5分别为企业分支机构1和分支机构2的路由器&#xff0c;并且都采用双上行方式与企业总部相连。整个网络都运行OSPF协议&#xff0c;R1、R2、R3之间的链路位于区域0&#xff0c;R4与R…

Redis和Mysql的数据一致性问题

在高并发的场景下&#xff0c;大量的请求直接访问Mysql很容易造成性能问题。所以我们都会用Redis来做数据的缓存&#xff0c;削减对数据库的请求的频率。 但是&#xff0c;Mysql和Redis是两种不同的数据库&#xff0c;如何保证不同数据库之间数据的一致性就非常关键了。 1、导…

并查集Disjoint Set

并查集的概念 并查集是一种简单的集合表示&#xff0c;它支持以下三种操作 1. make_set(x)&#xff0c;建立一个新集合&#xff0c;唯一的元素是x 2. find_set(x)&#xff0c;返回一个指针&#xff0c;该指针指向包含x的唯一集合的代表&#xff0c;也就是x的根节点 3. union_…

Echo框架:高性能的Golang Web框架

Echo框架&#xff1a;高性能的Golang Web框架 在Golang的Web开发领域&#xff0c;选择一个适合的框架是构建高性能和可扩展应用程序的关键。Echo是一个备受推崇的Golang Web框架&#xff0c;以其简洁高效和强大功能而广受欢迎。本文将介绍Echo框架的基本特点、使用方式及其优势…

学习shell脚本

文章目录 什么是shell脚本为什么要学习shell脚本第一个脚本编写与执行 简单的shell脚本练习简单案例脚本的执行方式差异(source、sh script、./script) 如何使用shell脚本的判断式利用test命令的测试功能利用判断符号[ ]shell脚本的默认变量($0、$1...) shell脚本的条件判断式利…

MySQL安装(Mac系统)

首先要删除本机原有的mysql 停止MySQL服务 sudo /usr/local/mysql/support-files/mysql.server stop不放心可以使用以下命令查询并杀死进程 ps aux | grep mysqld sudo kill <PID>再次尝试停止服务 sudo /usr/local/mysql/support-files/mysql.server stop卸载MySQL&…

多租户平台前端存储结构的选择

下图来源于cookie、localStorage 和 sessionStorage的区别及应用实例 既然localstorage无有效期&#xff0c;关闭浏览器还存在&#xff0c;那么用来存储用户的身份信息并不是太合适&#xff0c;先看一下B站中localstorage都存在了啥&#xff0c;原来把我搜索的记录都存在了下来…

第十二届蓝桥杯EDA省赛真题分析

前言&#xff1a; 第十二届蓝桥杯EDA比赛用的是AD软件&#xff0c;从第十四届起都是使用嘉立创EDA专业版&#xff0c;所以在这里我用嘉立创EDA专业版实现题目要求。 一、省赛第一套真题题目 主观题整套题目如下&#xff1a; 试题一&#xff1a;库文件设计&#xff08;5分&am…

【类脑智能】脑网络通信模型分类及量化指标(附思维导图)

脑网络通信模型分类及量化指标(附思维导图) 参考论文&#xff1a;Brain network communication_ concepts, models and applications 概念 脑网络通信模型是一种使用图论和网络科学概念来描述和量化大脑结构中信息传递的模型。这种模型可以帮助研究人员理解神经信号在大脑内…

P1881 绳子对折

题目描述 FJ 有一个长度为 L&#xff08;1≤L≤10,000&#xff09;的绳子。这个绳子上有 N&#xff08;1≤N≤100&#xff09;个结&#xff0c;包括两个端点。FJ 想将绳子对折&#xff0c;并使较短一边的绳子上的结与较长一边绳子上的结完全重合&#xff0c;如图所示&#xff…