https://mp.weixin.qq.com/s/BMvvTBkDHIyFFX6HQ4tNDA
现在可以粗略地看到,这些中间操作(不管是无状态的 filter(),还是有状态的sorted() 和 limit() 都只是返回了一个包含上一节点引用的中间节点。有点像 HashMap 中的反向单向链表。就这样把一个个中间操作拼接到了控制数据流入的 Head 后面,但是并没有开始做任何数据处理的动作。
这也就是 Stream 延时执行的特性原因之所在。
参见附录I会发现 StatelessOp 和StatefulOp 初始化的时候还会将当前节点的引用传递给上一个节点。
previousStage.nextStage = this;
所以各个节点组成了一个双向链表的结构。
http://www.cnblogs.com/CarpenterLee/p/6637118.html
http://www.cnblogs.com/Dorae/p/7779246.html
https://colobu.com/2014/11/18/Java-8-Stream/
Stream 处理数据的过程可以类别成工厂的流水线。数据可以看做流水线上的原料,对数据的操作可以看做流水线上的工人对原料的操作。
事实上 Stream 只是一个接口,并没有操作的缺省实现。最主要的实现是
ReferencePipeline
,而 ReferencePipeline
继承自AbstractPipeline
,AbstractPipeline
实现了 BaseStream 接口并实现了它的方法。但 ReferencePipeline
仍然是一个抽象类,因为它并没有实现所有的抽象方法,比如 AbstractPipeline
中的 opWrapSink
。ReferencePipeline
内部定义了三个静态内部类,分别是:Head, StatelessOp, StatefulOp
,但只有 Head 不再是抽象类。
流水线的结构有点像双向链表,节点之间通过引用连接。节点可以分为三类,控制数据输入的节点、操作数据的中间节点和控制数据输出的节点。
ReferencePipeline 包含了控制数据流入的 Head ,中间操作
StatelessOp, StatefulOp
,终止操作 TerminalOp
。
Stream 常用的流操作包括:
- 中间操作(Intermediate Operations)
- 无状态(Stateless)操作:每个数据的处理是独立的,不会影响或依赖之前的数据。如
filter()
、flatMap()
、flatMapToDouble()
、flatMapToInt()
、flatMapToLong()
、map()
、mapToDouble()
、mapToInt()
、mapToLong()
、peek()
、unordered()
等 - 有状态(Stateful)操作:处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如
distinct()
、sorted()
、sorted(comparator)
、limit()
、skip()
等 - 终止操作(Terminal Operations)
- 非短路操作:处理完所有数据才能得到结果。如
collect()
、count()
、forEach()
、forEachOrdered()
、max()
、min()
、reduce()
、toArray()
等。 - 短路(short-circuiting)操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如
anyMatch()
、allMatch()
、noneMatch()
、findFirst()
、findAny()
等。
stream()
是 Collection 中的 default 方法,实际上调用的是StreamSupport.stream()
方法,返回的是 ReferencePipeline.Head
的实例。ReferencePipeline.Head
的构造函数传递是 ArrayList 中实现的 spliterator 。常用的集合都实现了 Spliterator 接口以支持 Stream。可以这样理解,Spliterator 定义了数据集合流入流水线的方式。
定义流水线节点
filter()
是 Stream 中定义的方法,在 ReferencePipeline
中实现,返回StatelessOp
的实例。sorted() 和 limit() 的返回值和也都是 Stream 的实现类,并且都接收了this 。不同的是 sorted() 返回的是 ReferencePipeline.StatefulOp 的子类 SortedOps.OfRef 的实例。limit() 返回的 ReferencePipeline.StatefulOp 的实例。
现在可以粗略地看到,这些中间操作(不管是无状态的 filter(),还是有状态的sorted() 和 limit() 都只是返回了一个包含上一节点引用的中间节点。有点像 HashMap 中的反向单向链表。就这样把一个个中间操作拼接到了控制数据流入的 Head 后面,但是并没有开始做任何数据处理的动作。
这也就是 Stream 延时执行的特性原因之所在。
参见附录I会发现 StatelessOp 和StatefulOp 初始化的时候还会将当前节点的引用传递给上一个节点。
previousStage.nextStage = this;
所以各个节点组成了一个双向链表的结构。
先忽略并行的情况,来看一下加注释了1的代码:
ReduceOps.makeRef
接收此 Collector 返回了一个ReduceOp
(实现了TerminalOp
接口)的实例。- 返回的
ReduceOp
实例又被传递给 AbstractPipeline 中的evaluate()
方法。 - 在
evaluate
中,调用了ReduceOp
实例的evaluateSequential
方法,并将上流水线上最后一个节点的引用和sourceSpliterator
传递进去。
@Overridepublic <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator).get();}
- 然后调用
ReduceOp
实例的makeSink()
方法返回其makeRef()
方法内部类ReducingSink
的实例。 - 接着
ReducingSink
的实例作为参数和spliterator
一起传入最后一个节点的wrapAndCopyInto()
方法,返回值是 Sink 。
启动流水线
流水线组装好了,现在就该启动流水线了。这里的核心方法是
wrapAndCopyInto
,根据方法名也能看出来这里应该做了两件事,wrapSink()
和 copyInto()
。
wrapSink()
将最后一个节点创建的 Sink 传入,并且看到里面有个 for 循环。参见附录I可以发现
每个节点都记录了上一节点的引用(
previousStage
)和每一个节点的深度(depth
)。
所以这个 for 循环是从最后一个节点开始,到第二个节点结束。每一次循环都是将上一节点的
combinedFlags
和当前的 Sink 包起来生成一个新的 Sink 。这和前面拼接各个操作很类似,只不过拼接的是 Sink 的实现类的实例,方向相反。
流水线组装好了,现在就该启动流水线了。这里的核心方法是
wrapAndCopyInto
,根据方法名也能看出来这里应该做了两件事,wrapSink()
和 copyInto()
。
每一个 Sink 都有自己的职责,但具体表现各有不同。无状态操作的 Sink 接收到通知或者数据,处理完了会马上通知自己的 下游。有状态操作的 Sink 则像有一个缓冲区一样,它会等要处理的数据处理完了才开始通知下游,并将自己处理的结果传递给下游。
例如
sorted()
就是一个有状态的操作,一般会有一个属于自己的容器,用来记录处自己理过的数据的状态。sorted()
是在执行 begin 的时候初始化这个容器,在执行 accept 的时候把数据放到容器中,最后在执行 end 方法时才正在开始排序。排序之后再将数据,采用同样的方式依次传递给下游节点。
最后数据流到终止节点,终止节点将数据收集起来就结束了。
然后就没有然后了,
copyInto()
返回类型是 void
,没有返回值。wrapAndCopyInto()
返回了 TerminalOps
创建的 Sink,这时候它里面已经包含了最终处理的结果。调用它的 get()
方法就获得了最终的结果。
我们大致能够想到,应该采用某种方式记录用户每一步的操作,当用户调用结束操作时将之前记录的操作叠加到一起在一次迭代中全部执行掉。沿着这个思路,有几个问题需要解决:
- 用户的操作如何记录?
- 操作如何叠加?
- 叠加之后的操作如何执行?
- 执行后的结果(如果有)在哪里?
注意这里使用的是“操作(operation)”一词,指的是“Stream中间操作”的操作,很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是<数据来源,操作,回调函数>构成的三元组。Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。跟Stream相关类和接口的继承关系图示。
还有IntPipeline, LongPipeline, DoublePipeline没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟ReferencePipeline是并列关系。图中Head用于表示第一个Stage,即调用调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操作;StatelessOp和StatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作。
图中通过
Collection.stream()
方法得到Head也就是stage0,紧接着调用一系列的中间操作,不断产生新的Stream。这些Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage都记录了前一个Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是Stream记录操作的方式。
以上只是解决了操作记录的问题,要想让流水线起到应有的作用我们需要一种将所有操作叠加到一起的方案。你可能会觉得这很简单,只需要从流水线的head开始依次执行每一步的操作(包括回调函数)就行了。这听起来似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底执行了哪种操作,以及回调函数是哪种形式。换句话说,只有当前Stage本身才知道该如何执行自己包含的动作。这就需要有某种协议来协调相邻Stage之间的调用关系。
这种协议由Sink接口完成,Sink接口包含的方法如下表所示:
方法名 | 作用 |
void begin(long size) | 开始遍历元素之前调用该方法,通知Sink做好准备。 |
void end() | 所有元素遍历完成之后调用,通知Sink没有更多的元素了。 |
boolean cancellationRequested() | 是否可以结束操作,可以让短路操作尽早结束。 |
void accept(T t) | 遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。 |
有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的
accept()
方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的begin()
和end()
方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的Sink.begin()方法可能创建一个乘放结果的容器,而accept()方法负责将元素添加到该容器,最后end()负责对容器进行排序。对于短路操作,Sink.cancellationRequested()
也是必须实现的,比如Stream.findFirst()是短路操作,只要找到一个元素,cancellationRequested()就应该返回true,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法。
有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的Sink.accept()方法流程是这样的:
void accept(U u){
1. 使用当前Sink包装的回调函数处理u
2. 将处理结果传递给流水线下游的Sink
}
Sink接口的其他几个方法也是按照这种[处理->转发]的模型实现。下面我们结合具体例子看看Stream的中间操作是如何将自身的操作包装成Sink以及Sink是如何将处理结果转发给下一个Sink的。先看Stream.map()方法:
// Stream.map(),调用该方法将产生一个新的Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
...
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override /*opWripSink()方法返回由回调函数包装而成Sink*/
Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) {
return new Sink.ChainedReference<P_OUT, R>(downstream) {
@Override
public void accept(P_OUT u) {
R r = mapper.apply(u);// 1. 使用当前Sink包装的回调函数mapper处理u
downstream.accept(r);// 2. 将处理结果传递给流水线下游的Sink
}
};
}
};
}
上述代码看似复杂,其实逻辑很简单,就是将回调函数mapper包装到一个Sink当中。由于Stream.map()是一个无状态的中间操作,所以map()方法返回了一个StatelessOp内部类对象(一个新的Stream),调用这个新Stream的opWripSink()方法将得到一个包装了当前回调函数的Sink。
再来看一个复杂一点的例子。Stream.sorted()方法将对Stream中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作封装成Sink的呢?sorted()一种可能封装的Sink代码如下:
// Stream.sort()方法用到的Sink实现
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
private ArrayList<T> list;// 存放用于排序的元素
RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
super(downstream, comparator);
}
@Override
public void begin(long size) {
...
// 创建一个存放排序元素的列表
list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
}
@Override
public void end() {
list.sort(comparator);// 只有元素全部接收之后才能开始排序
downstream.begin(list.size());
if (!cancellationWasRequested) {// 下游Sink不包含短路操作
list.forEach(downstream::accept);// 2. 将处理结果传递给流水线下游的Sink
}
else {// 下游Sink包含短路操作
for (T t : list) {// 每次都调用cancellationRequested()询问是否可以结束处理。
if (downstream.cancellationRequested()) break;
downstream.accept(t);// 2. 将处理结果传递给流水线下游的Sink
}
}
downstream.end();
list = null;
}
@Override
public void accept(T t) {
list.add(t);// 1. 使用当前Sink包装动作处理t,只是简单的将元素添加到中间列表当中
}
}
上述代码完美的展现了Sink的四个接口方法是如何协同工作的:
- 首先beging()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小;
- 之后通过accept()方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素;
- 最后end()方法告诉Sink所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink;
- 如果下游的Sink是短路操作,将结果传递给下游时不断询问下游cancellationRequested()是否可以结束处理。
结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段(Stage),直观的说就是流水线的链表不会在往后延伸了。结束操作会创建一个包装了自己操作的Sink,这也是流水线中最后一个Sink,这个Sink只需要处理数据而不需要将结果传递给下游的Sink(因为没有下游)。对于Sink的[处理->转发]模型,结束操作的Sink就是调用链的出口。
我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在PipelineHelper中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。但Stream类库的设计者没有这么做,而是设置了一个
Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)
方法来得到Sink,该方法的作用是返回一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?这是因为使用opWrapSink()可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:// AbstractPipeline.wrapSink()
// 从下游向上游不断包装Sink。如果最初传入的sink代表结束操作,
// 函数返回时就可以得到一个代表了流水线上所有操作的Sink。
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
...
for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
现在流水线上从开始到结束的所有的操作都被包装到了一个Sink里,执行这个Sink就相当于执行整个流水线,执行Sink的代码如下:
// AbstractPipeline.copyInto(), 对spliterator代表的数据执行wrappedSink代表的操作。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
...
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知开始遍历
spliterator.forEachRemaining(wrappedSink);// 迭代
wrappedSink.end();// 通知遍历结束
}
...
}
上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,然后调用spliterator.forEachRemaining()方法对数据进行迭代(Spliterator是容器的一种迭代器,参阅),最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。
特别说明:副作用不应该被滥用,也许你会觉得在Stream.forEach()里进行元素收集是个不错的选择,就像下面代码中那样,但遗憾的是这样使用的正确性和效率都无法保证,因为Stream可能会并行执行。大多数使用副作用的地方都可以使用归约操作更安全和有效的完成。
- 对于表中返回boolean或者Optional的操作(Optional是存放 一个 值的容器)的操作,由于值返回一个值,只需要在对应的Sink中记录这个值,等到执行结束时返回就可以了。
- 对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。collect(), reduce(), max(), min()都是归约操作,虽然max()和min()也是返回一个Optional,但事实上底层是通过调用reduce()方法实现的。
- 对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做Node的数据结构中的。Node是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。关于Node的具体结构,我们会在下一节探究Stream如何并行执行时给出详细说明。
https://colobu.com/2014/11/18/Java-8-Stream/
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}