Re: RFR[8238286]: 'Add new flatMap stream operation that is more amenable to pushing'

Tagir Valeev amaembo at gmail.com
Thu Jun 25 04:04:17 UTC 2020


Hello!

To me, it looks like it's possible to make a better default
implementation. It could even be done as a separate static method:

static <T> Stream<T> ofPusher(Consumer<Consumer<? super T>> pusher) {
  return StreamSupport.stream(new Spliterator<>() {
    private Spliterator<T> delegate;

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
      initDelegate();
      return delegate.tryAdvance(action);
    }

    private void initDelegate() {
      if (delegate == null) {
        // or use SpinedBuffer directly
        Stream.Builder<T> builder = Stream.builder();
        pusher.accept(builder);
        delegate = builder.build().spliterator();
      }
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
      if (delegate != null) {
        delegate.forEachRemaining(action);
      } else {
        pusher.accept(action);
      }
    }

    @Override
    public Spliterator<T> trySplit() {
      initDelegate();
      return delegate.trySplit();
    }

    @Override
    public long estimateSize() {
      return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
      return 0;
    }
  }, false);
}

In this case, we buffer only if a short-circuiting operation or
splitting is requested. Otherwise, forEachRemaining just delegates
to the pusher.
Now, the default implementation could be rewritten as

<T, R> Stream<R> mapMulti(Stream<T> stream,
    BiConsumer<Consumer<? super R>, ? super T> mapper) {
  Objects.requireNonNull(mapper);
  return stream.flatMap(e -> ofPusher(sink -> mapper.accept(sink, e)));
}

With that, I don't think it's necessary to specialize it at all.
It's probably not even necessary to introduce mapMulti, as it
becomes a trivial delegate to ofPusher.
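
To illustrate the buffering behaviour described above, here is a minimal
self-contained sketch. The class name PusherDemo, the logging pusher and
the three helper methods are hypothetical names of mine, used only for
illustration; ofPusher and mapMulti follow the code above. The explicit
<R> and <String, String> type witnesses are there only so the sketch does
not rely on inference. The observed order assumes a sequential stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class PusherDemo {
  // Buffers lazily: only tryAdvance and trySplit force the pusher into a buffer.
  static <T> Stream<T> ofPusher(Consumer<Consumer<? super T>> pusher) {
    return StreamSupport.stream(new Spliterator<T>() {
      private Spliterator<T> delegate;

      private void initDelegate() {
        if (delegate == null) {
          Stream.Builder<T> builder = Stream.builder();
          pusher.accept(builder); // runs the whole pusher into the buffer
          delegate = builder.build().spliterator();
        }
      }

      @Override
      public boolean tryAdvance(Consumer<? super T> action) {
        initDelegate();
        return delegate.tryAdvance(action);
      }

      @Override
      public void forEachRemaining(Consumer<? super T> action) {
        if (delegate != null) {
          delegate.forEachRemaining(action);
        } else {
          pusher.accept(action); // no buffer: push straight into the action
        }
      }

      @Override
      public Spliterator<T> trySplit() {
        initDelegate();
        return delegate.trySplit();
      }

      @Override
      public long estimateSize() { return Long.MAX_VALUE; }

      @Override
      public int characteristics() { return 0; }
    }, false);
  }

  static <T, R> Stream<R> mapMulti(Stream<T> stream,
      BiConsumer<Consumer<? super R>, ? super T> mapper) {
    Objects.requireNonNull(mapper);
    return stream.flatMap(e -> PusherDemo.<R>ofPusher(sink -> mapper.accept(sink, e)));
  }

  // Hypothetical pusher: logs each element just before handing it to the sink.
  static Consumer<Consumer<? super Integer>> loggingPusher(List<String> log) {
    return sink -> {
      for (int i = 1; i <= 3; i++) {
        log.add("push " + i);
        sink.accept(i);
      }
    };
  }

  // Non-short-circuiting terminal operation: goes through forEachRemaining.
  static List<String> traceForEach() {
    List<String> log = new ArrayList<>();
    ofPusher(loggingPusher(log)).forEach(x -> log.add("got " + x));
    return log;
  }

  // Short-circuiting terminal operation: goes through tryAdvance.
  static List<String> traceFindFirst() {
    List<String> log = new ArrayList<>();
    ofPusher(loggingPusher(log)).peek(x -> log.add("got " + x)).findFirst();
    return log;
  }

  // mapMulti on top of ofPusher: emit each string and its upper-case form.
  static List<String> expand() {
    return PusherDemo.<String, String>mapMulti(Stream.of("a", "bb"), (sink, s) -> {
      sink.accept(s);
      sink.accept(s.toUpperCase());
    }).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(traceForEach());
    System.out.println(traceFindFirst());
    System.out.println(expand());
  }
}
```

With forEach the pushes and the consumer calls interleave (push 1, got 1,
push 2, got 2, ...), so nothing is buffered. With findFirst all three
pushes happen before the first "got 1", because tryAdvance has to run the
pusher to completion into the buffer first.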

With best regards,
Tagir Valeev.

On Wed, Jun 24, 2020 at 5:58 PM Patrick Concannon
<patrick.concannon at oracle.com> wrote:
>
> Hi,
>
> Could someone please review Julia's and my RFE and CSR for JDK-8238286 - 'Add new flatMap stream operation that is more amenable to pushing'?
>
> This proposal is to add a new flatMap-like operation:
>
> `<R> Stream<R> mapMulti(BiConsumer<Consumer<R>, ? super T> mapper)`
>
> to the java.util.stream.Stream interface. This operation is more receptive to the pushing or yielding of values than the current implementation, which internally assembles values (if any) into one or more streams. This addition includes the primitive variations of the operation, i.e. mapMultiToInt, IntStream mapMulti, etc.
>
> issue: https://bugs.openjdk.java.net/browse/JDK-8238286
> csr: https://bugs.openjdk.java.net/browse/JDK-8248166
>
> webrev: http://cr.openjdk.java.net/~pconcannon/8238286/webrevs/webrev.00/
> specdiff: http://cr.openjdk.java.net/~pconcannon/8238286/specdiff/specout.00/overview-summary.html
>
>
> Kind regards,
> Patrick & Julia


More information about the core-libs-dev mailing list