Collectors -- finally!

Brian Goetz brian.goetz at oracle.com
Sat Mar 16 09:22:42 PDT 2013


I believe I have the last word on Collector.

Recall the overriding goal of this effort is to support *composibility*, 
so that "intermediate collecting stages" like groupBy / partition / 
mapping can be combined with other collections/reductions to let the 
user easily mix and match, rather than providing limited ad-hoc 
reductions like "groupBy".

In retrospect, the central tension in the API, which was causing in 
various versions API bloat and API complexity, was the fact that we were 
treating cascaded functional reduction and cascaded mutable reduction 
differently, when both are really 95% the same thing.  In the first 
version, it meant 16 forms of grouping{By,Reduce}, and half of those 
were due to reduction.  In the version from last week, this trouble came 
out as overloading the GroupingCollector as being both a Collector and a 
factory for more complex Collectors.

The answer, I believe, is to "detune" the Collector abstraction so that 
it can model either a functional or a mutable reduction.  This slightly 
increases the pain of implementing a Collector (but not really), and 
slightly-more-than-slightly increases the pain of implementing reduction 
atop a Collector -- but that's good because the only true client of 
Collector is the Streams implementation, and that's where the pain 
belongs.  By moving the pain to the core framework implementation, the 
user code gets simpler, there are fewer exposed concepts, and the 
explosion of combinations becomes more manageable.

Here's Collector now:

public interface Collector<T, R> {
     Supplier<R> resultSupplier();
     BiFunction<R, T, R> accumulator();
     BinaryOperator<R> combiner();
     default boolean isConcurrent() { return false; }
     default boolean isStable() { return false; }
}

API-wise, what's changed is accumulator returns a BiFunction, not a 
BiConsumer, raising the possibility that the accumulation operation 
could change the container.  This opens the doors to some more 
interesting Collector implementations, and makes it more parallel with 
the combiner, which we turned into a BiFunction in the first round of 
Collector.  The other new method is "isStable" (better name invited), 
which is merely an indication that this collector will act as an "old 
style" mutable Collector which opens the doors to some optimizations in 
the concurrent implementation.  (Ignore for now, its purely an 
optimization.)  Spec-wise, it gets more complicated because there's more 
things a Collector can do.  But again, that's mostly our problem.

What this means is that we can now (finally) define a Collector for 
reduction:

   Collector<T,T> reducing(BinaryOperator<T>)

Which means that half the forms (reduce, map-reduce) of the grouping and 
partitioning combinators go away, and instead just fold into the 
"cascaded collector" form.  Which leaves us with the following grouping 
forms:

   groupingBy(classifier)
   groupingBy(classifier, mapCtor)
   groupingBy(classifier, downstreamCollector)
   groupingBy(classifier, mapCtor, downstreamCollector)

along with groupingByConcurrent version of both.  This is still a few 
versions, but it should be clear enough how they differ.

The "max sale by salesman" example now becomes:

Map<Seller, Integer>
   txns.collect(groupingBy(Txn::seller,
                           mapping(Txn::amount,
                                   reducing(Integer::max)));

 From the previous version, the intermediate types 
GroupingCollector/PartitionCollector go away, as does the unfortunate 
type fudgery with the map constructors.  This is basically like the 
original version, but with half the groupingBy forms replaced with a 
single reducing() form.

The Collectors inventory now stands at:

  - toList()
  - toSet()
  - toCollection(ctor)
  - toStringBuilder()
  - toStringJoiner(sep)
  - to{Int,Long,Double}Statistics

  - toMap(mappingFn)
  - toMap(mappingFn, mapCtor, mergeFn)
  - toConcurrentMap(mappingFn)
  - toConcurrentMap(mappingFn, mapCtor, mergeFn)

  - mapping(fn, collector) // plus primitive specializations
  - reducing(BinaryOperator) // plus primitive specializations

  - groupingBy(classifier)
  - groupingBy(classifier, mapCtor)
  - groupingBy(classifier, downstreamCollector)
  - groupingBy(classifier, mapCtor, downstreamCollector)
  - groupingByConcurrent(classifier)
  - groupingByConcurrent(classifier, mapCtor)
  - groupingByConcurrent(classifier, downstreamCollector)
  - groupingByConcurrent(classifier, mapCtor, downstreamCollector)

  - partitioningBy(predicate)

The more flexible Collector API gives us new opportunities, too.  For 
example, toList used to use exclusively ArrayList.  But this version is 
more memory efficient:

     public static<T>
     Collector<T, List<T>> toList() {
         BiFunction<List<T>, T, List<T>> accumulator = (list, t) -> {
             int s = list.size();
             if (s == 0)
                 return Collections.singletonList(t);
             else if (s == 1) {
                 List<T> newList = new ArrayList<>();
                 newList.add(list.get(0));
                 newList.add(t);
                 return newList;
             }
             else {
                 list.add(t);
                 return list;
             }
         };
         BinaryOperator<List<T>> combiner = (left, right) -> {
             if (left.size() > 1) {
                 left.addAll(right);
                 return left;
             }
             else {
                 List<T> newList = new ArrayList<>(left.size() + 
right.size());
                 newList.addAll(left);
                 newList.addAll(right);
                 return newList;
             }
         };
         return new CollectorImpl<>(Collections::emptyList, accumulator, 
combiner, false, false);
     }



More information about the lambda-libs-spec-experts mailing list