<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=Windows-1252">
<style type="text/css" style="display:none;"> P {margin-top:0;margin-bottom:0;} </style>
</head>
<body dir="ltr">
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);" class="elementToProof">
Hi core-libs-dev,</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);" class="elementToProof">
<br>
</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);" class="elementToProof">
Over the past 6+ months I've been thinking about, and tinkering with, how we'd be able to expose a user-facing API for extensible intermediate java.util.stream.Stream operations—a feature envisioned all the way back when Streams were created.<br>
</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);" class="elementToProof">
<br>
</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);" class="elementToProof">
I'm now at a point where I have a viable design and implementation, so I'm turning to you for feedback on the direction taken and on the API concepts, and, in particular, on whether there is anything I have overlooked.</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);" class="elementToProof ContentPasted0">
<div class="ContentPasted0 elementToProof"><br>
</div>
<div class="ContentPasted0 elementToProof">(If you, like myself, prefer reading pre-rendered markdown,
<a href="https://cr.openjdk.org/~vklang/Gatherers.html" title="https://cr.openjdk.org/~vklang/Gatherers.html" data-loopstyle="linkonly" id="LPlnk689721">
click here</a>)<br>
</div>
</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);" class="elementToProof ContentPasted0">
<br>
</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);" class="elementToProof ContentPasted0 ContentPasted1">
# Gathering the streams
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">## Introduction</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Java 8 introduced the `java.util.stream` API, which represents a lazily</div>
<div class="ContentPasted1">computed, potentially unbounded sequence of values (Streams was also the first</div>
<div class="ContentPasted1">designed-for-lambdas API in the JDK). A stream can be processed either</div>
<div class="ContentPasted1">sequentially or in parallel.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">A `Stream` pipeline consists of a source (collection, array, generator, etc.),</div>
<div class="ContentPasted1">zero or more intermediate operations (`Stream` -> `Stream` transforms), and an</div>
<div class="ContentPasted1">eager terminal operation which produces a value or a side-effect. </div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">The Streams API comes with a reasonably rich but fixed set of built-in</div>
<div class="ContentPasted1">operations (mapping, filtering, reduction, sorting, etc.), as well as an</div>
<div class="ContentPasted1">extensible terminal operation (`Stream::collect`) that enables the stream</div>
<div class="ContentPasted1">contents to be flexibly summarized in a variety of forms. The resulting API is</div>
<div class="ContentPasted1">rich enough that users have generally had a good experience with streams, but</div>
<div class="ContentPasted1">there are repeated requests to "please add operation X to streams".</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">In this document, we explore a corresponding extensible _intermediate_</div>
<div class="ContentPasted1">operation, called `Stream::gather`, which is able to address many of the</div>
<div class="ContentPasted1">requests we've gotten for additional operations.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">## "Missing" intermediate operations</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Over the years, many new operations for streams have been proposed. Each of</div>
<div class="ContentPasted1">them may be useful in some situation, but many of them are too narrow to be</div>
<div class="ContentPasted1">included in the core `Stream` API. We'd like for people to be able to plug</div>
<div class="ContentPasted1">these operations in via an extension point, as with `Stream::collect`.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Here are some examples of proposed intermediate operations that are not easily</div>
<div class="ContentPasted1">expressible as intermediate operations on `Stream` today. </div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">* **Folds**. Folding is a generalization of reduction. With reduction, the</div>
<div class="ContentPasted1"> result type is the same as the element type, the combiner is associative, and</div>
<div class="ContentPasted1"> the initial value is an identity for the combiner. For a fold, these</div>
<div class="ContentPasted1"> conditions are not required, though we give up parallelizability. </div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> Example: `foldLeft("", (str,elem) -> str + " " + elem.toString())` over the values</div>
<div class="ContentPasted1"> `[1,2,3]` yields `[" 1 2 3"]` (the leading space comes from the empty initial value).</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> See: https://bugs.openjdk.org/browse/JDK-8133680 and https://bugs.openjdk.org/browse/JDK-8292845</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">* **Unfolds**. This takes an aggregate and decomposes it into elements. An</div>
<div class="ContentPasted1"> `unfold` can reverse the operation of the above fold example using `Scanner`</div>
<div class="ContentPasted1"> or `String::split`.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> Example: `unfold(e -> new Scanner(e), Scanner::hasNextInt, Scanner::nextInt)`</div>
<div class="ContentPasted1"> over the values `["1 2 3"]` yields `[1,2,3]`.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> See: https://bugs.openjdk.org/browse/JDK-8151408</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">* **Barriers**. Usually all operations of a stream pipeline can run</div>
<div class="ContentPasted1"> simultaneously as data becomes available. In some cases, we may wish to have a</div>
<div class="ContentPasted1"> barrier that requires that the entirety of one operation is completed before</div>
<div class="ContentPasted1"> providing any elements to the next operation.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> Example: `someStream.a(...).barrier(someEffect)...` would require that all</div>
<div class="ContentPasted1"> effects from `a` are completed before executing `someEffect` or producing any</div>
<div class="ContentPasted1"> downstream values.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> See: https://bugs.openjdk.org/browse/JDK-8294246</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">* **Windowing**. Given a stream, we may wish to construct a stream which</div>
<div class="ContentPasted1"> groups the original elements, either in overlapping or disjoint groups. </div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> Example: `fixedWindow(2)` over the values `[1,2,3,4,5]` yields `[[1,2],[3,4],[5]]`.</div>
<div class="ContentPasted1"> Example: `slidingWindow(2)` over the values `[1,2,3,4]` yields `[[1,2],[2,3],[3,4]]`.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> See: https://stackoverflow.com/questions/34158634/how-to-transform-a-java-stream-into-a-sliding-window</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">* **Prefix scans**. Prefix scans are a stream of incremental reductions.</div>
<div class="ContentPasted1"> Perhaps surprisingly, some prefix scans are parallelizable, and we are exploring adding support for that beyond `Arrays.parallelPrefix()`.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> Example: `scan((sum, next) -> sum + next)` over values `[1,2,3,4]` yields `[1,3,6,10]`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> See: https://stackoverflow.com/questions/55265797/cumulative-sum-using-java-8-stream-api</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">* **Duplicated elements**. The `distinct` operation will remove duplicates; sometimes we want only the elements that are duplicated.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> Example: `duplicates(Object::equals)` over values `[1,1,2,3,4]` yields `[1]`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> See: https://stackoverflow.com/questions/27677256/java-8-streams-to-find-the-duplicate-elements</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">* **Running duplicate elimination**. Here, we want to eliminate adjacent repeated elements. </div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> Example: `collapseRuns(Object::equals)` over values `[1,1,2,3,3]` yields `[1,2,3]`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">These are just a few of the stream operations people have wished for—</div>
<div class="ContentPasted1">and while many operations are too specialized to include in the core library,</div>
<div class="ContentPasted1">all are things we'd like for people to be able to express with streams.</div>
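<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">For contrast, here is one way to express the **duplicated elements** example</div>
<div class="ContentPasted1">with today's API. It is a minimal sketch; `DuplicatesToday` and `duplicates`</div>
<div class="ContentPasted1">are illustrative names, not proposed API. With no extensible intermediate</div>
<div class="ContentPasted1">operation to hook into, the whole input has to be consumed by a terminal</div>
<div class="ContentPasted1">`collect` before a second stream can emit anything:</div>
<div><br class="ContentPasted1">
</div>
<pre class="ContentPasted1">
```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative helper, not proposed API.
final class DuplicatesToday {
    // Returns the elements that occur more than once, in first-encounter order.
    // The terminal collect() must see the entire input before the second
    // stream can produce a single element.
    static <T> List<T> duplicates(Stream<T> input) {
        Map<T, Long> counts = input.collect(
                Collectors.groupingBy(Function.identity(),
                                      LinkedHashMap::new,   // keep encounter order
                                      Collectors.counting()));
        return counts.entrySet().stream()
                .filter(entry -> entry.getValue() > 1)
                .map(Map.Entry::getKey)
                .toList();
    }
}
```
</pre>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Here `duplicates(Stream.of(1, 1, 2, 3, 4))` returns `[1]`, but as a `List`</div>
<div class="ContentPasted1">rather than a `Stream`: further processing has to start a new pipeline, and</div>
<div class="ContentPasted1">the operation can never short-circuit.</div>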
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">## Extending streams</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">An extension point for intermediate operations on `java.util.stream.Stream` was</div>
<div class="ContentPasted1">wanted from the start; when the library was first built, this wish-list item</div>
<div class="ContentPasted1">was recorded: https://bugs.openjdk.org/browse/JDK-8132369. It was left out of</div>
<div class="ContentPasted1">the initial version for many reasons, the most important being that it was not</div>
<div class="ContentPasted1">at all obvious what the right API was.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">The set of intermediate operations on Streams involves the same trade-off as</div>
<div class="ContentPasted1">CISC (Complex Instruction Set Computing) vs RISC (Reduced Instruction Set</div>
<div class="ContentPasted1">Computing), namely: either you provide a rather sizeable set of specialized</div>
<div class="ContentPasted1">operations, or you provide a small set of general operations.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">For the **CISC**-strategy, the main drawback is that not all operations are</div>
<div class="ContentPasted1">equally useful, and having a large set of operations to choose from creates a</div>
<div class="ContentPasted1">barrier to effective usage by making the correct choice harder to identify.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">For the **RISC**-strategy, the main drawback is that it puts more of a burden on</div>
<div class="ContentPasted1">users, who have to encode specialized operations in terms of a set of generic</div>
<div class="ContentPasted1">operations, which makes their code harder to read and harder to maintain.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">However, one success story was `Collector`; this allowed us to extend _terminal_</div>
<div class="ContentPasted1">operations with a good balance of expressiveness, reuse, and API footprint. We</div>
<div class="ContentPasted1">would like to do the same for intermediate operations -- and we can draw some</div>
<div class="ContentPasted1">inspiration from `Collector` to get there.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">## Requirements</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Breaking down operations into intermediate and terminal is a useful distinction;</div>
<div class="ContentPasted1">in reality, however, there are many more characteristics we could use to describe</div>
<div class="ContentPasted1">operations, and relatively few operations share exactly the same subset of them.</div>
<div class="ContentPasted1">The following is a curated set of characteristics of stream operations;</div>
<div class="ContentPasted1">if we want to enable generic, user-definable intermediate operations on streams,</div>
<div class="ContentPasted1">we probably have to provide for them all.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">| Ordinal | Feature | What this means in practice | Example operation(s) |</div>
<div class="ContentPasted1">|---------|--------------------------|-------------------------------------------------------------|-------------------------------------|</div>
<div class="ContentPasted1">| 1 | Intermediate operation | Add custom new operations to Stream pipelines | *map*, *filter* |</div>
<div class="ContentPasted1">| 2 | Incremental operation | Evaluate the Stream pipeline depth-first | *map*, *filter* |</div>
<div class="ContentPasted1">| 3 | Stateful operation | Remember information between processing elements | *limit*, *foldLeft* |</div>
<div class="ContentPasted1">| 4 | N:M inputs-to-outputs | Encode a wide array of operations | *limit*, *flatMap* |</div>
<div class="ContentPasted1">| 5 | Prefix consumption | An operation should consume only as much as needed | *limit*, *takeWhile* |</div>
<div class="ContentPasted1">| 6 | Beginning-of-stream hook | Ability to have initialization logic | *foldLeft*, *grouped* |</div>
<div class="ContentPasted1">| 7 | End-of-stream hook | Flush elements downstream | *sorted*, *grouped* |</div>
<div class="ContentPasted1">| 8 | Opt-in parallelizability | Speed up CPU-bound processing for parallelizable operations | *reduce*, *map* |</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Now that we have a better idea of what we need to achieve, let's have a look at</div>
<div class="ContentPasted1">what our options are.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">## Comparing existing operations</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Let's look at the operations we already have, to see if they would fit the bill</div>
<div class="ContentPasted1">as a generic extension point for intermediate `Stream`-operations.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">These are, in order, `Stream.map(…)`, `Stream.flatMap(…)` (`Stream.mapMulti(…)`</div>
<div class="ContentPasted1">is analogous), and `Collector` with `Stream.collect(…)`; the table below shows</div>
<div class="ContentPasted1">how they line up against our requirements.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">| Ordinal | Feature | Using Stream.map(…) | Using Stream.flatMap(…) | Using Collector |</div>
<div class="ContentPasted1">|---------|--------------------------|---------------------------|---------------------------|-----------------|</div>
<div class="ContentPasted1">| 1 | Intermediate operation | Yes | Yes | No, terminal |</div>
<div class="ContentPasted1">| 2 | Incremental operation | Yes | Yes | No, terminal |</div>
<div class="ContentPasted1">| 3 | Stateful operation | Yes, via class or capture | Yes, via class or capture | Yes |</div>
<div class="ContentPasted1">| 4 | N:M inputs-to-outputs | No, 1:1 | No, 1:M | No, N:1 |</div>
<div class="ContentPasted1">| 5 | Prefix consumption | No | No | No |</div>
<div class="ContentPasted1">| 6 | Beginning-of-stream hook | No | No | Yes |</div>
<div class="ContentPasted1">| 7 | End-of-stream hook | No | No | Yes |</div>
<div class="ContentPasted1">| 8 | Opt-in parallelizability | No, always on | No, always on | No, always on |</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">None of the options above fits the bill, as-is, as an extension point for</div>
<div class="ContentPasted1">generic intermediate `Stream` operations, but one of them shows more promise</div>
<div class="ContentPasted1">than the others: `Collector`. The obvious upgrades it needs are support for</div>
<div class="ContentPasted1">short-circuiting and for incremental generation of output; let's see what</div>
<div class="ContentPasted1">that looks like in practice.</div>
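<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">To make the "Yes, via class or capture" entries above concrete, the sketch</div>
<div class="ContentPasted1">below emulates the prefix-scan example with `Stream.map` and a captured</div>
<div class="ContentPasted1">accumulator; `RunningSumToday` and `runningSums` are illustrative names. It</div>
<div class="ContentPasted1">works only because the stream is forced to be sequential: `map` offers no</div>
<div class="ContentPasted1">beginning- or end-of-stream hook, the state lives outside the operation, and</div>
<div class="ContentPasted1">in a parallel stream the elements could reach the lambda in any order.</div>
<div><br class="ContentPasted1">
</div>
<pre class="ContentPasted1">
```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

// Illustrative helper, not proposed API.
final class RunningSumToday {
    // Emulates scan((sum, next) -> sum + next) using map plus captured state.
    // The AtomicLong makes each update atomic, but that alone does not make
    // the result meaningful in parallel, so the stream is forced sequential.
    static List<Long> runningSums(Stream<Integer> input) {
        AtomicLong sum = new AtomicLong();          // state lives outside the stream
        return input.sequential()
                    .map(next -> sum.addAndGet(next))
                    .toList();
    }
}
```
</pre>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">For example, `runningSums(Stream.of(1, 2, 3, 4))` returns `[1, 3, 6, 10]`.</div>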
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">## Collector 2.0</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Let's start with a clone of `Collector<T,A,R>` called `Gatherer<T,A,R>`:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">/** @param <T> the element type</div>
<div class="ContentPasted1"> * @param <A> the (mutable) intermediate accumulation type</div>
<div class="ContentPasted1"> * @param <R> the (probably immutable) final accumulation type</div>
<div class="ContentPasted1"> */</div>
<div class="ContentPasted1">interface Gatherer<T,A,R> {</div>
<div class="ContentPasted1"> Supplier<A> supplier();</div>
<div class="ContentPasted1"> BiConsumer<A, T> accumulator();</div>
<div class="ContentPasted1"> BinaryOperator<A> combiner();</div>
<div class="ContentPasted1"> Function<A, R> finisher();</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Now, we need to make it output 0...M elements instead of strictly 1. In order</div>
<div class="ContentPasted1">to do that, we need a way to programmatically output elements from the</div>
<div class="ContentPasted1">`accumulator`, which makes it incremental (with the added benefit that it can</div>
<div class="ContentPasted1">run depth-first). Adding an extra argument, a downstream handle, to</div>
<div class="ContentPasted1">`accumulator` and `finisher` allows us to push elements downstream both for</div>
<div class="ContentPasted1">each input element and at the end of the stream. As no suitable pre-existing</div>
<div class="ContentPasted1">interface exists, we'll define a new interface named `Integrator` and rename</div>
<div class="ContentPasted1">`accumulator` to the more fitting `integrator`.</div>
<div class="ContentPasted1">Our `finisher` will also no longer have a useful return value, so we'll make it `void`:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">/** @param <T> the element type</div>
<div class="ContentPasted1"> * @param <A> the (mutable) intermediate accumulation type</div>
<div class="ContentPasted1"> * @param <R> the type of elements pushed downstream</div>
<div class="ContentPasted1"> */</div>
<div class="ContentPasted1">interface Gatherer<T,A,R> {</div>
<div class="ContentPasted1"> interface Integrator<A,T,R> {</div>
<div class="ContentPasted1"> void integrate(A state, T element, Consumer<? super R> downstream);</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> Supplier<A> supplier();</div>
<div class="ContentPasted1"> Integrator<A, T, R> integrator();</div>
<div class="ContentPasted1"> BinaryOperator<A> combiner();</div>
<div class="ContentPasted1"> BiConsumer<A, Consumer<? super R>> finisher();</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Next, we need to tackle short-circuiting and add an upstream signal from each</div>
<div class="ContentPasted1">invocation of the `integrator` so we can detect when it does not</div>
<div class="ContentPasted1">want any more elements. We do this by returning a `boolean` from `integrate`:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">/** @param <T> the element type</div>
<div class="ContentPasted1"> * @param <A> the (mutable) intermediate accumulation type</div>
<div class="ContentPasted1"> * @param <R> the type of elements pushed downstream</div>
<div class="ContentPasted1"> */</div>
<div class="ContentPasted1">interface Gatherer<T,A,R> {</div>
<div class="ContentPasted1"> interface Integrator<A,T,R> {</div>
<div class="ContentPasted1"> boolean integrate(A state, T element, Consumer<? super R> downstream);</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> Supplier<A> supplier();</div>
<div class="ContentPasted1"> Integrator<A, T, R> integrator();</div>
<div class="ContentPasted1"> BinaryOperator<A> combiner();</div>
<div class="ContentPasted1"> BiConsumer<A, Consumer<? super R>> finisher();</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Supporting short-circuiting transitively, i.e. letting a downstream handle signal</div>
<div class="ContentPasted1">that it does not want to receive any more elements, requires the same change</div>
<div class="ContentPasted1">we made to `integrate`: return a `boolean` instead of `void`. Since the only</div>
<div class="ContentPasted1">built-in candidate would be `Predicate`, which is not supposed to have</div>
<div class="ContentPasted1">side-effects, let's introduce our own type, `Sink`:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">/** @param <T> the element type</div>
<div class="ContentPasted1"> * @param <A> the (mutable) intermediate accumulation type</div>
<div class="ContentPasted1"> * @param <R> the type of elements pushed downstream</div>
<div class="ContentPasted1"> */</div>
<div class="ContentPasted1">interface Gatherer<T,A,R> {</div>
<div class="ContentPasted1"> interface Sink<R> {</div>
<div class="ContentPasted1"> boolean flush(R element);</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> interface Integrator<A,T,R> {</div>
<div class="ContentPasted1"> boolean integrate(A state, T element, Sink<? super R> downstream);</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> Supplier<A> supplier();</div>
<div class="ContentPasted1"> Integrator<A, T, R> integrator();</div>
<div class="ContentPasted1"> BinaryOperator<A> combiner();</div>
<div class="ContentPasted1"> BiConsumer<A, Sink<? super R>> finisher();</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
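<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">To see how the two `boolean` signals interact, here is a minimal,</div>
<div class="ContentPasted1">single-threaded sketch of how a pipeline stage might drive such a `Gatherer`.</div>
<div class="ContentPasted1">It restates the interfaces above with `Sink` and `Integrator` moved to the top</div>
<div class="ContentPasted1">level so that it compiles on its own; `SequentialDriver`, `run`, and `takeFirst`</div>
<div class="ContentPasted1">are hypothetical names. The driver stops pulling input as soon as `integrate`</div>
<div class="ContentPasted1">returns `false` and, as an assumption of this sketch, still runs the `finisher`</div>
<div class="ContentPasted1">once afterwards:</div>
<div><br class="ContentPasted1">
</div>
<pre class="ContentPasted1">
```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

interface Sink<R> {
    boolean flush(R element);                   // false: "send me no more"
}

interface Integrator<A, T, R> {
    boolean integrate(A state, T element, Sink<? super R> downstream);
}

interface Gatherer<T, A, R> {
    Supplier<A> supplier();
    Integrator<A, T, R> integrator();
    BinaryOperator<A> combiner();
    BiConsumer<A, Sink<? super R>> finisher();
}

// Hypothetical driver, for illustration only.
final class SequentialDriver {
    // Runs a gatherer over the input on the calling thread and collects
    // everything it flushes. This terminal sink always accepts, so only the
    // integrator can end consumption early.
    static <T, A, R> List<R> run(Gatherer<T, A, R> gatherer, Iterable<? extends T> input) {
        List<R> out = new ArrayList<>();
        Sink<R> sink = element -> { out.add(element); return true; };
        A state = gatherer.supplier().get();               // beginning-of-stream hook
        Integrator<A, T, R> integrator = gatherer.integrator();
        for (T element : input) {
            if (!integrator.integrate(state, element, sink))
                break;                                     // prefix consumption: stop pulling
        }
        gatherer.finisher().accept(state, sink);           // assumption: runs even after a short-circuit
        return out;
    }

    // Hypothetical prefix-consuming gatherer: passes the first n elements
    // through, then returns false so that no further input is pulled.
    static <T> Gatherer<T, int[], T> takeFirst(int n) {
        return new Gatherer<T, int[], T>() {
            public Supplier<int[]> supplier() { return () -> new int[1]; }
            public Integrator<int[], T, T> integrator() {
                return (taken, element, downstream) -> {
                    if (taken[0] >= n) return false;       // covers n == 0
                    taken[0]++;
                    return downstream.flush(element) && taken[0] < n;
                };
            }
            public BinaryOperator<int[]> combiner() {
                return (left, right) -> { throw new UnsupportedOperationException(); };
            }
            public BiConsumer<int[], Sink<? super T>> finisher() {
                return (taken, downstream) -> { };         // nothing buffered to flush
            }
        };
    }
}
```
</pre>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">For example, `run(takeFirst(2), List.of(1, 2, 3, 4, 5))` returns `[1, 2]`</div>
<div class="ContentPasted1">without ever pulling the third element, so the same gatherer also works on an</div>
<div class="ContentPasted1">unbounded source.</div>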
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Finally, we need to address parallelism. To opt in to parallelism, we can make</div>
<div class="ContentPasted1">the combiner optional: it is called only for parallel execution, and if it is</div>
<div class="ContentPasted1">not present, execution is constrained to be sequential.</div>
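<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">One way to picture an optional combiner is sketched below; it illustrates the</div>
<div class="ContentPasted1">idea under stated assumptions and is not the proposed implementation. Absence</div>
<div class="ContentPasted1">is modelled by a shared sentinel returned from a hypothetical</div>
<div class="ContentPasted1">`Gatherer.unsupportedCombiner()`, and the hypothetical `ChunkedDriver.evaluate`</div>
<div class="ContentPasted1">splits its input in two only when a real combiner is present. Both halves run</div>
<div class="ContentPasted1">on the calling thread to keep the example deterministic; a real implementation</div>
<div class="ContentPasted1">would fork them onto other threads and keep each chunk's output in order:</div>
<div><br class="ContentPasted1">
</div>
<pre class="ContentPasted1">
```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

interface Sink<R> { boolean flush(R element); }

interface Integrator<A, T, R> {
    boolean integrate(A state, T element, Sink<? super R> downstream);
}

interface Gatherer<T, A, R> {
    Supplier<A> supplier();
    Integrator<A, T, R> integrator();
    BinaryOperator<A> combiner();
    BiConsumer<A, Sink<? super R>> finisher();

    // Hypothetical shared sentinel meaning "no combiner: run sequentially".
    BinaryOperator<?> UNSUPPORTED = (left, right) -> {
        throw new UnsupportedOperationException("sequential-only Gatherer");
    };

    @SuppressWarnings("unchecked")
    static <A> BinaryOperator<A> unsupportedCombiner() {
        return (BinaryOperator<A>) UNSUPPORTED;
    }
}

// Hypothetical driver, for illustration only.
final class ChunkedDriver {
    static <T, A, R> List<R> evaluate(Gatherer<T, A, R> gatherer, List<T> input) {
        List<R> out = new ArrayList<>();
        Sink<R> sink = element -> { out.add(element); return true; };
        BinaryOperator<A> combiner = gatherer.combiner();
        A state;
        if (combiner == Gatherer.<A>unsupportedCombiner() || input.size() < 2) {
            state = integrateAll(gatherer, input, sink);           // sequential path
        } else {
            int mid = input.size() / 2;                            // simulated parallel path
            A left = integrateAll(gatherer, input.subList(0, mid), sink);
            A right = integrateAll(gatherer, input.subList(mid, input.size()), sink);
            state = combiner.apply(left, right);                   // merge partial states
        }
        gatherer.finisher().accept(state, sink);
        return out;
    }

    // Each chunk starts from a fresh state, as a forked task would.
    private static <T, A, R> A integrateAll(Gatherer<T, A, R> gatherer, List<T> chunk, Sink<R> sink) {
        A state = gatherer.supplier().get();
        for (T element : chunk)
            if (!gatherer.integrator().integrate(state, element, sink)) break;
        return state;
    }

    // Hypothetical gatherer that emits the sum of its input at the end.
    // Summation is associative, so it can safely offer a combiner.
    static Gatherer<Integer, long[], Long> sum(boolean parallelizable) {
        return new Gatherer<Integer, long[], Long>() {
            public Supplier<long[]> supplier() { return () -> new long[1]; }
            public Integrator<long[], Integer, Long> integrator() {
                return (acc, element, downstream) -> { acc[0] += element; return true; };
            }
            public BinaryOperator<long[]> combiner() {
                return parallelizable
                        ? (left, right) -> { left[0] += right[0]; return left; }
                        : Gatherer.unsupportedCombiner();
            }
            public BiConsumer<long[], Sink<? super Long>> finisher() {
                return (acc, downstream) -> downstream.flush(acc[0]);
            }
        };
    }
}
```
</pre>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Both `evaluate(sum(true), List.of(1, 2, 3, 4, 5))` and</div>
<div class="ContentPasted1">`evaluate(sum(false), List.of(1, 2, 3, 4, 5))` return `[15]`; only the first</div>
<div class="ContentPasted1">ever calls the combiner, which is what makes parallelism opt-in.</div>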
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">To demonstrate how all of the above fits together, we implement `fixedWindow(N)`</div>
<div class="ContentPasted1">where `fixedWindow(2)` over the values `[1,2,3,4,5]` yields `[[1,2],[3,4],[5]]`.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1"> /**</div>
<div class="ContentPasted1"> * Gathers elements into fixed-size windows. The last window can contain fewer elements.</div>
<div class="ContentPasted1"> * @param windowSize the size of the windows</div>
<div class="ContentPasted1"> * @return a new gatherer which gathers elements into fixed-size windows</div>
<div class="ContentPasted1"> * @param <T> the type of elements this Gatherer gathers</div>
<div class="ContentPasted1"> */</div>
<div class="ContentPasted1"> public static <T> Gatherer<T, ?, List<T>> fixedWindow(int windowSize) {</div>
<div class="ContentPasted1"> if (windowSize < 1)</div>
<div class="ContentPasted1"> throw new IllegalArgumentException("'windowSize' must be greater than zero");</div>
<div class="ContentPasted1"> else {</div>
<div class="ContentPasted1"> Supplier<ArrayList<T>> supplier =</div>
<div class="ContentPasted1"> () -> new ArrayList<T>(windowSize);</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> Integrator<ArrayList<T>, T, List<T>> integrator =</div>
<div class="ContentPasted1"> (openWindow, element, downstream) -> {</div>
<div class="ContentPasted1"> if (openWindow.add(element) && openWindow.size() >= windowSize) {</div>
<div class="ContentPasted1"> var closedWindow = List.copyOf(openWindow);</div>
<div class="ContentPasted1"> openWindow.clear();</div>
<div class="ContentPasted1"> return downstream.flush(closedWindow);</div>
<div class="ContentPasted1"> } else</div>
<div class="ContentPasted1"> return true;</div>
<div class="ContentPasted1"> };</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> // This combiner signals that we opt out of parallelization</div>
<div class="ContentPasted1"> BinaryOperator<ArrayList<T>> combiner = </div>
<div class="ContentPasted1"> Gatherer.unsupportedCombiner();</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher =</div>
<div class="ContentPasted1"> (openWindow, downstream) -> {</div>
<div class="ContentPasted1"> if(!openWindow.isEmpty()) {</div>
<div class="ContentPasted1"> downstream.flush(List.copyOf(openWindow));</div>
<div class="ContentPasted1"> openWindow.clear();</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> };</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> return Gatherer.of(supplier, integrator, combiner, finisher);</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1">```</div>
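<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">The example above relies on two static helpers, `Gatherer.of(…)` and</div>
<div class="ContentPasted1">`Gatherer.unsupportedCombiner()`, which the interface sketch does not declare.</div>
<div class="ContentPasted1">Below is one plausible shape for them: the helper names come from the example,</div>
<div class="ContentPasted1">while their bodies, the `SimpleGatherer` record, and the shared sentinel are</div>
<div class="ContentPasted1">assumptions of this sketch. Returning one shared instance from</div>
<div class="ContentPasted1">`unsupportedCombiner()` lets an implementation recognize it by identity and</div>
<div class="ContentPasted1">fall back to sequential execution:</div>
<div><br class="ContentPasted1">
</div>
<pre class="ContentPasted1">
```java
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

interface Sink<R> { boolean flush(R element); }

interface Integrator<A, T, R> {
    boolean integrate(A state, T element, Sink<? super R> downstream);
}

interface Gatherer<T, A, R> {
    Supplier<A> supplier();
    Integrator<A, T, R> integrator();
    BinaryOperator<A> combiner();
    BiConsumer<A, Sink<? super R>> finisher();

    // Hypothetical factory bundling four functions into a Gatherer.
    static <T, A, R> Gatherer<T, A, R> of(Supplier<A> supplier,
                                          Integrator<A, T, R> integrator,
                                          BinaryOperator<A> combiner,
                                          BiConsumer<A, Sink<? super R>> finisher) {
        return new SimpleGatherer<>(supplier, integrator, combiner, finisher);
    }

    // Hypothetical shared sentinel: calling it fails, comparing against it is cheap.
    BinaryOperator<?> UNSUPPORTED = (left, right) -> {
        throw new UnsupportedOperationException("sequential-only Gatherer");
    };

    @SuppressWarnings("unchecked")
    static <A> BinaryOperator<A> unsupportedCombiner() {
        return (BinaryOperator<A>) UNSUPPORTED;
    }
}

// Hypothetical value class backing Gatherer.of; the record accessors
// (supplier(), integrator(), combiner(), finisher()) implement the interface.
record SimpleGatherer<T, A, R>(Supplier<A> supplier,
                               Integrator<A, T, R> integrator,
                               BinaryOperator<A> combiner,
                               BiConsumer<A, Sink<? super R>> finisher)
        implements Gatherer<T, A, R> { }
```
</pre>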
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Alternatively, we can implement the `Gatherer` interface directly:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1"> /**</div>
<div class="ContentPasted1"> * Gathers elements into fixed-size windows. The last window can contain fewer elements.</div>
<div class="ContentPasted1"> * @param windowSize the size of the windows</div>
<div class="ContentPasted1"> * @return a new gatherer which gathers elements into fixed-size windows</div>
<div class="ContentPasted1"> * @param <T> the type of elements this Gatherer gathers</div>
<div class="ContentPasted1"> */</div>
<div class="ContentPasted1"> public static <T> Gatherer<T, ?, List<T>> fixedWindow(int windowSize) {</div>
<div class="ContentPasted1"> if (windowSize < 1)</div>
<div class="ContentPasted1"> throw new IllegalArgumentException("'windowSize' must be greater than zero");</div>
<div class="ContentPasted1"> </div>
<div class="ContentPasted1"> class FixedWindowGatherer implements Gatherer<T,ArrayList<T>,List<T>> {</div>
<div class="ContentPasted1"> @Override</div>
<div class="ContentPasted1"> public Supplier<ArrayList<T>> supplier() {</div>
<div class="ContentPasted1"> return () -> new ArrayList<>(windowSize);</div>
<div class="ContentPasted1"> }</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> @Override</div>
<div class="ContentPasted1"> public Integrator<ArrayList<T>, T, List<T>> integrator() {</div>
<div class="ContentPasted1"> return (openWindow, element, downstream) -> {</div>
<div class="ContentPasted1"> if (openWindow.add(element) && openWindow.size() >= windowSize) {</div>
<div class="ContentPasted1"> var closedWindow = List.copyOf(openWindow);</div>
<div class="ContentPasted1"> openWindow.clear();</div>
<div class="ContentPasted1"> return downstream.flush(closedWindow);</div>
<div class="ContentPasted1"> } else</div>
<div class="ContentPasted1"> return true;</div>
<div class="ContentPasted1"> };</div>
<div class="ContentPasted1"> }</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> @Override</div>
<div class="ContentPasted1"> public BinaryOperator<ArrayList<T>> combiner() {</div>
<div class="ContentPasted1"> return Gatherer.unsupportedCombiner();</div>
<div class="ContentPasted1"> }</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> @Override</div>
<div class="ContentPasted1"> public BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher() {</div>
<div class="ContentPasted1"> return (openWindow, downstream) -> {</div>
<div class="ContentPasted1"> if(!openWindow.isEmpty()) {</div>
<div class="ContentPasted1"> downstream.flush(List.copyOf(openWindow));</div>
<div class="ContentPasted1"> openWindow.clear();</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> };</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> return new FixedWindowGatherer();</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Compared side by side with `Collector`, the functional shapes look like this:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">| Ordinal | Feature | `Collector<T,A,R>` | `Gatherer<T,A,R>` |</div>
<div class="ContentPasted1">|---------|--------------------------|-----------------------------------|---------------------------------------|</div>
<div class="ContentPasted1">| 1 | Beginning-of-stream hook | `A get()` | `A get()` |</div>
<div class="ContentPasted1">| 2 | Per element | `void accept(A, T)` | `boolean integrate(A, T, Sink<R>)` |</div>
<div class="ContentPasted1">| 3 | End-of-stream hook | `R apply(A)` | `void accept(A, Sink<R>)` |</div>
<div class="ContentPasted1">| 4 | Parallelizability | `A apply(A, A)` | `A apply(A, A)` |</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">To integrate it with `java.util.stream.Stream`, we add the following method to</div>
<div class="ContentPasted1">`Stream<T>`: `<R> Stream<R> gather(Gatherer<T,?,R> gatherer)`.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Now we can use our `fixedWindow` Gatherer:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">jshell> Stream.iterate(0, i -> i + 1).limit(10).gather(fixedWindow(2)).toList();</div>
<div class="ContentPasted1">$1 ==> [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]</div>
<div class="ContentPasted1">```</div>
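<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">On a JDK without `Stream::gather`, the same pipeline can be approximated with a</div>
<div class="ContentPasted1">static helper built on today's `Spliterator` API. This is a sequential-only</div>
<div class="ContentPasted1">sketch: `Gathering.gather` is a hypothetical stand-in for the proposed method,</div>
<div class="ContentPasted1">the interfaces are restated from above, and `fixedWindow` is a compact copy of</div>
<div class="ContentPasted1">the example. It pulls one upstream element at a time and buffers whatever the</div>
<div class="ContentPasted1">gatherer flushes, so evaluation stays lazy and depth-first, and it even works on</div>
<div class="ContentPasted1">an infinite source as long as something downstream short-circuits:</div>
<div><br class="ContentPasted1">
</div>
<pre class="ContentPasted1">
```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

interface Sink<R> { boolean flush(R element); }
interface Integrator<A, T, R> { boolean integrate(A state, T element, Sink<? super R> downstream); }
interface Gatherer<T, A, R> {
    Supplier<A> supplier();
    Integrator<A, T, R> integrator();
    BinaryOperator<A> combiner();
    BiConsumer<A, Sink<? super R>> finisher();
}

final class Gathering {
    // Hypothetical stand-in for the proposed Stream::gather; sequential only.
    static <T, A, R> Stream<R> gather(Stream<T> upstream, Gatherer<T, A, R> gatherer) {
        Spliterator<T> source = upstream.sequential().spliterator();
        Spliterator<R> gathered = new Spliterators.AbstractSpliterator<R>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            // Flushed but not yet consumed. ArrayDeque rejects null elements,
            // so this sketch does not support gatherers that emit null.
            final ArrayDeque<R> buffer = new ArrayDeque<>();
            final Sink<R> sink = element -> { buffer.add(element); return true; };
            A state;
            boolean started, upstreamDone, finished;

            @Override
            public boolean tryAdvance(Consumer<? super R> action) {
                if (!started) {                       // beginning-of-stream hook, run lazily
                    state = gatherer.supplier().get();
                    started = true;
                }
                while (buffer.isEmpty() && !finished) {
                    if (upstreamDone || !source.tryAdvance(this::integrateOne)) {
                        finished = true;              // end-of-stream hook runs exactly once
                        gatherer.finisher().accept(state, sink);
                    }
                }
                if (buffer.isEmpty()) return false;
                action.accept(buffer.poll());
                return true;
            }

            private void integrateOne(T element) {
                if (!gatherer.integrator().integrate(state, element, sink))
                    upstreamDone = true;              // the gatherer wants no more input
            }
        };
        return StreamSupport.stream(gathered, false).onClose(upstream::close);
    }

    // Compact copy of the fixedWindow example from the text.
    static <T> Gatherer<T, ArrayList<T>, List<T>> fixedWindow(int windowSize) {
        if (windowSize < 1)
            throw new IllegalArgumentException("'windowSize' must be greater than zero");
        return new Gatherer<T, ArrayList<T>, List<T>>() {
            public Supplier<ArrayList<T>> supplier() { return () -> new ArrayList<>(windowSize); }
            public Integrator<ArrayList<T>, T, List<T>> integrator() {
                return (window, element, downstream) -> {
                    window.add(element);
                    if (window.size() < windowSize) return true;
                    List<T> closed = List.copyOf(window);
                    window.clear();
                    return downstream.flush(closed);
                };
            }
            public BinaryOperator<ArrayList<T>> combiner() {  // never called by gather() above
                return (left, right) -> { throw new UnsupportedOperationException(); };
            }
            public BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher() {
                return (window, downstream) -> {
                    if (!window.isEmpty()) downstream.flush(List.copyOf(window));
                };
            }
        };
    }
}
```
</pre>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">With it, `Gathering.gather(Stream.iterate(0, i -> i + 1).limit(10), Gathering.fixedWindow(2)).toList()`</div>
<div class="ContentPasted1">produces the same `[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]` as the jshell session above.</div>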
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">`Gatherer` now provides all of the required features:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">| Ordinal | Feature | Using Gatherer | Explanation |</div>
<div class="ContentPasted1">|---------|--------------------------|-------------------------|----------------------------------------------------------------------------------------|</div>
<div class="ContentPasted1">| 1 | Intermediate operation | Yes | A Gatherer's output is the following operation's input |</div>
<div class="ContentPasted1">| 2 | Incremental operation | Yes | A Gatherer can produce elements in response to consuming elements |</div>
<div class="ContentPasted1">| 3 | Stateful operation | Yes | A Gatherer supplies its own state, which can be Void/null if stateless |</div>
<div class="ContentPasted1">| 4 | N:M inputs-to-outputs | Yes | A Gatherer can consume and produce any number of elements |</div>
<div class="ContentPasted1">| 5 | Prefix consumption | Yes | A Gatherer can signal that it is done by returning `false` from its `integrate`-method |</div>
<div class="ContentPasted1">| 6 | Beginning-of-stream hook | Yes | A Gatherer can run logic when creating its initial state |</div>
<div class="ContentPasted1">| 7 | End-of-stream hook | Yes | A Gatherer's `finisher` is invoked at the end of input |</div>
<div class="ContentPasted1">| 8 | Opt-in parallelizability | Yes | A Gatherer's `combiner` is optional |</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">## Summary</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">None of the current candidates for a user-extensible API for generic</div>
<div class="ContentPasted1">intermediate stream operations fits the set of requirements, but `Collector`</div>
<div class="ContentPasted1">can form an excellent basis for a new construct, called `Gatherer`,</div>
<div class="ContentPasted1">which, after a limited set of modifications, checks all the boxes:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">| Ordinal | Feature | Using Stream.map(…) | Using Stream.flatMap(…) | Using Collector | Using Gatherer |</div>
<div class="ContentPasted1">|---------|--------------------------|---------------------------|---------------------------|-----------------|----------------|</div>
<div class="ContentPasted1">| 1 | Intermediate operation | Yes | Yes | No, terminal | Yes |</div>
<div class="ContentPasted1">| 2 | Incremental operation | Yes | Yes | No, terminal | Yes |</div>
<div class="ContentPasted1">| 3 | Stateful operation | Yes, via class or capture | Yes, via class or capture | Yes | Yes |</div>
<div class="ContentPasted1">| 4 | N:M inputs-to-outputs | No, 1:1 | No, 1:M | No, N:1 | Yes |</div>
<div class="ContentPasted1">| 5 | Prefix consumption | No | No | No | Yes |</div>
<div class="ContentPasted1">| 6 | Beginning-of-stream hook | No | No | Yes | Yes |</div>
<div class="ContentPasted1">| 7 | End-of-stream hook | No | No | Yes | Yes |</div>
<div class="ContentPasted1">| 8 | Opt-in parallelizability | No, always on | No, always on | No, always on | Yes |</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">As it turned out, we were a short hop away from a generic, stand-alone,</div>
<div class="ContentPasted1">reusable, parallelizable API for intermediate operations all along!</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">## But wait … there's more</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">While the aforementioned solution contains the bare minimum needed to meet our requirements,</div>
<div class="ContentPasted1">the following additions provide some significant quality-of-life improvements.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### Compositionality</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">The astute reader may have noticed that, by adding the `gather(Gatherer)`-method to `Stream`,</div>
<div class="ContentPasted1">one can in effect **compose** `Gatherer`s together:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">Gatherer<Integer,?,String> someGatherer = …;</div>
<div class="ContentPasted1">Gatherer<String,?,Long> someOtherGatherer = …;</div>
<div class="ContentPasted1">Stream<Long> stream = Stream.of(1).gather(someGatherer).gather(someOtherGatherer);</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">As it turns out, it is possible to define a `default Gatherer andThen(Gatherer)`-method on `Gatherer` which composes `Gatherer`s:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">Gatherer<Integer,?,String> someGatherer = …;</div>
<div class="ContentPasted1">Gatherer<String,?,Long> someOtherGatherer = …;</div>
<div class="ContentPasted1">Gatherer<Integer,?,Long> gatherer = someGatherer.andThen(someOtherGatherer);</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">This makes it possible both to decouple operations from their use-sites</div>
<div class="ContentPasted1">and to create more sophisticated `Gatherer`s from simple ones.</div>
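
One way `andThen` could work, sketched with simplified and hypothetical stand-ins (a sequential-only `SimpleGatherer` record without a combiner, plus `Pair` and `Composition`), is to keep one state per stage and to feed every element the first stage emits straight into the second stage's integrator. This assumes each integrator propagates a `false` returned by `flush`, as all of the examples in this document do. The `run` method is only a minimal driver for trying it out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins for the proposed types (sequential only).
interface Sink<R> { boolean flush(R element); }

interface Integrator<A, T, R> {
    boolean integrate(A state, T element, Sink<? super R> downstream);
}

record SimpleGatherer<T, A, R>(Supplier<A> supplier,
                               Integrator<A, T, R> integrator,
                               BiConsumer<A, Sink<? super R>> finisher) {}

// Hypothetical composite state: one state per stage.
record Pair<A1, A2>(A1 left, A2 right) {}

final class Composition {
    static <T, A1, M, A2, R> SimpleGatherer<T, Pair<A1, A2>, R> andThen(
            SimpleGatherer<T, A1, M> first, SimpleGatherer<M, A2, R> second) {
        return new SimpleGatherer<T, Pair<A1, A2>, R>(
            () -> new Pair<>(first.supplier().get(), second.supplier().get()),
            // Everything `first` emits is integrated straight into `second`; a `false`
            // from `second` reaches `first` as the result of `flush`.
            (state, element, downstream) -> first.integrator().integrate(state.left(), element,
                m -> second.integrator().integrate(state.right(), m, downstream)),
            // At end of input, `first` flushes any buffered output into `second`,
            // then `second` runs its own finisher.
            (state, downstream) -> {
                first.finisher().accept(state.left(),
                    m -> second.integrator().integrate(state.right(), m, downstream));
                second.finisher().accept(state.right(), downstream);
            });
    }

    // Minimal sequential driver, only here to exercise the composition.
    static <T, A, R> List<R> run(List<T> input, SimpleGatherer<T, A, R> gatherer) {
        List<R> out = new ArrayList<>();
        Sink<R> sink = out::add; // List.add always returns true
        A state = gatherer.supplier().get();
        for (T element : input) {
            if (!gatherer.integrator().integrate(state, element, sink)) break;
        }
        gatherer.finisher().accept(state, sink);
        return out;
    }
}
```

Composing a stateless "multiply by 10" stage with a pairwise window and running it over `List.of(1, 2, 3)` produces `[[10, 20], [30]]`; the trailing `[30]` comes from the window's finisher, which runs after the first stage's finisher.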
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">But that's not all: by introducing a `default Collector collect(Collector)`-method,</div>
<div class="ContentPasted1">it is also possible to compose a `Gatherer` with a `Collector`,</div>
<div class="ContentPasted1">and the result is another `Collector`:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">Gatherer<Integer,?,String> someGatherer = …;</div>
<div class="ContentPasted1">Gatherer<String,?,Long> someOtherGatherer = …;</div>
<div class="ContentPasted1">Collector<Integer,?,List<Long>> collector = someGatherer.andThen(someOtherGatherer).collect(Collectors.toList());</div>
<div class="ContentPasted1">```</div>
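
Composing a `Gatherer` with a `Collector` can be sketched with nothing but the existing `Collector.of(…)` factory. The gatherer's output is fed straight into the collector's accumulator, and the gatherer's finisher runs before the collector's. The sketch is sequential-only (its combiner throws) and uses the hypothetical stand-ins `SimpleGatherer`, `Sink`, `Integrator` and `GathererCollector` instead of the proposed types. Because a `Collector` cannot stop its upstream, it simply ignores the remaining input once the gatherer's integrator returns `false`.

```java
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collector;

// Hypothetical, simplified stand-ins for the proposed types (sequential only).
interface Sink<R> { boolean flush(R element); }

interface Integrator<A, T, R> {
    boolean integrate(A state, T element, Sink<? super R> downstream);
}

record SimpleGatherer<T, A, R>(Supplier<A> supplier,
                               Integrator<A, T, R> integrator,
                               BiConsumer<A, Sink<? super R>> finisher) {}

final class GathererCollector {
    // Combined state: the gatherer's state, the collector's container, and a stop flag.
    static final class State<A, CA> {
        final A gathererState;
        final CA container;
        boolean done;
        State(A gathererState, CA container) {
            this.gathererState = gathererState;
            this.container = container;
        }
    }

    static <T, A, R, CA, RR> Collector<T, State<A, CA>, RR> of(
            SimpleGatherer<T, A, R> gatherer, Collector<R, CA, RR> collector) {
        return Collector.<T, State<A, CA>, RR>of(
            () -> new State<>(gatherer.supplier().get(), collector.supplier().get()),
            (state, element) -> {
                if (state.done) return; // a Collector cannot stop its upstream, so skip the rest
                Sink<R> sink = r -> { collector.accumulator().accept(state.container, r); return true; };
                state.done = !gatherer.integrator().integrate(state.gathererState, element, sink);
            },
            (left, right) -> { throw new UnsupportedOperationException("sequential-only sketch"); },
            state -> {
                // The gatherer's end-of-stream output also goes into the collector.
                gatherer.finisher().accept(state.gathererState,
                    r -> { collector.accumulator().accept(state.container, r); return true; });
                return collector.finisher().apply(state.container);
            });
    }
}
```

For a hypothetical `firstTwo` gatherer that stops after two elements, `Stream.of(5, 6, 7, 8).collect(GathererCollector.of(firstTwo, Collectors.toList()))` yields `[5, 6]`; the stream still delivers `7` and `8`, but the combined accumulator ignores them.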
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Taken together, this makes it possible to build `Stream` processing in three styles:</div>
<div class="ContentPasted1">"source-to-destination" using `Stream.gather(…).gather(…).gather(…)`;</div>
<div class="ContentPasted1">"intermediaries-first" using `gatherer.andThen(…).andThen(…)`; and</div>
<div class="ContentPasted1">"destination-to-source" using `gatherer.collect(otherGatherer.collect(…))`.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### Familiarity</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Since `Gatherer` is an evolution of `Collector`, there's a clear on-ramp</div>
<div class="ContentPasted1">for any and all Java developers who are already familiar with `Collector`.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### Reusability</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">`Gatherer`s, by virtue of being stand-alone, reusable intermediaries,</div>
<div class="ContentPasted1">paired with compositionality and opt-in parallelization, allow transformations</div>
<div class="ContentPasted1">to be created once and maintained in a single place, regardless of whether they are</div>
<div class="ContentPasted1">used as a `Collector`, as a `Gatherer`, or directly on a `Stream`.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### Optimizability</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">`Collector` has the notion of `Characteristics`, a `Set` of flags which describe a given `Collector`.</div>
<div class="ContentPasted1">The underlying machinery can then use this information</div>
<div class="ContentPasted1">to optimize its evaluation. The same approach can be used for `Gatherer`s.</div>
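
As an illustration of the kind of reasoning the machinery could do, the sketch below mirrors the three proposed flags in a local enum and assumes two hypothetical rules. A composed `Gatherer` has a flag only if both stages have it (for example, a pipeline is `GREEDY` only if no stage can short-circuit), and a `SIZE_PRESERVING` operation lets the output size be derived from the input size. `CharacteristicsSketch` and its methods are illustrative names only.

```java
import java.util.EnumSet;
import java.util.Set;

final class CharacteristicsSketch {
    // Mirrors the flags proposed for Gatherer.Characteristics.
    enum Characteristics { GREEDY, SIZE_PRESERVING, STATELESS }

    // Hypothetical rule: a composite has a property only if both stages have it.
    static Set<Characteristics> andThen(Set<Characteristics> first, Set<Characteristics> second) {
        EnumSet<Characteristics> result = EnumSet.noneOf(Characteristics.class);
        result.addAll(first);
        result.retainAll(second); // set intersection
        return result;
    }

    // Hypothetical use by the machinery: the output size is known up front only if
    // the operation emits exactly one element per input element.
    static long estimatedOutputSize(Set<Characteristics> flags, long inputSize) {
        return flags.contains(Characteristics.SIZE_PRESERVING) ? inputSize : -1; // -1 = unknown
    }
}
```

Under these rules, composing a `map`-like stage (all three flags) with a `takeWhile`-like stage (only `STATELESS`) leaves just `STATELESS`, so the output size is no longer known up front.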
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### Summary 2.0</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">Taking all of the above into consideration, we end up with a definition like the following:</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">/** @param <T> the element type</div>
<div class="ContentPasted1"> * @param <A> the (mutable) intermediate state type</div>
<div class="ContentPasted1"> * @param <R> the type of output elements</div>
<div class="ContentPasted1"> */</div>
<div class="ContentPasted1">interface Gatherer<T,A,R> {</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> interface Sink<R> {</div>
<div class="ContentPasted1"> boolean flush(R element);</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> </div>
<div class="ContentPasted1"> interface Integrator<A,T,R> {</div>
<div class="ContentPasted1"> boolean integrate(A state, T element, Sink<? super R> downstream);</div>
<div class="ContentPasted1"> }</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> enum Characteristics {</div>
<div class="ContentPasted1"> GREEDY, // Never short-circuits</div>
<div class="ContentPasted1"> SIZE_PRESERVING, // Emits exactly once per input element</div>
<div class="ContentPasted1"> STATELESS; // No need to initialize or combine state</div>
<div class="ContentPasted1"> }</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> Supplier<A> supplier();</div>
<div class="ContentPasted1"> Integrator<A, T, R> integrator();</div>
<div class="ContentPasted1"> BinaryOperator<A> combiner();</div>
<div class="ContentPasted1"> BiConsumer<A, Sink<? super R>> finisher();</div>
<div class="ContentPasted1"> Set<Characteristics> characteristics();</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> default <AA, RR> Gatherer<T,?,RR> andThen(Gatherer<R,AA,RR> that) {</div>
<div class="ContentPasted1"> // Gatherers is analogous to Collectors</div>
<div class="ContentPasted1"> return Gatherers.Composite.of(this, that);</div>
<div class="ContentPasted1"> }</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> default <RR> Collector<T,?,RR> collect(Collector<R, ?, RR> collector) {</div>
<div class="ContentPasted1"> // Gatherers is analogous to Collectors</div>
<div class="ContentPasted1"> return Gatherers.GathererCollector.of(this, collector);</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">## Appetizers</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">The following examples showcase how `Gatherer` can be used to implement a diverse set</div>
<div class="ContentPasted1">of existing, as well as potential future, intermediate operations for `Stream`s.</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### How to implement `map(mapper)`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">public static <T,R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {</div>
<div class="ContentPasted1"> return Gatherer.of(</div>
<div class="ContentPasted1"> () -> (Void)null,</div>
<div class="ContentPasted1"> (nothing, element, downstream) -></div>
<div class="ContentPasted1"> downstream.flush(mapper.apply(element)),</div>
<div class="ContentPasted1"> (l,r) -> l,</div>
<div class="ContentPasted1"> (nothing, downstream) -> {}</div>
<div class="ContentPasted1"> );</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(map(i -> i + 1)).toList()</div>
<div class="ContentPasted1">$1 ==> [2, 3, 4, 5]</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### How to implement `flatMap(mapper)`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">public static <T,R> Gatherer<T, ?, R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {</div>
<div class="ContentPasted1"> return Gatherer.of(</div>
<div class="ContentPasted1"> () -> (Void)null,</div>
<div class="ContentPasted1"> (nothing, element, downstream) -> {</div>
<div class="ContentPasted1"> try(Stream<? extends R> s = mapper.apply(element)) {</div>
<div class="ContentPasted1"> return s == null || s.sequential().allMatch(downstream::flush);</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> },</div>
<div class="ContentPasted1"> (l,r) -> l,</div>
<div class="ContentPasted1"> (nothing, downstream) -> {}</div>
<div class="ContentPasted1"> );</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(flatMap(i -> Stream.of(i + 1))).toList()</div>
<div class="ContentPasted1">$1 ==> [2, 3, 4, 5]</div>
<div class="ContentPasted1">```</div>
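
The `flatMap` implementation relies on `allMatch` being short-circuiting: as soon as `downstream.flush` returns `false`, no further elements are pulled from the inner stream, and `false` becomes the integrator's result. The standalone snippet below shows this with plain `java.util.stream` calls and a hypothetical downstream (`flush`) that accepts only `capacity` elements. `FlatMapShortCircuit` and `pushAll` are illustrative names only.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;

final class FlatMapShortCircuit {
    // Pushes `inner` into a hypothetical downstream the same way the flatMap integrator
    // does, recording which elements were pulled from `inner` and which were delivered.
    static boolean pushAll(Stream<Integer> inner, int capacity,
                           List<Integer> pulled, List<Integer> delivered) {
        // Stand-in for downstream::flush: returns false once the downstream is "full".
        Predicate<Integer> flush = e -> {
            if (delivered.size() >= capacity) return false;
            delivered.add(e);
            return true;
        };
        try (Stream<Integer> s = inner) {
            // allMatch stops at the first false, so later inner elements are never pulled.
            return s.sequential().peek(pulled::add).allMatch(flush);
        }
    }
}
```

Pushing `Stream.of(10, 20, 30, 40)` into a downstream with capacity 2 returns `false`, delivers `[10, 20]`, and pulls only `[10, 20, 30]` from the inner stream; `40` is never requested.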
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### How to implement `takeWhile(predicate)`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">public static <T> Gatherer<T, ?, T> takeWhile(Predicate<? super T> predicate) {</div>
<div class="ContentPasted1"> return Gatherer.of(</div>
<div class="ContentPasted1"> () -> (Void)null,</div>
<div class="ContentPasted1"> (nothing, element, downstream) -></div>
<div class="ContentPasted1"> predicate.test(element) && downstream.flush(element),</div>
<div class="ContentPasted1"> (l, r) -> l,</div>
<div class="ContentPasted1"> (nothing, downstream) -> {}</div>
<div class="ContentPasted1"> );</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(takeWhile(i -> i < 3)).toList()</div>
<div class="ContentPasted1">$1 ==> [1, 2]</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### How to implement `limit(N)`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">public static <M> Gatherer<M, ?, M> limit(long limit) {</div>
<div class="ContentPasted1"> if (limit < 0)</div>
<div class="ContentPasted1"> throw new IllegalArgumentException("'limit' has to be non-negative");</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1"> class At { long n = 0; }</div>
<div class="ContentPasted1"> return Gatherer.of(</div>
<div class="ContentPasted1"> At::new,</div>
<div class="ContentPasted1"> (at, element, downstream) -></div>
<div class="ContentPasted1"> at.n < limit && downstream.flush(element) && ++at.n < limit,</div>
<div class="ContentPasted1"> Gatherer.unsupportedCombiner(),</div>
<div class="ContentPasted1"> (at, downstream) -> {}</div>
<div class="ContentPasted1"> );</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(limit(2)).toList()</div>
<div class="ContentPasted1">$1 ==> [1, 2]</div>
<div class="ContentPasted1">```</div>
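
`limit` passes `Gatherer.unsupportedCombiner()` instead of a real combiner, since its running count cannot simply be split across independently processed chunks. That helper's definition isn't shown above. One possible shape, assuming the stream machinery recognizes it by identity and then evaluates the `Gatherer` sequentially, is a single shared sentinel. `Combiners` and `isParallelizable` are hypothetical names.

```java
import java.util.function.BinaryOperator;

final class Combiners {
    // One shared instance, so that it can be recognized by identity.
    private static final BinaryOperator<Object> UNSUPPORTED = (left, right) -> {
        throw new UnsupportedOperationException("this gatherer cannot be evaluated in parallel");
    };

    // Hypothetical counterpart of Gatherer.unsupportedCombiner().
    @SuppressWarnings("unchecked")
    static <A> BinaryOperator<A> unsupportedCombiner() {
        return (BinaryOperator<A>) (BinaryOperator<?>) UNSUPPORTED;
    }

    // Hypothetical check the stream machinery could perform before splitting the work.
    static boolean isParallelizable(BinaryOperator<?> combiner) {
        return combiner != UNSUPPORTED;
    }
}
```

Because the sentinel throws if it is ever invoked, an accidental parallel evaluation would fail loudly instead of silently producing a wrong result.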
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### How to implement prefix `scanLeft`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">public static <T,R> Gatherer<T,?,R> scanLeft(R initial, BiFunction<R,T,R> scanner) {</div>
<div class="ContentPasted1"> class State { R current = initial; }</div>
<div class="ContentPasted1"> return Gatherer.of(</div>
<div class="ContentPasted1"> State::new,</div>
<div class="ContentPasted1"> (state, element, downstream) -></div>
<div class="ContentPasted1"> downstream.flush(state.current = scanner.apply(state.current, element)),</div>
<div class="ContentPasted1"> Gatherer.unsupportedCombiner(),</div>
<div class="ContentPasted1"> (state, downstream) -> {}</div>
<div class="ContentPasted1"> );</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(scanLeft("'", (acc, elem) -> acc + elem + "'")).toList()</div>
<div class="ContentPasted1">$1 ==> ['1', '1'2', '1'2'3', '1'2'3'4']</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### How to implement `fold(initial, folder)`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">public static <T> Gatherer<T,?,T> fold(T initial, BinaryOperator<T> folder) {</div>
<div class="ContentPasted1"> class State { T current = initial; }</div>
<div class="ContentPasted1"> return Gatherer.of(</div>
<div class="ContentPasted1"> State::new,</div>
<div class="ContentPasted1"> (state, element, downstream) -> {</div>
<div class="ContentPasted1"> state.current = folder.apply(state.current, element);</div>
<div class="ContentPasted1"> return true;</div>
<div class="ContentPasted1"> },</div>
<div class="ContentPasted1"> (l, r) -> {</div>
<div class="ContentPasted1"> l.current = folder.apply(l.current, r.current);</div>
<div class="ContentPasted1"> return l;</div>
<div class="ContentPasted1"> },</div>
<div class="ContentPasted1"> (state, downstream) -> downstream.flush(state.current)</div>
<div class="ContentPasted1"> );</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(fold(0, (acc, elem) -> acc + elem)).toList()</div>
<div class="ContentPasted1">$1 ==> [10]</div>
<div class="ContentPasted1">```</div>
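
Because each parallel sub-task creates its own `State` from `initial`, the `fold` combiner only agrees with a sequential run if `folder` is associative and `initial` is an identity for it. The example above uses `0` with addition, which satisfies both. A minimal simulation of a two-way split, independent of any `Gatherer` machinery (`FoldSplitSketch` is an illustrative name), makes the difference visible:

```java
import java.util.List;
import java.util.function.BinaryOperator;

final class FoldSplitSketch {
    // Sequential fold, mirroring the integrator above.
    static <T> T foldAll(List<T> elements, T initial, BinaryOperator<T> folder) {
        T current = initial;
        for (T element : elements) current = folder.apply(current, element);
        return current;
    }

    // Simulates a two-way parallel split: each half starts from `initial`,
    // and the halves are then merged the same way the combiner above merges states.
    static <T> T foldSplit(List<T> elements, T initial, BinaryOperator<T> folder) {
        int mid = elements.size() / 2;
        T left = foldAll(elements.subList(0, mid), initial, folder);
        T right = foldAll(elements.subList(mid, elements.size()), initial, folder);
        return folder.apply(left, right);
    }
}
```

With `initial = 0` both versions fold `[1, 2, 3, 4]` to `10`. With `initial = 1` the sequential fold yields `11` while the split version yields `12`, because `initial` is counted once per sub-task.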
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### How to implement `foldLeft(initial, folder)`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">public static <T,R> Gatherer<T,?,R> foldLeft(R initial, BiFunction<R,T,R> folder) {</div>
<div class="ContentPasted1"> class State { R current = initial; }</div>
<div class="ContentPasted1"> return Gatherer.of(</div>
<div class="ContentPasted1"> State::new,</div>
<div class="ContentPasted1"> (state, element, downstream) -> {</div>
<div class="ContentPasted1"> state.current = folder.apply(state.current, element);</div>
<div class="ContentPasted1"> return true;</div>
<div class="ContentPasted1"> },</div>
<div class="ContentPasted1"> Gatherer.unsupportedCombiner(),</div>
<div class="ContentPasted1"> (state, downstream) -> downstream.flush(state.current)</div>
<div class="ContentPasted1"> );</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">jshell> Stream.of(1,2,3,4).gather(foldLeft(0L, (acc, elem) -> acc + elem)).toList()</div>
<div class="ContentPasted1">$1 ==> [10]</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">### How to implement `deduplicateAdjacent()`</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">public static <T> Gatherer<T,?,T> deduplicateAdjacent() {</div>
<div class="ContentPasted1"> class State { T prev; boolean hasPrev; }</div>
<div class="ContentPasted1"> return Gatherer.of(</div>
<div class="ContentPasted1"> State::new,</div>
<div class="ContentPasted1"> (state, element, downstream) -> {</div>
<div class="ContentPasted1"> if (!state.hasPrev) {</div>
<div class="ContentPasted1"> state.hasPrev = true;</div>
<div class="ContentPasted1"> state.prev = element;</div>
<div class="ContentPasted1"> return downstream.flush(element);</div>
<div class="ContentPasted1"> } else if (!Objects.equals(state.prev, element)) {</div>
<div class="ContentPasted1"> state.prev = element;</div>
<div class="ContentPasted1"> return downstream.flush(element);</div>
<div class="ContentPasted1"> } else {</div>
<div class="ContentPasted1"> return true; // skip duplicate</div>
<div class="ContentPasted1"> }</div>
<div class="ContentPasted1"> },</div>
<div class="ContentPasted1"> Gatherer.unsupportedCombiner(),</div>
<div class="ContentPasted1"> (state, downstream) -> {}</div>
<div class="ContentPasted1"> );</div>
<div class="ContentPasted1">}</div>
<div class="ContentPasted1">```</div>
<div><br class="ContentPasted1">
</div>
<div class="ContentPasted1">```java</div>
<div class="ContentPasted1">jshell> Stream.of(1,2,2,3,2,4).gather(deduplicateAdjacent()).toList()</div>
<div class="ContentPasted1">$1 ==> [1, 2, 3, 2, 4]</div>
```</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);" class="elementToProof ContentPasted0">
<br>
</div>
</body>
</html>