Fork/Join框架

是什么

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

Fork: 把一个大任务切分为若干子任务并行的执行

Join: 合并这些子任务的执行结果,最后得到这个大任务的结果。

image-20230820213123325

运行流程图

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行

做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行

为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行

image-20230820213707928

工作窃取流程

工作窃取算法的优缺点:

优点:充分利用线程进行并行计算,减少了线程间的竞争。

缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

Fork/Join框架的设计

步骤1 分割任务。需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。

步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后启动几个线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join使用两个类来完成以上两件事情。

  1. ForkJoinTask:要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。
    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask:用于有返回结果的任务。
  2. ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。

任务分割出的子任务添加当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机其他工作线程的队列的尾部获取一个任务

Fork/Join框架基本使用

需求:计算1+2+3+4的结果。

使用Fork/Join框架首先要考虑到的是如何分割任务,上述需求希望每个子任务最多执行两个数相加,因此,分割阈值设置为2。

有返回结果的任务,所以必须继承RecursiveTask

示例代码如下:

public class CountTask extends RecursiveTask<Integer> {
    /**
     * 阈值
     */
    private static final int THRESHOLD = 2;
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // 如果任务足够小就计算任务
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分割成两个子任务计算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            // 执行子任务
            leftTask.fork();
            rightTask.fork();
            // 等待子任务执行完,并得到其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 生成一个计算任务,负责计算1+2+3+4
        CountTask task = new CountTask(1, 4);
        // 执行一个任务
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException();
        }
    }
}

ForkJoinTask与一般任务的主要区别在于它需要实现compute方法,需要判断任务是否足够小(小于阈值),如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。 使用join方法会等待子任务执行完并得到其结果

Fork/Join框架的异常处理

ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常

    if(task.isCompletedAbnormally())
    {
        System.out.println(task.getException());
    }

getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

Fork/Join框架的实现原理

ForkJoinPool由ForkJoinTask数组ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务

工作队列:

static final class WorkQueue {
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

        // Instance fields
        volatile int scanState;    // versioned, <0: inactive; odd:scanning
        int stackPred;             // pool stack (ctl) predecessor
        int nsteals;               // number of steals
        int hint;                  // randomization and stealer index hint
        int config;                // pool index and mode
        volatile int qlock;        // 1: locked, < 0: terminate; else 0
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
        
       // ...
}

(1)ForkJoinTask的fork方法实现原理

调用ForkJoinTask的fork方法时,程序会将任务丢到任务队列里,然后立即返回结果。

    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
static final ForkJoinPool common;
ForkJoinPool.common.externalPush(this);
    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                U.putIntVolatile(q, QLOCK, 0);
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        externalSubmit(task);
    }

上述代码:把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。

(2)ForkJoinTask的join方法实现原理

Join方法的主要作用是阻塞当前线程并等待获取结果。代码如下:

    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

    private void reportException(int s) {
        if (s == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL)
            rethrow(getThrowableException());
    }

调用了doJoin()方法,通过doJoin()方法得到当前任务的状态与运算来判断返回什么结果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。

  • 如果任务状态是已完成,则直接返回任务结果。
  • 如果任务状态是被取消,则直接抛出CancellationException。
  • 如果任务状态是抛出异常,则直接抛出对应的异常。

doJoin()方法的实现代码。

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;

如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/82329.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

版本控制工具Git集成IDEA的学习笔记(第一篇Gitee)

目录 一、Gitee的使用 1、注册网站会员 2、用户中心 3、创建远程仓库 4、配置SSH免密登录 二、集成IDEA&#xff0c;Git项目搭建 1、本地仓库搭建 1&#xff09;创建一个新项目 2&#xff09;打开终端&#xff0c;在当前目录新建一个Git代码库 3&#xff09;忽略文件 …

Linux命令200例:tail用来显示文件的末尾内容(常用)

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;全栈领域新星创作者✌。CSDN专家博主&#xff0c;阿里云社区专家博主&#xff0c;2023年6月csdn上海赛道top4。 &#x1f3c6;数年电商行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &…

通过Git使用GitHub

目录 一、建立个人仓库 二、配置SSH密钥 三、克隆仓库代码 四、推送代码到个人仓库 五、代码拉取 一、建立个人仓库 1.建立GitHub个人仓库&#xff0c;首先注册GitHub用户。注册好了之后&#xff0c;打开用户的界面 然后就是配置问题 配置好后拉到最下方点击create repos…

【C++入门到精通】C++入门 —— 容器适配器、stack和queue(STL)

阅读导航 前言stack1. stack概念2. stack特点3. stack使用 queue1. queue概念2. queue特点3. queue使用 容器适配器1. 什么是适配器2. STL标准库中stack和queue的底层结构3. STL标准库中对于stack和queue的模拟实现⭕stack的模拟实现⭕stack的模拟实现 总结温馨提示 前言 文章…

鲁棒优化入门(5)—Matlab+Yalmip求解鲁棒优化编程实战

之前的博客&#xff1a;鲁棒优化入门&#xff08;二&#xff09;——基于matlabyalmip求解鲁棒优化问题 去年发布了使用Yalmip工具箱求解鲁棒优化问题的博客之后&#xff0c;陆陆续续有朋友问我相关的问题&#xff0c;有人形容从学习这篇博客到求解论文中的鲁棒优化问题&#x…

(二)结构型模式:4、组合模式(Composite Pattern)(C++实例)

目录 1、组合模式&#xff08;Composite Pattern&#xff09;含义 2、组合模式应用场景 3、组合模式的优缺点 4、组合模式的UML图学习 5、C实现组合模式的简单示例&#xff08;公司的OA系统&#xff09; 1、组合模式&#xff08;Composite Pattern&#xff09;含义 组合模…

【C语言学习】二维数组

二维数组 int[3][5];//通常理解为a是一个3行5列的矩阵二维数组的遍历 for(i0; i<3; i){for(j0; j<5; j){a[i][j] i*j;} }

Vivado使用入门之三:I/O约束

一、导图概览 二、I/O约束 2.1 I/O约束的内容 I/O约束主要是对port的位置和电气特性进行设置&#xff0c;进入菜单栏Window的IO Ports&#xff0c;可以查看可约束的相关内容。 一些port的常用特性解释如下 Name: port的名称 Direction:port的输入输出类型&#xff0c;有三种…

DNNGP、DeepGS 和 DLGWAS模型构成对比

一、DNNGP DNNGP 是基于深度卷积神经网络&#xff0c;这个结构包括一个输入层&#xff0c;三个卷积层&#xff0c;一个批标准化层&#xff0c;两个dropout层&#xff0c;一个平坦化层&#xff0c;一个 dense层。 dropout层&#xff1a;在神经网络中,dropout层是一个非常有效的正…

[JavaWeb]【一】入门JavaWeb开发总概及HTML、CSS、JavaScript

目录 一 特色 二 收获​编辑 三 什么是web? 四 网站的工作流程 五 web网站的开发模式​编辑 六 web开发课程学习安排 七、初始web前端 八 HTML、CSS 8.1 什么是HTNL\CSS(w3cschool) 8.2 HTML快速入门 8.3 VS Code开发工具 8.3.1 插件 8.3.2 主题&#xff08;改变颜色&…

破解难题:如何应对项目中的‘老油条’障碍

引言 在项目管理的实践中&#xff0c;我们经常遇到各种各样的人员挑战。其中&#xff0c;有一种特殊的挑战被称为“老油条”现象。这些“老油条”通常在表面上表现得非常配合&#xff0c;但在实际工作中却常常没有任何进展。这种情况不仅会影响项目的进度&#xff0c;还可能对…

Kestrel和ISS服务器下的配置

一、Kestrel服务器 Kestrel是ASP.NET Core框架中的一个跨平台的Web服务器。它是ASP.NET Core应用程序默认的HTTP服务器&#xff0c;并且可作为独立的Web服务器来托管ASP.NET Core应用程序。 Kestrel具有以下特点和功能 1、跨平台 Kestrel是完全跨平台的&#xff0c;可以在Wind…

Git如何上传文件到github

Git下载网址&#xff1a; https://git-scm.com/downloads 1. 新建一个空文件夹&#xff0c;用来上传文件&#xff0c;第一次需创建&#xff0c;以后无需创建 2. 点进去空文件夹&#xff0c;鼠标右键&#xff0c;使用Git Bash Here 打开 3. 克隆远程仓库&#xff1a;git cl…

nginx反向代理、负载均衡

修改nginx.conf的配置 upstream nginx_boot{# 30s内检查心跳发送两次包&#xff0c;未回复就代表该机器宕机&#xff0c;请求分发权重比为1:2server 192.168.87.143 weight100 max_fails2 fail_timeout30s; server 192.168.87.1 weight200 max_fails2 fail_timeout30s;# 这里的…

WPF显示初始界面--SplashScreen

WPF显示初始界面–SplashScreen 前言 WPF应用程序的运行速度快&#xff0c;但并不能在瞬间启动。当第一次启动应用程序时&#xff0c;会有一些延迟&#xff0c;因为公共语言运行时&#xff08;CLR&#xff09;首先需要初始化.NET环境&#xff0c;然后启动应用程序。 对于WPF中…

OpenAI Function calling

开篇 原文出处 最近 OpenAI 在 6 月 13 号发布了新 feature&#xff0c;主要针对模型进行了优化&#xff0c;提供了 function calling 的功能&#xff0c;该 feature 对于很多集成 OpenAI 的应用来说绝对是一个“神器”。 Prompt 的演进 如果初看 OpenAI 官网对function ca…

【LeetCode-中等题】49. 字母异位词分组

题目 题解一:排序哈希表 思路:由于互为字母异位词的两个字符串包含的字母相同&#xff0c;因此对两个字符串分别进行排序之后得到的字符串一定是相同的&#xff0c;故可以将排序之后的字符串作为哈希表的键。 核心api: //将字符串转换为字符数组char[] ch str.toCharArray();…

使用open cv进行角度测量

使用open cv进行角度测量 用了一点初中数学的知识&#xff0c;准确度&#xff0c;跟鼠标点的准不准有关系&#xff0c;话不多说直接上代码 import cv2 import mathpath "test.jpg" img cv2.imread(path) pointsList []def mousePoint(event, x, y, flags, param…

【Linux取经路】解析环境变量,提升系统控制力

文章目录 一、进程优先级1.1 什么是优先级&#xff1f;1.2 为什么会有优先级&#xff1f;1.3 小结 二、Linux系统中的优先级2.1 查看进程优先级2.2 PRI and NI2.3 修改进程优先级2.4 进程优先级的实现原理2.5 一些名词解释 三、环境变量3.1 基本概念3.2 PATH&#xff1a;Linux系…

开源后台管理系统Geekplus Admin

本系统采用前后端分离开发模式&#xff0c;后端采用springboot开发技术栈&#xff0c;mybatis持久层框架&#xff0c;redis缓存&#xff0c;shiro认证授权框架&#xff0c;freemarker模版在线生成代码&#xff0c;websocket消息推送等&#xff0c;后台管理包含用户管理&#xf…