Stream Gatherers (JEP 473) feedback

Anthony Vanelverdinghe dev at anthonyv.be
Tue Aug 13 16:32:16 UTC 2024


Hi Viktor

Your previous response inspired me to experiment some more with Gatherers.
As a result, I'd like to make a couple of propositions:

* add an additional type parameter.
  After doing so, type inference no longer needs any assistance:
  `var maxGatherer = Gatherer.ofSequential(State::new, State::integrate, State::finish);`
* add an identity Gatherer with an optimized `andThen` implementation
  as well as an optimization in the default implementation of `Gatherer::andThen`
* eliminate the use of raw types in `Gatherers.Value`

Code that implements these propositions is in this gist: https://gist.github.com/anthonyvdotbe/37c85eaa86a7833051bc33f6fe88046c
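As a hedged illustration of the identity proposition (not the code in the gist), the following sketch assumes JDK 24 or later, where `java.util.stream.Gatherer` is a final API (on JDK 22 and 23 it requires `--enable-preview`). `IdentityGatherer` is a hypothetical name. It passes every element through unchanged, declares a stateless combiner so it can run in parallel, and overrides `andThen` to return its argument, since composing with identity changes nothing.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical identity Gatherer (not part of java.util.stream): emits each element unchanged.
final class IdentityGatherer<T> implements Gatherer<T, Void, T> {

    private IdentityGatherer() {}

    static <T> Gatherer<T, ?, T> identity() {
        return new IdentityGatherer<>();
    }

    @Override
    public Gatherer.Integrator<Void, T, T> integrator() {
        // Greedy: never short-circuits by itself, only relays downstream rejection.
        return Gatherer.Integrator.ofGreedy((state, element, downstream) -> downstream.push(element));
    }

    @Override
    public BinaryOperator<Void> combiner() {
        // Stateless, so partitions combine trivially; this makes the stage parallelizable.
        return (left, right) -> left;
    }

    // Composing identity with `that` is a no-op, so hand back `that` itself.
    @Override
    @SuppressWarnings("unchecked")
    public <RR> Gatherer<T, ?, RR> andThen(Gatherer<? super T, ?, ? extends RR> that) {
        // Unchecked but sound in practice: `that` accepts any supertype of T and emits subtypes of RR.
        return (Gatherer<T, ?, RR>) (Gatherer<?, ?, ?>) that;
    }
}
```

Overriding `andThen` only helps when identity is on the left of a composition; eliminating a lone `gather(identity())` stage from the pipeline would still need support from the stream implementation itself.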

Kind regards,
Anthony

July 31, 2024 at 7:58 PM, "Viktor Klang" <viktor.klang at oracle.com> wrote:
> 
> Hi Anthony,
> 
> >The use case is a time series, which has methods to return a Stream of data points, `record DataPoint(Instant, BigDecimal)`. In DataPoint, there are several Gatherer factory methods, one of which is `Gatherer<DataPoint, ?, DataPoint> withInterpolationAt(NavigableSet<Instant> instants)`. If `instants.isEmpty()`, it returns a no-op Gatherer. In general, I guess most factory methods with a collection parameter (and compatible type arguments for T and R) will have a degenerate case like this when the collection is empty. `<T> Gatherer<T, ?, T> append(T... elements)` would be another example.
> 
> `identity()` would also allow an optimized `andThen` implementation, simply returning its argument. And when uncomposed, the Stream library could eliminate the `gather` stage, allowing characteristics to propagate in this case. So `treeSet.stream().gather(identity()).sorted().distinct().toList()` could be optimized to `treeSet.stream().toList()`.
> 
> Have you experimented with implementing your own identity Gatherer and implementing its andThen to return the second argument?
> 
> >That being said, I hadn't considered cases where an intermediate stage in the pipeline would not propagate the characteristics. And in cases where the intermediate stage doesn't affect the characteristics, it would actually be worse to use something like `Gatherers.sorted().andThen(…)`, instead of just keeping track of the previous element and throwing an IllegalStateException if necessary.
> 
> Yeah, that or implementing a reorder buffer Gatherer (in case you have unique and continuous sequence numbers).
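A minimal sketch of such a reorder buffer, assuming each element carries a unique sequence number and that the numbers are contiguous from a known starting value. `Seq`, `ReorderBuffer`, and `reorder` are hypothetical names, and values are assumed to be non-null. Early arrivals are parked in a map and released as soon as the next expected number shows up; a gap that is still open when the upstream ends is reported by the finisher.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class ReorderBuffer {

    // Hypothetical element type: a non-null value tagged with its sequence number.
    record Seq<T>(long seq, T value) {}

    // Emits values in sequence order, assuming unique, contiguous numbers starting at firstSeq.
    static <T> Gatherer<Seq<T>, ?, T> reorder(long firstSeq) {
        class State {
            long next = firstSeq;                          // next sequence number to emit
            final Map<Long, T> pending = new HashMap<>();  // early arrivals, keyed by number
            boolean stopped;                               // downstream stopped accepting
        }
        return Gatherer.<Seq<T>, State, T>ofSequential(
                State::new,
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    if (element.seq() < state.next
                            || state.pending.putIfAbsent(element.seq(), element.value()) != null) {
                        throw new IllegalStateException("duplicate sequence number " + element.seq());
                    }
                    // Release every buffered value that is now contiguous with what was emitted.
                    T value;
                    while ((value = state.pending.remove(state.next)) != null) {
                        state.next++;
                        if (!downstream.push(value)) {
                            state.stopped = true;
                            return false;
                        }
                    }
                    return true;
                }),
                (state, downstream) -> {
                    // Leftovers mean a number never arrived (unless downstream stopped early).
                    if (!state.stopped && !state.pending.isEmpty()) {
                        throw new IllegalStateException("missing sequence number " + state.next);
                    }
                });
    }
}
```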
> 
> >This raises a new question though: on line 27 I'd expect I wouldn't need to specify the type arguments for the `ofSequential` method invocation. Is this hitting inherent limitations of type inference or is it possible that some generic type bounds aren't as generic as they could be, prohibiting type inference in certain cases?
> 
> Yes, there are some limitations to inference; you can see usage examples in the implementation of Gatherers, which does need some assistance with inference: https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/stream/Gatherers.java
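For comparison, here is a hypothetical `State`-based Gatherer in the shape of `Gatherer.ofSequential(State::new, State::integrate, State::finish)` from the thread; the `State` class and `max()` are assumptions, not Anthony's code. It emits the largest `Integer` seen. Rather than relying on inference, the sketch passes explicit type witnesses for `<T, A, R>`, which is the kind of assistance referred to above.

```java
import java.util.List;
import java.util.stream.Gatherer;

final class MaxGatherer {

    // Hypothetical mutable state: the largest element seen so far (null until the first element).
    static final class State {
        private Integer max;

        boolean integrate(Integer element, Gatherer.Downstream<? super Integer> downstream) {
            if (max == null || element > max) {
                max = element;
            }
            return true; // keep consuming; nothing is emitted until the end
        }

        void finish(Gatherer.Downstream<? super Integer> downstream) {
            if (max != null) {
                downstream.push(max);
            }
        }
    }

    // Explicit witnesses <T = Integer, A = State, R = Integer> instead of relying on inference.
    static Gatherer<Integer, State, Integer> max() {
        return Gatherer.<Integer, State, Integer>ofSequential(State::new, State::integrate, State::finish);
    }

    static List<Integer> maxOf(List<Integer> values) {
        return values.stream().gather(max()).toList();
    }
}
```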
> 
> Cheers,
> 
> **Viktor Klang**
> Software Architect, Java Platform Group
> 
> Oracle
> 
> ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
> 
> **From:** Anthony Vanelverdinghe <dev at anthonyv.be>
> **Sent:** Tuesday, 30 July 2024 17:20
> **To:** Viktor Klang <viktor.klang at oracle.com>; core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> **Subject:** [External] : Re: Stream Gatherers (JEP 473) feedback
>  
> 
> July 29, 2024 at 8:08 PM, "Viktor Klang" <viktor.klang at oracle.com> wrote:
> 
> > Hi Anthony,
> 
> Hi Viktor
> 
> > Thank you for your patience, and for providing feedback; it is always much appreciated.
> >
> > >When writing factory methods for Gatherers, there's sometimes a
> > degenerate case that requires returning a no-op Gatherer. So I'd like a
> > way to mark a no-op Gatherer as such, allowing the Stream implementation
> > to recognize and eliminate it from the pipeline. One idea is to add
> > Gatherer.defaultIntegrator(), analogous to the other default… methods.
> > Another is to add Gatherers.identity(), analogous to Function.identity().
> >
> > I contemplated adding that, but in the end I decided I didn't want to add it for the sake of adding it,
> > but rather to add it in case it was deemed necessary.
> >
> > Do you have a concrete use-case (code) that you could share?
> 
> The use case is a time series, which has methods to return a Stream of data points, `record DataPoint(Instant, BigDecimal)`. In DataPoint, there are several Gatherer factory methods, one of which is `Gatherer<DataPoint, ?, DataPoint> withInterpolationAt(NavigableSet<Instant> instants)`. If `instants.isEmpty()`, it returns a no-op Gatherer. In general, I guess most factory methods with a collection parameter (and compatible type arguments for T and R) will have a degenerate case like this when the collection is empty. `<T> Gatherer<T, ?, T> append(T... elements)` would be another example.
> 
> `identity()` would also allow an optimized `andThen` implementation, simply returning its argument. And when uncomposed, the Stream library could eliminate the `gather` stage, allowing characteristics to propagate in this case. So `treeSet.stream().gather(identity()).sorted().distinct().toList()` could be optimized to `treeSet.stream().toList()`.
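To make the degenerate case concrete, a hypothetical implementation of the `append(T... elements)` factory mentioned above might look like this (`Appending` is an assumed class name). When there is nothing to append, the best it can return is a pass-through Gatherer, which the stream implementation has no way to recognize as a no-op.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class Appending {

    // Hypothetical factory: emits every upstream element, then `elements` once upstream is exhausted.
    @SafeVarargs
    static <T> Gatherer<T, ?, T> append(T... elements) {
        // Pass each upstream element through unchanged; never short-circuits on its own.
        Gatherer.Integrator<Void, T, T> passThrough =
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> downstream.push(element));

        if (elements.length == 0) {
            // Degenerate case: nothing to append, so this stage is a no-op,
            // but the stream cannot tell and still evaluates it.
            return Gatherer.ofSequential(passThrough);
        }

        List<T> tail = List.of(elements); // defensive copy; rejects null elements
        return Gatherer.ofSequential(passThrough, (state, downstream) -> {
            for (T element : tail) {
                if (!downstream.push(element)) {
                    break; // downstream no longer wants elements
                }
            }
        });
    }
}
```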
> 
> > >Sometimes a factory method returns a Gatherer that only works correctly
> > if the upstream has certain characteristics, for example
> > Spliterator.SORTED or Spliterator.DISTINCT.
> >
> > Do you have a concrete use-case (code) that you could share?
> 
> All the Streams that are returned from TimeSeries are well-formed: their data points are sorted and distinct. And the Gatherer factory methods in DataPoint assume that their upstreams have these characteristics. However, we can't prevent clients from constructing malformed Streams and invoking the Gatherers on them, which will give erroneous results. Now the Gatherer could keep track of the previous element and verify that the current element is greater than the previous. But the idea was to eliminate this bookkeeping for well-formed Streams, while still preventing erroneous results.
> 
> > >One idea is to add methods
> > like Gatherers.sorted() and Gatherers.distinct(), where the Stream
> > implementation would be able to recognize and eliminate these from the
> > pipeline if the upstream already has these characteristics. That way
> > we'd be able to write `return Gatherers.sorted().andThen(…);`. Another
> > idea is to provide a Gatherer with a way to inspect the upstream
> > characteristics. If the upstream is missing the required
> > characteristic(s), it could then throw an IllegalStateException.
> 
> I figured the latter idea isn't useful: the upstream might be sorted, even though it doesn't have the sorted characteristic. So it would be harsh for the Gatherer to throw an exception in that case.
> 
> > For a rather long time Gatherer had characteristics; however,
> > what I noticed is that, given composition of Gatherers, what ended up happening
> > almost always was that the combination of characteristics added overhead and devolved into the empty set
> > real fast.
> >
> > Also, when it comes to things like sorted() and distinct(), they (by necessity) have to get processed in full
> > before emitting anything downstream, which creates a lot of extra memory allocation, and they don't lend themselves all that well to any depth-first streaming.
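The buffering cost is easy to see in a hypothetical stand-in for the proposed `Gatherers.sorted()` (no such method exists in `java.util.stream.Gatherers`; `BufferingGatherers` is an assumed name). The integrator can only collect elements, because the smallest one may arrive last, so all output happens in the finisher after the entire upstream is held in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class BufferingGatherers {

    // Hypothetical stand-in for the proposed Gatherers.sorted(); no such method exists in the JDK.
    static <T extends Comparable<? super T>> Gatherer<T, ?, T> sorted() {
        return Gatherer.<T, List<T>, T>ofSequential(
                ArrayList::new,
                // Integration can only buffer: the smallest element may be the last one to arrive.
                Gatherer.Integrator.ofGreedy((buffer, element, downstream) -> {
                    buffer.add(element);
                    return true;
                }),
                // All output happens here, after the entire upstream is held in memory.
                (buffer, downstream) -> {
                    buffer.sort(null); // null comparator = natural ordering
                    for (T element : buffer) {
                        if (!downstream.push(element)) {
                            break;
                        }
                    }
                });
    }
}
```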
> 
> In the given use case, well-formed Streams would already have the sorted and distinct characteristics. So the idea was that the sorted() and distinct() gatherers would be eliminated from the pipeline entirely in those cases. Only malformed Streams would have to pay the cost of sorted() and distinct(), but that'd be an acceptable price for them to pay.
> 
> That being said, I hadn't considered cases where an intermediate stage in the pipeline would not propagate the characteristics. And in cases where the intermediate stage doesn't affect the characteristics, it would actually be worse to use something like `Gatherers.sorted().andThen(…)`, instead of just keeping track of the previous element and throwing an IllegalStateException if necessary.
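The bookkeeping alternative might be sketched as follows. `OrderingChecks` and `requireStrictlyIncreasing()` are hypothetical names, "sorted and distinct" is interpreted as every element comparing strictly greater than its predecessor under natural ordering, and null elements are assumed not to occur. Well-formed Streams pay one comparison per element; malformed ones fail fast with an `IllegalStateException`.

```java
import java.util.List;
import java.util.stream.Gatherer;

final class OrderingChecks {

    // Hypothetical check: passes elements through unchanged, but fails fast if an element is not
    // strictly greater than its predecessor (i.e. the upstream is not sorted and distinct).
    static <T extends Comparable<? super T>> Gatherer<T, ?, T> requireStrictlyIncreasing() {
        class Previous {
            T value; // null until the first element has been seen
        }
        return Gatherer.<T, Previous, T>ofSequential(
                Previous::new,
                Gatherer.Integrator.ofGreedy((previous, element, downstream) -> {
                    if (previous.value != null && previous.value.compareTo(element) >= 0) {
                        throw new IllegalStateException(
                                "upstream not sorted and distinct: " + element + " after " + previous.value);
                    }
                    previous.value = element;
                    return downstream.push(element);
                }));
    }

    static <T extends Comparable<? super T>> List<T> checked(List<T> values) {
        return values.stream().gather(OrderingChecks.<T>requireStrictlyIncreasing()).toList();
    }
}
```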
> 
> > >The returns clause of Gatherer.Integrator::integrate just states "true
> > if subsequent integration is desired, false if not". In particular, it
> > doesn't document the behavior I'm observing, that returning false also
> > causes downstream to reject any further output elements.
> >
> > Do you have a test case? (There was a bug fixed in this area after 22 was released, so you may want to test it on a 23-ea build.)
> 
> I've uploaded a test case (https://gist.github.com/anthonyvdotbe/17e2285bb4f497ed91502b3c09b9a000), but this is indeed already fixed in JDK 23-ea.
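A small probe of the integrator/finisher interaction, assuming JDK 23 or later, where the thread reports the fix is present. The hypothetical `takeThenMark` Gatherer returns `false` once it has emitted `n` elements and then pushes a marker from its finisher; on a fixed build the marker is expected to reach the downstream, so `Stream.of(1, 2, 3, 4, 5).gather(ShortCircuitDemo.takeThenMark(2, -1)).toList()` should yield `[1, 2, -1]`.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class ShortCircuitDemo {

    // Hypothetical Gatherer: emits the first `n` elements, asks for no more input by returning
    // false, and then emits `marker` from its finisher.
    static <T> Gatherer<T, ?, T> takeThenMark(long n, T marker) {
        class Counter {
            long taken;
        }
        return Gatherer.<T, Counter, T>ofSequential(
                Counter::new,
                Gatherer.Integrator.of((counter, element, downstream) -> {
                    if (counter.taken >= n) {
                        return false; // only reached when n == 0
                    }
                    counter.taken++;
                    // false => no further integration is desired (short-circuit)
                    return downstream.push(element) && counter.taken < n;
                }),
                // Still invoked after short-circuiting; its output should not be rejected.
                (counter, downstream) -> downstream.push(marker));
    }
}
```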
> 
> This raises a new question though: on line 27 I'd expect I wouldn't need to specify the type arguments for the `ofSequential` method invocation. Is this hitting inherent limitations of type inference or is it possible that some generic type bounds aren't as generic as they could be, prohibiting type inference in certain cases?
> 
> > Cheers,
> >
> > √
> >
> > **Viktor Klang**
> > Software Architect, Java Platform Group
> >
> > Oracle
> 
> Kind regards,
> Anthony
> 
> > ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯
> >
> > **From:** core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of Anthony Vanelverdinghe <dev at anthonyv.be>
> > **Sent:** Saturday, 27 July 2024 08:57
> > **To:** core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> > **Subject:** Stream Gatherers (JEP 473) feedback
> >
> > When writing factory methods for Gatherers, there's sometimes a
> > degenerate case that requires returning a no-op Gatherer. So I'd like a
> > way to mark a no-op Gatherer as such, allowing the Stream implementation
> > to recognize and eliminate it from the pipeline. One idea is to add
> > Gatherer.defaultIntegrator(), analogous to the other default… methods.
> > Another is to add Gatherers.identity(), analogous to Function.identity().
> >
> > Sometimes a factory method returns a Gatherer that only works correctly
> > if the upstream has certain characteristics, for example
> > Spliterator.SORTED or Spliterator.DISTINCT. One idea is to add methods
> > like Gatherers.sorted() and Gatherers.distinct(), where the Stream
> > implementation would be able to recognize and eliminate these from the
> > pipeline if the upstream already has these characteristics. That way
> > we'd be able to write `return Gatherers.sorted().andThen(…);`. Another
> > idea is to provide a Gatherer with a way to inspect the upstream
> > characteristics. If the upstream is missing the required
> > characteristic(s), it could then throw an IllegalStateException.
> >
> > The returns clause of Gatherer.Integrator::integrate just states "true
> > if subsequent integration is desired, false if not". In particular, it
> > doesn't document the behavior I'm observing, that returning false also
> > causes downstream to reject any further output elements.
> >
> > In the Implementation Requirements section of Gatherer, rephrasing
> > "Outputs and state later in the input sequence will be discarded if
> > processing an earlier partition short-circuits." to something like the
> > following would be clearer to me: "As soon as any partition
> > short-circuits, the whole Gatherer short-circuits. The state of other
> > partitions is discarded, i.e. there are no further invocations of the
> > combiner. The finisher is invoked with the short-circuiting partition's
> > state." I wouldn't mention discarding of outputs, since that's implied
> > by the act of short-circuiting.
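Since that wording is about parallel evaluation, here is a hedged sketch of a parallelizable, short-circuiting Gatherer: a hypothetical stateless `takeWhile` built with `Gatherer.of(Integrator)`. Its integrator returns `false` at the first rejected element, so each partition can short-circuit on its own. For the ascending input and `i < limit` predicate used below, every element after the first rejected one is also rejected, so the result does not depend on which later partitions are discarded, and sequential and parallel runs should produce the same prefix.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class ParallelTakeWhile {

    // Hypothetical stateless takeWhile-style Gatherer; Gatherer.of(...) makes it parallelizable.
    static <T> Gatherer<T, ?, T> takeWhile(Predicate<? super T> predicate) {
        // Returning false at the first rejected element short-circuits the current partition.
        return Gatherer.of((state, element, downstream) ->
                predicate.test(element) && downstream.push(element));
    }

    static List<Integer> below(int limit, boolean parallel) {
        Stream<Integer> numbers = IntStream.range(0, 1_000).boxed();
        if (parallel) {
            numbers = numbers.parallel();
        }
        return numbers.gather(ParallelTakeWhile.<Integer>takeWhile(i -> i < limit)).toList();
    }
}
```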
> >
> > Anthony


More information about the core-libs-dev mailing list