Stream Gatherers (JEP 473) feedback

Anthony Vanelverdinghe dev at anthonyv.be
Thu Sep 12 17:54:53 UTC 2024


Hi Viktor

September 8, 2024 at 5:20 PM, "Viktor Klang" <viktor.klang at oracle.com> wrote:
> 
> >The non-reusability is intentional here, being a drop-in replacement for `Stream::concat`.
> 
> Gatherers are designed to be reusable, Streams not. So having a Gatherer which isn't reusable would be a bit of a non-starter I'm afraid. Or perhaps I misunderstood?

While I understand that most Gatherers will be reusable, and that reusability is a desirable characteristic, surely there will also be non-reusable Gatherers?
In particular, any Gatherer produced by a factory method that takes a `Stream<T>` argument and supports infinite Streams will be non-reusable, won't it?

For concatenation, I'd expect a `Gatherer append(T...)` to be reusable.
But I'd find it equally intuitive for a `Gatherer concat(Stream<T>)` not to be reusable, since its argument might be an infinite Stream.
In a previous response you proposed using `Gatherer concat(Supplier<Stream<T>>)` instead, but then I'd just pass `() -> aStream`, wonder why the parameter isn't just a `Stream<T>`, and the Gatherer would still not be reusable.
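For what it's worth, here is a minimal, self-contained sketch of that point. The `concat` factory below is the hypothetical Supplier-based variant from this thread, not an existing JDK API, and it needs a JDK with final Stream Gatherers (JDK 24+):

```java
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class ConcatDemo {
    // Hypothetical factory, mirroring the Supplier-based variant discussed above.
    static <T> Gatherer<T, ?, T> concat(Supplier<? extends Stream<T>> tail) {
        return Gatherer.of(
                // Pass every upstream element through unchanged...
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> downstream.push(element)),
                // ...then, at the end, push the elements of the tail stream.
                (state, downstream) -> tail.get().sequential().allMatch(downstream::push));
    }

    public static void main(String[] args) {
        // Passing `() -> aStream` defeats the purpose of the Supplier: get()
        // returns the same (then already consumed) Stream on every call, so the
        // resulting Gatherer is still single-use.
        Stream<Integer> aStream = Stream.of(1, 2);
        var inject = ConcatDemo.<Integer>concat(() -> aStream);
        System.out.println(Stream.of(0).gather(inject).toList()); // [0, 1, 2]
        try {
            Stream.of(0).gather(inject).toList();
        } catch (IllegalStateException e) {
            System.out.println("second use failed: stream has already been operated upon");
        }
    }
}
```

A `Supplier` only helps when it actually produces a fresh `Stream` per call (e.g. `() -> collection.stream()`); a lambda capturing an existing Stream gains nothing.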

As another example, take Gunnar Morling's zip Gatherers: https://github.com/gunnarmorling/zip-gatherer
I don't see how Gatherers like this could be made reusable, or why that would even be desirable.
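To make the structural problem concrete, a zip-style factory has to consume its Stream argument when the Gatherer is built, so the result is single-use by construction. This is a hypothetical sketch in the spirit of that project, not its actual code (JDK 24+):

```java
import java.util.Iterator;
import java.util.function.BiFunction;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class ZipDemo {
    // Hypothetical factory: pair each upstream element with the next element
    // of `other`, truncating at the shorter of the two.
    static <T, U, R> Gatherer<T, ?, R> zip(Stream<U> other,
                                           BiFunction<? super T, ? super U, ? extends R> fn) {
        Iterator<U> it = other.iterator(); // consumes `other`: single-use by construction
        return Gatherer.ofSequential(
                Gatherer.Integrator.of((state, element, downstream) ->
                        // returning false once `other` is exhausted short-circuits upstream
                        it.hasNext() && downstream.push(fn.apply(element, it.next()))));
    }

    public static void main(String[] args) {
        var pairs = Stream.of("a", "b", "c")
                .gather(ZipDemo.<String, Integer, String>zip(Stream.of(1, 2), (s, i) -> s + i))
                .toList();
        System.out.println(pairs); // [a1, b2]
    }
}
```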

> Personally, when I want to concat multiple streams of the same type I do:
> 
> Stream.of(foo, bar).flatMap(identity).filter(…).map(…).toList();

Yes, this is the idiom mentioned by `Stream::concat`, but your `foo` is not equivalent to the one in my e-mail.
My use case was a pipeline where the concatenation comes somewhere in the middle.
Currently we have to extract a variable for the first part of the pipeline and then use either `Stream::concat` or the above idiom to maintain readability.
With a `concat` Gatherer, the pipeline could be written as one fluent chain of method invocations.

Kind regards, Anthony

> Cheers,
> 
> **Viktor Klang**
> Software Architect, Java Platform Group
> 
> Oracle
> 
> ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
> 
> **From:** Anthony Vanelverdinghe <dev at anthonyv.be>
> **Sent:** Saturday, 7 September 2024 21:03
> **To:** Viktor Klang <viktor.klang at oracle.com>; core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> **Subject:** Re: [External] : Re: Stream Gatherers (JEP 473) feedback
>  
> 
> September 2, 2024 at 10:36 AM, "Viktor Klang" <viktor.klang at oracle.com> wrote:
> 
> > 
> 
> > Hi Anthony,
> 
> Hi Viktor
> 
> > Thank you for your patience, I needed some time to experiment and think about your feedback.
> 
> > 
> 
> > >* how realistic is it for type inference to be improved to the point that usage of the Gatherers API wouldn't require type arguments? Both technically and in terms of cost-benefit?
> 
> > 
> 
> > If looking at the past to inform extrapolation into the future, then the trend is going in the direction of improving over time.
> 
> > 
> 
> > >Gatherers.identity()
> 
> > 
> 
> > I still need some time to experiment with this, as there are some implications.
> 
> > For instance, if you do: **Stream.of(1).gather(Gatherers.identity())** you'd want that gatherer to be dropped since it is a no-op, but you can't really do that without breaking the contract of Stream.gather, as that operation should "consume" the original Stream reference and return a new one (to preserve stream building linearity), so you'd still need to create a new ReferencePipeline instance which is a copy of the current one, and mark the previous as consumed—in essence Stream.gather(identity) wouldn't be a no-op.
> 
> > 
> 
> > There are some other, performance-related, things I'll need to verify as well before coming to any conclusion on this.
> 
> > 
> 
> > >Gatherers.concat(Stream<T> stream)
> 
> The non-reusability is intentional here, being a drop-in replacement for `Stream::concat`.
> 
> (In what follows, assume the ellipses are method references, and the pipelines are nicely formatted and perfectly readable.)
> 
> The idea is to be able to write pipelines of the form:
> 
> var list = foo.filter(...).map(...).flatMap(...).concat(bar).map(...).filter(...).gather(...).toList();
> 
> Currently you have to write such pipelines as:
> 
> var list = Stream.concat(foo.filter(...).map(...).flatMap(...), bar).map(...).filter(...).gather(...).toList();
> 
> or:
> 
> var head = foo.filter(...).map(...).flatMap(...);
> 
> var concatenated = Stream.concat(head, bar);
> 
> var list = concatenated.map(...).filter(...).gather(...).toList();
> 
> But now you could write them as follows and retain a single, fluent pipeline:
> 
> var list = foo.filter(...).map(...).flatMap(...).gather(concat(bar)).map(...).filter(...).gather(...).toList();
> 
> My argument for including it would be that the above use case is common enough. `Stream::concat` could then also reference it in its Javadoc as an alternative.
> 
> > Creating such a Gatherer means that it is not reusable. You'd need to have a Supplier<Stream<T>>. Otherwise this happens:
> 
> > 
> 
> > jshell>     public static <T> Gatherer<T, ?, T> **concat**(Stream<T> **newStream**) {
> 
> >    ...>         return Gatherer.of(
> 
> >    ...>                 Gatherer.Integrator.ofGreedy((_, **e**, **d**) -> d.push(e)),
> 
> >    ...>                 (_, **d**) -> newStream.sequential().allMatch(d::push)
> 
> >    ...>         );
> 
> >    ...>     }
> 
> >    ...> 
> 
> > |  created method concat(Stream<T>)
> 
> > 
> 
> > jshell> var **inject** = concat(Stream.of(1,2))
> 
> > inject ==> GathererImpl[initializer=DEFAULT, integrator=$Lam ... 00001c00000db898 at 1068e947]
> 
> > 
> 
> > jshell> Stream.of(0).gather(inject.andThen(inject)).toList()
> 
> > |  Exception java.lang.IllegalStateException: stream has already been operated upon or closed
> 
> > |        at AbstractPipeline.evaluate (AbstractPipeline.java:260)
> 
> > |        at ReferencePipeline.allMatch (ReferencePipeline.java:677)
> 
> > |        at lambda$concat$1 (#4:4)
> 
> > |        at Gatherers$Composite.lambda$impl$3 (Gatherers.java:611)
> 
> > |        at GathererOp$GatherSink.end (GathererOp.java:181)
> 
> > |        at AbstractPipeline.copyInto (AbstractPipeline.java:571)
> 
> > |        at AbstractPipeline.wrapAndCopyInto (AbstractPipeline.java:560)
> 
> > |        at AbstractPipeline.evaluate (AbstractPipeline.java:636)
> 
> > |        at AbstractPipeline.evaluateToArrayNode (AbstractPipeline.java:291)
> 
> > |        at ReferencePipeline.toArray (ReferencePipeline.java:656)
> 
> > |        at ReferencePipeline.toArray (ReferencePipeline.java:662)
> 
> > |        at ReferencePipeline.toList (ReferencePipeline.java:667)
> 
> > |        at (#6:1)
> 
> > 
> 
> > That being said, given how little code it takes to implement something like that, I am not sure it warrants inclusion:
> 
> > jshell>     public static <T> Gatherer<T, ?, T> **concat**(Supplier<? extends Stream<T>> **newStream**) {
> 
> >    ...>         return Gatherer.of(
> 
> >    ...>                 Gatherer.Integrator.ofGreedy((**_**, **e**, **d**) -> d.push(e)),
> 
> >    ...>                 (**_**, **d**) -> newStream.get().sequential().allMatch(d::push)
> 
> >    ...>         );
> 
> >    ...>     }
> 
> > |  created method concat(Supplier<? extends Stream<T>>)
> 
> > 
> 
> > jshell> var **inject** = concat(() -> Stream.of(1,2))
> 
> > inject ==> GathererImpl[initializer=DEFAULT, integrator=$Lam ... 00001c00000d9c70 at 1a052a00]
> 
> > 
> 
> > jshell> Stream.of(0).gather(inject.andThen(inject)).toList()
> 
> > $1 ==> [0, 1, 2, 1, 2]
> 
> > 
> 
> > Cheers,
> 
> > 
> 
> > √
> 
> > 
> 
> > **Viktor Klang**
> 
> > Software Architect, Java Platform Group
> 
> > 
> 
> > Oracle
> 
> Kind regards,
> 
> Anthony
> 
> > ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
> 
> > 
> 
> > **From:** Anthony Vanelverdinghe <dev at anthonyv.be>
> 
> > **Sent:** Monday, 19 August 2024 20:37
> 
> > **To:** Viktor Klang <viktor.klang at oracle.com>; core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> 
> > **Subject:** Re: [External] : Re: Stream Gatherers (JEP 473) feedback
> 
> >  
> 
> > 
> 
> > August 15, 2024 at 1:27 PM, "Viktor Klang" <viktor.klang at oracle.com> wrote:
> 
> > > Hi Anthony,
> 
> > Hi Viktor
> 
> > > Thanks for the input—it's much appreciated!
> 
> > > Introducing yet another, user-facing, type parameter to get slightly improved type inference is unfortunately for me a too high of a price to pay. Ideally, type inference/unification will be improved over time making this issue go away without impacting any signatures.
> 
> > My arguments would be:
> 
> > * the type parameter enables using subtypes of Downstream, e.g. `Gatherer::integrator` could return an `Integrator<A, T, R, SpecialDownstream<R>>`
> 
> > * the type parameter improves type inference
> 
> > * the type parameter would increase usability. In my experience, nearly all Gatherers are created through the factory methods in Gatherer. And thanks to the improved type inference, I assert that all factory method invocations would work without any type arguments at all. Nowadays type inference is so good that I found it remarkable how often (relatively speaking) I need to provide type arguments with Gatherers, compared to other generic APIs. A substantial amount of Java developers has probably never even had to provide type arguments before, so being able to eliminate their need from the Gatherers API as well seems like a considerable advantage to me
> 
> > * how realistic is it for type inference to be improved to the point that usage of the Gatherers API wouldn't require type arguments? Both technically and in terms of cost-benefit?
> 
> > > I'm warming up to the idea of shipping a Gatherers.identity(), and before that happens I would like to see more use-cases where having such a thing would provide a real edge. I can come up with a bunch of synthetic scenarios where it's a win, but it's always better to see app logic numbers.
> 
> > To summarize previous mails, my arguments are:
> 
> > * it's a common Gatherer. Gatherers of the form `Gatherer<T, ?, T>` will likely have a degenerate case that is the identity. Some actual factory methods are `append(T... elements)` and `concat(Stream<T> stream)`, `prepend(T... elements)`, and `withInterpolationAt(Set<Instant> instants)`.
> 
> > * optimization: if a Stream pipeline only contains compositions of `Gatherer.identity()`, the Gatherers can be eliminated entirely from the pipeline and characteristics can be propagated. So for example `list.stream().gather(withInterpolationAt(aSetThatHappensToBeEmpty)).count()` would be optimized to `list.stream().count()` and return instantly. Note that while a homemade implementation could optimize its `andThen` implementation, it wouldn't be able to optimize `Gatherer::andThen` and `Stream::gather`.
> 
> > * API consistency: there's `Function.identity()`, so why not `Gatherers.identity()` (or `Gatherer.identity()`)? Actually I'd argue this method is more useful for Gatherers, since for Functions, this is often written as `o -> o` instead. For Gatherers there's no alternative like that.
> 
> > On a final note, in case it hasn't been done yet, I'd like to propose `Gatherers.concat(Stream<T> stream)`. The current `Stream::concat` doesn't allow fluent/readable concatenation of multiple streams.
> 
> > > Getting rid of the rawtypes in Value could be done, at any point since it isn't exposed to user code. I'll keep this in mind for any upcoming maintenance 👍
> 
> > > Keep the feedback coming 🙂
> 
> > > Cheers,
> 
> > > √
> 
> > Kind regards,
> 
> > Anthony
> 
> > > **Viktor Klang**
> 
> > > Software Architect, Java Platform Group
> 
> > > Oracle
> 
> > > ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
> 
> > > **From:** Anthony Vanelverdinghe <dev at anthonyv.be>
> > > **Sent:** Tuesday, 13 August 2024 18:32
> > > **To:** Viktor Klang <viktor.klang at oracle.com>; core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> > > **Subject:** [External] : Re: Stream Gatherers (JEP 473) feedback
> 
> > > Hi Viktor
> 
> > > Your previous response inspired me to experiment some more with Gatherers
> 
> > > As a result I'd like to make a couple propositions:
> 
> > > * add an additional type parameter.
> 
> > >   After doing so, type inference no longer needs any assistance:
> 
> > >   `var maxGatherer = Gatherer.ofSequential(State::new, State::integrate, State::finish);`
> 
> > > * add an identity Gatherer with an optimized `andThen` implementation
> 
> > >   as well as an optimization in the default implementation of `Gatherer::andThen`
> 
> > > * eliminate the use of raw types in `Gatherers.Value`
> 
> > > Code that implements these propositions is in this gist: https://gist.github.com/anthonyvdotbe/37c85eaa86a7833051bc33f6fe88046c
> 
> > > Kind regards,
> 
> > > Anthony
> 
> > > July 31, 2024 at 7:58 PM, "Viktor Klang" <viktor.klang at oracle.com> wrote:
> 
> > > > Hi Anthony,
> 
> > > > >The use case is a time series, which has methods to return a Stream of data points, `record DataPoint(Instant, BigDecimal)`. In DataPoint, there are several Gatherer factory methods, one of which is `Gatherer<DataPoint, ?, DataPoint> withInterpolationAt(NavigableSet<Instant> instants)`. If `instants.isEmpty()`, it returns a no-op Gatherer. In general, I guess most factory methods with a collection parameter (and compatible type arguments for T and R) will have a degenerate case like this when the collection is empty. `<T> Gatherer<T, ?, T> append(T... elements)` would be another example.
> 
> > > > `identity()` would also allow an optimized `andThen` implementation, simply returning its argument. And when uncomposed, the Stream library could eliminate the `gather` stage, allowing characteristics to propagate in this case. So `treeSet.stream().gather(identity()).sorted().distinct().toList()` could be optimized to `treeSet.stream().toList()`.
> 
> > > > Have you experimented with implementing your own identity Gatherer and implemented its andThen to return the second argument?
> 
> > > > >That being said, I hadn't considered cases where an intermediate stage in the pipeline would not propagate the characteristics. And in cases where the intermediate stage doesn't affect the characteristics, it would actually be worse to use something like `Gatherers.sorted().andThen(…)`, instead of just keeping track of the previous element and throwing an IllegalStateException if necessary.
> 
> > > > Yeah, that or implementing a reorder buffer Gatherer (in case you have unique and continuous sequence numbers).
> 
> > > > >This raises a new question though: on line 27 I'd expect I wouldn't need to specify the type arguments for the `ofSequential` method invocation. Is this hitting inherent limitations of type inference or is it possible that some generic type bounds aren't as generic as they could be, prohibiting type inference in certain cases?
> 
> > > > Yes, there are some limitations to inference, you can see usage examples in the implementation of Gatherers which does need some assistance to inference: https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/stream/Gatherers.java
> 
> > > > Cheers,
> 
> > > > √
> 
> > > > **Viktor Klang**
> 
> > > > Software Architect, Java Platform Group
> 
> > > > Oracle
> 
> > > > ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
> 
> > > > **From:** Anthony Vanelverdinghe <dev at anthonyv.be>
> > > > **Sent:** Tuesday, 30 July 2024 17:20
> > > > **To:** Viktor Klang <viktor.klang at oracle.com>; core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> > > > **Subject:** [External] : Re: Stream Gatherers (JEP 473) feedback
> 
> > > > July 29, 2024 at 8:08 PM, "Viktor Klang" <viktor.klang at oracle.com> wrote:
> 
> > > > > Hi Anthony,
> 
> > > > Hi Viktor
> 
> > > > > Thank you for your patience, and for providing feedback, it is always much appreciated.
> 
> > > > > >When writing factory methods for Gatherers, there's sometimes a
> > > > > degenerate case that requires returning a no-op Gatherer. So I'd like a
> > > > > way to mark a no-op Gatherer as such, allowing the Stream implementation
> > > > > to recognize and eliminate it from the pipeline. One idea is to add
> > > > > Gatherer.defaultIntegrator(), analogous to the other default… methods.
> > > > > Another is to add Gatherers.identity(), analogous to Function.identity().
> 
> > > > > I contemplated adding that but in the end I decided I didn't want to add it for the sake of adding it,
> > > > > but rather adding it in case it was deemed necessary.
> 
> > > > > Do you have a concrete use-case (code) that you could share?
> 
> > > > The use case is a time series, which has methods to return a Stream of data points, `record DataPoint(Instant, BigDecimal)`. In DataPoint, there are several Gatherer factory methods, one of which is `Gatherer<DataPoint, ?, DataPoint> withInterpolationAt(NavigableSet<Instant> instants)`. If `instants.isEmpty()`, it returns a no-op Gatherer. In general, I guess most factory methods with a collection parameter (and compatible type arguments for T and R) will have a degenerate case like this when the collection is empty. `<T> Gatherer<T, ?, T> append(T... elements)` would be another example.
> 
> > > > `identity()` would also allow an optimized `andThen` implementation, simply returning its argument. And when uncomposed, the Stream library could eliminate the `gather` stage, allowing characteristics to propagate in this case. So `treeSet.stream().gather(identity()).sorted().distinct().toList()` could be optimized to `treeSet.stream().toList()`.
> 
> > > > > >Sometimes a factory method returns a Gatherer that only works correctly
> > > > > if the upstream has certain characteristics, for example
> > > > > Spliterator.SORTED or Spliterator.DISTINCT.
> 
> > > > > Do you have a concrete use-case (code) that you could share?
> 
> > > > All the Streams that are returned from TimeSeries are well-formed: their data points are sorted and distinct. And the Gatherer factory methods in DataPoint assume that their upstreams have these characteristics. However, we can't prevent clients from constructing malformed Streams and invoking the Gatherers on them, which will give erroneous results. Now the Gatherer could keep track of the previous element and verify that the current element is greater than the previous. But the idea was to eliminate this bookkeeping for well-formed Streams, while still preventing erroneous results.
> 
> > > > > >One idea is to add methods
> > > > > like Gatherers.sorted() and Gatherers.distinct(), where the Stream
> > > > > implementation would be able to recognize and eliminate these from the
> > > > > pipeline if the upstream already has these characteristics. That way
> > > > > we'd be able to write `return Gatherers.sorted().andThen(…);`. Another
> > > > > idea is to provide a Gatherer with a way to inspect the upstream
> > > > > characteristics. If the upstream is missing the required
> > > > > characteristic(s), it could then throw an IllegalStateException.
> 
> > > > I figured the latter idea isn't useful: the upstream might be sorted, even though it doesn't have the sorted characteristic. So it would be harsh for the Gatherer to throw an exception in that case.
> 
> > > > > For a rather long time Gatherer had characteristics, however,
> > > > > what I noticed is that given composition of Gatherers what ended up happening
> > > > > almost always was that the combination of characteristics added overhead and devolved into the empty set
> > > > > real fast.
> 
> > > > > Also, when it comes to things like sorted() and distinct(), they (by necessity) have to get processed in full
> > > > > before emitting anything downstream, which creates a lot of extra memory allocation and doesn't lend themselves all that well to any depth-first streaming.
> 
> > > > In the given use case, well-formed Streams would already have the sorted and distinct characteristics. So the idea was that the sorted() and distinct() gatherers would be eliminated from the pipeline entirely in those cases. Only malformed Streams would have to pay the cost of sorted() and distinct(), but that'd be an acceptable price for them to pay.
> 
> > > > That being said, I hadn't considered cases where an intermediate stage in the pipeline would not propagate the characteristics. And in cases where the intermediate stage doesn't affect the characteristics, it would actually be worse to use something like `Gatherers.sorted().andThen(…)`, instead of just keeping track of the previous element and throwing an IllegalStateException if necessary.
> 
> > > > > >The returns clause of Gatherer.Integrator::integrate just states "true
> > > > > if subsequent integration is desired, false if not". In particular, it
> > > > > doesn't document the behavior I'm observing, that returning false also
> > > > > causes downstream to reject any further output elements.
> 
> > > > > Do you have a test case? (There was a bug fixed in this area after 22 was released, so you may want to test it on a 23-ea)
> 
> > > > I've uploaded a test case ( https://gist.github.com/anthonyvdotbe/17e2285bb4f497ed91502b3c09b9a000 ), but this is indeed already fixed in JDK 23-ea.
> 
> > > > This raises a new question though: on line 27 I'd expect I wouldn't need to specify the type arguments for the `ofSequential` method invocation. Is this hitting inherent limitations of type inference or is it possible that some generic type bounds aren't as generic as they could be, prohibiting type inference in certain cases?
> 
> > > > > Cheers,
> 
> > > > > √
> 
> > > > > **Viktor Klang**
> 
> > > > > Software Architect, Java Platform Group
> 
> > > > > Oracle
> 
> > > > Kind regards,
> 
> > > > Anthony
> 
> > > > > ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
> 
> > > > > **From:** core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of Anthony Vanelverdinghe <dev at anthonyv.be>
> > > > > **Sent:** Saturday, 27 July 2024 08:57
> > > > > **To:** core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> > > > > **Subject:** Stream Gatherers (JEP 473) feedback
> 
> > > > > When writing factory methods for Gatherers, there's sometimes a
> > > > > degenerate case that requires returning a no-op Gatherer. So I'd like a
> > > > > way to mark a no-op Gatherer as such, allowing the Stream implementation
> > > > > to recognize and eliminate it from the pipeline. One idea is to add
> > > > > Gatherer.defaultIntegrator(), analogous to the other default… methods.
> > > > > Another is to add Gatherers.identity(), analogous to Function.identity().
> 
> > > > > Sometimes a factory method returns a Gatherer that only works correctly
> > > > > if the upstream has certain characteristics, for example
> > > > > Spliterator.SORTED or Spliterator.DISTINCT. One idea is to add methods
> > > > > like Gatherers.sorted() and Gatherers.distinct(), where the Stream
> > > > > implementation would be able to recognize and eliminate these from the
> > > > > pipeline if the upstream already has these characteristics. That way
> > > > > we'd be able to write `return Gatherers.sorted().andThen(…);`. Another
> > > > > idea is to provide a Gatherer with a way to inspect the upstream
> > > > > characteristics. If the upstream is missing the required
> > > > > characteristic(s), it could then throw an IllegalStateException.
> 
> > > > > The returns clause of Gatherer.Integrator::integrate just states "true
> > > > > if subsequent integration is desired, false if not". In particular, it
> > > > > doesn't document the behavior I'm observing, that returning false also
> > > > > causes downstream to reject any further output elements.
> 
> > > > > In the Implementation Requirements section of Gatherer, rephrasing
> > > > > "Outputs and state later in the input sequence will be discarded if
> > > > > processing an earlier partition short-circuits." to something like the
> > > > > following would be clearer to me: "As soon as any partition
> > > > > short-circuits, the whole Gatherer short-circuits. The state of other
> > > > > partitions is discarded, i.e. there are no further invocations of the
> > > > > combiner. The finisher is invoked with the short-circuiting partition's
> > > > > state." I wouldn't mention discarding of outputs, since that's implied
> > > > > by the act of short-circuiting.
> 
> > > > > Anthony


More information about the core-libs-dev mailing list