RFR: 8154387 - Parallel unordered Stream.limit() tries to collect 128 elements even if limit is less

Tagir F. Valeev amaembo at gmail.com
Mon Apr 18 12:35:50 UTC 2016


Hello!

Thank you for review!

PS>  913         UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
PS>  914             this.s = s;
PS>  915             this.unlimited = limit < 0;
PS>  916             this.skipThreshold = limit >= 0 ? limit : 0;
PS>  917             this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
PS>  918                 (skip + limit) / ForkJoinPool.getCommonPoolParallelism() + 1) : CHUNK_SIZE;
PS>  919             this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
PS>  920         }
PS>  921

PS> Note the common pool parallelism can never be 0. I dunno if you
PS> added 1 for that or another reason.

It's actually
((skip + limit) / ForkJoinPool.getCommonPoolParallelism()) + 1
not
(skip + limit) / (ForkJoinPool.getCommonPoolParallelism() + 1)

Probably I should add explicit parentheses to make this clear. The + 1 is
added precisely to make chunkSize at least 1.
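
For illustration, here is a small self-contained sketch of the computation
with the parentheses made explicit (CHUNK_SIZE mirrors the 128-element
constant from UnorderedSliceSpliterator; the sample skip/limit values and the
parallelism assumed in the comments are just examples):

import java.util.concurrent.ForkJoinPool;

public class ChunkSizeDemo {
    // Mirrors UnorderedSliceSpliterator.CHUNK_SIZE (128 in the current sources)
    static final int CHUNK_SIZE = 128;

    // The patched expression with explicit parentheses around the division
    static int chunkSize(long skip, long limit) {
        return limit >= 0
                ? (int) Math.min(CHUNK_SIZE,
                        ((skip + limit) / ForkJoinPool.getCommonPoolParallelism()) + 1)
                : CHUNK_SIZE;
    }

    public static void main(String[] args) {
        // e.g. with common pool parallelism 4: (0 + 2) / 4 + 1 = 1,
        // so each task buffers one element instead of 128
        System.out.println(chunkSize(0, 2));
        // a large limit is unaffected: (0 + 10000) / 4 + 1 = 2501, capped at 128
        System.out.println(chunkSize(0, 10_000));
        // negative limit (skip-only mode) keeps the old fixed chunk size
        System.out.println(chunkSize(100, -1));
    }
}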

PS> Did you consider:

PS>   (skip + limit) / AbstractTask.LEAF_TARGET

PS> ?

It should not change the results of my test drastically, but I will try it.
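
If I recall correctly, AbstractTask.LEAF_TARGET is roughly four times the
common pool parallelism (it is package-private, so the sketch below recomputes
an equivalent value rather than referencing it; treat that as an assumption).
The only effect on the formula would be somewhat smaller chunks for mid-sized
limits:

import java.util.concurrent.ForkJoinPool;

public class LeafTargetChunkDemo {
    static final int CHUNK_SIZE = 128;
    // Assumption: LEAF_TARGET is about 4 * parallelism, recomputed here because
    // the real field is not accessible outside java.util.stream
    static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

    static int byParallelism(long skip, long limit) {
        return (int) Math.min(CHUNK_SIZE,
                ((skip + limit) / ForkJoinPool.getCommonPoolParallelism()) + 1);
    }

    static int byLeafTarget(long skip, long limit) {
        return (int) Math.min(CHUNK_SIZE,
                ((skip + limit) / LEAF_TARGET) + 1);
    }

    public static void main(String[] args) {
        // e.g. with parallelism 4: 2000 / 4 + 1 = 501, capped at 128,
        // versus 2000 / 16 + 1 = 126, i.e. slightly smaller chunks
        System.out.println(byParallelism(0, 2000));
        System.out.println(byLeafTarget(0, 2000));
    }
}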

PS> What if chunkSize is zero? should it be a minimum of 1?

PS> Testing wise i think our existing tests cover things ok.

PS> Performance-wise looks good. Probable primes are my favourite way
PS> of easily increasing Q (cost per op) :-)

PS> Can you run the stream tests and the perf tests with parallelism disabled:

PS>   -Djava.util.concurrent.ForkJoinPool.common.parallelism=1

Ok. I think I should also test the performance of some high-N, low-Q
task to check that it does not degrade. I will perform all the tests
later this week.
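
Something along these lines is what I have in mind for the high-N, low-Q case
(a rough sketch only; the class name, data size and the cheap predicate are
made up for illustration and are not part of the webrev):

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.openjdk.jmh.annotations.*;

// Hypothetical high-N/low-Q benchmark: many cheap elements and a small limit,
// to check that the smaller chunk size costs nothing when Q is tiny.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
public class CheapLimitTest {
    @Param({"2", "20", "200", "2000"})
    private int limit;

    private List<Integer> data;

    @Setup
    public void setup() {
        // one million cheap elements: N is high, per-element cost (Q) is trivial
        data = IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());
    }

    @Benchmark
    public long parUnorderedLimit() {
        return data.parallelStream()
                   .unordered()
                   .filter(x -> (x & 1) == 0)   // cheap predicate
                   .limit(limit)
                   .count();
    }
}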

By the way, is there some special place to commit/store JMH tests
(other than the CodeReview server), so they could be reused later?

With best regards,
Tagir Valeev.


PS> Thanks,
PS> Paul.


>> The rationale is to speed up the parallel processing of unordered
>> streams with a low limit value. Such problems occur when you want to
>> perform expensive filtering and select at most x elements which pass
>> the filter (order does not matter). Currently the unordered limit
>> operation buffers up to 128 elements for each parallel task before it
>> checks whether the limit is reached. This is actually harmful when the
>> requested limit is lower: many more elements are requested from the
>> upstream than necessary. Here's a simple JMH test which illustrates
>> the problem:
>> 
>> http://cr.openjdk.java.net/~tvaleev/webrev/8154387/jmh/
>> It extracts the requested number of probable primes from a list of
>> 10000 BigInteger numbers. The results with 9ea+111:
>> 
>> Benchmark                    (limit)  Mode  Cnt      Score      Error  Units
>> LimitTest.parLimit                 2  avgt   30    108.971 ±    0.643  us/op
>> LimitTest.parLimit                20  avgt   30    934.176 ±   14.003  us/op
>> LimitTest.parLimit               200  avgt   30   8772.417 ±  190.609  us/op
>> LimitTest.parLimit              2000  avgt   30  41775.463 ± 1800.537  us/op
>> LimitTest.parUnorderedLimit        2  avgt   30   2557.798 ±   13.161  us/op
>> LimitTest.parUnorderedLimit       20  avgt   30   2578.283 ±   23.547  us/op
>> LimitTest.parUnorderedLimit      200  avgt   30   4577.318 ±   40.793  us/op
>> LimitTest.parUnorderedLimit     2000  avgt   30  12279.346 ±  523.823  us/op
>> LimitTest.seqLimit                 2  avgt   30     34.831 ±    0.190  us/op
>> LimitTest.seqLimit                20  avgt   30    369.729 ±    1.427  us/op
>> LimitTest.seqLimit               200  avgt   30   3690.544 ±   13.907  us/op
>> LimitTest.seqLimit              2000  avgt   30  36681.637 ±  156.538  us/op
>> 
>> When the limit is 2 or 20, the parallel unordered version is slower
>> than the parallel ordered one! Even for limit = 200 it is still slower
>> than the sequential operation.
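
(For readers without access to the webrev, a rough sketch of the benchmark's
shape follows; the data size matches the description above, but the seed, bit
length, certainty parameter and method bodies are guesses for illustration.)

import java.math.BigInteger;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.openjdk.jmh.annotations.*;

// Sketch of the benchmark described above; the real one is in the webrev.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
public class LimitTestSketch {
    @Param({"2", "20", "200", "2000"})
    private int limit;

    private List<BigInteger> numbers;

    @Setup
    public void setup() {
        Random r = new Random(1);
        // 10000 random numbers; isProbablePrime() makes Q (cost per element) high
        numbers = IntStream.range(0, 10_000)
                           .mapToObj(i -> new BigInteger(128, r))
                           .collect(Collectors.toList());
    }

    @Benchmark
    public List<BigInteger> parUnorderedLimit() {
        return numbers.parallelStream().unordered()
                      .filter(n -> n.isProbablePrime(50))
                      .limit(limit).collect(Collectors.toList());
    }

    @Benchmark
    public List<BigInteger> parLimit() {
        return numbers.parallelStream()
                      .filter(n -> n.isProbablePrime(50))
                      .limit(limit).collect(Collectors.toList());
    }

    @Benchmark
    public List<BigInteger> seqLimit() {
        return numbers.stream()
                      .filter(n -> n.isProbablePrime(50))
                      .limit(limit).collect(Collectors.toList());
    }
}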
>> 
>> The idea of the patch is to tweak the CHUNK_SIZE using the given limit and
>> parallelism level. I used the following formula:
>> 
>> this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
>>     (skip + limit) / ForkJoinPool.getCommonPoolParallelism() + 1) : CHUNK_SIZE;
>> 
>> This does not affect cases where the limit is big or not set at all
>> (in skip mode). However, it greatly improves cases where the limit is
>> small:
>> 
>> Benchmark                    (limit)  Mode  Cnt      Score      Error  Units
>> LimitTest.parLimit                 2  avgt   30    109.502 ±    0.750  us/op
>> LimitTest.parLimit                20  avgt   30    954.716 ±   39.276  us/op
>> LimitTest.parLimit               200  avgt   30   8706.226 ±  184.330  us/op
>> LimitTest.parLimit              2000  avgt   30  42126.346 ± 3163.444  us/op
>> LimitTest.parUnorderedLimit        2  avgt   30     39.303 ±    0.177  us/op !!!
>> LimitTest.parUnorderedLimit       20  avgt   30    266.107 ±    0.492  us/op !!!
>> LimitTest.parUnorderedLimit      200  avgt   30   2547.177 ±   58.538  us/op !!!
>> LimitTest.parUnorderedLimit     2000  avgt   30  12216.402 ±  430.574  us/op
>> LimitTest.seqLimit                 2  avgt   30     34.993 ±    0.704  us/op
>> LimitTest.seqLimit                20  avgt   30    369.497 ±    1.754  us/op
>> LimitTest.seqLimit               200  avgt   30   3716.059 ±   61.054  us/op
>> LimitTest.seqLimit              2000  avgt   30  36814.356 ±  161.531  us/op
>> 
>> Here you can see that the unordered cases are significantly improved.
>> Now they are always faster than the parallel ordered ones and faster
>> than sequential for limit >= 20.
>> 
>> I could not think of a way to test this patch, as it does not change
>> visible behavior, only speed. However, all the existing tests pass.
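
(One conceivable way to cover this, sketched below purely as an idea and not
part of the webrev, would be to count how many elements the upstream actually
produces for a small limit; the count is nondeterministic in parallel, so a
real test could only assert a generous upper bound.)

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

// Hypothetical check: count upstream consumption for a tiny limit.
public class UpstreamConsumptionCheck {
    public static void main(String[] args) {
        AtomicLong pulled = new AtomicLong();
        long taken = IntStream.range(0, 1_000_000)
                              .parallel()
                              .unordered()
                              .peek(x -> pulled.incrementAndGet())
                              // the filter makes the output size unknown, which
                              // (as far as I understand) routes evaluation through
                              // the unordered slice spliterator this patch changes
                              .filter(x -> true)
                              .limit(2)
                              .count();
        // With the old fixed 128-element chunks each worker task could pull up
        // to 128 elements; with the patch the expected bound is far lower.
        System.out.println("taken: " + taken
                + ", upstream elements pulled: " + pulled.get());
    }
}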
>> 
>> What do you think?
>> 
>> With best regards,
>> Tagir Valeev.
>> 



