文章目录
- 一、StreamAPI
- 1.介绍
- 2.代码示例
- 二、基本用法
- 1.创建流
- 2.流并发
- 3.流并发问题
- 三、流方法
- 1.中间操作
- 2.终止操作
一、StreamAPI
StreamAPI这种函数式编程是声明式编程,声明式编程是一种编程范式,它着重于描述问题的"是什么",而不是"如何做"。在声明式编程中,我们更关注问题的定义和规范,而不需要显式地指定每个步骤的实现细节。
1.介绍
Stream Pipeline:流管道、流水线
Intermediate Operations:中间提作
Terminal Operation:终止操作
Stream所有数据和操作被组合成流管道,流管道组成:
一个数据源(可以是一个数组、集合、生成器函数、I/O管道)零或多个中间操作(将一个流变形成另一个流)一个终止操作(产生最终结果)
流是惰性的
,只有在启动最终操作时才会对源数据进行计算,而且只在需要时才会消耗源元素;
2.代码示例
获取列表中最大的偶数,示例如下:
public class StreamApi {
public static void main(String[] args) {
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 获取最大偶数
Integer max = 0;
for (Integer i : list) {
if (i % 2 != 0) {
continue;
}
max = i >= max ? i : max;
}
System.out.println(max);
/*****************************StreamAPI******************************************/
list.stream().filter(integer -> integer % 2 == 0) // intermediate operation
.max(Integer::compareTo) // terminal operation
.ifPresent(System.out::println);
}
}
输出:
10
10
总结流的三大部分:
- 流的数据
- n个中间操作
- 1一个终止操作
二、基本用法
1.创建流
public static void main(String[] args) {
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
Stream<Integer> stream1 = Stream.of(1,2,3);
Stream<Integer> concatStream = Stream.concat(stream1, Stream.of(2,3,4));
System.out.println(concatStream.toList()); // 输出:[1, 2, 3, 2, 3, 4]
Set<Integer> set = Set.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
set.stream().min(Integer::compareTo).ifPresent(System.out::println);
list.stream().min(Integer::compareTo).ifPresent(System.out::println);
}
创建流的方法有很多,代码中举了几个例子:
- Stream.of
- Stream.concat
- 集合的初始化
2.流并发
下面思考两个问题:
- 流是并发的吗?
- 与for循环有哪些区别?
public static void main(String[] args) {
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
System.out.println("主线程:"+Thread.currentThread());
list.stream().filter(integer -> {
System.out.println("stream线程"+Thread.currentThread());
return integer % 2 == 0;
}) // intermediate operation
.max(Integer::compareTo) // terminal operation
.ifPresent(System.out::println);
}
输出:
主线程:Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
10
实践证明,目前的写法是没有并发的,还是在主线程中完成的。
与for循环并无区别。
但是目前我们数据量非常大,需要开并发,那么就可以使用parallel解决
。
public static void main(String[] args) {
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
System.out.println("主线程:"+Thread.currentThread());
list.stream().parallel().filter(integer -> {
System.out.println("stream线程"+Thread.currentThread());
return integer % 2 == 0;
}) // intermediate operation
.max(Integer::compareTo) // terminal operation
.ifPresent(System.out::println);
}
输出:
主线程:Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[main,5,main]
stream线程Thread[ForkJoinPool.commonPool-worker-1,5,main]
stream线程Thread[main,5,main]
stream线程Thread[ForkJoinPool.commonPool-worker-3,5,main]
stream线程Thread[ForkJoinPool.commonPool-worker-2,5,main]
10
注意事项:
开并发,尽可能在函数内部操作数据,避免出现安全问题。
3.流并发问题
private static int count = 0;
public static void main(String[] args) {
/*****************************StreamAPI*********************************/
System.out.println("主线程:"+Thread.currentThread());
IntStream.range(0,100).parallel()
.forEach(integer -> {// terminal operation
count++;
});
System.out.println("数据变化:"+count);
}
看下这段代码的,由于我们写了parallel开了并发,操作的变量又是在函数外部定义的,所以是会出现安全问题的。
输出的结果可能是小于等于100的任意数字吧。
这时候一定要并发的情况下,那修改方式就是加锁,或者是换成线程安全的类型进行操作(AtomicInteger)。
代码如下:
private static int count = 0;
public static void main(String[] args) {
/*****************************StreamAPI*********************************/
System.out.println("主线程:"+Thread.currentThread());
IntStream.range(0,100).parallel()
.forEach(integer -> {// terminal operation
synchronized (Object.class)
{count++;}
});
System.out.println("数据变化:"+count);
}
public static void main(String[] args) {
/*****************************StreamAPI******************************************/
System.out.println("主线程:"+Thread.currentThread());
AtomicInteger count = new AtomicInteger();
IntStream.range(0,100).parallel() // terminal operation
.forEach(integer -> {
count.getAndIncrement();
});
System.out.println("数据变化:"+count);
}
三、流方法
流的过程:每一个数据 流经 所有管道之后才会再进行下一个数据的流操作
。
1.中间操作
- parallel:并发
- filter:筛选:遍历流中的每一个元素,取出符合条件的数据
- map:一对一处理数据,返回数据
- peek:打印
- sorted:排序
- distinct:去重
- limit:截取流中前 N 个元素
/**
* 中间操作测试
*
* @param args
*/
public static void main(String[] args) {
List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8, 8,9, 10);
list.stream().limit(9)
.filter(integer -> {
System.out.println("filter输出:"+integer);
return integer > 4;
})
.peek(i -> System.out.println("peek输出:" + i))
.map(integer -> {
System.out.println("map输出:"+integer);
return integer;
})
.sorted(Integer::compareTo)
.distinct()
.forEach(integer -> {
System.out.println("foreach输出:" + integer);
});
}
输出:
filter输出:1
filter输出:2
filter输出:3
filter输出:4
filter输出:5
peek输出:5
map输出:5
filter输出:6
peek输出:6
map输出:6
filter输出:7
peek输出:7
map输出:7
filter输出:8
peek输出:8
map输出:8
filter输出:8
peek输出:8
map输出:8
foreach输出:5
foreach输出:6
foreach输出:7
foreach输出:8
从输出中可以看到每个数据是走完整个中间流之后才会再进行下一个数据的中间操作的。
- flatMap:一对多处理数据,返回数据
- skip:跳过流中前 N 个元素
- takeWhile:筛选:当满足条件,拿到这个元素,结束流循环
/**
* 中间操作测试
* - flatMap:一对多处理数据,返回数据
* - skip:跳过流中前 N 个元素
* - takeWhile:筛选:当满足条件,拿到这个元素,结束流循环
*
* @param args
*/
public static void main(String[] args) {
List<Map<String, Object>> list = new ArrayList<>();
list = IntStream.range(0, 10)
.mapToObj(integer -> {
Map<String, Object> map = new HashMap<>();
map.put("name", "Bob"+integer);
return map;
}).collect(Collectors.toList());
System.out.println(list);
//flatMap 一对多输出
List<String> strList = list.stream()
.skip(5)
.flatMap(m -> Arrays.stream(m.get("name").toString().split("Bob")))
.filter(s -> !s.isEmpty())
.distinct()
.toList();
System.out.println(strList);
List<String> strList1 = strList.stream().takeWhile(str -> 8 >= Integer.parseInt(str)).collect(Collectors.toList());
System.out.println(strList1);
}
输出:
[{name=Bob0}, {name=Bob1}, {name=Bob2}, {name=Bob3}, {name=Bob4}, {name=Bob5}, {name=Bob6}, {name=Bob7}, {name=Bob8}, {name=Bob9}]
[5, 6, 7, 8, 9]
[5, 6, 7, 8]
2.终止操作
- foreach:对流中的每个元素执行指定操作,没有返回值。
- max:返回最大值
- count:返回数据总条数
- anyMatch:判断流中是否存在满足条件的元素,如果存在则返回 true,否则返回 false。
- collect:将流中的元素收集到一个容器中。例如,整合一个分组后的数据到容器中,collect(Collectors.groupingBy(s -> s))
- reduce:通过累加器和初始值对流中的元素进行归约操作,返回归约后的结果。例如,可以使用 reduce(0, (a, b) -> a + b) 对整型流求和。
- findFirst:返回流中的第一个元素(如果存在),否则返回一个空的 Optional 对象。
- findAny:返回流中的任意一个元素(如果存在),否则返回一个空的 Optional 对象。