Constructing parallel streams

Brian Goetz brian.goetz at oracle.com
Sat Dec 8 08:58:44 PST 2012


Following up on the previous note, now that the stream-building APIs 
have settled into something fairly simple and regular, I'm not 
completely happy with the arrangement of the stream() / parallel() buns.

For collections, stream() and parallel() seem fine; the user already has 
a collection in hand, and can ask for a sequential or parallel stream. 
(Separately: I'm starting to prefer stream() / parallelStream() as the 
bun names here.)

But, there are other ways to get a stream:

   String.chars()
   Reader.lines()
   regex.matches(source)
   etc

It seems pretty natural for these things to return Streams.  In 
accordance with our "no implicit parallelism" dictum, these streams are 
serial.  Yet many of them can be operated on in parallel -- so the 
question is, how would we get a parallel stream out of these?

One obvious choice is to have two operations for each of these:

   String.chars()
   String.charsAsParallelStream()

That's pretty ugly, and unlikely to be consistently implemented.


Now that the Streams construction API and internals have shaken out, 
another option has emerged.  A Spliterator can be traversed sequentially 
or in parallel.  Many sequential streams are constructed out of 
spliterators that already know how to split (e.g., Arrays.spliterator), 
and we know how to expose some parallelism from otherwise sequential 
data sources anyway (see the implementation of Iterators.spliterator). 
Just because iteration is sequential does not mean there is no 
exploitable parallelism.
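
To make that concrete, here is a minimal sketch.  It assumes the API 
names that eventually shipped in Java 8 (java.util.Spliterators and 
java.util.stream.StreamSupport), not the prototype names used in this 
note (Iterators.spliterator, Streams.parallel).  The class and method 
names are hypothetical, chosen for illustration only.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;

class SpliteratorParallelismSketch {

    // Sum the elements of an Iterator-backed source, sequentially or in parallel.
    static long sum(Iterator<Integer> source, boolean parallel) {
        // Wrap the purely sequential iterator in a Spliterator of unknown size.
        Spliterator<Integer> spliterator =
                Spliterators.spliteratorUnknownSize(source, Spliterator.ORDERED);
        // The same Spliterator can back either a sequential or a parallel stream.
        return StreamSupport.stream(spliterator, parallel)
                .mapToLong(Integer::longValue)
                .sum();
    }

    // An array-backed Spliterator already knows how to split itself.
    static boolean arraySpliteratorSplits() {
        Spliterator<Integer> whole = Arrays.spliterator(new Integer[] {1, 2, 3, 4});
        Spliterator<Integer> prefix = whole.trySplit();
        // After the split, the two halves together still cover all four elements.
        return prefix != null
                && prefix.estimateSize() + whole.estimateSize() == 4;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(sum(data.iterator(), false));
        System.out.println(sum(data.iterator(), true));
        System.out.println(arraySpliteratorSplits());
    }
}
```

Both calls to sum() produce the same total; only the traversal 
strategy differs.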


So, here's what I propose.  Currently, we have a .sequential() 
operation, which is a no-op on sequential streams and on parallel 
streams acts as a barrier so that upstream computation can occur in 
parallel but downstream computation can occur serially, in encounter 
order (if defined), within-thread.  We've also got a spliterator() 
"escape hatch".

We can add to these a .parallel() operation, which on parallel streams 
is a no-op.  The implementation is very simple and efficient (if applied 
early on in the pipeline).

Here's the default implementation (which is probably good enough for all 
cases):

   Stream<T> parallel() {
     // Already parallel: nothing to do.
     if (isParallel())
       return this;
     else
       // Re-wrap this stream's spliterator in a new parallel stream.
       return Streams.parallel(spliterator(), getStreamFlags());
   }

What makes this efficient is that if you apply this operation at the 
very top of the pipeline, it just grabs the underlying spliterator, 
wraps it in a new stream with the parallel flag set, and keeps going. 
(If applied farther down the pipeline, spliterator() returns a 
spliterator wrapped with the intervening operations.)
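
A rough analogue of this default implementation can be sketched 
against the API that later shipped in Java 8.  The helper toParallel 
and the class name are hypothetical; StreamSupport.stream(spliterator, 
true) stands in for the prototype's Streams.parallel(...), and the 
stream-flags argument has no direct counterpart here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class ParallelViaSpliteratorSketch {

    // Hypothetical helper mirroring the shape of the proposed default method.
    static <T> Stream<T> toParallel(Stream<T> stream) {
        if (stream.isParallel()) {
            return stream; // no-op on an already-parallel stream
        }
        // Grab the spliterator and wrap it in a new stream with parallel = true.
        return StreamSupport.stream(stream.spliterator(), true);
    }

    // Applied after a map(): spliterator() returns a spliterator that
    // already incorporates the intervening operation.
    static List<Integer> doubledInParallel(List<Integer> values) {
        return toParallel(values.stream().map(x -> x * 2))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(toParallel(Stream.of(1, 2, 3)).isParallel());
        System.out.println(doubledInParallel(Arrays.asList(1, 2, 3)));
    }
}
```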


Bringing this back to our API, this enables us to have a .parallel() 
operation on Stream, so users can say:

   string.chars().parallel()...

if they want to operate on the characters in parallel.
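
As an illustration of the user's view, here is a small sketch.  It 
assumes the Java 8 API as eventually released, where chars() returns 
an IntStream rather than a Stream<T>; the class and method names are 
made up for the example.

```java
import java.util.stream.IntStream;

class CharsParallelSketch {

    // Count lowercase vowels, opting in to parallelism only when asked.
    static long countVowels(String text, boolean parallel) {
        IntStream chars = text.chars();   // sequential by default
        if (parallel) {
            chars = chars.parallel();     // explicit opt-in
        }
        return chars.filter(c -> "aeiou".indexOf(c) >= 0).count();
    }

    // The stream is sequential unless the caller asks otherwise.
    static boolean isParallelByDefault(String text) {
        return text.chars().isParallel();
    }

    static boolean isParallelAfterOptIn(String text) {
        return text.chars().parallel().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(countVowels("parallel streams", false));
        System.out.println(countVowels("parallel streams", true));
        System.out.println(isParallelByDefault("abc"));
        System.out.println(isParallelAfterOptIn("abc"));
    }
}
```

The result is the same either way; the parallel form only changes how 
the characters are traversed.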

The default implementation of parallel / parallelStream in Streamable 
could then be:

   default Stream<T> parallel() {
       return stream().parallel();
   }

But I think it is still worth keeping the parallel / parallelStream bun 
for collections since this is such an important use case (and is still 
slightly more efficient; a few fewer object creations.)
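
For comparison, the two spellings side by side, sketched against the 
Java 8 Collection API as it later shipped (stream() and 
parallelStream()).  The class and method names are hypothetical, and 
the sketch shows only that the two forms give the same answer; it 
does not measure the object-creation difference mentioned above.

```java
import java.util.Arrays;
import java.util.List;

class CollectionBunSketch {

    // Ask the collection directly for a parallel stream.
    static int sumViaParallelStream(List<Integer> values) {
        return values.parallelStream().mapToInt(Integer::intValue).sum();
    }

    // Ask for a sequential stream, then switch it to parallel.
    static int sumViaStreamThenParallel(List<Integer> values) {
        return values.stream().parallel().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4);
        System.out.println(sumViaParallelStream(values));
        System.out.println(sumViaStreamThenParallel(values));
    }
}
```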


