[External] : Re: Question about mapConcurrent() Behavior and Happens-Before Guarantees
Viktor Klang
viktor.klang at oracle.com
Mon Oct 6 14:58:44 UTC 2025
Hi Jige,
I'm not sure it's productive to discuss the in-order behavior of mapConcurrent() further, as it is specified to be in-order, so changing that would be an incompatible change regardless of whether the change itself would be beneficial or not.
For potential future Gatherer-implementations made available from the Gatherers class, there'd need to be new implementations made available which over time prove themselves to be candidates for inclusion (also taking into consideration everything which comes with contributing code to OpenJDK).
Cheers,
√
Viktor Klang
Software Architect, Java Platform Group
Oracle
Confidential – Oracle Internal
________________________________
From: Jige Yu <yujige at gmail.com>
Sent: Sunday, 5 October 2025 19:06
To: Viktor Klang <viktor.klang at oracle.com>
Cc: core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
Subject: Re: [External] : Re: Question about mapConcurrent() Behavior and Happens-Before Guarantees
Hi Viktor,
Thanks again for explaining the current in-order contract. While I understand the implementation is currently compliant with the Javadoc, I have a few additional concerns about the practical consequences of this delayed exception propagation.
Potential for Starvation
In theory this is just another instance of the missing fail-fast behavior, but in certain concurrent architectures it can show up as starvation or a system hang.
Imagine a scenario where the virtual threads consume work items from a shared blocking queue (as discussed in the other thread, the virtual threads could themselves be producing data into the queue):
1. Task 3 successfully takes an item from the queue and then immediately fails with an exception, without producing any data into the queue.
2. Tasks 1 and 2 block indefinitely on the queue, waiting for their work items.
3. Because the exception from Task 3 is not propagated until Tasks 1 and 2 finish, the main thread or stream pipeline is blocked indefinitely if Tasks 1 or 2 hang.
4. The work item consumed by Task 3 is lost, and the system is effectively halted, as the failure isn't reported and cleanup/cancellation is delayed.
So, beyond the performance hit, it creates stability and liveness issues.
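The queue scenario above can be approximated with the sketch below. It assumes JDK 24+ for `Gatherers.mapConcurrent`; the class name, the single `"work-item"`, and the timeouts are illustrative. Tasks 1 and 2 use `poll` with a one-second timeout as a stand-in for an indefinite `take()`, so the example terminates. Under the in-order behavior described in this thread, the printed delay would be expected to be roughly the sleep plus the poll timeout rather than near zero.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class QueueStarvationDemo {
  // Tasks 1 and 2 wait for work items; Task 3 takes the only item and then fails.
  static int work(BlockingQueue<String> queue, int input) {
    try {
      if (input == 3) {
        queue.poll(5, TimeUnit.SECONDS); // consume the only item (bounded wait, for safety)
        throw new IllegalStateException("task 3 failed after consuming an item");
      }
      Thread.sleep(200); // let Task 3 win the race for the item
      // Stand-in for an indefinite take(): give up after one second.
      String item = queue.poll(1, TimeUnit.SECONDS);
      return item == null ? -1 : input;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  /** Returns how many milliseconds passed before Task 3's failure reached the caller. */
  static long millisUntilFailureSurfaces() {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("work-item"));
    long start = System.nanoTime();
    try {
      Stream.of(1, 2, 3)
          .gather(Gatherers.mapConcurrent(3, (Integer i) -> work(queue, i)))
          .toList();
    } catch (RuntimeException e) {
      long elapsed = (System.nanoTime() - start) / 1_000_000;
      System.out.println("Task 3's failure surfaced after " + elapsed + " ms");
      return elapsed;
    }
    throw new AssertionError("expected Task 3's failure to propagate");
  }
}
```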
________________________________
Ambiguity in Current Javadoc
Regarding the Javadoc, while I did read it, I found the lack of an explicit fail-fast guarantee, and the precise timing of exception propagation, to be subtle. The phrase "If a result of the function is to be pushed downstream" leaves ample room for interpretation, and it was not immediately clear whether the lack of fail-fast was an intentional design choice or a side effect of the in-order implementation. Clarifying this timing would likely make the API's contract clearer.
________________________________
Re-evaluating the "In-Order" Contract
Apologies for circling back to the same "in-order" question that has already been discussed.
But from a usability perspective, I suggest we also bring the "in-order" contract itself up for discussion.
I believe that the issues we've identified—the potential for starvation/hanging and the earlier discussed concurrency throughput concerns—are significant, realistic downsides of requiring strict in-order processing. These drawbacks should be carefully weighed against the benefits of the "in-order" contract, especially for concurrent processing APIs intended for I/O-bound tasks where strict ordering often has minimal value compared to resilience and performance.
If it's a trade-off between in-order and fail-fast (but not both simultaneously), it may warrant more consideration of which is more useful. In particular, Java users have traditionally been taught that parallel streams don't necessarily retain encounter order.
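For context on the parallel-stream comparison: what a parallel stream relaxes is the order in which elements are processed, which shows up in operations such as `forEach`; collecting terminal operations such as `toList()` on an ordered source still return results in encounter order. A minimal sketch (class and method names are illustrative):

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

public class EncounterOrderDemo {
  // toList() on an ordered source keeps encounter order, even when run in parallel.
  static List<Integer> viaToList() {
    return IntStream.range(0, 1_000).parallel().boxed().toList();
  }

  // forEach on a parallel stream makes no ordering promise: arrival order can differ run to run.
  static List<Integer> viaForEach() {
    Queue<Integer> seen = new ConcurrentLinkedQueue<>();
    IntStream.range(0, 1_000).parallel().boxed().forEach(seen::add);
    return List.copyOf(seen);
  }
}
```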
Cheers,
On Sun, Oct 5, 2025 at 2:13 AM Viktor Klang <viktor.klang at oracle.com> wrote:
Hi Jige,
Thanks for your question!
It is important to acknowledge that Gatherers.mapConcurrent() does not specify fail-fast behavior.
From https://docs.oracle.com/en/java/javase/25/docs/api/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)
"In progress tasks will be attempted to be cancelled, on a best-effort basis, in situations where the downstream no longer wants to receive any more elements."
This does not refer to failed tasks.
"If a result of the function is to be pushed downstream but instead the function completed exceptionally then the corresponding exception will instead be rethrown by this method as an instance of RuntimeException, after which any remaining tasks are canceled."
Since mapConcurrent is in-order, "is to be pushed downstream" means that it is next in line.
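To illustrate what "in-order" means here, the sketch below (JDK 24+; class name and sleep lengths are illustrative) makes later elements finish first, yet `mapConcurrent()` still emits results in encounter order, which is why a failed element is only reported once it becomes "next in line".

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class InOrderDemo {
  // Element 1 finishes first and element 3 last, but output follows encounter order.
  static List<Integer> run() {
    return Stream.of(3, 2, 1)
        .gather(Gatherers.mapConcurrent(3, n -> {
          try {
            Thread.sleep(n * 100L); // 3 sleeps longest, 1 shortest
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
          return n * 10;
        }))
        .toList(); // returns [30, 20, 10]
  }
}
```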
Now, I'm not saying that fail-fast wouldn't be desirable behavior, but it would require research into how to implement it without breaking observable behavior.
Cheers,
√
Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yujige at gmail.com>
Sent: Sunday, 5 October 2025 07:10
To: Viktor Klang <viktor.klang at oracle.com>
Cc: core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
Subject: Re: [External] : Re: Question about mapConcurrent() Behavior and Happens-Before Guarantees
Hi, Viktor.
I recently suggested mapConcurrent() for a concurrent web-crawling use case, but was then shown a test case with surprising results. In short, mapConcurrent() appears not to fail fast: it doesn't cancel in-flight tasks when another task has already failed.
What actually happens in the following example is that the first two tasks are sleeping while the third task has already failed, but mapConcurrent() doesn't learn of the third task's failure until the first two tasks have finished sleeping.
Here's the test case:
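import static java.util.stream.Gatherers.mapConcurrent;

import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test; // assuming JUnit 5

class MapConcurrentTest {
  // Inputs 1 and 2 sleep for 2 seconds; input 3 fails immediately.
  int work(int input) {
    if (input <= 2) {
      try {
        IO.println(input + " sleeping");
        Thread.sleep(2000);
        IO.println("Returning " + (input * 2));
        return input * 2;
      } catch (InterruptedException e) {
        IO.println("Interrupted!");
        Thread.currentThread().interrupt(); // restore the interrupt status
        throw new RuntimeException(e);
      }
    } else {
      IO.println(input + " Throwing");
      throw new RuntimeException();
    }
  }

  @Test
  public void mainTest() {
    var start = System.currentTimeMillis();
    try {
      List<Integer> results = Stream.of(1, 2, 3).gather(mapConcurrent(3, this::work)).toList();
      IO.println("Results = " + results);
    } finally {
      IO.println("Took " + (System.currentTimeMillis() - start) + " ms");
    }
  }
}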
On Mon, Jul 14, 2025 at 12:25 AM Viktor Klang <viktor.klang at oracle.com> wrote:
Hi Jige,
The current behavior is what's currently achievable within the constraints of the Gatherer-model, if that changes in the future it would also mean that there could be stronger "guarantees" made.
In the meantime, the good news is that if you're not satisfied with the behavior offered by mapConcurrent(), you can create your own which does what you want it to do!
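As one rough illustration of building your own, here is a minimal sketch of a hypothetical `mapConcurrentUnordered` gatherer (not a JDK API). It assumes JDK 24+ for `Gatherer`, runs each task on a virtual thread via `Executors.newVirtualThreadPerTaskExecutor()`, limits in-flight tasks to `maxConcurrency`, and emits results in completion order. Because it does not wait for earlier elements, the first failure is observed as soon as it happens and the remaining tasks are cancelled. Like `mapConcurrent()`, it has no hook for cleaning up if an exception is thrown upstream of `gather()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Gatherer;

public final class UnorderedGatherers {
  private UnorderedGatherers() {}

  /** Hypothetical gatherer: bounded concurrency, completion-order output, fail-fast. */
  public static <T, R> Gatherer<T, ?, R> mapConcurrentUnordered(
      int maxConcurrency, Function<? super T, ? extends R> mapper) {
    if (maxConcurrency < 1) {
      throw new IllegalArgumentException("maxConcurrency must be positive");
    }

    final class State {
      final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
      final CompletionService<R> completions = new ExecutorCompletionService<>(executor);
      final List<Future<R>> inFlight = new ArrayList<>();

      // Blocks until *any* task completes, then pushes its result or fails fast.
      boolean pushNextCompleted(Gatherer.Downstream<? super R> downstream) {
        Future<R> done;
        try {
          done = completions.take();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          shutdown();
          throw new RuntimeException(e);
        }
        inFlight.remove(done);
        if (done.state() == Future.State.SUCCESS) {
          return downstream.push(done.resultNow());
        }
        // FAILED or CANCELLED: cancel everything still running and rethrow immediately.
        shutdown();
        throw done.state() == Future.State.FAILED
            ? new RuntimeException(done.exceptionNow())
            : new RuntimeException("task was cancelled");
      }

      void shutdown() {
        inFlight.forEach(f -> f.cancel(true)); // interrupt in-flight tasks
        inFlight.clear();
        executor.shutdownNow(); // does not wait for cancelled tasks to finish
      }
    }

    return Gatherer.<T, State, R>ofSequential(
        State::new,
        (state, element, downstream) -> {
          // At capacity: wait for one task to finish before starting another.
          if (state.inFlight.size() >= maxConcurrency && !state.pushNextCompleted(downstream)) {
            state.shutdown(); // downstream wants no more elements
            return false;
          }
          state.inFlight.add(state.completions.submit(() -> mapper.apply(element)));
          return true;
        },
        (state, downstream) -> {
          try {
            while (!state.inFlight.isEmpty() && !downstream.isRejecting()) {
              state.pushNextCompleted(downstream);
            }
          } finally {
            state.shutdown();
          }
        });
  }
}
```

The design choice is the trade-off discussed in this thread: giving up encounter order is what lets the first failure surface without waiting on earlier elements.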
Cheers,
√
Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yujige at gmail.com>
Sent: Sunday, 13 July 2025 05:54
To: Viktor Klang <viktor.klang at oracle.com>
Cc: core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
Subject: Re: [External] : Re: Question about mapConcurrent() Behavior and Happens-Before Guarantees
Thanks for pressing on that, Viktor!
I think I was fooled by my quick-and-dirty test. As I tried to harden it, I couldn't pin down consistent behavior for what parallel streams do when unchecked exceptions are thrown. In fact, they seem *not* to interrupt pending threads (?).
With this in mind, do you consider mapConcurrent()'s behavior of cancelling and joining the virtual threads on a best-effort basis an acceptable trade-off?
I wonder, then, whether it's worth having mapConcurrent() try to join the threads at all. If it sometimes joins and sometimes doesn't, why not always fail fast? At least then the behavior is consistently fail-fast: if a thread fails to respond to interruption and hangs, the main thread can still respond to the exception.
Cheers,
On Tue, Jul 8, 2025 at 2:34 AM Viktor Klang <viktor.klang at oracle.com> wrote:
>I understand that explicit API contracts are what matters. My concern, however, is that even if the API contract explicitly states no happens-before guarantee upon an unchecked exception, this behavior would still be a significant deviation from established visibility standards in other JDK APIs.
Would you mind clarifying exactly what you mean here: what happens-before completion/exception?
Cheers,
√
Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yujige at gmail.com>
Sent: Tuesday, 8 July 2025 04:26
To: Viktor Klang <viktor.klang at oracle.com>
Cc: core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
Subject: [External] : Re: Question about mapConcurrent() Behavior and Happens-Before Guarantees
Thanks for the quick reply, Viktor!
On Mon, Jul 7, 2025 at 2:35 AM Viktor Klang <viktor.klang at oracle.com> wrote:
Hi Jige,
>Initially, I thought this design choice might provide a strong happens-before guarantee. My assumption was that an application catching a RuntimeException would be able to observe all side effects from the virtual threads, even though this practice is generally discouraged. This seemed like a potentially significant advantage, outweighing the risk of a virtual thread failing to respond to interruption or responding slowly.
Unless explicitly stated in the API contract, no such guarantees should be presumed to exist.
I understand that explicit API contracts are what matters. My concern, however, is that even if the API contract explicitly states no happens-before guarantee upon an unchecked exception, this behavior would still be a significant deviation from established visibility standards in other JDK APIs.
For instance, both parallel streams and Future.get() provide a happens-before guarantee upon completion (or exceptional completion in the case of Future.get()). So users will most likely take it for granted. If mapConcurrent() were to not offer this, it would potentially be the first blocking JDK API that doesn't honor happens-before in such a scenario. This inconsistency would likely be surprising and potentially confusing to users who have come to expect this behavior in concurrent programming constructs within the JDK.
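For reference, the `Future.get()` guarantee mentioned above can be sketched as follows. The sketch relies on the `java.util.concurrent` memory-consistency rule that actions taken by the computation behind a `Future` happen-before actions following retrieval of its result via `get()`; here the "result" is an `ExecutionException`, matching the reading above. Class and field names are illustrative, and a passing run demonstrates the pattern rather than proving the guarantee.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureVisibilityDemo {
  // Deliberately not volatile: any visibility guarantee has to come from Future.get().
  static int sideEffect = 0;

  static int observeAfterFailure() {
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      Callable<Void> task = () -> {
        sideEffect = 42; // write made by the task before it fails
        throw new IllegalStateException("task failed");
      };
      Future<Void> future = executor.submit(task);
      try {
        future.get();
        return -1; // not reached: the task always fails
      } catch (ExecutionException e) {
        // The task's write happens-before get() reports the failure.
        return sideEffect;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return -1;
      }
    }
  }
}
```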
As for general resource management in Stream, I have contemplated designs that would let a Gatherer (and Collector) participate in the onClose actions, but there's a lot of ground to cover to ensure correct ordering and sufficiently encompassing execution of cleanup actions.
Yeah. I agree that hooking into onClose() could provide a more reliable mechanism for cleanup.
My primary concern, though, is the change it imposes on the call-site contract. Requiring all users of mapConcurrent() to adopt try-with-resources, while ideal for correctness, adds a burden, and users may forget to do it, potentially leading to resource leaks.
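To make the call-site concern concrete: a handler registered with `Stream.onClose` runs only when the stream is closed, and terminal operations such as `toList()` do not close it, so in practice callers need try-with-resources. In this minimal sketch the `cleanedUp` flag is a hypothetical stand-in for whatever a concurrent gatherer would need to release.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class OnCloseDemo {
  // Without try-with-resources: the terminal operation does not close the stream.
  static boolean cleanupRanWithoutClose() {
    AtomicBoolean cleanedUp = new AtomicBoolean(false);
    Stream.of(1, 2, 3).onClose(() -> cleanedUp.set(true)).toList();
    return cleanedUp.get(); // false: the handler never ran
  }

  // With try-with-resources: close() runs the registered handler.
  static boolean cleanupRanWithTryWithResources() {
    AtomicBoolean cleanedUp = new AtomicBoolean(false);
    try (Stream<Integer> s = Stream.of(1, 2, 3).onClose(() -> cleanedUp.set(true))) {
      s.toList();
    }
    return cleanedUp.get(); // true
  }
}
```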
My previously proposed collectingAndThen(toList(), list -> list.stream().gather(mapConcurrent())) idea, on the other hand, avoids this call-site contract change. Being a collector, it needs to first consume the input, similar to how most Collectors operate. So it might be a less intrusive path to ensure proper resource handling without altering usage patterns.
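The collector-based idea could look roughly like the sketch below. `concurrently` is a hypothetical helper name, not a JDK API. It buffers the whole input with `toList()` first, so an exception thrown upstream surfaces before any virtual thread is started, and only then runs `mapConcurrent()` over the buffered list. The cost is laziness: all input is materialized before any mapping begins.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Gatherers;

public class ConcurrentCollectors {
  // Hypothetical helper: consume all input first, then map it concurrently (in order).
  static <T, R> Collector<T, ?, List<R>> concurrently(
      int maxConcurrency, Function<? super T, ? extends R> mapper) {
    return Collectors.collectingAndThen(
        Collectors.<T>toList(),
        (List<T> list) -> list.stream()
            .gather(Gatherers.<T, R>mapConcurrent(maxConcurrency, mapper))
            .toList());
  }
}
```

Usage would be `Stream.of(1, 2, 3).collect(ConcurrentCollectors.concurrently(2, x -> x * 10))`.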
Cheers,
√
Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of Jige Yu <yujige at gmail.com>
Sent: Thursday, 3 July 2025 16:36
To: core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
Subject: Question about mapConcurrent() Behavior and Happens-Before Guarantees
Hi JDK Core Devs,
I'm writing to you today with a question about the behavior of mapConcurrent() and its interaction with unchecked exceptions. I've been experimenting with the API and observed that mapConcurrent() blocks and joins all virtual threads upon an unchecked exception before propagating it.
Initially, I thought this design choice might provide a strong happens-before guarantee. My assumption was that an application catching a RuntimeException would be able to observe all side effects from the virtual threads, even though this practice is generally discouraged. This seemed like a potentially significant advantage, outweighing the risk of a virtual thread failing to respond to interruption or responding slowly.
However, I've since realized that mapConcurrent() cannot fully guarantee a strong happens-before relationship when an unchecked exception occurs somewhere in the stream pipeline. While it can block and wait for exceptions thrown by the mapper function or downstream operations, it appears unable to intercept unchecked exceptions thrown by an upstream source.
Consider a scenario with two input elements: if the first element starts a virtual thread, and then the second element causes an unchecked exception from the upstream before reaching the gather() call, the virtual thread initiated by the first element would not be interrupted. This makes the "happens-before" guarantee quite nuanced in practice.
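The two-element scenario can be reproduced with a sketch like the one below (JDK 24+; class name, sleep length and latches are illustrative). Element 2 fails in a `map()` stage before it reaches `gather()`, so its exception propagates straight to the caller. Whether element 1's virtual thread gets interrupted is exactly the open question, so the sketch only records what it observes instead of asserting a particular outcome.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class UpstreamFailureDemo {
  record Observation(boolean upstreamExceptionSeen, boolean firstTaskInterrupted) {}

  static Observation run() {
    var started = new CountDownLatch(1);
    var finished = new CountDownLatch(1);
    var interrupted = new AtomicBoolean(false);
    boolean upstreamExceptionSeen = false;
    try {
      Stream.of(1, 2)
          .map(n -> {
            if (n == 2) {
              awaitQuietly(started); // ensure element 1's task is already running
              throw new IllegalStateException("upstream failure on element 2");
            }
            return n;
          })
          .gather(Gatherers.mapConcurrent(2, (Integer n) -> {
            started.countDown();
            try {
              Thread.sleep(2_000); // simulated slow I/O
            } catch (InterruptedException e) {
              interrupted.set(true); // only reached if something interrupts this task
            } finally {
              finished.countDown();
            }
            return n;
          }))
          .toList();
    } catch (IllegalStateException e) {
      upstreamExceptionSeen = true; // thrown directly from map(), not by mapConcurrent()
    }
    awaitQuietly(finished); // give task 1 time to finish or observe an interrupt
    return new Observation(upstreamExceptionSeen, interrupted.get());
  }

  static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```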
This brings me to my core questions:
1. Is providing a happens-before guarantee upon an unchecked exception a design goal for mapConcurrent()?
2. If not, would it be more desirable to not join on virtual threads when unchecked exceptions occur? This would allow the application code to catch the exception sooner and avoid the risk of being blocked indefinitely.
Thank you for your time and insights.
Best regards,
Ben Yu