RFR(s): 8132800: clarify stream package documentation regarding sequential vs parallel modes

Paul Sandoz paul.sandoz at oracle.com
Tue Aug 4 07:20:12 UTC 2015


On 4 Aug 2015, at 01:09, Stuart Marks <stuart.marks at oracle.com> wrote:

> Hi Tagir,
> 
> Interesting issues.
> 
> Regarding Stream.concat, it may be that, today, changes to the sequential/parallel execution mode aren't propagated to the streams being concatenated.

The execution mode is propagated to the result if either stream passed to concat is parallel, i.e. isParallel() returns true.

The issue here is that Stream.spliterator() is a form of terminal operation that will result in pipeline evaluation if there are non-lazy stateful operations present.
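As a small illustration of the "terminal operation" part only, the sketch below checks that a stream cannot be used again once spliterator() has been called on it. The class and method names are made up for this example, and it does not try to show when stateful operations get evaluated.

```java
import java.util.Spliterator;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the JDK or of the proposed change.
final class SpliteratorIsTerminalDemo {

    // Returns true if the stream could still be used after spliterator() was called.
    static boolean reusableAfterSpliterator() {
        Stream<Integer> s = Stream.of(1, 2, 3);
        Spliterator<Integer> sp = s.spliterator(); // consumes the stream, like any terminal operation
        try {
            s.count(); // second terminal operation on the same stream
            return true;
        } catch (IllegalStateException e) {
            return false; // stream has already been operated upon
        }
    }

    public static void main(String[] args) {
        System.out.println("reusable after spliterator(): " + reusableAfterSpliterator());
    }
}
```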

What we don’t currently do is propagate the parallelism back to a sequential stream when the other is a parallel stream. We could easily do that as a bug fix. I agree with Stuart that it does not require any specification. For example:

public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
    Objects.requireNonNull(a);
    Objects.requireNonNull(b);

    boolean isPar = a.isParallel() || b.isParallel();
    @SuppressWarnings("unchecked")
    Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>(
            (Spliterator<T>) (isPar ? a.parallel() : a).spliterator(),
            (Spliterator<T>) (isPar ? b.parallel() : b).spliterator());
    Stream<T> stream = StreamSupport.stream(split, isPar);
    return stream.onClose(Streams.composedClose(a, b));
}
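The part that is observable through public API can be checked with a short sketch like the one below: the concatenated stream reports isParallel() as true when either input is parallel. The class and method names are made up for this example. It says nothing about whether the upstream stages of the sequential input actually run in parallel, which is the gap discussed above.

```java
import java.util.stream.Stream;

// Hypothetical demo class using only public java.util.stream API.
final class ConcatModeDemo {

    // Builds two small streams, optionally marks each as parallel,
    // and reports the execution mode of their concatenation.
    static boolean concatIsParallel(boolean aParallel, boolean bParallel) {
        Stream<Integer> a = Stream.of(1, 2);
        Stream<Integer> b = Stream.of(3, 4);
        if (aParallel) {
            a = a.parallel();
        }
        if (bParallel) {
            b = b.parallel();
        }
        try (Stream<Integer> c = Stream.concat(a, b)) {
            return c.isParallel();
        }
    }

    public static void main(String[] args) {
        System.out.println("parallel + sequential:   " + concatIsParallel(true, false));
        System.out.println("sequential + parallel:   " + concatIsParallel(false, true));
        System.out.println("sequential + sequential: " + concatIsParallel(false, false));
    }
}
```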


> But is that something inherent to the specification of concatenation, or is it something that might change in the future? It's currently unspecified, so adding a clarification really sounds more like changing the specification to reflect the current implementation, which I'd prefer not to do.
> 
> Regarding the default implementations of takeWhile/dropWhile, again, today, they don't propagate the execution mode upstream. But is this just a bug? Granted the API for doing so isn't obvious, but isn't this something that could just be fixed?
> 

The default implementations are specified to propagate the execution mode in terms of correctly reporting isParallel() but may choose not to split:

* the wrapped spliterator.  The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting.  When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.

So this is a little different from the Stream.concat case.

The only way a default implementation can be implemented is to derive functionality from the upstream spliterator(). Rather than choosing to add yet more code to support this (and most likely poorly too), I opted for not splitting and reused some existing code (the unordered spliterators that are configured not to split). Nor did I want to wrap the non-splitting spliterator around one which copies a prefix, which introduces a different set of poorly splitting characteristics (and would also penalise sequential operation). This is a tradeoff, and seems to me a reasonable compromise for a default.
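A minimal sketch of that kind of tradeoff, under stated assumptions: the helper below is hypothetical (it is not the JDK's default implementation and does not reuse the unordered spliterators mentioned above). It wraps the upstream spliterator, never splits, keeps the upstream's isParallel() setting, and chains the close handlers.

```java
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical illustration only; names and structure are assumptions.
final class TakeWhileSketch {

    static <T> Stream<T> takeWhileNoSplit(Stream<T> upstream, Predicate<? super T> predicate) {
        // Read the execution mode before consuming the upstream via spliterator().
        boolean parallel = upstream.isParallel();
        Spliterator<T> source = upstream.spliterator();

        Spliterator<T> wrapped = new Spliterator<T>() {
            private boolean taking = true;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!taking) {
                    return false;
                }
                boolean[] accepted = {false};
                boolean advanced = source.tryAdvance(t -> {
                    if (predicate.test(t)) {
                        accepted[0] = true;
                        action.accept(t);
                    }
                });
                // Stop for good at the first rejected element or at end of input.
                taking = advanced && accepted[0];
                return taking;
            }

            @Override
            public Spliterator<T> trySplit() {
                return null; // deliberately never splits
            }

            @Override
            public long estimateSize() {
                return taking ? source.estimateSize() : 0; // upper bound only
            }

            @Override
            public int characteristics() {
                // Drop SIZED/SUBSIZED/SORTED etc.; keep only what still holds for a prefix.
                return source.characteristics() & (Spliterator.ORDERED | Spliterator.NONNULL);
            }
        };

        // Same execution mode as upstream; closing the result also closes upstream.
        return StreamSupport.stream(wrapped, parallel).onClose(upstream::close);
    }

    public static void main(String[] args) {
        Stream<Integer> s = takeWhileNoSplit(Stream.of(1, 2, 3, 10, 4).parallel(), x -> x < 5);
        System.out.println("isParallel: " + s.isParallel());
        s.forEachOrdered(System.out::println);
    }
}
```

With this shape a parallel caller still gets a stream that reports isParallel() as true, but the work after the wrapper is not divided, since trySplit() always returns null.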

Paul.




More information about the core-libs-dev mailing list