Constructing parallel streams
Remi Forax
forax at univ-mlv.fr
Fri Dec 14 14:47:49 PST 2012
A quick question: why is isParallel() public? It seems to be an
implementation detail.
Rémi
On 12/14/2012 08:23 PM, Brian Goetz wrote:
> I've added .parallel() to Stream. The implementation is trivial:
>
> public Stream<U> parallel() {
>     if (isParallel())
>         return this;
>     else {
>         return Streams.parallel(spliterator(), getStreamFlags());
>     }
> }
>
> When applied at the head of a stream (e.g.,
> String.chars().parallel()), the additional overhead is a few extra
> object instantiations, since we just copy the spliterator and unleash
> whatever parallelism it can offer. (We could consider additionally
> querying the spliterator, asking if it is splittable, and if not,
> getting the iterator and doing the "split the iterator" trick done by
> Iterators.spliterator.)
>
> When applied in the middle of a pipeline with stateless intermediate
> operations, the overhead is also pretty limited; we process the
> upstream ops serially (but jammed) and apply our "split the iterator"
> trick.
>
> When applied in the middle of a pipeline with stateful operations, we
> end up with a full serial computation through the last stateful op,
> and if there are any intervening stateless intermediate ops after
> that, it reduces to the previous case.
>
>
> I'm starting to think that Streamable is not carrying its weight. I'd
> like to consider dropping Streamable, at which point the base-most
> implementation of parallel() is in Collection, and I'd also suggest we
> consider renaming that to parallelStream().
>
> Where that would leave us is:
> - Persistent aggregates like Collection have two methods, stream()
> and parallelStream();
> - Aggregate views of other classes are handled by accessor methods
> like String.codePoints(), and you get parallelism by asking the
> resulting stream for a .parallel() view.
>
> So Collection.parallelStream() becomes unnecessary, but remains a
> desirable optimization for a very common case. And the streaminess of the
> resulting value is clearer.
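
A minimal sketch of the two entry points described above. It assumes the
java.util.stream API as it later shipped in Java 8, which may differ from
the draft under discussion; the class and method names are hypothetical
and exist only for illustration.

```java
import java.util.Arrays;
import java.util.List;

class ParallelEntryPointsSketch {
    // Persistent aggregate: the collection offers a sequential stream directly.
    static int sumSequential(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    // Persistent aggregate: the collection also offers a parallel stream directly.
    static int sumParallel(List<Integer> values) {
        return values.parallelStream().mapToInt(Integer::intValue).sum();
    }

    // Aggregate view: String.chars() is sequential, so parallelism is
    // requested explicitly by asking the stream for a parallel view.
    static long countVowels(String s) {
        return s.chars()
                .parallel()
                .filter(c -> "aeiou".indexOf(c) >= 0)
                .count();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(sumSequential(values));
        System.out.println(sumParallel(values));
        System.out.println(countVowels("parallel streams"));
    }
}
```

Both sum methods should return the same result; only the way the stream
is obtained differs.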
>
> On 12/8/2012 11:58 AM, Brian Goetz wrote:
>> Following up on the previous note, now that the stream-building APIs
>> have settled into something fairly simple and regular, I'm not
>> completely happy with the arrangement of the stream() / parallel() buns.
>>
>> For collections, stream() and parallel() seem fine; the user already has
>> a collection in hand, and can ask for a sequential or parallel stream.
>> (Separately: I'm starting to prefer stream() / parallelStream() as the
>> bun names here.)
>>
>> But, there are other ways to get a stream:
>>
>> String.chars()
>> Reader.lines()
>> regex.matches(source)
>> etc
>>
>> It seems pretty natural for these things to return Streams. But, in
>> accordance with our "no implicit parallelism" dictum, these streams are
>> serial. But many of these streams can be operated on in parallel -- so
>> the question is, how would we get a parallel stream out of these?
>>
>> One obvious choice is to have two operations for each of these:
>>
>> String.chars()
>> String.charsAsParallelStream()
>>
>> That's pretty ugly, and unlikely to be consistently implemented.
>>
>>
>> Now that the Streams construction API and internals have shaken out,
>> another option has emerged. A Spliterator can be traversed sequentially
>> or in parallel. Many sequential streams are constructed out of
>> spliterators that already know how to split (e.g., Arrays.spliterator),
>> and, we know how to expose some parallelism from otherwise sequential
>> data sources anyway (see implementation of Iterators.spliterator). Just
>> because iteration is sequential does not mean there is no exploitable
>> parallelism.
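
To illustrate the point that a sequential source can still expose some
parallelism, here is a sketch using the released Java 8 classes
Spliterators and StreamSupport. These are assumptions relative to this
thread, which refers to the draft Iterators.spliterator. The class and
method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;

class IteratorSplitSketch {
    // Sums the elements of a purely sequential Iterator through a parallel stream.
    static long sumInParallel(Iterator<Integer> it) {
        // Wrap the iterator in a spliterator of unknown size. The JDK's
        // iterator-backed spliterator can still split, by pulling batches
        // of elements out of the iterator.
        Spliterator<Integer> sp =
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        // The second argument requests a parallel stream over that spliterator.
        return StreamSupport.stream(sp, true)
                .mapToLong(Integer::longValue)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            values.add(i);
        }
        System.out.println(sumInParallel(values.iterator()));
    }
}
```

How much speedup this yields depends on the cost of the per-element work
relative to the cost of pulling elements from the iterator.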
>>
>>
>> So, here's what I propose. Currently, we have a .sequential()
>> operation, which is a no-op on sequential streams and on parallel
>> streams acts as a barrier so that upstream computation can occur in
>> parallel but downstream computation can occur serially, in encounter
>> order (if defined), within-thread. We've also got a spliterator()
>> "escape hatch".
>>
>> We can add to these a .parallel() operation, which on parallel streams
>> is a no-op. The implementation is very simple and efficient (if applied
>> early on in the pipeline).
>>
>> Here's the default implementation (which is probably good enough for all
>> cases):
>>
>> Stream<T> parallel() {
>>     if (isParallel())
>>         return this;
>>     else
>>         return Streams.parallel(spliterator(), getStreamFlags());
>> }
>>
>> What makes this efficient is that if you apply this operation at the
>> very top of the pipeline, it just grabs the underlying spliterator,
>> wraps it in a new stream with the parallel flag set, and keeps going.
>> (If applied farther down the pipeline, spliterator() returns a
>> spliterator wrapped with the intervening operations.)
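
The re-wrapping idea can be approximated with the released Java 8 API as
follows. This is only a sketch: parallelView is a hypothetical helper,
StreamSupport.stream stands in for the draft Streams.parallel, and the
released library may implement parallel() differently.

```java
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class ParallelViewSketch {
    // Hypothetical helper mirroring the quoted default implementation.
    static <T> Stream<T> parallelView(Stream<T> s) {
        if (s.isParallel()) {
            return s; // already parallel: nothing to do
        }
        // Grab the spliterator (which carries any intervening operations)
        // and wrap it in a new stream with the parallel flag set.
        return StreamSupport.stream(s.spliterator(), true);
    }

    public static void main(String[] args) {
        // Applied after an intermediate map(): the spliterator wraps the mapping.
        Stream<Integer> lengths =
                parallelView(Stream.of("a", "bb", "ccc").map(String::length));
        System.out.println(lengths.isParallel());
        System.out.println(lengths.mapToInt(Integer::intValue).sum());
    }
}
```

Note that calling spliterator() consumes the original stream, so only the
returned stream may be used afterwards.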
>>
>>
>> Bringing this back to our API, this enables us to have a .parallel()
>> operation on Stream, so users can say:
>>
>> string.chars().parallel()...
>>
>> if they want to operate on the characters in parallel.
>>
>> The default implementation of parallel / parallelStream in Streamable
>> could then be:
>>
>> default Stream<T> parallel() {
>>     return stream().parallel();
>> }
>>
>> But I think it is still worth keeping the parallel / parallelStream bun
>> for collections since this is such an important use case (and is still
>> slightly more efficient; a few fewer object creations.)
>>
More information about the lambda-libs-spec-observers
mailing list