0%

Java 8 的 Stream API 笔记

Java 8增加了函数式编程的能力,通过流(Stream)API来支持对集合的filter/map/reduce操作。流是Java 8中处理集合的关键抽象概念,实现声明式的集合处理方式。

Java 8流API示例

对集合的filter、map和reduce操作,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 计算偶数个数
List<Integer> list = Arrays.asList(3, 2, 12, 5, 6, 11, 13);
long count = list.stream()
.filter(x -> x % 2 == 0).count();
System.out.println(count);

// 筛选出偶数列表
List<Integer> evenList = list.stream()
.filter(x -> x % 2 == 0).collect(Collectors.toList());
System.out.println(evenList);

// 筛选出偶数列表, 然后全部元素加1
List<Integer> plusList = list.stream()
.filter(x -> x % 2 == 0)
.map(x -> x + 1).collect(Collectors.toList());
System.out.println(plusList);

// 全部偶数求和
int sum = list.stream()
.filter(x -> x % 2 == 0)
.mapToInt(Integer::intValue).sum();
System.out.println(sum);

// 全部偶数求和
sum = list.stream()
.filter(x -> x % 2 == 0)
.reduce(0, (x, y) -> x + y);
System.out.println(sum);

流与集合的区别

流(Stream)和集合(collection)有以下区别 [doc]:

  • 无存储(no storage)。流并不是存储元素的数据结构。流,通过计算的操作流水线来传输数据源。数据源,比如数据结构、数组、生成器函数或I/O通道。
  • 原生函数式(functional in nature)。对流进行操作会产生结果,但并不会修改数据源。比如,筛选流,是生成一个不包含被筛选掉的元素新流,而不是从原集合中删除元素。
  • 惰性读取(laziness-seeking)。很多对流的操作,比如筛选(filtering)、映射(mapping)或去重(duplicate removal),能够被惰性实现,以便于性能的优化。比如,“找出第一个由三个连续单词组成的String”,并不需要检测全部的输入字符串。流操作被划分为中间操作(用于生成流)和终止操作(用于生成值或副作用)。中间操作总是惰性的。
  • 可能是无界的(possibly unbounded)。集合的大小是必须确定的,但流并不是。像limit(n) 或 findFirst()这样的逻辑短路操作,允许在有限时间内完成对无限流的计算。
  • 可消耗的(consumable)。流的元素在流的生命周期中只能被访问一次。类似于 Iterator,要想重新访问同一个元素,必须生成新的流。

流操作与流水线

当使用Stream时,会通过三个阶段来建立一个操作流水线 [ref]。

  1. 创建一个Stream。
  2. 在一个或多个步骤中,指定将初始Stream转换为另一个Stream的中间操作
  3. 使用一个终止操作来产生一个结果。该操作会强制它之前的延迟操作立即执行。在这之后,该Stream就不会再被使用了。

整个流操作流水线,如图所示 [ref]:

流操作流水线

Java 8的流创建、中间操作和终止操作的API汇总表,如下 [ref]

创建流 中间操作 终止操作

Collection
stream()
parallelStream()

Stream, IntStream, LongStream, DoubleStream
static generate() 无序
static of(..)
static empty()
static iterate(..)
static concat(..)
static builder()

IntStream, LongStream
static range(..)
static rangeClosed(..)

Arrays
static stream(..)

BufferedReader
lines(..)

Files
static list(..)
static walk(..)
static find(..)

JarFile
stream()

ZipFile
stream()

Pattern
splitAsStream(..)

SplittableRandom
ints(..) 无序
longs(..) 无序
doubles(..) 无序

Random
ints(..)
longs(..)
doubles(..)

ThreadLocalRandom
ints()
longs(..)
doubles(..)

BitSet
stream()

CharSequence (String)
IntStream chars()
IntStream codePoints()

StreamSupport (low level)
static doubleStream(..)
static intStream(..)
static longStream(..)
static stream(..)

BaseStream
sequential()
parallel()
unordered()
onClose(..)

Stream
filter(..)
map(..)
mapToInt(..)
mapToLong(..)
mapToDouble(..)
flatMap(..)
flatMapToInt(..)
flatMapToLong(..)
flatMapToDouble(..)
distinct() 有状态
sorted(..) 有状态
peek(..)
limit(..) 有状态, 逻辑短路
skip(..) 有状态

IntStream、LongStream和DoubleStream方法与Stream类似, 其中额外的方法如下:

IntStream
mapToObj(..)
asLongStream()
asDoubleStream()
boxed()

LongStream
mapToObj(..)
asDoubleStream()
boxed()

DoubleStream
mapToObj(..)
boxed()

BaseStream
iterator()
spliterator()

Stream
forEach(..)
forEachOrdered(..)
toArray(..)
reduce(..)
collect(..)
min(..)
max(..)
count()
anyMatch(..) 逻辑短路
allMatch(..) 逻辑短路
noneMatch(..) 逻辑短路
findFirst() 逻辑短路
findAny() 逻辑短路*, 不确定*

IntStream、LongStream和DoubleStream方法与Stream类似, 其中额外的方法如下:

IntStream, LongStream, DoubleStream
sum()
average()
summaryStatistics()

筛选与映射操作

筛选与映射操作,即上文所说的中间操作。相关的API如下:

1
2
3
Stream<T> filter(Predicate<? super T> predicate)
<R> Stream<R> map(Function<? super T,? extends R> mapper)
<R> Stream<R> flatMap(Function<? super T,? extends Stream<? extends R>> mapper)

filter(..),筛选出元素符合某个条件的新流。map(..),用于对元素作某种形式的转换。flatMap(..),用于将多个子流合并为一个新流。

其他的中间操作有,提取子流,limit(..)(提取前n个)和skip(..)(丢弃前n个);有状态的转换,distinct()(过滤重复元素)和sorted(..)(排序),以及主要用于日志调试的peek(..)

1
2
3
4
5
Stream<T> limit(long maxSize)
Stream<T> skip(long n)
Stream<T> distinct()
Stream<T> sorted(Comparator<? super T> comparator)
Stream<T> peek(Consumer<? super T> action)

归约操作与收集器

流有多种形式的通用的归约操作,reduce(..)collect(..),同时也有多种特化的归约形式,比如 sum()min(..)max(..)count()等。reduce(..)collect(..)的方法签名如下:

1
2
3
4
5
Optional<T> reduce(BinaryOperator<T> accumulator)
T reduce(T identity, BinaryOperator<T> accumulator)
<U> U reduce(U identity, BiFunction<U,? super T,U> accumulator,
BinaryOperator<U> combiner)
<R,A> R collect(Collector<? super T,A,R> collector)

sum()min(..)max(..)count()等特化的归约操作,是reduce(..)归约操作的特殊化,内部实现依赖reduce(..),相应JDK的实现源码可以作为印证。

IntStream的sum(..)的JDK现实源码为 [github]:

1
2
3
4
@Override
public final int sum() {
return reduce(0, Integer::sum);
}

IntStream的min(..)的JDK现实源码为 [github]:

1
2
3
4
@Override
public final OptionalInt min() {
return reduce(Math::min);
}

Stream的count()的JDK现实源码为 [github]:

1
2
3
4
@Override
public final long count() {
return mapToLong(e -> 1L).sum();
}

如果不想将流归约为单个值,而只要查看集合被流操作后的结果,需要使用收集器(collector)。上文已经见到,将流收集为List:

1
2
List<Integer> evenList = list.stream()
.filter(..).collect(Collectors.toList());

Collectors是收集器工具类,提供获取预定义收集器(实现接口Collector)的静态方法。Collectors类,除了将流收集为List,还可以是Map、Set、String,或者也能进行统计操作,如求和、计算平均值、最大值、最小值。Collectors的全部静态方法列表如下:

  • averaging[Double|Int|Long]
  • collectingAndThen
  • counting
  • groupingBy
  • groupingByConcurrent
  • joining
  • mapping
  • maxBy
  • minBy
  • partitioningBy
  • reducing
  • summarizing[Double|Int|Long]
  • summing[Double|Int|Long]
  • toCollection
  • toConcurrentMap
  • toList
  • toMap
  • toSet