Stream operations -- current set
Brian Goetz
brian.goetz at oracle.com
Mon Sep 17 08:29:29 PDT 2012
>> Intermediate / Lazy (Stateful)
>> ------------------------------
>>
>> Stream<T> uniqueElements();
>>
>> Stream<T> sorted(Comparator<? super T> comparator);
>>
>> Stream<T> cumulate(BinaryOperator<T> operator);
>>
>> Stream<T> sequential();
>
> (Capsule summary of many, um, discussions between Brian and me:
> I hate all of these. But not enough to act hatefully about them :-)
To fill this in some more:
The first three are inconvenient to implement in parallel because they
are stateful and require nonlocal processing, which puts significant
constraints on parallel implementations. The stateless intermediate
operations all have the nice property of being homomorphisms under
concatenation: f(a::b) = f(a) :: f(b); the stateful ones do not. The
statelessness property is a tremendously useful one for parallelization
/ GPUization / etc.
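
The homomorphism property can be checked on small inputs. The following is a minimal sketch, assuming the java.util.stream API as it eventually shipped in Java 8 rather than the draft signatures quoted in this thread; distinct() stands in for uniqueElements here, and the class and helper names are hypothetical. A single matching pair of inputs does not prove the property, but one counterexample is enough to refute it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class ConcatHomomorphism {
    // Concatenation a :: b, returned as a fresh list.
    static <T> List<T> concat(List<T> a, List<T> b) {
        List<T> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // True when f(a :: b) equals f(a) :: f(b) for these particular inputs.
    static <T> boolean distributes(Function<List<T>, List<T>> f, List<T> a, List<T> b) {
        return f.apply(concat(a, b)).equals(concat(f.apply(a), f.apply(b)));
    }

    // Stateless op: each element is kept or dropped independently of the others.
    static List<Integer> evens(List<Integer> xs) {
        return xs.stream().filter(x -> x % 2 == 0).collect(Collectors.toList());
    }

    // Stateful op: the position of an element depends on every other element.
    static List<Integer> sorted(List<Integer> xs) {
        return xs.stream().sorted().collect(Collectors.toList());
    }

    // Stateful op: whether an element survives depends on what came before it.
    static List<Integer> distinct(List<Integer> xs) {
        return xs.stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(4, 1, 2);
        List<Integer> b = Arrays.asList(3, 2, 6);
        System.out.println(distributes(ConcatHomomorphism::evens, a, b));    // true
        System.out.println(distributes(ConcatHomomorphism::sorted, a, b));   // false
        System.out.println(distributes(ConcatHomomorphism::distinct, a, b)); // false
    }
}
```

For the stateless filter, each half can be processed on its own and the results glued together; for sorted and distinct, the halves have to be merged with knowledge of each other, which is the nonlocal processing mentioned above.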
The argument in favor of these ops is that they're useful, and because
we don't commit to an execution strategy until we see the end of the
pipeline, we can determine whether the pipeline consists entirely of
the "nicer" (stateless) kind and optimize accordingly in cases where
the stateful ops are not used.
The last one (sequential) deserves discussion on its own. The idea is
that streams begin life as either sequential or parallel streams, but
there are use cases where we want to use parallelism for the "head" part
of a pipeline but then constrain the "tail" part to act sequentially in
encounter order.
For example, imagine an expensive filter function f. There's a lot of
parallelism to be gained by applying the filter function in parallel,
but we may not be willing in all cases to lose the encounter order of
the elements at the source. Some examples (using the current API):
    // Pure sequential, preserves encounter order
    c.filter(f).forEach(g)

    // Parallel; no constraint on order of arrival at g
    c.parallel().filter(f).forEach(g)

    // Parallel filtering; results arrive at g in encounter order
    c.parallel().filter(f).sequential().forEach(g)
All of the above are useful. The question is, how do we specify the
last constraint (parallel head, encounter-ordered tail)? Currently, we
implement this as a stateful intermediate/lazy operation, which is a
no-op in sequential pipelines.
The parallel implementation does the usual decomposition, computes a
result at each leaf, and then builds a conc-tree for the results to
minimize copying.
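
For comparison, here is a minimal sketch of the same "parallel head, ordered tail" idea written against the java.util.stream API as it eventually shipped in Java 8, which is an assumption relative to the draft API in this thread. In the shipped API, sequential() switches the mode of the whole pipeline rather than just the tail, so the sketch uses the forEachOrdered terminal operation instead. The class name and the stand-in predicate are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class OrderedTail {
    // Hypothetical stand-in for the expensive filter function f.
    static boolean expensive(int x) {
        return x % 3 == 0;
    }

    // The filter may run in parallel; the consumer sees results in encounter order.
    static List<Integer> filterInParallelConsumeInOrder(List<Integer> source) {
        List<Integer> seen = new ArrayList<>();
        source.parallelStream()
              .filter(OrderedTail::expensive)
              // forEachOrdered runs the action one element at a time, in
              // encounter order, so a plain ArrayList is safe to append to here.
              .forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
        System.out.println(filterInParallelConsumeInOrder(source)); // [3, 6, 9, 12, 15, 18]
    }
}
```

Replacing forEachOrdered with forEach would give the second variant above: still parallel, but with no constraint on arrival order, and appending to an unsynchronized list would then no longer be safe.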
There are other ways to model this as well; for example, as a fluently
propagated constraint. This is harder to implement but might play
better with, say, parallel operations on infinite streams (which make
sense if the terminal operation is something like findFirst).
This question is also related to the question of "how does a client
programmatically get access to the stream contents." Given that under
the Iteration2 model a Stream is more like an Iterator, it should be
possible to ask for the results sequentially and lazily. There is a
findFirst operation now; we haven't defined what happens when findFirst
is called repeatedly. It is also easy to provide an Iterator-bearing
method that has the desired effect in the sequential case. But, what
should happen here in the parallel case? Should there be a
Spliterator-bearing method? Should we have a concept of "restartable"
parallel computation where we can ask for more results and this may spur
incremental calculation?
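
To make the sequential case concrete, here is a minimal sketch of pulling results one at a time through an Iterator. It assumes the java.util.stream API as it eventually shipped in Java 8 (Stream.iterate and the iterator() terminal operation), not the draft API under discussion, and the class and method names are hypothetical. It says nothing about how the parallel questions above should be answered.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

class LazyPull {
    // Pull the first n even numbers from an infinite sequential stream.
    // Only as many source elements are generated as the iterator is asked for.
    static List<Integer> takeLazily(int n) {
        Iterator<Integer> it = Stream.iterate(1, x -> x + 1)
                                     .filter(x -> x % 2 == 0)
                                     .iterator();
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < n && it.hasNext(); i++) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(takeLazily(3)); // [2, 4, 6]
    }
}
```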
>> <U> Map<U, Collection<T>> groupBy(Mapper<? super T, ? extends U>
>> classifier);
>>
>> <U, W> Map<U, W> reduceBy(Mapper<? super T, ? extends U> classifier,
>> Factory<W> baseFactory,
>> Combiner<W, T, W> reducer);
>>
>> The most controversial signature here is groupBy, because it is the
>> only place
>> in the Streams API that is tied to Collections.
>
> So why is this in Streams rather than in Maps?
Because while groupBy *produces* a Map, it operates on scalar/linear
streams of values. The map structure is induced by the classifier
function.
>> Don has suggested a multi-valued version of groupBy:
>>
>> <U> Map<U, Collection<T>> groupByMulti(FlatMapper<? super T, ?
>> extends U>
>> classifier);
>>
>> which is easy to implement and makes sense to me.
>
> The main argument against this is that at least in parallel designs,
> it is vastly better to reduce the nested value collection while
> it is being generated.
This is totally true (and hence the comments about the reduceBy method
below), but there are times when you are not going to reduce at all.
(If these cases didn't exist, we wouldn't need groupBy at all; it is
slightly regrettable that we have it because people will use it wrong
instead of reduceBy.) But if we are going to have groupBy, Don has made
a strong case for groupByMulti (which is "no harder" than groupBy to
implement, and addresses some use cases that are hard to do with groupBy.)
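
The three shapes under discussion can be sketched side by side. This is a minimal sketch, assuming the Collectors API as it eventually shipped in Java 8 (groupingBy, with and without a downstream collector) rather than the draft groupBy / reduceBy / groupByMulti signatures quoted above; the multi-valued variant is written as a plain loop, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupingSketch {
    // groupBy-style: keep every element, bucketed by a single key.
    static Map<Character, List<String>> groupByFirstLetter(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(w -> w.charAt(0)));
    }

    // reduceBy-style: reduce each bucket while it is being built, so the
    // per-key collections are never materialized.
    static Map<Character, Long> countByFirstLetter(List<String> words) {
        return words.stream()
                    .collect(Collectors.groupingBy(w -> w.charAt(0), Collectors.counting()));
    }

    // groupByMulti-style: the classifier yields several keys per element
    // (here, each distinct letter of the word), and the element lands in
    // every matching bucket.
    static Map<Character, List<String>> groupByEachLetter(List<String> words) {
        Map<Character, List<String>> out = new TreeMap<>();
        for (String w : words) {
            w.chars().distinct().forEach(
                c -> out.computeIfAbsent((char) c, k -> new ArrayList<>()).add(w));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("ab", "ba", "ac");
        System.out.println(groupByFirstLetter(words));
        System.out.println(countByFirstLetter(words));
        System.out.println(groupByEachLetter(words));
    }
}
```

If only the counts are needed, the second form avoids building the intermediate lists that the first form would create and then throw away, which is the point of the argument quoted above.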
> There are surely cases where circumstances
> don't let you do this, but it's a little uncomfortable to
> support a method that you hope that people only rarely use.
Yeah, that's the regrettable part. Perhaps the answer here is to give
reduceBy a better name and groupBy a worse name.