Stream.limit parallel ordered performance

Paul Sandoz paul.sandoz at oracle.com
Tue Jan 26 11:15:51 UTC 2016


Hi Tagir,

It is insane :-) in hindsight i cannot believe i missed this trick!

You patch looks reasonable and i don’t see any issue with it. Each leaf-task will collect at most n elements. The truncation will take care of everything else.

Note that it is still the case that in almost all scenarios this is likely to be a bad form of parallel stream.

I think the comment you refer to still applies but now for larger n, so we should refine it.

Paul.

> On 24 Jan 2016, at 12:36, Tagir F. Valeev <amaembo at gmail.com> wrote:
> 
> Hello!
> 
> I'm investigating Stream.limit() performance for parallel ordered
> streams when no SUBSIZED optimization is performed.
> 
> Consider the following code, for example:
> 
> AtomicInteger counter = new AtomicInteger();
> int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
>        .peek(x -> counter.incrementAndGet()).limit(10).toArray();
> System.out.println(Arrays.toString(result));
> System.out.println(counter.get());
> 
> How much the counter.get() would print? It changes from launch to
> launch, but usually within 375000..625000. This is just insane. On my
> 4-core system parallel stream creates 16 individual tasks. I expect
> that every individual task should consume no more than 10 elements, so
> in total no more than 160 elements should be consumed in this case.
> 
> Here's a patch which addresses this issue:
> http://cr.openjdk.java.net/~tvaleev/patches/limit/limit-patch.txt
> 
> In the limit case non-root leaf tasks may switch to copyIntoWithCancel
> to control the count of consumed elements and do not consume more than
> necessary.
> 
> This change seems to fix the issue addressed in comment (at least
> partially):
> 
> // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
> //     regardless of the value of n
> //     Need to adjust the target size of splitting for the
> //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
> //     This will limit the size of the buffers created at the leaf nodes
> //     cancellation will be more aggressive cancelling later tasks
> //     if the target slice size has been reached from a given task,
> //     cancellation should also clear local results if any
> 
> I checked with the following code:
> 
> for(int n : new int[] {10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000}) {
>    System.out.println(n);
>    long[] arr = LongStream.range(0, Long.MAX_VALUE).filter(i -> true).parallel().limit(n).toArray();
>    long[] ref = LongStream.range(0, n).toArray();
>    System.out.println(Arrays.equals(arr, ref));
> }
> 
> It works correctly after applying my patch (while dies with OOME
> without patch, as comment suggests).
> 
> Currently existing unit tests also pass with my patch. However I'm
> very new in the internals of parallel stream processing, so it's
> possible that I'm missing something. Please review! If this looks
> reasonable I will log an issue and write new test cases.
> 
> Thank you in advance,
> With best regards,
> Tagir Valeev.
> 




More information about the core-libs-dev mailing list