skip/limit in parallel context
Paul Sandoz
paul.sandoz at oracle.com
Thu Dec 13 04:13:11 PST 2012
Hi Christian,
The problem is that you are not combining/merging the counts correctly. The reduction (accumulation) step is different from the combining/merging step.
Try this:
.reduce(0L, (count, t) -> count + 1L, (countLeft, countRight) -> countLeft + countRight)
The first lambda is the reducer; it is executed on the leaf nodes of the computation tree (a sequential stream can be considered the degenerate case of a single leaf node). The second lambda is executed to combine the results of two nodes.
This is a tricky area; it's not entirely a free lunch going from sequential to parallel evaluation with folding or reducing. You need to be aware of the properties of your reducing function (is it associative? is it commutative? are the combining and reducing steps different? what is the identity element?).
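[A minimal, self-contained sketch of the three-argument reduce described above, using a numeric range in place of the array-based source from the quoted program:]

```java
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class ReduceCombinerDemo {

    // Counts even numbers in 1..100. The accumulator counts elements
    // within one leaf of the computation tree; the combiner adds the
    // partial counts produced by two subtrees.
    static long countEvens(boolean parallel) {
        Stream<Long> stream = LongStream.rangeClosed(1, 100).boxed();
        if (parallel) {
            stream = stream.parallel();
        }
        return stream.filter(l -> l % 2 == 0)
                     .reduce(0L,
                             (count, t) -> count + 1L,       // accumulator
                             (left, right) -> left + right); // combiner
    }

    public static void main(String[] args) {
        System.out.println(countEvens(false)); // prints 50
        System.out.println(countEvens(true));  // prints 50
    }
}
```

[With the proper combiner, the sequential and parallel results agree; with the original two-argument reduce, the second partial count is effectively discarded in parallel runs.]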
Hth,
Paul.
On Dec 13, 2012, at 12:30 PM, "Mallwitz, Christian" <christian.mallwitz at commerzbank.com> wrote:
> Hi,
>
> Using jdk1.8.0-b68-09_dec_2012 and the following program (which counts even numbers up to 100 in various ways):
>
> import java.util.Arrays;
> import java.util.stream.Stream;
>
> public class LambdaParallelLimitSkip {
>
>     private static final int n = 100;
>
>     public static void main(String[] args) {
>         testSerial();
>         testParallel();
>     }
>
>     public static void testSerial() {
>         System.out.println("serial count after limit(10): " + getArrayBasedStream(false)
>                 .filter(l -> l % 2 == 0) // filter even numbers
>                 .limit(10)
>                 .reduce(0L, (left, right) -> left + 1L)); // count
>         System.out.println("serial count after skip(10) : " + getArrayBasedStream(false)
>                 .filter(l -> l % 2 == 0) // filter even numbers
>                 .skip(10)
>                 .reduce(0L, (left, right) -> left + 1L)); // count
>     }
>
>     public static void testParallel() {
>         System.out.println("parallel count after limit(10): " + getArrayBasedStream(true)
>                 .filter(l -> l % 2 == 0) // filter even numbers
>                 .limit(10)
>                 .reduce(0L, (left, right) -> left + 1L)); // count
>         System.out.println("parallel count after skip(10) : " + getArrayBasedStream(true)
>                 .filter(l -> l % 2 == 0) // filter even numbers
>                 .skip(10)
>                 .reduce(0L, (left, right) -> left + 1L)); // count
>     }
>
>     public static Stream<Long> getArrayBasedStream(boolean parallel) {
>         Long[] list = new Long[n];
>         for (int i = 0; i < n; i++) { list[i] = i + 1L; }
>         return parallel ? Arrays.asList(list).parallel() : Arrays.asList(list).stream();
>     }
> }
>
> This outputs
>
> serial count after limit(10): 10
> serial count after skip(10) : 40
> parallel count after limit(10): 7
> parallel count after skip(10) : 25
>
> The last item "parallel count after skip(10) : 25" is incorrect - this needs to be in the 40..50 range, doesn't it?
>
> Now, I understand that the Javadoc says skip/limit take 'at most n elements', but IMHO this is not particularly useful.
>
> skip/limit _could_ consume more than n elements of the source stream (calling the filter predicate in my example more often in the parallel case than in the serial one), but they should pass exactly n elements to further steps downstream.
>
> Regards
> Christian
>
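[For comparison, a sketch of the same measurements against the released java.util.stream API, where limit/skip on an ordered source are deterministic even under parallel evaluation, and count() replaces the hand-rolled reduce:]

```java
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class ParallelSkipLimit {

    // Even numbers in 1..100 (50 of them), optionally as a parallel stream.
    static Stream<Long> evens(boolean parallel) {
        Stream<Long> s = LongStream.rangeClosed(1, 100).boxed();
        return (parallel ? s.parallel() : s).filter(l -> l % 2 == 0);
    }

    public static void main(String[] args) {
        // The range source is ordered, so limit(10) takes exactly the
        // first 10 even numbers and skip(10) drops exactly the first 10,
        // regardless of parallelism.
        System.out.println(evens(true).limit(10).count()); // prints 10
        System.out.println(evens(true).skip(10).count());  // prints 40
    }
}
```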
More information about the lambda-dev mailing list