Yet another run at reduce

Brian Goetz brian.goetz at oracle.com
Tue Jan 8 07:15:26 PST 2013


Here's a regular reducer that does a two-level groupBy with a downstream 
reducer:

     static <T, K, D, M extends Map<K, D>>
     Reducer<T, M>
     groupBy(Function<? super T, ? extends K> classifier,
             Supplier<M> mapFactory,
             Reducer<T, D> downstream) {
         return new MergingMapReducer<T, K, D, M>(mapFactory, downstream::combine) {
             @Override
             public void accumulate(M map, T value) {
                 K key = Objects.requireNonNull(classifier.apply(value),
                                                "element cannot be mapped to a null key");
                 downstream.accumulate(map.computeIfAbsent(key, k -> downstream.makeResult()),
                                       value);
             }
             }
         };
     }
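
For concreteness, here's roughly how that might be used -- a sketch only,
where Person, getCity(), getLastName(), and the toList() leaf reducer
(sketched after the helper class below) are placeholders for illustration,
not part of the proposal:

     // Hypothetical usage sketch; only the groupBy form above is real
     Reducer<Person, Map<String, List<Person>>> byCity =
         groupBy(Person::getCity, HashMap::new, toList());

     // Nesting the downstream gives the two-level form: city -> last name -> people
     Reducer<Person, Map<String, Map<String, List<Person>>>> byCityAndName =
         groupBy(Person::getCity, HashMap::new,
                 groupBy(Person::getLastName, HashMap::new, toList()));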

This helper class defines the stuff common to all merging-map reducers:

     private static abstract class MergingMapReducer<T, K, V, M extends Map<K, V>>
             implements Reducer<T, M> {
         private final Supplier<M> mapFactory;
         private final BinaryOperator<V> mergeFunction;

         protected MergingMapReducer(Supplier<M> mapFactory,
                                     BinaryOperator<V> mergeFunction) {
             this.mapFactory = mapFactory;
             this.mergeFunction = mergeFunction;
         }

         @Override
         public boolean isConcurrent() {
             return false;   // plain "A" reducers: fold into per-leaf maps, then merge
         }

         @Override
         public M makeResult() {
             return mapFactory.get();
         }

         protected void accumulate(M map, K key, V newValue) {
             map.merge(key, newValue, mergeFunction);
         }

         @Override
         public M combine(M map, M other) {
             for (Map.Entry<K, V> e : other.entrySet())
                 map.merge(e.getKey(), e.getValue(), mergeFunction);
             return map;
         }
     }
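
The downstream argument can be any Reducer.  As a strawman -- again just a
sketch against the four-method interface quoted below, not something in the
proposal -- a list-collecting leaf reducer might look like:

     // Sketch of a plain (non-concurrent, "A") leaf reducer that collects
     // elements into ArrayLists; invented here purely for illustration
     static <T> Reducer<T, List<T>> toList() {
         return new Reducer<T, List<T>>() {
             @Override public boolean isConcurrent() { return false; }
             @Override public List<T> makeResult() { return new ArrayList<>(); }
             @Override public void accumulate(List<T> list, T t) { list.add(t); }
             @Override public List<T> combine(List<T> list, List<T> other) {
                 list.addAll(other);
                 return list;
             }
         };
     }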

And here's the concurrent version:

     static <T, K, D, M extends ConcurrentMap<K, D>>
     Reducer<T, M> groupBy(Function<? super T, ? extends K> classifier,
                           Supplier<M> mapFactory,
                           Reducer<T, D> downstream) {
         return new ConcurrentMapReducer<T, K, D, M>(mapFactory, downstream::combine) {
             @Override
             public void accumulate(M map, T t) {
                 D container = map.computeIfAbsent(classifier.apply(t),
                                                   k -> downstream.makeResult());
                 if (downstream.isConcurrent()) {
                     downstream.accumulate(container, t);
                 }
                 else {
                     synchronized (container) {
                         downstream.accumulate(container, t);
                     }
                 }
             }
         };
     }
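
Used with a ConcurrentMap factory and the reduceUnordered() form described in
the quoted note below, this overload would take the concurrent "B" path
(assuming ConcurrentMapReducer reports itself concurrent).  Again just a
sketch -- people stands in for some hypothetical stream of Person, and
toList() is the placeholder leaf reducer from above:

     // Hypothetical usage sketch: the ConcurrentMap-bounded overload,
     // driven through reduceUnordered so the concurrent path can be taken
     Reducer<Person, ConcurrentMap<String, List<Person>>> byCityConcurrent =
         groupBy(Person::getCity, ConcurrentHashMap::new, toList());
     ConcurrentMap<String, List<Person>> result =
         people.reduceUnordered(byCityConcurrent);   // people: some parallel stream of Person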

Or by examples, did you mean more complete usage examples?





On 1/8/2013 9:41 AM, Tim Peierls wrote:
> Sounds great -- I'd be interested in seeing some basic examples of both
> types of reducer.
>
> --tim
>
> On Mon, Jan 7, 2013 at 8:31 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>
>     I think Doug and I made some progress on reduce forms today.
>
>     Recap: there are two ways to do a reduction to a mutable data
>     structure like a Map; a classic fold (where you create lots of
>     little maps, one per leaf of the computation tree, accumulate leaf
>     data into them in a thread-confined manner, then merge their
>     contents up the tree into one big map), and a concurrent bombardment
>     (where you create one big concurrent-safe map, and blast elements in
>     from many threads.)  Call these "A" and "B".
>
>     The requirement for A is that:
>       - your combining functions are associative
>       - you can create multiple containers
>       - you can incorporate a new element into a container
>       - you can merge containers in a way that satisfies the obvious
>     distributive requirement
>
>     If you meet these requirements, you can do a parallel A reduce that
>     respects encounter order.
>
>     The requirement for B is that:
>       - your combining functions are associative and commutative
>       - you can incorporate a new element into a container
>       - your container supports concurrent incorporation
>
>     If you meet these requirements, you can do a parallel B reduce that
>     does NOT respect order, but may be faster (or slower, if contention
>     is high enough.)
>
>     Key observation: A "B" reducer can become an "A" reducer simply by
>     adding the merge ability, which is no harder than for regular A
>     reducers.
>
>     So rather than have A reducers and B reducers, we can have A
>     reducers and A+B reducers.  It's only marginally more work for B
>     reducer writers.
>
>     So...
>
>     public interface Reducer<T, R> {
>          boolean isConcurrent();
>          R makeResult();
>          void accumulate(R result, T value);
>          R combine(R result, R other);
>     }
>
>     A reducers return 'false' for isConcurrent; B reducers return 'true'.
>
>     What was Concurrent{Reducer,Tabulator,Accumulator} goes away.
>
>     Now, in addition to the various forms of reduce(), we add overloaded:
>
>        reduce(Reducer)
>        reduceUnordered(Reducer)
>
>     You will get a B reduction if (a) you selected reduceUnordered and
>     (b) your reducer is concurrent.  Otherwise you will get an A reduction.
>
>     This is nice because the knowledge of properties of "user doesn't
>     care about order" and "target supports concurrent insertion"
>     naturally live in different places; this separates them properly.
>       The latter is a property of the reducer implementation; the former
>     only the user knows about and therefore should live at the call
>     site.  Each contributes their bit and can mostly remain ignorant of
>     the other; the library combines these bits, and if both are present,
>     you get a B reduction.
>
>     The reduceUnordered() method can be cast as an optimization to
>     reduce(); it is always correct to use reduce(), but may be faster to
>     use reduceUnordered(), as long as you are willing to forgo order and
>     the reducer is cooperative.
>
>     In neither case (assuming a properly implemented reducer) will you
>     ever get a thread-unsafe result; if you ask for an unordered
>     reduction with a nonconcurrent reducer, you get a safe ordered
>     reduction instead, which might be slower (or faster.)
>
>     Pros:
>       - ConcurrentReducer goes away
>       - .unordered() goes away
>       - Properly separates order-sensitivity from concurrency
>
>     Cons:
>       - Two variations on various aspects of reducing are surfaced in
>     the API (unordered and concurrent) that will make users say "huh?"
>     (but if you get it wrong or choose to stay ignorant you still get the
>     right answer.)
>
>     Implementation coming.
>
>
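
The selection rule in the note above amounts to something like the following
-- a sketch only; evaluate(), concurrentReduce(), and foldAndCombine() are
made-up names, not the pending implementation:

     // Sketch: the concurrent "B" path is taken only when both bits are
     // present -- the call site asked for unordered AND the reducer is
     // concurrent; otherwise fall back to the ordered fold-and-merge "A" path
     <T, R> R evaluate(Reducer<T, R> reducer, boolean unordered) {
         return (unordered && reducer.isConcurrent())
                ? concurrentReduce(reducer)     // one shared container, many threads
                : foldAndCombine(reducer);      // per-leaf containers, merged up the tree
     }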

