Stream construction APIs

Brian Goetz brian.goetz at oracle.com
Sat Dec 8 08:39:06 PST 2012


We've done some overhauling of the stream-building APIs and I really 
like where they landed.  (Close followers of the repo will note the 
churn, but it has stopped.)

For clients, the primary way to get a stream is to ask some aggregate 
for one, as in:

   list.stream()
   String.chars()
   String.codePoints()
   Reader.lines()

etc.
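For concreteness, here is roughly what those client-side calls look like 
against the API as it eventually shipped in Java 8, where lines() ended 
up on BufferedReader rather than Reader.  This is a small illustrative 
sketch; the class name ClientStreams and the sample inputs are made up.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ClientStreams {
    // Collection.stream(): the most common way a client gets a stream
    static List<String> upperCased(List<String> list) {
        return list.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    // String.chars() yields an IntStream of UTF-16 code units
    static long vowelCount(String s) {
        return s.chars().filter(c -> "aeiou".indexOf(c) >= 0).count();
    }

    // String.codePoints() treats a surrogate pair as a single element
    static long codePointCount(String s) {
        return s.codePoints().count();
    }

    // BufferedReader.lines() streams the text one line at a time
    static List<String> nonEmptyLines(String text) {
        BufferedReader reader = new BufferedReader(new StringReader(text));
        return reader.lines().filter(l -> !l.isEmpty()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(upperCased(Arrays.asList("a", "b"))); // [A, B]
        System.out.println(vowelCount("stream"));                // 2
        System.out.println(codePointCount("a\uD83D\uDE00"));     // 2
        System.out.println(nonEmptyLines("x\n\ny"));             // [x, y]
    }
}
```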

For libraries that want to *build* streams (say, because they want to 
implement Streamable), there is a lower-level API.  This lives primarily 
in Streams.

The primary way to construct a stream (which the above implementations 
use) is from a spliterator and a set of stream flags.  There are four 
entry points, the cross product of serial/parallel and 
immediate/deferred spliterator:

     static <T> Stream<T> stream(Supplier<Spliterator<T>> supplier,
                                 int flags);

     static <T> Stream<T> stream(Spliterator<T> spliterator, int flags) {
         return stream(() -> spliterator, flags);
     }

     static <T> Stream<T> parallel(Supplier<Spliterator<T>> supplier,
                                   int flags);

     static <T> Stream<T> parallel(Spliterator<T> spliterator, int flags) {
         return parallel(() -> spliterator, flags);
     }
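The Streams methods above are the API as it stands in the repo today.  
As a rough analogue only, the sketch below builds the same 
serial/parallel by immediate/deferred cross product with 
java.util.stream.StreamSupport as it later shipped in Java 8.  That 
class differs from what is described here: it takes a boolean parallel 
argument instead of separate stream()/parallel() methods, and its 
deferred form takes Spliterator characteristics (which must match what 
the supplied spliterator reports) instead of stream flags.  The class 
EntryPoints is hypothetical.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class EntryPoints {
    static final String[] DATA = {"a", "b", "c"};

    // Characteristics reported by Arrays.spliterator(T[]); the deferred
    // form requires them to equal supplier.get().characteristics().
    static final int ARRAY_CHARACTERISTICS =
        Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.SIZED | Spliterator.SUBSIZED;

    // Immediate: the spliterator is created now, when the stream is built
    static Stream<String> immediate(boolean parallel) {
        return StreamSupport.stream(Arrays.spliterator(DATA), parallel);
    }

    // Deferred: the spliterator is created when a terminal operation starts
    static Stream<String> deferred(boolean parallel) {
        Supplier<Spliterator<String>> supplier = () -> Arrays.spliterator(DATA);
        return StreamSupport.stream(supplier, ARRAY_CHARACTERISTICS, parallel);
    }

    public static void main(String[] args) {
        for (boolean parallel : new boolean[] {false, true}) {
            Stream<String> a = immediate(parallel);
            Stream<String> b = deferred(parallel);
            // ORDERED source, so toList() keeps encounter order even in parallel
            System.out.println(a.isParallel() + " " + a.collect(Collectors.toList()));
            System.out.println(b.isParallel() + " " + b.collect(Collectors.toList()));
        }
    }
}
```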


The distinction between serial and parallel simply determines whether 
stream traversals will be single-threaded or parallel.

The immediate vs deferred choice (Spliterator vs Supplier<Spliterator>) 
requires a little more explanation.  The pervasive non-interference 
assumption says that stream sources should not be modified while stream 
operations are in progress.  (More precisely, stream implementations are 
not required to respect such modifications; streams from concurrent data 
structures are free to provide concurrent-modification-friendly 
implementations.)
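To make the non-interference point concrete, the sketch below modifies a 
source from inside its own traversal.  With java.util.ArrayList, whose 
fail-fast checking is documented as best-effort, current OpenJDK builds 
end the traversal with a ConcurrentModificationException; 
CopyOnWriteArrayList streams over a snapshot and tolerates the 
modification.  The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class Interference {
    // Appends every element back onto its own source while streaming it.
    // Returns true if the traversal finished without a
    // ConcurrentModificationException.
    static boolean selfAppend(List<Integer> source) {
        try {
            source.stream().forEach(source::add);
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<Integer> plain = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(selfAppend(plain)); // false on OpenJDK (fail-fast is best-effort)

        List<Integer> cow = new CopyOnWriteArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(selfAppend(cow) + " " + cow.size()); // true 6
    }
}
```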

Given this, a question remains: at what point in time do the pipeline 
operations start?  There are two logical candidates:
  - at the point at which the stream source is captured
  - at the point at which a terminal operation is initiated

For cases like c.stream().forEach(), it doesn't matter; these points 
collapse together.  But for cases like:

   Collection<String> c = ...
   Stream<String> asFilteredStream = c.stream().filter(...);
   // populate c
   asFilteredStream.forEach(...);

it makes a difference.  Having the constructors take a 
Supplier<Spliterator> lets us defer binding to the data until the 
result is needed.  While we've mostly come down on the side of more 
restrictive stream-building (i.e., no forking), this case seems 
different: we cannot detect the mistake early, and it looks like a 
common mistake waiting to happen.  It also turns out to be relatively 
cheap to support.
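Here is a sketch of why deferred binding matters for the populate-later 
example above, again using Java 8's StreamSupport as a stand-in for the 
Streams methods.  Both streams snapshot c into an array; the only 
difference is whether the snapshot is taken when the stream is built or 
when the terminal operation starts.  The helpers eager and deferred are 
hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LateBinding {
    // Characteristics that Arrays.spliterator(T[]) reports
    static final int ARRAY_CHARACTERISTICS =
        Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.SIZED | Spliterator.SUBSIZED;

    // Binds to a snapshot of c taken right now, at construction
    static Stream<String> eager(List<String> c) {
        return StreamSupport.stream(Arrays.spliterator(c.toArray(new String[0])), false);
    }

    // Takes the snapshot only when the terminal operation begins
    static Stream<String> deferred(List<String> c) {
        Supplier<Spliterator<String>> supplier =
            () -> Arrays.spliterator(c.toArray(new String[0]));
        return StreamSupport.stream(supplier, ARRAY_CHARACTERISTICS, false);
    }

    public static void main(String[] args) {
        List<String> c = new ArrayList<>();
        Stream<String> early = eager(c);
        Stream<String> late = deferred(c);
        c.addAll(Arrays.asList("x", "yy", "zzz")); // populate c after building both

        System.out.println(early.filter(s -> s.length() > 1).collect(Collectors.toList())); // []
        System.out.println(late.filter(s -> s.length() > 1).collect(Collectors.toList()));  // [yy, zzz]
    }
}
```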

For example, here's the stream() implementation from ArrayList:

     return Streams.stream(
         () -> Arrays.spliterator((E[]) elementData, 0, size),
         StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED);

This makes sure we are operating on the real data, as opposed to stale 
data, at the time we start the stream operations.
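A library author could apply the same pattern to its own container.  The 
sketch below is a hypothetical growable list (TinyList, not a real JDK 
class) that reads its backing array and size inside the supplier, in the 
spirit of the ArrayList code above but written against Java 8's 
StreamSupport.  The characteristics passed are the ones 
Arrays.spliterator reports for an array range, since StreamSupport 
requires them to match.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical growable list used only to illustrate the pattern
public class TinyList<E> {
    private Object[] elementData = new Object[2];
    private int size;

    public void add(E e) {
        if (size == elementData.length) {
            // Reallocation replaces the array a stream will eventually read
            elementData = Arrays.copyOf(elementData, size * 2);
        }
        elementData[size++] = e;
    }

    @SuppressWarnings("unchecked")
    public Stream<E> stream() {
        // elementData and size are read inside the supplier, so the stream
        // sees the array and length current when the terminal op starts.
        Supplier<Spliterator<E>> supplier =
            () -> Arrays.spliterator((E[]) elementData, 0, size);
        return StreamSupport.stream(supplier,
            Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.SIZED | Spliterator.SUBSIZED,
            false);
    }

    public static void main(String[] args) {
        TinyList<String> list = new TinyList<>();
        list.add("a");
        Stream<String> s = list.stream(); // built with one element present
        list.add("b");
        list.add("c");                    // forces a reallocation
        System.out.println(s.collect(Collectors.toList())); // [a, b, c]
    }
}
```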


The other piece of stream construction is setting the stream flags.  The 
defined ones are:
  DISTINCT -- elements are distinct according to .equals
  SORTED -- elements are sorted according to natural order
  ORDERED -- encounter order of elements is considered relevant
  SIZED -- stream size is known and finite

Some defaults are:

Collection: SIZED
List: ORDERED, SIZED
Set: DISTINCT, SIZED
SortedSet: DISTINCT, ORDERED, SORTED, SIZED
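In the API that eventually shipped, the closest equivalents of these 
flags are the Spliterator characteristics of the same names (note that 
Spliterator.SORTED also covers comparator-based ordering, not just 
natural order).  The sketch below inspects what a few standard Java 8 
collections report, as a rough check of the defaults listed above.  
DefaultFlags is a made-up name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Spliterator;
import java.util.TreeSet;

public class DefaultFlags {
    // Renders the four flags discussed above for a collection's spliterator
    static String flags(Collection<?> c) {
        Spliterator<?> s = c.spliterator();
        StringBuilder sb = new StringBuilder();
        if (s.hasCharacteristics(Spliterator.DISTINCT)) sb.append("DISTINCT ");
        if (s.hasCharacteristics(Spliterator.ORDERED))  sb.append("ORDERED ");
        if (s.hasCharacteristics(Spliterator.SORTED))   sb.append("SORTED ");
        if (s.hasCharacteristics(Spliterator.SIZED))    sb.append("SIZED ");
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(flags(new ArrayList<>(Arrays.asList(3, 1, 2)))); // ORDERED SIZED
        System.out.println(flags(new HashSet<>(Arrays.asList(3, 1, 2))));   // DISTINCT SIZED
        System.out.println(flags(new TreeSet<>(Arrays.asList(3, 1, 2))));   // DISTINCT ORDERED SORTED SIZED
    }
}
```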


That's basically it for "how do I make a Stream."  There are also some 
combinators for "make a Spliterator out of an Iterator", "make a 
spliterator for an array segment", etc.
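Both of those adapters exist in Java 8 as shipped, under 
java.util.Spliterators and java.util.Arrays (those class names are from 
the released API, not necessarily where they live in Streams today).  A 
minimal sketch, with the wrapper class Adapters being hypothetical:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class Adapters {
    // Iterator -> Spliterator -> Stream, when the size is not known up front
    static List<String> fromIterator(Iterator<String> it) {
        Spliterator<String> s = Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        return StreamSupport.stream(s, false).collect(Collectors.toList());
    }

    // Spliterator over elements [from, to) of an array
    static List<String> segment(String[] array, int from, int to) {
        return StreamSupport.stream(Arrays.spliterator(array, from, to), false)
                            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fromIterator(Arrays.asList("p", "q").iterator())); // [p, q]
        System.out.println(segment(new String[] {"a", "b", "c", "d"}, 1, 3)); // [b, c]
    }
}
```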

The rest of Streams consists of things like stream generators, which 
perhaps should be moved to a Generators class?
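For reference, in Java 8 as shipped the generators ended up as static 
factories on Stream itself (Stream.iterate and Stream.generate) rather 
than in a separate class.  Both produce infinite streams, so they need a 
short-circuiting operation such as limit.  GeneratorSketch and its 
methods are illustrative names.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GeneratorSketch {
    // Infinite stream seeded with 1, each element doubling the previous one
    static List<Integer> powersOfTwo(int n) {
        return Stream.iterate(1, x -> x * 2).limit(n).collect(Collectors.toList());
    }

    // Infinite stream produced by repeatedly calling a supplier
    static List<String> repeated(String s, int n) {
        return Stream.generate(() -> s).limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwo(5));    // [1, 2, 4, 8, 16]
        System.out.println(repeated("x", 3));  // [x, x, x]
    }
}
```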



More information about the lambda-libs-spec-observers mailing list