Constructing parallel streams
Brian Goetz
brian.goetz at oracle.com
Fri Dec 14 15:13:01 PST 2012
Now that parallel/sequential are no-ops on streams of the correct
orientation, we might be able to make isParallel() private. Previously it
was needed for implementations of addAll(Stream), but we can simplify that now.
On 12/14/2012 5:47 PM, Remi Forax wrote:
> A quick question: why is isParallel() public? It seems like an
> implementation detail.
>
> Rémi
>
> On 12/14/2012 08:23 PM, Brian Goetz wrote:
>> I've added .parallel() to Stream. The implementation is trivial:
>>
>> public Stream<U> parallel() {
>>     if (isParallel())
>>         return this;
>>     else {
>>         return Streams.parallel(spliterator(), getStreamFlags());
>>     }
>> }
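A minimal sketch of the same logic against the java.util.stream API that eventually shipped. It assumes `StreamSupport.stream(spliterator, true)` as a stand-in for the prototype's `Streams.parallel(spliterator(), getStreamFlags())`. Flags are not passed separately here; roughly, the spliterator's characteristics carry that information. The class and helper names (`ParallelSketch`, `parallelOf`) are hypothetical. In the released API the equivalent is simply `BaseStream.parallel()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ParallelSketch {
    // Hypothetical helper mirroring the proposed parallel():
    // a no-op on a parallel stream; otherwise re-wrap the stream's
    // spliterator in a new stream with the parallel flag set.
    static <T> Stream<T> parallelOf(Stream<T> s) {
        if (s.isParallel())
            return s;
        // spliterator() is a terminal operation: s must not be used afterwards.
        // Any upstream intermediate ops travel inside the returned spliterator.
        Spliterator<T> sp = s.spliterator();
        return StreamSupport.stream(sp, true);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "bb", "ccc", "dddd");

        // At the head of the pipeline: just the source spliterator, re-wrapped.
        Stream<String> head = parallelOf(words.stream());
        System.out.println(head.isParallel());                   // true
        System.out.println(head.mapToInt(String::length).sum()); // 10

        // Farther down: the spliterator carries the intervening map().
        Stream<Integer> mid = parallelOf(words.stream().map(String::length));
        System.out.println(mid.reduce(0, Integer::sum));         // 10
    }
}
```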
>>
>> When applied at the head of a stream (e.g.,
>> String.chars().parallel()), the additional overhead is a few extra
>> object instantiations, since we just copy the spliterator and unleash
>> whatever parallelism it can offer. (We could consider additionally
>> querying the spliterator, asking if it is splittable, and if not,
>> getting the iterator and doing the "split the iterator" trick done by
>> Iterators.spliterator.)
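The "split the iterator" trick can be sketched as follows, assuming a fixed batch size. The class name `BatchingIteratorSpliterator` and its `batchSize` parameter are hypothetical. The released JDK's `Spliterators.spliteratorUnknownSize` takes a comparable approach, with batches that grow across splits. Each `trySplit()` drains a prefix batch from the shared iterator into an array and returns an array-backed spliterator, which can itself split further. The remainder stays behind the iterator.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

// Hypothetical class: exposes parallelism from a purely sequential Iterator
// by copying fixed-size batches into arrays on each split.
public class BatchingIteratorSpliterator<T> implements Spliterator<T> {
    private final Iterator<? extends T> it;
    private final int batchSize;

    public BatchingIteratorSpliterator(Iterator<? extends T> it, int batchSize) {
        this.it = it;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!it.hasNext())
            return false;
        action.accept(it.next());
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        if (!it.hasNext())
            return null;
        // Drain the next batch (a prefix, so encounter order is preserved)...
        Object[] batch = new Object[batchSize];
        int n = 0;
        while (n < batchSize && it.hasNext())
            batch[n++] = it.next();
        // ...and hand it off as an array spliterator, which splits in halves.
        return Spliterators.spliterator(batch, 0, n, Spliterator.ORDERED);
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // unknown: a plain Iterator reports no size
    }

    @Override
    public int characteristics() {
        return Spliterator.ORDERED;
    }

    public static void main(String[] args) {
        Iterator<Integer> source = IntStream.rangeClosed(1, 1000).boxed().iterator();
        long sum = StreamSupport.stream(new BatchingIteratorSpliterator<>(source, 64), true)
                .mapToLong(Integer::longValue)
                .sum();
        System.out.println(sum); // 500500
    }
}
```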
>>
>> When applied in the middle of a pipeline with stateless intermediate
>> operations, the overhead is also pretty limited; we process the
>> upstream ops serially (but jammed, i.e. fused into a single pass) and
>> apply our "split the iterator" trick.
>>
>> When applied in the middle of a pipeline with stateful operations, we
>> end up with a full serial computation through the last stateful op,
>> and if there are any intervening stateless intermediate ops after
>> that, it reduces to the previous case.
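The stateful case can be approximated with the released API by materializing the serial prefix explicitly and then starting a fresh parallel stream over the result. This is a hedged model of the described behaviour, not how the prototype implements it. In the shipped java.util.stream, calling parallel() mid-pipeline switches the whole pipeline rather than acting as a barrier. The names `StatefulBarrierSketch` and `sortThenSquare` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StatefulBarrierSketch {
    // Hypothetical helper: models "source.sorted().parallel().map(x -> x * x)"
    // under the semantics proposed in this thread.
    static List<Integer> sortThenSquare(List<Integer> input) {
        // Full serial computation through the last stateful op (sorted)...
        List<Integer> sorted = input.stream()
                .sorted()
                .collect(Collectors.toList());
        // ...then the trailing stateless op runs in parallel over the result.
        return sorted.parallelStream()
                .map(x -> x * x)
                .collect(Collectors.toList()); // encounter order is preserved
    }

    public static void main(String[] args) {
        System.out.println(sortThenSquare(Arrays.asList(3, 1, 2))); // [1, 4, 9]
    }
}
```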
>>
>>
>> I'm starting to think that Streamable is not carrying its weight. I'd
>> like to consider dropping Streamable, at which point the base-most
>> implementation of parallel() is in Collection, and I'd also suggest we
>> consider renaming that to parallelStream().
>>
>> Where that would leave us is:
>> - Persistent aggregates like Collection have two methods, stream()
>> and parallelStream();
>> - Aggregate views of other classes are handled by accessor methods
>> like String.codePoints(), and you get parallelism by asking the
>> resulting stream for a .parallel() view.
>>
>> So Collection.parallelStream() becomes unnecessary in principle, but
>> remains a desirable optimization for a very common case. And the
>> streaminess of the resulting value is clearer.
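The two access shapes listed above correspond closely to what the released JDK 8 provides: `Collection.stream()` / `parallelStream()`, and accessor methods such as `String.codePoints()` that return a sequential stream the caller can switch with `.parallel()`. A small sketch; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class AccessShapes {
    // Persistent aggregate: the collection offers both stream() and parallelStream().
    static long countEvens(List<Integer> xs) {
        return xs.parallelStream()
                .filter(x -> x % 2 == 0)
                .count();
    }

    // Aggregate view: an accessor returns a sequential stream,
    // and the caller opts in to parallelism with .parallel().
    static long countVowels(String s) {
        return s.codePoints()
                .parallel()
                .filter(cp -> "aeiou".indexOf(cp) >= 0)
                .count();
    }

    public static void main(String[] args) {
        System.out.println(countEvens(Arrays.asList(1, 2, 3, 4, 5, 6))); // 3
        System.out.println(countVowels("parallel streams"));             // 5
    }
}
```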
>>
>> On 12/8/2012 11:58 AM, Brian Goetz wrote:
>>> Following up on the previous note, now that the stream-building APIs
>>> have settled into something fairly simple and regular, I'm not
>>> completely happy with the arrangement of the stream() / parallel() buns.
>>>
>>> For collections, stream() and parallel() seem fine; the user already has
>>> a collection in hand, and can ask for a sequential or parallel stream.
>>> (Separately: I'm starting to prefer stream() / parallelStream() as the
>>> bun names here.)
>>>
>>> But, there are other ways to get a stream:
>>>
>>> String.chars()
>>> Reader.lines()
>>> regex.matches(source)
>>> etc
>>>
>>> It seems pretty natural for these things to return Streams. But, in
>>> accordance with our "no implicit parallelism" dictum, these streams are
>>> serial. But many of these streams can be operated on in parallel -- so
>>> the question is, how would we get a parallel stream out of these?
>>>
>>> One obvious choice is to have two operations for each of these:
>>>
>>> String.chars()
>>> String.charsAsParallelStream()
>>>
>>> That's pretty ugly, and unlikely to be consistently implemented.
>>>
>>>
>>> Now that the Streams construction API and internals have shaken out,
>>> another option has emerged. A Spliterator can be traversed sequentially
>>> or in parallel. Many sequential streams are constructed out of
>>> spliterators that already know how to split (e.g., Arrays.spliterator),
>>> and we know how to expose some parallelism from otherwise sequential
>>> data sources anyway (see the implementation of Iterators.spliterator). Just
>>> because iteration is sequential does not mean there is no exploitable
>>> parallelism.
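As a quick check that array-backed spliterators already know how to split, the released `java.util.Arrays.spliterator` can be split directly. Each `trySplit()` hands back a prefix (in current JDKs, the first half) and the receiver keeps the suffix. Each part can then be traversed independently, for example by different threads. The class name is hypothetical.

```java
import java.util.Arrays;
import java.util.Spliterator;

public class ArraySplitDemo {
    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5, 6, 7, 8};
        Spliterator.OfInt right = Arrays.spliterator(data);
        // trySplit() returns a prefix; the receiver keeps the remaining suffix.
        Spliterator.OfInt left = right.trySplit();
        System.out.println(left.estimateSize() + " / " + right.estimateSize()); // 4 / 4

        // Each half is traversed independently.
        int[] sums = new int[2];
        left.forEachRemaining((int x) -> sums[0] += x);
        right.forEachRemaining((int x) -> sums[1] += x);
        System.out.println(sums[0] + " " + sums[1]); // 10 26
    }
}
```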
>>>
>>>
>>> So, here's what I propose. Currently, we have a .sequential()
>>> operation, which is a no-op on sequential streams and on parallel
>>> streams acts as a barrier so that upstream computation can occur in
>>> parallel but downstream computation can occur serially, in encounter
>>> order (if defined), within-thread. We've also got a spliterator()
>>> "escape hatch".
>>>
>>> We can add to these a .parallel() operation, which is a no-op on
>>> parallel streams. The implementation is very simple and efficient (if
>>> applied early in the pipeline).
>>>
>>> Here's the default implementation (which is probably good enough for all
>>> cases):
>>>
>>> Stream<T> parallel() {
>>>     if (isParallel())
>>>         return this;
>>>     else
>>>         return Streams.parallel(spliterator(), getStreamFlags());
>>> }
>>>
>>> What makes this efficient is that if you apply this operation at the
>>> very top of the pipeline, it just grabs the underlying spliterator,
>>> wraps it in a new stream with the parallel flag set, and keeps going.
>>> (If applied farther down the pipeline, spliterator() returns a
>>> spliterator wrapped with the intervening operations.)
>>>
>>>
>>> Bringing this back to our API, this enables us to have a .parallel()
>>> operation on Stream, so users can say:
>>>
>>> string.chars().parallel()...
>>>
>>> if they want to operate on the characters in parallel.
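A short illustration against the released JDK 8 API. There, `String.chars()`, `BufferedReader.lines()` (rather than `Reader.lines()`) and `Pattern.splitAsStream()` all return sequential streams, and `parallel()` is the explicit opt-in. `Pattern.splitAsStream` is used only as a stand-in for the hypothetical `regex.matches(source)` mentioned earlier; the class and helper names are illustrative.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.regex.Pattern;

public class SerialSourcesDemo {
    // Hypothetical helper: count upper-case chars, opting in to parallelism.
    static long countUpperParallel(String s) {
        return s.chars()
                .parallel()
                .filter(Character::isUpperCase)
                .count();
    }

    // Hypothetical helper: sum comma-separated integers in parallel.
    static int sumFieldsParallel(String csv) {
        return Pattern.compile(",")
                .splitAsStream(csv)
                .parallel()
                .mapToInt(Integer::parseInt)
                .sum();
    }

    public static void main(String[] args) {
        // Accessor-style sources are sequential by default ("no implicit parallelism").
        System.out.println("hello".chars().isParallel()); // false
        BufferedReader reader = new BufferedReader(new StringReader("a\nbb\nccc"));
        System.out.println(reader.lines().isParallel());  // false

        System.out.println(countUpperParallel("Hello World")); // 2
        System.out.println(sumFieldsParallel("1,2,3,4"));      // 10
    }
}
```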
>>>
>>> The default implementation of parallel / parallelStream in Streamable
>>> could then be:
>>>
>>> default Stream<T> parallel() {
>>>     return stream().parallel();
>>> }
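A minimal sketch of that default method, assuming `Streamable` has a single abstract `stream()` method. The interface is hypothetical here; no `Streamable` type exists in the released JDK. With only one abstract method, a method reference such as `items::stream` is enough to implement it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical interface modelled on the Streamable discussed in this thread.
interface Streamable<T> {
    Stream<T> stream();

    // Default: derive the parallel stream from the sequential one.
    default Stream<T> parallel() {
        return stream().parallel();
    }
}

public class StreamableSketch {
    // Adapts any List to Streamable via a method reference to List.stream().
    static <T> Streamable<T> of(List<T> items) {
        return items::stream;
    }

    public static void main(String[] args) {
        Streamable<Integer> nums = of(Arrays.asList(1, 2, 3, 4));
        System.out.println(nums.parallel().isParallel());            // true
        System.out.println(nums.parallel().reduce(0, Integer::sum)); // 10
    }
}
```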
>>>
>>> But I think it is still worth keeping the parallel / parallelStream bun
>>> for collections, since this is such an important use case (and it is
>>> still slightly more efficient, with a few fewer object creations).
>>>
>
More information about the lambda-libs-spec-observers
mailing list