<div dir="ltr"><p>Assuming the test code and my reading of the <code>mapConcurrent()</code> code haven't misled me, this observation suggests that if an <b>upstream exception</b> terminates the stream, <code>mapConcurrent()</code> may not be able to cancel or interrupt the virtual threads it has already started. This could lead to <b>resource leaks</b> or "zombie" threads that keep running or blocking indefinitely in the background, consuming resources unnecessarily.</p>
<p>This behavior might surprise developers. For instance, <code>Stream.parallel()</code>, which runs on <code>ForkJoinPool</code>, manages its entire pipeline. As far as I understand, it can interrupt running threads and <code>join()</code> them to enforce <b>happens-before guarantees</b> even when exceptions occur upstream. Users might naturally expect the same level of resource management and memory visibility from <code>mapConcurrent()</code>. Documentation could clarify the difference, but it is fundamental enough that it might still cause confusion and unintended issues.</p>
<hr><h3>A Potential Design Adjustment for More Robustness</h3>
<p>To address these concerns while keeping <code>mapConcurrent()</code>'s core benefits, I'd like to propose a strawman design adjustment.</p>
<p>Instead of <code>mapConcurrent()</code> being an intermediate <code>Gatherer</code> (which has limited control over upstream events), consider exposing its functionality as a <b>terminal <code>Collector</code></b> (let's tentatively call it <code>concurrently()</code>).</p>
<p>For example, instead of:</p>
<span class="gmail-"><span class="gmail-ng-tns-c2272482559-186 gmail-ng-star-inserted"><div class="gmail-code-block gmail-ng-tns-c2272482559-186 gmail-ng-animate-disabled gmail-ng-trigger gmail-ng-trigger-codeBlockRevealAnimation"><div class="gmail-formatted-code-block-internal-container gmail-ng-tns-c2272482559-186"><div class="gmail-animated-opacity gmail-ng-tns-c2272482559-186"><pre class="gmail-ng-tns-c2272482559-186"><code role="text" class="gmail-code-container gmail-formatted gmail-ng-tns-c2272482559-186">inputs.stream().filter(...).gather(mapConcurrent(<span class="gmail-hljs-number">10</span>, input -> callBackend(input))).toList();
</code></pre></div></div></div></span></span><p>We could use a <code>Collector</code>-based approach:</p><span class="gmail-"><span class="gmail-ng-tns-c2272482559-187 gmail-ng-star-inserted"><div class="gmail-code-block gmail-ng-tns-c2272482559-187 gmail-ng-animate-disabled gmail-ng-trigger gmail-ng-trigger-codeBlockRevealAnimation"><div class="gmail-formatted-code-block-internal-container gmail-ng-tns-c2272482559-187"><div class="gmail-animated-opacity gmail-ng-tns-c2272482559-187"><pre class="gmail-ng-tns-c2272482559-187"><code role="text" class="gmail-code-container gmail-formatted gmail-ng-tns-c2272482559-187">inputs.stream().filter(...).collect(concurrently(<span class="gmail-hljs-number">10</span>, input -> callBackend(input))).toList();
</code></pre></div></div></div></span></span><p>Here's how this <code>Collector</code>-based design could mitigate the issues:</p><ul><li><p><b>Pre-emptive Upstream Processing:</b> The <code>concurrently()</code> <code>Collector</code> could internally use <code>collectingAndThen(toList(), ...)</code> or a similar mechanism. This would ensure that <b>all upstream elements are eagerly consumed</b> and that any upstream exception is thrown <i>before</i> any virtual thread is launched for the <code>mapConcurrent</code>-like operations.</p></li><li><p><b>Controlled Concurrent Operations:</b> The concurrent mapping would begin only once upstream processing has completed without error. The mapping would still be <b>lazy and concurrency-limited</b>, but the only threads involved would be those started by <code>concurrently()</code> itself.</p></li><li><p><b>Resolved Concerns:</b> This design would make the <b>happens-before</b> and <b>resource leak</b> concerns caused by upstream exceptions moot, because such exceptions would occur and be dealt with at an earlier stage, before any concurrency is involved.</p></li></ul><p>The most apparent trade-off is that this <code>Collector</code> approach would eagerly consume all input elements, potentially increasing memory usage and sacrificing the full laziness a <code>Gatherer</code> might offer. However, <code>mapConcurrent()</code>'s primary use cases typically involve I/O-intensive fan-outs, which generally don't operate on an extremely large number of inputs. 
For the rare instances involving infinite or very large streams, the limitation becomes a clear "<i>not designed for this</i>" scenario, simpler than navigating the current nuanced caveats around <code>happens-before</code> enforcement and thread cancellation.</p><div>Just my 2c,</div><div><br></div><div>Cheers</div><div><br></div></div><br><div class="gmail_quote gmail_quote_container"><div dir="ltr" class="gmail_attr">On Fri, Jul 4, 2025 at 8:55 AM Jige Yu <<a href="mailto:yujige@gmail.com">yujige@gmail.com</a>> wrote:<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex"><div dir="ltr">Yes. <div><br></div><div>I got two tests that show the behavior difference between downstream exception and upstream exception:</div><div><br></div><div><div style="padding:0px 0px 0px 2px"><div style="color:rgb(0,0,0);font-family:Menlo;font-size:12pt;white-space:pre-wrap"><p style="margin:0px"><br></p><p style="margin:0px">  <span style="color:rgb(100,100,100)">@Test</span> <span style="color:rgb(127,0,85);font-weight:bold">public</span> <span style="color:rgb(127,0,85);font-weight:bold">void</span> mapConcurrently_upstreamFailureDoesNotInterrupt() {</p><p style="margin:0px">    ConcurrentLinkedQueue<Integer> <span style="color:rgb(106,62,62)">started</span> = <span style="color:rgb(127,0,85);font-weight:bold">new</span> ConcurrentLinkedQueue<>();</p><p style="margin:0px">    ConcurrentLinkedQueue<Integer> <span style="color:rgb(106,62,62)">interrupted</span> = <span style="color:rgb(127,0,85);font-weight:bold">new</span> ConcurrentLinkedQueue<>();</p><p style="margin:0px">    RuntimeException <span style="color:rgb(106,62,62)">thrown</span> = <span style="font-style:italic">assertThrows</span>(</p><p style="margin:0px">        RuntimeException.<span style="color:rgb(127,0,85);font-weight:bold">class</span>,</p><p style="margin:0px">        () -> Stream.<span style="font-style:italic">of</span>(1, 10, 3, 
0)</p><p style="margin:0px">            .peek(<span style="color:rgb(106,62,62)">n</span> -> {</p><p style="margin:0px">              <span style="color:rgb(127,0,85);font-weight:bold">if</span> (<span style="color:rgb(106,62,62)">n</span> == 3) {</p><p style="margin:0px">                <span style="color:rgb(127,0,85);font-weight:bold">try</span> { <span style="color:rgb(63,127,95)">// give 1 and 10 some time to have at least started</span></p><p style="margin:0px">                  Thread.<span style="font-style:italic">sleep</span>(100);</p><p style="margin:0px">                } <span style="color:rgb(127,0,85);font-weight:bold">catch</span> (InterruptedException <span style="color:rgb(106,62,62)">e</span>) {</p><p style="margin:0px">                  <span style="color:rgb(106,62,62)">interrupted</span>.add(<span style="color:rgb(106,62,62)">n</span>);</p><p style="margin:0px">                }</p><p style="margin:0px">                <span style="color:rgb(127,0,85);font-weight:bold">throw</span> <span style="color:rgb(127,0,85);font-weight:bold">new</span> IllegalArgumentException(String.<span style="font-style:italic">valueOf</span>(<span style="color:rgb(106,62,62)">n</span>));</p><p style="margin:0px">              }</p><p style="margin:0px">            })</p><p style="margin:0px">            .gather(<span style="font-style:italic">mapConcurrently</span>(3, <span style="color:rgb(106,62,62)">n</span> -> {</p><p style="margin:0px">              <span style="color:rgb(106,62,62)">started</span>.add(<span style="color:rgb(106,62,62)">n</span>);</p><p style="margin:0px">              <span style="color:rgb(127,0,85);font-weight:bold">try</span> {</p><p style="margin:0px">                Thread.<span style="font-style:italic">sleep</span>(<span style="color:rgb(106,62,62)">n</span> * 10000);</p><p style="margin:0px">              } <span style="color:rgb(127,0,85);font-weight:bold">catch</span> (InterruptedException <span style="color:rgb(106,62,62)">e</span>) 
{</p><p style="margin:0px">                <span style="color:rgb(106,62,62)">interrupted</span>.add(<span style="color:rgb(106,62,62)">n</span>);</p><p style="margin:0px">              }</p><p style="margin:0px">              <span style="color:rgb(127,0,85);font-weight:bold">return</span> <span style="color:rgb(106,62,62)">n</span>;</p><p style="margin:0px">            }))</p><p style="margin:0px">            .findAny());</p><p style="margin:0px">    <span style="font-style:italic">assertThat</span>(<span style="color:rgb(106,62,62)">started</span>).containsExactly(10, 1);</p><p style="margin:0px">    <span style="font-style:italic">assertThat</span>(<span style="color:rgb(106,62,62)">interrupted</span>).isEmpty();</p><p style="margin:0px">    <span style="font-style:italic">assertThat</span>(<span style="color:rgb(106,62,62)">thrown</span>).hasMessageThat().isEqualTo(<span style="color:rgb(42,0,255)">"3"</span>);</p><p style="margin:0px">  }</p><p style="margin:0px"><br></p><p style="margin:0px">  <span style="color:rgb(100,100,100)">@Test</span> <span style="color:rgb(127,0,85);font-weight:bold">public</span> <span style="color:rgb(127,0,85);font-weight:bold">void</span> mapConcurrently_downstreamFailurePropagated() {</p><p style="margin:0px">    ConcurrentLinkedQueue<Integer> <span style="color:rgb(106,62,62)">started</span> = <span style="color:rgb(127,0,85);font-weight:bold">new</span> ConcurrentLinkedQueue<>();</p><p style="margin:0px">    ConcurrentLinkedQueue<Integer> <span style="color:rgb(106,62,62)">interrupted</span> = <span style="color:rgb(127,0,85);font-weight:bold">new</span> ConcurrentLinkedQueue<>();</p><p style="margin:0px">    RuntimeException <span style="color:rgb(106,62,62)">thrown</span> = <span style="font-style:italic">assertThrows</span>(</p><p style="margin:0px">        RuntimeException.<span style="color:rgb(127,0,85);font-weight:bold">class</span>,</p><p style="margin:0px">        () -> Stream.<span 
style="font-style:italic">of</span>(10, 1, 3, 0)</p><p style="margin:0px">            .gather(<span style="font-style:italic">mapConcurrently</span>(3, <span style="color:rgb(106,62,62)">n</span> -> {</p><p style="margin:0px">              <span style="color:rgb(106,62,62)">started</span>.add(<span style="color:rgb(106,62,62)">n</span>);</p><p style="margin:0px">              <span style="color:rgb(127,0,85);font-weight:bold">try</span> {</p><p style="margin:0px">                Thread.<span style="font-style:italic">sleep</span>(<span style="color:rgb(106,62,62)">n</span> * 1000);</p><p style="margin:0px">              } <span style="color:rgb(127,0,85);font-weight:bold">catch</span> (InterruptedException <span style="color:rgb(106,62,62)">e</span>) {</p><p style="margin:0px">                <span style="color:rgb(106,62,62)">interrupted</span>.add(<span style="color:rgb(106,62,62)">n</span>);</p><p style="margin:0px">              }</p><p style="margin:0px">              <span style="color:rgb(127,0,85);font-weight:bold">return</span> <span style="color:rgb(106,62,62)">n</span>;</p><p style="margin:0px">            }))</p><p style="margin:0px">            .peek(<span style="color:rgb(106,62,62)">n</span> -> {</p><p style="margin:0px">              <span style="color:rgb(127,0,85);font-weight:bold">throw</span> <span style="color:rgb(127,0,85);font-weight:bold">new</span> IllegalArgumentException(String.<span style="font-style:italic">valueOf</span>(<span style="color:rgb(106,62,62)">n</span>));</p><p style="margin:0px">            })</p><p style="margin:0px">            .findAny());</p><p style="margin:0px">    <span style="font-style:italic">assertThat</span>(<span style="color:rgb(106,62,62)">started</span>).containsExactly(10, 1, 3);</p><p style="margin:0px">    <span style="font-style:italic">assertThat</span>(<span style="color:rgb(106,62,62)">interrupted</span>).containsExactly(3, 10);</p><p style="margin:0px">    <span 
style="font-style:italic">assertThat</span>(<span style="text-decoration-line:underline;text-decoration-style:solid;text-decoration-color:rgb(0,102,204);color:rgb(0,102,204)">thrown</span>).hasMessageThat().isEqualTo(<span style="color:rgb(42,0,255)">"1"</span>);</p><p style="margin:0px">  }</p></div></div></div><div><br></div><div><br></div><div>Both use maxConcurrency=3.</div><div><br></div><div>When the downstream peek() throws on the first element it receives ("1"), the gatherer interrupts (and joins) the other two pending threads (3 and 10).</div><div>The interrupted ConcurrentLinkedQueue is guaranteed to see [3, 10] because of the happens-before guarantee.</div><div><br></div><div>When the upstream peek() throws on 3, [1, 10] are likewise already running concurrently, but no thread is interrupted.</div><div><br></div><div><br></div></div><br><div class="gmail_quote"><div dir="ltr" class="gmail_attr">On Thu, Jul 3, 2025 at 10:29 AM David Alayachew <<a href="mailto:davidalayachew@gmail.com" target="_blank">davidalayachew@gmail.com</a>> wrote:<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex"><div dir="auto">These questions necessitate runnable examples. Do you have any?</div><br><div class="gmail_quote"><div dir="ltr" class="gmail_attr">On Thu, Jul 3, 2025, 10:37 AM Jige Yu <<a href="mailto:yujige@gmail.com" rel="noreferrer" target="_blank">yujige@gmail.com</a>> wrote:<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex"><div dir="ltr"><p>Hi JDK Core Devs,</p><p>I'm writing to you today with a question about the behavior of <code>mapConcurrent()</code> and its interaction with unchecked exceptions. 
I've been experimenting with the API and observed that <code>mapConcurrent()</code> blocks and joins all virtual threads upon an unchecked exception before propagating it.</p><p>Initially, I thought this design choice might provide a strong happens-before guarantee. My assumption was that an application catching a <code>RuntimeException</code> would be able to <b>observe all side effects</b> from the virtual threads, even though this practice is generally discouraged. This seemed like a potentially significant advantage, outweighing the risk of a virtual thread failing to respond to interruption or responding slowly.</p><p>However, I've since realized that <code>mapConcurrent()</code> cannot fully guarantee a strong happens-before relationship when an unchecked exception occurs <i>somewhere</i> in the stream pipeline. While it can block and wait for exceptions thrown by the mapper function or downstream operations, it appears unable to intercept unchecked exceptions <b>thrown by an upstream</b> source.</p><p>Consider a scenario with two input elements: if the first element starts a virtual thread, and then the second element causes an unchecked exception from the upstream <i>before</i> reaching the <code>gather()</code> call, the virtual thread initiated by the first element would not be interrupted. This makes the "happens-before" guarantee quite nuanced in practice.</p><p>This brings me to my core questions:</p><ol start="1"><li><p>Is providing a happens-before guarantee upon an unchecked exception a design goal for <code>mapConcurrent()</code>?</p></li><li><p>If not, would it be more desirable to <i>not</i> join on virtual threads when unchecked exceptions occur? This would allow the application code to catch the exception sooner and avoid the risk of being blocked indefinitely.</p></li></ol><p>Thank you for your time and insights.</p><p>Best regards,</p><p>Ben Yu</p></div>
</blockquote></div>
</blockquote></div>
</blockquote></div>
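<p>For reference, here is a minimal sketch of the <code>Collector</code>-based strawman from the top of this message. It rests on several assumptions: <code>concurrently()</code>, <code>ConcurrentlySketch</code> and <code>mapAll()</code> are hypothetical names, not JDK APIs. A fixed thread pool stands in for virtual threads so the sketch also compiles on JDKs without them. Results are buffered rather than produced lazily. The only point it tries to illustrate is the ordering: <code>toList()</code> drains the upstream completely before any mapper task is submitted.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch; none of these names exist in the JDK.
final class ConcurrentlySketch {

  // Collects every upstream element first, then maps them concurrently.
  static <T, R> Collector<T, ?, Stream<R>> concurrently(
      int maxConcurrency, Function<? super T, ? extends R> mapper) {
    return Collectors.collectingAndThen(
        Collectors.<T>toList(),
        inputs -> {
          // The finisher runs only after the upstream completed without throwing.
          List<R> results = mapAll(inputs, maxConcurrency, mapper);
          return results.stream();
        });
  }

  private static <T, R> List<R> mapAll(
      List<T> inputs, int maxConcurrency, Function<? super T, ? extends R> mapper) {
    // Assumption: the pool size is the concurrency limit. With virtual threads one
    // might instead use Executors.newVirtualThreadPerTaskExecutor() plus a Semaphore.
    ExecutorService executor = Executors.newFixedThreadPool(maxConcurrency);
    try {
      List<Future<R>> futures = new ArrayList<>();
      for (T input : inputs) {
        Callable<R> task = () -> mapper.apply(input);
        futures.add(executor.submit(task));
      }
      List<R> results = new ArrayList<>();
      for (Future<R> future : futures) {
        results.add(future.get()); // keeps input order
      }
      return results;
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      throw new RuntimeException(cause);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      // Interrupt whatever is still running, then wait (bounded here) for it to stop.
      executor.shutdownNow();
      try {
        executor.awaitTermination(1, TimeUnit.MINUTES);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) {
    List<Integer> doubled = Stream.of(1, 2, 3)
        .collect(ConcurrentlySketch.<Integer, Integer>concurrently(2, n -> n * 2))
        .collect(Collectors.toList());
    System.out.println(doubled); // [2, 4, 6]
  }
}
```

<p>With this shape, an upstream exception surfaces while the list is still being accumulated, so no mapper has started and there is nothing to interrupt or join. The cost is the eager buffering already mentioned as the trade-off.</p>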