Simplifying sequential() / parallel()
Howard Lovatt
howard.lovatt at gmail.com
Thu Mar 21 15:17:57 PDT 2013
In my in-house library I only support in-order parallel execution, which makes things a lot simpler. I have a variation on your option 1: sequential or parallel applies to the whole stream, but more controls are available.
I have a tuning parameter that suggests a split size for parallel computation, and my mappers, reducers, sources, etc. have the equivalent of isConcurrent. For example, the following forces sequential execution (it uses my library, but it is similar enough that the meaning should be obvious):
Double sequentialSumOfSquares = asStream(-Math.PI, Math.PI, 2 * Math.PI / 1000)
.suggestSplitSize(Long.MAX_VALUE)
.m(Math::sin)
.m((sinTheta) -> sinTheta * sinTheta)
.t(reduce(0, Double::sum));
It is sequential because the suggested split size is so large that the stream is never split. In addition, the source or any of the mappers or reducers can force serial execution by returning false from its isConcurrent method.
This has worked well for me and simplifies the API.
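To make the split-size idea concrete, here is a minimal sketch using the standard fork/join framework rather than the in-house library, whose API is not shown in this message. The names SplitSizeSketch, SumOfSquares, samples and sumOfSquares are hypothetical, and the sampling of the interval is an assumption. A slice is only forked while it is longer than the suggested split size, so passing Long.MAX_VALUE means the whole computation runs as a single sequential loop.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical names throughout; this is not the in-house library's API.
final class SplitSizeSketch {

    /** Sums sin(theta)^2 over theta[from, to), forking only while a slice exceeds splitSize. */
    static final class SumOfSquares extends RecursiveTask<Double> {
        private final double[] theta;
        private final int from;
        private final int to;
        private final long splitSize;

        SumOfSquares(double[] theta, int from, int to, long splitSize) {
            this.theta = theta;
            this.from = from;
            this.to = to;
            this.splitSize = splitSize;
        }

        @Override
        protected Double compute() {
            // Small enough (or unsplittable): do the work sequentially, in order.
            if (to - from <= splitSize || to - from <= 1) {
                double sum = 0.0;
                for (int i = from; i < to; i++) {
                    double s = Math.sin(theta[i]);
                    sum += s * s;
                }
                return sum;
            }
            // Otherwise split in half, run the left half asynchronously,
            // compute the right half in this thread, then combine.
            int mid = (from + to) >>> 1;
            SumOfSquares left = new SumOfSquares(theta, from, mid, splitSize);
            SumOfSquares right = new SumOfSquares(theta, mid, to, splitSize);
            left.fork();
            double rightSum = right.compute();
            return left.join() + rightSum;
        }
    }

    /** Assumed sampling: n points starting at -PI with step 2*PI/n. */
    static double[] samples(int n) {
        double[] theta = new double[n];
        for (int i = 0; i < n; i++) {
            theta[i] = -Math.PI + i * (2 * Math.PI / n);
        }
        return theta;
    }

    static double sumOfSquares(double[] theta, long suggestedSplitSize) {
        return ForkJoinPool.commonPool()
                .invoke(new SumOfSquares(theta, 0, theta.length, suggestedSplitSize));
    }
}
```

Calling sumOfSquares(samples(1000), Long.MAX_VALUE) never forks, while a small value such as 64 lets the pool split the work. The two results can differ in the last few bits because the additions are grouped differently.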
-- Howard.
PS As an aside, the naming is odd in that the adjectives concurrent and parallel are both used; I suggest you pick one.
On 22/03/2013, at 6:47 AM, Brian Goetz <brian.goetz at oracle.com> wrote:
> Doug and I have been revisiting sequential() and parallel(). I think there's a nice simplification here.
>
> The original motivation for sequential() was that we originally had .into(collection), and many collections were not thread-safe, so we needed a means to bring the computation back to the current thread. The .sequential() method did that, but it also brought a constraint of encounter ordering with it, because if people did:
>
> stuff.parallel().map(...).sequential().into(new ArrayList<>());
>
> not respecting encounter order would violate the principle of least astonishment.
>
> So the original motivation for sequential() was "bring the computation back to the current thread, in order." This was doable, but has a high price -- a full barrier where we buffer the contents of the entire stream before doing any forEach'ing.
>
> Most of the time, sequential() only appears right before the terminal operation. But, once implemented, there was no reason to constrain this to appear right before the forEach/into, so we didn't.
>
> Once we discovered a need for .parallel(), it made sense for it to be the dual of .sequential() -- fully unconstrained. And again, the implementation wasn't *that* bad -- better than .sequential(). But again, the most desirable position for .parallel() is right after the source.
>
> Then we killed into() and replaced it with reduction, which is a much smarter way of managing ordering. That eliminated half the justification for .sequential().
>
> As far as I can tell, the remaining use cases for .sequential() are just modifiers to forEach to constrain it, in order, to the current thread.
>
> As in:
> ints().parallel().filter(i -> isPrime(i))
> .sequential().forEach(System.out::println)
>
> This could be replaced by .forEachSequentialAndOrderedInCurrentThread(), with a suitably better name. That could be further simplified by ditching the "in current thread" part and doing some locking in the implementation, which brings us to .forEachOrdered(action). This nicely complements .collectUnordered, and the two actually stand better with their duals present (reduce is ordered by default; forEach is unordered by default).
>
> The "put it anywhere" behavior of .parallel was completely bootstrapped on the "put it anywhere" nature of .sequential; we never really set out to support transitions in the API.
>
> So, pulling the rug out from under the house of cards, I think we can fall back to:
>
> 1. Modify semantics of .sequential and .parallel to apply globally to the entire pipeline. This works because pipelines are fully lazy anyway, so we don't commit to seq-ness/par-ness until we hit the terminal op. So they are functional versions of "set the seq/par bit in the source". And that simplifies the specification of seq/par down to a single property of the entire pipeline -- much easier to spec.
>
> 2. Add .forEachOrdered. For sequential streams, this is just .forEach. For par streams, we use a lock to avoid concurrent invocation of the lambda, and loosen up the current behavior from "full barrier" to "partial barrier", so that when the next chunk is available, we can start working right away. This is easy to accomplish using the existing AbstractTask machinery.
>
>
> Before we go there, does anyone have use cases for .sequential() / .parallel() that *don't* put the parallel right after the source, or the sequential right before a forEach?
>
>
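For readers comparing the two proposals in the quoted message with the java.util.stream API as it later shipped in Java 8, the following is a minimal sketch. It is not the prototype code under discussion in this thread, and the class and method names (PipelineModeSketch, lastCallWins, primesInEncounterOrder) are hypothetical. It shows that the sequential/parallel mode is a single property of the whole pipeline, and that forEachOrdered delivers elements of a parallel stream in encounter order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical names; uses only the released Java 8 stream API.
final class PipelineModeSketch {

    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int d = 2; d * d <= n; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    // The mode is one bit for the whole pipeline: the last call to
    // sequential()/parallel() before the terminal operation decides it.
    static boolean lastCallWins() {
        return IntStream.range(0, 100)
                .parallel()
                .filter(PipelineModeSketch::isPrime)
                .sequential()
                .isParallel();
    }

    // forEachOrdered on a parallel stream: the filter may run on many
    // threads, but the action sees elements one at a time, in encounter
    // order, so a plain ArrayList is filled in order here.
    static List<Integer> primesInEncounterOrder(int limit) {
        List<Integer> seen = new ArrayList<>();
        IntStream.range(0, limit)
                .parallel()
                .filter(PipelineModeSketch::isPrime)
                .forEachOrdered(seen::add);
        return seen;
    }
}
```

Replacing forEachOrdered with forEach in the second method would remove both the ordering guarantee and the safety of writing to an unsynchronized list.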