Setting of UNORDERED on concurrent collectors

Brian Goetz brian.goetz at oracle.com
Mon Apr 8 12:50:48 PDT 2013


> What is groupingByConcurrent good for?  What's the difference between
> parallel and concurrent in this context?

For sequential streams, concurrent is irrelevant.  So this is only 
relevant for parallel streams.

When doing a reduction on a parallel stream, there are two obvious ways 
to do it:

  - Partition the input, reduce the chunks separately into isolated 
subresults, then combine the subresults "up the tree" into a complete 
result (call this a "traditional" parallel reduction)

  - Use some sort of thread-safe combiner, and blast input elements at 
some shared combiner from all threads (call this a "concurrent" 
reduction.)  This is more like a forEach than a reduce.  Requirements 
for this to be safe include: the combiner must be thread-safe, and the 
user must not care about order, since there's no telling in what order 
the elements will be blasted.

When the reduction is a groupBy into a Map, this can make a big 
difference because of the merging performance of HashMap.

The traditional reduction looks like this:
  - create a HashMap per partition
  - Insert the elements of this partition into the HashMap
  - Go up the tree, merging two HashMaps into one.  This involves 
iterating a key-by-key merge.  This is slow.

The concurrent reduction looks like:
  - create one ConcurrentHashMap
  - Blast elements into it using atomic methods like putIfAbsent
  - Return that, no merging

In most reasonable cases, concurrent parallel reduction with CHM blows 
away traditional parallel reduction with HashMap.  On the other hand, 
one of the casualties of the concurrent approach is ordering.

If your input is (ordered):

   [ 1, 2, 3, 4, 5, 6, 7, 8 ]

and your classifier function is:

   e % 2

then the traditional approach must yield:
  { 0 => [ 2, 4, 6, 8 ],
    1 => [ 1, 3, 5, 7] }

but the concurrent approach could yield:

  { 0 => [ 6, 2, 4, 8 ],
    1 => [ 7, 1, 3, 5 ] }

So the question is, when confronted with an obvious desire to use a 
concurrent-safe collector, do we infer that the user must not care about 
ordering?

> The most succinct indication of its function is:
>
>     The collect(Collector) method currently performs a
>     concurrent collection when all of the following are true:
>       - the stream is parallel
>       - the collector is *concurrent*
>       - the collector is unordered OR the stream is unordered

This is the current rule about whether or not collect() does a 
concurrent reduction.  My question here is whether we wish to make our 
existing concurrent collectors always be unordered, so that the last 
bullet is trivially satisfied for the built-in concurrent collectors.

> In other words, *if* I happen to use groupingByConcurrent *then* maybe a
> concurrent collection will be performed, but maybe not, depending on a
> couple other factors...

That is the current state of affairs.

> Can we make this simpler and more intuitive/predictable?  I realize
> that's what you're addressing now, but can't we go a lot farther?
>
> Can we, say, get rid of groupingByConcurrent and just assume that if the
> stream is parallel?  What do we lose?  Do we lose any functionality that
> can't be derived another way?

That would cause us to access a non-thread-safe HashMap concurrently 
from multiple threads.



More information about the lambda-libs-spec-observers mailing list