Combining streams
Paul Sandoz
paul.sandoz at oracle.com
Mon Jul 29 03:23:11 PDT 2013
The splitting characteristics of Stream.of will produce a balanced tree, since it is backed by an array. Certainly "Stream.concat(Stream.concat(Stream.concat(Stream.empty(), s1), s2), s3)" produces a right-heavy tree, i just showed that for the equivalent sequential evaluation, but when going parallel the shape is different, and will result in more wrapping, such as:
Stream.concat(Stream.concat((Stream.concat(Stream.empty(), s1)), Stream.concat((Stream.concat(Stream.empty(), s2)))
For flatMap each operation is performed sequentially, regardless of whether the stream is parallel or not, which means the parallelism is limited to the stream of streams and is not applied to the streams themselves.
This means if you have say 32 cores and are doing Stream.of(s1, s2, s3, s4).flatMap(x -> x).reduce(...) then at most 4 cores will be utilized. Where as if doing Stream.of(s1, s2, s3, s4).reduce(Stream.empty(), Stream::concat).reduce(...) then potentially all cores can be utilized since the concatenated stream can be split for the stream of streams and the streams themselves, but the wrapping will induce a higher cost, which may or may not be a small cost, it all depends on the size of the data and/or the cost per-element.
Your concat2 version should should work, but i think it better to invest in a Spliterator that can concatenate N streams.
The key here is to think of an array of pairs (first pair is at index 0 and 1 etc..), split that array of pairs in a balanced manner, then for each pair split according to the existing concat implementation. I may hack something together quickly this week unless you get to it before me :-)
Paul.
----- Original Message -----
From: michael.hixson at gmail.com
To: paul.sandoz at oracle.com
Cc: lambda-dev at openjdk.java.net
Sent: Friday, 26 July, 2013 11:15:46 PM GMT +00:00 GMT Britain, Ireland, Portugal
Subject: Re: Combining streams
On Fri, Jul 26, 2013 at 8:29 AM, Paul Sandoz <paul.sandoz at oracle.com> wrote:
> Hi Michael,
>
> These are good questions, what we are currently lacking is a description of the Stream API to help build up a mental model, however i am not sure JavaDoc is the best place for that.
>
>
> On Jul 26, 2013, at 3:52 AM, Michael Hixson <michael.hixson at gmail.com> wrote:
>
>> I have a couple of questions about combining streams. Setting aside
>> the question of which code is easier to read, I am wondering about
>> things like performance or other hidden gotchas.
>>
>> 1. To combine two streams, I should do this:
>>
>> Stream.concat(s1, s2)
>>
>> not this:
>>
>> Stream.of(s1, s2).flatMap(x -> x)
>>
>> ... right?
>>
>
> The former will preserve some properties of both streams if it can, specifically if both streams are sized the output is sized and certain optimizations can kick in, where as flatMap will clear any size properties and thus such optimizations are no longer applicable.
>
> The flatMap operation is also less efficient since it requires more work per element.
>
>
>> 2. To combine more than two streams, I should do this:
>>
>> Stream.of(s1, s2, s3, ...).flatMap(x -> x)
>>
>> not this:
>>
>> Stream.of(s1, s2, s3, ...).reduce(Stream.empty(), Stream::concat)
>>
>> ... right?
>>
>
> This one might be somewhat trickier to evaluate in terms of performance. I find the former more elegant, but as Brian points out it will not work for infinite streams, since the implementation does s.forEach(...)
>
> Sequentially the latter is equivalent to:
>
> Stream.concat(Stream.concat(Stream.concat(Stream.empty(), s1), s2), s3)
>
> So there can be lots of wrapping going on. For parallel evaluation the wrapping is likely to be more prevalent, since it will mirror that of the computation tree.
Ok. I thought flatMap was the way to go on #2 because it seemed
more... "direct". It also appears to be faster, but if it has worse
semantics then it's not worth it.
I have a vague, maybe baseless notion that the reduce+concat code
produces a stream that looks like a perfectly unbalanced tree. Like
this:
/ \
s1 / \
s2 / \
s3 / \
...
Also in my head, I imagine the flatMap code producing a flat tree like this:
/ | | |
s1 s2 s3 ...
And I imagine that second shape is better than the first, for some
reason. Are my suspicions valid? If I were to write my own varargs
concat method, would it be worth trying to make a balanced tree?
This seems to perform better than reduce+concat, and not quite as well
as flatMap.
public static <T> Stream<T> concat(Stream<T>... streams) {
Objects.requireNonNull(streams);
return concat2(streams, 0, streams.length);
}
private static <T> Stream<T> concat2(Stream<T>[] streams,
int fromIndex,
int toIndex) {
int width = toIndex - fromIndex;
switch (width) {
case 0:
return Stream.empty();
case 1:
return streams[fromIndex];
case 2:
return Stream.concat(streams[fromIndex],
streams[fromIndex + 1]);
default:
int split = fromIndex + width / 2;
if (width % 2 == 1) split++;
return Stream.concat(concat2(streams, fromIndex, split),
concat2(streams, split, toIndex));
}
}
(Probably writing my own spliterator like Streams.ConcatSpliterator
would be a better solution, but that looks really complicated.)
-Michael
>
> IIRC the EG may have touched upon a var-arg concat, but i suspect it fell through the gaps.
>
> Paul.
>
>
More information about the lambda-dev
mailing list