Multiple traversals
Brian Goetz
brian.goetz at oracle.com
Fri Dec 20 08:37:38 PST 2013
This is a common question (another form of it was asked just yesterday,
albeit far less politely). The thing to realize is: streams don't store
data; they are abstractions for pipelines of computation on sequences of
data, whose source might be "repeatable" (like an immutable collection),
or might be read off of /dev/urandom.
The obvious comparison is "tee" on Unix. But tee entails buffering and
flow control; we'd rather avoid imposing these issues on Streams users
if we can. Further, any intuition derived from "tee" is likely to assume a
sequential processing model, since Unix pipes don't operate in parallel.
The Stream API has been designed to minimize the semantic gap between
sequential and parallel traversals, and each sequential-centric
operation we add makes it harder for all users.
If you have a repeatable source (collection or deterministic generator),
you're done. You can capture that as follows:
Supplier<IntStream> supplier = () -> IntStream.range(0, 1000);
and pull new streams as needed:
supplier.get().filter(...).map(...)...
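A minimal sketch of the supplier pattern, assuming the same `IntStream.range(0, 1000)` source as above. The class and method names (`RepeatableSource`, `summarize`, `reuseRejected`) are hypothetical. Each `get()` builds a fresh pipeline, so several terminal operations (sequential or parallel) can run against the same data, whereas a single `IntStream` object rejects a second terminal operation with `IllegalStateException`.

```java
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class RepeatableSource {
    // A repeatable source: each get() builds a fresh pipeline over the same range.
    static final Supplier<IntStream> SUPPLIER = () -> IntStream.range(0, 1000);

    // Three independent traversals of the same source, one of them parallel.
    static int[] summarize() {
        int min = SUPPLIER.get().min().getAsInt();
        int max = SUPPLIER.get().max().getAsInt();
        int evens = (int) SUPPLIER.get().parallel().filter(i -> i % 2 == 0).count();
        return new int[] {min, max, evens};
    }

    // A single stream object, by contrast, can be traversed only once.
    static boolean reuseRejected() {
        IntStream once = SUPPLIER.get();
        once.count();
        try {
            once.count();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int[] r = summarize();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "0 999 500"
        System.out.println(reuseRejected());                // prints "true"
    }
}
```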
If you don't have a repeatable source, or the computational steps you
apply to a repeatable source are expensive enough that you don't want to
repeat them, you need to buffer. Ideally, whatever intermediate
representation you pick should be no less splittable than the original
source; otherwise you can still lose compared to just re-traversing the
original source in parallel.
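One simple way to buffer, sketched under stated assumptions: run the source and the costly steps once, store the results in a `double[]`, and hand out fresh streams over that array. An array-backed stream (`Arrays.stream`) splits by index, so parallel replays stay balanced. `BufferedSource`, `buffer` and `expensive` are hypothetical names, and `expensive` merely stands in for a costly per-element computation; a seeded `Random.doubles(...)` stream stands in for a source that can be read only once.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.DoubleStream;

public class BufferedSource {
    // Hypothetical stand-in for an expensive per-element step.
    static double expensive(double x) {
        return 2.0 * x;
    }

    // Run the source and the expensive step once, buffer the results in an
    // array, and hand out fresh streams over that array on demand.
    static Supplier<DoubleStream> buffer(DoubleStream source) {
        double[] buffered = source.map(BufferedSource::expensive).toArray();
        return () -> Arrays.stream(buffered);
    }

    public static void main(String[] args) {
        // A generator stream stands in for a source we can only read once.
        Supplier<DoubleStream> replay = buffer(new Random(42).doubles(1_000_000));

        // Array-backed streams split by index, so parallel replays stay balanced.
        double min = replay.get().parallel().min().orElse(Double.POSITIVE_INFINITY);
        double max = replay.get().parallel().max().orElse(Double.NEGATIVE_INFINITY);
        System.out.println(min + " .. " + max);
    }
}
```

The trade-off is memory: the whole (processed) source must fit in the buffer, so this only works for finite sources.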
There are lots of ways to buffer, but this is outside the scope of what
Stream does -- by design. Supporting it would have added a lot of
complexity to the common case in order to support the uncommon case.
On 12/20/2013 12:15 AM, Howard Lovatt wrote:
> Hi All,
>
> In trying out Streams I keep coming up against an issue where I want two
> outputs from a single Stream. I will use the example of finding both min
> and max values from a double stream because min and max exist for a stream.
> The min and max example implies finite streams, but sometimes you might
> want multiple things from an infinite stream.
>
> Various options come to mind:
>
> 1. Put everything in an array or List and then use two streams:
>
> double[] values = someStream...process...toArray();
> double max =
> DoubleStream.of(values).max().orElse(Double.NEGATIVE_INFINITY);
> double min =
> DoubleStream.of(values).min().orElse(Double.POSITIVE_INFINITY);
>
> Not really very stream-like, and only works for finite streams.
>
> 2. Peek at the first traversal and create a 2nd stream:
>
> DoubleStream.Builder copy = DoubleStream.builder();
> double max = someStream...process...peek(d -> copy.add(d))
> .max().orElse(Double.NEGATIVE_INFINITY);
> double min = copy.build().min().orElse(Double.POSITIVE_INFINITY);
>
> It has got a side effect which is frowned upon and only works for
> sequential finite streams.
>
> 3. Write a custom collector (for the min/max example a collector already
> exists, but it is useful to see what you need to write when a
> collector isn't provided):
>
> // Requires java.util.EnumSet, java.util.Set, java.util.function.*, and
> // java.util.stream.Collector.
> private static class MinMax {
>     double min = Double.POSITIVE_INFINITY;
>     double max = Double.NEGATIVE_INFINITY;
> }
>
> private static final Collector<Double, MinMax, MinMax> minMax =
>         new Collector<Double, MinMax, MinMax>() {
>
>     @Override
>     public Supplier<MinMax> supplier() {
>         return () -> new MinMax();
>     }
>
>     @Override
>     public BiConsumer<MinMax, Double> accumulator() {
>         return (accumulator, value) -> {
>             // Check both bounds: one value can be the new min and the new max.
>             if (value < accumulator.min) {
>                 accumulator.min = value;
>             }
>             if (value > accumulator.max) {
>                 accumulator.max = value;
>             }
>         };
>     }
>
>     @Override
>     public BinaryOperator<MinMax> combiner() {
>         // Merge the first partial result into the second and return it.
>         return (first, second) -> {
>             if (first.min < second.min) {
>                 second.min = first.min;
>             }
>             if (first.max > second.max) {
>                 second.max = first.max;
>             }
>             return second;
>         };
>     }
>
>     @Override
>     public Function<MinMax, MinMax> finisher() {
>         return (result) -> result;
>     }
>
>     @Override
>     public Set<Collector.Characteristics> characteristics() {
>         // MinMax is not thread-safe, so CONCURRENT must not be declared.
>         return EnumSet.of(Collector.Characteristics.UNORDERED,
>                 Collector.Characteristics.IDENTITY_FINISH);
>     }
> };
>
> MinMax result = someStream...process...boxed().collect(minMax);
>
> Which is considerably more work and it boxes primitives, but it is the
> solution I err towards.
>
> Comments/suggestions? Is there a better approach?
>
> -- Howard.
>
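The quoted message says a collector for the min/max case already exists. Assuming that refers to `DoubleStream.summaryStatistics()` (which returns a `DoubleSummaryStatistics`), the sketch below contrasts it with two hand-written single-pass alternatives: the primitive three-argument `DoubleStream.collect`, which avoids boxing, and `Collector.of`, which avoids most of the anonymous-class boilerplate. The names `MinMaxSinglePass`, `viaSummaryStatistics`, `viaPrimitiveCollect` and `MIN_MAX` are hypothetical. The accumulator updates both bounds independently, and only `UNORDERED` is declared, because the container is not thread-safe. All three are reductions, so they still need a finite stream.

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.Collector;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;

public class MinMaxSinglePass {
    // Mutable result container; not thread-safe, so it is never shared.
    static final class MinMax {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;

        // Update both bounds independently for every value.
        void accept(double value) {
            min = Math.min(min, value);
            max = Math.max(max, value);
        }

        // Merge another partial result into this one.
        MinMax combine(MinMax other) {
            min = Math.min(min, other.min);
            max = Math.max(max, other.max);
            return this;
        }
    }

    // Built-in: summaryStatistics() computes min, max, count, sum and average in one pass.
    static double[] viaSummaryStatistics(DoubleStream values) {
        DoubleSummaryStatistics stats = values.summaryStatistics();
        return new double[] {stats.getMin(), stats.getMax()};
    }

    // Primitive three-argument collect: one pass, no boxing.
    static MinMax viaPrimitiveCollect(DoubleStream values) {
        return values.collect(MinMax::new, MinMax::accept, MinMax::combine);
    }

    // Collector.of for boxed streams; this overload implies IDENTITY_FINISH.
    static final Collector<Double, MinMax, MinMax> MIN_MAX =
            Collector.<Double, MinMax>of(
                    MinMax::new,
                    (acc, value) -> acc.accept(value),
                    MinMax::combine,
                    Collector.Characteristics.UNORDERED);

    public static void main(String[] args) {
        double[] s = viaSummaryStatistics(DoubleStream.of(3.0, -1.5, 7.25));
        MinMax p = viaPrimitiveCollect(DoubleStream.of(5.0, 4.0, 3.0).parallel());
        MinMax c = Stream.of(5.0, 4.0, 3.0).parallel().collect(MIN_MAX);
        System.out.println(s[0] + " " + s[1]);   // prints "-1.5 7.25"
        System.out.println(p.min + " " + p.max); // prints "3.0 5.0"
        System.out.println(c.min + " " + c.max); // prints "3.0 5.0"
    }
}
```

For an empty stream, `getMin()` and `getMax()` return `Double.POSITIVE_INFINITY` and `Double.NEGATIVE_INFINITY`, matching the defaults used in the quoted options.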
More information about the lambda-dev mailing list