Grouping stream elements by their position - how to handle tail of stream ?

Brian Goetz brian.goetz at oracle.com
Sat Feb 2 13:55:13 PST 2013


The glass is half full, or half empty, depending on your disposition.

While it's trivial to write a version of explode() that remembers the 
last element seen, and either emits nothing or a pair (you have a little 
fixup at the end to deal with, but that's easy too, just annoying), once 
you start writing such stateful lambdas, you are tying yourself to 
sequential execution in a very error-prone way.  The type system can't 
record this assumption, so when someone helpfully later adds a 
.parallel() somewhere, your code will silently turn to sewage.  So, 
don't ever do this.

To see why the obvious algorithm is sequential only, consider a 
decomposition of a data source with a spliterator.  In most cases, we 
don't necessarily know the even-ness or odd-ness of the sum of sizes of 
all prior splits.  Which means we don't know whether the first element 
of any given split is the left element of some pair or the right element 
of some pair.
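
As a rough illustration (a simplified model, not what the stream 
implementation actually does), here is the "remember the last element" 
logic applied to each split in isolation. The names NaivePairingDemo, 
pairChunk and pairSplits are made up for this sketch, and a real shared 
stateful lambda under .parallel() would additionally be racy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NaivePairingDemo {

    /** Pairs up one chunk, assuming its first element is the left half of a pair. */
    static List<List<Integer>> pairChunk(List<Integer> chunk) {
        List<List<Integer>> out = new ArrayList<>();
        Integer pending = null;              // the "last element seen" state
        for (Integer v : chunk) {
            if (pending == null) {
                pending = v;                 // remember the left element
            } else {
                out.add(Arrays.asList(pending, v));
                pending = null;
            }
        }
        return out;                          // a dangling 'pending' is silently dropped
    }

    /** Models a two-way split where each half is paired without knowing the other. */
    static List<List<Integer>> pairSplits(List<Integer> left, List<Integer> right) {
        List<List<Integer>> out = new ArrayList<>(pairChunk(left));
        out.addAll(pairChunk(right));
        return out;
    }
}
```

Pairing 1..6 in one pass gives (1, 2), (3, 4), (5, 6).  Pairing the 
splits [1, 2, 3] and [4, 5, 6] independently gives only (1, 2), (4, 5): 
the right split guessed the wrong alignment, and 3 and 6 are lost.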

You might say: I don't care about parallelism, I only care about 
sequential.

To which we'd respond: fine, but we're not really interested in 
distorting the model to encourage that.  The Stream API is designed 
around operations that can be equally well performed in serial or 
parallel.  There are no "serial only" operations supported, and that was 
an explicit design choice.  Passing state-holding lambdas to these 
methods is likely to be a spec violation.  We can't enforce it, any more 
than we can enforce the lack of data races, but bear in mind that if you 
do this, you're writing broken code, and asking for trouble.

Now, that said, here's a parallel algorithm you can implement with 
collect(), though you may find the performance overhead too offensive. 
Basically, for each chunk of the input t1..tn, you compute two possible 
answers:

p0 = (t1, t2), (t3, t4), ..., maybe leftover tn
p1 = t1, (t2, t3), (t4, t5), ..., maybe leftover tn

then you combine these chunks of pairs in the mostly obvious way (observe 
that p0 has trailing element = !(p1 has trailing element)).  Then when 
you get to the top of the tree, you take the one that doesn't have an 
orphaned initial element, and toss the other.  Basically you're doing 2x 
work for each input element, but it parallelizes fairly cleanly.

Such an algorithm is ideally suited to collect(), because a Collector is 
a representation of a catamorphism, which the above transformation is. 
Because the combination logic is associative, it parallelizes cleanly 
without needing any nasty stateful lambdas.
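
A minimal sketch of such a collector follows.  It assumes the 
Collector.of factory as it exists in released Java 8, which may differ 
from the API at the time of this message.  PairingCollector, Chunk and 
pairing() are hypothetical names, and a pair is represented as a 
two-element List because the JDK has no Pair class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

final class PairingCollector {

    /** Per-chunk state: both candidate pairings plus the chunk's boundary elements. */
    static final class Chunk<T> {
        int size;                                       // elements seen in this chunk
        T first;                                        // orphaned head of p1
        T last;                                         // trailing leftover of p0 or p1
        final List<List<T>> p0 = new ArrayList<>();     // (t1,t2), (t3,t4), ...
        final List<List<T>> p1 = new ArrayList<>();     // (t2,t3), (t4,t5), ...

        void add(T t) {
            if (size == 0) {
                first = t;
            } else if (size % 2 == 1) {
                p0.add(Arrays.asList(last, t));         // p0 had a leftover; complete it
            } else {
                p1.add(Arrays.asList(last, t));         // p1 had a leftover; complete it
            }
            last = t;
            size++;
        }

        Chunk<T> merge(Chunk<T> right) {
            if (right.size == 0) return this;
            if (size == 0) return right;
            List<T> bridge = Arrays.asList(last, right.first);
            if (size % 2 == 0) {
                p0.addAll(right.p0);                    // p0 is aligned: continue with right.p0
                p1.add(bridge);                         // p1 leftover joins right's head
                p1.addAll(right.p1);
            } else {
                p0.add(bridge);                         // p0 leftover joins right's head
                p0.addAll(right.p1);
                p1.addAll(right.p0);                    // p1 is aligned: continue with right.p0
            }
            last = right.last;
            size += right.size;
            return this;
        }

        /** At the top of the tree keep p0; pad an odd tail with null. */
        List<List<T>> finish() {
            if (size % 2 == 1) p0.add(Arrays.asList(last, null));
            return p0;
        }
    }

    static <T> Collector<T, ?, List<List<T>>> pairing() {
        return Collector.<T, Chunk<T>, List<List<T>>>of(
                Chunk<T>::new, Chunk<T>::add, Chunk<T>::merge, Chunk<T>::finish);
    }
}
```

With this sketch, collecting 1, 2, 3, 4, 5 through 
PairingCollector.pairing() should produce (1, 2), (3, 4), (5, null), 
with or without .parallel().  The addAll calls copy pairs on every 
merge; a cheaper concatenation structure would cut that cost, but the 
2x bookkeeping per element remains.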

When we expose the Op APIs and you can write your own 
decomposition-aware operations, you'll have another option: write a 
StatefulOp.  This won't be an option for 8, but once we do, it's easy 
enough, and it will let you avoid the extra overhead by creating a 
custom operation.

So, to sum up:
  - If you're looking for a one-liner, you'll be disappointed;
  - If you convince yourself "I would never need parallelism here" and 
write the obvious unsafe stateful lambda, please write your manager a 
letter first explaining how your incompetence is putting the reliability 
of his product in jeopardy;
  - If you're willing to write the above collector, you'll likely be 
happy enough with the result.


On 2/2/2013 3:35 PM, Boaz Nahum wrote:
> Hi.
>
> I am looking in the Stream interface for a way to convert a stream of
>     {  t1, t2, .. Tn }
> to
>     { {t1,t2}, ... {Tn-1, Tn} }
> or
>     { {t1,t2}, ... {Tn, null}}
>
> Let's assume {t1, t2} are aggregated by Pair<T,T>
>
>
>
> So I tried to use explode:
>
>          Stream<Integer> si = Arrays.asList(1, 2, 3, 4, 5).stream();
>
>          Stream<Pair<Integer, Integer>> spi = si.sequential().explode(new
> BiConsumer<Stream.Downstream<Pair<Integer, Integer>>, Integer>() {
>
>              Integer firstValue;
>
>              @Override
>              public void accept(Stream.Downstream<Pair<Integer, Integer>>
> pairDownstream, Integer v) {
>
>                  if (firstValue == null) {
>                      firstValue = v;
>                  } else {
>
>                      pairDownstream.send(new Pair<>(firstValue, v));
>                      firstValue = null;
>
>                  }
>              }
>
>          });
>
>
> But I didn't find a way to add the tail of the input stream {.., 5} to the new
> stream { ..., {5, null}}.
>
> Of course this is only a simplified example of a more general problem I have.
>
> Thanks
> Boaz
>


More information about the lambda-dev mailing list