java.util.stream.Stream: API for user-extensible intermediate operations
Viktor Klang
viktor.klang at oracle.com
Tue Jun 27 17:10:42 UTC 2023
Hi core-libs-dev,
Over the past 6+ months I've been thinking about, and tinkering with, how we'd be able to expose a user-facing API for extensible intermediate java.util.stream.Stream operations—a feature envisioned all the way back when Streams were created.
I'm now at a point where I have a viable design and implementation, and so I'm turning to you for your feedback: on the direction taken; the API concepts; and, in particular, is there anything which I have overlooked/missed?
(If you, like myself, prefer reading pre-rendered markdown, click here<https://cr.openjdk.org/~vklang/Gatherers.html>)
# Gathering the streams
## Introduction
Java 8 introduced the `java.util.stream` API, which represents a lazily
computed, potentially unbounded sequence of values (Streams was also the first
designed-for-lambdas API in the JDK). Streams supports the ability to process
the stream either sequentially or in parallel.
A `Stream` pipeline consists of a source (collection, array, generator, etc),
zero or more intermediate operations (`Stream` -> `Stream` transforms), and an
eager terminal operation which produces a value or a side-effect.
The Streams API come with a reasonably rich, but fixed set of built-in
operations (mapping, filtering, reduction, sorting, etc), as well as an
extensible terminal operation (`Stream::collect`) that enables the stream
contents to be flexibly summarized in a variety of forms. The resulting API is
rich enough that users have had good experience with streams, but there are
repeated requests for "please add operation X to streams".
In this document, we explore a corresponding extensible _intermediate_
operation, called `Stream::gather`, which is able to address many of the
requests we've gotten for additional operations.
## "Missing" intermediate operations
Over the years, many new operations for streams have been proposed. Each of
them may be useful in some situation, but many of them are too narrow to be
included in the core `Stream` API. We'd like for people to be able to plug
these operations in via an extension point, as with `Stream::collect`.
Here are some examples of proposed intermediate operations that are not easily
expressible as intermediate operations on `Stream` today.
* **Folds**. Folding is a generalization of reduction. With reduction, the
result type is the same as the element type, the combiner is associative, and
the initial value is an identity for the combiner. For a fold, these
conditions are not required, though we give up parallelizability.
Example: `foldLeft("", (str,elem) -> str + " " + elem.toString())` over the values
`[1,2,3]` yields `["1 2 3"]`.
See https://bugs.openjdk.org/browse/JDK-8133680 & https://bugs.openjdk.org/browse/JDK-8292845
* **Unfolds**. This takes an aggregate and decomposes it into elements. An
`unfold` can reverse the operation of the above fold example using `Scanner`
or `String::split`.
Example: `unfold(e -> new Scanner(e), Scanner::hasNextInt, Scanner::nextInt)`
`["1 2 3"]` yields `[1,2,3]`.
See: https://bugs.openjdk.org/browse/JDK-8151408
* **Barriers**. Usually all operations of a stream pipeline can run
simultaneously as data is available. In some cases, we may wish to have a
barrier that requires that the entirety of one operation is completed before
providing any elements to the next operation.
Example: `someStream.a(...).barrier(someEffect)...` would require that all
effects from `a` are completed before executing `someEffect` or producing any
downstream values.
See: https://bugs.openjdk.org/browse/JDK-8294246
* **Windowing**. Given a stream, we may wish to construct a stream which
groups the original elements, either in overlapping or disjoint groups.
Example: `fixedWindow(2)` over the values `[1,2,3,4,5]` yields `[[1,2],[3,4],[5]]`.
Example: `slidingWindow(2)` over the values `[1,2,3]` yields `[[1,2],[2,3],[3,4]]`.
See: https://stackoverflow.com/questions/34158634/how-to-transform-a-java-stream-into-a-sliding-window
* **Prefix scans**. Prefix scans are a stream of incremental reductions.
Perhaps surprisingly, some prefix scans are parallelizable, and we are exploring adding support for that beyond `Arrays.parallelPrefix()`.
Example: `scan((sum, next) -> sum + next)` over values `[1,2,3,4]` yields `[1,3,6,10]`
See: https://stackoverflow.com/questions/55265797/cumulative-sum-using-java-8-stream-api
* **Duplicated elements**. The `distinct` operation will remove duplicates; sometimes we want only the elements that are duplicated.
Example: `duplicates(Object::equals)` over values `[1,1,2,3,4]` yields `[1]`
See: https://stackoverflow.com/questions/27677256/java-8-streams-to-find-the-duplicate-elements
* **Running duplicate elimination**. Here, we want to eliminate adjacent repeated elements.
Example: `collapseRuns(Object::equals)` over values `[1,1,2,3,3]` yields `[1,2,3]`
These are just a few of the stream operations people have wished for—
and while many operations are too specialized to include in the core library,
all are things we'd like for people to be able to express with streams.
## Extending streams
It was always desired for `java.util.stream.Stream` to have an extension point
for intermediate operations; when the library was first built, this wish-list
item was recorded: https://bugs.openjdk.org/browse/JDK-8132369. There were many
reasons that this was not included in the initial version, the most important
being that it was not at all obvious what the right API was.
The set of intermediate operations on Streams carries the exact same trade-off
as [CISC](Complex Instruction Set Computing) vs [RISC](Reduced Instruction Set
Computing), namely: either you have to provide a rather sizeable set of
specialized operations, or you provide a small set of general operations.
For the **CISC**-strategy, the main drawback is that not all operations is
equally useful, and having a large set of operations to choose from creates a
barrier to effective usage by making the correct choice harder to identify.
For the **RISC**-strategy, the main drawback is that it puts more effort on the
user to encode more specialized operations over a set of generic operations,
making code harder to read, and harder to maintain, for the user.
However, one success story was `Collector`; this allowed us to extend _terminal_
operations with a good balance of expressiveness, reuse, and API footprint. We
would like to do the same for intermediate operations -- and we can draw some
inspiration from `Collector` to get there.
## Requirements
Breaking down operations into intermediate and terminal is a useful distinction, however,
in reality there are many more characteristics we could use to describe operations,
and relatively few operations use exactly the same subset of characteristics.
The following is a curated set of characteristics of stream operations;
if we want to enable generic, user-defineable, intermediate operations on streams,
we probably have to provide for them all.
| Ordinal | Feature | What this means in practice | Example operation(s) |
|---------|--------------------------|-------------------------------------------------------------|-------------------------------------|
| 1 | Intermediate operation | Add custom new operations to Stream pipelines | *map*, *filter* |
| 2 | Incremental operation | Evaluate the Stream pipeline depth-first | *map*, *filter* |
| 3 | Stateful operation | Remember information between processing elements | *limit*, *foldLeft* |
| 4 | N:M inputs-to-outputs | Encode a wide array of operations | *limit*, *flatMap* |
| 5 | Prefix consumption | An operation should consume only as much as needed | *limit*, *takeWhile* |
| 6 | Beginning-of-stream hook | Ability to have initialization logic | *foldLeft*, *grouped* |
| 7 | End-of-stream hook | Flush elements downstream | *sorted*, *grouped* |
| 8 | Opt-in parallelizability | Speed up CPU-bound processing for parallelizable operations | *reduce*, *map* |
Now when we have a better idea of what we need to achieve, let's have a look at
what our options are.
## Comparing existing operations
Let's look at the operations we already have, to see if they would fit the bill
as a generic extension point for intermediate `Stream`-operations.
These are, in order, `Stream.map()`, `Stream.flatMap(…)` (`Stream.mapMulti(…)`
is analogous), and `Collector` & `Stream.collect(…)`—and in the table below we
see how they line up against our requirements.
| Ordinal | Feature | Using Stream.map(…) | Using Stream.flatMap(…) | Using Collector |
|---------|--------------------------|---------------------------|---------------------------|-----------------|
| 1 | Intermediate operation | Yes | Yes | No, terminal |
| 2 | Incremental operation | Yes | Yes | No, terminal |
| 3 | Stateful operation | Yes, via class or capture | Yes, via class or capture | Yes |
| 4 | N:M inputs-to-outputs | No, 1:1 | No, 1:M | No, N:1 |
| 5 | Prefix consumption | No | No | No |
| 6 | Beginning-of-stream hook | No | No | Yes |
| 7 | End-of-stream hook | No | No | Yes |
| 8 | Opt-in parallelizability | No, always on | No, always on | No, always on |
Looking at the options above, none of them are going to, as-is, fit the bill as
an extension point for generic intermediate `Stream` operations, but one of them
has more promise than the others: `Collector`. Obvious needed upgrades are
supporting short-circuiting and incremental generation of output; let's see what
that looks like in practice.
## Collector 2.0
Let's start with a clone of `Collector<T,A,R>` called `Gatherer<T,A,R>`:
```java
/** @param <T> the element type
* @param <A> the (mutable) intermediate accumulation type
* @param <R> the (probably immutable) final accumulation type
*/
interface Gatherer<T,A,R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();
}
```
Now, we need to make it output 0...M elements instead of strictly 1. In order
to do that, we need a way to programmatically output elements from the
`accumulator` to make it incremental (and has the added benefit that it would be
able to run depth-first). Adding an additional argument, a downstream handle,
to `accumulator` and `finisher` allows us to push elements downstream both for
each input, and at the end of stream. As there exists no suitable pre-existing
interface, we'll define a new interface named `Integrator` and rename `accumulator`
to the more fitting `integrator`.
Our `finisher` will also not have a useful return value, so we'll make it `void`:
```java
/** @param <T> the element type
* @param <A> the (mutable) intermediate accumulation type
* @param <R> the (probably immutable) final accumulation type
*/
interface Gatherer<T,A,R> {
interface Integrator<A,T,R> {
void integrate(A state, T element, Consumer<? super R> downstream);
}
Supplier<A> supplier();
Integrator<A, T, R> integrator();
BinaryOperator<A> combiner();
BiConsumer<A, Consumer<? super> R> finisher();
}
```
Next, we need to tackle short-circuiting and add an upstream signal from each
invocation of the `integrator` so we can detect when the integration does not
want any more elements. We do this by returning a boolean from `integrate`:
```java
/** @param <T> the element type
* @param <A> the (mutable) intermediate accumulation type
* @param <R> the (probably immutable) final accumulation type
*/
interface Gatherer<T,A,R> {
interface Integrator<A,T,R> {
boolean integrate(A state, T element, Consumer<? super R> downstream);
}
Supplier<A> supplier();
Integrator<A, T, R> integrator();
BinaryOperator<A> combiner();
BiConsumer<A, Consumer<? super> R> finisher();
}
```
Transitively supporting short-circuiting, i.e. that a downstream handle does
not want to receive any more elements, requires us to make the same change as
we did for `integrate`—return a `boolean` instead of `void`, and since the
only built-in candidate would be `Predicate`, which is not supposed to have
side-effects, let's introduce our own type `Sink`:
```java
/** @param <T> the element type
* @param <A> the (mutable) intermediate accumulation type
* @param <R> the (probably immutable) final accumulation type
*/
interface Gatherer<T,A,R> {
interface Sink<R> {
boolean flush(R element);
}
interface Integrator<A,T,R> {
boolean integrate(A state, T element, Sink<? super R> downstream);
}
Supplier<A> supplier();
Integrator<A, T, R> integrator();
BinaryOperator<A> combiner();
BiConsumer<A, Sink<? super R>> finisher();
}
```
Finally, we need to address parallelism. To opt-in to parallelism we can make
the combiner optional -- we call it only for parallel executions, and if it is
not present, execution is constrained to sequential.
To demonstrate how all of the above fits together, we implement `fixedWindow(N)`
where `fixedWindow(2)` over the values `[1,2,3,4,5]` yields `[[1,2],[3,4],[5]]`.
```java
/**
* Gathers elements into fixed-size windows. The last window can contain fewer elements.
* @param windowSize the size of the windows
* @return a new gatherer which gathers elements into fixed-size windows
* @param <T> the type of elements this Gatherer gathers
*/
public static <T> Gatherer<T, ?, List<T>> fixedWindow(int windowSize) {
if (windowSize < 1)
throw new IllegalArgumentException("'windowSize' must be greater than zero");
else {
Supplier<ArrayList<T>> supplier =
() -> new ArrayList<T>(windowSize);
Integrator<ArrayList<T>, T, List<T>> integrator =
(openWindow, element, downstream) -> {
if (openWindow.add(element) && openWindow.size() >= windowSize) {
var closedWindow = List.copyOf(openWindow);
openWindow.clear();
return downstream.flush(closedWindow);
} else
return true;
};
// This combiner signals that we opt-out of parallelisation
BinaryOperator<ArrayList<T>> combiner =
Gatherer.unsupportedCombiner();
BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher =
(openWindow, downstream) -> {
if(!openWindow.isEmpty()) {
downstream.flush(List.copyOf(openWindow));
openWindow.clear();
}
};
return Gatherer.of(supplier, integrator, combiner, finisher);
}
}
```
Or by implementing the `Gatherer` interface directly like so:
```java
/**
* Gathers elements into fixed-size windows. The last window can contain fewer elements.
* @param windowSize the size of the windows
* @return a new gatherer which gathers elements into fixed-size windows
* @param <T> the type of elements this Gatherer gathers
*/
public static <T> Gatherer<T, ?, List<T>> fixedWindow(int windowSize) {
if (windowSize < 1)
throw new IllegalArgumentException("'windowSize' must be greater than zero");
class FixedWindowGatherer implements Gatherer<T,ArrayList<T>,List<T>> {
@Override
public Supplier<ArrayList<T>> initializer() {
return () -> new ArrayList<>(windowSize);
}
@Override
public Integrator<ArrayList<T>, T, List<T>> integrator() {
return (openWindow, element, downstream) -> {
if (openWindow.add(element) && openWindow.size() >= windowSize) {
var closedWindow = List.copyOf(openWindow);
openWindow.clear();
return downstream.flush(closedWindow);
} else
return true;
};
}
@Override
public BinaryOperator<ArrayList<T>> combiner() {
return Gatherer.unsupportedCombiner();
}
@Override
public BiConsumer<ArrayList<T>, Sink<? super List<T>>> finisher() {
return (openWindow, downstream) -> {
if(!openWindow.isEmpty()) {
downstream.flush(List.copyOf(openWindow));
openWindow.clear();
}
};
}
}
return new FixedWindowGatherer();
}
```
Compared to Collector, it would look like the following:
| Ordinal | Feature | `Collector<T,A,R>` | `Gatherer<T,A,R>` |
|---------|--------------------------|-----------------------------------|---------------------------------------|
| 1 | Beginning-of-stream hook | `A supply()` | `A supply()` |
| 2 | Per element | `void accept(A, T)` | `boolean integrate(A, T, Sink<R>)` |
| 3 | End-of-stream hook | `R apply(A)` | `void accept(A, Sink<R>)` |
| 4 | Parallelizability | `A apply(A, A)` | `A apply(A, A)` |
To integrate it with `java.util.stream.Stream` we add the following method to
it: `Stream<R> gather(Gatherer<T,?,R> gatherer)`
Now we can use our `fixedWindow` Gatherer:
```java
jshell> Stream.iterate(0, i -> i + 1).limit(10).gather(fixedWindow(2)).toList();
$1 ==> [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
```
Gatherer now provides all features:
| Ordinal | Feature | Using Gatherer | Explanation |
|---------|--------------------------|-------------------------|----------------------------------------------------------------------------------------|
| 1 | Intermediate operation | Yes | A Gatherer's output is the following operation's input |
| 2 | Incremental operation | Yes | A Gatherer can produce elements in response to consuming elements |
| 3 | Stateful operation | Yes | A Gatherer supplies its own state, which can be Void/null if stateless |
| 4 | N:M inputs-to-outputs | Yes | A Gatherer can consume and produce any number of elements |
| 5 | Prefix consumption | Yes | A Gatherer can signal that it is done by returning `false` from its `integrate`-method |
| 6 | Beginning-of-stream hook | Yes | A Gatherer can run logic when creating its initial state |
| 7 | End-of-stream hook | Yes | A Gatherer's `finisher` is invoked at the end of input |
| 8 | Opt-in parallelizability | Yes | A Gatherer's `combiner` is optional |
## Summary
None of the current candidates for a user-extensible API for generic
intermediate stream operations fit the set of requirements, but `Collector`
can form an excellent basis for a new construct, called `Gatherer`,
which after a limited set of modifications checks all the boxes:
| Ordinal | Feature | Using Stream.map(…) | Using Stream.flatMap(…) | Using Collector | Using Gatherer |
|---------|--------------------------|---------------------------|---------------------------|-----------------|----------------|
| 1 | Intermediate operation | Yes | Yes | No, terminal | Yes |
| 2 | Incremental operation | Yes | Yes | No, terminal | Yes |
| 3 | Stateful operation | Yes, via class or capture | Yes, via class or capture | Yes | Yes |
| 4 | N:M inputs-to-outputs | No, 1:1 | No, 1:M | No, N:1 | Yes |
| 5 | Prefix consumption | No | No | No | Yes |
| 6 | Beginning-of-stream hook | No | No | Yes | Yes |
| 7 | End-of-stream hook | No | No | Yes | Yes |
| 8 | Opt-in parallelizability | No, always on | No, always on | No, always on | Yes |
As it turned out, we were a short hop away from a generic, stand-alone,
reusable, parallelizable, API for intermediate operations all along!
## But wait … there's more
While the aforementioned solution contains the bare-minimum to meet our requirements,
the following will provide some significant quality-of-life improvements.
### Compositionality
The astute reader may have noticed that by adding the `gather(Gatherer)`-method on `Stream`,
one can in-effect **compose** `Gatherer`s together:
```java
Gatherer<Integer,?,String> someGatherer = …;
Gatherer<String,?,Long> someOtherGatherer = …;
Stream<Long> stream = Stream.of(1).gather(someGatherer).gather(someOtherGatherer);
```
As it turns out, it is possible to define a `default Gatherer andThen(Gatherer)`-method on `Gatherer` which composes `Gatherer`s:
```java
Gatherer<Integer,?,String> someGatherer = …;
Gatherer<String,?,Long> someOtherGatherer = …;
Gatherer<Integer,?,Long> gatherer = someGatherer.andThen(someOtherGatherer);
```
This makes it possible to both de-couple operations from their use-sites,
as well as create more sophisticated `Gatherer`s from simple ones.
But that's not all—by introducing a `default Collector collect(Collector)`-method
it is also possible to compose a `Gatherer` with a `Collector`,
and the result is another `Collector`:
```java
Gatherer<Integer,?,String> someGatherer = …;
Gatherer<String,?,Long> someOtherGatherer = …;
Collector<Long,?,List<Long>> someGatherer.andThen(someOtherGatherer).collect(Collectors.toList());
```
All of this together, it is now possible to build `Stream`-processing:
"source-to-destination" using `Stream.gather(…).gather(…).gather(…)`;
"intermediaries-first" using `gatherer.andThen(…).andThen(…);` and,
"destination-to-source" using `gatherer.collect(otherGatherer.collect(…))`.
### Familiarity
Since `Gatherer` is an evolution of `Collector` there's a clear on-ramp
for any and all Java developers who are already familiar with `Collector`.
### Reusability
`Gatherer`s, by virtue of being stand-alone, reusable, intermediaries,
paired with compositionality and opt-in parallelization, allows for transformations
to be created once and maintained in a single place, no matter if it is going to be
used as a `Collector`, a `Gatherer`, or a `Stream`.
### Optimizability
`Collector` has the notion of `Characteristics`, a `Set` of flags which describes them.
This information can then be used by the underlying machinery which uses them
to optimize their evaluation. The same approach can be used for `Gatherer`s.
### Summary 2.0
Taking all of the above into consideration, we end up with a definition like the following:
```java
/** @param <T> the element type
* @param <A> the (mutable) intermediate accumulation type
* @param <R> the (probably immutable) final accumulation type
*/
interface Gatherer<T,A,R> {
interface Sink<R> {
boolean flush(R element);
}
interface Integrator<A,T,R> {
boolean integrate(A state, T element, Sink<? super R> downstream);
}
enum Characteristics {
GREEDY, // Never short-circuits
SIZE_PRESERVING, // Emits exactly once per input element
STATELESS; // No need to initialize or combine state
}
Supplier<A> supplier();
Integrator<A, T, R> integrator();
BinaryOperator<A> combiner();
BiConsumer<A, Sink<? super R>> finisher();
Set<Characteristics> characteristics();
default <AA, RR> Gatherer<T,?,RR> andThen(Gatherer<R,AA,RR> that) {
// Gatherers is analoguous to Collectors
return Gatherers.Composite.of(this, that);
}
default <RR> Collector<T,?,RR> collect(Collector<R, ?, RR> collector) {
// Gatherers is analoguous to Collectors
return Gatherers.GathererCollector.of(this, collector);
}
}
```
## Appetizers
The following examples showcase how `Gatherer` can be used to implement a diverse set
of pre-existing, and future, intermediate operations for `Stream`s.
### How to implement `map(mapper)`
```java
public final static <T,R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
return Gatherer.of(
() -> (Void)null,
(nothing, element, downstream) ->
downstream.flush(mapper.apply(element)),
(l,r) -> l,
(nothing, downstream) -> {}
);
}
```
```java
jshell> Stream.of(1,2,3,4).gather(map(i -> i + 1)).toList()
$1 ==> [2, 3, 4, 5]
```
### How to implement `flatMap(mapper)`
```java
public final static <T,R> Gatherer<T, ?, R> flatMap(Function<? super T, ? extends Stream<R>> mapper) {
return Gatherer.of(
() -> (Void)null,
(nothing, element, downstream) -> {
try(Stream<? extends R> s = mapper.apply(element)) {
return s == null || s.sequential().allMatch(downstream::flush);
}
},
(l,r) -> l,
(nothing, downstream) -> {}
);
}
```
```java
jshell> Stream.of(1,2,3,4).gather(flatMap(i -> Stream.of(i + 1))).toList()
$1 ==> [2, 3, 4, 5]
```
### How to implement `takeWhile(predicate)`
```java
public final static <T> Gatherer<T, ?, T> takeWhile(Predicate<? super T> predicate) {
return Gatherer.of(
() -> (Void)null,
(nothing, element, downstream) ->
predicate.test(element) && downstream.flush(element),
(l, r) -> l,
(nothing, downstream) -> {}
);
}
```
```java
jshell> Stream.of(1,2,3,4).gather(takeWhile(i -> i < 3)).toList()
$1 ==> [1, 2]
```
### How to implement `limit(N)`
```java
public static <M> Gatherer<M, ?, M> limit(long limit) {
if (limit < 0)
throw new IllegalArgumentException("'limit' has to be non-negative");
class At { long n = 0; }
return Gatherer.of(
At::new,
(at, element, downstream) ->
at.n < limit && downstream.flush(element) && ++at.n < limit,
Gatherer.unsupportedCombiner(),
(at, downstream) -> {}
);
}
```
```java
jshell> Stream.of(1,2,3,4).gather(limit(2)).toList()
$1 ==> [1, 2]
```
### How to implement prefix `scanLeft`
```java
public static <T,R> Gatherer<T,?,R> scanLeft(R initial, BiFunction<R,T,R> scanner) {
class State { R current = initial; }
return Gatherer.of(
State::new,
(state, element, downstream) ->
downstream.flush(state.current = scanner.apply(state.current, element)),
Gatherer.unsupportedCombiner(),
(state, downstream) -> {}
);
}
```
```java
jshell> Stream.of(1,2,3,4).gather(scanLeft("'", (acc, elem) -> acc + elem + "'")).toList()
$1 ==> ['1', '1'2', '1'2'3', '1'2'3'4']
```
### How to implement `fold(initial, folder)`
```java
public static <T> Gatherer<T,?,T> fold(T initial, BinaryOperator<T> folder) {
class State { T current = initial; }
return Gatherer.of(
State::new,
(state, element, downstream) -> {
state.current = folder.apply(state.current, element);
return true;
},
(l, r) -> {
l.current = folder.apply(l.current, r.current);
return l;
},
(state, downstream) -> downstream.flush(state.current)
);
}
```
```java
jshell> Stream.of(1,2,3,4).gather(fold(0, (acc, elem) -> acc + elem)).toList()
$1 ==> [10]
```
### How to implement `foldLeft(initial, folder)`
```java
public static <T,R> Gatherer<T,?,R> foldLeft(R initial, BiFunction<R,T,R> folder) {
class State { R current = initial; }
return Gatherer.of(
State::new,
(state, element, downstream) -> {
state.current = folder.apply(state.current, element);
return true;
},
Gatherer.unsupportedCombiner(),
(state, downstream) -> downstream.flush(state.current)
);
}
```
```java
jshell> Stream.of(1,2,3,4).gather(foldLeft(0L, (acc, elem) -> acc + elem)).toList()
$1 ==> [10]
```
### How to implement `deduplicateAdjacent()`
```java
public static <T> Gatherer<T,?,T> deduplicateAdjacent() {
class State { T prev; boolean hasPrev; }
return Gatherer.of(
State::new,
(state, element, downstream) -> {
if (!state.hasPrev) {
state.hasPrev = true;
state.prev = element;
return downstream.flush(element);
} else if (!Objects.equals(state.prev, element)) {
state.prev = element;
return downstream.flush(element);
} else {
return true; // skip duplicate
}
},
Gatherer.unsupportedCombiner(),
(state, downstream) -> {}
);
}
```
```java
jshell> Stream.of(1,2,2,3,2,4).gather(deduplicateAdjacent()).toList()
$1 ==> [1, 2, 3, 2, 4]
```
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://mail.openjdk.org/pipermail/core-libs-dev/attachments/20230627/a7811988/attachment-0001.htm>
More information about the core-libs-dev
mailing list