Grouping stream elements by their position - how to handle tail of stream ?
Brian Goetz
brian.goetz at oracle.com
Mon Feb 4 15:47:43 PST 2013
Could you clarify what you are suggesting?
On 2/4/2013 6:27 PM, Howard Lovatt wrote:
> Hi Brian,
>
> This is an enquiry related to grouping or parsing an input stream, but
> a little different.
>
> In numerical programs, common operations include matrix transpose, other
> reshaping, and 'sparsification'. I have seen these implemented on
> parallel architectures as an indirection of indexes, something like:
>
> matrix[indexTransformation[index]]
>
> Has thought been given to such a capability for the stream library?
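As an aside, a rough sketch of how that kind of index indirection could be expressed with IntStream; the 2x2 layout and the transposeIndex permutation here are invented purely for the example:

    import java.util.Arrays;
    import java.util.stream.IntStream;

    public class IndexIndirection {
        public static void main(String[] args) {
            double[] matrix = {1.0, 2.0, 3.0, 4.0};  // 2x2 matrix, row-major
            int[] transposeIndex = {0, 2, 1, 3};     // index permutation for transpose

            // Read each element through the index transformation,
            // i.e. matrix[indexTransformation[index]].
            double[] transposed = IntStream.range(0, matrix.length)
                    .mapToDouble(i -> matrix[transposeIndex[i]])
                    .toArray();

            System.out.println(Arrays.toString(transposed));  // [1.0, 3.0, 2.0, 4.0]
        }
    }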
>
> Thanks,
>
> -- Howard.
>
> Sent from my iPad
>
> On 03/02/2013, at 3:04 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>
>> Look at the implementations in Collectors for a hint. Obviously we owe
>> some significant documentation, which unfortunately lags the code by too
>> much. But a Collector is basically a mutable foldl/inject; you provide
>> a way to create a new mutable container, a way to add a single element
>> to it, and a way to combine two mutable containers. The framework
>> decomposes the source, creates a bunch of little containers at the
>> leaves, and then goes up the tree, merging.
>>
>> Dumping the elements into an ArrayList maps easily to Collector:
>>
>> collect(ArrayList::new, ArrayList::add, ArrayList::addAll)
>>
>> This parallelizes, as we can collect each chunk into a list, and then
>> merge the lists up the tree with addAll.
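For concreteness, a minimal usage sketch of that one-liner, assuming the shipped java.util.stream API (which differs in places from the pre-release API discussed in this thread):

    import java.util.ArrayList;
    import java.util.stream.Stream;

    public class CollectToList {
        public static void main(String[] args) {
            // supplier: create a fresh container; accumulator: add one element;
            // combiner: merge two containers built from separate chunks.
            ArrayList<Integer> result = Stream.of(1, 2, 3, 4, 5)
                    .parallel()
                    .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
            System.out.println(result);  // [1, 2, 3, 4, 5]
        }
    }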
>>
>> The "how do I get the last element" problem goes away, since your data
>> structure can explicitly maintain the state of whether the chunk ends
>> with a balanced pair or an extra.
>>
>> But, you need to calculate BOTH p0 and p1 for each chunk (except maybe
>> the left-spine chunks), because you don't know until the end which will
>> have the right parity.
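For illustration, a minimal sketch of such a per-chunk container for a single pairing; a parallel-ready version holds two of these (p0 and p1) plus the chunk's first element, as in the fuller sketch further below:

    import java.util.ArrayList;
    import java.util.List;

    // Per-chunk state for one pairing: completed pairs plus an optional
    // dangling element. Whether the chunk ends with a balanced pair or an
    // extra is explicit in the container, not hidden in a stateful lambda.
    class IntPairChunk {
        final List<int[]> pairs = new ArrayList<>();  // completed (left, right) pairs
        Integer dangling;                             // non-null if the chunk ends mid-pair

        void add(int t) {
            if (dangling == null) {
                dangling = t;                         // element starts a new pair
            } else {
                pairs.add(new int[] { dangling, t }); // element completes the pair
                dangling = null;
            }
        }
    }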
>>
>> On 2/2/2013 7:49 PM, Lattie wrote:
>>>> Now, that said, here's a parallel algorithm you can implement with
>>>> collect(), though you may find the performance overhead too offensive.
>>>> Basically, for each chunk of the input t1..tn, you compute two possible
>>>> answers:
>>>>
>>>> p0 = (t1,t2), (t3,t4), ..., maybe leftover tn
>>>
>>> This is what I want... the ability to compute p0. I was not able to
>>> figure out how to use collect() to do this though, and so I went down
>>> the road of explode() with a stateful lambda... can someone point me
>>> to any tutorial or info on how to properly use collect()?
>>>
>>> TIA
>>>
>>>
>>> On Sat, Feb 2, 2013 at 1:55 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>>> The glass is half full, or half empty, depending on your disposition.
>>>>
>>>> While it's trivial to write a version of explode() that remembers the
>>>> last element seen, and either emits nothing or a pair (you have a little
>>>> fixup at the end to deal with, but that's easy too, just annoying), once
>>>> you start writing such stateful lambdas, you are tying yourself to
>>>> sequential execution in a very error-prone way. The type system can't
>>>> record this assumption, so when someone helpfully later adds a
>>>> .parallel() somewhere, your code will silently turn to sewage. So,
>>>> don't ever do this.
>>>>
>>>> To see why the obvious algorithm is sequential only, consider a
>>>> decomposition of a data source with a spliterator. In most cases, we
>>>> don't necessarily know the even-ness or odd-ness of the sum of sizes of
>>>> all prior splits. Which means we don't know whether the first element
>>>> of any given split is the left element of some pair or the right element
>>>> of some pair.
>>>>
>>>> You might say: I don't care about parallelism, I only care about
>>>> sequential.
>>>>
>>>> To which we'd respond: fine, but we're not really interested in
>>>> distorting the model to encourage that. The Stream API is designed
>>>> around operations that can be equally well performed in serial or
>>>> parallel. There are no "serial only" operations supported, and that was
>>>> an explicit design choice. Passing state-holding lambdas to these
>>>> methods is likely to be a spec violation. We can't enforce it, any more
>>>> than we can enforce the lack of data races, but bear in mind that if you
>>>> do this, you're writing broken code, and asking for trouble.
>>>>
>>>> Now, that said, here's a parallel algorithm you can implement with
>>>> collect(), though you may find the performance overhead too offensive.
>>>> Basically, for each chunk of the input t1..tn, you compute two possible
>>>> answers:
>>>>
>>>> p0 = (t1,t2), (t3,t4), ..., maybe leftover tn
>>>> p1 = t1, (t2, t3), (t4, t5), ... maybe leftover tn
>>>>
>>>> then you combine these per-chunk results in the mostly obvious way (observe
>>>> that p0 has trailing element = !(p1 has trailing element)). Then when
>>>> you get to the top of the tree, you take the one that doesn't have an
>>>> orphaned initial element, and toss the other. Basically you're doing 2x
>>>> work for each input element, but it parallelizes fairly cleanly.
>>>>
>>>> Such an algorithm is ideally suited to collect(), because a Collector is
>>>> a representation of a catamorphism, which the above transformation is.
>>>> Because the combination logic is associative, it parallelizes cleanly
>>>> without needing any nasty stateful lambdas.
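A rough sketch of what such a collector might look like, written against the shipped java.util.stream API (Collector.of), which differs from the pre-release API quoted in this thread; Pair, Pairing, Chunk and pairing() are invented names for illustration. Each element is added to both candidate pairings, matching the roughly 2x work described above:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collector;
    import java.util.stream.Stream;

    public class PairingCollector {

        // Hypothetical pair holder.
        static final class Pair<T> {
            final T first, second;
            Pair(T first, T second) { this.first = first; this.second = second; }
            @Override public String toString() { return "(" + first + "," + second + ")"; }
        }

        // One candidate pairing of a chunk: completed pairs plus an optional
        // element left dangling at the end of the chunk.
        static final class Pairing<T> {
            final List<Pair<T>> pairs = new ArrayList<>();
            T dangling;  // non-null if this pairing ends mid-pair

            void add(T t) {
                if (dangling == null) {
                    dangling = t;
                } else {
                    pairs.add(new Pair<>(dangling, t));
                    dangling = null;
                }
            }
        }

        // Per-chunk state: p0 pairs from the chunk's first element, p1 pairs
        // from its second (the first element stays an orphan in p1).
        static final class Chunk<T> {
            boolean empty = true;
            T orphan;  // first element of the chunk, unpaired in p1
            final Pairing<T> p0 = new Pairing<>();
            final Pairing<T> p1 = new Pairing<>();

            void add(T t) {
                if (empty) {
                    empty = false;
                    orphan = t;
                    p0.add(t);
                } else {
                    p0.add(t);
                    p1.add(t);
                }
            }

            Chunk<T> merge(Chunk<T> right) {
                if (right.empty) return this;
                if (this.empty) return right;
                Chunk<T> merged = new Chunk<>();
                merged.empty = false;
                merged.orphan = this.orphan;
                splice(this.p0, right, merged.p0);
                splice(this.p1, right, merged.p1);
                return merged;
            }

            // Continue a left-hand pairing across the right-hand chunk: a dangling
            // left element pairs with the right chunk's first element and the rest
            // follows right.p1; otherwise the right chunk contributes right.p0.
            private static <U> void splice(Pairing<U> left, Chunk<U> right, Pairing<U> out) {
                out.pairs.addAll(left.pairs);
                if (left.dangling == null) {
                    out.pairs.addAll(right.p0.pairs);
                    out.dangling = right.p0.dangling;
                } else {
                    out.pairs.add(new Pair<>(left.dangling, right.orphan));
                    out.pairs.addAll(right.p1.pairs);
                    out.dangling = right.p1.dangling;
                }
            }
        }

        // At the top of the tree, keep the pairing with no orphaned first element
        // (p0); a trailing odd element becomes (t, null).
        static <T> Collector<T, ?, List<Pair<T>>> pairing() {
            return Collector.of(
                    Chunk<T>::new,
                    Chunk<T>::add,
                    Chunk<T>::merge,
                    (Chunk<T> chunk) -> {
                        List<Pair<T>> result = new ArrayList<>(chunk.p0.pairs);
                        if (chunk.p0.dangling != null) {
                            result.add(new Pair<>(chunk.p0.dangling, null));
                        }
                        return result;
                    });
        }

        public static void main(String[] args) {
            System.out.println(Stream.of(1, 2, 3, 4, 5).parallel().collect(pairing()));
            // prints [(1,2), (3,4), (5,null)]
        }
    }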
>>>>
>>>> When we expose the Op APIs and you can write your own
>>>> decomposition-aware operations, you'll have another option: write a
>>>> StatefulOp. This won't be an option for 8, but once we do, it's easy
>>>> enough, and it will let you avoid the extra overhead by creating a
>>>> custom operation.
>>>>
>>>> So, to sum up:
>>>> - If you're looking for a one-liner, you'll be disappointed;
>>>> - If you convince yourself "I would never need parallelism here" and
>>>> write the obvious unsafe stateful lambda, please write your manager a
>>>> letter first explaining how your incompetence is putting the reliability
>>>> of his product in jeopardy;
>>>> - If you're willing to write the above collector, you'll likely be
>>>> happy enough with the result.
>>>>
>>>>
>>>> On 2/2/2013 3:35 PM, Boaz Nahum wrote:
>>>>> Hi.
>>>>>
>>>>> I am looking in the Stream interface for a way to convert a stream of
>>>>> { t1, t2, ..., tn }
>>>>> to
>>>>> { {t1,t2}, ..., {tn-1, tn} }
>>>>> or
>>>>> { {t1,t2}, ..., {tn, null} }
>>>>>
>>>>> Let's assume {t1, t2} are aggregated by Pair<T,T>
>>>>>
>>>>>
>>>>>
>>>>> So I tried to use explode:
>>>>>
>>>>>     Stream<Integer> si = Arrays.asList(1, 2, 3, 4, 5).stream();
>>>>>
>>>>>     Stream<Pair<Integer, Integer>> spi = si.sequential().explode(
>>>>>         new BiConsumer<Stream.Downstream<Pair<Integer, Integer>>, Integer>() {
>>>>>
>>>>>             Integer firstValue;
>>>>>
>>>>>             @Override
>>>>>             public void accept(Stream.Downstream<Pair<Integer, Integer>> pairDownstream,
>>>>>                                Integer v) {
>>>>>                 if (firstValue == null) {
>>>>>                     firstValue = v;
>>>>>                 } else {
>>>>>                     pairDownstream.send(new Pair<>(firstValue, v));
>>>>>                     firstValue = null;
>>>>>                 }
>>>>>             }
>>>>>         });
>>>>>
>>>>> But I didn't find a way to add the tail of the input stream {..., 5} to the
>>>>> new stream {..., {5, null}}.
>>>>>
>>>>> Of course, this is only a simplified example of a more general problem I have.
>>>>>
>>>>> Thanks
>>>>> Boaz
>>