Re: RFR[8238286]: 'Add new flatMap stream operation that is more amenable to pushing’

Patrick Concannon patrick.concannon at oracle.com
Thu Jul 2 14:38:46 UTC 2020


Hi Tagir,

Thank you for your input. However, we feel that this approach may add more complexity than is needed at this stage. It's a good point, and one that we think is reasonable to consider and evaluate as a possible follow on from this initial proposal.

Kind regards,

Patrick

> On 25 Jun 2020, at 05:04, Tagir Valeev <amaembo at gmail.com> wrote:
> 
> Hello!
> 
> To me, it looks like it's possible to make the better default
> implementation. It could be done even as a separate static method:
> 
> static <T> Stream<T> ofPusher(Consumer<Consumer<? super T>> pusher) {
>  return StreamSupport.stream(new Spliterator<>() {
>    private Spliterator<T> delegate;
> 
>    @Override
>    public boolean tryAdvance(Consumer<? super T> action) {
>      initDelegate();
>      return delegate.tryAdvance(action);
>    }
> 
>    private void initDelegate() {
>      if (delegate == null) {
>        Stream.Builder<T> builder = Stream.builder(); // or use
> SpinedBuffer directly
>        pusher.accept(builder);
>        delegate = builder.build().spliterator();
>      }
>    }
> 
>    @Override
>    public void forEachRemaining(Consumer<? super T> action) {
>      if (delegate != null) {
>        delegate.forEachRemaining(action);
>      } else {
>        pusher.accept(action);
>      }
>    }
> 
>    @Override
>    public Spliterator<T> trySplit() {
>      initDelegate();
>      return delegate.trySplit();
>    }
> 
>    @Override
>    public long estimateSize() {
>      return Long.MAX_VALUE;
>    }
> 
>    @Override
>    public int characteristics() {
>      return 0;
>    }
>  }, false);
> }
> 
> In this case, we are buffering only if short-circuit operation or
> splitting is requested. Otherwise, forEachRemaining will just delegate
> to the pusher.
> Now, the default implementation could be rewritten as
> 
> <T, R> Stream<R> mapMulti(Stream<T> stream, BiConsumer<Consumer<?
> super R>, ? super T> mapper) {
>  Objects.requireNonNull(mapper);
>  return stream.flatMap(e -> ofPusher(sink -> mapper.accept(sink, e)));
> }
> 
> And now, I don't think it's necessary to specialize it at all.
> Probably it's not necessary to introduce mapMulti at all as well, as
> now it's a trivial delegate to ofPusher.
> 
> With best regards,
> Tagir Valeev.
> 
> On Wed, Jun 24, 2020 at 5:58 PM Patrick Concannon
> <patrick.concannon at oracle.com> wrote:
>> 
>> Hi,
>> 
>> Could someone please review myself and Julia's RFE and CSR for JDK-8238286 - 'Add new flatMap stream operation that is more amenable to pushing’?
>> 
>> This proposal is to add a new flatMap-like operation:
>> 
>> `<R> Stream<R> mapMulti(BiConsumer<Consumer<R>, ? super T> mapper)`
>> 
>> to the java.util.Stream class. This operation is more receptive to the pushing or yielding of values than the current implementation that internally assembles values (if any) into one or more streams. This addition includes the primitive variations of the operation i.e. mapMultiToInt, IntStream mapMulti, etc.
>> 
>> issue: https://bugs.openjdk.java.net/browse/JDK-8238286 <https://bugs.openjdk.java.net/browse/JDK-8238286>
>> csr: https://bugs.openjdk.java.net/browse/JDK-8248166 <https://bugs.openjdk.java.net/browse/JDK-8248166>
>> 
>> webrev: http://cr.openjdk.java.net/~pconcannon/8238286/webrevs/webrev.00/ <http://cr.openjdk.java.net/~pconcannon/8238286/webrevs/webrev.00/>
>> specdiff: http://cr.openjdk.java.net/~pconcannon/8238286/specdiff/specout.00/overview-summary.html  <http://cr.openjdk.java.net/~pconcannon/8238286/specdiff/specout.00/overview-summary.html>
>> 
>> 
>> Kind regards,
>> Patrick & Julia



More information about the core-libs-dev mailing list