Multiple traversals

Brian Goetz brian.goetz at oracle.com
Fri Dec 20 08:37:38 PST 2013


This is a common question (another form of it was asked just yesterday, 
albeit far less politely).  The thing to realize is: streams don't store 
data; they are abstractions for pipelines of computation on sequences of 
data, whose source might be "repeatable" (like an immutable collection), 
or might be read off of /dev/urandom.
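A Stream object is therefore single-use. Once a terminal operation has run, the same object rejects a second one with IllegalStateException. The minimal sketch below shows this with IntStream; the class and method names are illustrative only.

```java
import java.util.stream.IntStream;

public class SingleUseDemo {
    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondTraversalFails() {
        IntStream numbers = IntStream.range(0, 10);
        numbers.sum(); // the first terminal operation consumes the stream
        try {
            numbers.max(); // a stream may be traversed only once
            return false;
        } catch (IllegalStateException e) {
            // "stream has already been operated upon or closed"
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTraversalFails()); // prints "true"
    }
}
```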

The obvious comparison is "tee" on Unix.  But tee entails buffering and 
flow control; we'd rather avoid imposing these issues on Streams users 
if we can.  Further, any intuition derived from "tee" is likely to assume a 
sequential processing model, since Unix pipes don't operate in parallel. 
The Stream API has been designed to minimize the semantic gap between 
sequential and parallel traversals, and each sequential-centric 
operation we add widens that gap for all users.

If you have a repeatable source (collection or deterministic generator), 
you're done.  You can capture that as follows:

   Supplier<IntStream> supplier = () -> IntStream.range(0, 1000);

and pull new streams as needed:

   supplier.get().filter(...).map(...)...
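A runnable version of the supplier idiom might look like the sketch below. It assumes the same IntStream.range source; the filters and the helper names (SOURCE, minMaxOfMultiples, countEvensInParallel) are illustrative. Each get() builds a fresh pipeline over the range, so the min and the max come from two independent traversals, and any of them may run in parallel.

```java
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class RepeatableSource {
    // Captures the repeatable source (a range), not a stream over it.
    static final Supplier<IntStream> SOURCE = () -> IntStream.range(0, 1000);

    // Hypothetical helper: two traversals, each over a freshly created stream.
    static int[] minMaxOfMultiples(Supplier<IntStream> supplier, int k) {
        int min = supplier.get().filter(i -> i % k == 0).min().getAsInt();
        int max = supplier.get().filter(i -> i % k == 0).max().getAsInt();
        return new int[] { min, max };
    }

    // A third traversal, this time in parallel; the source is unaffected.
    static long countEvensInParallel(Supplier<IntStream> supplier) {
        return supplier.get().parallel().filter(i -> i % 2 == 0).count();
    }

    public static void main(String[] args) {
        int[] minMax = minMaxOfMultiples(SOURCE, 7);
        long evens = countEvensInParallel(SOURCE);
        System.out.println(minMax[0] + " " + minMax[1] + " " + evens); // prints "0 994 500"
    }
}
```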

If you don't have a repeatable source, or the computational steps you 
apply to a repeatable source are expensive enough that you don't want to 
repeat them, you need to buffer.  Ideally, the intermediate representation 
you buffer into should be no less splittable than the original source; 
otherwise you can still lose compared to just re-traversing the original 
source in parallel.
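One way to buffer is sketched below. It assumes the costly step is a pure per-element function; expensive() is a hypothetical stand-in for it. The pipeline runs once, its results are collected into a double[], and the array is then streamed as often as needed. An array has a known size and random access, so DoubleStream.of(values) splits evenly for parallel traversals and meets the "no less splittable" criterion.

```java
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;

public class BufferOnce {
    // Hypothetical stand-in for a costly, pure per-element computation.
    static double expensive(int i) {
        return Math.sqrt(i);
    }

    // Runs the costly pipeline exactly once and buffers the results.
    // toArray() preserves encounter order even when run in parallel.
    static double[] buffer(int n) {
        return IntStream.range(0, n).parallel()
                .mapToDouble(BufferOnce::expensive)
                .toArray();
    }

    // Re-traverses the buffer twice. An array has a known size and random
    // access, so its spliterator splits evenly for parallel execution.
    static double[] minMax(double[] values) {
        double min = DoubleStream.of(values).parallel().min().orElse(Double.NaN);
        double max = DoubleStream.of(values).parallel().max().orElse(Double.NaN);
        return new double[] { min, max };
    }

    public static void main(String[] args) {
        double[] values = buffer(1_000_000);
        double[] mm = minMax(values);
        System.out.println(mm[0] + " .. " + mm[1]);
    }
}
```

A List built with Collectors.toList() would also work as the buffer. A primitive array avoids boxing and splits just as well.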

There are lots of ways to buffer, but buffering is deliberately outside 
the scope of Stream.  Supporting it would have introduced a lot of 
complexity into the common case to support the uncommon case.

On 12/20/2013 12:15 AM, Howard Lovatt wrote:
> Hi All,
>
> In trying out Streams I keep coming up against an issue where I want two
> outputs from a single Stream. I will use the example of finding both min
> and max values from a double stream because min and max exist for a stream.
> The min and max example implies finite streams, but sometimes you might
> want multiple things from an infinite stream.
>
> Various options come to mind:
>
> 1. Put everything in an array or List and then use two streams:
>
>     double[] values = someStream...process...toArray();
>     double max = DoubleStream.of(values).max().orElse(Double.NEGATIVE_INFINITY);
>     double min = DoubleStream.of(values).min().orElse(Double.POSITIVE_INFINITY);
>
>     Not really very stream-like, and only works for finite streams.
>
> 2. Peek at the first traversal and create a 2nd stream:
>
>     DoubleStream.Builder copy = DoubleStream.builder();
>     double max = someStream...process...peek(d -> copy.add(d))
>         .max().orElse(Double.NEGATIVE_INFINITY);
>     double min = copy.build().min().orElse(Double.POSITIVE_INFINITY);
>
>     It relies on a side effect, which is frowned upon, and only works
> for sequential finite streams.
>
> 3. Write a custom collector (for the min/max example a collector already
> exists, but it is useful to see what you need to write when a
> collector isn't provided):
>
>     private static class MinMax {
>       double min = Double.POSITIVE_INFINITY;
>       double max = Double.NEGATIVE_INFINITY;
>     }
>
>    private static Collector<Double, MinMax, MinMax> minMax =
>        new Collector<Double, MinMax, MinMax>() {
>
>      @Override
>      public Supplier<MinMax> supplier() {
>        return () -> new MinMax();
>      }
>
>      @Override
>      public BiConsumer<MinMax, Double> accumulator() {
>        return (accumulator, value) -> {
>          if (value < accumulator.min) {
>            accumulator.min = value;
>          }
>          // Not "else if": the first value must update both min and max.
>          if (value > accumulator.max) {
>            accumulator.max = value;
>          }
>        };
>      }
>
>      @Override
>      public BinaryOperator<MinMax> combiner() {
>        return (first, second) -> {
>          if (first.min < second.min) {
>            second.min = first.min;
>          }
>          if (first.max > second.max) {
>            second.max = first.max;
>          }
>          return second;
>        };
>      }
>
>      @Override
>      public Function<MinMax, MinMax> finisher() {
>        return (result) -> result;
>      }
>
>      @Override
>      public Set<Collector.Characteristics> characteristics() {
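>        // Not CONCURRENT: a MinMax container is not thread-safe.
>        return EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,
>            Collector.Characteristics.UNORDERED);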
>      }
>
>    };
>
>      MinMax result = someStream...process...boxed().collect(minMax);
>
>     Which is considerably more work and it boxes primitives, but it is the
> solution I err towards.
>
> Comments/suggestions? Is there a better approach?
>
>    -- Howard.
>
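For the min/max case specifically, neither a buffer nor a boxed Collector is required. The sketch below shows two single-pass options; the class and method names are illustrative, not from the thread. The first is DoubleStream.summaryStatistics(), which is presumably the existing facility option 3 alludes to. Collectors.summarizingDouble is its Collector counterpart. The second is the three-argument DoubleStream.collect(supplier, accumulator, combiner), which runs a custom mutable reduction directly on primitive doubles without boxing.

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.DoubleStream;

public class SinglePassMinMax {
    // Mutable result container. It is not thread-safe, which is fine because
    // collect() gives each parallel segment its own instance.
    static final class MinMax {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;

        void accept(double value) {
            // Independent checks: a single value may update both bounds.
            min = Math.min(min, value);
            max = Math.max(max, value);
        }

        void combine(MinMax other) {
            min = Math.min(min, other.min);
            max = Math.max(max, other.max);
        }
    }

    // Built-in: count, sum, min, average and max in one pass.
    static DoubleSummaryStatistics stats(double... values) {
        return DoubleStream.of(values).parallel().summaryStatistics();
    }

    // Custom reduction on primitives: no Double boxing involved.
    static MinMax minMax(double... values) {
        return DoubleStream.of(values).parallel()
                .collect(MinMax::new, MinMax::accept, MinMax::combine);
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics st = stats(3.0, -1.5, 8.25, 2.0);
        System.out.println(st.getMin() + " " + st.getMax()); // prints "-1.5 8.25"

        MinMax mm = minMax(3.0, -1.5, 8.25, 2.0);
        System.out.println(mm.min + " " + mm.max); // prints "-1.5 8.25"
    }
}
```

The three-argument form takes the same supplier/accumulator/combiner trio as a Collector. It simply drops the finisher and characteristics, which are not needed here.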


More information about the lambda-dev mailing list