RFR 8071600: Add a flat-mapping collector

Peter Levart peter.levart at gmail.com
Thu Feb 12 16:05:42 UTC 2015


On 02/12/2015 04:51 PM, Peter Levart wrote:
>
> On 02/03/2015 02:48 PM, Paul Sandoz wrote:
>> Hi,
>>
>> http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8071600-Collector-flatMapping/webrev/
>>
>> This patch adds a new flat mapping collector to Collectors. This can be useful if one needs to map 0 or more items into a downstream collector.
>>
>> A CCC will be filed.
>>
>> A following patch, which i plan to fold into the above patch, performs some renames on the collectors test to be consistent with the current naming:
>>
>> http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8071600-Collector-flatMapping-test-rename/webrev/
>>
>> Thanks,
>> Paul.
>
> Hi Paul,
>
> Would the following "optimization" make any sense?
>
>     public static <T, U, A, R>
>     Collector<T, ?, R> flatMapping(Function<? super T, ? extends 
> Stream<? extends U>> mapper,
>                                    Collector<? super U, A, R> 
> downstream) {
>         BiConsumer<A, ? super U> downstreamAccumulator = 
> downstream.accumulator();
>         return new CollectorImpl<>(downstream.supplier(),
>             (r, t) -> {
>                 try (Stream<? extends U> result = mapper.apply(t)) {
>                     if (result != null) {
>                         ((result.isParallel() &&
> !downstream.characteristics().contains(Collector.Characteristics.CONCURRENT))
>                             ? result.sequential() : result).forEach(u 
> -> downstreamAccumulator.accept(r, u));
>                     }
>                 }
>             },
>             downstream.combiner(), downstream.finisher(),
>             downstream.characteristics());
>     }
>
>
> Regards, Peter
>

Collector does not know how it is being used by the feeding Stream in 
advance, right? I guess above "optimization" would only make sense if 
the feeding Stream was sequential and flat-mapping function returned a 
parallel stream and the downstream Collector supported CONCURRENT 
collecting. But if feeding Stream is already parallel and Collector 
supports CONCURRENT collecting, then accumulating function is already 
being invoked in multiple threads and expanding each invocation into 
separate concurrent tasks would make FJ pool execute more tasks (nested 
parallelism). I don't know if this works best currently.

Peter




More information about the core-libs-dev mailing list