Background: pipeline architecture

Brian Goetz brian.goetz at oracle.com
Mon Dec 31 13:11:36 PST 2012


Here's an attempt at putting most of the background on the architecture 
of pipelines in one place.

A complete stream pipeline has several components:
  - A source
  - Source flags
  - Zero or more intermediate operations
  - One terminal operation

The source is a Supplier<Spliterator>.  The reason for the indirection 
is to narrow the window during which we require non-interference, from 
(stream creation, end of terminal operation) down to (start of terminal 
operation, end of terminal operation).

So, for example, this case:

   list = new ...
   Stream<T> s = list.stream();
   // mutate list
   s.forEach(...)

will see the list state as of the start of the forEach (that is, 
including the mutation), not as of the list.stream() capture.  This is 
easy to implement; the stream() method in ArrayList is:

         return Streams.stream(
           // the lambda defers reading elementData and size until the
           // terminal operation invokes the supplier
           () -> Arrays.spliterator((E[]) elementData, 0, size),
           flags);

By deferring evaluation of elementData and size, we can late-bind to the 
data.
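
To make the late-binding point concrete, here is a self-contained 
sketch reduced to a bare Supplier (no pipeline classes involved; the 
class name is made up):

   import java.util.ArrayList;
   import java.util.List;
   import java.util.Spliterator;
   import java.util.function.Supplier;

   // The supplier is captured eagerly but not invoked until traversal,
   // so it sees mutations made between capture and traversal.
   class LateBindingSketch {
       public static void main(String[] args) {
           List<String> list = new ArrayList<>();
           list.add("a");
           Supplier<Spliterator<String>> source = list::spliterator;
           list.add("b");   // mutate after capture
           source.get().forEachRemaining(System.out::println);  // a, b
       }
   }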

The source flags are a set of dynamic properties of the source.  Defined 
flags include:
  - SIZED -- the size is known
  - ORDERED -- the source has a defined encounter order
  - SORTED -- the source is sorted by natural order
  - DISTINCT -- the source has distinct elements according to .equals

We also encode serial-vs-parallel as a source flag, though this is 
mostly an implementation expediency.
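
As a toy illustration of how a set of flags like this can be carried 
around, consider a bit set (all names here are made up, not the actual 
implementation's constants):

   // Toy bit set carrying the source flags above; names are made up.
   class SourceFlagsSketch {
       static final int SIZED    = 1 << 0;  // size is known
       static final int ORDERED  = 1 << 1;  // defined encounter order
       static final int SORTED   = 1 << 2;  // sorted by natural order
       static final int DISTINCT = 1 << 3;  // distinct per .equals

       public static void main(String[] args) {
           int arrayList = SIZED | ORDERED;  // e.g., an ArrayList source
           int treeSet = SIZED | ORDERED | SORTED | DISTINCT;
           System.out.println((arrayList & SORTED) != 0);   // false
           System.out.println((treeSet & DISTINCT) != 0);   // true
       }
   }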

Intermediate operations produce a new stream, lazily; invoking an 
intermediate operation on a stream just sets up the new stream, but does 
not cause any computation nor does it cause the source to be consumed. 
The fundamental operation implemented by an intermediate operation is to 
wrap a "Sink" with a new Sink.  So in a pipeline like

   list.filter(...).map(...).reduce(...)

the terminal reduction operation will create a sink which reduces the 
values fed to it, and the map operation will wrap that with a mapping 
sink that transforms values as they pass through, and the filter 
operation will wrap it with a filtering sink that only passes some 
elements through.
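
A stripped-down sketch of that wrapping follows; a real sink also 
needs begin/end and cancellation hooks (for sizing and 
short-circuiting), which are elided here, and all names are 
illustrative:

   import java.util.function.Consumer;
   import java.util.function.Function;
   import java.util.function.Predicate;

   class SinkSketch {
       interface Sink<T> extends Consumer<T> { }

       // filter wraps the downstream sink, passing only matching elements
       static <T> Sink<T> filterSink(Predicate<? super T> p, Sink<T> down) {
           return t -> { if (p.test(t)) down.accept(t); };
       }

       // map wraps the downstream sink, transforming elements in flight
       static <T, R> Sink<T> mapSink(Function<? super T, ? extends R> f,
                                     Sink<R> down) {
           return t -> down.accept(f.apply(t));
       }

       public static void main(String[] args) {
           int[] sum = new int[1];                 // reduce-like terminal sink
           Sink<Integer> reduce = x -> sum[0] += x;
           // the chain is built back-to-front, from the terminal sink out
           Sink<Integer> chain =
               filterSink(x -> x % 2 == 0, mapSink(x -> x * x, reduce));
           for (int i = 1; i <= 4; i++) chain.accept(i);
           System.out.println(sum[0]);             // 20 = 2*2 + 4*4
       }
   }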

Intermediate operations are divided into two kinds, stateful and 
stateless.  The stateless operations are the well-behaved ones, which 
depend only on their inputs -- filter, map, mapMulti.  The stateful ones 
include sorted, removeDuplicates, and limit.

In a sequential pipeline, all intermediate ops can be jammed together 
for a single pass on the data.  In a parallel pipeline, the same can be 
done only if all intermediate ops are stateless.  Otherwise, we slice up 
the pipeline into segments ending in stateful ops, and execute them in 
segments:

   list.parallel().filter(...).sorted().map(...).reduce(...)
                  ^-------------------^ ^------------------^
                     segment 1             segment 2

where the output of segment 1 is gathered into a conc-tree and then used 
as the source for segment 2.  This segmentation is why Doug hates these 
operations; it complicates the parallel execution, obfuscates the cost 
model, and maps much more poorly to targets like GPUs.
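
Written out by hand, this segmented execution amounts to something 
like the following sketch, where a plain array stands in for the 
conc-tree:

   import java.util.Arrays;

   // Hand-written equivalent of the segmented pipeline above; a plain
   // array stands in for the conc-tree between segments.
   class SegmentedSketch {
       static int evaluate(int[] data) {
           // segment 1: source + filter, gathered at the stateful barrier
           int[] buffer = Arrays.stream(data).filter(x -> x > 0).toArray();
           Arrays.parallelSort(buffer);   // the stateful op (sorted)
           // segment 2: the buffer becomes the source for map + reduce
           return Arrays.stream(buffer).map(x -> x * 2).sum();
       }
   }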

Each intermediate op also has a mask describing its flags.  For each of 
the flags X described above, two bits are used to represent "injects X", 
"preserves X", or "clears X".  For example, sorted() preserves size and 
injects sortedness and ordering.  Filtering preserves ordering and 
distinctness but clears size.  Mapping preserves size but clears 
sortedness and distinctness.  The flags for a pipeline are computed with 
boolean fu to take the source flags and fold in the effect of each op as 
the pipeline is built.  There is also a SHORT_CIRCUIT flag which is 
valid only on ops (not the source), and which forces pull rather than 
push evaluation.  Examples of short-circuit operations include limit().
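
Here is a toy rendering of that two-bit encoding (the constant names 
and layout are made up for illustration; the actual implementation is 
more elaborate):

   // Toy rendering of the two-bit encoding; names and layout are made
   // up for illustration, and only four flags are modeled.
   class FlagFuSketch {
       static final int PRESERVE = 0b00, INJECT = 0b01, CLEAR = 0b10;
       // each flag owns two bits at position 2*flag
       static final int SIZED = 0, ORDERED = 1, SORTED = 2, DISTINCT = 3;

       static int get(int flags, int flag) {
           return (flags >> (2 * flag)) & 0b11;
       }

       static int set(int flags, int flag, int value) {
           return (flags & ~(0b11 << (2 * flag))) | (value << (2 * flag));
       }

       // fold one op's mask into the pipeline flags: INJECT and CLEAR
       // override, PRESERVE keeps whatever the upstream established
       static int combine(int pipelineFlags, int opMask) {
           for (int flag = SIZED; flag <= DISTINCT; flag++) {
               int op = get(opMask, flag);
               if (op != PRESERVE)
                   pipelineFlags = set(pipelineFlags, flag, op);
           }
           return pipelineFlags;
       }

       public static void main(String[] args) {
           int source = set(set(0, SIZED, INJECT), ORDERED, INJECT);
           int filterMask = set(0, SIZED, CLEAR);     // filter clears SIZED
           int after = combine(source, filterMask);
           System.out.println(get(after, SIZED) == INJECT);    // false
           System.out.println(get(after, ORDERED) == INJECT);  // true
       }
   }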

Terminal operations cause evaluation of the pipeline; at the time a 
terminal operation is executed, the source is consumed (calling get() on 
the Supplier<Spliterator>), a chain of sinks is created, parallel 
decomposition using spliterators is done, etc.
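
In sketch form, the sequential case of terminal evaluation is just 
this (with the sink chain simplified to a plain Consumer):

   import java.util.Spliterator;
   import java.util.function.Consumer;
   import java.util.function.Supplier;

   // Sequential terminal evaluation in miniature: consume the source by
   // calling get(), then push every element through the sink chain.
   class TerminalSketch {
       static <T> void evaluateSequential(Supplier<Spliterator<T>> source,
                                          Consumer<T> sinkChain) {
           source.get().forEachRemaining(sinkChain);
       }
   }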

Flags are used to optimize both sink chain construction and terminal 
execution.  For example, if the upstream flags indicate sortedness, a 
sorted() operation is a no-op, reflected by the implementation of 
wrapSink(flags, sink) just returning the sink it was passed.  Similarly, 
for terminal ops, the ordering flag tells us when the result need not 
respect encounter order, which relaxes constraints on the output and 
enables more efficient computation.  If the source is known to be sized, 
and all the ops are size-preserving, operations like toArray() can 
exploit size information to minimize allocation and copying.
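
Sketching the sorted() no-op described above (the flag constant and 
sink type are illustrative, not the actual API):

   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;

   // If the upstream already reports sortedness, sorted()'s wrapSink
   // hands back the sink unchanged; otherwise it buffers and sorts.
   class WrapSinkSketch {
       static final int SORTED = 1 << 2;

       interface SinkWithEnd<T> extends Consumer<T> {
           default void end() {}
       }

       static <T extends Comparable<T>> SinkWithEnd<T> wrapSorted(
               int upstreamFlags, SinkWithEnd<T> downstream) {
           if ((upstreamFlags & SORTED) != 0)
               return downstream;              // already sorted: no-op
           List<T> buffer = new ArrayList<>();
           return new SinkWithEnd<T>() {
               public void accept(T t) { buffer.add(t); }
               public void end() {
                   buffer.sort(null);          // natural order
                   buffer.forEach(downstream);
                   downstream.end();
               }
           };
       }
   }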


The set of operations is defined in Stream for reference streams, and 
IntStream for int streams; each of these has a (private) implementation 
class ({Reference,Int}Pipeline), and these share a (private) base class 
AbstractPipeline.  We represent a stream pipeline as a linked list of 
XxxPipeline objects, where each holds an op and links to its parent. 
Because of the shared base class, pipelines can cross shapes and 
operations can still be jammed together into a single pass, such as in:

   people.stream().filter(..).map(Person::getHeight).max();
                  ^Stream<Person>
                             ^Stream<Person>
                                                    ^IntStream

and even though the "shape" of the data changes from reference to int, 
we can create a single sink chain where we push Person objects in and 
(unboxed) ints come out.
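
A sketch of such a shape-changing link (Person and getHeight are the 
example's names; the sink types are simplified to plain consumers):

   import java.util.function.Consumer;
   import java.util.function.IntConsumer;
   import java.util.function.ToIntFunction;

   class ShapeChangeSketch {
       static class Person {
           final int height;
           Person(int height) { this.height = height; }
           int getHeight() { return height; }
       }

       // a reference-shaped sink that feeds an int-shaped downstream;
       // Person objects are pushed in, primitive ints come out unboxed
       static <T> Consumer<T> mapToIntSink(ToIntFunction<? super T> f,
                                           IntConsumer down) {
           return t -> down.accept(f.applyAsInt(t));
       }

       public static void main(String[] args) {
           int[] max = { Integer.MIN_VALUE };   // max() as the terminal sink
           IntConsumer maxSink = h -> max[0] = Math.max(max[0], h);
           Consumer<Person> chain = mapToIntSink(Person::getHeight, maxSink);
           chain.accept(new Person(170));
           chain.accept(new Person(185));
           System.out.println(max[0]);          // 185
       }
   }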



