Is this a misbehaving Collector?

Paul Sandoz paul.sandoz at oracle.com
Thu Aug 29 03:33:57 PDT 2013


On Aug 29, 2013, at 8:42 AM, Michael Hixson <michael.hixson at gmail.com> wrote:

> Hello,
> 
> I wrote some Collectors today that did a few things differently than
> those in j.u.stream.Collectors.  They seemed to work properly, but it
> wasn't clear whether they were "bad collectors", depending on
> implementation details they shouldn't.  I marked these areas (1, 2, 3)
> in the example below:
> 
>  class WordCounter {
>    private final ConcurrentMap<String, LongAdder>
>    frequencies = new ConcurrentHashMap<>();
> 
>    private final Collector<String, ?, ConcurrentMap<String, LongAdder>> // 1
>    intoFrequencies = Collector.of(
>        (Supplier<ConcurrentMap<String, LongAdder>>) () -> frequencies,  // 2
>        (frq, key) -> frq.computeIfAbsent(key, k -> new LongAdder())
>                         .increment(),
>        (left, right) -> { throw new UnsupportedOperationException(); }, // 3
>        CONCURRENT, UNORDERED, IDENTITY_FINISH);
> 
>    public void ingest(Collection<String> words) {
>      words.parallelStream().collect(intoFrequencies);
>    }
>  }
> 
> 1. The methods in j.u.stream.Collectors return a new Collector
> instance each time.  Is that just how they happen to be implemented,
> or is there some reason that reusing a collector for multiple streams
> is not a good idea?
> 

You can reuse a Collector. The methods on Collectors do return new instances, since those instances hold the functions passed to the factory methods. However, for zero-argument cases such as toList or toSet we could optimise by returning a shared instance, rather than going through the telescoping methods.
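A minimal sketch of reusing one Collector across several streams (the class and method names are hypothetical). Sharing is safe here because Collectors.toSet() supplies a fresh HashSet on every collect() call:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReusedCollector {
    // One Collector instance shared by every call. This is safe because
    // Collectors.toSet() supplies a new HashSet for each collect() call.
    static final Collector<String, ?, Set<String>> TO_SET = Collectors.toSet();

    static Set<String> distinct(Stream<String> words) {
        return words.collect(TO_SET);
    }

    public static void main(String[] args) {
        Set<String> first = distinct(Stream.of("a", "b", "a"));
        Set<String> second = distinct(Arrays.asList("c", "d", "c").parallelStream());
        // Separate containers: collecting the second stream did not touch the first result.
        System.out.println(first + " " + second + " " + (first != second));
    }
}
```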


> 2. I think I am violating the spec of Collector here by not returning
> a new container each time.  Is this safe to do, or am I doomed?
> 

Yes, you are breaking the contract. 

If, for some reason, this collector is not used concurrently, it will not work. You guarded against this by having the Collector report UNORDERED, but it will break if you use it as a downstream collector of, say, partitioningBy. That could arguably suggest an optimization bug in partitioningBy, but UNORDERED and CONCURRENT should be considered hints: it should be possible to ignore them and still produce correct results, though not necessarily optimally.
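A minimal sketch of that failure mode, reusing the collector from the question; the demo class and the partitionByLength helper are hypothetical. partitioningBy asks the downstream supplier for one container per partition, so with a shared container both partitions end up being the same map. A sequential stream is used so the throwing combiner is never reached; on a parallel stream partitioningBy is not CONCURRENT, so the combiner can be invoked as well:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.util.stream.Collector.Characteristics.CONCURRENT;
import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
import static java.util.stream.Collector.Characteristics.UNORDERED;

public class SharedContainerDemo {
    // The collector from the question: its supplier always returns the same map.
    private final ConcurrentMap<String, LongAdder> frequencies = new ConcurrentHashMap<>();

    private final Collector<String, ?, ConcurrentMap<String, LongAdder>> intoFrequencies =
        Collector.of(
            (Supplier<ConcurrentMap<String, LongAdder>>) () -> frequencies,
            (frq, key) -> frq.computeIfAbsent(key, k -> new LongAdder()).increment(),
            (left, right) -> { throw new UnsupportedOperationException(); },
            CONCURRENT, UNORDERED, IDENTITY_FINISH);

    // Sequential stream, so the throwing combiner is never reached. partitioningBy
    // still calls the downstream supplier once per partition, and both calls
    // return the same shared map.
    Map<Boolean, ConcurrentMap<String, LongAdder>> partitionByLength(String... words) {
        return Arrays.stream(words)
                .collect(Collectors.partitioningBy(w -> w.length() > 1, intoFrequencies));
    }

    public static void main(String[] args) {
        Map<Boolean, ConcurrentMap<String, LongAdder>> result =
            new SharedContainerDemo().partitionByLength("a", "bb", "a", "ccc");
        // The "true" and "false" partitions are one and the same map,
        // so the short word "a" also shows up among the long words.
        System.out.println(result.get(true) == result.get(false));
        System.out.println(result.get(true).containsKey("a"));
    }
}
```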


> 3. It seems the combiner isn't used when the collector reports
> CONCURRENT.  Is it safe for me to write an always-throwing combiner
> like this, given that I know my collector is concurrent?
> 

No, because, as above, you don't know the context in which it will be used. So the rule is: stick to the contract. If the "thing" doing the collection knows how to manage concurrent processing, it will optimize its use of the collector so that the supplier is called only once and the combiner is never called.
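A sketch of a contract-abiding variant of the word counter, under the assumption that the long-lived map is kept outside the collector and each batch is folded into it afterwards; the class, ingest, count and partitionByLength names are illustrative. The supplier returns a fresh map and the combiner really merges, so the collector also works downstream of partitioningBy on a parallel stream:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.util.stream.Collector.Characteristics.CONCURRENT;
import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;
import static java.util.stream.Collector.Characteristics.UNORDERED;

public class ContractWordCounter {
    // A fresh map per supplier call and a combiner that really merges.
    // A concurrent-aware caller may call the supplier once and skip the
    // combiner; any other caller still gets correct results.
    static Collector<String, ?, ConcurrentMap<String, LongAdder>> intoFrequencies() {
        return Collector.<String, ConcurrentMap<String, LongAdder>>of(
            ConcurrentHashMap::new,
            (frq, key) -> frq.computeIfAbsent(key, k -> new LongAdder()).increment(),
            (left, right) -> {
                // Fold every count from right into left.
                right.forEach((k, v) -> left.computeIfAbsent(k, x -> new LongAdder()).add(v.sum()));
                return left;
            },
            CONCURRENT, UNORDERED, IDENTITY_FINISH);
    }

    private final ConcurrentMap<String, LongAdder> frequencies = new ConcurrentHashMap<>();

    // Collects each batch into its own map, then folds it into the long-lived one.
    public void ingest(Collection<String> words) {
        words.parallelStream().collect(intoFrequencies())
             .forEach((k, v) -> frequencies.computeIfAbsent(k, x -> new LongAdder()).add(v.sum()));
    }

    long count(String word) {
        LongAdder adder = frequencies.get(word);
        return adder == null ? 0L : adder.sum();
    }

    // Safe as a downstream collector, even on a parallel stream.
    static Map<Boolean, ConcurrentMap<String, LongAdder>> partitionByLength(Collection<String> words) {
        return words.parallelStream()
                .collect(Collectors.partitioningBy(w -> w.length() > 1, intoFrequencies()));
    }

    public static void main(String[] args) {
        Collection<String> words = Arrays.asList("a", "bb", "a", "ccc", "bb", "a");
        ContractWordCounter counter = new ContractWordCounter();
        counter.ingest(words);
        counter.ingest(words);
        System.out.println(counter.count("a")); // 6
        System.out.println(partitionByLength(words));
    }
}
```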

You probably already know that there are also alternative ways of implementing histogram-like functionality using toMap, groupingBy, or groupingByConcurrent, for example:

  toMap(Function.identity(), c -> 1, Integer::sum);
  groupingBy(Function.identity(), reducing(0, c -> 1, Integer::sum))
  groupingByConcurrent(Function.identity(), reducing(0, c -> 1, Integer::sum))
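A self-contained sketch running the three snippets above over a small hypothetical word list; each produces the same word-to-count map:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.groupingByConcurrent;
import static java.util.stream.Collectors.reducing;
import static java.util.stream.Collectors.toMap;

public class HistogramAlternatives {
    static final List<String> WORDS = Arrays.asList("a", "bb", "a", "ccc", "bb", "a");

    // toMap: each word maps to 1, and values for duplicate keys are merged with Integer::sum.
    static Map<String, Integer> viaToMap() {
        return WORDS.stream().collect(toMap(Function.identity(), c -> 1, Integer::sum));
    }

    // groupingBy with a reducing downstream collector that adds 1 per element.
    static Map<String, Integer> viaGroupingBy() {
        return WORDS.stream()
                .collect(groupingBy(Function.identity(), reducing(0, c -> 1, Integer::sum)));
    }

    // groupingByConcurrent is a concurrent, unordered collector, so a parallel
    // stream can accumulate into one shared ConcurrentMap.
    static ConcurrentMap<String, Integer> viaGroupingByConcurrent() {
        return WORDS.parallelStream()
                .collect(groupingByConcurrent(Function.identity(), reducing(0, c -> 1, Integer::sum)));
    }

    public static void main(String[] args) {
        System.out.println(viaToMap());
        System.out.println(viaGroupingBy());
        System.out.println(viaGroupingByConcurrent());
    }
}
```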



> Also, since I happened to be browsing the javadocs...
> 
> The code example in Collectors.groupingByConcurrent(Function,
> Collector) may have a copy & paste error:
> 
>  For example, to compute the set of last names of people in each
> city, where the city names are sorted:
> 
>     ConcurrentMap<City, Set<String>> namesByCity
>         = people.stream().collect(groupingByConcurrent(Person::getCity,
> ConcurrentSkipListMap::new,
> 
> mapping(Person::getLastName, toSet())));
> 
> It looks like ", where the city names are sorted" and
> "ConcurrentSkipListMap::new," are not supposed to be there.
> 

Thanks, fixed! Please keep reporting issues like this; it is most helpful, and I am sure there are more errors lurking around. In fact, reviews of the JavaDoc in general would be very much appreciated.
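For reference, a sketch of the JavaDoc example with the two suspect fragments removed, i.e. the two-argument groupingByConcurrent(classifier, downstream) form. The City enum and Person class are hypothetical stand-ins for the types the JavaDoc assumes:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import static java.util.stream.Collectors.groupingByConcurrent;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toSet;

public class NamesByCity {
    // Hypothetical stand-ins for the City and Person types used in the JavaDoc.
    enum City { LONDON, PARIS }

    static final class Person {
        private final String lastName;
        private final City city;

        Person(String lastName, City city) {
            this.lastName = lastName;
            this.city = city;
        }

        String getLastName() { return lastName; }
        City getCity() { return city; }
    }

    // No map factory argument: the set of last names of people in each city.
    static ConcurrentMap<City, Set<String>> namesByCity(List<Person> people) {
        return people.stream().collect(groupingByConcurrent(Person::getCity,
                mapping(Person::getLastName, toSet())));
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
            new Person("Smith", City.LONDON),
            new Person("Jones", City.LONDON),
            new Person("Martin", City.PARIS));
        System.out.println(namesByCity(people));
    }
}
```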

Paul.

