Multiple traversals
Howard Lovatt
howard.lovatt at gmail.com
Fri Dec 20 12:25:18 PST 2013
Thanks for the reply.
A BiParallelStream class styled after ExecutorService could be written, e.g.:
BiParallelStream<Double, Double, Double> minMax = someStream...process...toBiParallelStream();
minMax.submit1().min().orElse(Double.POSITIVE_INFINITY); // Process in background
minMax.submit2().max().orElse(Double.NEGATIVE_INFINITY); // Process in background
double min = minMax.get1(); // Wait until process 1 has finished
double max = minMax.get2(); // Wait until process 2 has finished
Buffering inside BiParallelStream would be the interesting part! Suppose it proved too difficult, because of buffering, to let both the original stream (someStream...process...) and the two tributaries (min and max) run in parallel. BiParallelStream would still be worth supporting, because the two tributaries would execute in parallel with each other.
This concept of parallelism across streams, rather than along streams, would be an interesting addition to the stream API.
That would be considerable work, maybe Java 9 :)
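Until something like BiParallelStream exists, parallelism across streams can be approximated when the source is repeatable: run each tributary as its own background task. This is a minimal sketch, not the proposed API. The class AcrossStreamsSketch and method minMax are hypothetical names, and it assumes a repeatable Supplier<DoubleStream> source, so each tributary re-traverses the source instead of sharing a buffer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.DoubleStream;

// Hypothetical sketch: not the proposed BiParallelStream API.
public class AcrossStreamsSketch {

    // Runs min and max as two independent background tasks. Each task pulls
    // a fresh stream from the (assumed repeatable) source, so nothing is buffered.
    public static double[] minMax(Supplier<DoubleStream> source) {
        CompletableFuture<Double> min = CompletableFuture.supplyAsync(
                () -> source.get().min().orElse(Double.POSITIVE_INFINITY));
        CompletableFuture<Double> max = CompletableFuture.supplyAsync(
                () -> source.get().max().orElse(Double.NEGATIVE_INFINITY));
        // join() waits for each task, playing the role of get1()/get2().
        return new double[] { min.join(), max.join() };
    }

    public static void main(String[] args) {
        double[] r = minMax(() -> DoubleStream.of(3.0, -1.5, 7.25, 0.0));
        System.out.println(r[0] + " " + r[1]); // prints -1.5 7.25
    }
}
```

The cost is one traversal of the source per tributary, which is exactly what the buffering in BiParallelStream would avoid.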
-- Howard.
> On 21 Dec 2013, at 3:37 am, Brian Goetz <brian.goetz at oracle.com> wrote:
>
> This is a common question (another form of it was asked just yesterday, albeit far less politely). The thing to realize is: streams don't store data; they are abstractions for pipelines of computation on sequences of data, whose source might be "repeatable" (like an immutable collection), or might be read off of /dev/urandom.
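Because a stream is a one-shot pipeline rather than a container, a second terminal operation on the same stream object cannot re-read its data. In the small sketch below (class name hypothetical), the JDK implementation reports the reuse with IllegalStateException; the Stream specification itself only says an implementation may throw it when it detects reuse.

```java
import java.util.stream.IntStream;

// Hypothetical sketch showing that a stream supports only one traversal.
public class SingleUseSketch {

    // Returns true if a second terminal operation on the same stream is rejected.
    public static boolean secondTraversalFails() {
        IntStream stream = IntStream.range(0, 10);
        stream.min(); // first terminal operation consumes the pipeline
        try {
            stream.max(); // second terminal operation on the same stream object
            return false;
        } catch (IllegalStateException expected) {
            // JDK message: "stream has already been operated upon or closed"
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTraversalFails()); // prints true
    }
}
```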
>
> The obvious comparison is "tee" on Unix. But tee entails buffering and flow control; we'd rather avoid imposing these issues on Streams users if we can. Further, any intuition derived from "tee" is likely to assume a sequential processing model, since Unix pipes don't operate in parallel. The Stream API has been designed to minimize the semantic gap between sequential and parallel traversals, and each sequential-centric operation we add makes it harder for all users.
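For contrast, a purely sequential tee needs neither buffering nor flow control if both branches are terminal consumers that see each element in lockstep. The sketch below (hypothetical class name) shows that shape with DoubleConsumer.andThen. Its unsynchronised shared state is only safe because the stream is forced to be sequential, which is precisely the sequential assumption described above.

```java
import java.util.function.DoubleConsumer;
import java.util.stream.DoubleStream;

// Hypothetical sketch of a lockstep, sequential-only "tee".
public class SequentialTeeSketch {

    public static double[] minMax(DoubleStream stream) {
        double[] result = { Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY };
        // Two branches that each keep their own running value.
        DoubleConsumer minBranch = d -> result[0] = Math.min(result[0], d);
        DoubleConsumer maxBranch = d -> result[1] = Math.max(result[1], d);
        // andThen hands every element to both branches in one pass.
        // The state is unsynchronised, so sequential() is what makes this safe.
        stream.sequential().forEach(minBranch.andThen(maxBranch));
        return result;
    }

    public static void main(String[] args) {
        double[] r = minMax(DoubleStream.of(3.0, 2.0, 1.0));
        System.out.println(r[0] + " " + r[1]); // prints 1.0 3.0
    }
}
```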
>
> If you have a repeatable source (collection or deterministic generator), you're done. You can capture that as follows:
>
> Supplier<IntStream> supplier = () -> IntStream.range(0, 1000);
>
> and pull new streams as needed:
>
> supplier.get().filter(...).map(...)...
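Filled out into something runnable, the supplier idiom might look like the following. The filter i % 7 == 3 is an arbitrary stand-in for the elided filter(...).map(...) steps, and the class name is hypothetical.

```java
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class RepeatableSourceSketch {

    // The repeatable source: each get() builds a fresh IntStream over the
    // same range, so it can be traversed any number of times.
    static final Supplier<IntStream> SUPPLIER = () -> IntStream.range(0, 1000);

    // Hypothetical pipeline standing in for the elided filter(...).map(...) steps.
    static IntStream pipeline() {
        return SUPPLIER.get().parallel().filter(i -> i % 7 == 3);
    }

    public static int min() {
        return pipeline().min().getAsInt();
    }

    public static int max() {
        return pipeline().max().getAsInt();
    }

    public static void main(String[] args) {
        // Two independent traversals, each free to run in parallel.
        System.out.println(min() + " " + max()); // prints 3 997
    }
}
```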
>
> If you don't have a repeatable source, or the computational steps you apply to a repeatable source are expensive enough that you don't want to repeat them, you need to buffer. Ideally, whatever intermediate representation you buffer into should be no less splittable than the original source; otherwise you can still lose compared with simply re-traversing the original source in parallel.
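One way to buffer while staying splittable, assuming the data fits in memory: run the expensive step once into a double[], which splits evenly, then traverse the array as often as needed. Here expensive is a hypothetical stand-in for a costly per-element computation, and the class name is likewise hypothetical.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class BufferOnceSketch {

    // Hypothetical stand-in for a costly per-element step we don't want to repeat.
    static double expensive(int i) {
        return Math.sin(i) * i;
    }

    public static double[] minMax(int n) {
        // Pay for the expensive step once, buffering the results in an array.
        // An array splits evenly, so it is no less splittable than IntStream.range.
        double[] buffer = IntStream.range(0, n).parallel()
                .mapToDouble(BufferOnceSketch::expensive)
                .toArray();
        // Each traversal of the buffer can itself run in parallel.
        double min = Arrays.stream(buffer).parallel().min().orElse(Double.POSITIVE_INFINITY);
        double max = Arrays.stream(buffer).parallel().max().orElse(Double.NEGATIVE_INFINITY);
        return new double[] { min, max };
    }

    public static void main(String[] args) {
        double[] r = minMax(4);
        System.out.println(r[0] + " " + r[1]);
    }
}
```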
>
> There are lots of ways to buffer, but this is outside the scope of Stream, by design. It would have introduced a lot of complexity into the common case to support the uncommon case.
>
>> On 12/20/2013 12:15 AM, Howard Lovatt wrote:
>> Hi All,
>>
>> In trying out Streams I keep coming up against an issue where I want two
>> outputs from a single Stream. I will use the example of finding both the min
>> and max values of a DoubleStream, because min and max already exist as
>> stream operations. The min and max example implies finite streams, but
>> sometimes you might want multiple things from an infinite stream.
>>
>> Various options come to mind:
>>
>> 1. Put everything in an array or List and then use two streams:
>>
>> double[] values = someStream...process...toArray();
>> double max =
>> DoubleStream.of(values).max().orElse(Double.NEGATIVE_INFINITY);
>> double min =
>> DoubleStream.of(values).min().orElse(Double.POSITIVE_INFINITY);
>>
>> Not really very stream-like, and it only works for finite streams.
>>
>> 2. Peek at the first traversal and build a second stream:
>>
>> DoubleStream.Builder copy = DoubleStream.builder();
>> double max = someStream...process...peek(d -> copy.add(d))
>> .max().orElse(Double.NEGATIVE_INFINITY);
>> double min = copy.build().min().orElse(Double.POSITIVE_INFINITY);
>>
>> It has a side effect, which is frowned upon, and it only works for
>> sequential, finite streams.
>>
>> 3. Write a custom collector (for the min/max example a collector already
>> exists, but it is useful to see what you need to write when a
>> collector isn't provided):
>>
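>> import java.util.EnumSet;
>> import java.util.Set;
>> import java.util.function.BiConsumer;
>> import java.util.function.BinaryOperator;
>> import java.util.function.Function;
>> import java.util.function.Supplier;
>> import java.util.stream.Collector;
>>
>> // Mutable container holding the running min and max.
>> private static class MinMax {
>>     double min = Double.POSITIVE_INFINITY;
>>     double max = Double.NEGATIVE_INFINITY;
>> }
>>
>> private static final Collector<Double, MinMax, MinMax> minMax =
>>         new Collector<Double, MinMax, MinMax>() {
>>
>>     @Override
>>     public Supplier<MinMax> supplier() {
>>         return () -> new MinMax();
>>     }
>>
>>     @Override
>>     public BiConsumer<MinMax, Double> accumulator() {
>>         return (accumulator, value) -> {
>>             // Test both bounds independently: the first value (and every
>>             // value in a decreasing run) must be able to update min AND max.
>>             if (value < accumulator.min) {
>>                 accumulator.min = value;
>>             }
>>             if (value > accumulator.max) {
>>                 accumulator.max = value;
>>             }
>>         };
>>     }
>>
>>     @Override
>>     public BinaryOperator<MinMax> combiner() {
>>         // Fold first into second and return it; the Collector contract
>>         // allows reusing either partial result.
>>         return (first, second) -> {
>>             if (first.min < second.min) {
>>                 second.min = first.min;
>>             }
>>             if (first.max > second.max) {
>>                 second.max = first.max;
>>             }
>>             return second;
>>         };
>>     }
>>
>>     @Override
>>     public Function<MinMax, MinMax> finisher() {
>>         return (result) -> result;
>>     }
>>
>>     @Override
>>     public Set<Collector.Characteristics> characteristics() {
>>         // Not CONCURRENT: MinMax is not thread-safe, so each thread
>>         // must accumulate into its own container.
>>         return EnumSet.of(Collector.Characteristics.UNORDERED,
>>                 Collector.Characteristics.IDENTITY_FINISH);
>>     }
>>
>> };
>>
>> MinMax result = someStream...process...boxed().collect(minMax);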
>>
>> This is considerably more work and it boxes primitives, but it is the
>> solution I lean towards.
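The boxing can be avoided, assuming a mutable container like MinMax above is acceptable, by using the three-argument DoubleStream.collect(supplier, accumulator, combiner), which accumulates primitive doubles directly. This is a sketch with hypothetical names; note that each value is tested against both bounds independently.

```java
import java.util.stream.DoubleStream;

public class PrimitiveMinMaxSketch {

    // Mutable result container, playing the same role as MinMax above.
    public static final class MinMax {
        public double min = Double.POSITIVE_INFINITY;
        public double max = Double.NEGATIVE_INFINITY;

        // Accumulator: test both bounds, so one value can update min and max.
        void accept(double value) {
            if (value < min) min = value;
            if (value > max) max = value;
        }

        // Combiner: fold another partial result into this one.
        void combine(MinMax other) {
            if (other.min < min) min = other.min;
            if (other.max > max) max = other.max;
        }
    }

    // DoubleStream.collect(Supplier, ObjDoubleConsumer, BiConsumer) never boxes.
    public static MinMax minMax(DoubleStream stream) {
        return stream.collect(MinMax::new, MinMax::accept, MinMax::combine);
    }

    public static void main(String[] args) {
        MinMax r = minMax(DoubleStream.of(3.0, 2.0, 1.0).parallel());
        System.out.println(r.min + " " + r.max); // prints 1.0 3.0
    }
}
```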
>>
>> Comments/suggestions? Is there a better approach?
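For the min/max case specifically, the existing support is DoubleStream.summaryStatistics() (or Collectors.summarizingDouble for a Stream<Double>), which computes count, sum, min, average and max in one pass and works for parallel streams. A short sketch (class name hypothetical):

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;

public class SummaryStatisticsSketch {

    // One pass over primitive doubles; safe for parallel streams.
    public static double[] minMax(DoubleStream stream) {
        DoubleSummaryStatistics stats = stream.summaryStatistics();
        // For an empty stream getMin() is +Infinity and getMax() is -Infinity,
        // the same defaults as the orElse(...) calls in options 1 and 2.
        return new double[] { stats.getMin(), stats.getMax() };
    }

    public static void main(String[] args) {
        double[] r = minMax(DoubleStream.of(3.0, -1.5, 7.25, 0.0).parallel());
        System.out.println(r[0] + " " + r[1]); // prints -1.5 7.25

        // The boxed equivalent, via the existing collector.
        DoubleSummaryStatistics boxed = Stream.of(3.0, -1.5, 7.25)
                .collect(Collectors.summarizingDouble(Double::doubleValue));
        System.out.println(boxed.getMin() + " " + boxed.getMax()); // prints -1.5 7.25
    }
}
```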
>>
>> -- Howard.
>>
More information about the lambda-dev mailing list