Grouping stream elements by their position - how to handle tail of stream?
Boaz Nahum
boaznahum at gmail.com
Sat Feb 2 23:38:04 PST 2013
Hi Brian.
> The glass is half full, or half empty, depending on your disposition.
I know my English is quite poor. My question was a real one, not a criticism.
Just trying to learn.
I gave up on parallelism; that is why I called sequential() before explode():
>> si.sequential().explode
> you have a little fixup at the end to deal with, but that's easy too,
> just annoying
I didn't find a way to do it with explode (unless my explode is last in the
chain, in which case it is easy). As someone else mentioned, in the case of a
sequential stream a 'finalize' method would do the trick.
As you suggested, I will look at Collectors.
Thanks.
Boaz
On Sat, Feb 2, 2013 at 11:55 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
> The glass is half full, or half empty, depending on your disposition.
>
> While it's trivial to write a version of explode() that remembers the last
> element seen, and either emits nothing or a pair (you have a little fixup
> at the end to deal with, but that's easy too, just annoying), once you
> start writing such stateful lambdas, you are tying yourself to sequential
> execution in a very error-prone way. The type system can't record this
> assumption, so when someone helpfully later adds a .parallel() somewhere,
> your code will silently turn to sewage. So, don't ever do this.
>
> To see why the obvious algorithm is sequential only, consider a
> decomposition of a data source with a spliterator. In most cases, we don't
> necessarily know the even-ness or odd-ness of the sum of sizes of all prior
> splits. Which means we don't know whether the first element of any given
> split is the left element of some pair or the right element of some pair.
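
The point about not knowing the parity of prior splits can be seen by splitting a source by hand. This is a small sketch, assuming the java.util.Spliterator API as it later shipped in Java 8; SplitParitySketch and splitSizes are hypothetical names. Where a source chooses to split is an implementation detail, so the code only reports the sizes it observes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

final class SplitParitySketch {

    /**
     * Splits the source once and returns { prefix size, remainder size }.
     * The prefix size is 0 when the source declines to split.
     */
    static long[] splitSizes(List<?> source) {
        Spliterator<?> rest = source.spliterator();
        // trySplit() hands off a leading portion; 'rest' keeps what follows it.
        Spliterator<?> prefix = rest.trySplit();
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        for (List<Integer> source : Arrays.asList(
                Arrays.asList(1, 2, 3, 4, 5),
                Arrays.asList(1, 2, 3, 4, 5, 6))) {
            long[] sizes = splitSizes(source);
            // An odd prefix means the remainder's first element is the RIGHT
            // half of a pair; an even prefix means it is the LEFT half.
            String role = (sizes[0] % 2 == 0) ? "left" : "right";
            System.out.println(source.size() + " elements: prefix of " + sizes[0]
                    + ", so the remainder starts with the " + role + " element of a pair");
        }
    }
}
```

A task that only sees the remainder has no access to the prefix size, which is why a lambda that remembers "the previous element" cannot decide on its own whether to emit a pair.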
>
> You might say: I don't care about parallelism, I only care about
> sequential.
>
> To which we'd respond: fine, but we're not really interested in distorting
> the model to encourage that. The Stream API is designed around operations
> that can be equally well performed in serial or parallel. There are no
> "serial only" operations supported, and that was an explicit design choice.
> Passing state-holding lambdas to these methods is likely to be a spec
> violation. We can't enforce it, any more than we can enforce the lack of
> data races, but bear in mind that if you do this, you're writing broken
> code, and asking for trouble.
>
> Now, that said, here's a parallel algorithm you can implement with
> collect(), though you may find the performance overhead too offensive.
> Basically, for each chunk of the input t1..tn, you compute two possible
> answers:
>
> p0 = (t1, t2), (t3, t4), ..., maybe leftover tn
> p1 = t1, (t2, t3), (t4, t5), ... maybe leftover tn
>
> then you combine these chunks of pairs in the mostly obvious way (observe
> that p0 has trailing element = !(p1 has trailing element)). Then when you
> get to the top of the tree, you take the one that doesn't have an orphaned
> initial element, and toss the other. Basically you're doing 2x work for
> each input element, but it parallelizes fairly cleanly.
>
> Such an algorithm is ideally suited to collect(), because a Collector is a
> representation of a catamorphism, which the above transformation is.
> Because the combination logic is associative, it parallelizes cleanly
> without needing any nasty stateful lambdas.
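
The two-candidate scheme described above can be sketched as follows. It assumes the java.util.stream.Collector API as it later shipped in Java 8, which differs from the prototype discussed in this thread. The names PairingSketch, Chunk, aligned, shifted and pairing() are hypothetical, Map.Entry stands in for Pair<T,T>, and padding an odd tail with null is just one possible choice. Treat it as a minimal sketch rather than a tuned implementation; list concatenation in merge() is linear.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;

final class PairingSketch {

    /** Per-chunk state: both candidate pairings plus the chunk's first and last element. */
    private static final class Chunk<T> {
        final List<Map.Entry<T, T>> aligned = new ArrayList<>(); // p0: (t1,t2), (t3,t4), ...
        final List<Map.Entry<T, T>> shifted = new ArrayList<>(); // p1: t1, (t2,t3), (t4,t5), ...
        T head;  // first element of the chunk (the orphan in p1)
        T last;  // latest element; unpaired in p0 when n is odd, in p1 when n is even
        long n;  // number of elements seen in this chunk

        void add(T t) {
            if (n == 0) {
                head = t;
            } else if (n % 2 == 1) {
                aligned.add(new SimpleEntry<>(last, t)); // completes p0's trailing element
            } else {
                shifted.add(new SimpleEntry<>(last, t)); // completes p1's trailing element
            }
            last = t;
            n++;
        }

        /** Appends the chunk that follows this one in encounter order. */
        Chunk<T> merge(Chunk<T> right) {
            if (right.n == 0) return this;
            if (n == 0) return right;
            if (n % 2 == 1) {
                // p0 has a trailing element: it pairs with right.head, then right's p1 follows.
                aligned.add(new SimpleEntry<>(last, right.head));
                aligned.addAll(right.shifted);
                // p1 ends on a complete pair, so right's p0 follows unchanged.
                shifted.addAll(right.aligned);
            } else {
                aligned.addAll(right.aligned);
                shifted.add(new SimpleEntry<>(last, right.head));
                shifted.addAll(right.shifted);
            }
            last = right.last;
            n += right.n;
            return this;
        }

        /** At the top of the tree keep p0 and pad an odd tail with null. */
        List<Map.Entry<T, T>> finish() {
            if (n % 2 == 1) {
                aligned.add(new SimpleEntry<>(last, null));
            }
            return aligned;
        }
    }

    static <T> Collector<T, ?, List<Map.Entry<T, T>>> pairing() {
        return Collector.<T, Chunk<T>, List<Map.Entry<T, T>>>of(
                Chunk<T>::new, Chunk<T>::add, Chunk<T>::merge, Chunk<T>::finish);
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Integer>> pairs =
                Arrays.asList(1, 2, 3, 4, 5).parallelStream().collect(pairing());
        System.out.println(pairs); // prints [1=2, 3=4, 5=null]
    }
}
```

Because merge() only depends on the parity of the left chunk and on the head of the right chunk, the result is the same however the source is split, so the same collector works for sequential and parallel streams.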
>
> When we expose the Op APIs and you can write your own decomposition-aware
> operations, you'll have another option: write a StatefulOp. This won't be
> an option for 8, but once we do, it's easy enough, and it will let you avoid
> the extra overhead by creating a custom operation.
>
> So, to sum up:
> - If you're looking for a one-liner, you'll be disappointed;
> - If you convince yourself "I would never need parallelism here" and
> write the obvious unsafe stateful lambda, please write your manager a
> letter first explaining how your incompetence is putting the reliability of
> his product in jeopardy;
> - If you're willing to write the above collector, you'll likely be happy
> enough with the result.
>
>
>
> On 2/2/2013 3:35 PM, Boaz Nahum wrote:
>
>> Hi.
>>
>> I am looking in the Stream interface for a way to convert a stream of
>> { t1, t2, .. Tn }
>> to
>> { {t1,t2}, ... {Tn-1, Tn} }
>> or
>> { {t1,t2}, ... {Tn, null}}
>>
>> Let's assume {t1, t2} are aggregated by Pair<T,T>
>>
>>
>>
>> So I tried to use explode:
>>
>> Stream<Integer> si = Arrays.asList(1, 2, 3, 4, 5).stream();
>>
>> Stream<Pair<Integer, Integer>> spi = si.sequential().explode(
>>     new BiConsumer<Stream.Downstream<Pair<Integer, Integer>>, Integer>() {
>>
>>         Integer firstValue;
>>
>>         @Override
>>         public void accept(Stream.Downstream<Pair<Integer, Integer>> pairDownstream,
>>                            Integer v) {
>>             if (firstValue == null) {
>>                 firstValue = v;
>>             } else {
>>                 pairDownstream.send(new Pair<>(firstValue, v));
>>                 firstValue = null;
>>             }
>>         }
>>     });
>>
>> But I didn't find a way to add the tail of the input stream {.., 5} to the new
>> stream {..., {5, null}}.
>>
>> Of course, this is only a simplified example of a more general problem I have.
>>
>> Thanks
>> Boaz
>>
>>