并发编程系列-分而治之思想Forkjoin

我们介绍过一些有关并发编程的工具和概念,包括线程池、Future、CompletableFuture和CompletionService。如果仔细观察,你会发现这些工具实际上是帮助我们从任务的角度来解决并发问题的,而不是让我们陷入线程之间如何协作的繁琐细节(比如等待和通知等)。

对于简单的并行任务,你可以使用“线程池+Future”的方式来处理。而对于任务之间存在聚合关系的情况,无论是AND聚合还是OR聚合,你都可以利用CompletableFuture来解决。至于批量的并行任务,则可以借助CompletionService来实现。

我们一直强调,并发编程可以从三个层面来思考,分别是分工、协作和互斥。当你关注于任务本身时,你会发现自己的思维模式已经超越了并发编程的技术细节,更加贴近了现实世界中的工作方式。因此,我将线程池、Future、CompletableFuture和CompletionService都归类到了“分工”这个层面。

下面我将通过现实世界里的工作流程图来描述并发编程领域中的简单并行任务、聚合任务和批量并行任务。相信通过这些图示,你能够更好地将自己的思维模式与现实世界联系起来。

alt

在前面提到的简单并行任务、聚合任务和批量并行任务模型之外,还有一种任务模型被称为“分治”。如字面意义所示,分治是一种解决复杂问题的思维方法和模式;具体而言,它将一个复杂的问题分解成多个相似的子问题,然后再将这些子问题进一步分解成更小的子问题,直到每个子问题变得足够简单从而可以直接求解。

从理论上讲,每个问题都对应着一个任务,因此分治实际上就是对任务的划分和组织。分治思想在许多领域都有广泛的应用。例如,在算法领域,我们经常使用分治算法来解决问题(如归并排序和快速排序都属于分治算法,二分查找也是一种分治算法)。在大数据领域,MapReduce计算框架背后的思想也是基于分治。

由于分治这种任务模型的普遍性,Java并发包提供了一种名为Fork/Join的并行计算框架,专门用于支持分治任务模型的应用。

分治任务模型

这里你需要先深入了解一下分治任务模型,分治任务模型可分为两个阶段:一个阶段是 任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;另一个阶段是 结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图,你可以对照着理解。

alt

简版分治任务模型图

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。

Fork/Join的使用

Fork/Join是一个并行计算框架,主要用于支持分治任务模型。在这个计算框架中,Fork代表任务的分解,而Join代表结果的合并。Fork/Join计算框架主要由两部分组成:分治任务的线程池ForkJoinPool和分治任务ForkJoinTask。这两部分的关系类似于ThreadPoolExecutor和Runnable之间的关系,都是用于提交任务到线程池的,只不过分治任务有自己独特的类型ForkJoinTask。

ForkJoinTask是一个抽象类,其中有许多方法,其中最核心的是fork()方法和join()方法。fork()方法用于异步执行一个子任务,而join()方法通过阻塞当前线程来等待子任务的执行结果。ForkJoinTask有两个子类:RecursiveAction和RecursiveTask。从它们的名字就可以看出,它们都使用递归的方式处理分治任务。这两个子类都定义了一个抽象方法compute(),不同之处在于RecursiveAction的compute()方法没有返回值,而RecursiveTask的compute()方法有返回值。这两个子类也都是抽象类,在使用时需要创建自定义的子类来扩展功能。

接下来,让我们来实现一下如何使用Fork/Join并行计算框架来计算斐波那契数列(下面的代码示例源自Java官方示例)。首先,我们需要创建一个ForkJoinPool线程池以及一个用于计算斐波那契数列的Fibonacci分治任务。然后,通过调用ForkJoinPool线程池的invoke()方法来启动分治任务。由于计算斐波那契数列需要返回结果,所以我们的Fibonacci类继承自RecursiveTask。Fibonacci分治任务需要实现compute()方法,在这个方法中,逻辑与普通计算斐波那契数列的方法非常相似,只是在计算Fibonacci(n - 1)时使用了异步子任务,这通过f1.fork()语句来实现。

static void main(String[] args){
  //创建分治任务线程池
  ForkJoinPool fjp =
    new ForkJoinPool(4);
  //创建分治任务
  Fibonacci fib =
    new Fibonacci(30);
  //启动分治任务
  Integer result =
    fjp.invoke(fib);
  //输出结果
  System.out.println(result);
}
//递归任务
static class Fibonacci extends
    RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){
    if (n <= 1)
      return n;
    Fibonacci f1 =
      new Fibonacci(n - 1);
    //创建子任务
    f1.fork();
    Fibonacci f2 =
      new Fibonacci(n - 2);
    //等待子任务结果,并合并结果
    return f2.compute() + f1.join();
  }
}

ForkJoinPool工作原理

Fork/Join并行计算的核心组件是ForkJoinPool。下面简单介绍一下ForkJoinPool的工作原理。

与ThreadPoolExecutor类似,ForkJoinPool现的。不同之处在部有多个任务队列,用于生产者和消费者之间的通信。当我们通过ForkJoinPool的invoke()或submit()方法提交任务时,ForkJoinPool根据一定的路由规则将任务分配到一个任务队列中。如果任务执行过程中创建了子任务,那么子任务会被提交到对应工作线程的任务队列中。

当工作线程的任务队列为空时,它是否无事可做呢?不是的。ForkJoinPool引入了一种称为"任务窃取"的机制。当工作线程空闲时,它可以从其他工作线程的任务队列中"窃取"任务。例如,在下图中线程T2的任务队列已经为空,它可以窃取线程T1的任务队列中的任务。这样,所有的工作线程都能保持忙碌状态。

ForkJoinPool中的任务队列采用双端队列的形式。工作线程从任务队列的一个端获取任务,而"窃取任务"则从另一端进行消费。这种设计能够避免许多不必要的数据竞争。我们介绍的是ForkJoinPool的简化原理,实际上它的实现比我们介绍的要复杂得多。如果你对此感兴趣,建议阅读其源码。

alt

ForkJoinPool工作原理图

模拟MapReduce统计单词数量

Fork/Join并行计算框架被用来实现学习MapReduce的入门程序,该程序用于统计文件中每个单词的数量。以下是如何使用Fork/Join并行计算框架实现此功能。

首先,我们可以使用二分法递归地将文件拆分为更小的部分,直到每个部分只有一行数据。然后,在每个部分中统计单词的数量,并逐级汇总结果。你可以参考之前提到的简化版分治任务模型图以理解该过程。

现在,让我们开始实现。下面的示例程序使用字符串数组String[] fc来模拟文件内容,其中每个元素与文件中的行数据一一对应。关键代码位于compute()方法中,这是一个递归方法。它将前半部分数据fork一个递归任务进行处理(关键代码:mr1.fork()),而后半部分数据在当前任务中递归处理(mr2.compute())。

import java.util.concurrent.RecursiveTask;

public class WordCountTask extends RecursiveTask<Integer{
    private final String[] fc;
    private final int start, end;
    
    public WordCountTask(String[] fc, int start, int end) {
        this.fc = fc;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Integer compute() {
        if (end - start <= 1) {
            // 对单行数据进行统计
            return countWords(fc[start]);
        } else {
            int mid = (start + end) / 2;
            WordCountTask mr1 = new WordCountTask(fc, start, mid);
            mr1.fork();
            WordCountTask mr2 = new WordCountTask(fc, mid, end);
            int result2 = mr2.compute();
            int result1 = mr1.join();
            // 汇总结果
            return result1 + result2;
        }
    }
    
    private int countWords(String line) {
        String[] words = line.split(" ");
        return words.length;
    }
}

这个示例程序是对Fork/Join模型的简化,实际上在真正的MapReduce框架中,还涉及到数据划分、映射阶段、归约阶段等更多的步骤。但是通过此示例,你可以初步了解如何使用Fork/Join并行计算框架来处理类似的任务。

总结

Fork/Join并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的MapReduce,所以你可以把Fork/Join看作单机版的MapReduce。

Fork/Join并行计算框架的核心组件是ForkJoinPool。ForkJoinPool支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8提供的Stream API里面并行流也是以ForkJoinPool为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个ForkJoinPool,这个共享的ForkJoinPool默认的线程数是CPU的核数;如果所有的并行流计算都是CPU密集型计算的话,完全没有问题,但是如果存在I/O密集型的并行流计算,那么很可能会因为一个很慢的I/O计算而拖慢整个系统的性能。所以 建议用不同的ForkJoinPool执行不同类型的计算任务

如果你对ForkJoinPool详细的实现细节感兴趣,也可以参考 Doug Lea的论文。

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/88030.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一网打尽java注解-克隆-面向对象设计原则-设计模式

文章目录 注解内置注解元注解 对象克隆为什么要克隆&#xff1f;如何克隆浅克隆深克隆 Java设计模式什么是设计模式&#xff1f;为什么要学习设计模式&#xff1f; 建模语言类接口类之间的关系依赖关系关联关系聚合关系组合关系继承关系实现关系 面向对象设计原则单一职责开闭原…

仓库使用综合练习

目录 1、使用mysql:5.6和 owncloud 镜像&#xff0c;构建一个个人网盘。 2、安装搭建私有仓库 Harbor 3、编写Dockerfile制作Web应用系统nginx镜像&#xff0c;生成镜像nginx:v1.1&#xff0c;并推送其到私有仓库。 4、Dockerfile快速搭建自己专属的LAMP环境&#xff0c;生…

跨平台图表:ChartDirector for .NET 7.1 Crack

什么是新的 ChartDirector for .NET 7.0 支持跨平台使用&#xff0c;但仅限于 .NET 6。这是因为在 .NET 7 中&#xff0c;Microsoft 停止了用于非 Windows 使用的 .NET 图形库 System.Drawing.Common。由于 ChartDirector for .NET 7.0 依赖于该库&#xff0c;因此它不再支持 .…

小白到运维工程师自学之路 第七十九集 (基于Jenkins自动打包并部署Tomcat环境)1

一、传统的流程 1、传统网站部署的流程 在运维过程中&#xff0c;网站部署是运维的工作之一。传统的网站部署的流程大致分为:需求分 析-->原型设计-->开发代码-->提交代码-->内网部署-->内网测试-->确认上线-->备份数据-->外网更新-->外网测试--&g…

【ES6】—数组的扩展

一、类数组/ 伪数组 1. 类/伪数组: 并不是真正意义的数组&#xff0c;有长度的属性&#xff0c;但无法使用Array原型上的方法 let divs document.getElementsByTagName(div) console.log(divs) // HTMLCollection []let divs2 document.getElementsByClassName("xxx&q…

《操作系统真象还原》学习笔记:第七章 中断

由于 CPU 获知了计算机中发生的某些事&#xff0c;CPU 暂停正在执行的程序&#xff0c;转而去执行处理该事件的程序&#xff0c;当这段程序执行完毕后&#xff0c;CPU 继续执行刚才的程序。整个过程称为中断处理&#xff0c;也称为中断。 把中断按事件来源分类&#xff0c;来自…

食品制造行业云MES系统解决方案

食品饮料行业大致可以分为初级产品加工、二次加工、食品制造、食品分装、调味品和饲料加工等几大类。由于处于产业链不同的位置&#xff0c;其管理存在一定的差异&#xff0c;那么食品行业的MES应该怎么建设呢&#xff1f; 食品饮料行业生产管理特点&#xff1a; 食品饮料行业…

leetcode 122. 买卖股票的最佳时机 II

2023.8.21 和买卖股票的最佳时机相比&#xff0c;本题的股票可以买卖多次了&#xff0c;直接用贪心解决&#xff0c;计算所有涨价的股票相加。代码如下&#xff1a; 贪心&#xff1a; class Solution { public:int maxProfit(vector<int>& prices) {int ans 0;for…

Ubuntu18.04 交叉编译curl-7.61.0

下载 官方网址是&#xff1a;curl 安装依赖库 如果需要curl支持https协议&#xff0c;需要先交叉编译 openssl,编译流程如下&#xff1a; Ubuntu18.04 交叉编译openssl-1.1.1_我是谁&#xff1f;&#xff1f;的博客-CSDN博客 解压 # 解压&#xff1a; $tar -xzvf curl-7.61.…

村口的人家排放污水,污水浸染了整个村子,怎么办

从前有一个很不错的村子里&#xff0c;村子里有很多户人家&#xff0c;随着生活水平越来越好&#xff0c;房子也修起来了&#xff0c;柏油马路也宽敞了&#xff0c;大家进出村子&#xff0c;都要走那条马路&#xff0c;要不就出不去。 目录 1. 修厕所 2. 村口的日家 3. 告诉…

Android 系统桌面 App —— Launcher 开发(1)

Android 系统桌面 App —— Launcher 开发&#xff08;1&#xff09; Launcher简介 Launcher就是Android系统的桌面&#xff0c;俗称“HomeScreen”也就是我们开机后看到的第一个App。launcher其实就是一个app&#xff0c;它的作用是显示和管理手机上其他App。目前市场上有很…

商城的TPS与并发用户数是如何换算的?

商城的TPS与并发用户数的换算关系可以通过以下公式计算&#xff1a; TPS 并发用户数 / 平均事务响应时间 其中&#xff0c;平均事务响应时间是指系统处理一个事务所需的平均时间。 下面是商城性能测试的一些用例示例&#xff1a; 用户登录&#xff1a; 目标&#xff1a;测…

【路由器】小米 WR30U 解锁并刷机

文章目录 解锁 ssh环境准备解锁过程 刷入 mt798x uboot简介刷入流程 刷入 ImmortalWrt简介刷入流程 刷为原厂固件参考资料 本文主要记录个人对小米 WR30U 路由器的解锁和刷机过程&#xff0c;整体步骤与 一般安装流程 类似&#xff0c;但是由于 WR30U 的解锁 ssh 和刷机的过程中…

关于档案馆建设的一些标准性文件说明

第一章 总则 第一条 本条阐明了本标准的编制目的。 中国是一个历史悠久的文明古国&#xff0c;档案事业的发展源远流长。档案是人类活动的真实记录&#xff0c;是人们认识和把握客观规律的重要依据。借助档案&#xff0c;我们能够更好地了解过去、把握现在、预见未来。档案工…

使用easyExcel导入导出Date类型的转换问题

起因&#xff1a;在业务需求上需要将Excel表中的日期导入&#xff0c;存储到数据库中&#xff0c;但是entity中的日期类型使用Date来接收&#xff0c;这样导致时间精确到秒。这时&#xff0c;即使使用DateTimeFormat("yyyy-MM-dd")也无法成功转换&#xff0c;会报如下…

微信开发之一键发布群公告的技术实现

简要描述&#xff1a; 设置群公告 请求URL&#xff1a; http://域名地址/setChatRoomAnnouncement 请求方式&#xff1a; POST 请求头Headers&#xff1a; Content-Type&#xff1a;application/jsonAuthorization&#xff1a;login接口返回 参数&#xff1a; 参数名必…

linux+c+qt杂记

虚拟机网络选择&#xff1b; 桥接模式&#xff1a;设置window宿主机的IP/dns,把虚拟机设置为桥接即可。 切换到终端&#xff1a;我的是 ctrlaltFnF1&#xff1f; 问题解决&#xff1a; Ubuntu系统下载&#xff08;清华大学开源软件镜像站&#xff09;&#xff08;ubuntu-20.…

Java实现一个简单的图书管理系统(内有源码)

简介 哈喽哈喽大家好啊&#xff0c;之前作者也是讲了Java不少的知识点了&#xff0c;为了巩固之前的知识点再为了让我们深入Java面向对象这一基本特性&#xff0c;就让我们完成一个图书管理系统的小项目吧。 项目简介&#xff1a;通过管理员和普通用户的两种操作界面&#xff0…

机器学习之概率论

最近&#xff0c;在了解机器学习相关的数学知识&#xff0c;包括线性代数和概率论的知识&#xff0c;今天&#xff0c;回顾了概率论的知识&#xff0c;贴上几张其他博客的关于概率论的图片&#xff0c;记录学习过程。

视频云存储/安防监控EasyCVR视频汇聚平台如何通过角色权限自行分配功能模块?

视频云存储/安防监控EasyCVR视频汇聚平台基于云边端智能协同&#xff0c;支持海量视频的轻量化接入与汇聚、转码与处理、全网智能分发、视频集中存储等。音视频流媒体视频平台EasyCVR拓展性强&#xff0c;视频能力丰富&#xff0c;具体可实现视频监控直播、视频轮播、视频录像、…