Stream.limit parallel ordered performance
Paul Sandoz
paul.sandoz at oracle.com
Tue Jan 26 11:15:51 UTC 2016
Hi Tagir,
It is insane :-) In hindsight I cannot believe I missed this trick!
Your patch looks reasonable and I don’t see any issue with it. Each leaf task will collect at most n elements; the truncation will take care of everything else.
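To illustrate, here is a rough model of that behaviour (a simplified sketch only, not the actual SliceOps code; all the names below are made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LeafLimitSketch {
    // Each leaf task buffers at most n elements from its own chunk.
    static List<Integer> leaf(List<Integer> chunk, int n) {
        List<Integer> buffer = new ArrayList<>();
        for (Integer e : chunk) {
            if (buffer.size() >= n)
                break; // leaf-local cap: collect at most n elements
            buffer.add(e);
        }
        return buffer;
    }

    // The final truncation trims the concatenated leaf results down to
    // n, preserving encounter order.
    static List<Integer> truncate(List<List<Integer>> leaves, int n) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> l : leaves) {
            for (Integer e : l) {
                if (out.size() == n)
                    return out;
                out.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> leaves = Arrays.asList(
                leaf(Arrays.asList(1, 2, 3, 4), 3),
                leaf(Arrays.asList(5, 6, 7, 8), 3));
        System.out.println(truncate(leaves, 3)); // [1, 2, 3]
    }
}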
Note that it is still the case that, in almost all scenarios, this is likely to be a poorly performing form of parallel stream.
I think the comment you refer to still applies, but now for larger n, so we should refine it.
Paul.
> On 24 Jan 2016, at 12:36, Tagir F. Valeev <amaembo at gmail.com> wrote:
>
> Hello!
>
> I'm investigating Stream.limit() performance for parallel ordered
> streams when no SUBSIZED optimization is performed.
>
> Consider the following code, for example:
>
> AtomicInteger counter = new AtomicInteger();
> int[] result = IntStream.range(0, 1_000_000).parallel().filter(x -> true)
> .peek(x -> counter.incrementAndGet()).limit(10).toArray();
> System.out.println(Arrays.toString(result));
> System.out.println(counter.get());
>
> How much would counter.get() print? It varies from launch to
> launch, but is usually within 375000..625000. This is just insane. On my
> 4-core system the parallel stream creates 16 individual tasks. I expect
> every individual task to consume no more than 10 elements, so in
> total no more than 160 elements should be consumed in this case.
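>
> (For reference, here is a quick way to estimate the expected number of
> leaf tasks. As far as I understand, the splitting heuristic aims for
> roughly four leaf tasks per common-pool worker; the shift below is my
> reading of an internal detail, so treat it as an assumption:)
>
> import java.util.concurrent.ForkJoinPool;
>
> public class LeafEstimate {
>     public static void main(String[] args) {
>         int parallelism = ForkJoinPool.getCommonPoolParallelism();
>         // Assumed heuristic: ~4 leaf tasks per worker thread
>         int leafTarget = parallelism << 2;
>         System.out.println("parallelism = " + parallelism);
>         System.out.println("expected leaf tasks ~ " + leafTarget);
>         // With limit(10), each leaf should ideally consume at most 10
>         // elements, bounding total consumption near leafTarget * 10.
>     }
> }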
>
> Here's a patch which addresses this issue:
> http://cr.openjdk.java.net/~tvaleev/patches/limit/limit-patch.txt
>
> In the limit case, non-root leaf tasks may switch to copyIntoWithCancel
> to control the count of consumed elements and avoid consuming more
> than necessary.
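>
> (To illustrate the general shape of a cancellable copy, here is a
> standalone sketch; the real copyIntoWithCancel works on the internal
> Sink abstraction, so the names and signature below are illustrative
> only:)
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Spliterator;
> import java.util.function.BooleanSupplier;
> import java.util.function.Consumer;
> import java.util.stream.IntStream;
>
> class CopyWithCancelSketch {
>     // Push elements one at a time, checking between elements whether
>     // downstream already has enough, instead of draining the whole
>     // spliterator with forEachRemaining.
>     static <T> void copyWithCancel(Spliterator<T> sp,
>                                    Consumer<T> sink,
>                                    BooleanSupplier cancelled) {
>         while (!cancelled.getAsBoolean() && sp.tryAdvance(sink)) {
>             // loop until cancelled or exhausted
>         }
>     }
>
>     public static void main(String[] args) {
>         List<Integer> seen = new ArrayList<>();
>         copyWithCancel(IntStream.range(0, 1_000_000).boxed().spliterator(),
>                        seen::add,
>                        () -> seen.size() >= 10); // "cancelled" after 10
>         System.out.println(seen.size()); // prints 10, not 1000000
>     }
> }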
>
> This change also seems to fix (at least partially) the issue described
> in this comment:
>
> // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
> // regardless of the value of n
> // Need to adjust the target size of splitting for the
> // SliceTask from say (size / k) to say min(size / k, 1 << 14)
> // This will limit the size of the buffers created at the leaf nodes
> // cancellation will be more aggressive cancelling later tasks
> // if the target slice size has been reached from a given task,
> // cancellation should also clear local results if any
>
> I checked with the following code:
>
> for(int n : new int[] {10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000}) {
> System.out.println(n);
> long[] arr = LongStream.range(0, Long.MAX_VALUE).filter(i -> true).parallel().limit(n).toArray();
> long[] ref = LongStream.range(0, n).toArray();
> System.out.println(Arrays.equals(arr, ref));
> }
>
> It works correctly after applying my patch (while it dies with an OOME
> without the patch, as the comment suggests).
>
> The existing unit tests also pass with my patch. However, I'm
> very new to the internals of parallel stream processing, so it's
> possible that I'm missing something. Please review! If this looks
> reasonable, I will log an issue and write new test cases.
>
> Thank you in advance,
> With best regards,
> Tagir Valeev.
>