Grouping stream elements by their position - how to handle tail of stream ?
Brian Goetz
brian.goetz at oracle.com
Sat Feb 2 13:55:13 PST 2013
The glass is half full, or half empty, depending on your disposition.
While it's trivial to write a version of explode() that remembers the
last element seen, and either emits nothing or a pair (you have a little
fixup at the end to deal with, but that's easy too, just annoying), once
you start writing such stateful lambdas, you are tying yourself to
sequential execution in a very error-prone way. The type system can't
record this assumption, so when someone helpfully later adds a
.parallel() somewhere, your code will silently turn to sewage. So,
don't ever do this.
To see why the obvious algorithm is sequential only, consider a
decomposition of a data source with a spliterator. In most cases, we
don't necessarily know the even-ness or odd-ness of the sum of sizes of
all prior splits. Which means we don't know whether the first element
of any given split is the left element of some pair or the right element
of some pair.
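To make the parity problem concrete, here is a small sketch. It does not use the Stream API at all: it simply pairs up a list as if it started on a pair boundary, then does the same to two chunks of that list independently. The class and method names (SplitParityDemo, pairFromStart) and the use of two-element lists as pairs are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitParityDemo {

    // Hypothetical helper: pairs up a chunk assuming its first element is
    // the left element of a pair; an unpaired last element is padded with null.
    static <T> List<List<T>> pairFromStart(List<T> chunk) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < chunk.size(); i += 2) {
            T right = (i + 1 < chunk.size()) ? chunk.get(i + 1) : null;
            out.add(Arrays.asList(chunk.get(i), right));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(1, 2, 3, 4, 5, 6);

        // Pairing the whole input in one pass.
        System.out.println(pairFromStart(all));
        // prints [[1, 2], [3, 4], [5, 6]]

        // Pairing two chunks of odd size independently: the second chunk
        // cannot know that its first element is really a right element.
        List<List<Integer>> perChunk = new ArrayList<>(pairFromStart(all.subList(0, 3)));
        perChunk.addAll(pairFromStart(all.subList(3, 6)));
        System.out.println(perChunk);
        // prints [[1, 2], [3, null], [4, 5], [6, null]]
    }
}
```

Whenever the elements before a chunk add up to an odd count, every pair in that chunk comes out shifted by one.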
You might say: I don't care about parallelism, I only care about
sequential.
To which we'd respond: fine, but we're not really interested in
distorting the model to encourage that. The Stream API is designed
around operations that can be equally well performed in serial or
parallel. There are no "serial only" operations supported, and that was
an explicit design choice. Passing state-holding lambdas to these
methods is likely to be a spec violation. We can't enforce it, any more
than we can enforce the lack of data races, but bear in mind that if you
do this, you're writing broken code, and asking for trouble.
Now, that said, here's a parallel algorithm you can implement with
collect(), though you may find the performance overhead too offensive.
Basically, for each chunk of the input t1..tn, you compute two possible
answers:
p0 = (t1,t2), (t3,t4), ..., maybe leftover tn
p1 = t1, (t2,t3), (t4,t5), ..., maybe leftover tn
then you combine these paired chunks in the mostly obvious way (observe
that p0 has trailing element = !(p1 has trailing element)). Then when
you get to the top of the tree, you take the one that doesn't have an
orphaned initial element, and toss the other. Basically you're doing 2x
work for each input element, but it parallelizes fairly cleanly.
Such an algorithm is ideally suited to collect(), because a Collector is
a representation of a catamorphism, which the above transformation is.
Because the combination logic is associative, it parallelizes cleanly
without needing any nasty stateful lambdas.
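A minimal sketch of such a collector follows. It is written against the Collector.of factory as it eventually shipped in Java 8, which is an assumption on my part since the API was still changing when this was posted. All names (PairingCollector, Acc, pairing) are hypothetical, pairs are modelled as two-element lists rather than a Pair class, and a trailing unpaired element is padded with null as in the original question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class PairingCollector {

    // Per-chunk state: both candidate pairings, plus what is needed to
    // join this chunk to the one on its right.
    static final class Acc<T> {
        int size;                              // elements seen in this chunk
        T first;                               // t1, the possible orphan of p1
        T last;                                // tn, the possible leftover
        List<List<T>> p0 = new ArrayList<>();  // (t1,t2), (t3,t4), ...
        List<List<T>> p1 = new ArrayList<>();  // (t2,t3), (t4,t5), ...

        void add(T t) {
            if (size == 0) {
                first = t;
            } else if (size % 2 == 1) {
                p0.add(Arrays.asList(last, t));  // p0 had a leftover; close it
            } else {
                p1.add(Arrays.asList(last, t));  // p1 had a leftover; close it
            }
            last = t;
            size++;
        }

        Acc<T> merge(Acc<T> right) {
            if (right.size == 0) return this;
            if (size == 0) return right;
            if (size % 2 == 1) {
                // Odd left chunk: p0 ends in a leftover, p1 ends cleanly.
                p0.add(Arrays.asList(last, right.first));
                p0.addAll(right.p1);
                p1.addAll(right.p0);
            } else {
                // Even left chunk: p0 ends cleanly, p1 ends in a leftover.
                p0.addAll(right.p0);
                p1.add(Arrays.asList(last, right.first));
                p1.addAll(right.p1);
            }
            last = right.last;
            size += right.size;
            return this;
        }

        // At the top of the tree, keep p0 (no orphaned initial element)
        // and discard p1.
        List<List<T>> finish() {
            List<List<T>> out = new ArrayList<>(p0);
            if (size % 2 == 1) out.add(Arrays.asList(last, null));
            return out;
        }
    }

    static <T> Collector<T, ?, List<List<T>>> pairing() {
        return Collector.<T, Acc<T>, List<List<T>>>of(
                Acc::new, Acc::add, Acc::merge, Acc::finish);
    }

    public static void main(String[] args) {
        List<List<Integer>> pairs = Stream.of(1, 2, 3, 4, 5)
                .parallel()
                .collect(PairingCollector.<Integer>pairing());
        System.out.println(pairs); // prints [[1, 2], [3, 4], [5, null]]
    }
}
```

The sketch keeps both pair lists for every chunk, which is the 2x overhead mentioned above; only the merge step needs to know which of the two lists lines up with the chunk to its left.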
When we expose the Op APIs and you can write your own
decomposition-aware operations, you'll have another option: write a
StatefulOp. This won't be an option for 8, but once we do, it's easy
enough, and it will let you avoid the extra overhead by creating a
custom operation.
So, to sum up:
- If you're looking for a one-liner, you'll be disappointed;
- If you convince yourself "I would never need parallelism here" and
write the obvious unsafe stateful lambda, please write your manager a
letter first explaining how your incompetence is putting the reliability
of his product in jeopardy;
- If you're willing to write the above collector, you'll likely be
happy enough with the result.
On 2/2/2013 3:35 PM, Boaz Nahum wrote:
> Hi.
>
> I am looking in the Stream interface for a way to convert a stream of
> { t1, t2, .. Tn }
> to
> { {t1,t2}, ... {Tn-1, Tn} }
> or
> { {t1,t2}, ... {Tn, null}}
>
> Let's assume {t1, t2} are aggregated by Pair<T,T>
>
>
>
> So I tried to use explode:
>
> Stream<Integer> si = Arrays.asList(1, 2, 3, 4, 5).stream();
>
> Stream<Pair<Integer, Integer>> spi = si.sequential().explode(
>     new BiConsumer<Stream.Downstream<Pair<Integer, Integer>>, Integer>() {
>
>         Integer firstValue;
>
>         @Override
>         public void accept(Stream.Downstream<Pair<Integer, Integer>> pairDownstream,
>                            Integer v) {
>             if (firstValue == null) {
>                 firstValue = v;
>             } else {
>                 pairDownstream.send(new Pair<>(firstValue, v));
>                 firstValue = null;
>             }
>         }
>     });
>
>
> But I didn't find a way to add the tail of the input stream {.., 5} to the new
> stream { ..., {5, null}}.
>
> Of course, this is only a simplified example of a more general problem I have.
>
> Thanks
> Boaz
>