浅谈 parallelStream和Stream 源码及其应用场景

上篇讲述了list.forEach()和list.stream().forEach() 异同点
谈到了并行流的概念,本篇则从源码出发,了解一下其原理。

一、流的初始操作流程

jdk8中 将Collection中加入了转换流的概念。

default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }

在这里插入图片描述
目前看到的两者是一个参数的区别。

//boolean parallel 是否为并行流
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
		//用于检查传入的spliterator是否为空
        Objects.requireNonNull(spliterator);
        //ReferencePipeline.Head 表示流的开始,根据spliterator以及parallel创建对应的流操作链
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
//该构造方法用于初始化Head类的实例
Head(Spliterator<?> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }
ReferencePipeline(Spliterator<?> source,
                      int sourceFlags, boolean parallel) {
        //super关键字调用父类的构造方法,完成对父类的初始化工作
        super(source, sourceFlags, parallel);
    }
//按照给定的参数初始化AbstractPipeline类的实例
AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;  
        this.sourceStage = this;  //当前阶段作为源操作
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;  //左移一位取反操作
        this.depth = 0;
        this.parallel = parallel; //当前流水线是否支持并行操作
    }

二、forEach操作

在这里插入图片描述

@Override
        public void forEach(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
            	//串行流执行
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
            	//并行流执行
                super.forEach(action);
            }
        }

在这里插入图片描述

1)串行流

demo debug add操作,看为何会报错?

public static void main(String[] args) {
        List<String> list= new ArrayList<>();
        list.add("Sunday");
        list.add("Monday");
        list.add("Tuesday");
        list.add("Wednesday");
        list.add("Thursday");
        list.add("Friday");
        list.add("Saturday");
        list.stream().forEach(d->{
            System.out.println("value="+d);
            if (d.equals("Thursday")){
                list.add(d);
            }
        });
    }
//如果此管道截断是源阶段,则获取源阶段拆分器。调用此方法并成功返回后,将消耗管道
final Spliterator<E_OUT> sourceStageSpliterator() {
        if (this != sourceStage)
            throw new IllegalStateException();

        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        if (sourceStage.sourceSpliterator != null) {
            @SuppressWarnings("unchecked")
            Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
            return s;
        }
        else if (sourceStage.sourceSupplier != null) {
            @SuppressWarnings("unchecked")
            Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
            return s;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }
    }

forEachRemaining实现方法
在这里插入图片描述
在这里插入图片描述
return未走,直接走了异常返回 throw new ConcurrentModificationException();

public void forEachRemaining(Consumer<? super E> action) {
            int i, hi, mc; // hoist accesses and checks from loop
            ArrayList<E> lst; Object[] a;
            if (action == null)
                throw new NullPointerException();
            if ((lst = list) != null && (a = lst.elementData) != null) {
                if ((hi = fence) < 0) {
                    mc = lst.modCount;
                    hi = lst.size;
                }
                else
                    mc = expectedModCount;
                //i表示开始迭代的位置
                //i=index index表示上次迭代的位置,将上次迭代器正在迭代的位置复制给i
                //(i=index)>=0 保证当前迭代的下标大于等于0
                //表示最大迭代到hi,设置最大的hi=a.length
                //(index = hi) <= a.length保证数组不跨界
                if ((i = index) >= 0 && (index = hi) <= a.length) {
                    for (; i < hi; ++i) {
                        @SuppressWarnings("unchecked") E e = (E) a[i];
                        //执行具体的迭代
                        action.accept(e);
                    }
                    if (lst.modCount == mc)
                        return;
                }
            }
            throw new ConcurrentModificationException();
        }

2)并行流

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

@Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }
//构造一个 {@code TerminalOp},用于对流的每个元素执行操作。
//action,接收流所有元素的 {@code Consumer} 
//ordered,是否请求有序遍历,因为是并行流,所以ordered未false
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }
//使用终端操作评估管道以产生结果。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

三、核心原理-ForkJoinPool

其核心原理则-ForkJoinPool

1)Diagrams

在这里插入图片描述

2)compute()

ForkJoin进行计算任务时,计算类是要继承ForkJoinTask并且重写compute方法的。
我们看一下ForkJoinTask内部类是如何重写compute()方法的。

//类似于 AbstractTask,但不需要跟踪子任务
public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            //先调用当前splititerator 方法的estimateSize 方法,预估这个分片中的数据量
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                //根据预估的数据量获取最小处理单元的大小阈值,即当数据量已经小于这个阈值的时候进行计算,否则进行fork.fork方法中调用了ForkJoinPool线程池并行计算
                taskToFork.fork();
                //将任务划分成更小的数据块,进行求解
                sizeEstimate = rightSplit.estimateSize();
            }
            task.spliterator = null;
            task.propagateCompletion();
        }

重写的compute()方法,当进行fork方法时,实际就是调用了ForkJoinPool线程池进行计算了,那么线程池本身是无顺序的,谁先计算完谁展示。

3)ForkJoinPool核心算法

在这里插入图片描述

“工作窃取”(work-stealing)算法

ForkJoinPool的基本原理是基于“工作窃取”(work-stealing)算法。它维护着一个工作队列(WorkQueue)的数组,每个工作队列对应一个工作线程(WorkerThread)。当一个线程需要执行一个任务时,它会将任务添加到自己的工作队列中。当一个线程的工作队列为空时,它会从其他线程的工作队列中“窃取”一个任务来执行。这个“窃取”操作可以在不同的线程间实现任务的负载均衡。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

“分治法”(Divide-and-Conquer Algorithm)

分治法-典型的应用比如快速排序算法。使用分治法来实现任务的并行执行。分治法是一种将大问题划分成小问题,并通过递归地解决小问题来解决大问题的方法。

四、结果无顺序

在这里插入图片描述

1)若想并行且有顺序,用.forEachOrdered替代

在这里插入图片描述

2).forEachOrdered是如何保证有序的?

private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
            Spliterator<S> rightSplit = task.spliterator, leftSplit;
            long sizeThreshold = task.targetSize;
            boolean forkRight = false;
            while (rightSplit.estimateSize() > sizeThreshold &&
                   (leftSplit = rightSplit.trySplit()) != null) {
                ForEachOrderedTask<S, T> leftChild =
                    new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
                ForEachOrderedTask<S, T> rightChild =
                    new ForEachOrderedTask<>(task, rightSplit, leftChild);

                // 分叉父任务 完成左右子项 “发生在”父项完成之前
                task.addToPendingCount(1);
                // 完成左边的孩子“发生在”完成右边的孩子之前
                rightChild.addToPendingCount(1);
                task.completionMap.put(leftChild, rightChild);

                // If task is not on the left spine
                if (task.leftPredecessor != null) {
                    /*
                     * Completion of left-predecessor, or left subtree,
                     * "happens-before" completion of left-most leaf node of
                     * right subtree.
                     * The left child's pending count needs to be updated before
                     * it is associated in the completion map, otherwise the
                     * left child can complete prematurely and violate the
                     * "happens-before" constraint.
                     */
                    leftChild.addToPendingCount(1);
                    // Update association of left-predecessor to left-most
                    // leaf node of right subtree
                    if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
                        // If replaced, adjust the pending count of the parent
                        // to complete when its children complete
                        task.addToPendingCount(-1);
                    } else {
                        // Left-predecessor has already completed, parent's
                        // pending count is adjusted by left-predecessor;
                        // left child is ready to complete
                        leftChild.addToPendingCount(-1);
                    }
                }

                ForEachOrderedTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    task = leftChild;
                    taskToFork = rightChild;
                }
                else {
                    forkRight = true;
                    task = rightChild;
                    taskToFork = leftChild;
                }
                taskToFork.fork();
            }

            /*
             * Task's pending count is either 0 or 1.  If 1 then the completion
             * map will contain a value that is task, and two calls to
             * tryComplete are required for completion, one below and one
             * triggered by the completion of task's left-predecessor in
             * onCompletion.  Therefore there is no data race within the if
             * block.
             */
            if (task.getPendingCount() > 0) {
                // Cannot complete just yet so buffer elements into a Node
                // for use when completion occurs
                @SuppressWarnings("unchecked")
                IntFunction<T[]> generator = size -> (T[]) new Object[size];
                Node.Builder<T> nb = task.helper.makeNodeBuilder(
                        task.helper.exactOutputSizeIfKnown(rightSplit),
                        generator);
                task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
                task.spliterator = null;
            }
            task.tryComplete();
        }

看代码会大概得理解,此采用了“happens-before”原则,左子树需右子树之前完成,通过计数策略保证前后顺序的完成,继而保证了其有序性。最终执行依旧是ForkJoinPool线程池执行。
在这里插入图片描述

五、应用场景

1)并行的前提是需要硬件支持

前提是硬件支持, 如果单核 CPU, 只会存在并发处理, 而不会并行

2)demo 测试性能(本机测试)

测试配置:16 GB 2667 MHz DDR4

@Test
    public void dateTest() {
        System.out.println("数据汇总开始");
        Long startTime = System.currentTimeMillis();
        int count1 = adminTaskReceiveService.receiveCount();
        int count2 = adminTaskReceiveService.inspectCount();
        int count3=adminTaskReceiveService.constructCount();
        int count4=adminTaskReceiveService.appointmentCount();
        TestResult testResult = new TestResult();
        testResult.setReceiveCount(count1);
        testResult.setInspectCount(count2);
        testResult.setConstructCount(count3);
        testResult.setAppointmentCount(count4);

        int count11 = adminTaskReceiveService.receiveCount1();
        int count22 = adminTaskReceiveService.inspectCount1();
        int count33=adminTaskReceiveService.constructCount1();
        int count44=adminTaskReceiveService.appointmentCount1();
        testResult.setReceiveCount1(count11);
        testResult.setInspectCount1(count22);
        testResult.setConstructCount1(count33);
        testResult.setAppointmentCount1(count44);

        System.out.println("数据汇总结束,result=" + testResult);
        Long endTime = System.currentTimeMillis();
        System.out.println("time=" + (endTime - startTime) + "毫秒");
    }

    @Test
    public void dateTest1() {
        System.out.println("数据汇总开始");
        Long startTime = System.currentTimeMillis();
        TestResult testResult = new TestResult();
        List<Runnable> taskList = new ArrayList<Runnable>() {
            {
                add(() -> testResult.setReceiveCount(adminTaskReceiveService.receiveCount()));
                add(() -> testResult.setInspectCount(adminTaskReceiveService.inspectCount()));
                add(() -> testResult.setConstructCount(adminTaskReceiveService.constructCount()));
                add(() -> testResult.setAppointmentCount(adminTaskReceiveService.appointmentCount()));

                add(() -> testResult.setReceiveCount1(adminTaskReceiveService.receiveCount1()));
                add(() -> testResult.setInspectCount1(adminTaskReceiveService.inspectCount1()));
                add(() -> testResult.setConstructCount1(adminTaskReceiveService.constructCount1()));
                add(() -> testResult.setAppointmentCount1(adminTaskReceiveService.appointmentCount1()));
            }
        };
        taskList.parallelStream().forEach(Runnable::run);
        System.out.println("数据汇总结束,result=" + testResult);
        Long endTime = System.currentTimeMillis();
        System.out.println("time=" + (endTime - startTime) + "毫秒");
    }

一个单线程,一个并行,看结果:测试demo我是需要统计8个数量,由结果可见性能并没什么大区别。
由结果可知:并行处理并不总是能提高性能,特别是当任务规模较小或者任务之间依赖性较强时。此外,在使用并行流时,应该避免使用会修改原始集合的操作,因为这些操作可能会导致不可预测的结果。由于’foreach`操作是终端操作,它会阻塞主线程直到所有元素都被处理完毕,因此即使操作是并行的,它们仍然是按照顺序完成的。
在这里插入图片描述
在这里插入图片描述

3)最后总结

在数据量比较大的情况下,CPU负载本身不是很高,不要求顺序执行的时候,可以使用并行流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/661577.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Verilog HDL基础知识(一)

引言&#xff1a;本文我们介绍Verilog HDL的基础知识&#xff0c;重点对Verilog HDL的基本语法及其应用要点进行介绍。 1. Verilog HDL概述 什么是Verilog&#xff1f;Verilog是IEEE标准的硬件描述语言&#xff0c;一种基于文本的语言&#xff0c;用于描述最终将在硬件中实现…

2024 angstromCTF re 部分wp

Guess the Flag 附件拖入ida 比较简单&#xff0c;就一个异或 switcher 附件拖入ida 明文flag Polyomino 附件拖入ida 需要输入九个数&#xff0c;然后进入处理和判断&#xff0c;如果满足条件则进入输出flag部分&#xff0c;flag和输入有关&#xff0c;所以要理解需要满足什么…

202474读书笔记|《我自我的田渠归来》——愿你拥有向上的力量,一切的好事都应该有权利发生

202474读书笔记|《我自我的田渠归来》——愿你拥有向上的力量 《我自我的田渠归来》作者张晓风&#xff0c;被称为华语散文温柔的一支笔&#xff0c;她的短文很有味道&#xff0c;角度奇特&#xff0c;温柔慈悲而敏锐。 很幸运遇到了这本书&#xff0c;以她的感受重新认识一些事…

ChatGPT的基本原理是什么?又该如何提高其准确性?

在深入探索如何提升ChatGPT的准确性之前&#xff0c;让我们先来了解一下它的工作原理吧。ChatGPT是一种基于深度学习的自然语言生成模型&#xff0c;它通过预训练和微调两个关键步骤来学习和理解自然语言。 在预训练阶段&#xff0c;ChatGPT会接触到大规模的文本数据集&#x…

转发和重定向

目录 是什么 转发&#xff08;Forwarding&#xff09; 概念 特点 实现方式 重定向&#xff08;Redirecting&#xff09; 概念 特点 实现方式 转发和重定向区别整理 转发和重定向的适用场景 转发&#xff08;Forwarding&#xff09; 重定向&#xff08;Redirect&am…

反转!Greenplum 还在,快去 Fork 源码

↑ 关注“少安事务所”公众号&#xff0c;欢迎⭐收藏&#xff0c;不错过精彩内容~ 今早被一条消息刷爆群聊&#xff0c;看到知名开源数仓 Greenplum 的源码仓“删库跑路”了。 要知道 GP 新东家 Broadcom 前几日才刚刚免费开放了 VMware Workstation PRO 17 和 VMware Fusion P…

【本地运行chatgpt-web】启动前端项目和service服务端项目,也是使用nodejs进行开发的。两个都运行成功才可以使用!

1&#xff0c;启动web界面 https://github.com/Chanzhaoyu/chatgpt-web#node https://nodejs.org/en/download/package-manager # 使用nvm 安装最新的 20 版本。 curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bash source /root/.bashrc n…

stm32学习-CubeIDE使用技巧

1.hex文件生成 右键工程 2.仿真调试 3.常用快捷键 作用快捷键代码提示alt/代码注释/反注释ctrl/ 4.项目复制 复制项目&#xff0c;将ioc文件名改为项目名即可图形化编辑

开源一个工厂常用的LIMS系统

Senaite是一款强大且可靠的基于Web的LIMS/LIS系统&#xff0c;采用Python编写&#xff0c;构建在Plone CMS基础架构之上。该系统处于积极开发阶段&#xff0c;在灵活的定制空间中为开发人员提供了丰富的功能。其中&#xff0c;Senaite在处理REST的JSON API上做得出色&#xff0…

【Javascript系列】Terser除了压缩代码之外,还有优化代码的功能

什么是Terser 前端开发的小伙伴一定不陌生&#xff0c;经常用这个工具进行代码压缩。有一种说法是Uglify-es的替代品。作为Javascript的解析器和压缩器&#xff0c;已经得到了开发人员的广泛使用。 可以优化代码 今天一个偶然的机会&#xff0c;在写一个删除功能的确认框&am…

系统架构设计师【第3章】: 信息系统基础知识 (核心总结)

文章目录 3.1 信息系统概述3.1.1 信息系统的定义3.1.2 信息系统的发展3.1.3 信息系统的分类3.1.4 信息系统的生命周期3.1.5 信息系统建设原则3.1.6 信息系统开发方法 3.2 业务处理系统&#xff08;TPS&#xff09;3.2.1 业务处理系统的概念3.2.2 业务处理系统的功能 …

Element-UI 入门指南:从安装到自定义主题的详细教程

Element-UI 是一个基于 Vue.js 的前端组件库&#xff0c;它提供了丰富的 UI 组件&#xff0c;可以帮助开发者快速构建高质量的用户界面。以下是使用 Element-UI 的快速入门指南&#xff1a; 安装 Element-UI Element-UI 是一个基于 Vue.js 的组件库&#xff0c;它提供了丰富的…

【Python】解决Python报错:TypeError: %d format: a number is required, not str

&#x1f9d1; 博主简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟&#xff0c;欢迎关注。提供嵌入式方向…

DETR整体模型结构解析

DETR流程 Backbone用卷积神经网络抽特征。最后通过一层1*1卷积转化到d_model维度fm&#xff08;B,d_model,HW&#xff09;。 position embedding建立跟fm维度相同的位置编码(B&#xff0c;d_model,HW&#xff09;。 Transformer Encoder,V为fm&#xff0c;K&#xff0c;Q为fm…

创新案例 | 持续增长,好孩子集团的全球化品牌矩阵战略与客户中心设计哲学

探索好孩子集团如何通过创新设计的全球化品牌矩阵和以客户为中心的产品策略&#xff0c;在竞争激烈的母婴市场中实现持续增长。深入了解其品牌价值观、市场定位策略以及如何满足新一代父母的需求。本文旨在为中高级职场人士、创业家及创新精英提供深度见解&#xff0c;帮助他们…

redis数据类型之Hash,Bitmaps

华子目录 Hash结构图相关命令hexists key fieldhmset key field1 value1 [field2 value2...]hscan key cursor [MATCH pattern] [COUNT count] Bitmaps位图相关命令setbit1. **命令描述**2. **语法**3. **参数限制**4. **内存分配与性能**5. **应用实例**6. **其他相关命令**7.…

筛选的艺术:数组元素的精确提取

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、筛选的基本概念 二、筛选的实际应用案例 1. 筛选能被三整除的元素 2. 筛选小于特定值…

pytorch项目实战-分类模型李宏毅 21 机器学习第二次作业代码详解( 局部最小值 local minima, 鞍点saddle point)

局部最小值 local minima, 鞍点saddle point 前言一、鞍点和局部最小值1.1 判断鞍点和局部最小值1.2 参数更新 二、作业代码讲解2.1 问题描述2.2 代码部分2.2.1 学号验证2.2.2 安装外部库2.2.3 导入外部库2.2.4 创建模型2.2.5 装载预训练的数据和检查点2.2.6 计算 Hess 阵2.2.6…

每日复盘-20240529

20240529 六日涨幅最大: ------1--------300956--------- 英力股份 五日涨幅最大: ------1--------301361--------- 众智科技 四日涨幅最大: ------1--------301361--------- 众智科技 三日涨幅最大: ------1--------300637--------- 扬帆新材 二日涨幅最大: ------1--------30…

python深入探索斐波那契数列:代码示例与不满足的外围条件

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、斐波那契数列的初步实现 二、外围条件的不满足情况 总结 一、斐波那契数列的初步实现 …