iterator()/spliterator() and stateful operations

Brian Goetz brian.goetz at oracle.com
Mon May 20 09:34:36 PDT 2013


"Well-behaved" pipelines such as

   IntStream.range(1,1000).filter(i -> i % 2 == 0)
                          .map(i -> i*2)

are fully lazy.  We do not start generating, filtering, or mapping 
elements until the terminal operation begins, and thereafter, we only 
generate, filter, and map as many elements as are needed to satisfy the 
elements demanded by whatever is consuming the pipeline.  These 
pipelines are well-behaved in both serial and parallel environments, 
largely because the well-behaved (stateless) operations satisfy the 
homomorphism

   f(a||b) = f(a) || f(b)

Pipelines with stateful operations, such as sorted(), distinct(), or 
limit(), lack this nice property.  You cannot consume any elements from 
a sorted pipeline until all the elements upstream of the sorting 
operation have been seen.  The same is true to varying lesser degrees 
with distinct() and limit().  (For example, if the pipeline is 
*unordered*, distinct() can proceed lazily, backed by a 
ConcurrentHashSet, but not if it is ordered.)

Recall that the primary purpose of these methods is that they are 
"escape hatches" so that users can perform lazy traversals that are not 
directly supported by the library.  This escape hatch mechanism is also 
key to being able to add default methods to Stream interfaces later who 
might not be directly implementable in terms of other stream methods. 
For example, we might implement in IntStream:

   default IntStream everyElementTwice() {
     IntStream asIntPipeline
         = new IntPipeline(() -> spliterator(),
                           getSourceAndOpFlags(), isParallel());
     return new IntStream.StatelessOp(asIntPipeline, ...);
   }

In other words, for IntStream implementations not already backed by the 
standard implementation (IntPipeline) which will co-evolve with the 
Stream interface, we get the spliterator() and other metadata and use 
that to create a new IntPipeline lazily backed by the existing 
non-IntPipeline stream, and then chain a new lazy operation off of the 
new pipeline.

With this as background, we need to make some decisions about the 
iterator() and spliterator() stream methods.

Here's how spliterator() currently works now (iterator() is based on 
spliterator()).

1.  If the stream is composed only of source, just return the source 
spliterator.

2.  If the stream is composed only of source and stateless operations, 
create a new spliterator that wraps the source spliterator and the 
operations.  For example, intRange(...).map(...) will return a 
spliterator that delegates splitting and traversal to the intRange 
spliterator, but applies the mapping function to elements as they pass 
through.  Spliterators generated in this manner are fully lazy.

3.  If the stream has stateful operations, slice the pipeline into 
"chunks" at stateful operation boundaries.  For example:

   intRange(...).filter(...)
                .sorted(...)  // end of chunk 1
                .map(...)
                .distinct()   // end of chunk 2
                .flatMap(...)
                .filter(...)  // end of chunk 3
                .spliterator()

For each chunk, we describe it with a spliterator as in (2), where the 
output spliterator of chunk k is the source of chunk k-1.  If (2) can 
produce a fully lazy spliterator (such as distinct() on an unordered 
stream, or limit() on a subsized stream), great; if not the spliterator 
will, at first traversal, evaluate the upstream pipeline and describe 
the result with a spliterator.  We attempt to preserve as much laziness 
as possible, but for cases like:

   stream.sorted().spliterator()

we're going to have to traversal the whole input when the caller finally 
gets around to asking for some elements.  If the source is infinite, 
that's going to be a problem (but sorting an infinite stream is always a 
problem.)

Here are some cases that currently loop infinitely:

    random.doubles().parallel().distinct().iterator().next();
    random.doubles().parallel().distinct().spliterator().tryAdvance(c);

The reason is that the spliterator wants to evaluate 
doubles().distinct() before it knows how to split it (splitting cannot 
block.)

There are some things we can do here.

  - We can make Random.doubles() and similar generated streams be 
UNORDERED.  The parallel implementations for distinct() and limit() have 
optimized paths for unordered streams that allow them to proceed lazily. 
  This will fix both of these two particular problems for a lot of 
sources, but we'd still have the same failure for:

    Stream.iterate(0, f).parallel().distinct().iterator().next();

since this stream has to be ordered.

  - We can force the stream on which iterator() is invoked to be 
sequential.  This seems fairly natural, as calling iterator() is a 
committment to sequential iteration of the results.  This fixes the 
iterator variant of this problem, but not the spliterator problem.

These first two seem pretty natural and are likely uncontroversial.  But 
do we want to go farther to address to address the spliterator variant like:

    Stream.iterate(0, f).parallel()
          .distinct().spliterator().tryAdvance(c);

A similar issue happens with limit() applied to large parallel ordered 
non-subsized streams with large limits (as you can see, we've done quite 
a bit of work already to narrow the damage.)  Because limit is tied 
intrinsically to encounter order, parallel implementations sometimes 
need to do some sort of speculative evaluation and buffering.  This can 
still be a win for high-Q problems like:

   bigNumbers.parallel().filter(n -> isPrime(n)).limit(1000)

because the expensive operation -- filtering -- can still be 
parallelized.  But for low-Q problems with large bounds, the buffering 
required can cause parallel pipelines with limit in them to blow up with 
OOME.

An analogue of above, but with poor space bounds instead of time bounds:

    Stream.iterate(0, f).parallel()
          .filter(...).limit(10M).spliterator().tryAdvance(c);

This will likely OOME.

At some point, the user asked for this, as in:

   infinite.parallel().sorted()...


So, questions:
  - Should we go ahead with the proposed changes above for generators 
(make them unsized) and iterator (make it sequential)?
  - Should we go further, trimming the behavior of spliterator() to 
reduce time/space consumption, or are we OK here given its likely 
narrower usage profile?


More information about the lambda-libs-spec-experts mailing list