Simplifying sequential() / parallel()

Brian Goetz brian.goetz at oracle.com
Thu Mar 21 12:47:49 PDT 2013


Doug and I have been revisiting sequential() and parallel().  I think 
there's a nice simplification here.

The original motivation for sequential() was that we originally had 
.into(collection), and many collections were not thread-safe, so we 
needed a means to bring the computation back to the current thread.  The 
.sequential() method did that, but it also brought an encounter-order 
constraint with it, because if people did:

    stuff.parallel().map(...).sequential().into(new ArrayList<>());

not respecting encounter order would violate the principle of least 
astonishment.

So the original motivation for sequential() was "bring the computation 
back to the current thread, in order."  This was doable, but has a high 
price -- a full barrier where we buffer the contents of the entire 
stream before doing any forEach'ing.

Most of the time, sequential() only appears right before the terminal 
operation.  But, once implemented, there was no reason to constrain this 
to appear right before the forEach/into, so we didn't.

Once we discovered a need for .parallel(), it made sense for it to be the 
dual of .sequential() -- fully unconstrained.  And again, the 
implementation wasn't *that* bad -- better than .sequential().  But again, 
the most desirable position for .parallel() is right after the source.

Then we killed into() and replaced it with reduction, which is a much 
smarter way of managing ordering.  That eliminated half the justification 
for .sequential().
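
To illustrate why reduction makes the .sequential().into(...) idiom unnecessary, here is a minimal sketch written against java.util.stream as it later shipped in Java 8 (an assumption; the API discussed in this thread was still in flux). The class and the squares helper are hypothetical names for this example. The pipeline runs in parallel, yet the collected list still follows encounter order, with no explicit hop back to the current thread.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class OrderedReductionSketch {
    // Hypothetical helper: squares 0..n-1 in parallel and reduces into a List.
    // Collectors.toList() respects encounter order for an ordered source,
    // even when the pipeline executes in parallel.
    static List<Integer> squares(int n) {
        return IntStream.range(0, n)
                .parallel()
                .map(i -> i * i)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(squares(6)); // prints [0, 1, 4, 9, 16, 25]
    }
}
```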

As far as I can tell, the remaining use cases for .sequential() are just 
modifiers to forEach to constrain it, in order, to the current thread.

As in:
    ints().parallel().filter(i -> isPrime(i))
          .sequential().forEach(System.out::println);

This could be replaced by 
.forEachSequentialAndOrderedInCurrentThread(), with a suitably better 
name.  That could be further simplified to ditch the "in current 
thread" part by doing some locking in the implementation, which brings 
us to .forEachOrdered(action).  This nicely complements 
.collectUnordered, and the two actually stand better with their duals 
present (reduce is by default ordered; forEach is by default unordered).
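
As a rough sketch of what the prime example might look like with .forEachOrdered, again assuming the java.util.stream API as it shipped in Java 8: IntStream.rangeClosed stands in for ints(), and isPrime and primesUpTo are hypothetical helpers invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

final class ForEachOrderedSketch {
    // Hypothetical trial-division primality test, only for this example.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Filters in parallel, then hands elements to the action one at a time,
    // in encounter order. The Java 8 Javadoc for forEachOrdered states that
    // the action for one element happens-before the action for the next,
    // which is why a plain ArrayList is used here.
    static List<Integer> primesUpTo(int max) {
        List<Integer> out = new ArrayList<>();
        IntStream.rangeClosed(2, max)
                .parallel()
                .filter(ForEachOrderedSketch::isPrime)
                .forEachOrdered(p -> out.add(p));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(primesUpTo(30)); // prints [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
    }
}
```

With plain .forEach in place of .forEachOrdered, the same pipeline would give no ordering guarantee, and the unsynchronized list would no longer be safe to use.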

The "put it anywhere" behavior of .parallel was completely bootstrapped 
on the "put it anywhere" nature of .sequential; we never really set out 
to support transitions in the API.

So, pulling the rug out from under the house of cards, I think we can 
fall back to:

1.  Modify semantics of .sequential and .parallel to apply globally to 
the entire pipeline.  This works because pipelines are fully lazy 
anyway, so we don't commit to seq-ness/par-ness until we hit the 
terminal op.  So they are functional versions of "set the seq/par bit in 
the source".  And that simplifies the specification of seq/par down to a 
single property of the entire pipeline -- much easier to spec.

2.  Add .forEachOrdered.  For sequential streams, this is just .forEach.  
For parallel streams, we use a lock to avoid concurrent invocation of the 
lambda, and loosen up the current behavior from "full barrier" to 
"partial barrier", so that when the next chunk is available, we can 
start working right away.  This is easy to accomplish using the existing 
AbstractTask machinery.
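
Point 1 can be illustrated with a small sketch, assuming the semantics that java.util.stream ended up with in Java 8, where the mode is a single property of the whole pipeline and the last .sequential() / .parallel() call before the terminal op decides it. The class and method names are hypothetical; isParallel() is the real query method on BaseStream.

```java
import java.util.stream.IntStream;

final class PipelineModeSketch {
    // parallel() early, sequential() late: the whole pipeline is sequential,
    // including the map() that sits between the two calls.
    static boolean parallelThenSequential() {
        return IntStream.range(0, 100)
                .parallel()
                .map(i -> i + 1)
                .sequential()
                .isParallel();
    }

    // sequential() early, parallel() late: the whole pipeline is parallel.
    static boolean sequentialThenParallel() {
        return IntStream.range(0, 100)
                .sequential()
                .map(i -> i + 1)
                .parallel()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(parallelThenSequential()); // prints false
        System.out.println(sequentialThenParallel()); // prints true
    }
}
```

Under the older "put it anywhere" semantics described earlier, the map() stage in each pipeline would have run in the mode set just before it; here there is no mid-pipeline transition at all.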


Before we go there, does anyone have use cases for .sequential() / 
.parallel() that *don't* put the parallel right after the source, or the 
sequential right before a forEach?




More information about the lambda-libs-spec-observers mailing list