iterator()/spliterator() and stateful operations
Brian Goetz
brian.goetz at oracle.com
Mon May 20 09:34:36 PDT 2013
"Well-behaved" pipelines such as
    IntStream.range(1,1000).filter(i -> i % 2 == 0)
             .map(i -> i*2)
are fully lazy. We do not start generating, filtering, or mapping
elements until the terminal operation begins, and thereafter, we only
generate, filter, and map as many elements as are needed to satisfy the
demand of whatever is consuming the pipeline. These
pipelines are well-behaved in both serial and parallel environments,
largely because the well-behaved (stateless) operations satisfy the
homomorphism
f(a||b) = f(a) || f(b)
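As a rough illustration of that laziness (my example, not from the original
text; peek() is inserted purely to observe traffic):

    // Only as many elements as findFirst() needs flow through the pipeline;
    // peek() makes the demand-driven evaluation visible.
    IntStream.range(1, 1000)
             .peek(i -> System.out.println("generated " + i))
             .filter(i -> i % 2 == 0)
             .map(i -> i * 2)
             .findFirst();   // prints "generated 1" and "generated 2", then stops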
Pipelines with stateful operations, such as sorted(), distinct(), or
limit(), lack this nice property. You cannot consume any elements from
a sorted pipeline until all the elements upstream of the sorting
operation have been seen. The same is true, to varying lesser degrees, of
distinct() and limit(). (For example, if the pipeline is *unordered*,
distinct() can proceed lazily, backed by a concurrent hash set, but not if
it is ordered.)
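By contrast, a small sketch (again my own example) of the barrier that
sorted() imposes:

    // sorted() must see every upstream element before it can emit its first
    // result, so all five "saw" lines print even though findFirst() only
    // wants a single element.
    IntStream.of(5, 3, 4, 1, 2)
             .peek(i -> System.out.println("saw " + i))
             .sorted()
             .findFirst();   // prints all five "saw ..." lines, then yields 1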
Recall that the primary purpose of these methods is to serve as "escape
hatches", so that users can perform lazy traversals that are not directly
supported by the library. This escape-hatch mechanism is also key to being
able to later add default methods to the Stream interfaces that might not
be directly implementable in terms of other stream methods.
For example, we might implement in IntStream:
    default IntStream everyElementTwice() {
        IntStream asIntPipeline
            = new IntPipeline(() -> spliterator(),
                              getSourceAndOpFlags(), isParallel());
        return new IntStream.StatelessOp(asIntPipeline, ...);
    }
In other words, for IntStream implementations not already backed by the
standard implementation (IntPipeline) which will co-evolve with the
Stream interface, we get the spliterator() and other metadata and use
that to create a new IntPipeline lazily backed by the existing
non-IntPipeline stream, and then chain a new lazy operation off of the
new pipeline.
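For comparison, here is a rough public-API sketch of the same idea (mine,
not the mechanism described above; it uses StreamSupport rather than the
internal IntPipeline, and the ORDERED characteristic and the "emit each
element twice" behavior are just assumptions for illustration):

    default IntStream everyElementTwice() {
        // Lazily wrap this stream: the supplier defers the spliterator() call
        // until the new pipeline's terminal operation actually runs.
        IntStream wrapped = StreamSupport.intStream(() -> spliterator(),
                                                    Spliterator.ORDERED,  // assumed characteristics
                                                    isParallel());
        // Chain a stateless operation onto the wrapped pipeline.
        return wrapped.flatMap(i -> IntStream.of(i, i));
    }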
With this as background, we need to make some decisions about the
iterator() and spliterator() stream methods.
Here's how spliterator() currently works (iterator() is based on
spliterator()).
1. If the stream is composed only of source, just return the source
spliterator.
2. If the stream is composed only of source and stateless operations,
create a new spliterator that wraps the source spliterator and the
operations. For example, intRange(...).map(...) will return a
spliterator that delegates splitting and traversal to the intRange
spliterator, but applies the mapping function to elements as they pass
through. Spliterators generated in this manner are fully lazy. (A rough
sketch of such a wrapping spliterator appears below, after (3).)
3. If the stream has stateful operations, slice the pipeline into
"chunks" at stateful operation boundaries. For example:
    intRange(...).filter(...)
                 .sorted(...)    // end of chunk 1
                 .map(...)
                 .distinct()     // end of chunk 2
                 .flatMap(...)
                 .filter(...)    // end of chunk 3
                 .spliterator()
For each chunk, we describe it with a spliterator as in (2), where the
output spliterator of chunk k becomes the source of chunk k+1. If (2) can
produce a fully lazy spliterator (such as distinct() on an unordered
stream, or limit() on a subsized stream), great; if not, the spliterator
will, at first traversal, evaluate the upstream pipeline and describe the
result with a spliterator. We attempt to preserve as much laziness as
possible, but for cases like:
stream.sorted().spliterator()
we're going to have to traverse the whole input when the caller finally
gets around to asking for some elements. If the source is infinite,
that's going to be a problem (but sorting an infinite stream is always a
problem.)
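To make (2) and the eager fallback in (3) concrete, here is a rough sketch
of each (mine, not the actual implementation; the names "mapping" and
"upstream" are made up, and the usual java.util and java.util.function
imports are assumed). First, a wrapping spliterator in the spirit of (2),
which delegates splitting and traversal to the source and applies a mapping
function as elements pass through:

    // A static helper sketching the lazy wrapper described in (2).
    static Spliterator.OfInt mapping(Spliterator.OfInt source, IntUnaryOperator f) {
        return new Spliterator.OfInt() {
            public boolean tryAdvance(IntConsumer action) {
                // Pull one element from the source, push the mapped value downstream.
                return source.tryAdvance((int i) -> action.accept(f.applyAsInt(i)));
            }
            public Spliterator.OfInt trySplit() {
                // Splitting is delegated to the source; wrap the prefix the same way.
                Spliterator.OfInt prefix = source.trySplit();
                return prefix == null ? null : mapping(prefix, f);
            }
            public long estimateSize() { return source.estimateSize(); }
            public int characteristics() {
                // Mapping preserves size and order, but not SORTED or DISTINCT.
                return source.characteristics()
                       & ~(Spliterator.SORTED | Spliterator.DISTINCT);
            }
        };
    }

And for a full-barrier chunk as in (3), the fallback amounts to something
like the following (upstream stands for the IntStream describing the
preceding chunk), which the returned spliterator defers until its first
traversal:

    // Evaluate the upstream chunk eagerly and describe the result with an
    // array-backed spliterator; an infinite upstream never gets past toArray().
    int[] sortedChunk = upstream.sorted().toArray();   // full traversal happens here
    Spliterator.OfInt chunkSpliterator = Arrays.spliterator(sortedChunk);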
Here are some cases that currently loop infinitely:
random.doubles().parallel().distinct().iterator().next();
random.doubles().parallel().distinct().spliterator().tryAdvance(c);
The reason is that the spliterator wants to evaluate
doubles().distinct() before it knows how to split it (splitting cannot
block.)
There are some things we can do here.
- We can make Random.doubles() and similar generated streams be
UNORDERED. The parallel implementations for distinct() and limit() have
optimized paths for unordered streams that allow them to proceed lazily.
This will fix both of these particular problems for a lot of sources, but
we'd still have the same failure for:
Stream.iterate(0, f).parallel().distinct().iterator().next();
since this stream has to be ordered.
- We can force the stream on which iterator() is invoked to be
sequential. This seems fairly natural, as calling iterator() is a
commitment to sequential iteration of the results. This fixes the
iterator variant of this problem, but not the spliterator problem.
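A hypothetical sketch of that second change (my own; iterator() is already
declared on the stream interfaces, so this is only illustrative of what the
implementation could do for a Stream<T>):

    // Drop parallelism before building the iterator: calling iterator() is a
    // commitment to sequential consumption anyway, and a sequential pipeline
    // never needs to pre-evaluate upstream stages just to split them.
    public Iterator<T> iterator() {
        return Spliterators.iterator(sequential().spliterator());
    }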
These first two seem pretty natural and are likely uncontroversial. But do
we want to go further and address the spliterator variant, like:
    Stream.iterate(0, f).parallel()
          .distinct().spliterator().tryAdvance(c);
A similar issue happens with limit() applied to large parallel ordered
non-subsized streams with large limits (as you can see, we've done quite
a bit of work already to narrow the damage.) Because limit is tied
intrinsically to encounter order, parallel implementations sometimes
need to do some sort of speculative evaluation and buffering. This can
still be a win for high-Q (expensive per-element work) problems like:
bigNumbers.parallel().filter(n -> isPrime(n)).limit(1000)
because the expensive operation -- filtering -- can still be
parallelized. But for low-Q problems with large bounds, the buffering
required can cause parallel pipelines with limit in them to blow up with
OOME.
An analogue of the above, but with poor space bounds instead of poor time bounds:
    Stream.iterate(0, f).parallel()
          .filter(...).limit(10M).spliterator().tryAdvance(c);
This will likely OOME.
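As an aside (mine, not part of the proposal): a caller who does not
actually need encounter order can already sidestep much of that buffering
by saying so, since limit() on an unordered parallel stream can take the
optimized path mentioned earlier:

    // Dropping encounter order up front lets the parallel limit() use its
    // unordered path instead of buffering millions of ordered elements.
    Stream.iterate(0, i -> i + 1)
          .unordered().parallel()
          .filter(i -> i % 2 == 0)
          .limit(10_000_000)
          .spliterator()
          .tryAdvance(System.out::println);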
At some point, the user asked for this, as in:
infinite.parallel().sorted()...
So, questions:
- Should we go ahead with the proposed changes above for generators
(make them unordered) and iterator() (make it sequential)?
- Should we go further, trimming the behavior of spliterator() to
reduce time/space consumption, or are we OK here given its likely
narrower usage profile?