Simplifying sequential() / parallel()

Remi Forax forax at univ-mlv.fr
Thu Mar 21 13:02:53 PDT 2013


On 03/21/2013 08:47 PM, Brian Goetz wrote:
> Doug and I have been revisiting sequential() and parallel().  I think 
> there's a nice simplification here.
>
> The original motivation for sequential() was because we originally had 
> .into(collection), and many collections were not thread-safe so we 
> needed a means to bring the computation back to the current thread.  
> The .sequential() method did that, but it also brought back a 
> constraint of encounter ordering with it, because if people did:
>
>    stuff.parallel().map(...).sequential().into(new ArrayList<>());
>
> not respecting encounter order would violate the principle of least 
> astonishment.
>
> So the original motivation for sequential() was "bring the computation 
> back to the current thread, in order."  This was doable, but has a 
> high price -- a full barrier where we buffer the contents of the 
> entire stream before doing any forEach'ing.
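To make the cost of that barrier concrete, here is a minimal sketch of the "bring it back to the current thread, in order" approach. It is written against Java 8's released java.util.stream API, not the prototype under discussion here, and the class and method names are hypothetical. Every mapped element is buffered before the action sees the first one. The action then runs on the calling thread in encounter order.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

public final class FullBarrierSketch {

    // Hypothetical helper illustrating a full barrier: the parallel stage must
    // finish completely (every element buffered) before the action sees the
    // first element, which then runs on the calling thread in encounter order.
    public static <T, R> void mapThenForEachInOrder(List<T> source,
                                                    Function<? super T, ? extends R> mapper,
                                                    Consumer<? super R> action) {
        List<R> buffered = source.parallelStream()
                                 .map(mapper)
                                 .collect(Collectors.toList()); // toList() keeps encounter order
        buffered.forEach(action); // sequential, current thread, in order
    }
}
```

Memory use grows with the size of the stream, and no element reaches the action until the slowest chunk has finished.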
>
> Most of the time, sequential() only appears right before the terminal 
> operation.  But, once implemented, there was no reason to constrain 
> this to appear right before the forEach/into, so we didn't.
>
> Once we discovered a need for .parallel(), it made sense for it to be the 
> dual of .sequential() -- fully unconstrained.  And again, the 
> implementation wasn't *that* bad -- better than .sequential(). But 
> again, the most desirable position for .parallel() is right after the 
> source.
>
> Then we killed into() and replaced it with reduction, which is a much 
> smarter way of managing ordering.  That eliminated half the justification 
> for .sequential().
>
> As far as I can tell, the remaining use cases for .sequential() are 
> just modifiers to forEach to constrain it, in order, to the current 
> thread.
>
> As in:
>   ints().parallel().filter(i -> isPrime(i))
>         .sequential().forEach(System.out::println);
>
> This could be replaced by 
> .forEachSequentialAndOrderedInCurrentThread(), with a suitably better 
> name.  That could be simplified further by dropping the "in current 
> thread" part and doing some locking in the implementation, which brings 
> us to .forEachOrdered(action).  That nicely complements 
> .collectUnordered, and each stands better with its dual present 
> (reduce is ordered by default; forEach is unordered by default).
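For comparison, Java 8's released API does include `Stream.forEachOrdered` and `IntStream.forEachOrdered`. Below is a sketch of the prime example in that form. It assumes a hypothetical `isPrime` helper (trial division) in place of the one in the thread, and uses `IntStream.range` in place of the prototype's `ints()`.

```java
import java.util.stream.IntStream;

public final class ForEachOrderedDemo {

    // Hypothetical helper standing in for isPrime(i) in the thread: trial division.
    public static boolean isPrime(int n) {
        if (n < 2) return false;
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // filter() may run on many threads; forEachOrdered() hands the survivors
        // to the action one at a time, in encounter order (2, 3, 5, 7, ...),
        // on whichever thread the library chooses.
        IntStream.range(0, 50)
                 .parallel()
                 .filter(ForEachOrderedDemo::isPrime)
                 .forEachOrdered(System.out::println);
    }
}
```

The `forEachOrdered` Javadoc guarantees that each action invocation happens-before the next, but it leaves the choice of thread to the library. This matches the idea of dropping the "in current thread" requirement.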
>
> The "put it anywhere" behavior of .parallel was completely 
> bootstrapped on the "put it anywhere" nature of .sequential; we never 
> really set out to support transitions in the API.
>
> So, pulling the rug out from under the house of cards, I think we can 
> fall back to:
>
> 1.  Modify semantics of .sequential and .parallel to apply globally to 
> the entire pipeline.  This works because pipelines are fully lazy 
> anyway, so we don't commit to seq-ness/par-ness until we hit the 
> terminal op.  So they are functional versions of "set the seq/par bit 
> in the source".  And that simplifies the specification of seq/par down 
> to a single property of the entire pipeline -- much easier to spec.
>
> 2.  Add .forEachOrdered.  For sequential streams, this is just 
> .forEach.  For par streams, we use a lock to avoid concurrent 
> invocation of the lambda, and loosen up the current behavior from 
> "full barrier" to "partial barrier", so that when the next chunk is 
> available, we can start working right away.  This is easy to 
> accomplish using the existing AbstractTask machinery.
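The lock-plus-partial-barrier idea in item 2 can be sketched outside the stream framework. This is a hypothetical illustration built on `CompletableFuture` chunks, not the AbstractTask-based implementation the message refers to. The helper's name, the fixed chunk size, and the use of the common pool are all assumptions. Mapping runs in parallel without the lock, and a single lock serializes calls to the action. Each finished chunk is emitted as soon as all earlier chunks have been emitted, rather than after the whole stream is buffered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

public final class PartialBarrierForEachOrdered {

    // Hypothetical helper, not the JDK's implementation. Maps fixed-size chunks
    // of `source` in parallel; a lock serializes calls to `action`, and each
    // finished chunk is delivered as soon as every earlier chunk has been.
    public static <T, R> void forEachOrdered(List<T> source, int chunkSize,
                                             Function<? super T, ? extends R> mapper,
                                             Consumer<? super R> action) {
        if (chunkSize <= 0) throw new IllegalArgumentException("chunkSize must be positive");
        int chunks = (source.size() + chunkSize - 1) / chunkSize;
        Object lock = new Object();
        List<List<R>> pending = new ArrayList<>(); // finished chunks awaiting delivery; guarded by lock
        for (int c = 0; c < chunks; c++) pending.add(null);
        int[] nextToDeliver = {0};                 // index of the next chunk to emit; guarded by lock

        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int c = 0; c < chunks; c++) {
            final int index = c;
            List<T> slice = source.subList(c * chunkSize, Math.min(source.size(), (c + 1) * chunkSize));
            tasks.add(CompletableFuture.runAsync(() -> {
                List<R> mapped = new ArrayList<>(slice.size());
                for (T t : slice) mapped.add(mapper.apply(t)); // parallel work, no lock held
                synchronized (lock) {
                    pending.set(index, mapped);
                    // Partial barrier: drain every chunk that is now contiguous with
                    // what has already been delivered, in encounter order.
                    while (nextToDeliver[0] < chunks && pending.get(nextToDeliver[0]) != null) {
                        for (R r : pending.get(nextToDeliver[0])) action.accept(r);
                        pending.set(nextToDeliver[0], null); // release the delivered chunk
                        nextToDeliver[0]++;
                    }
                }
            }));
        }
        // Wait for all chunks; this also makes the action's effects visible to the caller.
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0])).join();
    }
}
```

A slow early chunk still holds back the later ones, because encounter order demands it. Fast chunks, however, no longer wait for the entire stream, and the action never runs concurrently with itself.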
>
>
> Before we go there, does anyone have use cases for .sequential() / 
> .parallel() that *don't* put the parallel right after the source, or 
> the sequential right before a forEach?

Supporting stateful mappers or filters requires being able to use
sequential() just before them when the start of the stream is parallel.
We may just not support them.

Rémi
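As far as I know, option 1 is what Java 8 shipped. parallel() and sequential() set one mode for the whole pipeline, and the last call before the terminal operation wins. The sketch below shows what that means for the stateful case raised in the reply. The numbering mapper is hypothetical, and the java.util.stream package documentation advises against stateful behavioral parameters like it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class GlobalModeDemo {

    // parallel() ... sequential(): the later call decides the mode of the
    // entire pipeline, so this returns false.
    public static boolean modeAfterMidPipelineSequential() {
        return Stream.of(1, 2, 3, 4)
                     .parallel()
                     .map(x -> x * x)
                     .sequential()
                     .isParallel();
    }

    // Hypothetical stateful mapper: numbers elements in arrival order. It is
    // correct only because sequential() makes the WHOLE pipeline sequential,
    // including the toUpperCase stage that was meant to run in parallel.
    public static List<String> numbered(List<String> words) {
        int[] counter = {0};
        return words.stream()
                    .parallel()
                    .map(String::toUpperCase)
                    .sequential()
                    .map(w -> (counter[0]++) + ":" + w)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(modeAfterMidPipelineSequential());       // false
        System.out.println(numbered(Arrays.asList("a", "b", "c"))); // [0:A, 1:B, 2:C]
    }
}
```

Under global semantics, the mid-pipeline sequential() makes the stateful stage safe only by overriding the earlier parallel() call. There is no way to keep a parallel prefix in front of a sequential stateful stage.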



More information about the lambda-libs-spec-experts mailing list