【并发编程】ForkJoinPool工作原理分析

目录

  • 前置内容
  • 课程内容
    • 一、由一道算法题引发的思考
      • 1.算法题
      • 2.什么是归并排序法
    • 二、什么是Fork/Join框架
      • 1.基本介绍
      • 2.ForkJoinPool
      • 2.ForkJoinPool构造函数及参数解读
      • 3.任务提交方式
      • 4.工作原理图
      • 5.工作窃取
      • 6.和普通线程池之间的区别
      • 7.ForkJoinTask
  • 学习总结

前置内容

Q1:在并发编程里面,通常我们遇到的任务类型都有哪些?
答:通常有:计算密集型(CPU密集型)、IO密集型

Q2:它们有什么区别?
答:计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。
IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

课程内容

一、由一道算法题引发的思考

1.算法题

有一道算法题,题目为:【如何充分利用多核CPU的性能,快速对一个2千万大小的数组进行排序?】
说到排序算法,我估计大家多少会有点印象的, 毕竟很多面试都会偶尔问到。比如有:冒泡排序啦,选择排序啦,快速排序等等。但是在这个2KW的体量下,显然是不适用的。或许更有经验一点的朋友们想到了一个办法了,那就是:归并排序法。
是的,这里就是我想引出的一个东西【归并排序法】。

2.什么是归并排序法

归并排序(Merge Sort)是一种基于分治思想的排序算法。归并排序的基本思想是将一个大数组分成
两个相等大小的子数组,对每个子数组分别进行排序,然后将两个子数组合并成一个有序的大数组。
因为常常使用递归实现(由先拆分后合并的性质决定的),所以我们称其为归并排序。
归并排序的步骤包括以下3个:

  • 将数组分成两个子数组
  • 对每个子数组进行排序
  • 合并两个有序的子数组

归并排序的时间复杂度为O(nlogn),空间复杂度为O(n),其中n为数组的长度。
当然,还有一个更学术一点的解释:

分治思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题
性质相同
。求出子问题的解,就可得到原问题的解。
分治思想的步骤如下:

  1. 分解:将要解决的问题划分成若干规模较小的同类问题;
  2. 求解:当子问题划分得足够小时,用较简单的方法解决;
  3. 合并:按原问题的要求,将子问题的解逐层合并构成原问题的解。
    计算机十大经典算法中的归并排序、快速排序、二分查找都是基于分治思想实现的算法
    分治任务模型图如下:

这里用归并排序算法,简单实现了一下这个算法题,代码如下:

public class MergeSort {

    private final int[] arrayToSort; //要排序的数组
    private final int threshold;  //拆分的阈值,低于此阈值就不再进行拆分

    public MergeSort(final int[] arrayToSort, final int threshold) {
        this.arrayToSort = arrayToSort;
        this.threshold = threshold;
    }

    /**
     * 排序
     * @return
     */
    public int[] sequentialSort() {
        return sequentialSort(arrayToSort, threshold);
    }

    public static int[] sequentialSort(final int[] arrayToSort, int threshold) {
        //拆分后的数组长度小于阈值,直接进行排序
        if (arrayToSort.length < threshold) {
            //调用jdk提供的排序方法
            Arrays.sort(arrayToSort);
            return arrayToSort;
        }

        int midpoint = arrayToSort.length / 2;
        //对数组进行拆分
        int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);
        int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length);
        //递归调用
        leftArray = sequentialSort(leftArray, threshold);
        rightArray = sequentialSort(rightArray, threshold);
        //合并排序结果
        return merge(leftArray, rightArray);
    }

    public static int[] merge(final int[] leftArray, final int[] rightArray) {
        //定义用于合并结果的数组
        int[] mergedArray = new int[leftArray.length + rightArray.length];
        int mergedArrayPos = 0;
        int leftArrayPos = 0;
        int rightArrayPos = 0;
        while (leftArrayPos < leftArray.length && rightArrayPos < rightArray.length) {
            if (leftArray[leftArrayPos] <= rightArray[rightArrayPos]) {
                mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
                leftArrayPos++;
            } else {
                mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
                rightArrayPos++;
            }
            mergedArrayPos++;
        }

        while (leftArrayPos < leftArray.length) {
            mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
            leftArrayPos++;
            mergedArrayPos++;
        }

        while (rightArrayPos < rightArray.length) {
            mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
            rightArrayPos++;
            mergedArrayPos++;
        }

        return mergedArray;
    }

    public static void main(String[] args) {

        // 初始化一个2KW的数组
        Random random = new Random();
        int[] arrays = new int[20000000];
        for (int i = 0; i < 20000000; i++) {
            arrays[i] = random.nextInt(5);
        }


        long start1 = System.currentTimeMillis();

        // 开始拆分排序
        MergeSort mergeSort = new MergeSort(arrays, 100000);
        mergeSort.sequentialSort();
        System.out.println("任务总耗时:wasteTime=" + (System.currentTimeMillis() - start1));
    }
//    系统输出:
//    任务总耗时:wasteTime=921
} 

可以看出来,总共的耗时是:921ms(这还是我的电脑相对较好)
我想大家都看出来了,这里921ms看似不错的,那如果这个数据规模再大一点又会怎样呢?可以很负责的告诉大家,那时候就不是简单累加了,可能是指数型增长。而且细心的朋友可能已经看出来了,上面的代码完全是在单线程环境下运行(完全就是1核负重前行,其他核在岁月静好,不能忍),有没有可能,多线程环境下,运行效果会更好呢?
我们知道,我们无论做了什么优化,其实很多目的都是为了尽可能地压榨CPU资源的,像我们很多电脑一样,很多时候CPU使用率都是极低的,于是在一些大牛眼里,这样的行为非常浪费资源(/狗头/狗头)。如下图:13%的CPU使用率,资本家看了都在叹气,何其浪费啊
在这里插入图片描述
于是,大牛们为了解决上面说的【1核负重前行,其他核在岁月静好】的不公平现象,提出了一种新的线程池,ForkJoinPool(相信大家会有疑问:线程池?那为啥不用之前的ThreadPoolExecutor?别急,下面会给大家解释)

二、什么是Fork/Join框架

1.基本介绍

Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架(分治思想)。(Fork,刀叉的意思;join,连接的意思。 用叉子把某个东西分开,然后最后又连接起来)
Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+ ...+20000000,可以分割成 10 个子任务,每n个子任务分别对 2000000 个数进行求和,最终汇总这 10 个子任务的结果。如下图所示:
在这里插入图片描述

2.ForkJoinPool

ForkJoinPool是Fork/Join框架中的线程池类,它用于管理Fork/Join任务的线程。跟ThreadPoolExecutor一样,它也是继承于AbstractExecutorService类,所以它跟ThreadPoolExecutor相同的行为,例如submit()、invoke()、shutdown()、awaitTermination()等,用于提交任务、执行任务、关闭线程池和等待任务的执行结果。ForkJoinPool类中还包括一些参数,例如线程池的大小、工作线程的优先级、任务队列的容量等,可以根据具体的应用场景进行设置。类图如下:
在这里插入图片描述

它具有以下特性:

  1. ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好;
  2. ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等;
  3. ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。

就像上面说的,ForkJoinPool是ThreadPoolExecutor的补充,前者适合于计算密集型任务,所以后者通常就是适合IO密集型任务了。

2.ForkJoinPool构造函数及参数解读

在这里插入图片描述

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode)

如上面代码所示,ForkJoinPool中有4个核心参数,分别用于:控制线程池的并行数、工作线程的创建、异常处理和指定队列模式等。各参数解释如下:

  1. int parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定工作线程的数量。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()来设置并行级别;
  2. ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过factory来创建。注意,这里需要实现的是ForkJoinWorkerThreadFactory,而不是ThreadFactory。如果你不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作;
  3. UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理;
  4. boolean asyncMode:设置队列的工作模式。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式。

3.任务提交方式

任务提交是ForkJoinPool的核心能力之一,提交任务有三种方式:
在这里插入图片描述

4.工作原理图

还记得下面这张图吗,ThreadPoolExecutor的工作原理图:
在这里插入图片描述
ForkJoinPool与ThreadPoolExecutor不一样,它为了适应更大的并行度,对工作队列的设计做了改造。它的原理图如下:
在这里插入图片描述

ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中(如上图中的Task-4,通过fork方法继续分出了Task-4.1和Task-4.2)。
如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 支持一种叫做【任务窃取】的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务。如此一来,所有的工作线程都不会闲下来了。

5.工作窃取

ForkJoinPool与ThreadPoolExecutor有个很大的不同之处在于,ForkJoinPool存在引入了工作窃取设计,它是其性能保证的关键之一。工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。

ForkJoinPool的大部分操作都发生在工作窃取队列(work-stealing queues ) 中,该队列由内部类WorkQueue实现。它是Deques的特殊形式,但仅支持三种操作方式:push、pop和poll(也称为窃取)。在ForkJoinPool中,队列的读取有着严格的约束,push和pop仅能从其所属线程调用,而poll则可以从其他线程调用。
通过工作窃取,Fork/Join框架可以实现任务的自动负载均衡,以充分利用多核CPU的计算能力,同时也可以避免线程的饥饿和延迟问题
在这里插入图片描述

6.和普通线程池之间的区别

  • 工作窃取算法
    ForkJoinPool采用多任务队列,以及工作窃取算法来提高线程的利用率,而普通线程池则采用共享阻塞任务队列来管理任务。在工作窃取算法中,当一个线程完成自己的任务后,它可以从其它线程的队列中获取一个任务来执行,以此来提高线程的利用率。
  • 任务的分解和合并
    ForkJoinPool可以将一个大任务分解为多个小任务,并行地执行这些小任务,最终将它们的结果合并起来得到最终结果。而普通线程池只能按照提交的任务顺序一个一个地执行任务。
  • 工作线程的数量
    ForkJoinPool会根据当前系统的CPU核心数来自动设置工作线程的数量,以最大限度地发挥CPU的性能优势。而普通线程池需要手动设置线程池的大小,如果设置不合理,可能会导致线程过多或过少,从而影响程序的性能。
  • 任务类型
    ForkJoinPool适用于执行大规模任务并行化,而普通线程池适用于执行一些短小的任务,如处理请求等。

7.ForkJoinTask

ForkJoinTask是Fork/Join框架中的抽象类,它定义了执行任务的基本接口。用户可以通过继承ForkJoinTask类来实现自己的任务类,并重写其中的compute()方法来定义任务的执行逻辑。通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下三个子类:

RecursiveAction:用于递归执行但不需要返回结果的任务。
RecursiveTask :用于递归执行需要返回结果的任务。
CountedCompleter :在任务完成执行后会触发执行一个自定义的钩子函数

ForkJoinTask 最核心的是 fork() 方法和 join() 方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。

fork()——提交任务
fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。

join()——获取任务执行结果
join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。

学习总结

  1. 学习了ForkJoinPool,它的原理,以及工作队列设计
  2. 学习了ForkJoinPool与ThreadPoolExecutor的区别

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/50690.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

WEB:web2

背景知识 代码审计 题目 由上述可知&#xff0c;这段代码定义了一个函数encode&#xff0c;接受一个字符串参数$str&#xff0c;并返回对其进行加密后的结果 加密算法包括&#xff1a; 使用strrev函数将字符串进行翻转&#xff1b;对翻转后的每个字符&#xff0c;将其ASCII值…

helm部署rabbitmq

1.添加rabbitmq仓库并下载包 helm repo add bitnami https://charts.bitnami.com/bitnami helm pull bitnami/rabbitmq --version 10.1.4 tar -zxvf rabbitmq-10.1.4.tgz mv values.yaml values.yaml.back grep -v "#" values.yaml.back > values.yaml2.helm部署…

xxl-Job分布式任务调度

1.概述 1.1 什么是任务调度 我们可以先思考一下业务场景的解决方案&#xff1a; 某电商系统需要在每天上午10点&#xff0c;下午3点&#xff0c;晚上8点发放一批优惠券。某银行系统需要在信用卡到期还款日的前三天进行短信提醒。某财务系统需要在每天凌晨0:10结算前一天的财…

系统架构设计师-软件架构设计(3)

目录 一、软件架构风格&#xff08;其它分类&#xff09; 1、闭环控制结构&#xff08;过程控制&#xff09; 2、C2风格 3、MDA&#xff08;模型驱动架构 Model Driven Architecture&#xff09; 4、特定领域软件架构&#xff08;DSSA&#xff09; 4.1 DSSA基本活动及产出物…

MySQL之深入InnoDB存储引擎——Checkpoint机制

文章目录 一、引入二、LSN三、触发时机 一、引入 由于页的操作首先都是在缓冲池中完成的&#xff0c;那么如果一条DML语句改变了页中的记录&#xff0c;那么此时页就是脏的&#xff0c;即缓冲池中页的版本要比磁盘的新。那么数据库需要将新版本的页刷新到磁盘。倘若每次一个页…

Unity源码分享-黄金矿工游戏完整版

Unity源码分享-黄金矿工游戏完整版 项目地址&#xff1a;https://download.csdn.net/download/Highning0007/88118933

Raki的读paper小记:RWKV: Reinventing RNNs for the Transformer Era

Abstract&Introduction&Related Work 研究任务 基础模型架构已有方法和相关工作 RNN&#xff0c;CNN&#xff0c;Transformer稀疏注意力&#xff08;Beltagy等人&#xff0c;2020年&#xff1b;Kitaev等人&#xff0c;2020年&#xff1b;Guo等人&#xff0c;2022年&am…

arm 函数栈回溯

大概意思就是arm每个函数开始都会将PC、LR、SP以及FP四个寄存器入栈。 下面我们看一下这四个寄存器里面保存的是什么内存 arm-linux-gnueabi-gcc unwind.c -mapcs -w -g -o unwind&#xff08;需要加上-mapcs才会严格按照上面说的入栈&#xff09; #include <stdio.h> …

Flutter 开发者工具 Android Studio 开发Flutter应用

Flutter 开发者工具 在 Android Studio 开发Flutter应用 &#x1f525; Android Studio 版本更新 &#x1f525; Android Studio Check for Update Connection failed ​ 解决方案 如果是运行的是32位的android studio需要在andriod studio的启动目录下找到studio.exe.vmoptio…

Flutter-基础Widget

Flutter页面-基础Widget 文章目录 Flutter页面-基础WidgetWidgetStateless WidgetStateful WidgetState生命周期 基础widget文本显示TextRichTextDefaultTextStyle 图片显示FlutterLogoIconImageIamge.assetImage.fileImage.networkImage.memory CircleAvatarFadeInImage 按钮R…

抖音账号矩阵系统开发源码

一、技术自研框架开发背景&#xff1a; 抖音账号矩阵系统是一种基于数据分析和管理的全新平台&#xff0c;能够帮助用户更好地管理、扩展和营销抖音账号。 部分源码分享&#xff1a; ic function indexAction() { //面包屑 $breadcrumbs [ [tit…

【雕爷学编程】MicroPython动手做(13)——掌控板之RGB三色灯2

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

Java另一种debug方法(not remote jmv debug),类似python远程debug方式

这种Debug类似python的debug方式&#xff0c;是运行时将业务代码及依赖推送到Linux并使用Linux的java运行运行程。只要本地能运行&#xff0c;就能自动将代码推送到Linux运行&#xff0c;不需打包及设置远程debug jvm参数&#xff0c;适合一些项目Debug调试 运行时会推送一些依…

67. 二进制求和

题目链接&#xff1a;力扣 解题思路&#xff1a;模拟十进制中的列竖式方法进行计算&#xff0c;逢二进一&#xff0c;因为高位在前&#xff0c;低位在后&#xff0c;两个二进制长度不一定相等&#xff0c;可以取两者长度的较大值&#xff0c;从后面开始遍历两个字符串&#xff…

【算法基础:动态规划】5.3 计数类DP(整数拆分、分拆数)

文章目录 例题&#xff1a;900. 整数划分解法1——完全背包解法2——分拆数⭐⭐⭐ 例题&#xff1a;900. 整数划分 https://www.acwing.com/problem/content/902/ 解法1——完全背包 容量是 n&#xff0c;物品的大小和价值是 1 ~ n 中的所有数字。 import java.util.*;pub…

Echarts 文字太长用省略号代替

xAxis: [{type: category,data: [materialUserEchartsDate.value[0] ? materialUserEchartsDate.value[0].name : ,materialUserEchartsDate.value[1] ? materialUserEchartsDate.value[1].name : ,materialUserEchartsDate.value[2] ? materialUserEchartsDate.value[2].na…

RabbitMQ 集群部署

RabbiMQ 是用 Erlang 开发的,集群非常方便,因为 Erlang 天生就是一门分布式语言,但其本身并不支持负载均衡。 RabbitMQ 的集群节点包括内存节点、磁盘节点。RabbitMQ 支持消息的持久化,也就是数据写在磁盘上,最合适的方案就是既有内存节点,又有磁盘节点。 RabbitMQ 模式大…

Kibana+Prometheus+node_exporter 监控告警部署

下载好三个软件包 一、prometheus安装部署 1、解压 linxxubuntu:~/module$ tar -xvf prometheus-2.45.0-rc.0.linux-amd64.tar.gz 2、修改配置文件的IP地址 # my global config global:scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is ever…

Eclipse memory analyzer 分析GC dump日志定位代码问题

1、问题描述&#xff1a; 使用命令 jstat -gcutil [pid] 查看JVM GC日志&#xff0c;发现生产系统频繁FullGC&#xff0c;大概几分钟一次&#xff0c;而且系统响应速度变慢很多 再使用 free -g 查看服务器内存全部占用&#xff0c;猜测是内存溢出了 2、导出dump日志 jmap -du…

修改整数(有点坑,所以发出来了)

问题描述 小贝给了小聪一个正整数 x&#xff0c;但是小聪决定把这个数改掉。她可以把整数 x 每个位置上的数 t 改成 9-t。 请你帮助小聪来计算一下&#xff0c;如何把 x 改成一个最小的正整数&#xff0c;注意&#xff0c;不能出现首位为 0 的情况。 输入格式 输入一个正整数…