Collectors.toConcurrentMap() strategy

Paul Sandoz paul.sandoz at oracle.com
Wed Apr 23 13:47:31 UTC 2014


On Apr 23, 2014, at 1:00 PM, Peter Levart <peter.levart at gmail.com> wrote:

> Sorry Paul, scrap that. I just realized that this is already done - by supplying the Collector.Characteristics.CONCURRENT characteristic to the constructor of CollectorImpl().
> 

Right, and UNORDERED too. Subtle code :-) It makes the difference between a forEach and a reduce in ReferencePipeline.collect:

    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }
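
To see which branch is taken, here is a minimal sketch (not from the JDK sources; the class and method names are made up for illustration) that counts how many containers the supplier creates for an ordered parallel stream. It assumes the collect implementation quoted above: with CONCURRENT and UNORDERED the forEach branch should ask for one shared container, while CONCURRENT alone on an ordered stream should fall back to the reduce branch, where the number of containers depends on how the stream is split.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical demo class; none of these names come from the JDK.
public class CollectPathDemo {

    // Collects 0..n-1 from an ordered parallel stream into a map and returns
    // how many containers the collector's supplier was asked to create.
    static int supplierCalls(int n, Collector.Characteristics... characteristics) {
        AtomicInteger calls = new AtomicInteger();
        Supplier<ConcurrentMap<Integer, String>> supplier = () -> {
            calls.incrementAndGet();
            return new ConcurrentHashMap<>();
        };
        Collector<Integer, ConcurrentMap<Integer, String>, ConcurrentMap<Integer, String>> collector =
            Collector.of(supplier,
                         (map, i) -> map.put(i, "v" + i),
                         (m1, m2) -> { m1.putAll(m2); return m1; },
                         characteristics);
        ConcurrentMap<Integer, String> result =
            IntStream.range(0, n).boxed().parallel().collect(collector);
        if (result.size() != n) {
            throw new IllegalStateException("expected " + n + " entries, got " + result.size());
        }
        return calls.get();
    }

    // CONCURRENT and UNORDERED: eligible for the forEach branch.
    static int sharedContainerCalls(int n) {
        return supplierCalls(n, Collector.Characteristics.CONCURRENT,
                                Collector.Characteristics.UNORDERED);
    }

    // CONCURRENT only: the stream is ordered, so the reduce branch is used.
    static int concurrentOnlyCalls(int n) {
        return supplierCalls(n, Collector.Characteristics.CONCURRENT);
    }

    public static void main(String[] args) {
        System.out.println("CONCURRENT + UNORDERED: " + sharedContainerCalls(10_000));
        System.out.println("CONCURRENT only:        " + concurrentOnlyCalls(10_000));
    }
}
```

The second number varies with the machine and the common pool's parallelism, so treat it as indicative only.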

Paul.

> Regards, Peter
> 
> On 04/23/2014 12:55 PM, Peter Levart wrote:
>> Hi Paul,
>> 
>> Unrelated to fixing the exception message, I realized that Collectors.toMap() and Collectors.toConcurrentMap() are practically equivalent. They use the same strategy; only the Map implementation used for collecting differs: HashMap vs. ConcurrentHashMap. This strategy does not require the map to be concurrent: each thread collects into its own instance of the map, and a merge step then reduces the multiple map instances into a single map, mutating each map instance in a single thread at a time. Such an approach is required for non-concurrent Map implementations, but ConcurrentHashMap could be better utilized by a different approach in which entries are collected concurrently into a single shared map instance. A merging step is then not required. It's very easy to achieve that at least for the toConcurrentMap method that constructs a collector for unique keys, but I think it's also possible with the other toConcurrentMap collectors that use ConcurrentMap.merge(). On top of the patch for the exception message:
>> 
>> 
>>    public static <T, K, U>
>>    Collector<T, ?, ConcurrentMap<K,U>> toConcurrentMap(Function<? super T, ? extends K> keyMapper,
>>                                                        Function<? super T, ? extends U> valueMapper) {
>>        ConcurrentMap<K, U> map = new ConcurrentHashMap<>();
>>        return new CollectorImpl<>(() -> map,
>>                                   uniqKeysMapAccumulator(keyMapper, valueMapper),
>>                                   (m1, m2) -> m1,
>>                                   CH_CONCURRENT_ID);
>>    }
>> 
>> 
>> What do you think?
>> 
>> Regards, Peter
>> 
> 


