ForkJoinPool 你真的明白和用对了吗

ForkJoinPool 是一个功能强大的 Java 类,用于处理计算密集型任务,使用 ForkJoinPool 分解计算密集型任务,并并行执行它们,能够产生更好的性能。它的工作原理是将任务分解成更小的子任务,使用分而治之的策略进行操作,使其能够并发地执行任务,从而提高吞吐量并减少处理时间。

ForkJoinPool 的独特特性之一是它用于优化性能的工作窃取算法。当工作线程完成分配的任务时,它将从其他线程窃取任务,确保所有线程都有效地工作,并且不会浪费计算机资源。

ForkJoinPool 在 Java 的并行流和 CompletableFutures 中广泛使用,允许开发人员轻松地并发执行任务。此外,其他 JVM 语言(如 Kotlin和 Akka)也使用这个框架来构建需要高并发性和弹性的消息驱动应用程序。

使用 ForkJoinPool 构建线程池

ForkJoinPool 存储着 worker,这些 worker 是在机器的每个 CPU 核心上运行的进程。这些进程中的每一个都存储在一个双端队列(Deque)中。一旦工作线程的任务用完,它就开始从其他工作线程窃取任务。

首先,会有分岔任务的过程。这意味着一个大任务将被分解成可以并行执行的小任务。一旦所有子任务完成,它们就会重新加入。最后,ForkJoinPool 类通过 Join 的方式提供一个输出结果,如下图所示。

无标题-2023-06-15-1917 (1).png

当任务在 ForkJoinPool 中提交时,该进程将被分成更小的进程并推送到共享队列中。

一旦 fork() 方法被调用,任务将被并行调用,直到基本条件为真。一旦处理被分叉,join() 方法会确保线程相互等待,直到进程完成。

所有任务最初都将提交给一个主队列,这个主队列将把任务推送给 work 线程。同时,与堆栈数据结构相同,任务是使用后进先出(LIFO)策略插入的,如下图所示。

无标题-2023-06-15-1917 (2).png

Work-stealing 窃取算法

Work-stealing 算法是一种用于实现并行计算和负载平衡的策略。它通常用于在分布式系统和多核处理器中,以高效地分配和平衡计算任务。

Work-stealing 算法的优点是它可以实现高效的负载平衡和并行计算,同时减少了任务的等待时间。当一个线程完成自己的任务并变得空闲时,它将尝试从另一个线程的队列末端“窃取”任务,与队列数据结构相同,它遵循 FIFO 策略。这种策略将允许空闲线程拾取等待时间最长的任务,从而减少了总体等待时间并提高了吞吐量。

在下面的图中,线程 2 通过轮询线程 1 的队列中的最后一个元素,从线程 1 窃取一个任务,然后执行该任务。被窃取的任务通常是队列中等待时间最长的的任务,这确保了工作负载在池中的所有线程之间均匀分布。

无标题-2023-06-15-1917 (3).png

总的来说,ForkJoinPool 的工作窃取算法是一个强大的功能,可以通过确保所有可用的计算资源得到有效利用来显著提高并行程序的性能。

ForkJoinPool 主类

让我们快速浏览一下支持使用 ForkJoinPool 进行处理的主类。

  • ForkJoinPool 创建一个线程池来使用 ForkJoin:它的工作原理与其他线程池类似。这个类中最重要的方法是 commonPool(),它用于创建了 ForkJoin 线程池。
  • RecursiveAction:该类的主要功能是计算递归操作。在 compute() 方法中,我们没有返回值,这是因为递归发生在 compute() 方法中。
  • RecursiveTask: 这个类的工作方式类似于 RecursiveAction,不同之处在于 compute() 方法将返回一个值。

使用 RecursiveAction

要使用 RecursiveAction 的功能,我们需要继承它并覆盖它的 compute() 方法。

在下面的代码示例中,我们将以并行和递归的方式计算数组中每个数字的两倍数。

我们看到在代码中,fork() 方法调用 compute() 方法。一旦整个数组得到了每个元素的和,递归调用就停止了。同时,一旦对数组的所有元素进行递归求和,我们就会显示结果。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinDoubleAction {

  public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    int[] array = {1, 5, 10, 15, 20, 25, 50};
    DoubleNumber doubleNumberTask = new DoubleNumber(array, 0, array.length);

    // 调用compute方法
    forkJoinPool.invoke(doubleNumberTask);
    System.out.println(DoubleNumber.result);
  }
}

class DoubleNumber extends RecursiveAction {

  final int PROCESS_THRESHOLD = 2;
  int[] array;
  int startIndex, endIndex;
  static int result;

  DoubleNumber(int[] array, int startIndex, int endIndex) {
    this.array = array;
    this.startIndex = startIndex;
    this.endIndex = endIndex;
  }

  @Override
  protected void compute() {
    if (endIndex - startIndex <= PROCESS_THRESHOLD) {
      for (int i = startIndex; i < endIndex; i++) {
        result += array[i] * 2;
      }
    } else {
      int mid = (startIndex + endIndex) / 2;
      DoubleNumber leftArray = new DoubleNumber(array, startIndex, mid);
      DoubleNumber rightArray = new DoubleNumber(array, mid, endIndex);

      // 递归地调用compute方法
      leftArray.fork();
      rightArray.fork();

      // Joins
      leftArray.join();
      rightArray.join();
    }
  }
}

计算的结果输出是 252。

从 RecursiveAction 中要记住的重要一点是,它不返回值。还可以通过使用分而治之的策略来分解这个过程,从而提高性能。

同样要注意的是,当将 RecursiveAction 用于可以有效地分解为更小的子问题的任务时,它是最有效的。

因此,RecursiveAction 和 ForkJoinPool 应该用于计算密集型任务,在这些任务中,工作的并行化可以显著提高性能。否则,由于线程的创建和管理,性能会变得更差。

RecursiveTask

在这个示例中,我们使用 RecursiveTask 类看看有没有什么区别。

RecursiveAction 和 RecursiveTask 之间的区别在于,使用 RecursiveTask,我们可以在compute() 方法中返回一个值。

import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSumArrayTask extends RecursiveTask<Integer> {

  private final List<Integer> numbers;

  public ForkJoinSumArrayTask(List<Integer> numbers) {
    this.numbers = numbers;
  }

  @Override
  protected Integer compute() {
    if (numbers.size() <= 2) {
      return numbers.stream().mapToInt(e -> e).sum();
    } else {
      int mid = numbers.size() / 2;
      List<Integer> list1 = numbers.subList(0, mid);
      List<Integer> list2 = numbers.subList(mid, numbers.size());
 
      ForkJoinSumArrayTask task1 = new ForkJoinSumArrayTask(list1);
      ForkJoinSumArrayTask task2 = new ForkJoinSumArrayTask(list2);

      task1.fork();

      return task1.join() + task2.compute();
    }
  }

  public static void main(String[] args) {
    ForkJoinPool forkJoinPool = new ForkJoinPool();

    List<Integer> numbers = List.of(1, 3, 5, 7, 9);
    int output = forkJoinPool.invoke(new ForkJoinSumArrayTask(numbers));

    System.out.println(output);
  }
}

在上面的代码中,我们递归地分解数组,直到它达到基本条件。

一旦我们破坏了主数组,我们就会将 list1 和 list2 发送给 ForkJoinSumArrayTask,然后我们分叉 task1,它将并行执行 compute() 方法和数组的其他部分。

一旦递归过程达到基本条件,就会调用 join 方法,将结果连接起来。

最后输出结果为 25。

何时使用 ForkJoinPool

ForkJoinPool 不应该在所有情况下都使用。如前所述,最好将其用于高度密集的并发进程。让我们具体看看这些情况都有哪些:

  • 递归任务: ForkJoinPool 非常适合执行递归算法,如快速排序、归并排序或二进制搜索。这些算法可以分解成更小的子问题并并行执行,显著提高性能。
  • 并行问题:如果你的问题可以很容易地划分为独立的子任务,例如图像处理或数值模拟,那么可以使用 ForkJoinPool 并行执行子任务。
  • 高并发场景:在高并发场景中,例如 web 服务器、数据处理管道或其他高性能应用程序,可以使用 ForkJoinPool 跨多个线程并行执行任务,这有助于提高性能和吞吐量。

结尾

在本文中,我们看到了如何使用最重要的 ForkJoinPool 功能在 CPU 内核中执行繁重的操作。最后让我们来总结本文的要点:

  • ForkJoinPool 是一个线程池,它使用分而治之策略递归地执行任务。
  • JVM 语言(如Kotlin和Akka)使用 ForkJoinPool 来构建消息驱动型的应用程序。
  • ForkJoinPool 并行执行任务,从而有效地利用计算机资源。
  • Work-stealing 窃取算法通过允许空闲线程从繁忙线程窃取任务来优化资源利用。
  • 任务存储在双端队列中,存储采用后进先出策略,窃取采用先进先出策略
  • ForkJoinPool 框架中的主要类包括 ForkJoinPool、RecursiveAction 和RecursiveTask:
    • RecursiveAction 用于计算递归操作,它不返回任何值。
    • RecursiveTask 用于计算递归操作,但返回一个值。
    • compute() 方法在两个类中被重写以实现自定义逻辑。
    • fork() 方法调用 compute() 方法并将任务分解为更小的子任务。
    • join() 方法等待子任务完成并合并它们的结果。
    • ForkJoinPool 通常与并行流和 CompletableFuture 一起使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/80800.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

动手学深度学习—卷积神经网络LeNet(代码详解)

1. LeNet LeNet由两个部分组成&#xff1a; 卷积编码器&#xff1a;由两个卷积层组成&#xff1b;全连接层密集块&#xff1a;由三个全连接层组成。 每个卷积块中的基本单元是一个卷积层、一个sigmoid激活函数和平均汇聚层&#xff1b;每个卷积层使用55卷积核和一个sigmoid激…

强训第31天

选择 传输层叫段 网络层叫包 链路层叫帧 A 2^16-2 C D C 70都没收到&#xff0c;确认号代表你该从这个号开始发给我了&#xff0c;所以发70而不是71 B D C 248&123120 OSI 物理层 数据链路层 网络层 传输层 会话层 表示层 应用层 C 记一下304读取浏览器缓存 502错误网关 编…

349. 两个数组的交集 题解

题目描述&#xff1a;349. 两个数组的交集 - 力扣&#xff08;LeetCode&#xff09; 给定两个数组 nums1 和 nums2 &#xff0c;返回 它们的交集 。输出结果中的每个元素一定是 唯一 的。我们可以 不考虑输出结果的顺序 。 方法一&#xff1a; 解题思路&#xff1a; 我们可以…

时序数据库influxdb笔记

官方资料 https://docs.influxdata.com/influxdb/v2.7/install/?tLinux https://www.influxdata.com/influxdb/ 安装 1、linux平台下 1&#xff09;下载 2&#xff09;解压 3&#xff09;添加账户&#xff08; adduser influx&#xff09; 4&#xff09;设置目录权限 5…

缺少或找不到vcruntime140_1.dll的解决方法

某天&#xff0c;当我准备打开电脑上的一个应用程序时&#xff0c;突然收到一个错误提示&#xff0c;显示缺少了vcruntime140_1.dll文件。这个文件是一个重要的系统组件&#xff0c;它的丢失导致了我无法正常运行该应用程序。于是&#xff0c;我开始了一场寻找和修复旅程。然而…

RFID技术助力汽车零配件装配产线,提升效率与准确性

随着科技的不断发展&#xff0c;越来越多的自动化设备被应用到汽车零配件装配产线中。其中&#xff0c;射频识别&#xff08;Radio Frequency Identification&#xff0c;简称RFID&#xff09;技术凭借其独特的优势&#xff0c;已经成为了这一领域的重要技术之一。本文将介绍RF…

【BUG】docker安装nacos,浏览器却无法访问到页面

个人主页&#xff1a;金鳞踏雨 个人简介&#xff1a;大家好&#xff0c;我是金鳞&#xff0c;一个初出茅庐的Java小白 目前状况&#xff1a;22届普通本科毕业生&#xff0c;几经波折了&#xff0c;现在任职于一家国内大型知名日化公司&#xff0c;从事Java开发工作 我的博客&am…

go 协程并发数控制

错误的写法&#xff1a; 这里的<-ch 是为了从channel 中读取 数据&#xff0c;为了不使channel通道被写满&#xff0c;阻塞 go 协程数的创建。但是请注意&#xff0c;go workForDraw(v, &wg) 是不阻塞后续的<-ch 执行的&#xff0c;所以就一直go workForDraw(v, &…

数学建模之“灰色预测”模型

灰色系统分析法在建模中的应用 1、CUMCM2003A SARS的传播问题 2、CUMCM2005A长江水质的评价和预测CUMCM2006A出版社的资源配置 3、CUMCM2006B艾滋病疗法的评价及疗效的预测问题 4、CUMCM2007A 中国人口增长预测 灰色系统的应用范畴大致分为以下几方面: (1&#xff09;灰色关…

js简介以及在html中的2种使用方式(hello world)

简介 javascript &#xff1a;是一个跨平台的脚本语言&#xff1b;是一种轻量级的编程语言。 JavaScript 是 Web 的编程语言。所有现代的 HTML 页面都使用 JavaScript。 HTML&#xff1a; 结构 css&#xff1a; 表现 JS&#xff1a; 行为 HTMLCSS 只能称之为静态网页&#xff0…

IronPDF for .NET Crack

IronPDF for .NET Crack ronPDF现在将等待HTML元素加载后再进行渲染。 IronPDF现在将等待字体加载后再进行渲染。 添加了在绘制文本时指定旋转的功能。 添加了在保存为PDFA时指定自定义颜色配置文件的功能。 IronPDF for.NET允许开发人员在C#、F#和VB.NET for.NET Core和.NET F…

批量提取文件名到excel,详细的提取步骤

如何批量提取文件名到excel&#xff1f;我们的电脑中可能存储着数量非常多的电子文件&#xff0c;现在需要快速将这些文件的名称全部提取到Excel中。虽然少量数据可以通过复制粘贴的方式轻松完成&#xff0c;但是对于上万个数据而言&#xff0c;复制粘贴都是行不通的&#xff0…

改进YOLO系列:2.添加ShuffleAttention注意力机制

添加ShuffleAttention注意力机制 1. ShuffleAttention注意力机制论文2. ShuffleAttention注意力机制原理3. ShuffleAttention注意力机制的配置3.1common.py配置3.2yolo.py配置3.3yaml文件配置1. ShuffleAttention注意力机制论文 论文题目:SA-NET: SHUFFLE ATTENTION …

【CSS动画02--卡片旋转3D】

CSS动画02--卡片旋转3D 介绍代码HTMLCSS css动画02--旋转卡片3D 介绍 当鼠标移动到中间的卡片上会有随着中间的Y轴进行360的旋转&#xff0c;以下是几张图片的介绍&#xff0c;上面是鄙人自己录得一个供大家参考的小视频&#x1f92d; 代码 HTML <!DOCTYPE html>…

计算机视觉--距离变换算法的实战应用

前言&#xff1a; Hello大家好&#xff0c;我是Dream。 计算机视觉CV是人工智能一个非常重要的领域。 在本次的距离变换任务中&#xff0c;我们将使用D4距离度量方法来对图像进行处理。通过这次实验&#xff0c;我们可以更好地理解距离度量在计算机视觉中的应用。希望大家对计算…

MobaXterm网络远程工具介绍下载安装破解使用

一、介绍 obaXterm 是远程计算机的工具箱。在单个 Windows 应用程序中&#xff0c;它提供了大量为程序员、网站管理员、IT 管理员量身定制的功能。 MobaXterm 为 Windows 桌面提供了重要的远程网络工具&#xff08;SSH、X11、RDP、VNC、FTP、MOSH 等&#xff09;和Unix 命令&a…

Unity 找不到 Navigation 组件的解决

当我们想利用unity 里面的Navigation 组件来实现我们的物体的自动导航时&#xff0c;有时竟然会发现我们的菜单栏里面找不到 该组件 这时我们应该怎么办&#xff1f; 请确保你的项目中已经导入了Unity的AI模块。要导入该模块&#xff0c;请打开"Project Settings"&am…

计算机网络----CRC冗余码的运算

目录 1. 冗余码的介绍及原理2. CRC检验编码的例子3. 小练习 1. 冗余码的介绍及原理 冗余码是用于在数据链路层的通信链路和传输数据过程中可能会出错的一种检错编码方法&#xff08;检错码&#xff09;。原理&#xff1a;发送发把数据划分为组&#xff0c;设每组K个比特&#…

pytest自动化测试框架,真正做到从0到1由浅入深详细讲解【万字级】

目录 嗨咯铁汁们&#xff0c;很久不见&#xff0c;我还是你们的老朋友凡叔&#xff0c;这里也感谢各位小伙伴的点赞和关注&#xff0c;你们的三连是我最大的动力哈&#xff0c;我也不会辜负各位的期盼&#xff0c;这里呢给大家出了一个pytest自动化测试框架由浅入深详细讲解。 …

Tomcat日志中文乱码

修改安装目录下的日志配置 D:\ProgramFiles\apache-tomcat-9.0.78\conf\logging.properties java.util.logging.ConsoleHandler.encoding GBK