Stream.parallel() Uses Virtual Threads?

Viktor Klang viktor.klang at oracle.com
Sat May 11 10:58:44 UTC 2024


Hi,

As I have spent a fair amount of time thinking about this topic specifically, there's a lot of nuance to what it *means* to back parallel streams with VirtualThreads.

Background
Given the design of the reference implementation of java.util.stream.Stream, in parallel mode the stream pipeline is split up into segments ending with an operation which needs to run to completion, collecting its output as the input for the next segment of the pipeline. That parallelization is using customized implementations of CountedCompleters to literally (Spliterator) split the workload amongst a constructed tree of tasks.

CountedCompleters are specialized implementations of ForkJoinTasks (https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/CountedCompleter.html ) and enjoy a bit of support from ForkJoinPool (ex:https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java#L1561)

On the topic of VirtualThread
With that said, backing parallel streams with VirtualThreads would entail a lot of work to reimplement every bespoke CountedCompleter-based operation in parallel streams to perform the equivalent coordination of result completion to achieve the same performance profile as current CPU-bound workloads using parallel streams.
There's also no real reason to believe it would provide any significant benefit on a per-workload basis.

For IO-bound workloads the situation is somewhat different, because they are often subject to externally-imposed sources of stalls. However, my experience tells me that the areas of IO are often well-understood and typically limited to specific points in a pipeline (usually Input near the beginning, and Output near the end, but there are also cases where you might have request-response-style operations somewhere in the middle).

For that case, as Alan mentioned previously, Gatherers (Preview feature) offers a mapConcurrent(maxConcurrency, io-function)<https://docs.oracle.com/en/java/javase/22/docs/api/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)> which will preserve ordering of elements in the stream, and execute the supplied function inside a VirtualThread up to the configured concurrency limit, and supports cancellation of tasks in the event of short-circuiting.

If you have any opportunity to test this functionality, your feedback would be much appreciated.
Gatherers (Java SE 22 & JDK 22)<https://docs.oracle.com/en/java/javase/22/docs/api/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)>
declaration: module: java.base, package: java.util.stream, class: Gatherers
docs.oracle.com


Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: loom-dev <loom-dev-retn at openjdk.org> on behalf of Alan Bateman <Alan.Bateman at oracle.com>
Sent: Saturday, 11 May 2024 07:16
To: Nathan Reynolds <numeralnathan at gmail.com>; loom-dev at openjdk.org <loom-dev at openjdk.org>
Subject: Re: Stream.parallel() Uses Virtual Threads?

On 10/05/2024 18:36, Nathan Reynolds wrote:
> Does/will Stream.parallel() use virtual threads or stay with the Fork
> Join common pool threads?
>
> Let's say I have a Stream where the "loop" is I/O bound. Here is an
> example.
>
> list.
>    stream().
>    parallel().
>    map(MyClass::ioBoundOperation).
>    ...
>
> In Java 8, 11, and other pre-Java 21 versions, parallel() would use
> the Fork Join common pool to execute ioBoundOperation() in multiple
> threads.  If the list has many elements, then the Fork Join common
> pool will run out of threads since there are roughly N threads where N
> = the number of vCPUs.  Other parallel streams will then have to
> wait.  I hope I captured the situation correctly.
>
> If parallel() uses virtual threads, then multiple parallel streams can
> execute and they won't block each other since the number of virtual
> threads can be very large.
>
> Let's say parallel() uses virtual threads, would it make sense to
> always use parallel()?  I guess there is some small overhead for
> creating a virtual thread.  So, processing a small list with short
> CPU-bound operations may take more time than it saves.
>
We aren't planning to change parallel to use virtual threads, I don't
think it would make sense in general.

Have you looked at Gatherers for doing intermediate operations yet? One
of the built-ins is mapConcurrent [1] so in your example you could
replace parallel().map(MyClass:ioBoundOperation) with
mapConcurrent(MyClass:ioBoundOperation) and have the mapping function
for each element run in its own virtual thread.

-Alan

[1]
https://docs.oracle.com/en/java/javase/22/docs/api/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://mail.openjdk.org/pipermail/loom-dev/attachments/20240511/3b1d9169/attachment-0001.htm>


More information about the loom-dev mailing list