Is this a misbehaving Collector?
Brian Goetz
brian.goetz at oracle.com
Thu Aug 29 07:08:59 PDT 2013
On 8/29/2013 2:42 AM, Michael Hixson wrote:
> Hello,
>
> I wrote some Collectors today that did a few things differently than
> those in j.u.stream.Collectors. They seemed to work properly, but it
> wasn't clear whether they were "bad collectors", depending on
> implementation details they shouldn't. I marked these areas (1, 2, 3)
> in the example below:
>
> class WordCounter {
>     private final ConcurrentMap<String, LongAdder>
>         frequencies = new ConcurrentHashMap<>();
>
>     private final Collector<String, ?, ConcurrentMap<String, LongAdder>> // 1
>         intoFrequencies = Collector.of(
>             (Supplier<ConcurrentMap<String, LongAdder>>) () -> frequencies, // 2
>             (frq, key) -> frq.computeIfAbsent(key, k -> new LongAdder())
>                 .increment(),
>             (left, right) -> { throw new UnsupportedOperationException(); }, // 3
>             CONCURRENT, UNORDERED, IDENTITY_FINISH);
>
>     public void ingest(Collection<String> words) {
>         words.parallelStream().collect(intoFrequencies);
>     }
> }
>
> 1. The methods in j.u.stream.Collectors return a new Collector
> instance each time. Is that just how they happen to be implemented,
> or is there some reason that reusing a collector for multiple streams
> is not a good idea?
For Collectors that have no parameters, like toList() or counting(),
reusing the same instance across streams is fine.
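For example, a parameterless collector can be stored once in a constant and shared across unrelated streams, because each collect() call asks its supplier for a new container. A minimal sketch; the class name ReuseDemo and the constant names are illustrative only:

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ReuseDemo {
    // Parameterless collectors hold no per-collection state: every collect()
    // call gets a new container from the supplier, so one instance can be shared.
    static final Collector<String, ?, List<String>> TO_LIST = Collectors.toList();
    static final Collector<Object, ?, Long> COUNTING = Collectors.counting();

    public static void main(String[] args) {
        List<String> first = Stream.of("x", "y").collect(TO_LIST);
        List<String> second = Stream.of("z").parallel().collect(TO_LIST);
        long n = Stream.of(1, 2, 3).collect(COUNTING);
        // second does not contain "x" or "y": the two calls used separate lists.
        System.out.println(first + " " + second + " " + n);
    }
}
```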
> 2. I think I am violating the spec of Collector here by not returning
> a new container each time. Is this safe to do, or am I doomed?
Doomed, for multiple reasons.
1. Subsequent collections will be polluted by the results of previous
collections.
2. The container may get returned to the user, who may later decide to
mutate it for unrelated reasons.
3. For a parallel but not concurrent collection, each sub-task expects
its own empty container.
4. What if multiple threads do a collect() on unrelated data sets?
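To make reason 1 concrete, here is a small sketch (class and method names are illustrative, not from the original code). It builds the question's collector around a caller-supplied shared map, runs two unrelated sequential collections through it, and contrasts that with a variant whose supplier returns a new ConcurrentHashMap on every call. The throwing combiner is kept as in the question; it is not reached here because both streams are sequential, but that reflects how the JDK's sequential path behaves, not a guarantee from the Collector spec.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static java.util.stream.Collector.Characteristics.CONCURRENT;
import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
import static java.util.stream.Collector.Characteristics.UNORDERED;

class SharedContainerDemo {
    // Anti-pattern from the question: the supplier always returns the same map,
    // so every collect() call accumulates into one shared container.
    static Collector<String, ?, ConcurrentMap<String, LongAdder>> sharing(
            ConcurrentMap<String, LongAdder> shared) {
        return Collector.of(
                () -> shared,
                (m, w) -> m.computeIfAbsent(w, k -> new LongAdder()).increment(),
                (l, r) -> { throw new UnsupportedOperationException(); },
                CONCURRENT, UNORDERED, IDENTITY_FINISH);
    }

    // Spec-conforming variant: each supplier call creates a new, empty map.
    // (The combiner is still the throwing one from the question.)
    static Collector<String, ?, ConcurrentMap<String, LongAdder>> fresh() {
        return Collector.of(
                ConcurrentHashMap::new,
                (m, w) -> m.computeIfAbsent(w, k -> new LongAdder()).increment(),
                (l, r) -> { throw new UnsupportedOperationException(); },
                CONCURRENT, UNORDERED, IDENTITY_FINISH);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, LongAdder> shared = new ConcurrentHashMap<>();
        Collector<String, ?, ConcurrentMap<String, LongAdder>> bad = sharing(shared);

        // Two unrelated collections through the same shared-container collector.
        Stream.of("a", "b").collect(bad);
        ConcurrentMap<String, LongAdder> second = Stream.of("a").collect(bad);
        // Reason 1: the second result still contains "b" from the first call.
        System.out.println(second.containsKey("b")); // true

        ConcurrentMap<String, LongAdder> clean = Stream.of("a").collect(fresh());
        System.out.println(clean.containsKey("b")); // false
    }
}
```

Reason 4 follows from the same aliasing: two threads collecting unrelated data through sharing(...) would both write into the one shared map.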
> 3. It seems the combiner isn't used when the collector reports
> CONCURRENT. Is it safe for me to write an always-throwing combiner
> like this, given that I know my collector is concurrent?
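Mostly true. IF collect() decides to do a concurrent collection (per the
Stream.collect() spec, that happens when the stream is parallel, the
collector is CONCURRENT, and either the stream is unordered or the
collector is UNORDERED), then the combiner won't get called. If you
intend it to be a programming error for this collector to be used in a
non-concurrent situation, then throwing is probably OK.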
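If instead the collector should stay correct when collect() takes the non-concurrent path (a sequential stream, or a parallel ordered stream with a collector that does not report UNORDERED), the combiner can fold the right-hand map into the left one rather than throwing. A minimal sketch for the LongAdder-valued maps in the question; the names MergingCombiner, MERGE, and counts() are illustrative. UNORDERED is deliberately left off counts() so that a parallel stream over an ordered List uses the non-concurrent reduction.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BinaryOperator;
import java.util.stream.Collector;

class MergingCombiner {
    // Adds every count in `right` into `left` and returns `left`.
    static final BinaryOperator<ConcurrentMap<String, LongAdder>> MERGE = (left, right) -> {
        right.forEach((word, count) ->
                left.computeIfAbsent(word, k -> new LongAdder()).add(count.sum()));
        return left;
    };

    // Same accumulator as the question, but a fresh container per supplier call
    // and MERGE as the combiner. No UNORDERED: on an ordered parallel stream,
    // collect() performs a non-concurrent reduction and may call MERGE.
    static Collector<String, ?, ConcurrentMap<String, LongAdder>> counts() {
        return Collector.of(
                ConcurrentHashMap::new,
                (m, w) -> m.computeIfAbsent(w, k -> new LongAdder()).increment(),
                MERGE,
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.IDENTITY_FINISH);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "a");
        ConcurrentMap<String, LongAdder> freq = words.parallelStream().collect(counts());
        System.out.println(freq.get("a").sum()); // 3
    }
}
```

Whether MERGE is actually invoked on a given run depends on how the stream splits; the point is only that the result is correct if it is.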
> Also, since I happened to be browsing the javadocs...
>
> The code example in Collectors.groupingByConcurrent(Function,
> Collector) may have a copy & paste error:
>
> For example, to compute the set of last names of people in each
> city, where the city names are sorted:
>
> ConcurrentMap<City, Set<String>> namesByCity
>     = people.stream().collect(groupingByConcurrent(Person::getCity,
>                                                    ConcurrentSkipListMap::new,
>                                                    mapping(Person::getLastName, toSet())));
>
> It looks like ", where the city names are sorted" and
> "ConcurrentSkipListMap::new," are not supposed to be there.
Thanks!
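For reference, the two-argument groupingByConcurrent(Function, Collector) example would simply drop both the "sorted" phrase and the map factory; the sorted variant belongs with the three-argument groupingByConcurrent(Function, Supplier, Collector) overload. A sketch under stated assumptions: Person is a hypothetical stand-in for the javadoc's class, and City is simplified to a String.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

import static java.util.stream.Collectors.groupingByConcurrent;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toSet;

class GroupingDemo {
    // Hypothetical stand-in for the javadoc's Person; City is simplified to String.
    static final class Person {
        private final String city;
        private final String lastName;

        Person(String city, String lastName) {
            this.city = city;
            this.lastName = lastName;
        }

        String getCity() { return city; }
        String getLastName() { return lastName; }
    }

    // Two-argument overload: groupingByConcurrent chooses the ConcurrentMap itself.
    static ConcurrentMap<String, Set<String>> namesByCity(List<Person> people) {
        return people.stream().collect(
                groupingByConcurrent(Person::getCity, mapping(Person::getLastName, toSet())));
    }

    // Three-argument overload: a ConcurrentSkipListMap factory keeps the cities sorted.
    static ConcurrentSkipListMap<String, Set<String>> sortedNamesByCity(List<Person> people) {
        return people.stream().collect(
                groupingByConcurrent(Person::getCity, ConcurrentSkipListMap::new,
                        mapping(Person::getLastName, toSet())));
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("Paris", "Martin"),
                new Person("Oslo", "Hansen"),
                new Person("Paris", "Bernard"));
        System.out.println(namesByCity(people).get("Paris").size()); // 2
        System.out.println(sortedNamesByCity(people).firstKey());    // Oslo
    }
}
```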