<html><body><div style="font-family: arial, helvetica, sans-serif; font-size: 12pt; color: #000000"><div><br></div><div><br></div><hr id="zwchr" data-marker="__DIVIDER__"><div data-marker="__HEADERS__"><blockquote style="border-left:2px solid #1010FF;margin-left:5px;padding-left:5px;color:#000;font-weight:normal;font-style:normal;text-decoration:none;font-family:Helvetica,Arial,sans-serif;font-size:12pt;"><b>From: </b>"Viktor Klang" <viktor.klang@oracle.com><br><b>To: </b>"core-libs-dev" <core-libs-dev@openjdk.org><br><b>Sent: </b>Tuesday, June 27, 2023 7:10:42 PM<br><b>Subject: </b>java.util.stream.Stream: API for user-extensible intermediate operations<br></blockquote></div><div><style style="display:none;"> P {margin-top:0;margin-bottom:0;} </style></div><div data-marker="__QUOTED_TEXT__"><blockquote style="border-left: 2px solid #1010FF; margin-left: 5px; padding-left: 5px; color: #000; font-weight: normal; font-style: normal; text-decoration: none; font-family: Helvetica,Arial,sans-serif; font-size: 12pt;" data-mce-style="border-left: 2px solid #1010FF; margin-left: 5px; padding-left: 5px; color: #000; font-weight: normal; font-style: normal; text-decoration: none; font-family: Helvetica,Arial,sans-serif; font-size: 12pt;"><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;">Hi core-libs-dev,</div><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;"><br></div><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;">Over the past 6+ months I've been thinking about, and tinkering with, how we'd be able to expose a user-facing API for extensible intermediate java.util.stream.Stream operations—a feature envisioned all the way back when Streams were created.<br></div><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;"><br></div><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;">I'm now at a point where I have a viable design and implementation, and so I'm turning to you for your feedback: on the direction taken; the API concepts; and, in particular, is there anything which I have overlooked/missed?</div></blockquote><div><br></div><div>I think this API is overly generic and hard to reason about it, for users and IDEs. <br></div><div><br></div><div>The main issue is that the same API is used for both stateless and stateful operations, which means that as a user, we have no idea if a call to stream.gather() is stateful or not. Which is a departure from the current API that cleanly separate stateless and staful operations. Here, we are left in the dark. In a sense, this API is too powerful, it can do too much thing, so as a user we can not reason about it.</div><div><br></div><div>I like the idea of a Collector 2.0 i.e. using the Gatherer API at the end of the stream (not in the middle), but currently, the Gatherer API is not a Collector, so we now have two different APIs for doing partially the same job. I wonder if the Collector API can be retroffitted to act as a Gatherer API, avoiding to have to choose which one to use, a gatherer being the equivalent of a "flat-collector" + short-circuit.</div><div><br></div>The idea of unsupportedCombiner() seems out of place, like a patch to be able to clobble different things together. I'm not sure to understand why it's needed for a Gatherer, and why it is not needed for Collectors ?<br><div><br data-mce-bogus="1"></div><div>So I would prefer that API to extends the current Collector API but not the intermediary operations. Yes, it's less powerful.<br></div><div>It means that instead of using one stream with a collect like operation in the middle, users will have to use two streams, one after the other, but it makes the code easier to understand (also having two streams give users better control on which part should be in parallel).</div><div><br></div><div>Rémi<br></div><div><br></div><blockquote style="border-left: 2px solid #1010FF; margin-left: 5px; padding-left: 5px; color: #000; font-weight: normal; font-style: normal; text-decoration: none; font-family: Helvetica,Arial,sans-serif; font-size: 12pt;" data-mce-style="border-left: 2px solid #1010FF; margin-left: 5px; padding-left: 5px; color: #000; font-weight: normal; font-style: normal; text-decoration: none; font-family: Helvetica,Arial,sans-serif; font-size: 12pt;"><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof ContentPasted0" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;"><div class="ContentPasted0 elementToProof"><br></div><div class="ContentPasted0 elementToProof">(If you, like myself, prefer reading pre-rendered markdown, <a href="https://cr.openjdk.org/~vklang/Gatherers.html" title="https://cr.openjdk.org/~vklang/Gatherers.html" id="LPlnk689721" target="_blank" rel="noopener" data-mce-href="https://cr.openjdk.org/~vklang/Gatherers.html"> click here</a>)<br></div></div><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof ContentPasted0" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;"><br></div><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof ContentPasted0" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;"><br></div><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof ContentPasted0" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;"><br></div><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof ContentPasted0" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;"><br></div><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof ContentPasted0 ContentPasted1" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;"># Gathering the streams<div><br class="ContentPasted1"></div><div class="ContentPasted1">## Introduction</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Java 8 introduced the `java.util.stream` API, which represents a lazily</div><div class="ContentPasted1">computed, potentially unbounded sequence of values (Streams was also the first</div><div class="ContentPasted1">designed-for-lambdas API in the JDK). Streams supports the ability to process</div><div class="ContentPasted1">the stream either sequentially or in parallel.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">A `Stream` pipeline consists of a source (collection, array, generator, etc),</div><div class="ContentPasted1">zero or more intermediate operations (`Stream` -> `Stream` transforms), and an</div><div class="ContentPasted1">eager terminal operation which produces a value or a side-effect.  </div><div><br class="ContentPasted1"></div><div class="ContentPasted1">The Streams API come with a reasonably rich, but fixed set of built-in</div><div class="ContentPasted1">operations (mapping, filtering, reduction, sorting, etc), as well as an</div><div class="ContentPasted1">extensible terminal operation (`Stream::collect`) that enables the stream</div><div class="ContentPasted1">contents to be flexibly summarized in a variety of forms.  The resulting API is</div><div class="ContentPasted1">rich enough that users have had good experience with streams, but there are</div><div class="ContentPasted1">repeated requests for "please add operation X to streams".  </div><div><br class="ContentPasted1"></div><div class="ContentPasted1">In this document, we explore a corresponding extensible _intermediate_</div><div class="ContentPasted1">operation, called `Stream::gather`, which is able to address many of the</div><div class="ContentPasted1">requests we've gotten for additional operations.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">## "Missing" intermediate operations</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Over the years, many new operations for streams have been proposed.  Each of</div><div class="ContentPasted1">them may be useful in some situation, but many of them are too narrow to be</div><div class="ContentPasted1">included in the core `Stream` API.  We'd like for people to be able to plug</div><div class="ContentPasted1">these operations in via an extension point, as with `Stream::collect`.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Here are some examples of proposed intermediate operations that are not easily</div><div class="ContentPasted1">expressible as intermediate operations on `Stream` today.  </div><div><br class="ContentPasted1"></div><div class="ContentPasted1">* **Folds**.  Folding is a generalization of reduction.  With reduction, the</div><div class="ContentPasted1">  result type is the same as the element type, the combiner is associative, and</div><div class="ContentPasted1">  the initial value is an identity for the combiner.   For a fold, these</div><div class="ContentPasted1">  conditions are not required, though we give up parallelizability.  </div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  Example: `foldLeft("", (str,elem) -> str + " " + elem.toString())` over the values</div><div class="ContentPasted1">  `[1,2,3]` yields `["1 2 3"]`.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  See https://bugs.openjdk.org/browse/JDK-8133680 & https://bugs.openjdk.org/browse/JDK-8292845</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">* **Unfolds**.  This takes an aggregate and decomposes it into elements.  An</div><div class="ContentPasted1">  `unfold` can reverse the operation of the above fold example using `Scanner`</div><div class="ContentPasted1">  or `String::split`.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  Example: `unfold(e -> new Scanner(e), Scanner::hasNextInt, Scanner::nextInt)`</div><div class="ContentPasted1">  `["1 2 3"]` yields `[1,2,3]`.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  See: https://bugs.openjdk.org/browse/JDK-8151408</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">* **Barriers**.  Usually all operations of a stream pipeline can run</div><div class="ContentPasted1">  simultaneously as data is available.  In some cases, we may wish to have a</div><div class="ContentPasted1">  barrier that requires that the entirety of one operation is completed before</div><div class="ContentPasted1">  providing any elements to the next operation.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  Example: `someStream.a(...).barrier(someEffect)...` would require that all</div><div class="ContentPasted1">  effects from `a` are completed before executing `someEffect` or producing any</div><div class="ContentPasted1">  downstream values.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  See: https://bugs.openjdk.org/browse/JDK-8294246</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">* **Windowing**.  Given a stream, we may wish to construct a stream which</div><div class="ContentPasted1">  groups the original elements, either in overlapping or disjoint groups.  </div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  Example: `fixedWindow(2)` over the values `[1,2,3,4,5]` yields `[[1,2],[3,4],[5]]`.</div><div class="ContentPasted1">  Example: `slidingWindow(2)` over the values `[1,2,3]` yields `[[1,2],[2,3],[3,4]]`.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  See: https://stackoverflow.com/questions/34158634/how-to-transform-a-java-stream-into-a-sliding-window</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">* **Prefix scans**. Prefix scans are a stream of incremental reductions.</div><div class="ContentPasted1">  Perhaps surprisingly, some prefix scans are parallelizable, and we are exploring adding support for that beyond `Arrays.parallelPrefix()`.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  Example: `scan((sum, next) -> sum + next)` over  values `[1,2,3,4]` yields `[1,3,6,10]`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  See: https://stackoverflow.com/questions/55265797/cumulative-sum-using-java-8-stream-api</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">* **Duplicated elements**.  The `distinct` operation will remove duplicates; sometimes we want only the elements that are duplicated.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  Example: `duplicates(Object::equals)` over values `[1,1,2,3,4]` yields `[1]`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  See: https://stackoverflow.com/questions/27677256/java-8-streams-to-find-the-duplicate-elements</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">* **Running duplicate elimination**.  Here, we want to eliminate adjacent repeated elements.  </div><div><br class="ContentPasted1"></div><div class="ContentPasted1">  Example: `collapseRuns(Object::equals)` over values `[1,1,2,3,3]` yields `[1,2,3]`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">These are just a few of the stream operations people have wished for—</div><div class="ContentPasted1">and while many operations are too specialized to include in the core library,</div><div class="ContentPasted1">all are things we'd like for people to be able to express with streams.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">## Extending streams</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">It was always desired for `java.util.stream.Stream` to have an extension point</div><div class="ContentPasted1">for intermediate operations; when the library was first built, this wish-list</div><div class="ContentPasted1">item was recorded: https://bugs.openjdk.org/browse/JDK-8132369.  There were many</div><div class="ContentPasted1">reasons that this was not included in the initial version, the most important</div><div class="ContentPasted1">being that it was not at all obvious what the right API was.  </div><div><br class="ContentPasted1"></div><div class="ContentPasted1">The set of intermediate operations on Streams carries the exact same trade-off</div><div class="ContentPasted1">as [CISC](Complex Instruction Set Computing) vs [RISC](Reduced Instruction Set</div><div class="ContentPasted1">Computing), namely: either you have to provide a rather sizeable set of</div><div class="ContentPasted1">specialized operations, or you provide a small set of general operations.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">For the **CISC**-strategy, the main drawback is that not all operations is</div><div class="ContentPasted1">equally useful, and having a large set of operations to choose from creates a</div><div class="ContentPasted1">barrier to effective usage by making the correct choice harder to identify.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">For the **RISC**-strategy, the main drawback is that it puts more effort on the</div><div class="ContentPasted1">user to encode more specialized operations over a set of generic operations,</div><div class="ContentPasted1">making code harder to read, and harder to maintain, for the user.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">However, one success story was `Collector`; this allowed us to extend _terminal_</div><div class="ContentPasted1">operations with a good balance of expressiveness, reuse, and API footprint.  We</div><div class="ContentPasted1">would like to do the same for intermediate operations -- and we can draw some</div><div class="ContentPasted1">inspiration from `Collector` to get there.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">## Requirements</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Breaking down operations into intermediate and terminal is a useful distinction, however,</div><div class="ContentPasted1">in reality there are many more characteristics we could use to describe operations,</div><div class="ContentPasted1">and relatively few operations use exactly the same subset of characteristics.</div><div class="ContentPasted1">The following is a curated set of characteristics of stream operations;</div><div class="ContentPasted1">if we want to enable generic, user-defineable, intermediate operations on streams,</div><div class="ContentPasted1">we probably have to provide for them all.  </div><div><br class="ContentPasted1"></div><div class="ContentPasted1">| Ordinal | Feature                  | What this means in practice                                 | Example operation(s)                |</div><div class="ContentPasted1">|---------|--------------------------|-------------------------------------------------------------|-------------------------------------|</div><div class="ContentPasted1">|    1    | Intermediate operation   | Add custom new operations to Stream pipelines               | *map*, *filter*                     |</div><div class="ContentPasted1">|    2    | Incremental operation    | Evaluate the Stream pipeline depth-first                    | *map*, *filter*                     |</div><div class="ContentPasted1">|    3    | Stateful operation       | Remember information between processing elements            | *limit*, *foldLeft*                 |</div><div class="ContentPasted1">|    4    | N:M inputs-to-outputs    | Encode a wide array of operations                           | *limit*, *flatMap*                  |</div><div class="ContentPasted1">|    5    | Prefix consumption       | An operation should consume only as much as needed          | *limit*, *takeWhile*                |</div><div class="ContentPasted1">|    6    | Beginning-of-stream hook | Ability to have initialization logic                        | *foldLeft*, *grouped*               |</div><div class="ContentPasted1">|    7    | End-of-stream hook       | Flush elements downstream                                   | *sorted*, *grouped*                 |</div><div class="ContentPasted1">|    8    | Opt-in parallelizability | Speed up CPU-bound processing for parallelizable operations | *reduce*, *map*                     |</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Now when we have a better idea of what we need to achieve, let's have a look at</div><div class="ContentPasted1">what our options are.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">## Comparing existing operations</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Let's look at the operations we already have, to see if they would fit the bill</div><div class="ContentPasted1">as a generic extension point for intermediate `Stream`-operations.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">These are, in order, `Stream.map()`, `Stream.flatMap(…)` (`Stream.mapMulti(…)`</div><div class="ContentPasted1">is analogous), and `Collector` & `Stream.collect(…)`—and in the table below we</div><div class="ContentPasted1">see how they line up against our requirements.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">| Ordinal | Feature                  | Using Stream.map(…)       | Using Stream.flatMap(…)   | Using Collector |</div><div class="ContentPasted1">|---------|--------------------------|---------------------------|---------------------------|-----------------|</div><div class="ContentPasted1">|    1    | Intermediate operation   | Yes                       | Yes                       | No, terminal    |</div><div class="ContentPasted1">|    2    | Incremental operation    | Yes                       | Yes                       | No, terminal    |</div><div class="ContentPasted1">|    3    | Stateful operation       | Yes, via class or capture | Yes, via class or capture | Yes             |</div><div class="ContentPasted1">|    4    | N:M inputs-to-outputs    | No, 1:1                   | No, 1:M                   | No, N:1         |</div><div class="ContentPasted1">|    5    | Prefix consumption       | No                        | No                        | No              |</div><div class="ContentPasted1">|    6    | Beginning-of-stream hook | No                        | No                        | Yes             |</div><div class="ContentPasted1">|    7    | End-of-stream hook       | No                        | No                        | Yes             |</div><div class="ContentPasted1">|    8    | Opt-in parallelizability | No, always on             | No, always on             | No, always on   |</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Looking at the options above, none of them are going to, as-is, fit the bill as</div><div class="ContentPasted1">an extension point for generic intermediate `Stream` operations, but one of them</div><div class="ContentPasted1">has more promise than the others: `Collector`.  Obvious needed upgrades are</div><div class="ContentPasted1">supporting short-circuiting and incremental generation of output; let's see what</div><div class="ContentPasted1">that looks like in practice.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">## Collector 2.0</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Let's start with a clone of `Collector<T,A,R>` called `Gatherer<T,A,R>`:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">/** @param <T> the element type</div><div class="ContentPasted1"> *  @param <A> the (mutable) intermediate accumulation type</div><div class="ContentPasted1"> *  @param <R> the (probably immutable) final accumulation type</div><div class="ContentPasted1"> */</div><div class="ContentPasted1">interface Gatherer<T,A,R> {</div><div class="ContentPasted1">    Supplier<A> supplier();</div><div class="ContentPasted1">    BiConsumer<A, T> accumulator();</div><div class="ContentPasted1">    BinaryOperator<A> combiner();</div><div class="ContentPasted1">    Function<A, R> finisher();</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Now, we need to make it output 0...M elements instead of strictly 1.  In order</div><div class="ContentPasted1">to do that, we need a way to programmatically output elements from the</div><div class="ContentPasted1">`accumulator` to make it incremental (and has the added benefit that it would be</div><div class="ContentPasted1">able to run depth-first).  Adding an additional argument, a downstream handle,</div><div class="ContentPasted1">to `accumulator` and `finisher` allows us to push elements downstream both for</div><div class="ContentPasted1">each input, and at the end of stream. As there exists no suitable pre-existing</div><div class="ContentPasted1">interface, we'll define a new interface named `Integrator` and rename `accumulator`</div><div class="ContentPasted1">to the more fitting `integrator`.</div><div class="ContentPasted1">Our `finisher` will also not have a useful return value, so we'll make it `void`:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">/** @param <T> the element type</div><div class="ContentPasted1"> *  @param <A> the (mutable) intermediate accumulation type</div><div class="ContentPasted1"> *  @param <R> the (probably immutable) final accumulation type</div><div class="ContentPasted1"> */</div><div class="ContentPasted1">interface Gatherer<T,A,R> {</div><div class="ContentPasted1">    interface Integrator<A,T,R> {</div><div class="ContentPasted1">        void integrate(A state, T element, Consumer<? super R> downstream);</div><div class="ContentPasted1">    }</div><div class="ContentPasted1">    Supplier<A> supplier();</div><div class="ContentPasted1">    Integrator<A, T, R> integrator();</div><div class="ContentPasted1">    BinaryOperator<A> combiner();</div><div class="ContentPasted1">    BiConsumer<A, Consumer<? super> R> finisher();</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Next, we need to tackle short-circuiting and add an upstream signal from each</div><div class="ContentPasted1">invocation of the `integrator` so we can detect when the integration does not</div><div class="ContentPasted1">want any more elements. We do this by returning a boolean from `integrate`:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">/** @param <T> the element type</div><div class="ContentPasted1"> *  @param <A> the (mutable) intermediate accumulation type</div><div class="ContentPasted1"> *  @param <R> the (probably immutable) final accumulation type</div><div class="ContentPasted1"> */</div><div class="ContentPasted1">interface Gatherer<T,A,R> {</div><div class="ContentPasted1">    interface Integrator<A,T,R> {</div><div class="ContentPasted1">        boolean integrate(A state, T element, Consumer<? super R> downstream);</div><div class="ContentPasted1">    }</div><div class="ContentPasted1">    Supplier<A> supplier();</div><div class="ContentPasted1">    Integrator<A, T, R> integrator();</div><div class="ContentPasted1">    BinaryOperator<A> combiner();</div><div class="ContentPasted1">    BiConsumer<A, Consumer<? super> R> finisher();</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Transitively supporting short-circuiting, i.e. that a downstream handle does</div><div class="ContentPasted1">not want to receive any more elements, requires us to make the same change as</div><div class="ContentPasted1">we did for `integrate`—return a `boolean` instead of `void`, and since the</div><div class="ContentPasted1">only built-in candidate would be `Predicate`, which is not supposed to have</div><div class="ContentPasted1">side-effects, let's introduce our own type `Sink`:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">/** @param <T> the element type</div><div class="ContentPasted1"> *  @param <A> the (mutable) intermediate accumulation type</div><div class="ContentPasted1"> *  @param <R> the (probably immutable) final accumulation type</div><div class="ContentPasted1"> */</div><div class="ContentPasted1">interface Gatherer<T,A,R> {</div><div class="ContentPasted1">    interface Sink<R> {</div><div class="ContentPasted1">        boolean flush(R element);</div><div class="ContentPasted1">    }</div><div class="ContentPasted1">    interface Integrator<A,T,R> {</div><div class="ContentPasted1">        boolean integrate(A state, T element, Sink<? super R> downstream);</div><div class="ContentPasted1">    }</div><div class="ContentPasted1">    Supplier<A> supplier();</div><div class="ContentPasted1">    Integrator<A, T, R> integrator();</div><div class="ContentPasted1">    BinaryOperator<A> combiner();</div><div class="ContentPasted1">    BiConsumer<A, Sink<? super R>> finisher();</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Finally, we need to address parallelism.  To opt-in to parallelism we can make</div><div class="ContentPasted1">the combiner optional -- we call it only for parallel executions, and if it is</div><div class="ContentPasted1">not present, execution is constrained to sequential.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">To demonstrate how all of the above fits together, we implement `fixedWindow(N)`</div><div class="ContentPasted1">where `fixedWindow(2)` over the values `[1,2,3,4,5]` yields `[[1,2],[3,4],[5]]`.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">    /**</div><div class="ContentPasted1">     * Gathers elements into fixed-size windows. The last window can contain fewer elements.</div><div class="ContentPasted1">     * @param windowSize the size of the windows</div><div class="ContentPasted1">     * @return a new gatherer which gathers elements into fixed-size windows</div><div class="ContentPasted1">     * @param <T> the type of elements this Gatherer gathers</div><div class="ContentPasted1">     */</div><div class="ContentPasted1">    public static <T> Gatherer<T, ?, List<T>> fixedWindow(int windowSize) {</div><div class="ContentPasted1">        if (windowSize < 1)</div><div class="ContentPasted1">            throw new IllegalArgumentException("'windowSize' must be greater than zero");</div><div class="ContentPasted1">        else {</div><div class="ContentPasted1">            Supplier<ArrayList<T>> supplier =</div><div class="ContentPasted1">                () ->  new ArrayList<T>(windowSize);</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">            Integrator<ArrayList<T>, T, List<T>> integrator =</div><div class="ContentPasted1">                (openWindow, element, downstream) -> {</div><div class="ContentPasted1">                    if (openWindow.add(element) && openWindow.size() >= windowSize) {</div><div class="ContentPasted1">                        var closedWindow = List.copyOf(openWindow);</div><div class="ContentPasted1">                        openWindow.clear();</div><div class="ContentPasted1">                        return downstream.flush(closedWindow);</div><div class="ContentPasted1">                    } else</div><div class="ContentPasted1">                        return true;</div><div class="ContentPasted1">                };</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">            // This combiner signals that we opt-out of parallelisation</div><div class="ContentPasted1">            BinaryOperator<ArrayList<T>> combiner =</div><div class="ContentPasted1">                Gatherer.unsupportedCombiner();</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">            BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher =</div><div class="ContentPasted1">                (openWindow, downstream) -> {</div><div class="ContentPasted1">                    if(!openWindow.isEmpty()) {</div><div class="ContentPasted1">                        downstream.flush(List.copyOf(openWindow));</div><div class="ContentPasted1">                        openWindow.clear();</div><div class="ContentPasted1">                    }</div><div class="ContentPasted1">                };</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">            return Gatherer.of(supplier, integrator, combiner, finisher);</div><div class="ContentPasted1">        }</div><div class="ContentPasted1">    }</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Or by implementing the `Gatherer` interface directly like so:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">    /**</div><div class="ContentPasted1">     * Gathers elements into fixed-size windows. The last window can contain fewer elements.</div><div class="ContentPasted1">     * @param windowSize the size of the windows</div><div class="ContentPasted1">     * @return a new gatherer which gathers elements into fixed-size windows</div><div class="ContentPasted1">     * @param <T> the type of elements this Gatherer gathers</div><div class="ContentPasted1">     */</div><div class="ContentPasted1">    public static <T> Gatherer<T, ?, List<T>> fixedWindow(int windowSize) {</div><div class="ContentPasted1">        if (windowSize < 1)</div><div class="ContentPasted1">            throw new IllegalArgumentException("'windowSize' must be greater than zero");</div><div class="ContentPasted1">       </div><div class="ContentPasted1">        class FixedWindowGatherer implements Gatherer<T,ArrayList<T>,List<T>> {</div><div class="ContentPasted1">            @Override</div><div class="ContentPasted1">            public Supplier<ArrayList<T>> initializer() {</div><div class="ContentPasted1">                return () -> new ArrayList<>(windowSize);</div><div class="ContentPasted1">            }</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">            @Override</div><div class="ContentPasted1">            public Integrator<ArrayList<T>, T, List<T>> integrator() {</div><div class="ContentPasted1">                return (openWindow, element, downstream) -> {</div><div class="ContentPasted1">                    if (openWindow.add(element) && openWindow.size() >= windowSize) {</div><div class="ContentPasted1">                        var closedWindow = List.copyOf(openWindow);</div><div class="ContentPasted1">                        openWindow.clear();</div><div class="ContentPasted1">                        return downstream.flush(closedWindow);</div><div class="ContentPasted1">                    } else</div><div class="ContentPasted1">                        return true;</div><div class="ContentPasted1">                };</div><div class="ContentPasted1">            }</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">            @Override</div><div class="ContentPasted1">            public BinaryOperator<ArrayList<T>> combiner() {</div><div class="ContentPasted1">                return Gatherer.unsupportedCombiner();</div><div class="ContentPasted1">            }</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">            @Override</div><div class="ContentPasted1">            public BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher() {</div><div class="ContentPasted1">                return (openWindow, downstream) -> {</div><div class="ContentPasted1">                    if(!openWindow.isEmpty()) {</div><div class="ContentPasted1">                        downstream.flush(List.copyOf(openWindow));</div><div class="ContentPasted1">                        openWindow.clear();</div><div class="ContentPasted1">                    }</div><div class="ContentPasted1">                };</div><div class="ContentPasted1">            }</div><div class="ContentPasted1">        }</div><div class="ContentPasted1">        return new FixedWindowGatherer();</div><div class="ContentPasted1">    }</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Compared to Collector, it would look like the following:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">| Ordinal | Feature                  | `Collector<T,A,R>`                | `Gatherer<T,A,R>`                     |</div><div class="ContentPasted1">|---------|--------------------------|-----------------------------------|---------------------------------------|</div><div class="ContentPasted1">|    1    | Beginning-of-stream hook | `A supply()`                      | `A supply()`                          |</div><div class="ContentPasted1">|    2    | Per element              | `void accept(A, T)`               | `boolean integrate(A, T, Sink<R>)`    |</div><div class="ContentPasted1">|    3    | End-of-stream hook       | `R apply(A)`                      | `void accept(A, Sink<R>)`             |</div><div class="ContentPasted1">|    4    | Parallelizability        | `A apply(A, A)`                   | `A apply(A, A)`                       |</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">To integrate it with `java.util.stream.Stream` we add the following method to</div><div class="ContentPasted1">it: `Stream<R> gather(Gatherer<T,?,R> gatherer)`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Now we can use our `fixedWindow` Gatherer:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">jshell> Stream.iterate(0, i -> i + 1).limit(10).gather(fixedWindow(2)).toList();</div><div class="ContentPasted1">$1 ==> [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Gatherer now provides all features:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">| Ordinal | Feature                  | Using Gatherer          | Explanation                                                                            |</div><div class="ContentPasted1">|---------|--------------------------|-------------------------|----------------------------------------------------------------------------------------|</div><div class="ContentPasted1">|    1    | Intermediate operation   | Yes                     | A Gatherer's output is the following operation's input                                 |</div><div class="ContentPasted1">|    2    | Incremental operation    | Yes                     | A Gatherer can produce elements in response to consuming elements                      |</div><div class="ContentPasted1">|    3    | Stateful operation       | Yes                     | A Gatherer supplies its own state, which can be Void/null if stateless                 |</div><div class="ContentPasted1">|    4    | N:M inputs-to-outputs    | Yes                     | A Gatherer can consume and produce any number of elements                              |</div><div class="ContentPasted1">|    5    | Prefix consumption       | Yes                     | A Gatherer can signal that it is done by returning `false` from its `integrate`-method |</div><div class="ContentPasted1">|    6    | Beginning-of-stream hook | Yes                     | A Gatherer can run logic when creating its initial state                               |</div><div class="ContentPasted1">|    7    | End-of-stream hook       | Yes                     | A Gatherer's `finisher` is invoked at the end of input                                 |</div><div class="ContentPasted1">|    8    | Opt-in parallelizability | Yes                     | A Gatherer's `combiner` is optional                                                    |</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">## Summary</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">None of the current candidates for a user-extensible API for generic</div><div class="ContentPasted1">intermediate stream operations fit the set of requirements, but `Collector`</div><div class="ContentPasted1">can form an excellent basis for a new construct, called `Gatherer`,</div><div class="ContentPasted1">which after a limited set of modifications checks all the boxes:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">| Ordinal | Feature                  | Using Stream.map(…)       | Using Stream.flatMap(…)   | Using Collector | Using Gatherer |</div><div class="ContentPasted1">|---------|--------------------------|---------------------------|---------------------------|-----------------|----------------|</div><div class="ContentPasted1">|    1    | Intermediate operation   | Yes                       | Yes                       | No, terminal    | Yes            |</div><div class="ContentPasted1">|    2    | Incremental operation    | Yes                       | Yes                       | No, terminal    | Yes            |</div><div class="ContentPasted1">|    3    | Stateful operation       | Yes, via class or capture | Yes, via class or capture | Yes             | Yes            |</div><div class="ContentPasted1">|    4    | N:M inputs-to-outputs    | No, 1:1                   | No, 1:M                   | No, N:1         | Yes            |</div><div class="ContentPasted1">|    5    | Prefix consumption       | No                        | No                        | No              | Yes            |</div><div class="ContentPasted1">|    6    | Beginning-of-stream hook | No                        | No                        | Yes             | Yes            |</div><div class="ContentPasted1">|    7    | End-of-stream hook       | No                        | No                        | Yes             | Yes            |</div><div class="ContentPasted1">|    8    | Opt-in parallelizability | No, always on             | No, always on             | No, always on   | Yes            |</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">As it turned out, we were a short hop away from a generic, stand-alone,</div><div class="ContentPasted1">reusable, parallelizable, API for intermediate operations all along!</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">## But wait … there's more</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">While the aforementioned solution contains the bare-minimum to meet our requirements,</div><div class="ContentPasted1">the following will provide some significant quality-of-life improvements.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### Compositionality</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">The astute reader may have noticed that by adding the `gather(Gatherer)`-method on `Stream`,</div><div class="ContentPasted1">one can in-effect **compose** `Gatherer`s together:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">Gatherer<Integer,?,String> someGatherer = …;</div><div class="ContentPasted1">Gatherer<String,?,Long> someOtherGatherer = …;</div><div class="ContentPasted1">Stream<Long> stream = Stream.of(1).gather(someGatherer).gather(someOtherGatherer);</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">As it turns out, it is possible to define a `default Gatherer andThen(Gatherer)`-method on `Gatherer` which composes `Gatherer`s:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">Gatherer<Integer,?,String> someGatherer = …;</div><div class="ContentPasted1">Gatherer<String,?,Long> someOtherGatherer = …;</div><div class="ContentPasted1">Gatherer<Integer,?,Long> gatherer = someGatherer.andThen(someOtherGatherer);</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">This makes it possible to both de-couple operations from their use-sites,</div><div class="ContentPasted1">as well as create more sophisticated `Gatherer`s from simple ones.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">But that's not all—by introducing a `default Collector collect(Collector)`-method</div><div class="ContentPasted1">it is also possible to compose a `Gatherer` with a `Collector`,</div><div class="ContentPasted1">and the result is another `Collector`:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">Gatherer<Integer,?,String> someGatherer = …;</div><div class="ContentPasted1">Gatherer<String,?,Long> someOtherGatherer = …;</div><div class="ContentPasted1">Collector<Long,?,List<Long>> someGatherer.andThen(someOtherGatherer).collect(Collectors.toList());</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">All of this together, it is now possible to build `Stream`-processing:</div><div class="ContentPasted1">"source-to-destination" using `Stream.gather(…).gather(…).gather(…)`;</div><div class="ContentPasted1">"intermediaries-first" using `gatherer.andThen(…).andThen(…);` and,</div><div class="ContentPasted1">"destination-to-source" using `gatherer.collect(otherGatherer.collect(…))`.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### Familiarity</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Since `Gatherer` is an evolution of `Collector` there's a clear on-ramp</div><div class="ContentPasted1">for any and all Java developers who are already familiar with `Collector`.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### Reusability</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">`Gatherer`s, by virtue of being stand-alone, reusable, intermediaries,</div><div class="ContentPasted1">paired with compositionality and opt-in parallelization, allows for transformations</div><div class="ContentPasted1">to be created once and maintained in a single place, no matter if it is going to be</div><div class="ContentPasted1">used as a `Collector`, a `Gatherer`, or a `Stream`.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### Optimizability</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">`Collector` has the notion of `Characteristics`, a `Set` of flags which describes them.</div><div class="ContentPasted1">This information can then be used by the underlying machinery which uses them</div><div class="ContentPasted1">to optimize their evaluation. The same approach can be used for `Gatherer`s.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### Summary 2.0</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">Taking all of the above into consideration, we end up with a definition like the following:</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">/** @param <T> the element type</div><div class="ContentPasted1"> *  @param <A> the (mutable) intermediate accumulation type</div><div class="ContentPasted1"> *  @param <R> the (probably immutable) final accumulation type</div><div class="ContentPasted1"> */</div><div class="ContentPasted1">interface Gatherer<T,A,R> {</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">    interface Sink<R> {</div><div class="ContentPasted1">        boolean flush(R element);</div><div class="ContentPasted1">    }</div><div class="ContentPasted1">   </div><div class="ContentPasted1">    interface Integrator<A,T,R> {</div><div class="ContentPasted1">        boolean integrate(A state, T element, Sink<? super R> downstream);</div><div class="ContentPasted1">    }</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">    enum Characteristics {</div><div class="ContentPasted1">        GREEDY,          // Never short-circuits</div><div class="ContentPasted1">        SIZE_PRESERVING, // Emits exactly once per input element</div><div class="ContentPasted1">        STATELESS;       // No need to initialize or combine state</div><div class="ContentPasted1">    }</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">    Supplier<A> supplier();</div><div class="ContentPasted1">    Integrator<A, T, R> integrator();</div><div class="ContentPasted1">    BinaryOperator<A> combiner();</div><div class="ContentPasted1">    BiConsumer<A, Sink<? super R>> finisher();</div><div class="ContentPasted1">    Set<Characteristics> characteristics();</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">    default <AA, RR> Gatherer<T,?,RR> andThen(Gatherer<R,AA,RR> that) {</div><div class="ContentPasted1">        // Gatherers is analoguous to Collectors</div><div class="ContentPasted1">        return Gatherers.Composite.of(this, that);</div><div class="ContentPasted1">    }</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">    default <RR> Collector<T,?,RR> collect(Collector<R, ?, RR> collector) {</div><div class="ContentPasted1">        // Gatherers is analoguous to Collectors</div><div class="ContentPasted1">        return Gatherers.GathererCollector.of(this, collector);</div><div class="ContentPasted1">    }</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">## Appetizers</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">The following examples showcase how `Gatherer` can be used to implement a diverse set</div><div class="ContentPasted1">of pre-existing, and future, intermediate operations for `Stream`s.</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### How to implement `map(mapper)`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">public final static <T,R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {</div><div class="ContentPasted1">    return Gatherer.of(</div><div class="ContentPasted1">        () -> (Void)null,</div><div class="ContentPasted1">        (nothing, element, downstream) -></div><div class="ContentPasted1">            downstream.flush(mapper.apply(element)),</div><div class="ContentPasted1">        (l,r) -> l,</div><div class="ContentPasted1">        (nothing, downstream) -> {}</div><div class="ContentPasted1">    );</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(map(i -> i + 1)).toList()</div><div class="ContentPasted1">$1 ==> [2, 3, 4, 5]</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### How to implement `flatMap(mapper)`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">public final static <T,R> Gatherer<T, ?, R> flatMap(Function<? super T, ? extends Stream<R>> mapper) {</div><div class="ContentPasted1">    return Gatherer.of(</div><div class="ContentPasted1">        () -> (Void)null,</div><div class="ContentPasted1">        (nothing, element, downstream) -> {</div><div class="ContentPasted1">            try(Stream<? extends R> s = mapper.apply(element)) {</div><div class="ContentPasted1">                return s == null || s.sequential().allMatch(downstream::flush);</div><div class="ContentPasted1">            }</div><div class="ContentPasted1">        },</div><div class="ContentPasted1">        (l,r) -> l,</div><div class="ContentPasted1">        (nothing, downstream) -> {}</div><div class="ContentPasted1">    );</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(flatMap(i -> Stream.of(i + 1))).toList()</div><div class="ContentPasted1">$1 ==> [2, 3, 4, 5]</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### How to implement `takeWhile(predicate)`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">public final static <T> Gatherer<T, ?, T> takeWhile(Predicate<? super T> predicate) {</div><div class="ContentPasted1">    return Gatherer.of(</div><div class="ContentPasted1">        () -> (Void)null,</div><div class="ContentPasted1">        (nothing, element, downstream) -></div><div class="ContentPasted1">            predicate.test(element) && downstream.flush(element),</div><div class="ContentPasted1">        (l, r) -> l,</div><div class="ContentPasted1">        (nothing, downstream) -> {}</div><div class="ContentPasted1">    );</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(takeWhile(i -> i < 3)).toList()</div><div class="ContentPasted1">$1 ==> [1, 2]</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### How to implement `limit(N)`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">public static <M> Gatherer<M, ?, M> limit(long limit) {</div><div class="ContentPasted1">    if (limit < 0)</div><div class="ContentPasted1">        throw new IllegalArgumentException("'limit' has to be non-negative");</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">    class At { long n = 0; }</div><div class="ContentPasted1">    return Gatherer.of(</div><div class="ContentPasted1">            At::new,</div><div class="ContentPasted1">            (at, element, downstream) -></div><div class="ContentPasted1">                at.n < limit && downstream.flush(element) && ++at.n < limit,</div><div class="ContentPasted1">            Gatherer.unsupportedCombiner(),</div><div class="ContentPasted1">            (at, downstream) -> {}</div><div class="ContentPasted1">    );</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(limit(2)).toList()</div><div class="ContentPasted1">$1 ==> [1, 2]</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### How to implement prefix `scanLeft`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">public static <T,R> Gatherer<T,?,R> scanLeft(R initial, BiFunction<R,T,R> scanner) {</div><div class="ContentPasted1">    class State { R current = initial; }</div><div class="ContentPasted1">    return Gatherer.of(</div><div class="ContentPasted1">            State::new,</div><div class="ContentPasted1">            (state, element, downstream) -></div><div class="ContentPasted1">                downstream.flush(state.current = scanner.apply(state.current, element)),</div><div class="ContentPasted1">            Gatherer.unsupportedCombiner(),</div><div class="ContentPasted1">            (state, downstream) -> {}</div><div class="ContentPasted1">    );</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(scanLeft("'", (acc, elem) -> acc + elem + "'")).toList()</div><div class="ContentPasted1">$1 ==> ['1', '1'2', '1'2'3', '1'2'3'4']</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### How to implement `fold(initial, folder)`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">public static <T> Gatherer<T,?,T> fold(T initial, BinaryOperator<T> folder) {</div><div class="ContentPasted1">    class State { T current = initial; }</div><div class="ContentPasted1">    return Gatherer.of(</div><div class="ContentPasted1">        State::new,</div><div class="ContentPasted1">        (state, element, downstream) -> {</div><div class="ContentPasted1">            state.current = folder.apply(state.current, element);</div><div class="ContentPasted1">            return true;</div><div class="ContentPasted1">        },</div><div class="ContentPasted1">        (l, r) -> {</div><div class="ContentPasted1">            l.current = folder.apply(l.current, r.current);</div><div class="ContentPasted1">            return l;</div><div class="ContentPasted1">        },</div><div class="ContentPasted1">        (state, downstream) -> downstream.flush(state.current)</div><div class="ContentPasted1">    );</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(fold(0, (acc, elem) -> acc + elem)).toList()</div><div class="ContentPasted1">$1 ==> [10]</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### How to implement `foldLeft(initial, folder)`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">public static <T,R> Gatherer<T,?,R> foldLeft(R initial, BiFunction<R,T,R> folder) {</div><div class="ContentPasted1">    class State { R current = initial; }</div><div class="ContentPasted1">    return Gatherer.of(</div><div class="ContentPasted1">        State::new,</div><div class="ContentPasted1">        (state, element, downstream) -> {</div><div class="ContentPasted1">            state.current = folder.apply(state.current, element);</div><div class="ContentPasted1">            return true;</div><div class="ContentPasted1">        },</div><div class="ContentPasted1">        Gatherer.unsupportedCombiner(),</div><div class="ContentPasted1">        (state, downstream) -> downstream.flush(state.current)</div><div class="ContentPasted1">    );</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(foldLeft(0L, (acc, elem) -> acc + elem)).toList()</div><div class="ContentPasted1">$1 ==> [10]</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">### How to implement `deduplicateAdjacent()`</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">public static <T> Gatherer<T,?,T> deduplicateAdjacent() {</div><div class="ContentPasted1">    class State { T prev; boolean hasPrev; }</div><div class="ContentPasted1">    return Gatherer.of(</div><div class="ContentPasted1">        State::new,</div><div class="ContentPasted1">        (state, element, downstream) -> {</div><div class="ContentPasted1">            if (!state.hasPrev) {</div><div class="ContentPasted1">                state.hasPrev = true;</div><div class="ContentPasted1">                state.prev = element;</div><div class="ContentPasted1">                return downstream.flush(element);</div><div class="ContentPasted1">            } else if (!Objects.equals(state.prev, element)) {</div><div class="ContentPasted1">                state.prev = element;</div><div class="ContentPasted1">                return downstream.flush(element);</div><div class="ContentPasted1">            } else {</div><div class="ContentPasted1">                return true; // skip duplicate</div><div class="ContentPasted1">            }</div><div class="ContentPasted1">        },</div><div class="ContentPasted1">        Gatherer.unsupportedCombiner(),</div><div class="ContentPasted1">        (state, downstream) -> {}</div><div class="ContentPasted1">    );</div><div class="ContentPasted1">}</div><div class="ContentPasted1">```</div><div><br class="ContentPasted1"></div><div class="ContentPasted1">```java</div><div class="ContentPasted1">jshell> Stream.of(1,2,2,3,2,4).gather(deduplicateAdjacent()).toList()</div><div class="ContentPasted1">$1 ==> [1, 2, 3, 2, 4]</div>```</div><div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;" class="elementToProof ContentPasted0" data-mce-style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: #000000;"><br></div><br></blockquote></div></div></body></html>