RFR: 8196106: Support nested infinite or recursive flat mapped streams [v7]

Rémi Forax forax at openjdk.org
Sat Apr 13 09:15:44 UTC 2024


On Fri, 12 Apr 2024 14:53:01 GMT, Viktor Klang <vklang at openjdk.org> wrote:

>> This PR implements a Gatherer-inspired encoding of `flatMap` and shows that it is competitive performance-wise while also improving correctness.
>> 
>> Below is the performance of `Stream::flatMap` (for reference types):
>> 
>> Before this PR (on my MacBook, aarch64):
>> 
>> Non-short-circuiting:
>> 
>> Benchmark                 (size)   Mode  Cnt        Score       Error  Units
>> FlatMap.par_array             10  thrpt   12   257582,480 ± 31360,242  ops/s
>> FlatMap.par_array            100  thrpt   12    55202,015 ± 14011,668  ops/s
>> FlatMap.par_array           1000  thrpt   12     6546,252 ±  3675,764  ops/s
>> FlatMap.par_doublestream      10  thrpt   12   267423,410 ± 37338,089  ops/s
>> FlatMap.par_doublestream     100  thrpt   12    27140,703 ±  4979,878  ops/s
>> FlatMap.par_doublestream    1000  thrpt   12     2978,178 ±  1890,250  ops/s
>> FlatMap.par_intstream         10  thrpt   12   268194,530 ± 37295,092  ops/s
>> FlatMap.par_intstream        100  thrpt   12    30897,034 ±  5388,245  ops/s
>> FlatMap.par_intstream       1000  thrpt   12     3969,043 ±  2494,641  ops/s
>> FlatMap.par_longstream        10  thrpt   12   279756,861 ± 19519,497  ops/s
>> FlatMap.par_longstream       100  thrpt   12    45733,955 ± 18385,144  ops/s
>> FlatMap.par_longstream      1000  thrpt   12     5115,615 ±  4147,237  ops/s
>> FlatMap.seq_array             10  thrpt   12  2937192,934 ± 58605,583  ops/s
>> FlatMap.seq_array            100  thrpt   12    41859,462 ±   139,021  ops/s
>> FlatMap.seq_array           1000  thrpt   12      442,677 ±     1,041  ops/s
>> FlatMap.seq_doublestream      10  thrpt   12  4972651,093 ± 35997,267  ops/s
>> FlatMap.seq_doublestream     100  thrpt   12    99265,005 ±   193,497  ops/s
>> FlatMap.seq_doublestream    1000  thrpt   12     1037,030 ±     3,254  ops/s
>> FlatMap.seq_intstream         10  thrpt   12  5133751,888 ± 23516,294  ops/s
>> FlatMap.seq_intstream        100  thrpt   12   145166,206 ±   399,263  ops/s
>> FlatMap.seq_intstream       1000  thrpt   12     1565,004 ±     3,207  ops/s
>> FlatMap.seq_longstream        10  thrpt   12  5047029,363 ± 24742,300  ops/s
>> FlatMap.seq_longstream       100  thrpt   12   233225,307 ±  7162,604  ops/s
>> FlatMap.seq_longstream      1000  thrpt   12     2999,926 ±     9,945  ops/s
>> 
>> Short-circuiting:
>> 
>> Benchmark                   (size)   Mode  Cnt       Score       Error  Units
>> FlatMap.par_iterate_double      10  thrpt   12   46336,834 ±  6803,751  ops/s
>> FlatMap.par_iterate_double     100 ...
>
> Viktor Klang has updated the pull request incrementally with one additional commit since the last revision:
> 
>   Adding additional, short-circuit-specific, cases to the FlatMap benchmark
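To illustrate the general idea of a push-based, Gatherer-style `flatMap` (a minimal sketch under stated assumptions, not the PR's actual code): each inner element is pushed downstream one at a time, and the boolean result of each push tells the producer whether more elements are wanted, so even an infinite inner stream is abandoned as soon as downstream is satisfied. The `Downstream` interface here is hypothetical, modelled loosely on `Gatherer.Downstream#push` (which returns `false` once no more elements are wanted); `flatMapOne` and `flatMapTake` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

public class PushFlatMapSketch {
    /** Hypothetical downstream: push returns false once it wants no more elements. */
    interface Downstream<T> {
        boolean push(T element);
    }

    /**
     * Pushes each element of the mapped inner stream downstream, stopping as soon
     * as downstream rejects one. Returns whether upstream should keep going.
     */
    static <T, R> boolean flatMapOne(T element,
                                     Function<? super T, ? extends Stream<? extends R>> mapper,
                                     Downstream<? super R> downstream) {
        try (Stream<? extends R> inner = mapper.apply(element)) {
            if (inner == null) {
                return true; // flatMap treats a null result as an empty stream
            }
            // allMatch is short-circuiting: an infinite inner stream is abandoned
            // the moment push(...) returns false.
            return inner.sequential().allMatch(downstream::push);
        }
    }

    /** Hypothetical driver: source -> flatMap -> "take the first n elements". */
    static <T, R> List<R> flatMapTake(List<T> source,
                                      Function<? super T, ? extends Stream<? extends R>> mapper,
                                      int n) {
        List<R> out = new ArrayList<>();
        if (n <= 0) {
            return out;
        }
        Downstream<R> take = r -> {
            out.add(r);
            return out.size() < n; // ask for more only while under the limit
        };
        for (T t : source) {
            if (!flatMapOne(t, mapper, take)) {
                break; // downstream is done, so stop pulling from the source too
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Every inner stream is infinite, yet the pipeline terminates.
        List<Integer> firstFive = flatMapTake(List.of(1, 2), x -> Stream.iterate(x * 100, i -> i + 1), 5);
        System.out.println(firstFive); // [100, 101, 102, 103, 104]
    }
}
```

The design point is that cancellation flows backwards through the return value of each push rather than through a separate "cancellation requested" check, which is what lets nested or infinite inner streams stop promptly.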

src/java.base/share/classes/java/util/stream/AbstractPipeline.java line 439:

> 437:      * Returns whether any of the stages in the (entire) pipeline is short-circuiting
> 438:      * or not.
> 439:      * @return {@code true} if any stage in this pipeline is stateful,

stateful -> short-circuiting

src/java.base/share/classes/java/util/stream/AbstractPipeline.java line 442:

> 440:      *         {@code false} if not.
> 441:      */
> 442:     protected final boolean isShortCircuitingPipeline() {

`protected` can be replaced by package-private access.

src/java.base/share/classes/java/util/stream/AbstractPipeline.java line 448:

> 446:              u = u.nextStage) {
> 447:         }
> 448:         return result;

This can be written more simply:

for (var stage = sourceStage.nextStage; stage != null; stage = stage.nextStage) {
  if (StreamOpFlag.SHORT_CIRCUIT.isKnown(stage.combinedFlags)) {
    return true;
  }
}
return false;

No local variable, and no SSA phi.
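A self-contained model of the two loop shapes, to show they compute the same thing. `Stage`, the single-bit `SHORT_CIRCUIT` mask and `chain` are hypothetical stand-ins (the real code walks `AbstractPipeline` stages and calls `StreamOpFlag.SHORT_CIRCUIT.isKnown`, whose flag encoding differs), and `withLocal` is only an assumed accumulator-style version, since the PR's exact loop body is elided in the quote.

```java
public class ShortCircuitCheckSketch {
    // Hypothetical single-bit flag standing in for StreamOpFlag.SHORT_CIRCUIT
    static final int SHORT_CIRCUIT = 1;

    // Hypothetical stand-in for a pipeline stage
    static final class Stage {
        final int combinedFlags;
        Stage nextStage;

        Stage(int combinedFlags) {
            this.combinedFlags = combinedFlags;
        }
    }

    // Accumulator style: a local boolean is carried through the loop
    static boolean withLocal(Stage sourceStage) {
        boolean result = false;
        for (Stage u = sourceStage.nextStage; u != null && !result; u = u.nextStage) {
            result = (u.combinedFlags & SHORT_CIRCUIT) != 0;
        }
        return result;
    }

    // Early-return style suggested in the review: no local variable to merge
    static boolean withEarlyReturn(Stage sourceStage) {
        for (Stage stage = sourceStage.nextStage; stage != null; stage = stage.nextStage) {
            if ((stage.combinedFlags & SHORT_CIRCUIT) != 0) {
                return true;
            }
        }
        return false;
    }

    // Builds a source stage followed by one stage per given flag value
    static Stage chain(int... flags) {
        Stage source = new Stage(0);
        Stage last = source;
        for (int f : flags) {
            last.nextStage = new Stage(f);
            last = last.nextStage;
        }
        return source;
    }

    public static void main(String[] args) {
        System.out.println(withEarlyReturn(chain(0, 0)));             // false
        System.out.println(withEarlyReturn(chain(0, SHORT_CIRCUIT))); // true
    }
}
```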

src/java.base/share/classes/java/util/stream/DoublePipeline.java line 267:

> 265:             @Override
> 266:             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
> 267:                 final DoubleConsumer fastPath =

This `final` is unnecessary and inconsistent with the style of all the other files in this package.

src/java.base/share/classes/java/util/stream/DoublePipeline.java line 281:

> 279:                     @Override
> 280:                     public void accept(double e) {
> 281:                         try (final var result = mapper.apply(e)) {

`final` is unnecessary. I would also keep the explicit type instead of `var`: this code is quite complex, eliding the type does not help, and mixing `var` and explicit types in the same method is not recommended (cf. Stuart Marks' guidelines on where to use `var`).
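A sketch of the suggested style applied to a standalone method: no `final`, and the try-with-resources variable keeps its explicit `DoubleStream` type. `acceptFlattened` and `FlatMapDoubleSketch` are hypothetical names for a helper shaped like the reviewed `accept(double)` method; this is not the PR's code. It also relies on two facts worth keeping in mind here: `flatMap` treats a `null` mapper result as an empty stream, and try-with-resources only calls `close()` on a non-null resource.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleConsumer;
import java.util.function.DoubleFunction;
import java.util.stream.DoubleStream;

public class FlatMapDoubleSketch {
    // Hypothetical helper: flattens the stream produced for one element into downstream
    static void acceptFlattened(double e,
                                DoubleFunction<? extends DoubleStream> mapper,
                                DoubleConsumer downstream) {
        // Explicit type rather than `var`, and no `final` on the resource
        try (DoubleStream result = mapper.apply(e)) {
            // A null result is treated as empty; close() is skipped for a null resource
            if (result != null) {
                result.sequential().forEach(downstream);
            }
        }
    }

    public static void main(String[] args) {
        List<Double> out = new ArrayList<>();
        boolean[] closed = {false};
        acceptFlattened(2.0, d -> DoubleStream.of(d, d * 10).onClose(() -> closed[0] = true), v -> out.add(v));
        acceptFlattened(3.0, d -> null, v -> out.add(v));
        System.out.println(out + " closed=" + closed[0]); // [2.0, 20.0] closed=true
    }
}
```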

src/java.base/share/classes/java/util/stream/IntPipeline.java line 300:

> 298:             @Override
> 299:             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
> 300:                 final IntConsumer fastPath =

same as above

src/java.base/share/classes/java/util/stream/LongPipeline.java line 283:

> 281:             @Override
> 282:             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
> 283:                 final LongConsumer fastPath =

same as above

-------------

PR Review Comment: https://git.openjdk.org/jdk/pull/18625#discussion_r1563885750
PR Review Comment: https://git.openjdk.org/jdk/pull/18625#discussion_r1563886287
PR Review Comment: https://git.openjdk.org/jdk/pull/18625#discussion_r1563887433
PR Review Comment: https://git.openjdk.org/jdk/pull/18625#discussion_r1563888203
PR Review Comment: https://git.openjdk.org/jdk/pull/18625#discussion_r1563889383
PR Review Comment: https://git.openjdk.org/jdk/pull/18625#discussion_r1563889757
PR Review Comment: https://git.openjdk.org/jdk/pull/18625#discussion_r1563889835


More information about the core-libs-dev mailing list