[External] : Re: Question about mapConcurrent() Behavior and Happens-Before Guarantees
Jige Yu
yujige at gmail.com
Tue Oct 7 01:25:21 UTC 2025
For context, this came up in the discussion thread about the web crawler
use case (still being discussed in "Problem report on the usage of
Structured Concurrency").
The use case, where each crawled page yields more URLs to crawl in turn,
seems pretty trivial to implement using mapConcurrent():
    int maxConcurrency = 10;
    Set<String> seen = new HashSet<>();
    seen.add(root);
    for (List<String> toCrawl = List.of(root); toCrawl.size() > 0; ) {
      toCrawl = toCrawl.stream()
          .gather(mapConcurrent(maxConcurrency, url -> loadWebPage(url)))
          .flatMap(page -> page.getLinks().stream())
          .filter(seen::add)
          .toList();
    }
It's a breadth-first traversal: each round crawls all remaining URLs
concurrently, up to the max concurrency.
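
For readers who want to run the round-by-round traversal on a JDK without
Gatherers, here is a minimal sketch. It swaps mapConcurrent() for
ExecutorService.invokeAll(), so it is not the same mechanism, only the same
breadth-first shape. The class name BfsCrawlSketch and the in-memory LINKS
graph (standing in for loadWebPage(url).getLinks()) are made up for
illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BfsCrawlSketch {
    // Hypothetical stand-in for loadWebPage(url).getLinks(): an in-memory link graph.
    static final Map<String, List<String>> LINKS = Map.of(
            "a", List.of("b", "c"),
            "b", List.of("c", "d"),
            "c", List.of("a"),
            "d", List.of());

    static Set<String> crawl(String root, int maxConcurrency) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            Set<String> seen = new LinkedHashSet<>();
            seen.add(root);
            List<String> toCrawl = List.of(root);
            while (!toCrawl.isEmpty()) {
                // One round: load every URL of the current frontier concurrently.
                List<Callable<List<String>>> tasks = new ArrayList<>();
                for (String url : toCrawl) {
                    tasks.add(() -> LINKS.getOrDefault(url, List.of()));
                }
                List<String> next = new ArrayList<>();
                // invokeAll waits for the whole round and returns futures in task order.
                for (Future<List<String>> page : pool.invokeAll(tasks)) {
                    for (String link : page.get()) {
                        if (seen.add(link)) { // only the calling thread touches 'seen'
                            next.add(link);
                        }
                    }
                }
                toCrawl = next;
            }
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(crawl("a", 10));
    }
}
```

Note that invokeAll() also waits for every task of a round before reporting
a failure, so this sketch is not fail-fast either.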
The potential power of mapConcurrent() is very appealing to me. It can
solve rather complicated concurrency problems elegantly.
Then someone pointed out that mapConcurrent() doesn't cancel when it
should. It turned out not to be a cancellation issue, but a missing
fail-fast.
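
To make "fail-fast" concrete, here is a rough sketch of the behavior being
asked for, written with ExecutorCompletionService rather than a Gatherer. It
is not how mapConcurrent() works and not a proposed implementation. The names
FailFastMapSketch, mapAllOrNothing and work are hypothetical, and for brevity
it runs one thread per input instead of honoring a max concurrency.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class FailFastMapSketch {

    /** Maps all inputs concurrently; results keep input order; throws on the first failure. */
    static <T, R> List<R> mapAllOrNothing(List<T> inputs, Function<T, R> mapper) {
        int n = inputs.size();
        if (n == 0) {
            return new ArrayList<>();
        }
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            ExecutorCompletionService<Map.Entry<Integer, R>> completed =
                    new ExecutorCompletionService<>(pool);
            for (int i = 0; i < n; i++) {
                final int index = i;
                completed.submit(() ->
                        new SimpleImmutableEntry<Integer, R>(index, mapper.apply(inputs.get(index))));
            }
            List<R> results = new ArrayList<>(Collections.<R>nCopies(n, null));
            for (int i = 0; i < n; i++) {
                // take() hands back tasks in completion order, so a failed task
                // is seen as soon as it fails, not when it is "next in line".
                Map.Entry<Integer, R> entry = completed.take().get();
                results.set(entry.getKey(), entry.getValue());
            }
            return results;
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow(); // interrupts whatever is still running; does not wait for it
        }
    }

    /** Same shape as the test case in this thread: inputs 1 and 2 sleep, input 3 fails. */
    static int work(int input) {
        if (input > 2) {
            throw new IllegalStateException("task " + input + " failed");
        }
        try {
            Thread.sleep(2000);
            return input * 2;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        try {
            mapAllOrNothing(List.of(1, 2, 3), FailFastMapSketch::work);
        } catch (RuntimeException e) {
            System.out.println("Failed after " + (System.nanoTime() - start) / 1_000_000 + " ms");
        }
    }
}
```

The trade-off is visible in the finally block: the caller returns without
joining the interrupted tasks, which is exactly the property the earlier
happens-before discussion was about.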
Just fwiw.
Thanks again for patiently explaining the design rationale, Viktor!
On Mon, Oct 6, 2025 at 8:41 AM Archie Cobbs <archie.cobbs at gmail.com> wrote:
> Just a drive-by comment...
>
> I agree with Viktor about being careful about making any API/semantic
> changes. However, I also have a lot of sympathy for what is being asked for.
>
> A common pattern is to want to say, "Try doing these N things in parallel;
> if any of them fails, just bail out - immediately cancel all of them and
> throw an exception".
>
> In other words, in a particular scenario you are expecting perfection, and
> if anything less occurs, it's preferable to just completely give up. Trying
> to salvage it is not worth it.
>
> So it would be nice if there were some parallel stream map() variant with
> those semantics (it could also preserve order, but that seems orthogonal).
>
> -Archie
>
> On Mon, Oct 6, 2025 at 9:59 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
>> Hi Jige,
>>
>> I'm not sure it's productive to discuss the in-order behavior of
>> mapConcurrent() further, as it is specified to be in-order, so changing
>> that would be an incompatible change regardless of whether the change
>> itself would be beneficial or not.
>>
>> For potential future Gatherer-implementations made available from the
>> Gatherers class, there'd need to be new implementations made available
>> which over time prove themselves to be candidates for inclusion (also
>> taking into consideration everything which comes with contributing code to
>> OpenJDK).
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>>
>> Confidential – Oracle Internal
>> ------------------------------
>> *From:* Jige Yu <yujige at gmail.com>
>> *Sent:* Sunday, 5 October 2025 19:06
>> *To:* Viktor Klang <viktor.klang at oracle.com>
>> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
>> *Subject:* Re: [External] : Re: Question about mapConcurrent() Behavior
>> and Happens-Before Guarantees
>>
>>
>> Hi Viktor,
>>
>> Thanks again for explaining the current in-order contract. While I
>> understand the implementation is currently compliant with the Javadoc, I
>> have a few additional concerns about the practical consequences of this
>> delayed exception propagation.
>>
>> Potential for Starvation
>>
>> While in theory this is also a lack of fail-fast, in certain concurrent
>> architectures it may show up as *starvation or a system hang*.
>>
>> Imagine a scenario where the virtual threads are consuming work items
>> from a shared, blocking queue (like what's being discussed in the other
>> thread, the virtual threads could themselves be producing data into the
>> queue):
>>
>> 1. Task 3 successfully takes an item from the queue and then immediately
>>    fails with an exception, without producing any data into the queue.
>> 2. Tasks 1 and 2 block on the queue to get their work items,
>>    *indefinitely*.
>> 3. Because the exception from Task 3 is not propagated until Tasks 1 and
>>    2 finish, the main thread or stream pipeline is blocked indefinitely if
>>    Tasks 1 or 2 hang.
>> 4. The work item consumed by Task 3 is lost, and the system is
>>    effectively halted, as the failure isn't reported and cleanup/cancellation
>>    is delayed.
>>
>> So, besides the performance hit, it creates stability and liveness issues.
>> ------------------------------
>>
>> Ambiguity in Current Javadoc
>>
>> Regarding the Javadoc, while I did read it, I found the lack of an
>> explicit fail-fast guarantee and the precise timing of exception
>> propagation to be subtle. The phrase, *"If a result of the function is
>> to be pushed downstream,"* seems to leave ample room for interpretation,
>> and it was not immediately clear whether the lack of fail-fast was an
>> intentional design choice versus a side effect of the in-order
>> implementation. Clarifying this timing can probably improve the API's
>> contract clarity.
>> ------------------------------
>>
>> Re-evaluating the "In-Order" Contract
>>
>> Apologies for circling back to the same "in-order" question that has
>> already been discussed.
>>
>> But from a usability perspective, I suggest we also bring the *"in-order"
>> contract itself* up for discussion.
>>
>> I believe that the issues we've identified—the potential for
>> starvation/hanging and the earlier discussed concurrency throughput
>> concerns—are significant, realistic downsides of requiring strict in-order
>> processing. These drawbacks should be carefully weighed against the
>> benefits of the "in-order" contract, especially for concurrent processing
>> APIs intended for I/O-bound tasks where strict ordering often has minimal
>> value compared to resilience and performance.
>>
>> If it's a trade off between having *in-order or fail-fast* (but not both
>> simultaneously), it may warrant some more consideration about which is more
>> useful. Particularly, Java users have traditionally been educated that
>> parallel streams don't necessarily retain encounter order.
>>
>> Cheers,
>>
>> On Sun, Oct 5, 2025 at 2:13 AM Viktor Klang <viktor.klang at oracle.com>
>> wrote:
>>
>> Hi Jige,
>>
>> Thanks for your question!
>>
>> It is important to acknowledge that Gatherers.mapConcurrent() does not
>> specify fail-fast behavior.
>>
>> From
>> https://docs.oracle.com/en/java/javase/25/docs/api/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)
>>
>> *"In progress tasks will be attempted to be cancelled, on a best-effort
>> basis, in situations where the downstream no longer wants to receive any
>> more elements."*
>>
>> This does not refer to failed tasks
>>
>> *"If a result of the function is to be pushed downstream but instead the
>> function completed exceptionally then the corresponding exception will
>> instead be rethrown by this method as an instance of **RuntimeException
>> <https://docs.oracle.com/en/java/javase/25/docs/api/java.base/java/lang/RuntimeException.html>**,
>> after which any remaining tasks are canceled."*
>>
>> Since mapConcurrent is in-order, "is to be pushed downstream" means that
>> it is next in line.
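
As an analogy for "next in line" (not the actual mapConcurrent()
implementation), the sketch below joins plain Futures in submission order.
Task 3 fails immediately, but the caller only sees the failure after it has
waited for tasks 1 and 2. InOrderJoinSketch, work and SLEEP_MILLIS are
hypothetical names, and the sleep is shortened to 300 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InOrderJoinSketch {
    static final long SLEEP_MILLIS = 300;

    static int work(int input) throws InterruptedException {
        if (input > 2) {
            throw new IllegalStateException("task " + input + " failed");
        }
        Thread.sleep(SLEEP_MILLIS);
        return input * 2;
    }

    /** Returns how long the caller waited before it observed task 3's failure. */
    static long millisUntilFailureObserved() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        long start = System.nanoTime();
        try {
            List<Future<Integer>> inFlight = new ArrayList<>();
            for (int i = 1; i <= 3; i++) {
                final int input = i;
                Callable<Integer> task = () -> work(input);
                inFlight.add(pool.submit(task));
            }
            for (Future<Integer> next : inFlight) {
                // In-order consumption: blocks on tasks 1 and 2 before it ever
                // looks at task 3, which has already failed by then.
                next.get();
            }
            throw new AssertionError("task 3 should have failed");
        } catch (ExecutionException e) {
            return (System.nanoTime() - start) / 1_000_000;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Failure observed after " + millisUntilFailureObserved() + " ms");
    }
}
```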
>>
>>
>>
>> Now, I'm not saying that fail-fast wouldn't be desirable behavior, but it
>> would require research into how to implement it without breaking observable
>> behavior.
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>>
>> ------------------------------
>> *From:* Jige Yu <yujige at gmail.com>
>> *Sent:* Sunday, 5 October 2025 07:10
>> *To:* Viktor Klang <viktor.klang at oracle.com>
>> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
>> *Subject:* Re: [External] : Re: Question about mapConcurrent() Behavior
>> and Happens-Before Guarantees
>>
>> Hi, Viktor.
>>
>> I recently suggested mapConcurrent() for a concurrent web crawling use
>> case. But I was presented with a test case that showed surprising results.
>> Basically, the current behavior may look like mapConcurrent() doesn't fail
>> fast, and doesn't cancel in-flight tasks when a task has already failed.
>>
>> What actually happens in the following example is that the first two
>> tasks are sleeping, while the 3rd task has already failed. But
>> mapConcurrent() doesn't yet know that the 3rd task has failed until the
>> first two tasks have successfully finished sleeping.
>>
>> Here's the test case:
>>
>> // Assumes: import static java.util.stream.Gatherers.mapConcurrent;
>> int work(int input) {
>>   if (input <= 2) {
>>     try {
>>       IO.println(input + " sleeping ");
>>       Thread.sleep(2000);
>>       IO.println("Returning " + (input * 2));
>>       return input * 2;
>>     } catch (InterruptedException e) {
>>       IO.println("Interrupted!");
>>       throw new RuntimeException(e);
>>     }
>>   } else {
>>     IO.println(input + " Throwing");
>>     throw new RuntimeException();
>>   }
>> }
>>
>> @Test public void mainTest() {
>>   var start = System.currentTimeMillis();
>>   try {
>>     List<Integer> results =
>>         Stream.of(1, 2, 3).gather(mapConcurrent(3, this::work)).toList();
>>     IO.println("Results = " + results);
>>   } finally {
>>     IO.println("Took " + (System.currentTimeMillis() - start) + " ms");
>>   }
>> }
>>
>>
>>
>>
>> On Mon, Jul 14, 2025 at 12:25 AM Viktor Klang <viktor.klang at oracle.com>
>> wrote:
>>
>> Hi Jige,
>>
>> The current behavior is what's currently achievable within the
>> constraints of the Gatherer-model, if that changes in the future it would
>> also mean that there could be stronger "guarantees" made.
>>
>> In the meantime, the good news is that if you're not satisfied with the
>> behavior offered by mapConcurrent()—you can create your own which does what
>> you want it to do!
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* Jige Yu <yujige at gmail.com>
>> *Sent:* Sunday, 13 July 2025 05:54
>> *To:* Viktor Klang <viktor.klang at oracle.com>
>> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
>> *Subject:* Re: [External] : Re: Question about mapConcurrent() Behavior
>> and Happens-Before Guarantees
>>
>> Thanks for pressing on that, Viktor!
>>
>> I think I was fooled by my quick-and-dirty test. As I tried to harden it,
>> I failed to deduce consistent behavior about what parallel stream does when
>> unchecked exceptions happen. And in fact, it seems like it does *not*
>> interrupt pending threads (?).
>>
>> With this in mind, do you consider this behavior of mapConcurrent()
>> cancelling and joining the virtual threads on a best-effort basis
>> an acceptable trade-off?
>>
>> I wonder then if it's even worth it for mapConcurrent() to try to join
>> the threads at all? If it can sometimes join and sometimes not, why not
>> just always fail fast? At least then you get consistent fail-fast behavior:
>> if a thread fails to respond to interruption and hangs, the main thread
>> would still be able to respond to the exception.
>>
>> Cheers,
>>
>>
>>
>> On Tue, Jul 8, 2025 at 2:34 AM Viktor Klang <viktor.klang at oracle.com>
>> wrote:
>>
>> >I understand that explicit API contracts are what matters. My concern,
>> however, is that even if the API contract explicitly states *no* happens-before
>> guarantee upon an unchecked exception, this behavior would still be a
>> significant deviation from established visibility standards in other JDK
>> APIs.
>>
>> Would you mind clarifying *exactly* what you mean here: *what* happens-before
>> completion/exception?
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>>
>> ------------------------------
>> *From:* Jige Yu <yujige at gmail.com>
>> *Sent:* Tuesday, 8 July 2025 04:26
>> *To:* Viktor Klang <viktor.klang at oracle.com>
>> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
>> *Subject:* [External] : Re: Question about mapConcurrent() Behavior and
>> Happens-Before Guarantees
>>
>> Thanks for the quick reply, Viktor!
>>
>> On Mon, Jul 7, 2025 at 2:35 AM Viktor Klang <viktor.klang at oracle.com>
>> wrote:
>>
>> Hi Jige,
>>
>> >Initially, I thought this design choice might provide a strong
>> happens-before guarantee. My assumption was that an application catching a
>> RuntimeException would be able to *observe all side effects* from the
>> virtual threads, even though this practice is generally discouraged. This
>> seemed like a potentially significant advantage, outweighing the risk of a
>> virtual thread failing to respond to interruption or responding slowly.
>>
>> Unless explicitly stated in the API contract, no such guarantees should
>> be presumed to exist.
>>
>>
>> I understand that explicit API contracts are what matters. My concern,
>> however, is that even if the API contract explicitly states *no* happens-before
>> guarantee upon an unchecked exception, this behavior would still be a
>> significant deviation from established visibility standards in other JDK
>> APIs.
>>
>> For instance, both *parallel streams* and Future.get() provide a
>> happens-before guarantee upon completion (or exceptional completion in the
>> case of Future.get()). So users will most likely take it for granted. If
>> mapConcurrent() were to *not* offer this, it would potentially be the *first
>> blocking JDK API that doesn't honor happens-before* in such a scenario.
>> This inconsistency would likely be surprising and potentially confusing to
>> users who have come to expect this behavior in concurrent programming
>> constructs within the JDK.
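
To illustrate the Future.get() expectation referred to here: the
java.util.concurrent package documentation states that actions taken by the
asynchronous computation happen-before actions following the retrieval of the
result via Future.get() in another thread. The sketch below relies on that to
read a plain (non-volatile) array slot after the task has failed. The class
and method names are hypothetical, and this says nothing about what
mapConcurrent() guarantees.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FutureVisibilitySketch {

    /** Returns the side effect the caller can see after the task completed exceptionally. */
    static int observeAfterFailure() {
        int[] sideEffect = new int[1]; // plain shared state, no volatile, no locks
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Void> task = () -> {
                sideEffect[0] = 42;                      // written by the worker thread
                throw new IllegalStateException("boom"); // then the task fails
            };
            pool.submit(task).get();
            throw new AssertionError("the task should have failed");
        } catch (ExecutionException e) {
            // get() has returned (exceptionally), so the write above is visible here.
            return sideEffect[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Observed side effect: " + observeAfterFailure());
    }
}
```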
>>
>>
>>
>> As for general resource-management in Stream, I have contemplated designs
>> for Gatherer (and Collector) to be able to participate in the onClose
>> actions, but there's a lot of ground to cover to ensure correct ordering
>> and sufficiently encompassing execution of cleanup actions.
>>
>>
>> Yeah. I agree that hooking into onClose() could provide a more reliable
>> mechanism for cleanup.
>>
>> My primary concern, though, is the change it imposes on the call-site
>> contract. Requiring all users of mapConcurrent() to adopt a
>> try-with-resources syntax, while ideal for correctness, introduces a
>> burden and is easy for users to forget, potentially leading to
>> resource leaks.
>>
>> My previously proposed collectingAndThen(toList(), list ->
>> list.stream().gather(mapConcurrent())) idea, on the other hand, avoids
>> this call-site contract change. Being a collector, it needs to first
>> consume the input, similar to how most Collectors operate. So it might
>> be a less intrusive path to ensure proper resource handling without
>> altering usage patterns.
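
The collect-first idea can be sketched roughly as follows. To stay runnable
on JDKs without Gatherers, the finisher uses a fixed thread pool and
invokeAll() in place of list.stream().gather(mapConcurrent(...)); that
substitution, and the names CollectThenMapSketch, mappingConcurrently and
runAll, are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CollectThenMapSketch {

    /** Collects all upstream elements first, then maps them concurrently in the finisher. */
    static <T, R> Collector<T, ?, List<R>> mappingConcurrently(
            int maxConcurrency, Function<T, R> mapper) {
        Function<List<T>, List<R>> finisher = inputs -> runAll(inputs, maxConcurrency, mapper);
        return Collectors.collectingAndThen(Collectors.<T>toList(), finisher);
    }

    private static <T, R> List<R> runAll(List<T> inputs, int maxConcurrency, Function<T, R> mapper) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            List<Callable<R>> tasks = new ArrayList<>();
            for (T input : inputs) {
                tasks.add(() -> mapper.apply(input));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> done : pool.invokeAll(tasks)) {
                results.add(done.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            // Threads are started and stopped inside the finisher, so an upstream
            // exception can never leave a concurrent task running.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Integer> result = Stream.of(1, 2, 3)
                .collect(mappingConcurrently(2, (Integer x) -> x * 10));
        System.out.println(result);
    }
}
```

Because no task starts until the upstream has been fully consumed, the call
site needs no try-with-resources; the cost is that the whole input is
buffered before any work begins.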
>>
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of Jige
>> Yu <yujige at gmail.com>
>> *Sent:* Thursday, 3 July 2025 16:36
>> *To:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
>> *Subject:* Question about mapConcurrent() Behavior and Happens-Before
>> Guarantees
>>
>>
>> Hi JDK Core Devs,
>>
>> I'm writing to you today with a question about the behavior of
>> mapConcurrent() and its interaction with unchecked exceptions. I've been
>> experimenting with the API and observed that mapConcurrent() blocks and
>> joins all virtual threads upon an unchecked exception before propagating it.
>>
>> Initially, I thought this design choice might provide a strong
>> happens-before guarantee. My assumption was that an application catching a
>> RuntimeException would be able to *observe all side effects* from the
>> virtual threads, even though this practice is generally discouraged. This
>> seemed like a potentially significant advantage, outweighing the risk of a
>> virtual thread failing to respond to interruption or responding slowly.
>>
>> However, I've since realized that mapConcurrent() cannot fully guarantee
>> a strong happens-before relationship when an unchecked exception occurs
>> *somewhere* in the stream pipeline. While it can block and wait for
>> exceptions thrown by the mapper function or downstream operations, it
>> appears unable to intercept unchecked exceptions *thrown by an upstream*
>> source.
>>
>> Consider a scenario with two input elements: if the first element starts
>> a virtual thread, and then the second element causes an unchecked exception
>> from the upstream *before* reaching the gather() call, the virtual
>> thread initiated by the first element would not be interrupted. This makes
>> the "happens-before" guarantee quite nuanced in practice.
>>
>> This brings me to my core questions:
>>
>> 1. Is providing a happens-before guarantee upon an unchecked exception a
>>    design goal for mapConcurrent()?
>> 2. If not, would it be more desirable to *not* join on virtual threads
>>    when unchecked exceptions occur? This would allow the application code to
>>    catch the exception sooner and avoid the risk of being blocked indefinitely.
>>
>> Thank you for your time and insights.
>>
>> Best regards,
>>
>> Ben Yu
>>
>>
>
> --
> Archie L. Cobbs
>