RFR: JDK-8319123 : Implementation of JEP-461: Stream Gatherers (Preview)

Tagir F. Valeev tvaleev at openjdk.org
Wed Nov 8 15:41:15 UTC 2023


On Mon, 30 Oct 2023 15:38:35 GMT, Viktor Klang <vklang at openjdk.org> wrote:

> This is a Draft PR for [JEP-461](https://openjdk.org/jeps/461)

Very solid work, thank you! See some minor comments inline. I actually have many more ideas for specific gatherers, but they can be discussed separately.

One thing that bothers me is that Gatherer is similar to Collector, yet the APIs are totally different, so users may be confused. I like the Gatherer API much more, and I see that evolving the current Collector API might be hard given the many third-party implementations, so I don't see a good way to fix this. But perhaps we should provide a Collector-to-Gatherer adapter (producing a one-element stream)? More generally, it would be nice to be able to use gatherers that produce exactly one element as terminal operations, without an explicit `findFirst().get()`.
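To make the adapter idea concrete, here is a minimal sketch. It assumes the Gatherer API as finalized in JDK 24 (`Gatherer.ofSequential(Supplier, Integrator, BiConsumer)` and `Downstream.push`); `CollectorGatherers` and `fromCollector` are hypothetical names, not part of the JDK. The adapter runs the collector sequentially and pushes its single result once the upstream is exhausted, so callers still need `findFirst()`.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical helper, not a JDK class.
final class CollectorGatherers {
    private CollectorGatherers() {}

    // Adapts a Collector into a sequential Gatherer that emits exactly one element:
    // the collector's finished result, pushed when the upstream is exhausted.
    static <T, A, R> Gatherer<T, A, R> fromCollector(Collector<T, A, R> collector) {
        BiConsumer<A, T> accumulator = collector.accumulator();
        Function<A, R> finisher = collector.finisher();
        return Gatherer.ofSequential(
                collector.supplier(),                    // initializer: a fresh container
                (container, element, downstream) -> {    // integrator: accumulate only
                    accumulator.accept(container, element);
                    return true;                         // never short-circuits
                },
                // finisher: runs even for an empty stream, so one element is always emitted
                (container, downstream) -> downstream.push(finisher.apply(container)));
    }
}
```

For example, `Stream.of("a", "b", "c").gather(CollectorGatherers.fromCollector(Collectors.joining("-"))).findFirst()` would yield `"a-b-c"`. A combiner could be derived from `collector.combiner()` for parallel use; it is omitted here to keep the sketch sequential.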

src/java.base/share/classes/java/util/stream/AbstractPipeline.java line 191:

> 189:      * existing pipeline.
> 190:      *
> 191:      * The previous stage must be unlinked and unconsumed.

Note that a blank line in Javadoc is normally ignored, so it does not start a new paragraph. It's probably better to add an explicit `<p>`; this way it will render better in the IDE documentation view.

src/java.base/share/classes/java/util/stream/Gatherer.java line 99:

> 97:  * public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
> 98:  *     return Gatherer.of(
> 99:  *         (unused, element, downstream) -> // integrator

As [JEP 456](https://openjdk.org/jeps/456) has already been integrated, it's probably reasonable to use `(_, element, downstream) ->` here?
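For illustration, this is roughly how the `map` example would read with an unnamed lambda parameter. It is a sketch assuming JDK 22 or later for `_` (JEP 456) and JDK 24 or later for the finalized Gatherer API; `MapGatherer` is a hypothetical holder class.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical holder class for the Javadoc-style map example.
final class MapGatherer {
    private MapGatherer() {}

    // Stateless, parallelizable map; the unused Void state is named `_`.
    static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
        return Gatherer.of(
                (_, element, downstream) -> downstream.push(mapper.apply(element)));
    }
}
```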

src/java.base/share/classes/java/util/stream/Gatherer.java line 118:

> 116:  * }</pre>
> 117:  *
> 118:  * AS an example, in order to create a gatherer to implement a sequential

AS -> As (lowercase 's')

src/java.base/share/classes/java/util/stream/Gatherer.java line 119:

> 117:  *
> 118:  * AS an example, in order to create a gatherer to implement a sequential
> 119:  * Prefix Scan as a Gatherer, it could be done the following way:

Do we need 'Prefix Scan' in title case?
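The sequential prefix scan that this Javadoc paragraph introduces can be sketched as below. This is a minimal version assuming the JDK 24 Gatherer API; `PrefixScan` and its `State` holder are hypothetical names. Each incoming element updates the running value, which is pushed downstream immediately (the initial value itself is not emitted).

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical sequential prefix scan built from Gatherer.ofSequential.
final class PrefixScan {
    private PrefixScan() {}

    // Mutable holder for the running value.
    private static final class State<R> {
        R current;
        State(R initial) { this.current = initial; }
    }

    static <T, R> Gatherer<T, ?, R> scan(
            Supplier<R> initial,
            BiFunction<? super R, ? super T, ? extends R> scanner) {
        return Gatherer.ofSequential(
                () -> new State<R>(initial.get()),
                (state, element, downstream) -> {
                    state.current = scanner.apply(state.current, element);
                    return downstream.push(state.current);  // emit every intermediate value
                });
    }
}
```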

src/java.base/share/classes/java/util/stream/Gatherer.java line 252:

> 250:      *         Gatherer as input that Gatherer
> 251:      */
> 252:     default <AA, RR> Gatherer<T, ?, RR> andThen(Gatherer<? super R, AA, ? extends RR> that) {

It would be great to remove the `AA` type parameter: it is mentioned only once, so it could be replaced with `?`. This would improve the user experience for anyone who needs to specify type parameters explicitly.
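The difference shows up when callers supply an explicit type witness. The sketch below uses a hypothetical stand-in interface `Pipe<T, A, R>` (not the real `Gatherer`) so that both signatures can sit side by side: with the once-used `AA`, the witness must name two types; with a wildcard, only the result type.

```java
import java.util.function.Function;

// Hypothetical stand-in for Gatherer: T input, A state (unused here), R output.
interface Pipe<T, A, R> {
    R run(T input);

    // Shape under review: AA appears once, yet an explicit witness must still supply it.
    default <AA, RR> Pipe<T, ?, RR> andThenWithAA(Pipe<? super R, AA, ? extends RR> that) {
        return (Pipe<T, Void, RR>) input -> that.run(run(input));
    }

    // Suggested shape: the once-used type parameter becomes a wildcard.
    default <RR> Pipe<T, ?, RR> andThen(Pipe<? super R, ?, ? extends RR> that) {
        return (Pipe<T, Void, RR>) input -> that.run(run(input));
    }

    static <T, R> Pipe<T, Void, R> of(Function<T, R> f) {
        return f::apply;
    }
}
```

With it, `length.<Void, Integer>andThenWithAA(doubled)` and `length.<Integer>andThen(doubled)` compose the same functions, but the second witness is shorter and does not mention the irrelevant state type.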

src/java.base/share/classes/java/util/stream/Gatherer.java line 272:

> 270:      * Returns a combiner which is the default combiner of a Gatherer.
> 271:      * The returned combiner identifies that the owning Gatherer must only
> 272:      * be evaluated sequentially.

Should we specify that a single shared instance is guaranteed to be returned, so clients may use `combiner == defaultCombiner()`, rather than `combiner.equals(defaultCombiner())`?

src/java.base/share/classes/java/util/stream/Gatherer.java line 308:

> 306:      */
> 307:     static <T, R> Gatherer<T, Void, R> ofSequential(
> 308:             Integrator<Void, T, R> integrator) {

Perhaps a PECS signature (`? super T`, `? extends R`) would be useful here?

src/java.base/share/classes/java/util/stream/Gatherer.java line 330:

> 328:     static <T, R> Gatherer<T, Void, R> ofSequential(
> 329:             Integrator<Void, T, R> integrator,
> 330:             BiConsumer<Void, Downstream<? super R>> finisher) {

Perhaps accepting a `Consumer<Downstream<? super R>>` and adapting it would be more user-friendly, since the `Void` state argument carries no information?
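A minimal sketch of such an adapting overload, assuming the JDK 24 `Gatherer.ofSequential(Integrator, BiConsumer<Void, Downstream<? super R>>)` factory; `FriendlierGatherers` is a hypothetical class, and the proposed method simply discards the always-`null` `Void` state.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical helper, not a JDK class.
final class FriendlierGatherers {
    private FriendlierGatherers() {}

    // Takes a finisher that only needs the Downstream and adapts it to the
    // BiConsumer<Void, Downstream<? super R>> shape by ignoring the Void state.
    static <T, R> Gatherer<T, Void, R> ofSequential(
            Gatherer.Integrator<Void, T, R> integrator,
            Consumer<Gatherer.Downstream<? super R>> finisher) {
        return Gatherer.ofSequential(integrator, (_, downstream) -> finisher.accept(downstream));
    }
}
```

Usage then reads `FriendlierGatherers.ofSequential((_, e, d) -> d.push(e), d -> d.push("END"))`, which passes elements through and appends a trailer.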

src/java.base/share/classes/java/util/stream/Gatherer.java line 351:

> 349:      * @return the new {@code Gatherer}
> 350:      */
> 351:     static <T, A, R> Gatherer<T, A, R> ofSequential(

There are two two-function overloads:
> ofSequential(Integrator<Void, T, R>, BiConsumer<Void, Downstream<? super R>>)
and
> ofSequential(Supplier<A>, Integrator<A, T, R>)

While the functional parameters have clearly different signatures, and lambdas will cause no problems, overload resolution may fail if you use overloaded method references:

> ofSequential(A::b, C::d)

Suppose there are several `A::b` and `C::d` methods accepting different numbers of parameters. In this case, the call becomes ambiguous, and one has to fall back to lambdas or explicit casts instead of method references. This is probably a minor problem, but I prefer to avoid overloads that accept the same number of functions. Perhaps we could use more descriptive names, like `ofSequentialStateless`, for the stateless overloads?

src/java.base/share/classes/java/util/stream/GathererOp.java line 162:

> 160:              * consideration at this point doesn't yield any performance gains.
> 161:              */
> 162:             proceed &= integrator.integrate(state, t, this);

Note that you are exposing the `GatherSink` object to clients, and it implements the public `Consumer` interface. Clients could call `((Consumer<T>) downstream).accept(something)`, bypassing `Downstream.push`, which may produce unpredictable results. I'm not sure whether this is considered a problem, but somebody will certainly try it!

src/java.base/share/classes/java/util/stream/Gatherers.java line 347:

> 345:         return Gatherer.ofSequential(
> 346:                 // Initializer
> 347:                 () -> new ArrayList<>(windowSize),

Note that if `windowSize` happens to be much bigger than the stream size (e.g., 1_000_000_000), we will waste a lot of memory on the preallocated list and may fail with `OutOfMemoryError` for no reason, instead of simply collecting the whole stream into a single window. I'm not sure whether we should do anything about this. Perhaps using `SpinedBuffer` and limiting the initial size (e.g., `Math.min(windowSize, 1_000_000)`) would be better. WDYT?

src/java.base/share/classes/java/util/stream/Gatherers.java line 355:

> 353:                         return true;
> 354:                     } else {
> 355:                         var full = List.copyOf(window);

Here, we reject `null` stream elements for no particular reason (`List.copyOf` throws `NullPointerException` on null elements). Let's use `SharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(window.toArray())` instead?

src/java.base/share/classes/java/util/stream/Gatherers.java line 364:

> 362:                 (window, downstream) -> {
> 363:                     if(!downstream.isRejecting() && !window.isEmpty())
> 364:                         downstream.push(List.copyOf(window));

Similarly, let's use `SharedSecrets.getJavaUtilCollectionAccess().listFromTrustedArrayNullsAllowed(window.toArray())`
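Both `windowFixed` points above (the oversized initial capacity and the rejected nulls) can be addressed together, as this sketch shows. It assumes the JDK 24 Gatherer API; `CappedWindows` and `MAX_INITIAL_CAPACITY` are hypothetical, and since `SharedSecrets` is JDK-internal, the sketch uses the public `Collections.unmodifiableList(new ArrayList<>(window))` as a null-tolerant copy instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical variant of windowFixed; not the JDK implementation.
final class CappedWindows {
    private CappedWindows() {}

    // Assumed cap on the initial capacity; the list still grows past it if needed.
    private static final int MAX_INITIAL_CAPACITY = 1024;

    static <T> Gatherer<T, ?, List<T>> windowFixed(int windowSize) {
        if (windowSize < 1)
            throw new IllegalArgumentException("'windowSize' must be greater than zero");
        int initialCapacity = Math.min(windowSize, MAX_INITIAL_CAPACITY);
        return Gatherer.ofSequential(
                () -> new ArrayList<T>(initialCapacity),
                (window, element, downstream) -> {
                    window.add(element);                // ArrayList accepts null elements
                    if (window.size() < windowSize)
                        return true;                    // window not full yet
                    List<T> full = snapshot(window);
                    window.clear();
                    return downstream.push(full);
                },
                (window, downstream) -> {
                    // Emit the trailing partial window, if any
                    if (!downstream.isRejecting() && !window.isEmpty())
                        downstream.push(snapshot(window));
                });
    }

    // Unmodifiable copy that, unlike List.copyOf, tolerates nulls.
    private static <T> List<T> snapshot(List<T> window) {
        return Collections.unmodifiableList(new ArrayList<>(window));
    }
}
```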

src/java.base/share/classes/java/util/stream/Gatherers.java line 389:

> 387:      * @throws IllegalArgumentException when windowSize is less than 1
> 388:      */
> 389:     public static <TR> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize) {

From my experience, a sliding window with windowSize = 2 is the most important case and deserves explicit support (I have such an operation in my StreamEx library, and it's used quite often). Please consider adding (name subject to discussion):

`public static <T, R> Gatherer<T, ?, R> pairMap(BiFunction<T, T, R> functionToApplyToTwoAdjacentElements)`. It could be more efficient, as only the previous element needs to be stored. Moreover, I believe one can implement a proper combiner to make it parallel-friendly. This could be added separately after this PR is merged; I can contribute it.
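A sequential sketch of the proposed operation, assuming the JDK 24 Gatherer API. `PairMap` is a hypothetical class, the parameter types are widened to `? super T` / `? extends R`, and no combiner is attempted. Only the previous element is kept; tracking `hasPrevious` separately from `previous` keeps `null` elements working.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical pairMap gatherer; not part of java.util.stream.Gatherers.
final class PairMap {
    private PairMap() {}

    private static final class State<T> {
        T previous;
        boolean hasPrevious;   // separate flag so that a null element is a valid "previous"
    }

    // Applies mapper to each pair of adjacent elements: (e1, e2), (e2, e3), ...
    static <T, R> Gatherer<T, ?, R> pairMap(BiFunction<? super T, ? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper");
        return Gatherer.ofSequential(
                () -> new State<T>(),
                (state, element, downstream) -> {
                    if (!state.hasPrevious) {      // first element: nothing to pair with yet
                        state.previous = element;
                        state.hasPrevious = true;
                        return true;
                    }
                    R result = mapper.apply(state.previous, element);
                    state.previous = element;
                    return downstream.push(result);
                });
    }
}
```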

src/java.base/share/classes/java/util/stream/Gatherers.java line 403:

> 401:                 // Integrator
> 402:                 Integrator.ofGreedy((window, e, downstream) -> {
> 403:                     window.addLast(e);

Too bad the current implementation rejects null elements. This should be documented, or better, fixed. We could use an ArrayList, keep track of the current start index, and export each window manually using two `arraycopy` calls and `listFromTrustedArrayNullsAllowed`. It's only a little trickier and should not be slower than the current implementation. I can write it if you don't want to do it yourself :-)
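A sketch of that approach, assuming the JDK 24 Gatherer API. `NullTolerantSliding` and `Ring` are hypothetical; the sketch uses a plain `Object[]` ring buffer rather than an `ArrayList`, and public `Arrays.asList` plus `Collections.unmodifiableList` in place of the internal `listFromTrustedArrayNullsAllowed`. It preallocates the full window, so the initial-capacity concern raised for `windowFixed` applies here as well. Each window is exported with at most two `System.arraycopy` calls.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical null-tolerant windowSliding; not the JDK implementation.
final class NullTolerantSliding {
    private NullTolerantSliding() {}

    // Circular buffer: `start` is the index of the oldest element.
    private static final class Ring {
        final Object[] buffer;
        int start;
        int size;
        boolean emitted;   // whether at least one full window was pushed

        Ring(int capacity) { buffer = new Object[capacity]; }
    }

    static <T> Gatherer<T, ?, List<T>> windowSliding(int windowSize) {
        if (windowSize < 1)
            throw new IllegalArgumentException("'windowSize' must be greater than zero");
        return Gatherer.ofSequential(
                () -> new Ring(windowSize),
                (ring, element, downstream) -> {
                    if (ring.size < windowSize) {
                        ring.buffer[ring.size++] = element;      // still filling; start stays 0
                        if (ring.size < windowSize)
                            return true;                         // no full window yet
                    } else {
                        ring.buffer[ring.start] = element;       // overwrite the oldest slot
                        ring.start = (ring.start + 1) % windowSize;
                    }
                    ring.emitted = true;
                    List<T> window = snapshot(ring);
                    return downstream.push(window);
                },
                (ring, downstream) -> {
                    // Stream shorter than windowSize: emit one window with everything seen
                    if (!ring.emitted && ring.size > 0 && !downstream.isRejecting()) {
                        List<T> window = snapshot(ring);
                        downstream.push(window);
                    }
                });
    }

    // Copies the ring in encounter order using at most two arraycopy calls.
    @SuppressWarnings("unchecked")
    private static <T> List<T> snapshot(Ring ring) {
        Object[] copy = new Object[ring.size];
        int firstPart = Math.min(ring.size, ring.buffer.length - ring.start);
        System.arraycopy(ring.buffer, ring.start, copy, 0, firstPart);
        System.arraycopy(ring.buffer, 0, copy, firstPart, ring.size - firstPart);
        List<?> view = Collections.unmodifiableList(Arrays.asList(copy));
        return (List<T>) view;
    }
}
```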

src/java.base/share/classes/java/util/stream/Gatherers.java line 426:

> 424:      * transformation for scenarios where no combiner-function can be
> 425:      * implemented, or for reductions which are intrinsically
> 426:      * order-dependent.

I think we should highlight the most important difference between `fold` and `reduce`: `reduce` requires an associative function, while `fold` has no such restriction.

src/java.base/share/classes/java/util/stream/Gatherers.java line 436:

> 434:      *     Stream.of(1,2,3,4,5,6,7,8,9)
> 435:      *           .gather(
> 436:      *               Gatherers.fold(() -> "", (string, number) -> string + number)

This particular sample is not great, as it's entirely replaceable with `reduce()` (`map(String::valueOf).reduce(String::concat)`), so users may wonder why they should use `fold` instead. I can suggest another sample that I show everywhere:
`anyList.stream().gather(Gatherers.fold(() -> 1, (acc, e) -> acc * 31 + Objects.hashCode(e))).findFirst().get();`
(haven't tested, but the idea is like this)
It computes the hash code of the list as specified by `List.hashCode()`. Here, the accumulator is inherently non-associative, so if you replace it with `reduce()` and make the stream parallel, it will yield the wrong result. But `Gatherers.fold` will work!

If you don't like it, we can think up something else, but the concatenation sample is certainly not good.
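The hashCode sample suggested above can be made runnable as follows, assuming the JDK 24 `Gatherers.fold(Supplier, BiFunction)` signature; `FoldHash` is a hypothetical wrapper. The check compares the folded value with `List.hashCode()` for both a sequential and a parallel stream: because `fold` has no combiner, elements are folded strictly in encounter order even when the upstream is parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical wrapper around the suggested Javadoc sample.
final class FoldHash {
    private FoldHash() {}

    // Left fold with the List.hashCode() recurrence h = 31 * h + hash(e), starting from 1.
    // The step is not associative, which is why reduce() is not a valid substitute here.
    static int foldHash(Stream<?> elements) {
        return elements
                .gather(Gatherers.fold(() -> 1, (acc, e) -> acc * 31 + Objects.hashCode(e)))
                .findFirst()
                .orElseThrow();
    }
}
```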

src/java.base/share/classes/java/util/stream/Gatherers.java line 450:

> 448:      * @throws NullPointerException if any of the parameters are null
> 449:      */
> 450:     public static <T, R> Gatherer<T, ?, R> fold(

Should it be explicitly named `foldLeft`? `foldRight` is also possible (though it would require buffering the complete stream). Also see [JDK-8133680](https://bugs.openjdk.org/browse/JDK-8133680) and [JDK-8292845](https://bugs.openjdk.org/browse/JDK-8292845) (these should probably be linked to the gatherers issue; see the comment there about naming).
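To show why `foldRight` needs full buffering, here is a sketch assuming the JDK 24 Gatherer API; `FoldRight` is a hypothetical class, not part of `java.util.stream.Gatherers`. Nothing can be emitted until the last element arrives, because the rightmost element is combined first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical foldRight; not part of java.util.stream.Gatherers.
final class FoldRight {
    private FoldRight() {}

    // Computes folder(e1, folder(e2, ... folder(en, initial))).
    static <T, R> Gatherer<T, ?, R> foldRight(
            Supplier<R> initial,
            BiFunction<? super T, ? super R, ? extends R> folder) {
        return Gatherer.ofSequential(
                () -> new ArrayList<T>(),
                (buffer, element, _) -> {
                    buffer.add(element);           // the whole stream must be buffered
                    return true;
                },
                (buffer, downstream) -> {
                    R acc = initial.get();
                    for (int i = buffer.size() - 1; i >= 0; i--)   // right to left
                        acc = folder.apply(buffer.get(i), acc);
                    downstream.push(acc);
                });
    }
}
```

For instance, folding `1, 2, 3` with `(e, acc) -> e - acc` from `0` gives `1 - (2 - (3 - 0)) = 2`, whereas a left fold `(acc, e) -> acc - e` gives `-6`.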

src/java.base/share/classes/java/util/stream/Gatherers.java line 482:

> 480:      * @throws NullPointerException if any of the parameters are null
> 481:      */
> 482:     public static <T, R> Gatherer<T, ?, R> scan(

Similarly, perhaps it should be named `scanLeft`?

-------------

PR Review: https://git.openjdk.org/jdk/pull/16420#pullrequestreview-1714010997
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382600262
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382601067
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382601204
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382601289
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382612744
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382607035
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382604080
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382604256
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382606016
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382607830
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382613728
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382613989
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382614003
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382615261
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382615861
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382616606
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382617752
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382616313
PR Review Comment: https://git.openjdk.org/jdk/pull/16420#discussion_r1382617857


More information about the core-libs-dev mailing list