Yet another run at reduce
Brian Goetz
brian.goetz at oracle.com
Tue Jan 8 07:15:26 PST 2013
Here's a regular reducer that does a two-level groupBy with a downstream
reducer:
static <T, K, D, M extends Map<K, D>>
Reducer<T, M> groupBy(Function<? super T, ? extends K> classifier,
                      Supplier<M> mapFactory,
                      Reducer<T, D> downstream) {
    return new MergingMapReducer<T, K, D, M>(mapFactory, downstream::combine) {
        @Override
        public void accumulate(M map, T value) {
            K key = Objects.requireNonNull(classifier.apply(value),
                                           "element cannot be mapped to a null key");
            downstream.accumulate(map.computeIfAbsent(key, k -> downstream.makeResult()),
                                  value);
        }
    };
}
where this helper class defines the stuff common to all merging-map
reducers:
private static abstract class MergingMapReducer<T, K, V, M extends Map<K, V>>
        implements Reducer<T, M> {
    private final Supplier<M> mapFactory;
    private final BinaryOperator<V> mergeFunction;

    protected MergingMapReducer(Supplier<M> mapFactory,
                                BinaryOperator<V> mergeFunction) {
        this.mapFactory = mapFactory;
        this.mergeFunction = mergeFunction;
    }

    @Override
    public boolean isConcurrent() {
        return false;  // an "A" reducer: thread-confined accumulation, then merge
    }

    @Override
    public M makeResult() {
        return mapFactory.get();
    }

    protected void accumulate(M map, K key, V newValue) {
        map.merge(key, newValue, mergeFunction);
    }

    @Override
    public M combine(M map, M other) {
        for (Map.Entry<K, V> e : other.entrySet())
            map.merge(e.getKey(), e.getValue(), mergeFunction);
        return map;
    }
}
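To make this concrete, here's a self-contained toy version (all names invented for illustration; the Reducer interface is the one sketched further down in this message) of a groupBy with a to-list downstream, driven through the makeResult / accumulate / combine lifecycle the way a two-leaf parallel A reduction would drive it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

class GroupByDemo {
    // The Reducer interface as sketched later in this message.
    interface Reducer<T, R> {
        boolean isConcurrent();
        R makeResult();
        void accumulate(R result, T value);
        R combine(R result, R other);
    }

    // A trivial non-concurrent downstream reducer: collect into a List.
    static <T> Reducer<T, List<T>> toList() {
        return new Reducer<T, List<T>>() {
            public boolean isConcurrent() { return false; }
            public List<T> makeResult() { return new ArrayList<>(); }
            public void accumulate(List<T> result, T value) { result.add(value); }
            public List<T> combine(List<T> result, List<T> other) {
                result.addAll(other);
                return result;
            }
        };
    }

    // groupBy specialized to a toList downstream, with the same
    // accumulate/combine logic as the generic version above.
    static <T, K> Reducer<T, Map<K, List<T>>> groupByToList(
            Function<? super T, ? extends K> classifier) {
        Reducer<T, List<T>> downstream = toList();
        return new Reducer<T, Map<K, List<T>>>() {
            public boolean isConcurrent() { return false; }
            public Map<K, List<T>> makeResult() { return new HashMap<>(); }
            public void accumulate(Map<K, List<T>> map, T value) {
                K key = Objects.requireNonNull(classifier.apply(value));
                downstream.accumulate(
                    map.computeIfAbsent(key, k -> downstream.makeResult()), value);
            }
            public Map<K, List<T>> combine(Map<K, List<T>> map,
                                           Map<K, List<T>> other) {
                for (Map.Entry<K, List<T>> e : other.entrySet())
                    map.merge(e.getKey(), e.getValue(), downstream::combine);
                return map;
            }
        };
    }

    public static void main(String[] args) {
        // Simulate a two-leaf A reduction: accumulate each leaf
        // thread-confined, then merge up the tree.
        Reducer<String, Map<Integer, List<String>>> r = groupByToList(String::length);
        Map<Integer, List<String>> left = r.makeResult();
        r.accumulate(left, "a");
        r.accumulate(left, "bb");
        Map<Integer, List<String>> right = r.makeResult();
        r.accumulate(right, "c");
        Map<Integer, List<String>> merged = r.combine(left, right);
        System.out.println(merged);  // {1=[a, c], 2=[bb]}
        assert merged.get(1).equals(Arrays.asList("a", "c"));
    }
}
```

Note that encounter order survives: within each group, leaf contents appear in leaf order, which is exactly the order-respecting property claimed for A reductions below.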
And here's the concurrent version:
static <T, K, D, M extends ConcurrentMap<K, D>>
Reducer<T, M> groupBy(Function<? super T, ? extends K> classifier,
                      Supplier<M> mapFactory,
                      Reducer<T, D> downstream) {
    return new ConcurrentMapReducer<T, K, D, M>(mapFactory, downstream::combine) {
        @Override
        public void accumulate(M map, T t) {
            D container = map.computeIfAbsent(classifier.apply(t),
                                              k -> downstream.makeResult());
            if (downstream.isConcurrent()) {
                downstream.accumulate(container, t);
            }
            else {
                // Non-concurrent downstream: serialize access to each container.
                synchronized (container) {
                    downstream.accumulate(container, t);
                }
            }
        }
    };
}
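The ConcurrentMapReducer base class isn't shown here; a plausible sketch of its shape (purely my guess, modeled on MergingMapReducer above, with the Reducer interface repeated so the example is self-contained) would be:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

class ConcurrentMapReducerSketch {
    // The Reducer interface as sketched later in this message.
    interface Reducer<T, R> {
        boolean isConcurrent();
        R makeResult();
        void accumulate(R result, T value);
        R combine(R result, R other);
    }

    // Guessed shape of ConcurrentMapReducer: identical to MergingMapReducer
    // except that it reports itself as concurrent, so it can serve as a "B"
    // reducer while still supporting A-style merging.
    static abstract class ConcurrentMapReducer<T, K, V, M extends ConcurrentMap<K, V>>
            implements Reducer<T, M> {
        private final Supplier<M> mapFactory;
        private final BinaryOperator<V> mergeFunction;

        protected ConcurrentMapReducer(Supplier<M> mapFactory,
                                       BinaryOperator<V> mergeFunction) {
            this.mapFactory = mapFactory;
            this.mergeFunction = mergeFunction;
        }

        @Override
        public boolean isConcurrent() { return true; }

        @Override
        public M makeResult() { return mapFactory.get(); }

        @Override
        public M combine(M map, M other) {
            for (Map.Entry<K, V> e : other.entrySet())
                map.merge(e.getKey(), e.getValue(), mergeFunction);
            return map;
        }
    }

    public static void main(String[] args) {
        // A concrete use: count strings by length into a ConcurrentHashMap.
        Reducer<String, ConcurrentMap<Integer, Integer>> counter =
            new ConcurrentMapReducer<String, Integer, Integer,
                                     ConcurrentMap<Integer, Integer>>(
                    ConcurrentHashMap::new, Integer::sum) {
                public void accumulate(ConcurrentMap<Integer, Integer> map, String s) {
                    map.merge(s.length(), 1, Integer::sum);
                }
            };
        ConcurrentMap<Integer, Integer> m = counter.makeResult();
        counter.accumulate(m, "a");
        counter.accumulate(m, "bb");
        counter.accumulate(m, "c");
        System.out.println(m);
    }
}
```

Keeping combine() here is what makes these "A+B" reducers: the same object works for an ordered fold-and-merge reduction or for concurrent bombardment of a single map.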
Or, by "examples", did you mean examples of use?
On 1/8/2013 9:41 AM, Tim Peierls wrote:
> Sounds great -- I'd be interested in seeing some basic examples of both
> types of reducer.
>
> --tim
>
> On Mon, Jan 7, 2013 at 8:31 PM, Brian Goetz <brian.goetz at oracle.com
> <mailto:brian.goetz at oracle.com>> wrote:
>
> I think Doug and I made some progress on reduce forms today.
>
> Recap: there are two ways to do a reduction to a mutable data
> structure like a Map; a classic fold (where you create lots of
> little maps, one per leaf of the computation tree, accumulate leaf
> data into them in a thread-confined manner, then merge their
> contents up the tree into one big map), and a concurrent bombardment
> (where you create one big concurrent-safe map, and blast elements in
> from many threads.) Call these "A" and "B".
>
> The requirement for A is that:
> - your combining functions are associative
> - you can create multiple containers
> - you can incorporate a new element into a container
> - you can merge containers in a way that satisfies the obvious
> distributive requirement
>
> If you meet these requirements, you can do a parallel A reduce that
> respects encounter order.
>
> The requirement for B is that:
> - your combining functions are associative and commutative
> - you can incorporate a new element into a container
> - your container supports concurrent incorporation
>
> If you meet these requirements, you can do a parallel B reduce that
> does NOT respect order, but may be faster (or slower, if contention
> is high enough.)
>
> Key observation: A "B" reducer can become an "A" reducer simply by
> adding the merge ability, which is no harder than for regular A
> reducers.
>
> So rather than have A reducers and B reducers, we can have A
> reducers and A+B reducers. It's only marginally more work for B
> reducer writers.
>
> So...
>
> public interface Reducer<T, R> {
> boolean isConcurrent();
> R makeResult();
> void accumulate(R result, T value);
> R combine(R result, R other);
> }
>
> A reducers return 'false' for isConcurrent; B reducers return 'true'.
>
> What was Concurrent{Reducer,Tabulator,Accumulator} goes away.
>
> Now, in addition to the various forms of reduce(), we add overloaded:
>
> reduce(Reducer)
> reduceUnordered(Reducer)
>
> You will get a B reduction if (a) you selected reduceUnordered and
> (b) your reducer is concurrent. Otherwise you will get an A reduction.
>
> This is nice because the knowledge of properties of "user doesn't
> care about order" and "target supports concurrent insertion"
> naturally live in different places; this separates them properly.
> The latter is a property of the reducer implementation; the former
> only the user knows about and therefore should live at the call
> site. Each contributes their bit and can mostly remain ignorant of
> the other; the library combines these bits, and if both are present,
> you get a B reduction.
>
> The reduceUnordered() method can be cast as an optimization to
> reduce(); it is always correct to use reduce(), but may be faster to
> use reduceUnordered(), as long as you are willing to forgo order and
> the reducer is cooperative.
>
> In neither case (assuming a properly implemented reducer) will you
> ever get a thread-unsafe result; if you ask for an unordered
> reduction with a nonconcurrent reducer, you get a safe ordered
> reduction instead, which might be slower (or faster.)
>
> Pros:
> - ConcurrentReducer goes away
> - .unordered() goes away
> - Properly separates order-sensitivity from concurrency
>
> Cons:
> - Two variations on various aspects of reducing are surfaced in
> the API (unordered and concurrent) that will make users say "huh?"
> (but if you get it wrong / choose to stay ignorant you still get the
> right answer.)
>
> Implementation coming.
>
>
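The mode-selection rule in the quoted note (a B reduction only when the caller asked for unordered AND the reducer is concurrent; everything else degrades safely to A) boils down to a single conjunction. A toy sketch of that dispatch, with invented names:

```java
class ReduceModeDemo {
    // Minimal stand-in for the Reducer interface: only the bit
    // that matters for mode selection.
    interface Reducer<T, R> {
        boolean isConcurrent();
    }

    // Toy model of the library's choice: "B" (concurrent, unordered blast)
    // only when both the call site and the reducer opt in; otherwise a
    // safe, ordered "A" (fold-and-merge) reduction.
    static String chooseMode(boolean unorderedRequested, Reducer<?, ?> reducer) {
        return (unorderedRequested && reducer.isConcurrent()) ? "B" : "A";
    }

    public static void main(String[] args) {
        Reducer<Object, Object> aReducer = () -> false;  // e.g. HashMap-based groupBy
        Reducer<Object, Object> bReducer = () -> true;   // e.g. ConcurrentHashMap-based
        System.out.println(chooseMode(false, aReducer)); // A: reduce()
        System.out.println(chooseMode(true, aReducer));  // A: safe fallback
        System.out.println(chooseMode(false, bReducer)); // A: order requested wins
        System.out.println(chooseMode(true, bReducer));  // B: both opted in
    }
}
```

The point of the asymmetry: a wrong guess in either direction never produces an unsafe or unordered result the user didn't ask for, only a potentially slower one.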
More information about the lambda-libs-spec-experts mailing list