Stream operations -- current set

Brian Goetz brian.goetz at oracle.com
Mon Sep 17 08:29:29 PDT 2012


>> Intermediate / Lazy (Stateful)
>> ------------------------------
>>
>>      Stream<T> uniqueElements();
>>
>>      Stream<T> sorted(Comparator<? super T> comparator);
>>
>>      Stream<T> cumulate(BinaryOperator<T> operator);
>>
>>      Stream<T> sequential();
>
> (Capsule summary of many, um, discussions between Brian and me:
> I hate all of these. But not enough to act hatefully about them :-)

To fill this in some more:

The first three are inconvenient to implement in parallel because they 
are stateful and require nonlocal processing, which puts significant 
constraints on parallel implementations.  The stateless intermediate 
operations all have the nice property of being homomorphisms under 
concatenation: f(a::b) = f(a) :: f(b); the stateful ones do not.  The 
statelessness property is a tremendously useful one for parallelization 
/ GPUization / etc.

The argument in favor of these ops is that they're useful, and because 
we don't commit to an execution strategy until we see the end of the 
pipeline anyway, we can determine if the pipeline is entirely of the 
"nicer" (stateless) kind anyway and optimize accordingly in cases where 
the stateful ops are not used.

The last one (sequential) deserves discussion on its own.  The idea is 
that streams begin life as either sequential or parallel streams, but 
there are use cases where we want to use parallelism for the "head" part 
of a pipeline but then constrain the "tail" part to act sequentially in 
encounter order.

For example, imagine an expensive filter function f.  There's a lot of 
parallelism to be gained by applying the filter function in parallel, 
but we may not be willing in all cases to lose the encounter order of 
the elements at the source.  Some examples (using the current API):

   // Pure sequential, preserves encounter order
   c.filter(f).forEach(g)

   // Parallel; no constraint on order of arrival at g
   c.parallel().filter(f).forEach(g)

   // Parallel filtering; results arrive at g in encounter order
   c.parallel().filter(f).sequential().forEach(g)

All of the above are useful.  The question is, how do we specify the 
latter constraint?  Currently, we implement this as a stateful 
intermediate/lazy operation, which is a no-op in sequential pipelines. 
The parallel implementation does the usual decomposition, computes a 
result at each leaf, and then builds a conc-tree for the results to 
minimize copying.

There are other ways to model this as well; for example, as a fluently 
propagated constraint.  This is harder to implement but might play 
better with, say, parallel operations on infinite streams (which make 
sense if the terminal operation is something like findFirst.)


This question is also related to the question of "how does a client 
programatically get access to the stream contents."  Given that under 
the Iteration2 model a Stream is more like an Iterator, it should be 
possible to ask for the results sequentially and lazily.  There is a 
findFirst operation now; we haven't defined what happens when findFirst 
is called repeatedly.  It is also easy to provide an Iterator-bearing 
method that has the desired effect in the sequential case.  But, what 
should happen here in the parallel case?  Should there be a 
Spliterator-bearing method?  Should we have a concept of "restartable" 
parallel computation where we can ask for more results and this may spur 
incremental calculation?

>>      <U> Map<U, Collection<T>> groupBy(Mapper<? super T, ? extends U>
>> classifier);
>>
>>      <U, W> Map<U, W> reduceBy(Mapper<? super T, ? extends U> classifier,
>>                                Factory<W> baseFactory,
>>                                Combiner<W, T, W> reducer);
>>
>> The most controversial signature here is groupBy, because it is the
>> only place
>> in the Streams API that is tied to Collections.
>
> So why is this in Streams rather than in Maps?

Because while groupBy *produces* a Map, it operates on scalar/linear 
streams of values.  The map structure is induced by the classifier 
function.

>> Don has suggested a multi-valued version of groupBy:
>>
>>      <U> Map<U, Collection<T>> groupByMulti(FlatMapper<? super T, ?
>> extends U>
>> classifier);
>>
>> which is easy to implement and makes sense to me.
>
> The main argument against this is that at least in parallel designs,
> it is vastly better to reduce the nested value collection while
> it is being generated.

This is totally true (and hence the comments about the reduceBy method 
below), but there are times when you are not going to reduce at all. 
(If these cases didn't exist, we wouldn't need groupBy at all; it is 
slightly regrettable that we have it because people will use it wrong 
instead of reduceBy.)  But if we are going to have groupBy, Don has made 
a strong case for groupByMulti (which is "no harder" than groupBy to 
implement, and addresses some use cases that are hard to do with groupBy.)

> There are surely cases where circumstances
> don't let you do this, but it's a little uncomfortable to
> support a method that you hope that people only rarely use.

Yeah, that's the regrettable part.  Perhaps the answer here is to give 
reducyBy a better name and groupBy a worse name.


More information about the lambda-libs-spec-observers mailing list