Simplifying sequential() / parallel()
Brian Goetz
brian.goetz at oracle.com
Thu Mar 21 12:47:49 PDT 2013
Doug and I have been revisiting sequential() and parallel(). I think
there's a nice simplification here.
The original motivation for sequential() was that we originally had
.into(collection), and many collections were not thread-safe, so we
needed a means to bring the computation back to the current thread. The
.sequential() method did that, but it also brought an encounter-ordering
constraint along with it, because if people did:
stuff.parallel().map(...).sequential().into(new ArrayList<>());
not respecting encounter order would violate the principle of least
astonishment.
So the original motivation for sequential() was "bring the computation
back to the current thread, in order." This was doable, but it had a high
price -- a full barrier where we buffer the contents of the entire
stream before doing any forEach'ing.
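To make "full barrier" concrete, here is a minimal sketch using the Java 8 IntStream API. It is not the original sequential() implementation, which the post does not show; the class name and the isPrime helper are hypothetical. The parallel upstream must finish entirely, and its results are buffered in encounter order, before the action sees even the first element on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class FullBarrierSketch {
    // Hypothetical helper: simple trial-division primality test.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // "Full barrier": nothing reaches the action until the whole parallel
    // upstream has completed and been materialized.
    public static void forEachAfterFullBarrier(int limit, IntConsumer action) {
        int[] buffered = IntStream.range(0, limit)
                .parallel()
                .filter(FullBarrierSketch::isPrime)
                .toArray();          // barrier: entire result buffered, in encounter order
        for (int p : buffered) {     // then replayed sequentially on the calling thread
            action.accept(p);
        }
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        forEachAfterFullBarrier(30, p -> seen.add(p));
        System.out.println(seen); // [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
    }
}
```

The cost is visible in the shape of the code: the buffer is as large as the whole result, and the first action call waits on the slowest chunk.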
Most of the time, sequential() only appears right before the terminal
operation. But, once implemented, there was no reason to constrain this
to appear right before the forEach/into, so we didn't.
Once we discovered a need for .parallel(), it made sense for it to be the
dual of .sequential() -- fully unconstrained. And again, the implementation
wasn't *that* bad -- better than .sequential()'s. But again, the most
desirable position for .parallel() is right after the source.
Then we killed into() and replaced it with reduction, which is a much
smarter way of managing ordering. That eliminated half the justification
for .sequential().
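As an illustration of why reduction makes the into() use case go away, the sketch below uses the collect/Collectors API as it eventually shipped in Java 8 (the post predates the final names; the class name is hypothetical). A parallel pipeline collects straight into a List with no .sequential() step, and encounter order is still preserved, because the collector combines per-chunk partial results in order.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OrderedReductionSketch {
    // The map stage runs on many threads, but Collectors.toList() on an
    // ordered stream merges partial lists left-to-right, so the result
    // comes back in encounter order.
    public static List<Integer> squares(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .map(i -> i * i)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(squares(6)); // [1, 4, 9, 16, 25, 36]
    }
}
```

No element ever has to be handed to a non-thread-safe collection from multiple threads, so there is nothing left for sequential() to protect.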
As far as I can tell, the remaining use cases for .sequential() are just
modifiers to forEach to constrain it, in order, to the current thread.
As in:
ints().parallel().filter(i -> isPrime(i))
    .sequential().forEach(System.out::println);
That could be replaced by
.forEachSequentialAndOrderedInCurrentThread() (with a suitably better
name), which could be further simplified to drop the "in current
thread" part by doing some locking in the implementation. That brings
us to .forEachOrdered(action), which nicely complements
.collectUnordered; the two actually stand better with their duals
present (reduce is ordered by default; forEach is unordered by default).
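For comparison, this is how the primes example reads with forEachOrdered as it exists in Java 8's IntStream. IntStream.range stands in for the post's ints(), and the class name and isPrime helper are hypothetical. forEachOrdered delivers elements one at a time in encounter order, though not necessarily on the calling thread, which is exactly the "in current thread" requirement being dropped. Plain forEach on the same pipeline promises neither order nor serial invocation, so its sink has to be thread-safe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class ForEachOrderedSketch {
    // Hypothetical helper: simple trial-division primality test.
    static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Filtering runs in parallel; the action sees primes one at a time, in order.
    public static void primesInOrder(int limit, IntConsumer action) {
        IntStream.range(0, limit)
                .parallel()
                .filter(ForEachOrderedSketch::isPrime)
                .forEachOrdered(action);
    }

    // Same pipeline with forEach: the action may run concurrently and in any
    // order, hence the synchronized list.
    public static List<Integer> unorderedPrimes(int limit) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, limit)
                .parallel()
                .filter(ForEachOrderedSketch::isPrime)
                .forEach(p -> out.add(p));
        return out;
    }

    public static void main(String[] args) {
        primesInOrder(30, System.out::println); // 2, 3, 5, ..., 29, one per line, in order
        List<Integer> u = unorderedPrimes(30);
        Collections.sort(u); // forEach gave no ordering guarantee
        System.out.println(u);
    }
}
```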
The "put it anywhere" behavior of .parallel was completely bootstrapped
on the "put it anywhere" nature of .sequential; we never really set out
to support transitions in the API.
So, pulling the rug out from under the house of cards, I think we can
fall back to:
1. Modify semantics of .sequential and .parallel to apply globally to
the entire pipeline. This works because pipelines are fully lazy
anyway, so we don't commit to seq-ness/par-ness until we hit the
terminal op. So they are functional versions of "set the seq/par bit in
the source". And that simplifies the specification of seq/par down to a
single property of the entire pipeline -- much easier to spec.
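Point 1 is the behavior that Java 8's java.util.stream ended up with: the most recent sequential()/parallel() call applies to the entire pipeline, regardless of where it appears in the chain. The small sketch below (class and method names hypothetical) observes this through BaseStream.isParallel(), which is not a terminal operation and so can be called before the pipeline runs.

```java
import java.util.stream.Stream;

public class GlobalModeSketch {
    // parallel() near the source, sequential() at the end: the later call
    // wins, so the whole pipeline (map stage included) runs sequentially.
    public static boolean parallelThenSequential() {
        Stream<String> s = Stream.of("a", "b", "c")
                .parallel()
                .map(String::toUpperCase)
                .sequential();
        return s.isParallel();
    }

    // parallel() placed after map still covers the map stage: the mode is a
    // property of the pipeline, not of the position of the call.
    public static boolean sequentialThenParallel() {
        Stream<String> s = Stream.of("a", "b", "c")
                .map(String::toUpperCase)
                .parallel();
        return s.isParallel();
    }

    public static void main(String[] args) {
        System.out.println(parallelThenSequential()); // false
        System.out.println(sequentialThenParallel()); // true
    }
}
```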
2. Add .forEachOrdered. For sequential streams, this is just .forEach.
For par streams, we use a lock to avoid concurrent invocation of the
lambda, and loosen up the current behavior from "full barrier" to
"partial barrier", so that when the next chunk is available, we can
start working right away. This is easy to accomplish using the existing
AbstractTask machinery.
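The "partial barrier" idea in point 2 can be sketched outside the stream framework. The helper below is hypothetical and is not the AbstractTask-based implementation the post refers to; it assumes a random-access List source split into fixed-size chunks. Each chunk is filtered in parallel as soon as it is submitted, and chunk i is delivered as soon as it and all earlier chunks are done, rather than after the entire input. In place of an explicit lock, it serializes the action calls by chaining CompletableFutures: each delivery stage starts only after the previous one has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class PartialBarrierSketch {
    // Hypothetical ordered forEach with a partial barrier.
    public static <T> void forEachOrdered(List<T> input, int chunkSize,
                                          Predicate<? super T> filter,
                                          Consumer<? super T> action) {
        CompletableFuture<Void> delivered = CompletableFuture.completedFuture(null);
        for (int start = 0; start < input.size(); start += chunkSize) {
            List<T> chunk = input.subList(start, Math.min(start + chunkSize, input.size()));
            // Work on this chunk starts immediately, in the common pool.
            CompletableFuture<List<T>> computed = CompletableFuture.supplyAsync(() -> {
                List<T> kept = new ArrayList<>();
                for (T t : chunk) {
                    if (filter.test(t)) kept.add(t);
                }
                return kept;
            });
            // Delivering chunk i waits only for chunk i's result and for the
            // delivery of chunk i-1; that chaining keeps action calls serial
            // and in encounter order.
            delivered = delivered.thenCombine(computed, (ignored, kept) -> {
                kept.forEach(action);
                return null;
            });
        }
        delivered.join(); // return once the last chunk has been delivered
    }

    public static void main(String[] args) {
        List<Integer> nums = new ArrayList<>();
        for (int i = 1; i <= 20; i++) nums.add(i);
        // Even numbers 2..20 are printed in order, even if chunks finish out of order.
        forEachOrdered(nums, 4, n -> n % 2 == 0, n -> System.out.print(n + " "));
        System.out.println();
    }
}
```

As in the post's proposal, the action may run on pool threads rather than the caller's thread; what is preserved is serial invocation and encounter order, with at most the in-flight chunks buffered at any time.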
Before we go there, does anyone have use cases for .sequential() /
.parallel() that *don't* put the parallel right after the source, or the
sequential right before a forEach?
More information about the lambda-libs-spec-observers mailing list