Collectors.toConcurrentMap() strategy
Paul Sandoz
paul.sandoz at oracle.com
Wed Apr 23 13:47:31 UTC 2014
On Apr 23, 2014, at 1:00 PM, Peter Levart <peter.levart at gmail.com> wrote:
> Sorry Paul, scrap that. I just realized that this is already done: supplying the Collector.Characteristics.CONCURRENT characteristic to the CollectorImpl() constructor takes care of it.
>
Right, UNORDERED too. Subtle code :-) It is the difference between a forEach and a reduce in ReferencePipeline.collect:
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
            && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
            && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        // Concurrent path: one container from the supplier, filled by all threads via forEach
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        // Reduction path: in parallel, each leaf task fills its own container
        // and the combiner merges them
        container = evaluate(ReduceOps.makeRef(collector));
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
           ? (R) container
           : collector.finisher().apply(container);
}
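To observe the two paths from outside the JDK, the sketch below counts how often the supplier is called when a parallel stream is collected. It is a hypothetical demo (the class CollectPathDemo is illustrative) and uses the public Collector.of factory rather than the JDK-internal CollectorImpl. Assuming ReferencePipeline.collect behaves as quoted above, a collector declaring CONCURRENT and UNORDERED should get exactly one container even though IntStream.range produces an ordered stream, while the same collector without those characteristics goes through the reduction and typically gets one container per leaf task.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class CollectPathDemo {

    // Collects 10,000 integers from a parallel stream into a set and returns
    // how many containers the framework requested from the supplier.
    static int supplierCalls(boolean concurrent) {
        AtomicInteger calls = new AtomicInteger();
        Collector.Characteristics[] ch = concurrent
                ? new Collector.Characteristics[] {
                      Collector.Characteristics.CONCURRENT,
                      Collector.Characteristics.UNORDERED }
                : new Collector.Characteristics[0];
        Collector<Integer, Set<Integer>, Set<Integer>> c = Collector.of(
                () -> {
                    calls.incrementAndGet();
                    // A thread-safe set, so both paths are safe to compare
                    return ConcurrentHashMap.<Integer>newKeySet();
                },
                Set::add,
                (s1, s2) -> { s1.addAll(s2); return s1; },
                ch);
        Set<Integer> result = IntStream.range(0, 10_000).boxed().parallel().collect(c);
        if (result.size() != 10_000) {
            throw new IllegalStateException("lost elements: " + result.size());
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("CONCURRENT + UNORDERED supplier calls: " + supplierCalls(true));
        System.out.println("plain collector supplier calls:        " + supplierCalls(false));
    }
}
```

The exact count for the plain collector depends on how the stream is split, so only the concurrent case is expected to be a fixed number.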
Paul.
> Regards, Peter
>
> On 04/23/2014 12:55 PM, Peter Levart wrote:
>> Hi Paul,
>>
>> Unrelated to fixing the exception message, I realized that Collectors.toMap() and Collectors.toConcurrentMap() are practically equivalent. They use the same strategy; only the Map implementation used for collecting differs: HashMap vs. ConcurrentHashMap. This strategy does not require the map to be concurrent: each thread collects into its own map instance, and a merge step then reduces the instances into a single map, so each instance is only ever mutated by one thread at a time.
>>
>> Such an approach is required for non-concurrent Map implementations, but ConcurrentHashMap could be better utilized with a different approach, where all threads collect entries concurrently into a single shared map instance. The merge step is then not needed. This is very easy to achieve, at least for the toConcurrentMap method that constructs a collector for unique keys, and I think it is also possible for the other toConcurrentMap collectors, which use ConcurrentMap.merge(). On top of the patch for the exception message:
>>
>>
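>> public static <T, K, U>
>> Collector<T, ?, ConcurrentMap<K,U>> toConcurrentMap(Function<? super T, ? extends K> keyMapper,
>>                                                     Function<? super T, ? extends U> valueMapper) {
>>     // One map shared by all threads. It is created once per collector
>>     // instance, so every collect() that uses this instance sees the same map.
>>     ConcurrentMap<K, U> map = new ConcurrentHashMap<>();
>>     return new CollectorImpl<>(() -> map,
>>                                uniqKeysMapAccumulator(keyMapper, valueMapper),
>>                                (m1, m2) -> m1,   // both arguments are always the shared map
>>                                CH_CONCURRENT_ID);
>> }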
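One caveat with the proposal as written is that the map is allocated when toConcurrentMap() is called, so every collect() using the returned collector shares it. Because ReferencePipeline.collect (shown in Paul's reply above) calls the supplier only once on its concurrent path, a supplier of ConcurrentHashMap::new should still yield a single shared map per operation. The sketch below assumes that, uses only public API (Collector.of, ConcurrentMap.putIfAbsent), and its names (UniqueKeysSketch, uniqueKeysConcurrentMap, putUnique) are hypothetical stand-ins for the JDK-internal CollectorImpl and uniqKeysMapAccumulator.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collector;

class UniqueKeysSketch {

    // Hypothetical unique-keys collector: ConcurrentHashMap::new gives every
    // collect() call its own map instead of one map captured at creation time.
    static <T, K, U> Collector<T, ?, ConcurrentMap<K, U>> uniqueKeysConcurrentMap(
            Function<? super T, ? extends K> keyMapper,
            Function<? super T, ? extends U> valueMapper) {
        return Collector.<T, ConcurrentMap<K, U>>of(
                ConcurrentHashMap::new,
                (map, t) -> putUnique(map, keyMapper.apply(t), valueMapper.apply(t)),
                // Used only if the framework ever builds separate containers
                (m1, m2) -> {
                    m2.forEach((k, v) -> putUnique(m1, k, v));
                    return m1;
                },
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED);
    }

    // putIfAbsent is atomic, so two threads racing on the same key cannot both succeed.
    private static <K, U> void putUnique(ConcurrentMap<K, U> map, K key, U value) {
        U previous = map.putIfAbsent(key, value);
        if (previous != null) {
            throw new IllegalStateException("Duplicate key " + key);
        }
    }

    public static void main(String[] args) {
        Collector<String, ?, ConcurrentMap<String, Integer>> lengths =
                uniqueKeysConcurrentMap(s -> s, String::length);
        // Reusing the same collector instance yields two independent maps
        System.out.println(Arrays.asList("a", "bb").parallelStream().collect(lengths));
        System.out.println(Arrays.asList("ccc").parallelStream().collect(lengths));
    }
}
```

With the captured-map version, the second collect() would return the same map as the first, already holding "a" and "bb".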
>>
>>
>> What do you think?
>>
>> Regards, Peter
>>
>
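Peter's other suggestion, that the toConcurrentMap overloads taking a merge function could also collect into one shared map through ConcurrentMap.merge(), might look roughly like this with public API. It is a hypothetical sketch (SharedMapCollectors, sharedConcurrentMap and wordCounts are illustrative names, not JDK API) that relies on ConcurrentHashMap.merge() being performed atomically per key; it keeps a real combiner so the collector remains correct should separate containers ever be combined.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collector;

class SharedMapCollectors {

    // Hypothetical helper: all threads merge into the one map returned by the
    // supplier; CONCURRENT + UNORDERED lets a parallel collect() use a single container.
    static <T, K, U> Collector<T, ?, ConcurrentMap<K, U>> sharedConcurrentMap(
            Function<? super T, ? extends K> keyMapper,
            Function<? super T, ? extends U> valueMapper,
            BinaryOperator<U> mergeFunction) {
        return Collector.<T, ConcurrentMap<K, U>>of(
                ConcurrentHashMap::new,
                // ConcurrentHashMap.merge() applies mergeFunction atomically per key
                (map, t) -> map.merge(keyMapper.apply(t), valueMapper.apply(t), mergeFunction),
                // Merges m2 into m1 key by key; only needed for separate containers
                (m1, m2) -> {
                    m2.forEach((k, v) -> m1.merge(k, v, mergeFunction));
                    return m1;
                },
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED);
    }

    // Example use: count word occurrences from a parallel stream
    static ConcurrentMap<String, Integer> wordCounts(List<String> words) {
        return words.parallelStream()
                .collect(sharedConcurrentMap(w -> w, w -> 1, Integer::sum));
    }

    public static void main(String[] args) {
        System.out.println(wordCounts(Arrays.asList("a", "b", "a", "c", "a", "b")));
    }
}
```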
More information about the lambda-dev mailing list