Tabulators, reducers, etc

Brian Goetz brian.goetz at oracle.com
Thu Dec 27 16:47:36 PST 2012


> I really like this suggested API. I think it would be easier to digest
> with concrete examples that show that these choices are orthogonal and
> necessary.
>
> Sam
>
> On Thu, Dec 27, 2012 at 10:31 AM, Brian Goetz <brian.goetz at oracle.com
> <mailto:brian.goetz at oracle.com>> wrote:
>
>     One option might be: use "reduce" for the purely functional forms,
>     use accumulate/accumulateConcurrent for the others:
>
>          T reduce(T zero, BinaryOperator<T> reducer);

"Compute the sum of the squares of the first 100 integers."

int sumOfSquares = integers().map(x -> x*x)
                              .limit(100)
                              .reduce(0, Integer::sum);

where integers() generates an infinite IntStream (or maybe one that 
stops at MAX_VALUE).
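For reference, here is a runnable sketch against the java.util.stream API that eventually shipped in Java 8; the hypothetical integers() generator is stood in for by IntStream.iterate:

```java
import java.util.stream.IntStream;

public class SumOfSquares {
    // Sum of squares of the first 100 integers via map + two-arg reduce.
    // IntStream.iterate stands in for the hypothetical integers() generator.
    static int sumOfSquares() {
        return IntStream.iterate(1, i -> i + 1) // 1, 2, 3, ... (infinite)
                        .limit(100)             // first 100 of them
                        .map(x -> x * x)
                        .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares()); // 338350 = 100*101*201/6
    }
}
```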

>          Optional<T> reduce(BinaryOperator<T> reducer);

"How tall is the tallest person?"

Optional<Integer> tallest
     = people.map(Person::getHeight)
             .reduce(greaterOf(naturalOrder()));

where

Comparators.naturalOrder -> Comparator<T>
Comparators.greaterOf(Comparator) -> BinaryOperator<T>


"Who is the tallest person?"

Optional<Person> tallest
     = people.reduce(greaterOf(comparing(Person::getHeight)));
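In the Java 8 that eventually shipped, greaterOf(cmp) became BinaryOperator.maxBy(cmp). A runnable sketch, with Person as a made-up stand-in type:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

public class Tallest {
    // Hypothetical Person type, just enough for the example.
    static class Person {
        final String name; final int height;
        Person(String name, int height) { this.name = name; this.height = height; }
        String getName()  { return name; }
        int getHeight()   { return height; }
    }

    // Zero-less reduce: an empty input yields Optional.empty(), not a fake zero.
    static Optional<Person> tallest(List<Person> people) {
        return people.stream()
                     .reduce(BinaryOperator.maxBy(Comparator.comparing(Person::getHeight)));
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(new Person("Ann", 170),
                                            new Person("Bob", 185));
        System.out.println(tallest(people).get().getName()); // Bob
    }
}
```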


>          <U> U reduce(U zero, BiFunction<U, T, U> accumulator,
>     BinaryOperator<U> reducer);

"How many pages are there in this stack of documents?"

int pageCount = documents.reduce(0, (c, d) -> c + d.pages(),
                                  Integer::sum);
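The three-arg form shipped as-is in Java 8's Stream.reduce(identity, accumulator, combiner). A runnable sketch, with Document as a made-up type:

```java
import java.util.Arrays;
import java.util.List;

public class PageCount {
    // Hypothetical document type with a page count.
    static class Document {
        final int pages;
        Document(int pages) { this.pages = pages; }
        int pages() { return pages; }
    }

    // The accumulator folds a Document into a running int count; the
    // combiner (Integer::sum) merges partial counts from parallel chunks.
    static int pageCount(List<Document> docs) {
        return docs.stream()
                   .parallel()
                   .reduce(0, (c, d) -> c + d.pages(), Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(pageCount(Arrays.asList(
                new Document(3), new Document(5), new Document(2)))); // 10
    }
}
```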

While this can be represented as a map+reduce, sometimes the three-arg 
form provides more efficiency or flexibility.  For example, Joe came up 
with this one today -- perform String.compare on two strings of equal 
length, in parallel.

You could do this:

   intRange(0, s1.length())
     .parallel()
     .map(i -> cmp(i))
     .reduce(0, (l, r) -> (l != 0) ? l : r);

where

cmp(i) = Character.compare(s1.charAt(i), s2.charAt(i));

But, using the three-arg form, we can optimize away the irrelevant 
comparisons:

   intRange(0, s1.length())
     .parallel()
     .reduce(0, (l, i) -> (l != 0) ? l : cmp(i),
             (l, r) -> (l != 0) ? l : r);
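In the API that shipped, IntStream has no three-arg reduce, so a runnable version goes through boxed(); the early-exit logic is unchanged:

```java
import java.util.stream.IntStream;

public class ParallelCompare {
    // Character-by-character comparison of two equal-length strings.
    // Once a chunk's running result l is nonzero, later indices in that
    // chunk skip the character comparison entirely.
    static int compare(String s1, String s2) {
        return IntStream.range(0, s1.length())
                        .parallel()
                        .boxed() // shipped IntStream lacks the three-arg reduce
                        .reduce(0,
                                (l, i) -> (l != 0) ? l
                                        : Character.compare(s1.charAt(i), s2.charAt(i)),
                                (l, r) -> (l != 0) ? l : r);
    }

    public static void main(String[] args) {
        System.out.println(compare("apple", "apply") < 0); // true: 'e' < 'y'
    }
}
```

Because the combiner keeps the leftmost nonzero partial result and the stream is ordered, this still reports the first point of difference.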


>          <R> R accumulate(Supplier<R> seedFactory,
>                           BiBlock<R, T> accumulator,
>                           BiBlock<R, R> reducer);

This is the mutable version of the previous form, where instead of a 
seed value, there is a factory to create mutable containers, and instead 
of functions to compute a new aggregation result, we fold new values 
into the container.

Examples:

   ArrayList<String> asList
      = strings.parallel()
               .accumulate(ArrayList::new,
                           ArrayList::add,     // add(t)
                           ArrayList::addAll); // addAll(Collection)

   String concatted
      = strings.parallel()
               .accumulate(StringBuilder::new,
                           StringBuilder::append, // append(String)
                           StringBuilder::append) // append(CharSequence)
               .toString();

   BitSet bs = numbers.parallel()
                   .accumulate(BitSet::new, BitSet::set, BitSet::or);
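These three map directly onto the collect(supplier, accumulator, combiner) form that shipped in Java 8, where StringBuilder's method is spelled append rather than add. A runnable sketch:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class MutableAccumulate {
    static List<String> toList(Stream<String> strings) {
        // Each chunk fills its own ArrayList; addAll merges the partial lists.
        return strings.parallel()
                      .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    static String concat(Stream<String> strings) {
        // append(String) accumulates; append(CharSequence) merges the
        // partial builders.
        return strings.parallel()
                      .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
                      .toString();
    }

    static BitSet toBitSet(IntStream numbers) {
        // IntStream's collect takes an ObjIntConsumer accumulator: BitSet::set.
        return numbers.parallel()
                      .collect(BitSet::new, BitSet::set, BitSet::or);
    }

    public static void main(String[] args) {
        System.out.println(toList(Stream.of("a", "b", "c")));  // [a, b, c]
        System.out.println(concat(Stream.of("a", "b", "c")));  // abc
        System.out.println(toBitSet(IntStream.of(1, 3, 5)));   // {1, 3, 5}
    }
}
```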

>          <R> R accumulate(Accumulator<T, R> reducer);

This one is a convenient form of the previous one, where instead of 
specifying three lambdas, we tie them together so they can be reused 
and/or composed.

Accumulator.OfInt<BitSet> TO_BIT_SET
     = Accumulators.make(BitSet::new, BitSet::set, BitSet::or);

BitSet bs = numbers.accumulate(TO_BIT_SET);

The reuse part is nice, but the composition part is even more important. 
With an abstraction for Accumulator, all of our aggregations like 
groupBy, reduceBy, partition, mapTo, etc, are just accumulations, and 
it's trivial to cascade them.  For example:

"Transactions by (buyer, seller)"

Map<Buyer, Map<Seller, Collection<Transaction>>>
   map = txns.accumulate(groupBy(Txn::buyer, groupBy(Txn::seller)));

The inner groupBy returns an Accumulator<Transaction, Map<Seller,
Collection<Transaction>>>; the outer groupBy treats this simply as a
downstream reduction, and produces a new Accumulator.
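This Accumulator abstraction shipped in Java 8 as Collector, and the cascade became Collectors.groupingBy with a downstream collector. A runnable sketch, with Txn as a made-up type:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.groupingBy;

public class TxnsByBuyerSeller {
    // Hypothetical transaction type, just enough for the example.
    static class Txn {
        final String buyer, seller;
        Txn(String buyer, String seller) { this.buyer = buyer; this.seller = seller; }
        String buyer()  { return buyer; }
        String seller() { return seller; }
    }

    static Map<String, Map<String, List<Txn>>> byBuyerThenSeller(List<Txn> txns) {
        // The inner groupingBy is the downstream reduction of the outer one.
        return txns.stream()
                   .collect(groupingBy(Txn::buyer, groupingBy(Txn::seller)));
    }

    public static void main(String[] args) {
        List<Txn> txns = Arrays.asList(new Txn("b1", "s1"),
                                       new Txn("b1", "s2"),
                                       new Txn("b2", "s1"));
        System.out.println(byBuyerThenSeller(txns).get("b1").get("s2").size()); // 1
    }
}
```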

"Largest transaction by (buyer, seller)"

Map<Buyer, Map<Seller, Transaction>> m
   = txns.accumulate(groupBy(Txn::buyer,
                             groupBy(Txn::seller,
                                      greaterOf(comparing(Txn::amount)))));
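With the Collectors that shipped, this becomes groupingBy with a reducing(maxBy(...)) downstream; note the shipped form yields Optional<Txn> at the leaves. Txn is again a made-up type:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BinaryOperator;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.reducing;

public class LargestTxn {
    // Hypothetical transaction type.
    static class Txn {
        final String buyer, seller; final int amount;
        Txn(String buyer, String seller, int amount) {
            this.buyer = buyer; this.seller = seller; this.amount = amount;
        }
        String buyer()  { return buyer; }
        String seller() { return seller; }
        int amount()    { return amount; }
    }

    static Map<String, Map<String, Optional<Txn>>> largest(List<Txn> txns) {
        // reducing(maxBy(...)) is the shipped spelling of
        // greaterOf(comparing(...)) used as a downstream reduction.
        return txns.stream().collect(
                groupingBy(Txn::buyer,
                           groupingBy(Txn::seller,
                                      reducing(BinaryOperator.maxBy(
                                              Comparator.comparing(Txn::amount))))));
    }

    public static void main(String[] args) {
        List<Txn> txns = Arrays.asList(new Txn("b1", "s1", 10),
                                       new Txn("b1", "s1", 40),
                                       new Txn("b1", "s2", 20));
        System.out.println(largest(txns).get("b1").get("s1").get().amount()); // 40
    }
}
```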

"Profitable and unprofitable transactions by salesman"

Map<Seller, Collection<Transaction>[]> map =
    txns.accumulate(groupBy(Txn::seller, partition(t -> t.margin() > X)));

Here, partition() returns an Accumulator<Transaction, 
Collection<Transaction>[]>.
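The shipped spelling is Collectors.partitioningBy, which produces a Map<Boolean, List<T>> rather than a two-element array. A runnable sketch, with Txn and the threshold X made up:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.partitioningBy;

public class ProfitBySeller {
    static final double X = 0.10; // hypothetical margin threshold

    // Hypothetical transaction type with a profit margin.
    static class Txn {
        final String seller; final double margin;
        Txn(String seller, double margin) { this.seller = seller; this.margin = margin; }
        String seller() { return seller; }
        double margin() { return margin; }
    }

    // partitioningBy yields Map<Boolean, List<Txn>> in place of the
    // Collection<Transaction>[] pair sketched in the draft.
    static Map<String, Map<Boolean, List<Txn>>> bySellerProfitability(List<Txn> txns) {
        return txns.stream()
                   .collect(groupingBy(Txn::seller, partitioningBy(t -> t.margin() > X)));
    }

    public static void main(String[] args) {
        List<Txn> txns = Arrays.asList(new Txn("s1", 0.25), new Txn("s1", 0.05));
        System.out.println(bySellerProfitability(txns).get("s1").get(true).size()); // 1
    }
}
```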

>          <R> R accumulateConcurrent(ConcurrentAccumulator<T, R>
>     tabulator);

All the accumulations above are order-preserving, and the mutable forms 
use containers that are not shared across threads.  This means the 
per-thread containers have to be merged, which often involves 
nontrivial copying cost.

If you have a concurrent container, AND you don't care about encounter 
order, AND your reduction functions are commutative (not just 
associative), you have another choice: shovel things into a concurrent 
data structure, and hope its contention management is less expensive 
than merging.  Note that this shoveling is really just forEach, not any 
form of reduce.

The accumulateConcurrent (open to alternate names) makes it clear that 
you are choosing this mode that has different semantics.  However, given 
a suitable container, all the same aggregations can be done as 
concurrent/mutative/order-ignoring/commutative accumulations.  So:

"Transactions by (buyer, seller)"

ConcurrentMap<Buyer, Map<Seller, Collection<Transaction>>>
   map = txns.accumulateConcurrent(groupBy(ConcurrentHashMap::new,
                                           Txn::buyer,
                                           groupBy(Txn::seller)));
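In shipped Java 8 this mode is spelled Collectors.groupingByConcurrent: every thread shovels into one shared ConcurrentHashMap, there is no merge step, and encounter order within each group is not preserved. A runnable sketch with a made-up Txn type:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import static java.util.stream.Collectors.groupingByConcurrent;

public class ConcurrentGroup {
    // Hypothetical transaction type.
    static class Txn {
        final String buyer;
        Txn(String buyer) { this.buyer = buyer; }
        String buyer() { return buyer; }
    }

    // All threads insert into one shared concurrent map; there is no
    // per-chunk container and no merge, so per-group order is unspecified.
    static ConcurrentMap<String, List<Txn>> byBuyer(List<Txn> txns) {
        return txns.parallelStream().collect(groupingByConcurrent(Txn::buyer));
    }

    public static void main(String[] args) {
        List<Txn> txns = Arrays.asList(new Txn("b1"), new Txn("b2"), new Txn("b1"));
        System.out.println(byBuyer(txns).get("b1").size()); // 2
    }
}
```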

The reason for putting "concurrent" in the method name is that, without 
it, it would be impossible to tell whether a given accumulation is 
concurrent or not.  Because the semantics are so different, I think 
this is a choice we shouldn't encourage brushing under the rug.

Finding another name for "accumulateConcurrent" would also be OK, maybe 
one that has "forEach" in it, like:

   map = txns.forEachInto(ConcurrentHashMap::new,
                          groupBy(Txn::buyer));



More information about the lambda-libs-spec-observers mailing list