Tabulators, reducers, etc
Brian Goetz
brian.goetz at oracle.com
Thu Dec 27 16:47:36 PST 2012
> I really like this suggested API. I think it would be easier to digest
> with concrete examples that show that these choices are orthogonal and
> necessary.
>
> Sam
>
> On Thu, Dec 27, 2012 at 10:31 AM, Brian Goetz <brian.goetz at oracle.com
> <mailto:brian.goetz at oracle.com>> wrote:
>
> One option might be: use "reduce" for the purely functional forms,
> use accumulate/accumulateConcurrent for the others:
>
> T reduce(T zero, BinaryOperator<T> reducer);
"Compute the sum of the squares of the first 100 integers."
int sumOfSquares = integers().map(x -> x*x)
.limit(100)
.reduce(0, Integer::sum);
where integers() generates an infinite IntStream (or maybe one that
stops at MAX_VALUE.)
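For a concrete, runnable sketch of this zero-seeded form, here is the same
computation written against java.util.stream.IntStream, the shipped analogue
of what is proposed here (the names differ slightly, but the shape -- identity
value plus associative operator -- is the same):

    import java.util.stream.IntStream;

    public class SumOfSquares {
        public static void main(String[] args) {
            // integers() is modeled here by an infinite IntStream.iterate.
            int sumOfSquares = IntStream.iterate(1, i -> i + 1)
                                        .map(x -> x * x)
                                        .limit(100)
                                        .reduce(0, Integer::sum);
            System.out.println(sumOfSquares);   // 338350
        }
    }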
> Optional<T> reduce(BinaryOperator<T> reducer);
"How tall is the tallest person?"
Optional<Integer> tallest
= people.map(Person::getHeight)
.reduce(greaterOf(naturalOrder()))
where
Comparators.naturalOrder -> Comparator<T>
Comparators.greaterOf(Comparator) -> BinaryOperator<T>
"Who is the tallest person"
Optional<Person> tallest
= people.reduce(greaterOf(comparing(Person::getHeight)));
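As a runnable sketch (Person here is a hypothetical stand-in; in the shipped
java.util.stream API, Comparator.comparing and BinaryOperator.maxBy play the
roles of comparing and greaterOf):

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;
    import java.util.function.BinaryOperator;

    public class Tallest {
        // Hypothetical domain class, just enough for the example.
        record Person(String name, int height) {
            int getHeight() { return height; }
        }

        public static void main(String[] args) {
            List<Person> people = List.of(new Person("Ann", 170), new Person("Bob", 185));
            // Seedless reduction: an empty input yields Optional.empty(), not a bogus zero.
            Optional<Person> tallest =
                people.stream()
                      .reduce(BinaryOperator.maxBy(Comparator.comparing(Person::getHeight)));
            tallest.ifPresent(p -> System.out.println(p.name()));   // Bob
        }
    }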
> <U> U reduce(U zero, BiFunction<U, T, U> accumulator,
> BinaryOperator<U> reducer);
"How many pages are there in this stack of documents"
int pageCount = documents.reduce(0, (c, d) -> c + d.pages(),
Integer::sum);
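A runnable sketch of that computation (Document is a hypothetical stand-in;
the three-arg form corresponds to Stream.reduce(identity, accumulator,
combiner) in the shipped API):

    import java.util.List;

    public class PageCount {
        // Hypothetical domain class, just enough for the example.
        record Document(int pages) {}

        public static void main(String[] args) {
            List<Document> documents = List.of(new Document(3), new Document(10), new Document(7));
            // Fused map+reduce: the accumulator folds a Document into an int total,
            // the combiner merges the per-chunk totals.
            int pageCount = documents.stream()
                                     .reduce(0, (c, d) -> c + d.pages(), Integer::sum);
            System.out.println(pageCount);   // 20
        }
    }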
While this can be represented as a map+reduce, sometimes the three-arg
form provides more efficiency or flexibility. For example, Joe came up
with this one today -- perform a String.compareTo-style comparison of two
strings of equal length, in parallel.
You could do this:
intRange(0, s1.length())
.parallel()
.map(i -> cmp(i))
.reduce(0, (l, r) -> (l != 0) ? l : r);
where
cmp(i) = Character.compare(s1.charAt(i), s2.charAt(i));
But, using the three-arg form, we can optimize away the irrelevant
comparisons:
intRange(0, s1.length())
.parallel()
.reduce(0, (l, i) -> (l != 0) ? l : cmp(i),
(l, r) -> (l != 0) ? l : r);
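A runnable sketch of the optimized version (written against the shipped
java.util.stream; boxed() is needed there because the three-arg reduce lives
on Stream<T> rather than IntStream):

    import java.util.stream.IntStream;

    public class ParallelCompare {
        static int cmp(String s1, String s2, int i) {
            return Character.compare(s1.charAt(i), s2.charAt(i));
        }

        // Compare two equal-length strings in parallel; once a partial result is
        // nonzero, the remaining charAt comparisons in that chunk are skipped by
        // the ternary, and the combiner keeps the leftmost nonzero result.
        static int compare(String s1, String s2) {
            return IntStream.range(0, s1.length())
                            .parallel()
                            .boxed()
                            .reduce(0,
                                    (l, i) -> (l != 0) ? l : cmp(s1, s2, i),
                                    (l, r) -> (l != 0) ? l : r);
        }

        public static void main(String[] args) {
            System.out.println(compare("apple", "apply"));   // negative: 'e' < 'y'
        }
    }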
> <R> R accumulate(Supplier<R> seedFactory,
> BiBlock<R, T> accumulator,
> BiBlock<R, R> reducer);
This is the mutable version of the previous form, where instead of a
seed value, there is a factory to create mutable containers, and instead
of functions to compute a new aggregation result, we fold new values
into the container.
Examples:
ArrayList<String> asList
= strings.parallel()
.accumulate(ArrayList::new,
ArrayList::add, // add(t)
ArrayList::addAll) // addAll(Collection)
String concatted
= strings.parallel()
.accumulate(StringBuilder::new,
StringBuilder::append, // append(String)
StringBuilder::append) // append(CharSequence)
.toString();
BitSet bs = numbers.parallel()
.accumulate(BitSet::new, BitSet::set, BitSet::or);
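A runnable sketch of the first two of these, using Stream.collect(Supplier,
BiConsumer, BiConsumer), the shipped counterpart of this mutable form (note
that on StringBuilder the method is append, not add):

    import java.util.ArrayList;
    import java.util.List;

    public class MutableCollect {
        public static void main(String[] args) {
            List<String> strings = List.of("a", "b", "c", "d");

            // One container per chunk, elements folded in, containers merged.
            ArrayList<String> asList =
                strings.parallelStream()
                       .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

            String concatted =
                strings.parallelStream()
                       .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
                       .toString();

            System.out.println(asList + " " + concatted);   // [a, b, c, d] abcd
        }
    }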
> <R> R accumulate(Accumulator<T, R> reducer);
This one is a convenient form of the previous one, where instead of
specifying three lambdas, we tie them together so they can be reused
and/or composed.
Accumulator.OfInt<BitSet> TO_BIT_SET
= Accumulators.make(BitSet::new, BitSet::set, BitSet::or);
BitSet bs = numbers.accumulate(TO_BIT_SET);
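A runnable sketch of the same packaging, using java.util.stream.Collector
(the shipped analogue of Accumulator) so the three functions travel together:

    import java.util.BitSet;
    import java.util.stream.Collector;
    import java.util.stream.Stream;

    public class ReusableBitSetCollector {
        // One reusable, composable object bundling container factory,
        // accumulator, and merge function.
        static final Collector<Integer, BitSet, BitSet> TO_BIT_SET =
            Collector.of(BitSet::new,
                         BitSet::set,                        // fold an index into the container
                         (a, b) -> { a.or(b); return a; });  // merge two containers

        public static void main(String[] args) {
            BitSet bs = Stream.of(1, 3, 5, 7).collect(TO_BIT_SET);
            System.out.println(bs);   // {1, 3, 5, 7}
        }
    }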
The reuse part is nice, but the composition part is even more important.
With an abstraction for Accumulator, all of our aggregations like
groupBy, reduceBy, partition, mapTo, etc., are just accumulations, and
it's trivial to cascade them. For example:
"Transactions by (buyer, seller)"
Map<Buyer, Map<Seller, Collection<Transaction>>>
map = txns.accumulate(groupBy(Txn::buyer, groupBy(Txn::seller)));
The inner groupBy returns an Accumulator<Transaction, Map<Seller,
Collection<Transaction>>>; the outer groupBy treats this simply as a
downstream reduction, and produces a new Accumulator.
"Largest transaction by (buyer, seller)"
Map<Buyer, Map<Seller, Transaction>> m
= txns.accumulate(groupBy(Txn::buyer,
groupBy(Txn::seller,
greaterOf(comparing(Txn::amount)))));
"Profitable and unprofitable transactions by salesman"
Map<Seller, Collection<Transaction>[]> map =
txns.accumulate(groupBy(Txn::seller, partition(t -> t.margin() > X)));
Here, partition() returns an Accumulator<Transaction,
Collection<Transaction>[]>.
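A runnable sketch of the cascading idea, written against the shipped
Collectors (Txn is a hypothetical stand-in, with String keys for brevity;
note that partitioningBy there yields a Map<Boolean, List<T>> rather than an
array of collections):

    import java.util.List;
    import java.util.Map;
    import static java.util.stream.Collectors.groupingBy;
    import static java.util.stream.Collectors.partitioningBy;

    public class CascadedGrouping {
        // Hypothetical domain class, just enough for the example.
        record Txn(String buyer, String seller, double margin) {}

        public static void main(String[] args) {
            List<Txn> txns = List.of(new Txn("b1", "s1", 0.30),
                                     new Txn("b1", "s2", 0.05),
                                     new Txn("b2", "s1", 0.20));
            double x = 0.10;

            // "Transactions by (buyer, seller)": the inner collector is the
            // downstream reduction of the outer one.
            Map<String, Map<String, List<Txn>>> byBuyerSeller =
                txns.stream().collect(groupingBy(Txn::buyer, groupingBy(Txn::seller)));

            // "Profitable and unprofitable transactions by salesman."
            Map<String, Map<Boolean, List<Txn>>> bySellerProfit =
                txns.stream().collect(groupingBy(Txn::seller, partitioningBy(t -> t.margin() > x)));

            System.out.println(byBuyerSeller);
            System.out.println(bySellerProfit);
        }
    }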
> <R> R accumulateConcurrent(ConcurrentAccumulator<T, R>
> tabulator);
All the above accumulations were order-preserving, and some used mutable
but not shared containers. This means that containers have to be
merged, which often involves nontrivial copying cost.
If you have a concurrent container, AND you don't care about encounter
order, AND your reduction functions are commutative (not just
associative), you have another choice: shovel things into a concurrent
data structure, and hope its contention management is less expensive
than merging. Note that this shoveling is really just forEach, not any
form of reduce.
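To make that concrete, a minimal sketch of the "shovel with forEach" shape
(grouping words by length into a shared concurrent map; no merging, no
encounter order):

    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ConcurrentMap;

    public class ForEachShovel {
        public static void main(String[] args) {
            List<String> words = List.of("to", "be", "or", "not", "to", "be");

            // Every worker thread writes directly into one shared concurrent
            // container; per-chunk containers and merging disappear, and so
            // does encounter order.
            ConcurrentMap<Integer, Queue<String>> byLength = new ConcurrentHashMap<>();
            words.parallelStream()
                 .forEach(w -> byLength.computeIfAbsent(w.length(), k -> new ConcurrentLinkedQueue<>())
                                       .add(w));

            System.out.println(byLength);
        }
    }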
The accumulateConcurrent method (open to alternate names) makes it clear that
you are choosing this mode, which has different semantics. However, given
a suitable container, all the same aggregations can be done as
concurrent/mutative/order-ignoring/commutative accumulations. So:
"Transactions by (buyer, seller)"
ConcurrentMap<Buyer, Map<Seller, Collection<Transaction>>>
map = txns.accumulateConcurrent(groupBy(ConcurrentHashMap::new,
Txn::buyer,
groupBy(Txn::seller)));
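For comparison, the shipped API expresses this mode as a concurrent
collector; a sketch with Collectors.groupingByConcurrent (Txn again a
hypothetical stand-in):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentMap;
    import static java.util.stream.Collectors.groupingBy;
    import static java.util.stream.Collectors.groupingByConcurrent;

    public class ConcurrentGrouping {
        // Hypothetical domain class, just enough for the example.
        record Txn(String buyer, String seller) {}

        public static void main(String[] args) {
            List<Txn> txns = List.of(new Txn("b1", "s1"), new Txn("b2", "s1"), new Txn("b1", "s2"));

            // One shared ConcurrentMap, filled from all threads, unordered --
            // instead of per-chunk maps merged afterwards.
            ConcurrentMap<String, Map<String, List<Txn>>> byBuyerSeller =
                txns.parallelStream()
                    .collect(groupingByConcurrent(Txn::buyer, groupingBy(Txn::seller)));

            System.out.println(byBuyerSeller);
        }
    }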
The reason for putting "concurrent" in the method name is that, without it,
it wouldn't be possible to tell whether a given accumulation is concurrent
or not. Because the semantics are so different, I think this is a
choice we shouldn't encourage brushing under the rug.
Finding another name for "accumulateConcurrent" would also be OK, maybe
one that has "forEach" in it, like:
map = txns.forEachInto(ConcurrentHashMap::new,
groupBy(Txn::buyer))