java.util.stream.Stream: API for user-extensible intermediate operations

Brian Goetz brian.goetz at oracle.com
Wed Jun 28 20:37:10 UTC 2023


Thanks for these comments.  To respond indirectly to your point about 
parallelization, many of the challenges arise from a tradeoff made in 
designing Spliterator, one that stems from the desire to extract 
parallelism not only from the really easy case (arrays), but also from 
more complex cases like trees.

Spliterator leaves the choice of where to split to the data structure, 
because the data structure knows how its data is laid out.  This lets, 
for example, trees split left-and-right, rather than trying to find a 
pivot point somewhere halfway down the right side of the tree.  This, in 
turn, reduces the cost of splitting and the cost of extracting the next 
element -- all critical elements in parallel performance.

What we traded away when we did this is the ability to predictably split 
in a given place (which makes parallel zipWith nearly impossible), and 
to a lesser degree to know where a split is in the traversal (a 
spliterator may, but is not required to, provide this information.)  
This ripples into your pairMap example, though not in nearly as bad a 
way as with zip; when pairMapping [ 1, 2, 3, 4, 5, 6 ], and let's say we 
split between 3 and 4, we know that f(4,5) and f(5, 6) are in the 
output, and we can precompute them, but we have to buffer until we 
combine so we can extract f(3, 4).  (Actually, whether we have to buffer 
depends on whether combination combines f(3, 4) into the left or right 
split.  For this particular problem, we'd not need to buffer if the 
extra element went into the left split.)
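
A minimal sketch of that combination step, assuming for simplicity that 
the source is a List split at a known index (which, as noted above, a 
Spliterator does not promise).  The names PairMapSplitSketch, 
pairsWithin and pairMapWithSplit are hypothetical; they are not part of 
StreamEx or of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class PairMapSplitSketch {

    /** Applies f to every overlapping adjacent pair that lies entirely inside one chunk. */
    static <T, R> List<R> pairsWithin(List<T> chunk,
                                      BiFunction<? super T, ? super T, ? extends R> f) {
        List<R> out = new ArrayList<>();
        for (int i = 0; i + 1 < chunk.size(); i++) {
            out.add(f.apply(chunk.get(i), chunk.get(i + 1)));
        }
        return out;
    }

    /** Splits source at splitAt, processes both halves independently, then combines. */
    static <T, R> List<R> pairMapWithSplit(List<T> source, int splitAt,
                                           BiFunction<? super T, ? super T, ? extends R> f) {
        List<T> left = source.subList(0, splitAt);
        List<T> right = source.subList(splitAt, source.size());

        // Each half can be computed without knowing anything about the other.
        List<R> leftOut = pairsWithin(left, f);
        List<R> rightOut = pairsWithin(right, f);

        // The pair that straddles the split is only computable at combination time.
        List<R> result = new ArrayList<>(leftOut);
        if (!left.isEmpty() && !right.isEmpty()) {
            result.add(f.apply(left.get(left.size() - 1), right.get(0)));
        }
        result.addAll(rightOut);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6);
        // f(a, b) = 10a + b, so 34 stands for the boundary pair f(3, 4).
        List<Integer> result =
            PairMapSplitSketch.<Integer, Integer>pairMapWithSplit(source, 3, (a, b) -> 10 * a + b);
        System.out.println(result); // prints [12, 23, 34, 45, 56]
    }
}
```

Here 12 and 23 come from the left half, 45 and 56 from the right half, 
and 34 can only be produced once the two halves meet.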

You have a worse problem with non-sliding windows, where we'd want to 
split the above stream into f(1, 2), f(3, 4), f(5, 6). When we split 
between 3 and 4, we may not know whether we're at an odd or even offset, 
so we might not even be able to start computing f(4, 5) because it might 
not be in the output at all.
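
To make the offset problem concrete, here is a small sketch that pairs 
up one chunk under two different assumptions about where it starts.  
FixedPairOffsetSketch and completePairs are hypothetical names, and the 
explicit startOffset parameter is exactly the piece of information a 
spliterator is not required to supply.

```java
import java.util.ArrayList;
import java.util.List;

class FixedPairOffsetSketch {

    /**
     * Returns the disjoint pairs fully contained in chunk, assuming the chunk
     * begins at the given zero-based offset in the original source.
     */
    static List<List<Integer>> completePairs(List<Integer> chunk, int startOffset) {
        // An odd start offset means the first element completes a pair
        // that was begun in the previous chunk, so it is skipped here.
        int first = (startOffset % 2 == 0) ? 0 : 1;
        List<List<Integer>> pairs = new ArrayList<>();
        for (int i = first; i + 1 < chunk.size(); i += 2) {
            pairs.add(List.of(chunk.get(i), chunk.get(i + 1)));
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Integer> rightChunk = List.of(4, 5, 6);
        // Offset 3: the chunk really is the tail of [1, 2, 3, 4, 5, 6].
        System.out.println(completePairs(rightChunk, 3)); // prints [[5, 6]]
        // Offset 2: a different, equally plausible position for the same chunk.
        System.out.println(completePairs(rightChunk, 2)); // prints [[4, 5]]
    }
}
```

The same three elements yield different outputs depending on the 
offset, so without it the right-hand task cannot safely emit anything 
before combination.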

Again, these come more from the deliberate design tradeoffs made with 
Spliterator, rather than from the particular way gatherers do their 
combination.  In some cases, it might even be more efficient to copy 
the source to a more predictably splittable representation before 
parallel execution.
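
As a rough sketch of that last point, assuming the source is a poorly 
splittable stream such as one produced by Stream.iterate: copying it 
into an array first gives the parallel pipeline a source that can split 
by index.  The class and method names are hypothetical, and whether the 
copy pays for itself depends on the workload.

```java
import java.util.Arrays;
import java.util.stream.Stream;

class CopyBeforeParallelSketch {

    /** Copies the stream into an array, then runs the computation in parallel over the array. */
    static long sumOfSquares(Stream<Integer> source) {
        // Sequential copy: pays O(n) time and memory up front.
        Integer[] copy = source.toArray(Integer[]::new);
        // An array-backed stream knows its exact size and can split by index.
        return Arrays.stream(copy)
                     .parallel()
                     .mapToLong(i -> i.longValue() * i.longValue())
                     .sum();
    }

    public static void main(String[] args) {
        Stream<Integer> iterated = Stream.iterate(1, i -> i + 1).limit(1000);
        System.out.println(sumOfSquares(iterated)); // prints 333833500
    }
}
```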



On 6/28/2023 4:15 PM, Tagir Valeev wrote:
> Hello!
>
> Interesting read, thanks. I'm the author of the StreamEx library,
> which actually enhances Stream API, and I use it during my daily job,
> so I have some thoughts about extending Stream API as well. Below are
> some of them.
>
> First, there are many useful operations that can be trivially defined
> in terms of existing operations. For example,
>    mapPartial(Function<? super T, ? extends Optional<? extends R>> mapper)
> It's essentially map(mapper).flatMap(Optional::stream), though it's
> much more performant to express it as
>    map(value -> mapper.apply(value).orElse(null)).filter(Objects::nonNull)
> It might be desired to encapsulate such implementations in static
> methods, and here .chain() or .transform() method would be helpful,
> see https://bugs.openjdk.org/browse/JDK-8140283.
>
> There are also some operations that cannot be expressed via existing
> 'normal' operations, and could be implemented taking spliterator() of
> the original stream.
> This is somewhat limited, especially taking into account how
> WrappingSpliterator handles tryAdvance and trySplit. Stream API would
> certainly benefit if it could be more transparent and provide better
> ways to extend it.
>
> One problematic place is how to provide a reasonable default
> implementation for new methods like 'gather'. There are many
> third-party implementations of the Stream interface in the wild. Many
> of them just delegate most of the operations to the JDK stream, adding
> some custom logic. It should be expected that calling 'gather' on them
> will work according to the spec.
>
> Another thing that bothers me is parallel stream support. Not every
> custom operation that you may imagine could be efficiently implemented
> for parallel streams. But for some operations this is certainly
> possible. It's unclear whether the proposed Gatherer interface is
> flexible enough to provide good parallelization for operations. I have
> a couple of exercises:
>
> 1. public <R> Stream<R> pairMap(BiFunction<? super T, ? super T, ?
> extends R> mapper) {...}
>    This operation processes overlapping adjacent pairs of the source
> stream (the resulting stream is one element shorter). It's quite
> useful, it's used 12 times in our production code. Also, it's a nice
> building block for other operations. For example, we have Stream<T>
> mapFirst(Function<? super T, ? extends T> mapper) {...} which maps the
> first element of the stream, leaving other elements intact. It could
> be implemented as prepend(sentinel).pairMap((left, right) -> left ==
> sentinel ? mapper.apply(right) : right) (prepend can be trivially
> expressed via concat()). It's possible (though hard) to implement
> parallel pairMap using spliterator() escape hatch, and it will do its
> work nicely without performance hiccups and using only the constant
> amount of memory per every forked parallel task. I think such an
> operation is a good test for new extensions. If pairMap could be
> expressed in terms of newly-proposed operations, and it would nicely
> work in parallel, it would be a strong argument that the new API is
> good.
>
> 2. Collapse family. The basic idea is 'anti-flatMap'. It processes the
> stream respecting its order, and sometimes combines adjacent elements
> to a single one using different rules. It normally uses BiPredicate<T,
> T> to check whether two adjacent elements should be collapsed. There
> are at least five public operations in this family:
>    Stream<T> collapse(BiPredicate<? super T, ? super T> collapsible) --
> like your proposed collapseRuns or deduplicateAdjacent
>    Stream<T> collapse(BiPredicate<? super T, ? super T> collapsible,
> BinaryOperator<T> merger) -- similar but with custom associative
> operation that merges collapsible elements
>    <R, A> Stream<R> collapse(BiPredicate<? super T, ? super T>
> collapsible, Collector<? super T, A, R> collector) -- similar but
> merging collapsible elements using the specified collector
>    Stream<Entry<T, Long>> runLengths() -- produces pairs that track
> number of elements collapsed into one
>    Stream<List<T>> groupRuns(BiPredicate<? super T, ? super T>
> sameGroup) -- like collapse(collapsible, toList()) but much better
> optimized
>    <U> Stream<U> intervalMap(BiPredicate<? super T, ? super T>
> sameInterval, BiFunction<? super T, ? super T, ? extends U> mapper) --
> mapper applies to the first and last element of every interval.
> Possible application: convert Stream(1,2,3,6,7,8) to Stream(new
> Interval(1, 3), new Interval(6, 8)) (collapse adjacent numbers into a
> single 'interval' record)
>
>    It's also possible (though very hard) to provide an efficient
> parallelizable implementation of these methods using spliterator().
> Again, if a new API allows doing this more simply, without much performance
> overhead, it would be a big win. The proposed implementation provides
> no combiner, so apparently it's not parallel-friendly.
>
> Other operations that people may want:
>    <V, R> Stream<R> zipWith(BaseStream<V, ?> other, BiFunction<? super
> T, ? super V, ? extends R> mapper) -- zip two streams (one can zip
> with IntStream.range(0, ...).boxed() to get a stream of elements with
> their indices). Quite impossible to parallelize efficiently in the
> general case
>    Stream<T> ifEmpty(Stream<? extends T> another) -- replace contents
> of this stream with another stream but only if this stream is
> completely empty. Surprisingly useful, somewhat like orElseGet in
> Optional. Is it possible to parallelize efficiently if the original
> stream is empty and the other is parallel?
>
> Another thing that I noticed when extending Stream API is that
> short-circuiting collectors would be a good addition. I have such an
> API, and there are a number of such collectors:
>    <T> Collector<T, ?, Optional<T>> onlyOne() -- returns element if the
> stream contains exactly one element; otherwise returns empty optional.
> Stops if the second element is observed
>    <T extends Enum<T>> Collector<T, ?, EnumSet<T>> toEnumSet(Class<T>
> enumClass) -- collects stream of enum values to EnumSet. Stops if all
> values are already there
>    <T> Collector<T, ?, Optional<T>> first() -- just first element of the stream
>    <T> Collector<T, ?, List<T>> head(int n) -- first n elements of the stream
>    <T, S extends Collection<T>> Collector<S, ?, Set<T>> intersecting()
> -- intersection of collections in the stream. Stops if the current
> intersection is empty.
>    <T> Collector<T, ?, OptionalInt> andingInt(ToIntFunction<T> mapper)
> -- perform bitwise & on results of mapper function. Stops if the
> current result is 0.
>    Collector<CharSequence, ?, String> commonPrefix() -- finds common
> prefix for the stream of char sequences. Stops if common prefix is
> already empty
>    <T> Collector<T, ?, Optional<T>> reducingWithZero(T zero,
> BinaryOperator<T> op) -- more generic reduction collector which stops
> if current value reaches the supplied zero value
> And so on. The most useful is probably a flavor of joining() which can
> limit the resulting string, adding an ellipsis (and stopping consuming
> the input, of course). To implement them, we need only one new
> Predicate<A> finished() method in the collector that tells for current
> accumulator if we reached the end or not (and probably a new
> characteristic). Many combining collectors (mapping, filtering,
> flatMapping, partitioningBy) can be adapted to produce a
> short-circuiting collector if the downstream collector is
> short-circuiting. All of these collectors can parallelize nicely. I
> needed to hack the collect() terminal operation to support this stuff
> properly.
>
> With best regards,
> Tagir Valeev.
>
> On Tue, Jun 27, 2023 at 7:11 PM Viktor Klang <viktor.klang at oracle.com> wrote:
>> Hi core-libs-dev,
>>
>> Over the past 6+ months I've been thinking about, and tinkering with, how we'd be able to expose a user-facing API for extensible intermediate java.util.stream.Stream operations—a feature envisioned all the way back when Streams were created.
>>
>> I'm now at a point where I have a viable design and implementation, and so I'm turning to you for your feedback: on the direction taken; the API concepts; and, in particular, is there anything which I have overlooked/missed?
>>
>> (If you, like myself, prefer reading pre-rendered markdown, click here)
>>
>>
>>
>>
>> # Gathering the streams
>>
>> ## Introduction
>>
>> Java 8 introduced the `java.util.stream` API, which represents a lazily
>> computed, potentially unbounded sequence of values (Streams was also the first
>> designed-for-lambdas API in the JDK). Streams supports the ability to process
>> the stream either sequentially or in parallel.
>>
>> A `Stream` pipeline consists of a source (collection, array, generator, etc),
>> zero or more intermediate operations (`Stream` -> `Stream` transforms), and an
>> eager terminal operation which produces a value or a side-effect.
>>
>> The Streams API comes with a reasonably rich, but fixed set of built-in
>> operations (mapping, filtering, reduction, sorting, etc), as well as an
>> extensible terminal operation (`Stream::collect`) that enables the stream
>> contents to be flexibly summarized in a variety of forms.  The resulting API is
>> rich enough that users have had good experience with streams, but there are
>> repeated requests for "please add operation X to streams".
>>
>> In this document, we explore a corresponding extensible _intermediate_
>> operation, called `Stream::gather`, which is able to address many of the
>> requests we've gotten for additional operations.
>>
>> ## "Missing" intermediate operations
>>
>> Over the years, many new operations for streams have been proposed.  Each of
>> them may be useful in some situation, but many of them are too narrow to be
>> included in the core `Stream` API.  We'd like for people to be able to plug
>> these operations in via an extension point, as with `Stream::collect`.
>>
>> Here are some examples of proposed intermediate operations that are not easily
>> expressible as intermediate operations on `Stream` today.
>>
>> * **Folds**.  Folding is a generalization of reduction.  With reduction, the
>>    result type is the same as the element type, the combiner is associative, and
>>    the initial value is an identity for the combiner.   For a fold, these
>>    conditions are not required, though we give up parallelizability.
>>
>>    Example: `foldLeft("", (str,elem) -> str + " " + elem.toString())` over the values
>>    `[1,2,3]` yields `["1 2 3"]`.
>>
>>    See https://bugs.openjdk.org/browse/JDK-8133680 & https://bugs.openjdk.org/browse/JDK-8292845
>>
>> * **Unfolds**.  This takes an aggregate and decomposes it into elements.  An
>>    `unfold` can reverse the operation of the above fold example using `Scanner`
>>    or `String::split`.
>>
>>    Example: `unfold(e -> new Scanner(e), Scanner::hasNextInt, Scanner::nextInt)`
>>    over `["1 2 3"]` yields `[1,2,3]`.
>>
>>    See: https://bugs.openjdk.org/browse/JDK-8151408
>>
>> * **Barriers**.  Usually all operations of a stream pipeline can run
>>    simultaneously as data is available.  In some cases, we may wish to have a
>>    barrier that requires that the entirety of one operation is completed before
>>    providing any elements to the next operation.
>>
>>    Example: `someStream.a(...).barrier(someEffect)...` would require that all
>>    effects from `a` are completed before executing `someEffect` or producing any
>>    downstream values.
>>
>>    See: https://bugs.openjdk.org/browse/JDK-8294246
>>
>> * **Windowing**.  Given a stream, we may wish to construct a stream which
>>    groups the original elements, either in overlapping or disjoint groups.
>>
>>    Example: `fixedWindow(2)` over the values `[1,2,3,4,5]` yields `[[1,2],[3,4],[5]]`.
>>    Example: `slidingWindow(2)` over the values `[1,2,3,4]` yields `[[1,2],[2,3],[3,4]]`.
>>
>>    See: https://stackoverflow.com/questions/34158634/how-to-transform-a-java-stream-into-a-sliding-window
>>
>> * **Prefix scans**. Prefix scans are a stream of incremental reductions.
>>    Perhaps surprisingly, some prefix scans are parallelizable, and we are exploring adding support for that beyond `Arrays.parallelPrefix()`.
>>
>>    Example: `scan((sum, next) -> sum + next)` over  values `[1,2,3,4]` yields `[1,3,6,10]`
>>
>>    See: https://stackoverflow.com/questions/55265797/cumulative-sum-using-java-8-stream-api
>>
>> * **Duplicated elements**.  The `distinct` operation will remove duplicates; sometimes we want only the elements that are duplicated.
>>
>>    Example: `duplicates(Object::equals)` over values `[1,1,2,3,4]` yields `[1]`
>>
>>    See: https://stackoverflow.com/questions/27677256/java-8-streams-to-find-the-duplicate-elements
>>
>> * **Running duplicate elimination**.  Here, we want to eliminate adjacent repeated elements.
>>
>>    Example: `collapseRuns(Object::equals)` over values `[1,1,2,3,3]` yields `[1,2,3]`
>>
>> These are just a few of the stream operations people have wished for—
>> and while many operations are too specialized to include in the core library,
>> all are things we'd like for people to be able to express with streams.
>>
>> ## Extending streams
>>
>> It was always desired for `java.util.stream.Stream` to have an extension point
>> for intermediate operations; when the library was first built, this wish-list
>> item was recorded: https://bugs.openjdk.org/browse/JDK-8132369.  There were many
>> reasons that this was not included in the initial version, the most important
>> being that it was not at all obvious what the right API was.
>>
>> The set of intermediate operations on Streams carries the exact same trade-off
>> as CISC (Complex Instruction Set Computing) vs RISC (Reduced Instruction Set
>> Computing), namely: either you have to provide a rather sizeable set of
>> specialized operations, or you provide a small set of general operations.
>>
>> For the **CISC**-strategy, the main drawback is that not all operations are
>> equally useful, and having a large set of operations to choose from creates a
>> barrier to effective usage by making the correct choice harder to identify.
>>
>> For the **RISC**-strategy, the main drawback is that it puts more effort on the
>> user to encode more specialized operations over a set of generic operations,
>> making code harder to read, and harder to maintain, for the user.
>>
>> However, one success story was `Collector`; this allowed us to extend _terminal_
>> operations with a good balance of expressiveness, reuse, and API footprint.  We
>> would like to do the same for intermediate operations -- and we can draw some
>> inspiration from `Collector` to get there.
>>
>> ## Requirements
>>
>> Breaking down operations into intermediate and terminal is a useful distinction; however,
>> in reality there are many more characteristics we could use to describe operations,
>> and relatively few operations use exactly the same subset of characteristics.
>> The following is a curated set of characteristics of stream operations;
>> if we want to enable generic, user-definable, intermediate operations on streams,
>> we probably have to provide for them all.
>>
>> | Ordinal | Feature                  | What this means in practice                                 | Example operation(s)                |
>> |---------|--------------------------|-------------------------------------------------------------|-------------------------------------|
>> |    1    | Intermediate operation   | Add custom new operations to Stream pipelines               | *map*, *filter*                     |
>> |    2    | Incremental operation    | Evaluate the Stream pipeline depth-first                    | *map*, *filter*                     |
>> |    3    | Stateful operation       | Remember information between processing elements            | *limit*, *foldLeft*                 |
>> |    4    | N:M inputs-to-outputs    | Encode a wide array of operations                           | *limit*, *flatMap*                  |
>> |    5    | Prefix consumption       | An operation should consume only as much as needed          | *limit*, *takeWhile*                |
>> |    6    | Beginning-of-stream hook | Ability to have initialization logic                        | *foldLeft*, *grouped*               |
>> |    7    | End-of-stream hook       | Flush elements downstream                                   | *sorted*, *grouped*                 |
>> |    8    | Opt-in parallelizability | Speed up CPU-bound processing for parallelizable operations | *reduce*, *map*                     |
>>
>> Now that we have a better idea of what we need to achieve, let's have a look at
>> what our options are.
>>
>> ## Comparing existing operations
>>
>> Let's look at the operations we already have, to see if they would fit the bill
>> as a generic extension point for intermediate `Stream`-operations.
>>
>> These are, in order, `Stream.map()`, `Stream.flatMap(…)` (`Stream.mapMulti(…)`
>> is analogous), and `Collector` & `Stream.collect(…)`—and in the table below we
>> see how they line up against our requirements.
>>
>> | Ordinal | Feature                  | Using Stream.map(…)       | Using Stream.flatMap(…)   | Using Collector |
>> |---------|--------------------------|---------------------------|---------------------------|-----------------|
>> |    1    | Intermediate operation   | Yes                       | Yes                       | No, terminal    |
>> |    2    | Incremental operation    | Yes                       | Yes                       | No, terminal    |
>> |    3    | Stateful operation       | Yes, via class or capture | Yes, via class or capture | Yes             |
>> |    4    | N:M inputs-to-outputs    | No, 1:1                   | No, 1:M                   | No, N:1         |
>> |    5    | Prefix consumption       | No                        | No                        | No              |
>> |    6    | Beginning-of-stream hook | No                        | No                        | Yes             |
>> |    7    | End-of-stream hook       | No                        | No                        | Yes             |
>> |    8    | Opt-in parallelizability | No, always on             | No, always on             | No, always on   |
>>
>> Looking at the options above, none of them are going to, as-is, fit the bill as
>> an extension point for generic intermediate `Stream` operations, but one of them
>> has more promise than the others: `Collector`.  Obvious needed upgrades are
>> supporting short-circuiting and incremental generation of output; let's see what
>> that looks like in practice.
>>
>> ## Collector 2.0
>>
>> Let's start with a clone of `Collector<T,A,R>` called `Gatherer<T,A,R>`:
>>
>> ```java
>> /** @param <T> the element type
>>   *  @param <A> the (mutable) intermediate accumulation type
>>   *  @param <R> the (probably immutable) final accumulation type
>>   */
>> interface Gatherer<T,A,R> {
>>      Supplier<A> supplier();
>>      BiConsumer<A, T> accumulator();
>>      BinaryOperator<A> combiner();
>>      Function<A, R> finisher();
>> }
>> ```
>>
>> Now, we need to make it output 0...M elements instead of strictly 1.  In order
>> to do that, we need a way to programmatically output elements from the
>> `accumulator` to make it incremental (which has the added benefit that it would be
>> able to run depth-first).  Adding an additional argument, a downstream handle,
>> to `accumulator` and `finisher` allows us to push elements downstream both for
>> each input, and at the end of stream. As there exists no suitable pre-existing
>> interface, we'll define a new interface named `Integrator` and rename `accumulator`
>> to the more fitting `integrator`.
>> Our `finisher` will also not have a useful return value, so we'll make it `void`:
>>
>> ```java
>> /** @param <T> the element type
>>   *  @param <A> the (mutable) intermediate accumulation type
>>   *  @param <R> the (probably immutable) final accumulation type
>>   */
>> interface Gatherer<T,A,R> {
>>      interface Integrator<A,T,R> {
>>          void integrate(A state, T element, Consumer<? super R> downstream);
>>      }
>>      Supplier<A> supplier();
>>      Integrator<A, T, R> integrator();
>>      BinaryOperator<A> combiner();
>>      BiConsumer<A, Consumer<? super R>> finisher();
>> }
>> ```
>>
>> Next, we need to tackle short-circuiting and add an upstream signal from each
>> invocation of the `integrator` so we can detect when the integration does not
>> want any more elements. We do this by returning a boolean from `integrate`:
>>
>> ```java
>> /** @param <T> the element type
>>   *  @param <A> the (mutable) intermediate accumulation type
>>   *  @param <R> the (probably immutable) final accumulation type
>>   */
>> interface Gatherer<T,A,R> {
>>      interface Integrator<A,T,R> {
>>          boolean integrate(A state, T element, Consumer<? super R> downstream);
>>      }
>>      Supplier<A> supplier();
>>      Integrator<A, T, R> integrator();
>>      BinaryOperator<A> combiner();
>>      BiConsumer<A, Consumer<? super R>> finisher();
>> }
>> ```
>>
>> Transitively supporting short-circuiting, i.e. that a downstream handle does
>> not want to receive any more elements, requires us to make the same change as
>> we did for `integrate`—return a `boolean` instead of `void`, and since the
>> only built-in candidate would be `Predicate`, which is not supposed to have
>> side-effects, let's introduce our own type `Sink`:
>>
>> ```java
>> /** @param <T> the element type
>>   *  @param <A> the (mutable) intermediate accumulation type
>>   *  @param <R> the (probably immutable) final accumulation type
>>   */
>> interface Gatherer<T,A,R> {
>>      interface Sink<R> {
>>          boolean flush(R element);
>>      }
>>      interface Integrator<A,T,R> {
>>          boolean integrate(A state, T element, Sink<? super R> downstream);
>>      }
>>      Supplier<A> supplier();
>>      Integrator<A, T, R> integrator();
>>      BinaryOperator<A> combiner();
>>      BiConsumer<A, Sink<? super R>> finisher();
>> }
>> ```
>>
>> Finally, we need to address parallelism.  To opt-in to parallelism we can make
>> the combiner optional -- we call it only for parallel executions, and if it is
>> not present, execution is constrained to sequential.
>>
>> To demonstrate how all of the above fits together, we implement `fixedWindow(N)`
>> where `fixedWindow(2)` over the values `[1,2,3,4,5]` yields `[[1,2],[3,4],[5]]`.
>>
>> ```java
>>      /**
>>       * Gathers elements into fixed-size windows. The last window can contain fewer elements.
>>       * @param windowSize the size of the windows
>>       * @return a new gatherer which gathers elements into fixed-size windows
>>       * @param <T> the type of elements this Gatherer gathers
>>       */
>>      public static <T> Gatherer<T, ?, List<T>> fixedWindow(int windowSize) {
>>          if (windowSize < 1)
>>              throw new IllegalArgumentException("'windowSize' must be greater than zero");
>>          else {
>>              Supplier<ArrayList<T>> supplier =
>>                  () ->  new ArrayList<T>(windowSize);
>>
>>              Integrator<ArrayList<T>, T, List<T>> integrator =
>>                  (openWindow, element, downstream) -> {
>>                      if (openWindow.add(element) && openWindow.size() >= windowSize) {
>>                          var closedWindow = List.copyOf(openWindow);
>>                          openWindow.clear();
>>                          return downstream.flush(closedWindow);
>>                      } else
>>                          return true;
>>                  };
>>
>>              // This combiner signals that we opt-out of parallelisation
>>              BinaryOperator<ArrayList<T>> combiner =
>>                  Gatherer.unsupportedCombiner();
>>
>>              BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher =
>>                  (openWindow, downstream) -> {
>>                      if(!openWindow.isEmpty()) {
>>                          downstream.flush(List.copyOf(openWindow));
>>                          openWindow.clear();
>>                      }
>>                  };
>>
>>              return Gatherer.of(supplier, integrator, combiner, finisher);
>>          }
>>      }
>> ```
>>
>> Or by implementing the `Gatherer` interface directly like so:
>>
>> ```java
>>      /**
>>       * Gathers elements into fixed-size windows. The last window can contain fewer elements.
>>       * @param windowSize the size of the windows
>>       * @return a new gatherer which gathers elements into fixed-size windows
>>       * @param <T> the type of elements this Gatherer gathers
>>       */
>>      public static <T> Gatherer<T, ?, List<T>> fixedWindow(int windowSize) {
>>          if (windowSize < 1)
>>              throw new IllegalArgumentException("'windowSize' must be greater than zero");
>>
>>          class FixedWindowGatherer implements Gatherer<T,ArrayList<T>,List<T>> {
>>              @Override
>>              public Supplier<ArrayList<T>> supplier() {
>>                  return () -> new ArrayList<>(windowSize);
>>              }
>>
>>              @Override
>>              public Integrator<ArrayList<T>, T, List<T>> integrator() {
>>                  return (openWindow, element, downstream) -> {
>>                      if (openWindow.add(element) && openWindow.size() >= windowSize) {
>>                          var closedWindow = List.copyOf(openWindow);
>>                          openWindow.clear();
>>                          return downstream.flush(closedWindow);
>>                      } else
>>                          return true;
>>                  };
>>              }
>>
>>              @Override
>>              public BinaryOperator<ArrayList<T>> combiner() {
>>                  return Gatherer.unsupportedCombiner();
>>              }
>>
>>              @Override
>>              public BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher() {
>>                  return (openWindow, downstream) -> {
>>                      if(!openWindow.isEmpty()) {
>>                          downstream.flush(List.copyOf(openWindow));
>>                          openWindow.clear();
>>                      }
>>                  };
>>              }
>>          }
>>          return new FixedWindowGatherer();
>>      }
>> ```
>>
>> Compared to Collector, it would look like the following:
>>
>> | Ordinal | Feature                  | `Collector<T,A,R>`                | `Gatherer<T,A,R>`                     |
>> |---------|--------------------------|-----------------------------------|---------------------------------------|
>> |    1    | Beginning-of-stream hook | `A get()`                         | `A get()`                             |
>> |    2    | Per element              | `void accept(A, T)`               | `boolean integrate(A, T, Sink<R>)`    |
>> |    3    | End-of-stream hook       | `R apply(A)`                      | `void accept(A, Sink<R>)`             |
>> |    4    | Parallelizability        | `A apply(A, A)`                   | `A apply(A, A)`                       |
>>
>> To integrate it with `java.util.stream.Stream` we add the following method to
>> it: `Stream<R> gather(Gatherer<T,?,R> gatherer)`
>>
>> Now we can use our `fixedWindow` Gatherer:
>>
>> ```java
>> jshell> Stream.iterate(0, i -> i + 1).limit(10).gather(fixedWindow(2)).toList();
>> $1 ==> [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
>> ```
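
A minimal, sequential-only sketch of how such a pipeline stage could
drive a Gatherer, assuming the interface shapes developed above with the
combiner left out.  Everything here is hypothetical illustration rather
than the proposed implementation: the GathererSketch holder class, the
`of` factory, the `runSequentially` driver, and the `runningSum` and
`limit` gatherers are invented names, and running the finisher after a
short-circuit is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

class GathererSketch {
    // These shapes mirror the interfaces in the text, minus the combiner.
    interface Sink<R> { boolean flush(R element); }

    interface Integrator<A, T, R> {
        boolean integrate(A state, T element, Sink<? super R> downstream);
    }

    interface Gatherer<T, A, R> {
        Supplier<A> supplier();
        Integrator<A, T, R> integrator();
        BiConsumer<A, Sink<? super R>> finisher();
    }

    /** Hypothetical factory for a sequential-only gatherer. */
    static <T, A, R> Gatherer<T, A, R> of(Supplier<A> supplier,
                                          Integrator<A, T, R> integrator,
                                          BiConsumer<A, Sink<? super R>> finisher) {
        return new Gatherer<T, A, R>() {
            public Supplier<A> supplier() { return supplier; }
            public Integrator<A, T, R> integrator() { return integrator; }
            public BiConsumer<A, Sink<? super R>> finisher() { return finisher; }
        };
    }

    /** Hypothetical driver: pulls elements until the integrator returns false. */
    static <T, A, R> List<R> runSequentially(Iterable<? extends T> source,
                                             Gatherer<T, A, R> gatherer) {
        List<R> output = new ArrayList<>();
        Sink<R> sink = element -> { output.add(element); return true; };
        A state = gatherer.supplier().get();
        Integrator<A, T, R> integrator = gatherer.integrator();
        for (T element : source) {
            if (!integrator.integrate(state, element, sink)) {
                break; // short-circuit: stop consuming upstream
            }
        }
        // Assumption: the finisher also runs after a short-circuit.
        gatherer.finisher().accept(state, sink);
        return output;
    }

    /** Prefix scan: emits the running total after every element. */
    static Gatherer<Integer, int[], Integer> runningSum() {
        return of(
            () -> new int[1],
            (sum, element, downstream) -> {
                sum[0] += element;
                return downstream.flush(sum[0]);
            },
            (sum, downstream) -> { });
    }

    /** Prefix consumption: forwards at most n elements, then asks to stop. */
    static <T> Gatherer<T, int[], T> limit(int n) {
        return of(
            () -> new int[1],
            (count, element, downstream) ->
                count[0] < n && downstream.flush(element) && ++count[0] < n,
            (count, downstream) -> { });
    }

    public static void main(String[] args) {
        List<Integer> sums = runSequentially(List.of(1, 2, 3, 4), runningSum());
        System.out.println(sums); // prints [1, 3, 6, 10]

        // An unbounded source still terminates, because limit(3) returns false.
        Iterable<Integer> naturals = () -> Stream.iterate(1, i -> i + 1).iterator();
        List<Integer> firstThree = runSequentially(naturals, GathererSketch.<Integer>limit(3));
        System.out.println(firstThree); // prints [1, 2, 3]
    }
}
```

The driver ignores the combiner entirely, so it says nothing about how
the parallel case would be evaluated.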
>>
>> Gatherer now provides all features:
>>
>> | Ordinal | Feature                  | Using Gatherer          | Explanation                                                                            |
>> |---------|--------------------------|-------------------------|----------------------------------------------------------------------------------------|
>> |    1    | Intermediate operation   | Yes                     | A Gatherer's output is the following operation's input                                 |
>> |    2    | Incremental operation    | Yes                     | A Gatherer can produce elements in response to consuming elements                      |
>> |    3    | Stateful operation       | Yes                     | A Gatherer supplies its own state, which can be Void/null if stateless                 |
>> |    4    | N:M inputs-to-outputs    | Yes                     | A Gatherer can consume and produce any number of elements                              |
>> |    5    | Prefix consumption       | Yes                     | A Gatherer can signal that it is done by returning `false` from its `integrate`-method |
>> |    6    | Beginning-of-stream hook | Yes                     | A Gatherer can run logic when creating its initial state                               |
>> |    7    | End-of-stream hook       | Yes                     | A Gatherer's `finisher` is invoked at the end of input                                 |
>> |    8    | Opt-in parallelizability | Yes                     | A Gatherer's `combiner` is optional                                                    |
>>
>> ## Summary
>>
>> None of the current candidates for a user-extensible API for generic
>> intermediate stream operations fit the set of requirements, but `Collector`
>> can form an excellent basis for a new construct, called `Gatherer`,
>> which after a limited set of modifications checks all the boxes:
>>
>> | Ordinal | Feature                  | Using Stream.map(…)       | Using Stream.flatMap(…)   | Using Collector | Using Gatherer |
>> |---------|--------------------------|---------------------------|---------------------------|-----------------|----------------|
>> |    1    | Intermediate operation   | Yes                       | Yes                       | No, terminal    | Yes            |
>> |    2    | Incremental operation    | Yes                       | Yes                       | No, terminal    | Yes            |
>> |    3    | Stateful operation       | Yes, via class or capture | Yes, via class or capture | Yes             | Yes            |
>> |    4    | N:M inputs-to-outputs    | No, 1:1                   | No, 1:M                   | No, N:1         | Yes            |
>> |    5    | Prefix consumption       | No                        | No                        | No              | Yes            |
>> |    6    | Beginning-of-stream hook | No                        | No                        | Yes             | Yes            |
>> |    7    | End-of-stream hook       | No                        | No                        | Yes             | Yes            |
>> |    8    | Opt-in parallelizability | No, always on             | No, always on             | No, always on   | Yes            |
>>
>> As it turned out, we were a short hop away from a generic, stand-alone,
>> reusable, parallelizable API for intermediate operations all along!
>>
>> ## But wait … there's more
>>
>> While the aforementioned solution contains the bare minimum to meet our requirements,
>> the following will provide some significant quality-of-life improvements.
>>
>> ### Compositionality
>>
>> The astute reader may have noticed that by adding the `gather(Gatherer)`-method on `Stream`,
>> one can in effect **compose** `Gatherer`s together:
>>
>> ```java
>> Gatherer<Integer,?,String> someGatherer = …;
>> Gatherer<String,?,Long> someOtherGatherer = …;
>> Stream<Long> stream = Stream.of(1).gather(someGatherer).gather(someOtherGatherer);
>> ```
>>
>> As it turns out, it is possible to define a `default Gatherer andThen(Gatherer)`-method on `Gatherer` which composes `Gatherer`s:
>>
>> ```java
>> Gatherer<Integer,?,String> someGatherer = …;
>> Gatherer<String,?,Long> someOtherGatherer = …;
>> Gatherer<Integer,?,Long> gatherer = someGatherer.andThen(someOtherGatherer);
>> ```
>>
>> This makes it possible to both de-couple operations from their use-sites,
>> as well as create more sophisticated `Gatherer`s from simple ones.
>>
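As a rough illustration of what such a composition has to do, here is a minimal, sequential-only sketch. It is not the proposed implementation: `AndThenSketch`, `Mini`, `Both`, `bridge` and `run` are hypothetical names, the combiner and characteristics are left out, and the rule that the composite stops as soon as the second stage returns `false` is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical, sequential-only sketch of andThen; none of these types are JDK types. */
final class AndThenSketch {

    interface Sink<R> { boolean flush(R element); }

    interface Integrator<A, T, R> {
        boolean integrate(A state, T element, Sink<? super R> downstream);
    }

    /** Pared-down stand-in for the proposed Gatherer (no combiner, no characteristics). */
    static final class Mini<T, A, R> {
        final Supplier<A> supplier;
        final Integrator<A, T, R> integrator;
        final BiConsumer<A, Sink<? super R>> finisher;

        Mini(Supplier<A> supplier, Integrator<A, T, R> integrator,
             BiConsumer<A, Sink<? super R>> finisher) {
            this.supplier = supplier;
            this.integrator = integrator;
            this.finisher = finisher;
        }
    }

    /** Composite state: both stages' states plus a flag set once the second stage is done. */
    static final class Both<A, AA> {
        final A first;
        final AA second;
        boolean secondDone;
        Both(A first, AA second) { this.first = first; this.second = second; }
    }

    static <T, A, R, AA, RR> Mini<T, Both<A, AA>, RR> andThen(Mini<T, A, R> first,
                                                               Mini<R, AA, RR> second) {
        return new Mini<T, Both<A, AA>, RR>(
            () -> new Both<A, AA>(first.supplier.get(), second.supplier.get()),
            (both, element, downstream) ->
                first.integrator.integrate(both.first, element, bridge(second, both, downstream))
                    && !both.secondDone,
            (both, downstream) -> {
                // Let the first stage flush into the second, then let the second finish.
                first.finisher.accept(both.first, bridge(second, both, downstream));
                second.finisher.accept(both.second, downstream);
            });
    }

    /** A Sink that forwards the first stage's output into the second stage's integrator. */
    private static <R, AA, RR> Sink<R> bridge(Mini<R, AA, RR> second, Both<?, AA> both,
                                              Sink<? super RR> downstream) {
        return element -> {
            if (both.secondDone) return false;
            boolean wantsMore = second.integrator.integrate(both.second, element, downstream);
            both.secondDone = !wantsMore;
            return wantsMore;
        };
    }

    /** Assumed sequential driver: supplier once, integrator per element, finisher once. */
    static <T, A, R> List<R> run(List<T> source, Mini<T, A, R> gatherer) {
        List<R> out = new ArrayList<>();
        Sink<R> sink = element -> { out.add(element); return true; };
        A state = gatherer.supplier.get();
        for (T element : source) {
            if (!gatherer.integrator.integrate(state, element, sink)) break;
        }
        gatherer.finisher.accept(state, sink);
        return out;
    }

    /** Stateless Integer-to-String stage. */
    static Mini<Integer, Void, String> toText() {
        return new Mini<Integer, Void, String>(
            () -> null,
            (nothing, element, downstream) -> downstream.flush(String.valueOf(element)),
            (nothing, downstream) -> { });
    }

    /** Stateful String-to-Long stage that stops after two elements. */
    static Mini<String, long[], Long> firstTwoLengths() {
        return new Mini<String, long[], Long>(
            () -> new long[1],
            (seen, text, downstream) ->
                downstream.flush(Long.valueOf(text.length())) && ++seen[0] < 2,
            (seen, downstream) -> { });
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 22, 333, 4444), andThen(toText(), firstTwoLengths())));
    }
}
```

In this sketch the composite's state is simply the pair of both stages' states, and `main` prints `[1, 2]`: the second stage signals that it is done after two elements, which in turn stops the first stage from consuming more input.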
>> But that's not all—by introducing a `default Collector collect(Collector)`-method
>> it is also possible to compose a `Gatherer` with a `Collector`,
>> and the result is another `Collector`:
>>
>> ```java
>> Gatherer<Integer,?,String> someGatherer = …;
>> Gatherer<String,?,Long> someOtherGatherer = …;
>> Collector<Integer,?,List<Long>> collector = someGatherer.andThen(someOtherGatherer).collect(Collectors.toList());
>> ```
>>
>> Taken together, this makes it possible to build `Stream` processing in three directions:
>> "source-to-destination" using `Stream.gather(…).gather(…).gather(…)`;
>> "intermediaries-first" using `gatherer.andThen(…).andThen(…)`; and
>> "destination-to-source" using `gatherer.collect(otherGatherer.collect(…))`.
>>
>> ### Familiarity
>>
>> Since `Gatherer` is an evolution of `Collector` there's a clear on-ramp
>> for any and all Java developers who are already familiar with `Collector`.
>>
>> ### Reusability
>>
>> `Gatherer`s, being stand-alone, reusable intermediaries with compositionality
>> and opt-in parallelization, allow a transformation to be created once and
>> maintained in a single place, regardless of whether it is going to be
>> used as a `Collector`, a `Gatherer`, or a `Stream`.
>>
>> ### Optimizability
>>
>> `Collector` has the notion of `Characteristics`, a `Set` of flags that describe it.
>> The underlying machinery that evaluates a `Collector` can use this information
>> to optimize the evaluation. The same approach can be used for `Gatherer`s.
>>
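For reference, the existing `Collector` flags can be inspected with today's JDK. The sketch below only uses `java.util.stream.Collector` and `Collectors` as they exist now; the class name `CollectorCharacteristicsDemo` and the helper `canSkipFinisher` are made up for illustration, and the helper merely mimics one decision an implementation could take from the flags alone.

```java
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class CollectorCharacteristicsDemo {

    /** True when the accumulation container already is the result, so finishing is a no-op. */
    static boolean canSkipFinisher(Collector<?, ?, ?> collector) {
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH);
    }

    public static void main(String[] args) {
        // Each built-in Collector advertises its own set of flags.
        System.out.println("toList:  " + Collectors.toList().characteristics());
        System.out.println("toSet:   " + Collectors.toSet().characteristics());
        System.out.println("joining: " + Collectors.joining().characteristics());

        // joining() builds a String from an internal builder, so its finisher must run.
        System.out.println(canSkipFinisher(Collectors.toList()));  // true
        System.out.println(canSkipFinisher(Collectors.joining())); // false
    }
}
```

A `Gatherer` flag such as `GREEDY` could presumably be consulted in the same way, for instance to skip short-circuit checks, but that is a guess about the eventual machinery rather than something stated here.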
>> ### Summary 2.0
>>
>> Taking all of the above into consideration, we end up with a definition like the following:
>>
>> ```java
>> /** @param <T> the input element type
>>   *  @param <A> the (mutable) intermediate state type
>>   *  @param <R> the output element type
>>   */
>> interface Gatherer<T,A,R> {
>>
>>      interface Sink<R> {
>>          boolean flush(R element);
>>      }
>>
>>      interface Integrator<A,T,R> {
>>          boolean integrate(A state, T element, Sink<? super R> downstream);
>>      }
>>
>>      enum Characteristics {
>>          GREEDY,          // Never short-circuits
>>          SIZE_PRESERVING, // Emits exactly once per input element
>>          STATELESS;       // No need to initialize or combine state
>>      }
>>
>>      Supplier<A> supplier();
>>      Integrator<A, T, R> integrator();
>>      BinaryOperator<A> combiner();
>>      BiConsumer<A, Sink<? super R>> finisher();
>>      Set<Characteristics> characteristics();
>>
>>      default <AA, RR> Gatherer<T,?,RR> andThen(Gatherer<R,AA,RR> that) {
>>          // Gatherers is analogous to Collectors
>>          return Gatherers.Composite.of(this, that);
>>      }
>>
>>      default <RR> Collector<T,?,RR> collect(Collector<R, ?, RR> collector) {
>>          // Gatherers is analogous to Collectors
>>          return Gatherers.GathererCollector.of(this, collector);
>>      }
>> }
>> ```
>>
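To make the roles of `supplier`, `integrator` and `finisher` concrete, here is a minimal sketch of how a sequential evaluation might drive them. It is not the proposed implementation: `SequentialGatherSketch`, `Mini` and `run` are hypothetical names, the combiner and characteristics are left out, and invoking the finisher even after the integrator has returned `false` is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical sequential-only sketch; none of these types are JDK types. */
final class SequentialGatherSketch {

    interface Sink<R> { boolean flush(R element); }

    interface Integrator<A, T, R> {
        boolean integrate(A state, T element, Sink<? super R> downstream);
    }

    /** Pared-down stand-in for the proposed Gatherer (no combiner, no characteristics). */
    static final class Mini<T, A, R> {
        final Supplier<A> supplier;
        final Integrator<A, T, R> integrator;
        final BiConsumer<A, Sink<? super R>> finisher;

        Mini(Supplier<A> supplier, Integrator<A, T, R> integrator,
             BiConsumer<A, Sink<? super R>> finisher) {
            this.supplier = supplier;
            this.integrator = integrator;
            this.finisher = finisher;
        }
    }

    /** Assumed order of calls: supplier once, integrator per element, finisher once. */
    static <T, A, R> List<R> run(List<T> source, Mini<T, A, R> gatherer) {
        List<R> out = new ArrayList<>();
        Sink<R> sink = element -> { out.add(element); return true; };
        A state = gatherer.supplier.get();           // beginning-of-stream hook
        for (T element : source) {
            if (!gatherer.integrator.integrate(state, element, sink)) {
                break;                               // integrator returned false: stop consuming
            }
        }
        gatherer.finisher.accept(state, sink);       // end-of-stream hook
        return out;
    }

    /** limit(n) in the style of the proposal: short-circuits via the integrator's result. */
    static <M> Mini<M, long[], M> limit(long limit) {
        return new Mini<M, long[], M>(
            () -> new long[1],
            (seen, element, downstream) ->
                seen[0] < limit && downstream.flush(element) && ++seen[0] < limit,
            (seen, downstream) -> { });
    }

    /** Sums all input and emits a single element from the finisher (N:1). */
    static Mini<Integer, int[], Integer> sum() {
        return new Mini<Integer, int[], Integer>(
            () -> new int[1],
            (total, element, downstream) -> { total[0] += element; return true; },
            (total, downstream) -> downstream.flush(total[0]));
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4), SequentialGatherSketch.<Integer>limit(2)));
        System.out.println(run(List.of(1, 2, 3, 4), sum()));
    }
}
```

Under these assumptions `main` prints `[1, 2]` and then `[10]`: the first gatherer ends the traversal early through the integrator's return value, while the second emits nothing until its finisher runs.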
>> ## Appetizers
>>
>> The following examples showcase how `Gatherer` can be used to implement a diverse set
>> of pre-existing, and future, intermediate operations for `Stream`s.
>>
>> ### How to implement `map(mapper)`
>>
>> ```java
>> public static <T,R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
>>      return Gatherer.of(
>>          () -> (Void)null,
>>          (nothing, element, downstream) ->
>>              downstream.flush(mapper.apply(element)),
>>          (l,r) -> l,
>>          (nothing, downstream) -> {}
>>      );
>> }
>> ```
>>
>> ```java
>> jshell> Stream.of(1,2,3,4).gather(map(i -> i + 1)).toList()
>> $1 ==> [2, 3, 4, 5]
>> ```
>>
>> ### How to implement `flatMap(mapper)`
>>
>> ```java
>> public static <T,R> Gatherer<T, ?, R> flatMap(Function<? super T, ? extends Stream<R>> mapper) {
>>      return Gatherer.of(
>>          () -> (Void)null,
>>          (nothing, element, downstream) -> {
>>              try(Stream<? extends R> s = mapper.apply(element)) {
>>                  return s == null || s.sequential().allMatch(downstream::flush);
>>              }
>>          },
>>          (l,r) -> l,
>>          (nothing, downstream) -> {}
>>      );
>> }
>> ```
>>
>> ```java
>> jshell> Stream.of(1,2,3,4).gather(flatMap(i -> Stream.of(i + 1))).toList()
>> $1 ==> [2, 3, 4, 5]
>> ```
>>
>> ### How to implement `takeWhile(predicate)`
>>
>> ```java
>> public static <T> Gatherer<T, ?, T> takeWhile(Predicate<? super T> predicate) {
>>      return Gatherer.of(
>>          () -> (Void)null,
>>          (nothing, element, downstream) ->
>>              predicate.test(element) && downstream.flush(element),
>>          (l, r) -> l,
>>          (nothing, downstream) -> {}
>>      );
>> }
>> ```
>>
>> ```java
>> jshell> Stream.of(1,2,3,4).gather(takeWhile(i -> i < 3)).toList()
>> $1 ==> [1, 2]
>> ```
>>
>> ### How to implement `limit(N)`
>>
>> ```java
>> public static <M> Gatherer<M, ?, M> limit(long limit) {
>>      if (limit < 0)
>>          throw new IllegalArgumentException("'limit' has to be non-negative");
>>
>>      class At { long n = 0; }
>>      return Gatherer.of(
>>              At::new,
>>              (at, element, downstream) ->
>>                  at.n < limit && downstream.flush(element) && ++at.n < limit,
>>              Gatherer.unsupportedCombiner(),
>>              (at, downstream) -> {}
>>      );
>> }
>> ```
>>
>> ```java
>> jshell> Stream.of(1,2,3,4).gather(limit(2)).toList()
>> $1 ==> [1, 2]
>> ```
>>
>> ### How to implement prefix `scanLeft`
>>
>> ```java
>> public static <T,R> Gatherer<T,?,R> scanLeft(R initial, BiFunction<R,T,R> scanner) {
>>      class State { R current = initial; }
>>      return Gatherer.of(
>>              State::new,
>>              (state, element, downstream) ->
>>                  downstream.flush(state.current = scanner.apply(state.current, element)),
>>              Gatherer.unsupportedCombiner(),
>>              (state, downstream) -> {}
>>      );
>> }
>> ```
>>
>> ```java
>> jshell> Stream.of(1,2,3,4).gather(scanLeft("'", (acc, elem) -> acc + elem + "'")).toList()
>> $1 ==> ['1', '1'2', '1'2'3', '1'2'3'4']
>> ```
>>
>> ### How to implement `fold(initial, folder)`
>>
>> ```java
>> public static <T> Gatherer<T,?,T> fold(T initial, BinaryOperator<T> folder) {
>>      class State { T current = initial; }
>>      return Gatherer.of(
>>          State::new,
>>          (state, element, downstream) -> {
>>              state.current = folder.apply(state.current, element);
>>              return true;
>>          },
>>          (l, r) -> {
>>              l.current = folder.apply(l.current, r.current);
>>              return l;
>>          },
>>          (state, downstream) -> downstream.flush(state.current)
>>      );
>> }
>> ```
>>
>> ```java
>> jshell> Stream.of(1,2,3,4).gather(fold(0, (acc, elem) -> acc + elem)).toList()
>> $1 ==> [10]
>> ```
>>
>> ### How to implement `foldLeft(initial, folder)`
>>
>> ```java
>> public static <T,R> Gatherer<T,?,R> foldLeft(R initial, BiFunction<R,T,R> folder) {
>>      class State { R current = initial; }
>>      return Gatherer.of(
>>          State::new,
>>          (state, element, downstream) -> {
>>              state.current = folder.apply(state.current, element);
>>              return true;
>>          },
>>          Gatherer.unsupportedCombiner(),
>>          (state, downstream) -> downstream.flush(state.current)
>>      );
>> }
>> ```
>>
>> ```java
>> jshell> Stream.of(1,2,3,4).gather(foldLeft(0L, (acc, elem) -> acc + elem)).toList()
>> $1 ==> [10]
>> ```
>>
>> ### How to implement `deduplicateAdjacent()`
>>
>> ```java
>> public static <T> Gatherer<T,?,T> deduplicateAdjacent() {
>>      class State { T prev; boolean hasPrev; }
>>      return Gatherer.of(
>>          State::new,
>>          (state, element, downstream) -> {
>>              if (!state.hasPrev) {
>>                  state.hasPrev = true;
>>                  state.prev = element;
>>                  return downstream.flush(element);
>>              } else if (!Objects.equals(state.prev, element)) {
>>                  state.prev = element;
>>                  return downstream.flush(element);
>>              } else {
>>                  return true; // skip duplicate
>>              }
>>          },
>>          Gatherer.unsupportedCombiner(),
>>          (state, downstream) -> {}
>>      );
>> }
>> ```
>>
>> ```java
>> jshell> Stream.of(1,2,2,3,2,4).gather(deduplicateAdjacent()).toList()
>> $1 ==> [1, 2, 3, 2, 4]
>> ```
>>