iterator()/spliterator() and stateful operations
Howard Lovatt
howard.lovatt at gmail.com
Mon May 20 15:54:57 PDT 2013
If you make sorted, limit, etc maps, e.g sorted is map(Mappers::sort), and a mapper carries with it methods:
minimumSplitSize();
maximumSize();
Then you can do the ideal splitting, including different splits for different parts of the pipeline.
-- Howard.
Sent from my iPad
On 21/05/2013, at 2:34 AM, Brian Goetz <brian.goetz at oracle.com> wrote:
> "Well-behaved" pipelines such as
>
> IntStream.range(1,1000).filter(i -> i % 2 == 0)
> .map(i -> i*2)
>
> are fully lazy. We do not start generating, filtering, or mapping elements until the terminal operation begins, and thereafter, we only generate, filter, and map as many elements as are needed to satisfy the elements demanded by whatever is consuming the pipeline. These pipelines are well-behaved in both serial and parallel environments, largely because the well-behaved (stateless) operations satisfy the homomorphism
>
> f(a||b) = f(a) || f(b)
>
> Pipelines with stateful operations, such as sorted(), distinct(), or limit(), lack this nice property. You cannot consume any elements from a sorted pipeline until all the elements upstream of the sorting operation have been seen. The same is true to varying lesser degrees with distinct() and limit(). (For example, if the pipeline is *unordered*, distinct() can proceed lazily, backed by a ConcurrentHashSet, but not if it is ordered.)
>
> Recall that the primary purpose of these methods is that they are "escape hatches" so that users can perform lazy traversals that are not directly supported by the library. This escape hatch mechanism is also key to being able to add default methods to Stream interfaces later who might not be directly implementable in terms of other stream methods. For example, we might implement in IntStream:
>
> default IntStream everyElementTwice() {
> IntStream asIntPipeline
> = new IntPipeline(() -> spliterator(),
> getSourceAndOpFlags(), isParallel());
> return new IntStream.StatelessOp(asIntPipeline, ...);
> }
>
> In other words, for IntStream implementations not already backed by the standard implementation (IntPipeline) which will co-evolve with the Stream interface, we get the spliterator() and other metadata and use that to create a new IntPipeline lazily backed by the existing non-IntPipeline stream, and then chain a new lazy operation off of the new pipeline.
>
> With this as background, we need to make some decisions about the iterator() and spliterator() stream methods.
>
> Here's how spliterator() currently works now (iterator() is based on spliterator()).
>
> 1. If the stream is composed only of source, just return the source spliterator.
>
> 2. If the stream is composed only of source and stateless operations, create a new spliterator that wraps the source spliterator and the operations. For example, intRange(...).map(...) will return a spliterator that delegates splitting and traversal to the intRange spliterator, but applies the mapping function to elements as they pass through. Spliterators generated in this manner are fully lazy.
>
> 3. If the stream has stateful operations, slice the pipeline into "chunks" at stateful operation boundaries. For example:
>
> intRange(...).filter(...)
> .sorted(...) // end of chunk 1
> .map(...)
> .distinct() // end of chunk 2
> .flatMap(...)
> .filter(...) // end of chunk 3
> .spliterator()
>
> For each chunk, we describe it with a spliterator as in (2), where the output spliterator of chunk k is the source of chunk k-1. If (2) can produce a fully lazy spliterator (such as distinct() on an unordered stream, or limit() on a subsized stream), great; if not the spliterator will, at first traversal, evaluate the upstream pipeline and describe the result with a spliterator. We attempt to preserve as much laziness as possible, but for cases like:
>
> stream.sorted().spliterator()
>
> we're going to have to traversal the whole input when the caller finally gets around to asking for some elements. If the source is infinite, that's going to be a problem (but sorting an infinite stream is always a problem.)
>
> Here are some cases that currently loop infinitely:
>
> random.doubles().parallel().distinct().iterator().next();
> random.doubles().parallel().distinct().spliterator().tryAdvance(c);
>
> The reason is that the spliterator wants to evaluate doubles().distinct() before it knows how to split it (splitting cannot block.)
>
> There are some things we can do here.
>
> - We can make Random.doubles() and similar generated streams be UNORDERED. The parallel implementations for distinct() and limit() have optimized paths for unordered streams that allow them to proceed lazily. This will fix both of these two particular problems for a lot of sources, but we'd still have the same failure for:
>
> Stream.iterate(0, f).parallel().distinct().iterator().next();
>
> since this stream has to be ordered.
>
> - We can force the stream on which iterator() is invoked to be sequential. This seems fairly natural, as calling iterator() is a committment to sequential iteration of the results. This fixes the iterator variant of this problem, but not the spliterator problem.
>
> These first two seem pretty natural and are likely uncontroversial. But do we want to go farther to address to address the spliterator variant like:
>
> Stream.iterate(0, f).parallel()
> .distinct().spliterator().tryAdvance(c);
>
> A similar issue happens with limit() applied to large parallel ordered non-subsized streams with large limits (as you can see, we've done quite a bit of work already to narrow the damage.) Because limit is tied intrinsically to encounter order, parallel implementations sometimes need to do some sort of speculative evaluation and buffering. This can still be a win for high-Q problems like:
>
> bigNumbers.parallel().filter(n -> isPrime(n)).limit(1000)
>
> because the expensive operation -- filtering -- can still be parallelized. But for low-Q problems with large bounds, the buffering required can cause parallel pipelines with limit in them to blow up with OOME.
>
> An analogue of above, but with poor space bounds instead of time bounds:
>
> Stream.iterate(0, f).parallel()
> .filter(...).limit(10M).spliterator().tryAdvance(c);
>
> This will likely OOME.
>
> At some point, the user asked for this, as in:
>
> infinite.parallel().sorted()...
>
>
> So, questions:
> - Should we go ahead with the proposed changes above for generators (make them unsized) and iterator (make it sequential)?
> - Should we go further, trimming the behavior of spliterator() to reduce time/space consumption, or are we OK here given its likely narrower usage profile?
More information about the lambda-libs-spec-observers
mailing list