Forms for reduce() -- part 1

Brian Goetz brian.goetz at oracle.com
Fri Dec 14 10:33:09 PST 2012


I think I've got something on this.  I think the number of new API 
abstractions can be collapsed down a bit but I've left it a little 
fluffy for clarity for the moment.

Homogeneous forms:

     T reduce(T zero, BinaryOperator<T> reducer);
     Optional<T> reduce(BinaryOperator<T> reducer);
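
For example (sketch only; assume numbers is a Stream<Integer>):

     int sum = numbers.reduce(0, Integer::sum);             // monoid; zero is 0
     Optional<Integer> min = numbers.reduce(Integer::min);  // no sensible zero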

Nonhomogeneous forms (aka inject), in mutable and non-mutable flavors:

     <U> U reduce(U zero,
                  BiFunction<U, T, U> accumulator,
                  BinaryOperator<U> reducer);

     <R> R mutableReduce(Supplier<R> seedFactory,
                         BiBlock<R, T> accumulator,
                         BiBlock<R, R> reducer);
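
As a sketch of the mutable form in action (this is the StringBuilder
concatenation from the earlier note, recast against mutableReduce;
assume strings is a Stream<String>; since there is no finishing
function here, toString is applied to the result):

     String s = strings.mutableReduce(StringBuilder::new,
                                      StringBuilder::append,  // add element
                                      StringBuilder::append)  // merge chunks
                       .toString();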

When these are needed, I suspect the mutable forms will generally be 
preferred to the functional ones.  This is in part because many 
catamorphisms can be described with map+reduce (e.g., sumPages = 
docs.map(Document::getPageCount).reduce(0, Integer::sum)), and Java's 
paucity of usable immutable types other than primitives and strings 
makes the remainder easier with mutable accumulators.  That could 
improve in the future, which I think makes it worth keeping the 
functional version.

Open question: do we keep the name mutableReduce to make it clear what 
is going on?

For reasons that mostly have to do with the more complex cases (see 
below), I added a type to capture mutable reduction:

public interface MutableReducer<T, R> {
     R makeAccumulator();
     void accumulate(R accumulator, T value);
     void combine(R accumulator, R other);
}

and some static factories for common mutable reductions like "create a 
new collection from the elements":

static <T, C extends Collection<T>>
      MutableReducer<T, C> intoCollection(Supplier<C> collectionFactory)
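
One way intoCollection might be written in terms of MutableReducer 
(sketch only, not necessarily the actual implementation):

      static <T, C extends Collection<T>>
      MutableReducer<T, C> intoCollection(Supplier<C> collectionFactory) {
          return new MutableReducer<T, C>() {
              public C makeAccumulator() { return collectionFactory.get(); }
              public void accumulate(C acc, T value) { acc.add(value); }
              public void combine(C acc, C other) { acc.addAll(other); }
          };
      }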


None of this is new.  The new stuff is what replaces groupBy and reduceBy. 
  For the time being I'm putting these in the category of "tabulation", 
even though these are really just reductions.  (We can discuss whether 
the Tabulator abstractions carry their weight.)

There are two approaches to tabulation, I'm calling them currently 
Tabulator and ConcurrentTabulator.  The regular approach is like any 
other parallel reduce; break the input into chunks, compute the 
sub-result for each chunk, and merge the sub-results.  The concurrent 
approach (suitable for groupBy/reduceBy applications IF your reducers 
are commutative and/or you don't care about encounter order) uses a 
single ConcurrentMap to accumulate the results, relying on the 
implementation's natural contention management to allow concurrent 
updates.  Note that concurrent tabulation is more like forEach than like 
reduce.
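
To make the distinction concrete, the heart of a concurrent group-by 
would look roughly like this (purely a sketch; map, sink and classifier 
are made-up names here, and the choice of row collection is arbitrary):

      ConcurrentMap<K, Collection<T>> map = new ConcurrentHashMap<>();
      Block<T> sink =
          t -> map.computeIfAbsent(classifier.apply(t),
                                   k -> new ConcurrentLinkedQueue<T>())
                  .add(t);
      // every worker thread feeds elements straight into sink; there is
      // no per-chunk sub-result and no merge step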

One thing that's new here is we move the concurrent/traditional choice 
explicitly to the user, rather than trying to guess from the ORDERED 
flag.  This is good for a number of reasons, including that using the 
ORDERED flag is just a guess about what the user wants.

Currently we've got two new abstractions:

public interface Tabulator<T,R> extends MutableReducer<T,R> {
}

public interface ConcurrentTabulator<T, R> {
     R makeAccumulator();
     Block<T> makeSink(R accumulator);
}

and two new Stream methods:

     <R> R tabulate(Tabulator<T, R> tabulator);
     <R> R tabulate(ConcurrentTabulator<T, R> tabulator);

which replace groupBy / reduceBy (and also provide the equivalent of 
mapped (Stream<T>+Function<T,U> -> Map<T,U>), partition, allow 
multi-level groupBy / reduceBy, allow groupBy/reduceBy to use targets 
like Guava multimap, etc.)

The key new thing is the introduction of tabulator factory methods 
(currently in Tabulators and ConcurrentTabulators.)  There are a lot of 
variants but most of them are one-line wrappers around a small number of 
general implementations.

The simplest is groupBy:

   Tabulator<T, M> groupBy(Function<? super T, ? extends K> classifier)

This is a simple groupBy -- take a Stream<T> and a function T->K and 
produce a Map<K, Collection<T>>.  A variant takes supplier lambdas 
(generally constructor refs) if you want to customize what kind of Map 
and Collection are used (e.g., TreeMap<K, Set<T>>):

   Tabulator<T, M> groupBy(Function<? super T, ? extends K> classifier,
                           Supplier<M> mapFactory,
                           Supplier<C> rowFactory)

This is used like:

     Map<Author, Collection<Doc>>
         byAuthor = docs.tabulate(groupBy(Doc::getAuthor));

Under the hood groupBy is really just a reducer.
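
Conceptually, the tabulator it returns is just this reduction (sketch 
only; classifier is the function passed to groupBy, and HashMap / 
ArrayList are the default choices):

      new Tabulator<T, Map<K, Collection<T>>>() {
          public Map<K, Collection<T>> makeAccumulator() {
              return new HashMap<>();
          }
          public void accumulate(Map<K, Collection<T>> map, T t) {
              map.computeIfAbsent(classifier.apply(t),
                                  k -> new ArrayList<>()).add(t);
          }
          public void combine(Map<K, Collection<T>> map,
                              Map<K, Collection<T>> other) {
              other.forEach((k, v) ->
                  map.computeIfAbsent(k, x -> new ArrayList<>()).addAll(v));
          }
      };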

We can build what we've been calling reduceBy (awful name) on top of 
this, by providing a reducer to handle the "downstream" elements.  We 
know there will always be at least one, so there's no need to provide a 
zero or handle Optional.  Currently these are called groupBy but I think 
groupedReduce might be a better name?  There are three useful forms:

   // homogeneous reduce
   Tabulator<T, M> groupBy(Function<? super T, ? extends K> classifier,
                           BinaryOperator<T> reducer)

   // map+reduce
   Tabulator<T, M> groupBy(Function<? super T, ? extends K> classifier,
                           Function<T,U> mapper,
                           BinaryOperator<U> reducer)

   // mutable reduce
   Tabulator<T, M> groupBy(Function<? super T, ? extends K> classifier,
                           MutableReducer<T,M> downstreamReducer)

Because a Tabulator is a MutableReducer, the latter form allows 
multi-level group-by.  Some examples, using this domain model:

interface Txn {
     Buyer buyer();
     Seller seller();
     int amount();
     Date date();
}

   // Transactions by buyer
   Map<Buyer, Collection<Txn>> m = txns.tabulate(groupBy(Txn::buyer));

   // Sum of amounts by buyer
   Map<Buyer, Integer> m = txns.tabulate(groupBy(Txn::buyer,
                                                 Txn::amount,
                                                 Integer::sum));

   // Most recent transaction by buyer
   Map<Buyer, Txn> m = txns.tabulate(groupBy(Txn::buyer, moreRecent));

Where moreRecent is defined using a new (one line) combinator in 
Comparators to take a Comparator and turn it into a BinaryOperator that 
chooses the larger:

   BinaryOperator<Txn> moreRecent
       = Comparators.greaterOf(Comparators.comparing(Txn::date));
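
The combinator itself is roughly this (sketch only):

      static <T> BinaryOperator<T>
      greaterOf(Comparator<? super T> comparator) {
          return (a, b) -> comparator.compare(a, b) >= 0 ? a : b;
      }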

   // Transactions by buyer, seller

   Map<Buyer, Map<Seller, Collection<Txn>>> m
       = txns.tabulate(groupBy(Txn::buyer, groupBy(Txn::seller)));

You can go arbitrarily deep with grouping and use any reduction you want 
at the bottom.  Specializations can customize the Map type.  Guava can 
provide tabulators that tabulate into multimaps.

So, that's the new and improved groupBy/reduceBy.  But there's more.

     Tabulator<T, M> mappedTo(Function<? super T, ? extends U> mapper,
                              BinaryOperator<U> mergeFunction)

This takes a Stream<T> and a function T->U and produces a materialized 
Map<T,U>.  The merge function deals with duplicates in the stream, using 
the new Map.merge.

     Map<Person, History<Person>>
         m = people.tabulate(mappedTo(p -> getHistory(p),
                                      (h1, h2) -> h1));
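
Under the hood, the accumulate step is essentially one call to the new 
Map.merge per element (sketch; map is the accumulated Map<T,U>, t the 
incoming element, mapper and mergeFunction the arguments to mappedTo):

      map.merge(t, mapper.apply(t), mergeFunction);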

Partition is also trivial.  Like groupBy, we don't want to just do a 
one-level partition; we want to do an arbitrary reduction on each half. 
  Versions:

     Tabulator<T, Collection<T>[]> partition(Predicate<T>)

     Tabulator<T, R[]> partition(Predicate<T>, MutableReducer<T,R>)
     Tabulator<T, T[]> partition(Predicate<T>, BinaryOperator<T>)
     Tabulator<T, U[]> partition(Predicate<T>, Function<T,U>,
                                 BinaryOperator<U>)
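
The basic form is again just a mutable reduction: the accumulator is a 
two-element array of collections, and the accumulate step is roughly 
(sketch; buckets stands for that array, with matching elements going 
to slot 0):

      buckets[pred.test(t) ? 0 : 1].add(t);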

Because tabulators are reducers, you can nest these too:

   Map<Buyer, Collection<Txn>>[] partitioned
     = txns.tabulate(partition(pred, groupBy(Txn::buyer)));

   Map<Buyer, Collection<Txn>[]>  byBuyerPartitioned
     = txns.tabulate(groupBy(Txn::buyer, partition(pred)));

There are (or will be) concurrent versions of all of these too, in 
ConcurrentTabulators.  To be checked in shortly.

I think there's no question that this approach beats the pants off of 
the existing groupBy/reduceBy.  But there are still many open issues, 
including:

  - How many forms of these can we tolerate?  The explosion is already 
slightly unfortunate (though most are one-liners.)
  - Naming -- all names are up for discussion
    - MutableReducer
    - Tabulator
    - ConcurrentTabulator
    - groupBy vs groupedReduce
    - reduce vs mutableReduce
    - reduce vs tabulate
  - Is the current concurrent-vs-non the right way to introduce this 
choice?
  - Does this approach suggest a better way to do into()?



On 12/10/2012 3:54 PM, Brian Goetz wrote:
> I have been doing some brainstorming on forms for "fold".  My primary
> goals for revisiting this include:
>
>   - As mentioned in an earlier note, I want to get Map and Collection
> out of the Streams API (groupBy and reduceBy currently intrude these).
> This message lays the groundwork for this and I will follow up on these
> in a separate note.  As I noted, there are many things currently wrong
> with the current groupBy/reduceBy that I want to fix.
>
>   - Support "mutable fold" cases better, where the "seed" is really a
> mutable container (like a StringBuffer.)
>
> I'll start with use cases.  There are some that fit purely into a
> traditional functional model, and others that fit better into a mutable
> model.  While one can wedge one into the other, I think it may be better
> to be explicit about both.
>
> I am not suggesting naming right now, they could all be called reduce,
> though we may want to use different names to describe the functional vs
> mutable cases.
>
>
> Use cases -- purely functional
> ------------------------------
>
> 1.  Homogeneous operations on a monoid (e.g., sum).  Here, there is a
> monoid with a known zero.
>
>    T reduce(T zero, BinaryOperator<T> reducer)
>
> 2.  Homogeneous operations on non-monoids (e.g., min).  Here, there is
> no sensible zero, so we use Optional to reflect "nothing there". Ideally
> we would like to delay boxing to Optional until the very last operation
> (in other words, use (boolean, T) as the internal state and box to
> Optional<T> at the very end.)
>
>    Optional<T> reduce(BinaryOperator<T> reducer)
>
> 3.  Nonhomogeneous operations (aka foldl, such as "sum of weights").
> This requires an additional combiner function for this to work in parallel.
>
>   <U> U reduce(U zero, (U,T) -> U reducer, (U,U) -> U combiner)
>   <U> Optional<U> reduce(T->U first, (U,T) -> U reducer,
>                          (U,U) -> U combiner)
>
> Note that most cases where we might be inclined to return Optional<U>
> can be written as stream.map(T->U).reduce(BinaryOperator<U>).
>
> Doug points out: if we went with "null means nothing", we wouldn't need
> the optional forms.
>
> This is basically what we have now, though we're currently calling the
> last form "fold".  Doug has suggested we call them all reduce.
>
> Sub-question: people are constantly pointing out "but you don't need the
> combiner for the serial case."  My orientation here is that the serial
> case is a special case, and while we want to ensure that those cases are
> well-served, we don't necessarily want to distort the API to include
> things that *only* work in the serial case.
>
>
> Use cases -- mutable
> --------------------
>
> Many fold-like operations are better expressed with mutable state.  We
> could easily simulate them with the foldl form, but it may well be
> better to call this form out specially.  In these cases, there is also
> often a distinct internal and external representation.  I'll give them
> the deliberately stupid name mReduce for now.
>
> The general form is:
>
>   <I, R> R mReduce(Supplier<I> makeEmpty,
>                    BiBlock<I, T> addElement,
>                    BiBlock<I, I> combineResults,
>                    Function<I, R> getFinalResult)
>
> Here, I is the intermediate form, and R is the result.  There are many
> cases where computation with an intermediate form is more efficient, so
> we want to maintain the intermediate form for as long as possible --
> ideally until the last possible minute (when the whole reduction is done.)
>
> The analogue of reducer/combiner in the functional forms is "accept a
> new element" (addElement) and "combine one intermediate form with
> another" (combineResults).
>
> Examples:
>
> 4.  Average.  Here, we use an array of two ints to hold sum and
> count.  (Alternately we could use a custom tuple class.)  Our
> intermediate form is int[2] and our final form is Double.
>
>    Double average = integers.mReduce(() -> new int[2],
>                         (a, i) -> { a[0] += i; a[1]++; },
>                         (a, b) -> { a[0] += b[0]; a[1] += b[1]; },
>                         a -> (double) a[0] / a[1]);
>
> Here, we maintain the int[2] form all the way throughout the
> computation, including as we combine up the tree, and only convert to
> double at the last minute.
>
> 5.  String concatenation
>
> The signatures of the SAMs in mReduce were chosen to work with existing
> builder-y classes such as StringBuffer or ArrayList.  We can do string
> concatenation with the functional form using String::concat, but it is
> inefficient -- lots of copying as we go up the tree.  We can still use a
> mutable fold to do a concatenation with StringBuilder and mReduce.  It
> has the nice property that all the arguments already have methods that
> have the right signature, so we can do it all with method refs.
>
>    String s = strings.mReduce(StringBuilder::new,
>                               StringBuilder::append,
>                               StringBuilder::append,
>                               StringBuilder::toString);
>
> In this example, the two append method refs are targeting different
> versions of StringBuilder.append; the first is append(String) and the
> second is append(StringBuilder).  But the compiler will figure this out.
>
> 6.  toArray
>
> We can express "toArray" as a mutable fold using ArrayList to accumulate
> values and converting to an array at the end, just as with StringBuilder:
>
>    Object[] array = foos.mReduce(ArrayList::new,
>                                 ArrayList::add,
>                                 ArrayList::addAll,
>                                 ArrayList::toArray);
>
> There are other mutable reduction use cases too.  For example, sort can
> be implemented by providing an "insert in order" and a "merge sorted
> lists" method.  While these are not necessarily the most efficient
> implementation, they may well make reasonable last-ditch defaults.
>
> Both of these examples use separate internal forms (StringBuilder,
> ArrayList) and external forms (String, array).
>
>
> Finally, for reasons that may become clearer in the next message, I
> think we should consider having an abstraction for "Reducer" or
> "Reduction" that captures all the bits needed for a reduction.  This
> would allow the averager above to be reused:
>
>    double average = integers.reduce(Reducers.INT_AVERAGER);
>
> This turns into a win when we try to recast groupBy/reduceBy into being
> general reductions (next message).
>
>
> So, summary:
>
> Functional forms:
>
>      public U reduce(final U seed, final BinaryOperator<U> op) {
>
>      public Optional<U> reduce(BinaryOperator<U> op) {
>
>      public <R> R reduce(R base, Combiner<R, U, R> reducer,
>                          BinaryOperator<R> combiner) {
>
> Mutable form:
>
>      public <I, R> R reduce(Supplier<I> baseFactory,
>                             BiBlock<I, U> reducer,
>                             BiBlock<I, I> combiner,
>                             Function<I, R> finalResultMapper) {
>
> (and possibly a mutable form for special case where I=R)
>
> Possibly a form for a canned Reducer:
>
>     public<R> R reduce(Reducer<T,R> reducer);
>
>
>
>

