Combining streams
Michael Hixson
michael.hixson at gmail.com
Mon Jul 29 13:48:41 PDT 2013
Hi Paul,
Thanks, that all makes sense. Incidentally, after staring at the
Stream.concat implementation for a while, I believe I've spotted a
minor bug.
http://hg.openjdk.java.net/lambda/lambda/jdk/file/943bf0f62463/src/share/classes/java/util/stream/Streams.java
    687     unsized = aSpliterator.hasCharacteristics(SIZED)
    688             && aSpliterator.hasCharacteristics(SIZED)
    689             && aSpliterator.estimateSize() + bSpliterator.estimateSize() < 0;
Should line 688 have "bSpliterator" instead of "aSpliterator"?
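To make the question concrete, here is a minimal standalone sketch of the two variants of the check. The class and method names are hypothetical and do not come from Streams.java; the first method mirrors the condition as it appears on lines 687-689, and the second is only an assumption about what was intended.

```java
import java.util.Collections;
import java.util.Spliterator;
import java.util.Spliterators;

// Hypothetical helper class for illustration; not part of the JDK.
final class ConcatSizeCheckSketch {

    // The condition as written in the changeset: "a" is tested for SIZED twice.
    static boolean unsizedAsWritten(Spliterator<?> a, Spliterator<?> b) {
        return a.hasCharacteristics(Spliterator.SIZED)
                && a.hasCharacteristics(Spliterator.SIZED)
                && a.estimateSize() + b.estimateSize() < 0;
    }

    // The condition as assumed to be intended: both spliterators must be SIZED
    // before an overflowing sum of sizes is treated as meaningful.
    static boolean unsizedAsAssumed(Spliterator<?> a, Spliterator<?> b) {
        return a.hasCharacteristics(Spliterator.SIZED)
                && b.hasCharacteristics(Spliterator.SIZED)
                && a.estimateSize() + b.estimateSize() < 0;
    }

    public static void main(String[] args) {
        // Array-backed spliterators always report SIZED (size 3 here).
        Spliterator<Object> sized =
                Spliterators.spliterator(new Object[] {1, 2, 3}, 0);
        // Unknown-size spliterators are not SIZED and estimate Long.MAX_VALUE.
        Spliterator<Object> unknown =
                Spliterators.spliteratorUnknownSize(Collections.emptyIterator(), 0);

        // 3 + Long.MAX_VALUE overflows to a negative number, so the two
        // variants disagree for this pair.
        System.out.println(unsizedAsWritten(sized, unknown)); // true
        System.out.println(unsizedAsAssumed(sized, unknown)); // false
    }
}
```

The two variants only differ when the first spliterator is SIZED and the second is not, which is probably why the difference is easy to miss.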
Feel free to ignore the rest of this email. I don't have any more
questions; I'm just explaining where I ended up, in case anyone is curious.
I tried out a parallel reduce+concat and it threw an exception. I
guess I was misusing the identity parameter of reduce. Instead of
this:
    Stream.of(s1, s2, s3, ...)
        .parallel()
        .reduce(Stream.empty(), Stream::concat)
I need to do this:
    Stream.of(s1, s2, s3, ...)
        .parallel()
        .reduce(Stream::concat)
        .orElseGet(Stream::empty)
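
For anyone who wants to try the second form, a self-contained sketch follows. The names ReduceConcatSketch and concatAll are made up for this example. A likely explanation for the exception with the first form (an assumption, not confirmed against the implementation) is that a parallel reduce hands the same Stream.empty() instance to Stream::concat more than once, and a stream can only be consumed once.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical wrapper around the reduce+concat idiom; not a JDK API.
final class ReduceConcatSketch {

    // Concatenates any number of streams without using an identity value,
    // so no single Stream instance is shared between reduction steps.
    @SafeVarargs
    static <T> Stream<T> concatAll(Stream<T>... streams) {
        return Stream.of(streams)
                .parallel()
                .reduce(Stream::concat)
                .orElseGet(Stream::empty);
    }

    public static void main(String[] args) {
        List<Integer> result =
                concatAll(Stream.of(1, 2), Stream.of(3), Stream.of(4, 5))
                        .collect(Collectors.toList());
        // Encounter order of the outer stream is preserved by the reduction.
        System.out.println(result); // [1, 2, 3, 4, 5]
    }
}
```

With zero input streams the Optional is empty and orElseGet supplies a fresh empty stream, which is the case the identity argument was meant to cover.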
I tried writing my own spliterator for concat and it only did slightly
better than the others. That is,
A. custom spliterator-based varargs concat
B. balanced tree varargs concat (from my previous email)
C. parallel reduce + concat
A > B > C, by just a little bit, for every variation of top-level
stream size, substream size, and parallel-or-sequential that I tried.
In terms of lines of additional library code (minus documentation),
A. ~300 lines
B. ~80 lines
C. 0 lines
So I think I will stick with C. :)
-Michael
On Mon, Jul 29, 2013 at 3:23 AM, Paul Sandoz <paul.sandoz at oracle.com> wrote:
> The splitting characteristics of Stream.of will produce a balanced tree, since it is backed by an array. Certainly "Stream.concat(Stream.concat(Stream.concat(Stream.empty(), s1), s2), s3)" produces a right-heavy tree; I just showed that for the equivalent sequential evaluation. When going parallel, though, the shape is different and will result in more wrapping, such as:
>
> Stream.concat(Stream.concat(Stream.empty(), s1), Stream.concat(Stream.empty(), s2))
>
> For flatMap each operation is performed sequentially, regardless of whether the stream is parallel or not, which means the parallelism is limited to the stream of streams and is not applied to the streams themselves.
>
> This means that if you have, say, 32 cores and are doing Stream.of(s1, s2, s3, s4).flatMap(x -> x).reduce(...), then at most 4 cores will be utilized. Whereas if you do Stream.of(s1, s2, s3, s4).reduce(Stream.empty(), Stream::concat).reduce(...), then potentially all cores can be utilized, since the concatenated stream can be split both across the stream of streams and within the streams themselves. The wrapping will induce a higher cost, though, which may or may not be small; it all depends on the size of the data and/or the cost per element.
>
>
> Your concat2 version should work, but I think it is better to invest in a Spliterator that can concatenate N streams.
>
> The key here is to think of an array of pairs (the first pair is at indices 0 and 1, and so on), split that array of pairs in a balanced manner, then for each pair split according to the existing concat implementation. I may hack something together quickly this week unless you get to it before me :-)
>
> Paul.
>
> ----- Original Message -----
> From: michael.hixson at gmail.com
> To: paul.sandoz at oracle.com
> Cc: lambda-dev at openjdk.java.net
> Sent: Friday, 26 July, 2013 11:15:46 PM GMT +00:00 GMT Britain, Ireland, Portugal
> Subject: Re: Combining streams
>
> On Fri, Jul 26, 2013 at 8:29 AM, Paul Sandoz <paul.sandoz at oracle.com> wrote:
>> Hi Michael,
>>
>> These are good questions. What we are currently lacking is a description of the Stream API to help build up a mental model; however, I am not sure JavaDoc is the best place for that.
>>
>>
>> On Jul 26, 2013, at 3:52 AM, Michael Hixson <michael.hixson at gmail.com> wrote:
>>
>>> I have a couple of questions about combining streams. Setting aside
>>> the question of which code is easier to read, I am wondering about
>>> things like performance or other hidden gotchas.
>>>
>>> 1. To combine two streams, I should do this:
>>>
>>> Stream.concat(s1, s2)
>>>
>>> not this:
>>>
>>> Stream.of(s1, s2).flatMap(x -> x)
>>>
>>> ... right?
>>>
>>
>> The former will preserve some properties of both streams if it can. Specifically, if both streams are sized then the output is sized and certain optimizations can kick in, whereas flatMap will clear any size properties and thus such optimizations are no longer applicable.
>>
>> The flatMap operation is also less efficient since it requires more work per element.
>>
>>
>>> 2. To combine more than two streams, I should do this:
>>>
>>> Stream.of(s1, s2, s3, ...).flatMap(x -> x)
>>>
>>> not this:
>>>
>>> Stream.of(s1, s2, s3, ...).reduce(Stream.empty(), Stream::concat)
>>>
>>> ... right?
>>>
>>
>> This one might be somewhat trickier to evaluate in terms of performance. I find the former more elegant, but as Brian points out it will not work for infinite streams, since the implementation does s.forEach(...)
>>
>> Sequentially the latter is equivalent to:
>>
>> Stream.concat(Stream.concat(Stream.concat(Stream.empty(), s1), s2), s3)
>>
>> So there can be lots of wrapping going on. For parallel evaluation the wrapping is likely to be more prevalent, since it will mirror that of the computation tree.
>
> Ok. I thought flatMap was the way to go on #2 because it seemed
> more... "direct". It also appears to be faster, but if it has worse
> semantics then it's not worth it.
>
> I have a vague, maybe baseless notion that the reduce+concat code
> produces a stream that looks like a perfectly unbalanced tree. Like
> this:
>
>     / \
>   s1  / \
>     s2  / \
>       s3  / \
>           ...
>
> Also in my head, I imagine the flatMap code producing a flat tree like this:
>
>    /  |   |   |
>  s1   s2  s3  ...
>
> And I imagine that second shape is better than the first, for some
> reason. Are my suspicions valid? If I were to write my own varargs
> concat method, would it be worth trying to make a balanced tree?
>
> This seems to perform better than reduce+concat, and not quite as well
> as flatMap.
>
> public static <T> Stream<T> concat(Stream<T>... streams) {
>     Objects.requireNonNull(streams);
>     return concat2(streams, 0, streams.length);
> }
>
> private static <T> Stream<T> concat2(Stream<T>[] streams,
>                                      int fromIndex,
>                                      int toIndex) {
>     int width = toIndex - fromIndex;
>     switch (width) {
>         case 0:
>             return Stream.empty();
>         case 1:
>             return streams[fromIndex];
>         case 2:
>             return Stream.concat(streams[fromIndex],
>                                  streams[fromIndex + 1]);
>         default:
>             int split = fromIndex + width / 2;
>             if (width % 2 == 1) split++;
>             return Stream.concat(concat2(streams, fromIndex, split),
>                                  concat2(streams, split, toIndex));
>     }
> }
>
> (Probably writing my own spliterator like Streams.ConcatSpliterator
> would be a better solution, but that looks really complicated.)
>
> -Michael
>
>>
>> IIRC the EG may have touched upon a var-arg concat, but I suspect it fell through the gaps.
>>
>> Paul.
>>
>>
>