Grouping stream elements by their position - how to handle tail of stream ?

Brian Goetz brian.goetz at oracle.com
Sat Feb 2 20:04:01 PST 2013


Look at the implementations in Collectors for a hint.  Obviously we owe 
some significant documentation, which unfortunately lags the code by too 
much.  But a Collector is basically a mutable foldl/inject; you provide 
a way to create a new mutable container, a way to add a single element 
to it, and a way to combine two mutable containers.  The framework 
decomposes the source, creates a bunch of little containers at the 
leaves, and then goes up the tree, merging.

Dumping the elements into an ArrayList maps easily to Collector:

  collect(ArrayList::new, ArrayList::add, ArrayList::addAll)

This parallelizes, as we can collect each chunk into a list, and then 
merge the lists up the tree with addAll.
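
As a minimal, runnable sketch of the call above: this assumes the 
three-argument Stream.collect(supplier, accumulator, combiner) as it 
appears in released Java 8, which may differ from the lambda builds 
current at the time of this message.  The names CollectToList and firstN 
are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical demo class; not part of the JDK.
final class CollectToList {

    // Collects 1..n into a list using a parallel stream.
    static List<Integer> firstN(int n) {
        ArrayList<Integer> list = IntStream.rangeClosed(1, n)
                .boxed()
                .parallel()
                .collect(ArrayList::new,      // new container for each chunk
                         ArrayList::add,      // add one element to a container
                         ArrayList::addAll);  // merge right container into left
        return list;
    }

    public static void main(String[] args) {
        System.out.println(firstN(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Encounter order is preserved even in parallel, because the chunks are 
always merged left to right.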

The "how do I get the last element" problem goes away, since your data 
structure can explicitly maintain the state of whether the chunk ends 
with a balanced pair or an extra.

But, you need to calculate BOTH p0 and p1 for each chunk (except maybe 
the left-spine chunks), because you don't know until the end which will 
have the right parity.
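
One way the p0/p1 bookkeeping might be sketched is shown below.  This is 
an illustration under stated assumptions, not a tuned implementation: it 
assumes Collector.of as it appears in released Java 8, uses Map.Entry as 
a stand-in for a Pair type, and the class PairingChunk with all of its 
members is hypothetical.  It pads an odd tail with null, as in the 
original question.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical per-chunk state: both candidate pairings plus the chunk's ends.
final class PairingChunk<T> {
    final List<Map.Entry<T, T>> p0 = new ArrayList<>(); // (t1,t2), (t3,t4), ...
    final List<Map.Entry<T, T>> p1 = new ArrayList<>(); // (t2,t3), (t4,t5), ...
    T first;  // t1, the orphaned initial element of p1
    T last;   // tn, the leftover of p0 when size is odd, of p1 when size is even
    int size; // number of elements seen in this chunk

    void add(T t) {
        if (size == 0) {
            first = t;                                   // starts both answers
        } else if (size % 2 == 1) {
            p0.add(new SimpleImmutableEntry<>(last, t)); // completes a p0 pair
        } else {
            p1.add(new SimpleImmutableEntry<>(last, t)); // completes a p1 pair
        }
        last = t;
        size++;
    }

    // Appends the chunk immediately to the right of this one.
    PairingChunk<T> merge(PairingChunk<T> right) {
        if (right.size == 0) return this;
        if (size == 0) return right;
        Map.Entry<T, T> bridge = new SimpleImmutableEntry<>(last, right.first);
        if (size % 2 == 1) {
            // Our p0 has a leftover: pair it with right.first, continue with right.p1.
            p0.add(bridge);
            p0.addAll(right.p1);
            p1.addAll(right.p0);
        } else {
            // Our p1 has a leftover: pair it with right.first, continue with right.p1.
            p0.addAll(right.p0);
            p1.add(bridge);
            p1.addAll(right.p1);
        }
        last = right.last;
        size += right.size;
        return this;
    }

    // At the root, p0 is the answer with no orphaned initial element; p1 is tossed.
    List<Map.Entry<T, T>> finish() {
        List<Map.Entry<T, T>> result = new ArrayList<>(p0);
        if (size % 2 == 1) result.add(new SimpleImmutableEntry<>(last, null));
        return result;
    }

    static <T> Collector<T, PairingChunk<T>, List<Map.Entry<T, T>>> collector() {
        return Collector.of(PairingChunk::new, PairingChunk::add,
                            PairingChunk::merge, PairingChunk::finish);
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Integer>> pairs = IntStream.rangeClosed(1, 5)
                .boxed().parallel().collect(PairingChunk.collector());
        System.out.println(pairs); // prints [1=2, 3=4, 5=null]
    }
}
```

The merge copies pair lists with addAll at every level of the tree, which 
is part of the overhead mentioned in the quoted message below; a 
production version would likely want a cheaper concatenation.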

On 2/2/2013 7:49 PM, Lattie wrote:
>> Now, that said, here's a parallel algorithm you can implement with
>> collect(), though you may find the performance overhead too offensive.
>> Basically, for each chunk of the input t1..tn, you compute two possible
>> answers:
>>
>> p0 = (t1,t2), (t3,t4), ..., maybe leftover tn
>
> This is what I want... the ability to compute p0.  I was not able to
> figure out how to use collect() to do this though, and so I went down
> the road of explode() with a stateful lambda... can someone point me
> to any tutorial or info on how to properly use collect()?
>
> TIA
>
>
> On Sat, Feb 2, 2013 at 1:55 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>> The glass is half full, or half empty, depending on your disposition.
>>
>> While it's trivial to write a version of explode() that remembers the
>> last element seen, and either emits nothing or a pair (you have a little
>> fixup at the end to deal with, but that's easy too, just annoying), once
>> you start writing such stateful lambdas, you are tying yourself to
>> sequential execution in a very error-prone way.  The type system can't
>> record this assumption, so when someone helpfully later adds a
>> .parallel() somewhere, your code will silently turn to sewage.  So,
>> don't ever do this.
>>
>> To see why the obvious algorithm is sequential only, consider a
>> decomposition of a data source with a spliterator.  In most cases, we
>> don't necessarily know the even-ness or odd-ness of the sum of sizes of
>> all prior splits.  Which means we don't know whether the first element
>> of any given split is the left element of some pair or the right element
>> of some pair.
>>
>> You might say: I don't care about parallelism, I only care about
>> sequential.
>>
>> To which we'd respond: fine, but we're not really interested in
>> distorting the model to encourage that.  The Stream API is designed
>> around operations that can be equally well performed in serial or
>> parallel.  There are no "serial only" operations supported, and that was
>> an explicit design choice.  Passing state-holding lambdas to these
>> methods is likely to be a spec violation.  We can't enforce it, any more
>> than we can enforce the lack of data races, but bear in mind that if you
>> do this, you're writing broken code, and asking for trouble.
>>
>> Now, that said, here's a parallel algorithm you can implement with
>> collect(), though you may find the performance overhead too offensive.
>> Basically, for each chunk of the input t1..tn, you compute two possible
>> answers:
>>
>> p0 = (t1,t2), (t3,t4), ..., maybe leftover tn
>> p1 = t1, (t2, t3), (t4, t5), ... maybe leftover tn
>>
>> then you combine these chunks of pairs in the mostly obvious way (observe
>> that p0 has trailing element = !(p1 has trailing element)).  Then when
>> you get to the top of the tree, you take the one that doesn't have an
>> orphaned initial element, and toss the other.  Basically you're doing 2x
>> work for each input element, but it parallelizes fairly cleanly.
>>
>> Such an algorithm is ideally suited to collect(), because a Collector is
>> a representation of a catamorphism, which the above transformation is.
>> Because the combination logic is associative, it parallelizes cleanly
>> without needing any nasty stateful lambdas.
>>
>> When we expose the Op APIs and you can write your own
>> decomposition-aware operations, you'll have another option: write a
>> StatefulOp.  This won't be an option for 8, but once we do, it's easy
>> enough, and it will let you avoid the extra overhead by creating a
>> custom operation.
>>
>> So, to sum up:
>>    - If you're looking for a one-liner, you'll be disappointed;
>>    - If you convince yourself "I would never need parallelism here" and
>> write the obvious unsafe stateful lambda, please write your manager a
>> letter first explaining how your incompetence is putting the reliability
>> of his product in jeopardy;
>>    - If you're willing to write the above collector, you'll likely be
>> happy enough with the result.
>>
>>
>> On 2/2/2013 3:35 PM, Boaz Nahum wrote:
>>> Hi.
>>>
>>> I am looking in the Stream interface for a way to convert a stream of
>>>      {  t1, t2, .. Tn }
>>> to
>>>      { {t1,t2}, ... {Tn-1, Tn} }
>>> or
>>>      { {t1,t2}, ... {Tn, null}}
>>>
>>> Let's assume {t1, t2} are aggregated by Pair<T,T>
>>>
>>>
>>>
>>> So I tried to use explode:
>>>
>>>           Stream<Integer> si = Arrays.asList(1, 2, 3, 4, 5).stream();
>>>
>>>           Stream<Pair<Integer, Integer>> spi = si.sequential().explode(new
>>> BiConsumer<Stream.Downstream<Pair<Integer, Integer>>, Integer>() {
>>>
>>>               Integer firstValue;
>>>
>>>               @Override
>>>               public void accept(Stream.Downstream<Pair<Integer, Integer>>
>>> pairDownstream, Integer v) {
>>>
>>>                   if (firstValue == null) {
>>>                       firstValue = v;
>>>                   } else {
>>>
>>>                       pairDownstream.send(new Pair<>(firstValue, v));
>>>                       firstValue = null;
>>>
>>>                   }
>>>               }
>>>
>>>           });
>>>
>>>
>>> But I didn't find a way to add the tail of input stream {.., 5} to the new
>>> stream { ..., {5, null}}.
>>>
>>> Of course, this is only a simplified example of a more general problem I have.
>>>
>>> Thanks
>>> Boaz
>>>
>>

