Java8 Stream详解

Stream类继承关系

在这里插入图片描述

前置知识

Spliterator接口使用

Spliterator是在java 8引入的一个接口,它通常和stream一起使用,用来遍历和分割序列。

只要用到stream的地方都需要Spliterator,比如List,Collection,IO channel等等。

我们先看一下Collection中stream方法的定义:

default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }
default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }

我们可以看到,不管是并行stream还是非并行stream,都是通过StreamSupport来构造的,并且都需要传入一个spliterator的参数。

好了,我们知道了spliterator是做什么的之后,看一下它的具体结构:

在这里插入图片描述

spliterator有四个必须实现的方法,我们接下来进行详细的讲解。

tryAdvance
tryAdvance就是对stream中的元素进行处理的方法,如果元素存在,则对它进行处理,并返回true,否则返回false。

如果我们不想处理stream后续的元素,则在tryAdvance中返回false即可,利用这个特征,我们可以中断stream的处理。这个例子我将会在后面的文章中讲到。

trySplit
trySplit尝试对现有的stream进行分拆,一般用在parallelStream的情况,因为在并发stream下,我们需要用多线程去处理stream的不同元素,trySplit就是对stream中元素进行分拆处理的方法。

理想情况下trySplit应该将stream拆分成数目相同的两部分才能最大提升性能。

estimateSize
estimateSize表示Spliterator中待处理的元素,在trySplit之前和之后一般是不同的,后面我们会在具体的例子中说明。

characteristics
characteristics表示这个Spliterator的特征,Spliterator有8大特征:

public static final int ORDERED    = 0x00000010;//表示元素是有序的(每一次遍历结果相同)
public static final int DISTINCT   = 0x00000001;//表示元素不重复
public static final int SORTED     = 0x00000004;//表示元素是按一定规律进行排列(有指定比较器)
public static final int SIZED      = 0x00000040;//
表示大小是固定的
public static final int NONNULL    = 0x00000100;//表示没有null元素
public static final int IMMUTABLE  = 0x00000400;//表示元素不可变
public static final int CONCURRENT = 0x00001000;//表示迭代器可以多线程操作
public static final int SUBSIZED   = 0x00004000;//表示子Spliterators都具有SIZED特性

一个Spliterator可以有多个特征,多个特征进行or运算,最后得到最终的characteristics。

举个例子
上面我们讨论了Spliterator一些关键方法,现在我们举一个具体的例子:

@AllArgsConstructor
@Data
public class CustBook {
    private String name;

}

先定义一个CustBook类,里面放一个name变量。

定义一个方法,来生成一个CustBook的list:

    public static List<CustBook> generateElements() {
        return Stream.generate(() -> new CustBook("cust book"))
                .limit(1000)
                .collect(Collectors.toList());
    }

我们定义一个call方法,在call方法中调用了tryAdvance方法,传入了我们自定义的处理方法。这里我们修改book的name,并附加额外的信息。

    public String call(Spliterator<CustBook> spliterator) {
        int current = 0;
        while (spliterator.tryAdvance(a -> a.setName("test name"
                .concat("- add new name")))) {
            current++;
        }

        return Thread.currentThread().getName() + ":" + current;
    }

最后,写一下测试方法:

    @Test
    public void useTrySplit(){
        Spliterator<CustBook> split1 = SpliteratorUsage.generateElements().spliterator();
        Spliterator<CustBook> split2 = split1.trySplit();

        log.info("before tryAdvance: {}",split1.estimateSize());
        log.info("Characteristics {}",split1.characteristics());
        log.info(call(split1));
        log.info(call(split2));
        log.info("after tryAdvance {}",split1.estimateSize());
    }

运行的结果如下:

23:10:08.852 [main] INFO com.flydean.SpliteratorUsage - before tryAdvance: 500
23:10:08.857 [main] INFO com.flydean.SpliteratorUsage - Characteristics 16464
23:10:08.858 [main] INFO com.flydean.SpliteratorUsage - main:500
23:10:08.858 [main] INFO com.flydean.SpliteratorUsage - main:500
23:10:08.858 [main] INFO com.flydean.SpliteratorUsage - after tryAdvance 0

List总共有1000条数据,调用一次trySplit之后,将List分成了两部分,每部分500条数据。

注意,在tryAdvance调用之后,estimateSize变为0,表示所有的元素都已经被处理完毕。

再看一下这个Characteristics=16464,转换为16进制:Ox4050 = ORDERED or SIZED or SUBSIZED 这三个的或运算。

Optional

创建 Optional 对象

1)可以使用静态方法 empty() 创建一个空的 Optional 对象

Optional<String> empty = Optional.empty();
System.out.println(empty); // 输出:Optional.empty

2)可以使用静态方法 of() 创建一个非空的 Optional 对象

Optional<String> opt = Optional.of("沉默王二");
System.out.println(opt); // 输出:Optional[沉默王二]

当然了,传递给 of() 方法的参数必须是非空的,也就是说不能为 null,否则仍然会抛出 NullPointerException。

String name = null;
Optional<String> optnull = Optional.of(name);

3)可以使用静态方法 ofNullable() 创建一个即可空又可非空的 Optional 对象

String name = null;
Optional<String> optOrNull = Optional.ofNullable(name);
System.out.println(optOrNull); // 输出:Optional.empty

ofNullable() 方法内部有一个三元表达式,如果为参数为 null,则返回私有常量 EMPTY;否则使用 new 关键字创建了一个新的 Optional 对象——不会再抛出 NPE 异常了。

判断值是否存在

可以通过方法 isPresent() 判断一个 Optional 对象是否存在,如果存在,该方法返回 true,否则返回 false——取代了 obj != null 的判断。

Optional<String> opt = Optional.of("沉默王二");
System.out.println(opt.isPresent()); // 输出:true

Optional<String> optOrNull = Optional.ofNullable(null);
System.out.println(opt.isPresent()); // 输出:false

Java 11 后还可以通过方法 isEmpty() 判断与 isPresent() 相反的结果。

Optional<String> opt = Optional.of("沉默王二");
System.out.println(opt.isPresent()); // 输出:false

Optional<String> optOrNull = Optional.ofNullable(null);
System.out.println(opt.isPresent()); // 输出:true

非空表达式

Optional 类有一个非常现代化的方法——ifPresent(),允许我们使用函数式编程的方式执行一些代码,因此,我把它称为非空表达式。如果没有该方法的话,我们通常需要先通过 isPresent() 方法对 Optional 对象进行判空后再执行相应的代码:

Optional<String> optOrNull = Optional.ofNullable(null);
if (optOrNull.isPresent()) {
    System.out.println(optOrNull.get().length());
}

有了 ifPresent() 之后,情况就完全不同了,可以直接将 Lambda 表达式传递给该方法,代码更加简洁,更加直观。

Optional<String> opt = Optional.of("沉默王二");
opt.ifPresent(str -> System.out.println(str.length()));

Java 9 后还可以通过方法 ifPresentOrElse(action, emptyAction) 执行两种结果,非空时执行 action,空时执行 emptyAction。

Optional<String> opt = Optional.of("沉默王二");
opt.ifPresentOrElse(str -> System.out.println(str.length()), () -> System.out.println("为空"));

设置(获取)默认值

有时候,我们在创建(获取) Optional 对象的时候,需要一个默认值,orElse() 和 orElseGet() 方法就派上用场了。

orElse() 方法用于返回包裹在 Optional 对象中的值,如果该值不为 null,则返回;否则返回默认值。该方法的参数类型和值得类型一致。

String nullName = null;
String name = Optional.ofNullable(nullName).orElse("沉默王二");
System.out.println(name); // 输出:沉默王二

orElseGet() 方法与 orElse() 方法类似,但参数类型不同。如果 Optional 对象中的值为 null,则执行参数中的函数。

String nullName = null;
String name = Optional.ofNullable(nullName).orElseGet(()->"沉默王二");
System.out.println(name); // 输出:沉默王二

从输出结果以及代码的形式上来看,这两个方法极其相似,这不免引起我们的怀疑,Java 类库的设计者有必要这样做吗?

假设现在有这样一个获取默认值的方法,很传统的方式。

public static String getDefaultValue() {
    System.out.println("getDefaultValue");
    return "沉默王二";
}

然后,通过 orElse() 方法和 orElseGet() 方法分别调用 getDefaultValue() 方法返回默认值。

public static void main(String[] args) {
    String name = null;
    System.out.println("orElse");
    String name2 = Optional.ofNullable(name).orElse(getDefaultValue());

    System.out.println("orElseGet");
    String name3 = Optional.ofNullable(name).orElseGet(OrElseOptionalDemo::getDefaultValue);
}

注:类名 :: 方法名是 Java 8 引入的语法,方法名后面是没有 () 的,表明该方法并不一定会被调用。

输出结果如下所示:

orElse
getDefaultValue

orElseGet
getDefaultValue

输出结果是相似的,没什么太大的不同,这是在 Optional 对象的值为 null 的情况下。假如 Optional 对象的值不为 null 呢?

public static void main(String[] args) {
    String name = "沉默王三";
    System.out.println("orElse");
    String name2 = Optional.ofNullable(name).orElse(getDefaultValue());

    System.out.println("orElseGet");
    String name3 = Optional.ofNullable(name).orElseGet(OrElseOptionalDemo::getDefaultValue);
}

输出结果如下所示:

orElse
getDefaultValue
orElseGet

咦,orElseGet() 没有去调用 getDefaultValue()。哪个方法的性能更佳,你明白了吧?

获取值

直观从语义上来看,get() 方法才是最正宗的获取 Optional 对象值的方法,但很遗憾,该方法是有缺陷的,因为假如 Optional 对象的值为 null,该方法会抛出 NoSuchElementException 异常。这完全与我们使用 Optional 类的初衷相悖。

public class GetOptionalDemo {
    public static void main(String[] args) {
        String name = null;
        Optional<String> optOrNull = Optional.ofNullable(name);
        System.out.println(optOrNull.get());
    }
}

这段程序在运行时会抛出异常:

Exception in thread "main" java.util.NoSuchElementException: No value present
	at java.base/java.util.Optional.get(Optional.java:141)
	at com.cmower.dzone.optional.GetOptionalDemo.main(GetOptionalDemo.java:9)

尽管抛出的异常是 NoSuchElementException 而不是 NPE,但在我们看来,显然是在“五十步笑百步”。建议 orElseGet() 方法获取 Optional 对象的值。

过滤值

小王通过 Optional 类对之前的代码进行了升级,完成后又兴高采烈地跑去找老马要任务了。老马觉得这小伙子不错,头脑灵活,又干活积极,很值得培养,就又交给了小王一个新的任务:用户注册时对密码的长度进行检查。

小王拿到任务后,乐开了花,因为他刚要学习 Optional 类的 filter() 方法,这就派上了用场。

public class FilterOptionalDemo {
    public static void main(String[] args) {
        String password = "12345";
        Optional<String> opt = Optional.ofNullable(password);
        System.out.println(opt.filter(pwd -> pwd.length() > 6).isPresent());
    }
}

filter() 方法的参数类型为 Predicate(Java 8 新增的一个函数式接口),也就是说可以将一个 Lambda 表达式传递给该方法作为条件,如果表达式的结果为 false,则返回一个 EMPTY 的 Optional 对象,否则返回过滤后的 Optional 对象。

在上例中,由于 password 的长度为 5 ,所以程序输出的结果为 false。假设密码的长度要求在 6 到 10 位之间,那么还可以再追加一个条件。来看小王增加难度后的代码。

Predicate<String> len6 = pwd -> pwd.length() > 6;
Predicate<String> len10 = pwd -> pwd.length() < 10;

password = "1234567";
opt = Optional.ofNullable(password);
boolean result = opt.filter(len6.and(len10)).isPresent();
System.out.println(result);

这次程序输出的结果为 true,因为密码变成了 7 位,在 6 到 10 位之间。想象一下,假如小王使用 if-else 来完成这个任务,代码该有多冗长。

转换值

小王检查完了密码的长度,仍然觉得不够尽兴,觉得要对密码的强度也进行检查,比如说密码不能是“password”,这样的密码太弱了。于是他又开始研究起了 map() 方法,该方法可以按照一定的规则将原有 Optional 对象转换为一个新的 Optional 对象,原有的 Optional 对象不会更改。

先来看小王写的一个简单的例子:

public class OptionalMapDemo {
    public static void main(String[] args) {
        String name = "沉默王二";
        Optional<String> nameOptional = Optional.of(name);
        Optional<Integer> intOpt = nameOptional
                .map(String::length);
        
        System.out.println( intOpt.orElse(0));
    }
}

在上面这个例子中,map() 方法的参数 String::length,意味着要 将原有的字符串类型的 Optional 按照字符串长度重新生成一个新的 Optional 对象,类型为 Integer。

搞清楚了 map() 方法的基本用法后,小王决定把 map() 方法与 filter() 方法结合起来用,前者用于将密码转化为小写,后者用于判断长度以及是否是“password”。

public class OptionalMapFilterDemo {
    public static void main(String[] args) {
        String password = "password";
        Optional<String>  opt = Optional.ofNullable(password);

        Predicate<String> len6 = pwd -> pwd.length() > 6;
        Predicate<String> len10 = pwd -> pwd.length() < 10;
        Predicate<String> eq = pwd -> pwd.equals("password");

        boolean result = opt.map(String::toLowerCase).filter(len6.and(len10 ).and(eq)).isPresent();
        System.out.println(result);
    }
}

Stream流水线解决方案

ReferencePipeline

在这里插入图片描述
ReferencePipeline 是一个结构类,定义内部类组装了各种操作流,定义了Head、StatelessOp、StatefulOp三个内部类,实现了 BaseStream 与 Stream 的接口方法。

Stream流水线解决方案

我们大致能够想到,应该采用某种方式记录用户每一步的操作,当用户调用结束操作时将之前记录的操作叠加到一起在一次迭代中全部执行掉。沿着这个思路,有几个问题需要解决:

  • 用户的操作如何记录?

  • 操作如何叠加?

  • 叠加之后的操作如何执行?

  • 执行后的结果(如果有)在哪里?

操作如何记录

注意这里使用的是“操作(operation)”一词,指的是“Stream中间操作”的操作,很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是<数据来源,操作,回调函数>构成的三元组。

Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。跟Stream相关类和接口的继承关系图示。

在这里插入图片描述
还有IntPipeline, LongPipeline, DoublePipeline没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟ReferencePipeline是并列关系。

图中Head用于表示第一个Stage,即调用调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操作;StatelessOp和StatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作。

Stream流水线组织结构示意图如下:

在这里插入图片描述
图中通过Collection.stream()方法得到Head也就是stage0,紧接着调用一系列的中间操作,不断产生新的Stream。这些Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage都记录了前一个Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是Stream记录操作的方式。

操作如何叠加

以上只是解决了操作记录的问题,要想让流水线起到应有的作用我们需要一种将所有操作叠加到一起的方案。你可能会觉得这很简单,只需要从流水线的head开始依次执行每一步的操作(包括回调函数)就行了。

这听起来似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底执行了哪种操作,以及回调函数是哪种形式。换句话说,只有当前Stage本身才知道该如何执行自己包含的动作。这就需要有某种协议来协调相邻Stage之间的调用关系。

这种协议由Sink接口完成,Sink接口包含的方法如下表所示:

方法名作用
void begin(long size)开始遍历元素之前调用该方法,通知Sink做好准备。
void end()所有元素遍历完成之后调用,通知Sink没有更多的元素了。
boolean cancellationRequested()是否可以结束操作,可以让短路操作尽早结束。
void accept(T t)遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。

有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的accept()方法即可,并不需要知道其内部是如何处理的。

当然对于有状态的操作,Sink的begin()和end()方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的Sink.begin()方法可能创建一个盛放结果的容器,而accept()方法负责将元素添加到该容器,最后end()负责对容器进行排序。

对于短路操作,Sink.cancellationRequested()也是必须实现的,比如Stream.findFirst()是短路操作,只要找到一个元素,cancellationRequested()就应该返回true,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。

实际上Stream API内部实现的的本质,就是如何重写Sink的这四个接口方法。

有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的Sink.accept()方法流程是这样的:

void accept(U u){
    1. 使用当前Sink包装的回调函数处理u
    2. 将处理结果传递给流水线下游的Sink
}

Sink接口的其他几个方法也是按照这种[处理->转发]的模型实现。

下面我们结合具体例子看看Stream的中间操作是如何将自身的操作包装成Sink以及Sink是如何将处理结果转发给下一个Sink的。先看Stream.map()方法:

// Stream.map(),调用该方法将产生一个新的Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    ...
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override /*opWripSink()方法返回由回调函数包装而成Sink*/
        Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) {
            return new Sink.ChainedReference<P_OUT, R>(downstream) {
                @Override
                public void accept(P_OUT u) {
                    R r = mapper.apply(u);// 1. 使用当前Sink包装的回调函数mapper处理u
                    downstream.accept(r);// 2. 将处理结果传递给流水线下游的Sink
                }
            };
        }
    };
}

上述代码看似复杂,其实逻辑很简单,就是将回调函数mapper包装到一个Sink当中。由于Stream.map()是一个无状态的中间操作,所以map()方法返回了一个StatelessOp内部类对象(一个新的Stream),调用这个新Stream的opWripSink()方法将得到一个包装了当前回调函数的Sink。

再来看一个复杂一点的例子。Stream.sorted()方法将对Stream中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作封装成Sink的呢?sorted()一种可能封装的Sink代码如下:

// Stream.sort()方法用到的Sink实现
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;// 存放用于排序的元素
    RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
        super(downstream, comparator);
    }
    @Override
    public void begin(long size) {
        ...
        // 创建一个存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }
    @Override
    public void end() {
        list.sort(comparator);// 只有元素全部接收之后才能开始排序
        downstream.begin(list.size());
        if (!cancellationWasRequested) {// 下游Sink不包含短路操作
            list.forEach(downstream::accept);// 2. 将处理结果传递给流水线下游的Sink
        }
        else {// 下游Sink包含短路操作
            for (T t : list) {// 每次都调用cancellationRequested()询问是否可以结束处理。
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);// 2. 将处理结果传递给流水线下游的Sink
            }
        }
        downstream.end();
        list = null;
    }
    @Override
    public void accept(T t) {
        list.add(t);// 1. 使用当前Sink包装动作处理t,只是简单的将元素添加到中间列表当中
    }
}

上述代码完美的展现了Sink的四个接口方法是如何协同工作的:

  • 首先begin()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小;

  • 之后通过accept()方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素;

  • 最后end()方法告诉Sink所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink;

  • 如果下游的Sink是短路操作,将结果传递给下游时不断询问下游cancellationRequested()是否可以结束处理。

叠加之后的操作如何执行

在这里插入图片描述
Sink完美封装了Stream每一步操作,并给出了[处理->转发]的模式来叠加操作。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。

是什么启动这一连串的操作呢?也许你已经想到了启动的原始动力就是结束操作(Terminal Operation),一旦调用某个结束操作,就会触发整个流水线的执行。

结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段(Stage),直观的说就是流水线的链表不会在往后延伸了。

结束操作会创建一个包装了自己操作的Sink,这也是流水线中最后一个Sink,这个Sink只需要处理数据而不需要将结果传递给下游的Sink(因为没有下游)。对于Sink的[处理->转发]模型,结束操作的Sink就是调用链的出口。

我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在PipelineHelper中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。

但Stream类库的设计者没有这么做,而是设置了一个Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法来得到Sink,该方法的作用是返回一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?

这是因为使用opWrapSink()可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:

// AbstractPipeline.wrapSink()
// 从下游向上游不断包装Sink。如果最初传入的sink代表结束操作,
// 函数返回时就可以得到一个代表了流水线上所有操作的Sink。
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    ...
    for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

现在流水线上从开始到结束的所有的操作都被包装到了一个Sink里,执行这个Sink就相当于执行整个流水线,执行Sink的代码如下:

// AbstractPipeline.copyInto(), 对spliterator代表的数据执行wrappedSink代表的操作。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    ...
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知开始遍历
        spliterator.forEachRemaining(wrappedSink);// 迭代
        wrappedSink.end();// 通知遍历结束
    }
    ...
}

上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,然后调用spliterator.forEachRemaining()方法对数据进行迭代,最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。

执行后的结果在哪里

最后一个问题是流水线上所有操作都执行后,用户所需要的结果(如果有)在哪里?首先要说明的是不是所有的Stream结束操作都需要返回结果,有些操作只是为了使用其副作用(Side-effects),比如使用Stream.forEach()方法将结果打印出来就是常见的使用副作用的场景(事实上,除了打印之外其他场景都应避免使用副作用),对于真正需要返回结果的结束操作结果存在哪里呢?

特别说明:副作用不应该被滥用,也许你会觉得在Stream.forEach()里进行元素收集是个不错的选择,就像下面代码中那样,但遗憾的是这样使用的正确性和效率都无法保证,因为Stream可能会并行执行。大多数使用副作用的地方都可以使用归约操作更安全和有效的完成。

// 错误的收集方式
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
      .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
// 正确的收集方式
List<String>results =
     stream.filter(s -> pattern.matcher(s).matches())
             .collect(Collectors.toList());  // No side-effects!

回到流水线执行结果的问题上来,需要返回结果的流水线结果存在哪里呢?这要分不同的情况讨论,下表给出了各种有返回结果的Stream结束操作。

返回类型对应的结束操作
booleananyMatch() allMatch() noneMatch()
OptionalfindFirst() findAny()
归约结果reduce() collect()
数组toArray()
  • 对于表中返回boolean或者Optional的操作(Optional是存放 一个 值的容器)的操作,由于值返回一个值,只需要在对应的Sink中记录这个值,等到执行结束时返回就可以了。

  • 对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。collect(), reduce(), max(), min()都是归约操作,虽然max()和min()也是返回一个Optional,但事实上底层是通过调用reduce()方法实现的。

  • 对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做Node的数据结构中的。Node是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。关于Node的具体结构,我们会在下一节探究Stream如何并行执行时给出详细说明。

Stream常用操作

在这里插入图片描述

在这里插入图片描述

流的四种构建形式

由数值直接构建流

@Test
    public void streamFromValue() {
        Stream stream = Stream.of(1, 2, 3, 4, 5);

        stream.forEach(System.out::println);
    }

通过数组构建流

@Test
    public void streamFromArray() {
        int[] numbers = {1, 2, 3, 4, 5};

        IntStream stream = Arrays.stream(numbers);
        stream.forEach(System.out::println);
    }

通过文件生成流

@Test
    public void streamFromFile() throws IOException {
        // TODO 此处替换为本地文件的地址全路径
        String filePath = "";

        Stream<String> stream = Files.lines(
                Paths.get(filePath));

        stream.forEach(System.out::println);
    }

通过函数生成流(无限流)

@Test
    public void streamFromFunction() {
//        2,4,6,8...一直无限下去
//        Stream stream = Stream.iterate(0, n -> n + 2);

        Stream stream = Stream.generate(Math::random);
        stream.limit(100)
                .forEach(System.out::println);

    }

流的预定义收集器

集合收集器

/**
     * 集合收集器
     */
    @Test
    public void toList() {

        List<Sku> list = CartService.getCartSkuList();

        List<Sku> result = list.stream()
                .filter(sku -> sku.getTotalPrice() > 100)

                .collect(Collectors.toList());

        System.out.println(
                JSON.toJSONString(result, true));

    }

分组(返回值Map<Boolean,List>, key是Boolean类型,只能分两组)

@Test
    public void group() {
        List<Sku> list = CartService.getCartSkuList();

        // Map<分组条件,结果集合>
        Map<Object, List<Sku>> group = list.stream()
                .collect(
                        Collectors.groupingBy(
                                sku -> sku.getSkuCategory()));

        System.out.println(
                JSON.toJSONString(group, true));
    }

分区

@Test
    public void partition() {
        List<Sku> list = CartService.getCartSkuList();

        Map<Boolean, List<Sku>> partition = list.stream()
                .collect(Collectors.partitioningBy(
                        sku -> sku.getTotalPrice() > 100));

        System.out.println(
                JSON.toJSONString(partition, true));
    }

流收集器常用操作如下:

// Accumulate names into a List
List<String> list = people.stream().map(Person::getName).collect(Collectors.toList());

// Accumulate names into a TreeSet
Set<String> set = people.stream().map(Person::getName).collect(Collectors.toCollection(TreeSet::new));

// Convert elements to strings and concatenate them, separated by commas
String joined = things.stream()
                      .map(Object::toString)
                      .collect(Collectors.joining(", "));

// Compute sum of salaries of employee
int total = employees.stream()
                     .collect(Collectors.summingInt(Employee::getSalary)));

// Group employees by department
Map<Department, List<Employee>> byDept
    = employees.stream()
               .collect(Collectors.groupingBy(Employee::getDepartment));

// Compute sum of salaries by department
Map<Department, Integer> totalByDept
    = employees.stream()
               .collect(Collectors.groupingBy(Employee::getDepartment,
                                              Collectors.summingInt(Employee::getSalary)));

// Partition students into passing and failing
Map<Boolean, List<Student>> passingFailing =
    students.stream()
            .collect(Collectors.partitioningBy(s -> s.getGrade() >= PASS_THRESHOLD));

归约与汇总操作(涉及到并行执行)

归约reduce

@Test
    public void reduceTest() {

        /**
         * 订单对象
         */
        @Data
        @AllArgsConstructor
        class Order {
            /**
             * 订单编号
             */
            private Integer id;
            /**
             * 商品数量
             */
            private Integer productCount;
            /**
             * 消费总金额
             */
            private Double totalAmount;
        }

        /*
            准备数据
         */
        ArrayList<Order> list = Lists.newArrayList();
        list.add(new Order(1, 2, 25.12));
        list.add(new Order(2, 5, 257.23));
        list.add(new Order(3, 3, 23332.12));

        /*
            以前的方式:
            1. 计算商品数量
            2. 计算消费总金额
         */

        /*
            汇总商品数量和总金额
         */
        Order order = list.stream()
                .parallel()
                .reduce(
                        // 初始化值
                        new Order(0, 0, 0.0),

                        // Stream中两个元素的计算逻辑
                        (Order order1, Order order2) -> {
                            System.out.println("执行 计算逻辑 方法!!!");

                            int productCount =
                                    order1.getProductCount()
                                            + order2.getProductCount();

                            double totalAmount =
                                    order1.getTotalAmount()
                                            + order2.getTotalAmount();

                            return new Order(0, productCount, totalAmount);
                        },

                        // 并行情况下,多个并行结果如何合并
                        (Order order1, Order order2) -> {
                            System.out.println("执行 合并 方法!!!");

                            int productCount =
                                    order1.getProductCount()
                                            + order2.getProductCount();

                            double totalAmount =
                                    order1.getTotalAmount()
                                            + order2.getTotalAmount();

                            return new Order(0, productCount, totalAmount);
                        });

        System.out.println(JSON.toJSONString(order, true));
    }

运行结果

执行 计算逻辑 方法!!!
执行 计算逻辑 方法!!!
执行 计算逻辑 方法!!!
执行 合并 方法!!!
执行 合并 方法!!!
{
    "id":0,
    "productCount":10,
    "totalAmount":23614.469999999998
}

汇总collect

@Test
    public void collectTest() {
        /**
         * 订单对象
         */
        @Data
        @AllArgsConstructor
        class Order {
            /**
             * 订单编号
             */
            private Integer id;
            /**
             * 用户账号
             */
            private String account;
            /**
             * 商品数量
             */
            private Integer productCount;
            /**
             * 消费总金额
             */
            private Double totalAmount;
        }

        /*
            准备数据
         */
        ArrayList<Order> list = Lists.newArrayList();
        list.add(new Order(1, "zhangxiaoxi", 2, 25.12));
        list.add(new Order(2, "zhangxiaoxi",5, 257.23));
        list.add(new Order(3, "lisi",3, 23332.12));

        /*
            Map<用户账号, 订单(数量和金额)>
         */

        Map<String, Order> collect = list.stream()
                .parallel()
                .collect(
                        () -> {
                            System.out.println("执行 初始化容器 操作!!!");

                            return new HashMap<String, Order>();
                        },
                        (HashMap<String, Order> map, Order newOrder) -> {
                            System.out.println("执行 新元素添加到容器 操作!!!");

                            /*
                                新元素的account已经在map中存在了
                                不存在
                             */
                            String account = newOrder.getAccount();

                            // 如果此账号已存在,将新订单数据累加上
                            if (map.containsKey(account)) {
                                Order order = map.get(account);
                                order.setProductCount(
                                        newOrder.getProductCount()
                                                + order.getProductCount());
                                order.setTotalAmount(
                                        newOrder.getTotalAmount()
                                                + order.getTotalAmount());
                            } else {
                                // 如果不存在,直接将新订单存入map
                                map.put(account, newOrder);
                            }

                        }, (HashMap<String, Order> map1, HashMap<String, Order> map2) -> {
                            System.out.println("执行 并行结果合并 操作!!!");

                            map2.forEach((key, value) -> {
                                map1.merge(key, value, (order1, order2) -> {

                                    // TODO 注意:一定要用map1做合并,因为最后collect返回的是map1
                                    return new Order(0, key,
                                            order1.getProductCount()
                                                    + order2.getProductCount(),
                                            order1.getTotalAmount()
                                                    + order2.getTotalAmount());
                                });
                            });
                        });

        System.out.println(JSON.toJSONString(collect, true));
    }

运行结果

执行 初始化容器 操作!!!
执行 初始化容器 操作!!!
执行 初始化容器 操作!!!
执行 新元素添加到容器 操作!!!
执行 新元素添加到容器 操作!!!
执行 新元素添加到容器 操作!!!
执行 并行结果合并 操作!!!
执行 并行结果合并 操作!!!
{
    "lisi":{
        "account":"lisi",
        "id":3,
        "productCount":3,
        "totalAmount":23332.12
    },
    "zhangxiaoxi":{
        "account":"zhangxiaoxi",
        "id":0,
        "productCount":7,
        "totalAmount":282.35
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/33903.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据库监控与调优【十七】—— 表结构设计优化

表结构设计优化 第一范式&#xff08;1NF&#xff09; 字段具有原子性&#xff0c;即数据库的每一个字段都是不可分割的原子数据项&#xff0c;不能是集合、数组、记录等非原子数据项 当实体中的某个属性有多个值时&#xff0c;必须拆分为不同的属性 例子&#xff1a; 如图…

《黑马头条》SpringBoot+SpringCloud+ Nacos等企业级微服务架构项目

环境搭建、SpringCloud微服务(注册发现、服务调用、网关) 1)课程对比 2)项目概述 2.1)能让你收获什么 2.2)项目课程大纲 2.3)项目概述 随着智能手机的普及&#xff0c;人们更加习惯于通过手机来看新闻。由于生活节奏的加快&#xff0c;很多人只能利用碎片时间来获取信息&#x…

iview切换Select时选项丢失,重置Seletc时选项丢失

分析原因 在旧版本的iview中如果和filterable一起使用时&#xff0c;当值清空选项或者使用重置按钮清空时选项会丢失。 解决方式一 把去掉filterable 解决方式二 使用ref&#xff0c;调用clearSingleSelect()方法清空 ref"perfSelect" this.$refs.perfSelect.c…

【数据库原理与实践】知识点归纳(下)

第6章 规范化理论 一、关系模式设计中存在的问题 关系、关系模式、关系数据库、关系数据库的模式 关系模式看作三元组&#xff1a;R < U,F >&#xff0c;当且仅当U上的一个关系r满足F时&#xff0c;r称为关系模式R < U,F >的一个关系 第一范式&#xff08;1NF&…

Mybatis-Plus:实现自定义SQL

目录 1.简介 2.自定义SQL具体实现 2.1.注解SQL 2.2.Wrapper传参注解SQL 2.3.Wrapper传参xml文件SQL 2.4.正常传参XML文件SQL 3.总结 1.简介 Mybatis-Plus&#xff08;以下简称MBP&#xff09;的初衷是为了简化开发&#xff0c;而不建议开发者自己写SQL语句的&#xff1b…

Java又双叒叕“凉”了?

前几天&#xff0c;TIOBE的一份6月编程语言榜单公布&#xff1a;Java退出前三&#xff0c;位居第四。一波Java凉了的言论甚嚣尘上。其实不止Java&#xff0c;python、C、C&#xff0c;哪一个没被提过“凉”... 而现实是&#xff0c;Java的招聘需求依然很大&#xff1a; 不可否…

Java面试Day17

1.什么是 Java 内部类&#xff1f; 内部类的分类有哪些 &#xff1f;内部类有哪些优点和应用场景&#xff1f; 顾名思义&#xff0c;内部类是指定义在某一个类中的类&#xff0c;主要分为成员内部类&#xff0c;静态内部类&#xff0c;局部内部类和匿名内部类四种。 创建与获取…

ffmpeg+nginx-rtmp转发视频流

本篇博客最早发布于实验室公共博客&#xff0c;但已无人维护&#xff0c;现迁移至个人博客 nginx与nginx-rtmp-module安装 画了好几天图&#xff0c;实在有些乏力&#xff0c;找点有意思的事情做做 觉得视频流传输挺有意思&#xff0c;B站找了些视频&#xff0c;但感觉有些大…

20230627通过WPS给PPT幻灯片加入页码

20230627通过WPS给PPT幻灯片加入页码 2023/6/27 23:16 缘起&#xff1a;不想每次都手工给打印的PPT加页码&#xff0c;就通过百度搜索来自动加页码了&#xff01; 真是偷懒让人进步呀&#xff01; 百度搜索&#xff1a;ppt加页码怎么设置 方法步骤.png ?点击「插入」选项卡「幻…

两台电脑之间传输文件——就近共享

文章目录 背景步骤补充&#xff1a;跨设备共享 背景 两台电脑之间共享文件有很多种方式&#xff0c;这里介绍一种最简洁的——Windows自带的就近共享。它适合偶尔传输一些简单文件。比如把笔记本上的电子书传输到surface上阅读。 注意: 如果共享的电脑正在运行最新版本的Wind…

【数据结构与算法】7、队列(Queue)的实现【用栈实现队列】

目录 一、队列介绍二、使用 LinkedList 实现队列三、LeetCode&#xff1a;用【栈】实现队列(1) 老师讲之前我自己的实现&#xff08;Correct&#xff09;(2) 实现思路(3) 代码实现 四、jdk 的 Queue五、双端队列&#xff08;Deque&#xff09;六、循环队列(1) 分析(2) 入队(3) …

基于FPGA的RC滤波器设计实现

目录 简介&#xff1a; 传递函数 FPGA代码实现 总结 简介&#xff1a; RC滤波器的特性基本情况介绍 RC一阶低通滤波介绍&#xff1b;RC滤波器电路简单&#xff0c;抗干扰性强&#xff0c;有较好的低频性能&#xff0c;并且选用标准的阻容元件易得&#xff0c;所以在工程测…

【JAVA】十分钟带你了解java的前世今生

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【初始JAVA】 文章目录 前言JAVA介绍诞生&#x1f52c;名字与图标&#x1f916;发展&#x1f6e9;️未来&#x1fa84; 前言 玩过我的世界的朋友想必对JAVA以及它的图标都很熟悉&#xff0c;在游戏开始画面…

7.3 SpringBoot整合MyBatis分页插件github.pageHelper:实现图书列表API

文章目录 前言一、自己实现分页第一步&#xff0c;count 查询 总记录数&#xff08;totalCount&#xff09;&#xff0c;计算总页数&#xff08;totalPages&#xff09;第二步&#xff0c;limit 查询 指定页数据 二、不考虑分页的查询图书列表MapperBookServiceImplBookListPar…

FastDFS单机部署及SpringBoot整合

前言 FastDFS是一个开源的高性能分布式文件系统。它的主要功能包括&#xff1a;文件存储、文件同步和文件访问&#xff08;文件上传和文件下载&#xff09;&#xff0c;可以解决高容量和负载平衡问题。FastDFS应满足其服务基于文件的网站的要求&#xff0c;如照片共享网站和视…

Maynor的博客专家成长之路——暨2023年中复盘

文章目录 博客专家成长之路——暨2023年中复盘前言念念不忘的博客专家每天只做三件事敲代码写博客健健身 我的感悟 不足之处未来&#xff1a;和CSDN共同成长最后 博客专家成长之路——暨2023年中复盘 前言 ​ 2023年不知不觉已经过去了半年有余&#xff0c;也是时候作年中复盘…

ChatGPT 是什么?

写在前面&#xff1a;这篇文章是今年1月份对chatgpt做调研学习时写的&#xff0c;都是从别处搬来的&#xff0c;纯扫盲的作用。本来一直以草稿的形势存在&#xff0c;但今天整理博客&#xff0c;顺便给发出来吧。 文章目录 1. ChatGPT简介1.1 ChatGPT 支持的场景举例 2 ChatGPT…

计算机网络————应用层

文章目录 概述域名系统DNS域名结构域名服务器解析过程常见的DNS记录DNS报文格式基础结构部分问题部分资源记录(RR, Resource Record)部分 万维网WWWURLHTTPHTTP发展HTTP报文结构请求报文响应报文 cookie 内容分发网络CDN 概述 应用层的具体内容就是规定应用进程在通信时所遵循的…

python数据分析之利用多种机器学习方法实现文本分类、情感预测

大家好&#xff0c;我是带我去滑雪&#xff01; 文本分类是一种机器学习和自然语言处理&#xff08;NLP&#xff09;任务&#xff0c;旨在将给定的文本数据分配到预定义的类别或标签中。其目标是为文本数据提供自动分类和标注&#xff0c;使得可以根据其内容或主题进行组织、排…

【AI】PyTorch安装记录及Anaconda环境配置

【AI】PyTorch安装记录及Anaconda环境配置 说下本地环境&#xff0c;RTX4070 12GB GPU&#xff1b;618刚买&#xff0c;不能让他闲着&#xff0c;配置一下炼丹环境&#xff0c;开始为打工人工作。为了方便后续部署模型之间依赖不冲突&#xff0c;所以使用Anaconda管理Python环…