Background: pipeline architecture
Brian Goetz
brian.goetz at oracle.com
Mon Dec 31 13:11:36 PST 2012
Here's an attempt at putting most of the background on the architecture
of pipelines in one place.
A complete stream pipeline has several components:
- A source
- Source flags
- Zero or more intermediate operations
- One terminal operation
The source is a Supplier<Spliterator>. The reason for this indirection
is so we can narrow the window in which we require non-interference
from (stream creation, end of terminal operation) down to (start of
terminal operation, end of terminal operation).
So, for example, this case:

    list = new ...
    Stream<T> s = list.stream();
    // mutate list
    s.forEach(...)
will see the list state as of the start of the forEach, not as it was
at the list.stream() capture. This is easy to implement; the stream()
method in ArrayList is:

    return Streams.stream(
        () -> Arrays.spliterator((E[]) elementData, 0, size),
        flags);
By deferring evaluation of elementData and size, we can late-bind to the
data.
The source flags are a set of dynamic properties of the source. Defined
flags include:
- SIZED -- the size is known
- ORDERED -- the source has a defined encounter order
- SORTED -- the source is sorted by natural order
- DISTINCT -- the source has distinct elements according to .equals
We also encode serial-vs-parallel as a source flag, though this is
mostly an implementation expediency.
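Concretely, you can picture these flags as bits in an int mask. The
sketch below is purely illustrative; the names and encoding are
hypothetical, not the actual implementation:

    class SourceFlags {
        static final int SIZED    = 1 << 0;  // size of the source is known
        static final int ORDERED  = 1 << 1;  // defined encounter order
        static final int SORTED   = 1 << 2;  // sorted by natural order
        static final int DISTINCT = 1 << 3;  // distinct per equals()
        static final int PARALLEL = 1 << 4;  // serial-vs-parallel, as noted

        static boolean isSized(int flags) { return (flags & SIZED) != 0; }
    }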
Intermediate operations produce a new stream, lazily; invoking an
intermediate operation on a stream just sets up the new stream, but does
not cause any computation nor does it cause the source to be consumed.
The fundamental operation implemented by an intermediate operation is to
wrap a "Sink" with a new Sink. So in a pipeline like
    list.filter(...).map(...).reduce(...)
the terminal reduction operation will create a sink which reduces the
values fed to it, and the map operation will wrap that with a mapping
sink that transforms values as they pass through, and the filter
operation will wrap it with a filtering sink that only passes some
elements through.
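For illustration, here is a stripped-down sketch of that wrapping. The
names are hypothetical (the real Sink also has begin/end and size
hints), and the java.util.function types are used for concreteness:

    import java.util.function.Function;
    import java.util.function.Predicate;

    class SinkSketch {
        interface Sink<T> { void accept(T t); }

        // filter: forward only elements that pass the predicate
        static <T> Sink<T> filterSink(Predicate<T> p, Sink<T> downstream) {
            return t -> { if (p.test(t)) downstream.accept(t); };
        }

        // map: transform each element as it passes downstream
        static <T, R> Sink<T> mapSink(Function<T, R> f, Sink<R> downstream) {
            return t -> downstream.accept(f.apply(t));
        }
    }

Building the chain for the pipeline above then amounts to
filterSink(p, mapSink(f, reducingSink)).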
Intermediate operations are divided into two kinds, stateful and
stateless. The stateless operations are the well-behaved ones, which
depend only on the current input element -- filter, map, mapMulti. The
stateful ones include sorted, removeDuplicates, and limit.
In a sequential pipeline, all intermediate ops can be jammed together
for a single pass on the data. In a parallel pipeline, the same can be
done only if all intermediate ops are stateless. Otherwise, we slice up
the pipeline into segments ending in stateful ops, and execute them in
segments:
    list.parallel().filter(...).sorted().map(...).reduce(...)
    ^----------------------------------^ ^------------------^
              segment 1                        segment 2
where the output of segment 1 is gathered into a conc-tree and then used
as the source for segment 2. This segmentation is why Doug hates these
operations; it complicates the parallel execution, obfuscates the cost
model, and maps much more poorly to targets like GPUs.
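Illustratively (and using the eventual JDK method names, not what the
implementation actually does internally), the segmented execution
behaves roughly as if segment 1 were gathered into a buffer that then
becomes the source for segment 2:

    import java.util.List;
    import java.util.Optional;
    import java.util.stream.Collectors;

    class SegmentSketch {
        static Optional<Integer> run(List<String> list) {
            List<String> buffer = list.parallelStream()      // segment 1
                    .filter(s -> !s.isEmpty())
                    .sorted()
                    .collect(Collectors.toList());           // gather output
            return buffer.parallelStream()                   // segment 2
                    .map(String::length)
                    .reduce(Integer::sum);
        }
    }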
Each intermediate op also has a mask describing its flags. For each of
the flags X described above, two bits are used to represent "injects X",
"preserves X", or "clears X". For example, sorted() preserves size and
injects sortedness and ordering. Filtering preserves ordering and
distinctness but clears size. Mapping preserves size but clears
sortedness and distinctness. The flags for a pipeline are computed with
boolean fu to take the source flags and fold in the effect of each op as
the pipeline is built. There is also a SHORT_CIRCUIT flag which is only
valid on ops (not source), and forces pull rather than push evaluation.
Examples of short-circuit operations include limit().
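A sketch of that boolean fu, with a hypothetical encoding: each op's
mask says which flags it injects and which it clears, and anything in
neither set is preserved as the pipeline is built:

    class FlagFold {
        static final int SIZED    = 1 << 0;
        static final int ORDERED  = 1 << 1;
        static final int SORTED   = 1 << 2;
        static final int DISTINCT = 1 << 3;

        static int fold(int upstream, int injects, int clears) {
            return (upstream | injects) & ~clears;
        }

        public static void main(String[] args) {
            int source = SIZED | ORDERED;             // e.g. a List source
            // sorted(): preserves size, injects sortedness and ordering
            int s1 = fold(source, SORTED | ORDERED, 0);
            // filter(): preserves ordering and distinctness, clears size
            int s2 = fold(s1, 0, SIZED);
            System.out.println((s2 & SORTED) != 0);   // true
            System.out.println((s2 & SIZED) != 0);    // false
        }
    }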
Terminal operations cause evaluation of the pipeline; at the time a
terminal operation is executed, the source is consumed (calling get() on
the Supplier<Spliterator>), a chain of sinks is created, parallel
decomposition using spliterators is done, etc.
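In the sequential case, that evaluation is conceptually just this (a
rough sketch with hypothetical names; the parallel case instead
decomposes the spliterator across fork-join tasks):

    import java.util.Spliterator;
    import java.util.function.Supplier;

    class TerminalSketch {
        interface Sink<T> { void accept(T t); }

        static <T> void evaluateSequential(Supplier<Spliterator<T>> source,
                                           Sink<T> sinkChain) {
            Spliterator<T> sp = source.get();        // source consumed here
            sp.forEachRemaining(sinkChain::accept);  // push elements downstream
        }
    }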
Flags are used to optimize both sink chain construction and terminal
execution. For example, if the upstream flags indicate sortedness, a
sorted() operation is a no-op, reflected by the implementation of
wrapSink(flags, sink) just returning the sink it was passed. Similarly,
for terminal ops, knowing the source has no defined encounter order
lets the implementation relax ordering constraints on the output,
enabling more efficient computation. If the source is known to be sized,
and all the ops are size-preserving, operations like toArray() can
exploit size information to minimize allocation and copying.
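The sorted() no-op looks roughly like this (illustrative names, not the
actual pipeline classes): wrapSink just returns the downstream sink
when the upstream flags already report sortedness, and otherwise
buffers, sorts at end(), and pushes downstream:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class SortedOpSketch {
        interface Sink<T> { void accept(T t); void end(); }

        static final int SORTED = 1 << 2;

        static <T extends Comparable<? super T>>
        Sink<T> wrapSink(int upstreamFlags, Sink<T> downstream) {
            if ((upstreamFlags & SORTED) != 0)
                return downstream;                   // already sorted: no-op
            List<T> buffer = new ArrayList<>();
            return new Sink<T>() {
                public void accept(T t) { buffer.add(t); }
                public void end() {
                    Collections.sort(buffer);        // sort once input is done
                    for (T t : buffer) downstream.accept(t);
                    downstream.end();
                }
            };
        }
    }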
The set of operations is defined in Stream for reference streams, and
IntStream for int streams; each of these has a (private) implementation
class ({Reference,Int}Pipeline) which share a (private) base class
AbstractPipeline. We represent a stream pipeline as a linked list of
XxxPipeline objects, where each holds an op and links to its parent.
Because of the shared base class, pipelines can cross shapes and
operations can still be jammed together into a single pass, such as in:
    people.stream().filter(..).map(Person::getHeight).max();
           ^Stream<Person>
                    ^Stream<Person>
                               ^IntStream
and even though the "shape" of the data changes from reference to int we
can create a single sink chain where we push Person objects in and
(unboxed) ints come out.
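A sketch of such a shape-crossing link in the sink chain, with
hypothetical interfaces: a reference-shaped sink feeds an
int-specialized downstream sink, so Persons go in and unboxed ints
come out:

    import java.util.function.ToIntFunction;

    class ShapeSketch {
        interface Sink<T> { void accept(T t); }
        interface IntSink { void accept(int i); }

        static <T> Sink<T> mapToIntSink(ToIntFunction<T> f, IntSink downstream) {
            return t -> downstream.accept(f.applyAsInt(t)); // no boxing here
        }
    }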