[External] : Re: mapConcurrent() with InterruptedException
Jige Yu
yujige at gmail.com
Mon May 26 01:07:45 UTC 2025
Thanks Viktor!
Sorry, I was giving myself more time to read the code, but then I lost
track.
If I'm reading the code right, the current behavior is that if the current
thread is interrupted, mapConcurrent() suppresses the interruption until
all elements are processed? (It then restores the interrupt flag.)
Doesn't this break fail fast?
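To make the suppress-then-restore behavior concrete, here is a minimal sketch. It is not the JDK's mapConcurrent() code: the class and the helper awaitAllDeferringInterrupt() are hypothetical names made up for illustration, and plain Futures stand in for the gatherer's in-flight tasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class DeferredInterruptSketch {

    // Hypothetical helper, NOT the JDK implementation: waits for every future,
    // swallowing interrupts while waiting and restoring the flag at the end.
    static <R> List<R> awaitAllDeferringInterrupt(List<Future<R>> futures)
            throws ExecutionException {
        boolean interrupted = false;
        List<R> results = new ArrayList<>();
        try {
            for (Future<R> future : futures) {
                while (true) {
                    try {
                        results.add(future.get());
                        break;
                    } catch (InterruptedException e) {
                        interrupted = true; // remember the interrupt, keep waiting
                    }
                }
            }
            return results;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the flag for the caller
            }
        }
    }

    // Interrupts the calling thread before it waits on a slow result.
    static boolean demo() {
        CompletableFuture<Integer> slow = new CompletableFuture<>();
        Thread completer = new Thread(() -> {
            try {
                Thread.sleep(200); // stand-in for a slow blocking call
            } catch (InterruptedException ignored) {
                // complete anyway
            }
            slow.complete(42);
        });
        completer.start();

        Thread.currentThread().interrupt(); // the caller is "cancelled" here
        List<Future<Integer>> futures = new ArrayList<>();
        futures.add(slow);
        try {
            List<Integer> results = awaitAllDeferringInterrupt(futures);
            boolean flagRestored = Thread.interrupted(); // reads and clears the flag
            return flagRestored && results.equals(Collections.singletonList(42));
        } catch (ExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("ran to completion with flag restored: " + demo());
    }
}
```

In this sketch the caller still waits out the slow task after being interrupted, and only sees the interrupt flag once all results are in.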
For example, I may have a mapConcurrent() called from within structured
concurrency, and that fans out to two operations:
scope.fork(() -> doA());
scope.fork(() -> doB()); // calls mapConcurrent() internally
scope.join().throwIfFailed();
If doA() fails, doB() will be cancelled (thread interrupted).
If mapConcurrent() ignores the interruption, and if doB has a long list to
process, it'll continue to consume system resources even when the caller no
longer needs the results, no?
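For contrast, a fail-fast variant could look roughly like the sketch below, along the lines of the "keep the Futures and call .cancel(true)" idea quoted further down. Everything here is an assumption for illustration: mapFailFast() is a hypothetical helper, a cached thread pool stands in for virtual threads, and IllegalStateException stands in for whatever unchecked exception would be chosen.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

class FailFastFanOutSketch {

    // Hypothetical helper, NOT the JDK implementation: fans out, and if the
    // caller is interrupted, cancels the pending tasks and fails immediately.
    static <T, R> List<R> mapFailFast(List<T> inputs, Function<T, R> mapper) {
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<R>> futures = new ArrayList<>();
        try {
            for (T input : inputs) {
                Callable<R> task = () -> mapper.apply(input);
                futures.add(executor.submit(task));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get()); // throws if the caller is interrupted
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the flag set for callers
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            for (Future<R> future : futures) {
                future.cancel(true); // no-op for tasks that already completed
            }
            executor.shutdownNow();
        }
    }

    // Interrupts a caller that is waiting on slow tasks; returns true if the
    // caller failed promptly with its interrupt flag still set.
    static boolean demo() {
        AtomicBoolean failedFast = new AtomicBoolean(false);
        Function<Integer, Integer> slowDouble = i -> {
            try {
                Thread.sleep(10_000); // stand-in for a slow rpc
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cancelled: stop early
            }
            return i * 2;
        };
        Thread caller = new Thread(() -> {
            try {
                mapFailFast(Arrays.asList(1, 2, 3), slowDouble);
            } catch (IllegalStateException e) {
                failedFast.set(Thread.currentThread().isInterrupted());
            }
        });
        try {
            caller.start();
            Thread.sleep(200);
            caller.interrupt(); // e.g. a sibling subtask failed
            caller.join(5_000);
        } catch (InterruptedException e) {
            return false;
        }
        return failedFast.get() && !caller.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("caller failed fast: " + demo());
    }
}
```

The sketch does not wait for the cancelled tasks to acknowledge the interrupt before returning; whether the real API should block until they have terminated is a separate design question.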
On Fri, Feb 7, 2025 at 2:16 AM Viktor Klang <viktor.klang at oracle.com> wrote:
> >Sorry, did the PR stop using Semaphore?
>
> No, not that PR. See:
> https://github.com/openjdk/jdk/commit/450636ae28b84ded083b6861c6cba85fbf87e16e
>
> The problem with interruption under parallel evaluation is that there is
> no general support for propagation of interruption in CountedCompleters.
> Adding support for such in (at least) GathererOp needs further study before
> contemplating making any changes to mapConcurrent()'s interruption policy.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
>
> ------------------------------
> *From:* Jige Yu <yujige at gmail.com>
> *Sent:* Thursday, 6 February 2025 17:04
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> Sorry, did the PR stop using Semaphore?
>
> I had naively thought that mapConcurrent() will keep a buffer of Future of
> all currently-running concurrent tasks (it can be a ConcurrentMap<TaskKey,
> Future> if we don't have to ensure FIFO).
>
> Upon interruption, the main thread can call .cancel(true) on all pending
> Futures; optionally join with the VTs (if we need to block until all VTs
> exit); then propagate exception.
>
> Upon completion, each task just removes itself from the ConcurrentMap.
>
> Just in case it adds anything.
>
>
>
> On Thu, Feb 6, 2025 at 6:47 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> After some more investigation it seems tractable to propagate interruption
> of the caller in sequential mode, but parallel mode will require much
> bigger considerations.
>
> I made a comment to that effect on the JBS issue:
> https://bugs.openjdk.org/browse/JDK-8349462?focusedId=14750017&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-14750017
>
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Viktor Klang <viktor.klang at oracle.com>
> *Sent:* Thursday, 6 February 2025 11:51
> *To:* Jige Yu <yujige at gmail.com>
> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> I think alignment in behavior between parallel Stream and mapConcurrent in
> terms of how interruptions are handled is a possible path forward.
>
> I decided to close the PR for now as I realized my parallel Stream example
> had misled me regarding its exception throwing, so I'll need to go back and
> refine the solution.
>
> It still seems solvable though.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yujige at gmail.com>
> *Sent:* Wednesday, 5 February 2025 19:20
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> Oh good call!
>
> I forgot to check what parallel streams do upon interruption (didn't think
> they do any blocking calls, but at least the main thread must block).
>
> On Wed, Feb 5, 2025 at 8:18 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> Hi Jige,
>
> I opened an issue to track the concern, and I have proposed a change which
> seems to align well with how parallel streams behave under caller thread
> interruption.
>
> I've opened the following PR for review:
> https://github.com/openjdk/jdk/pull/23467
>
> If you are able to make a local OpenJDK build with that solution you could
> check if it addresses your use-cases (or not).
>
> 8349462: Gatherers.mapConcurrent could support async interrupts by
> viktorklang-ora · Pull Request #23467 · openjdk/jdk
> This change is likely going to need some extra verbiage in the spec for
> mapConcurrent, and thus a CSR. This behavior aligns mapConcurrent with how
> parallel streams work in conjunction with interrup...
>
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yujige at gmail.com>
> *Sent:* Wednesday, 5 February 2025 16:24
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> Thanks Viktor!
>
> I understand the problem.
>
> The main reason I asked is because I want to understand how the core Java
> team thinks of throwing an unchecked exception.
>
> As explained above, I consider losing cancellability a big deal, a deal
> breaker even. And I thought throwing unchecked is more acceptable, because
> the most common reason the mapConcurrent() VT gets interrupted is
> cancellation from a parent mapConcurrent() or a parent Structured
> Concurrency scope. The cancellation could come either from an organic
> exception, or from the downstream not needing more elements, for example
> because findFirst() already got an element.
>
> In both cases, since the concurrent operation is already cancelled (result
> ignored), what exception pops up to the top level isn't that big of a deal
> (perhaps only a log record will be seen?)
>
> But if the core Java team considers it a bad idea, I would love to learn
> and adjust.
>
> On Tue, Feb 4, 2025 at 4:41 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> Hi,
>
> The problem is that mapConcurrent cannot throw InterruptedException
> because that is a checked exception, so we cannot clear the interrupted
> flag and throw that exception.
>
> So the updated semantics is to not cut the stream short but instead run to
> completion, restoring the interruption flag.
>
> There are a couple of alternatives to this approach which I am
> contemplating, but they need to be further explored before I consider
> moving forward with any of them.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yujige at gmail.com>
> *Sent:* Monday, 27 January 2025 17:00
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> Thanks Viktor!
>
> It looks like the current fix ignores interruption.
>
> I want to make sure my concern about it defeating cancellation is heard and
> understood.
>
> The scenario I worry about is a mapConcurrent() that fans out to
> another method call, which internally calls mapConcurrent() as an
> implementation detail.
>
> An example:
>
> List<RefundResponse> refundHelper(Transaction transaction) {
>   return transaction.creditCardAccounts.stream()
>       .gather(mapConcurrent(acct -> service.refund(acct)))
>       .toList();
> }
>
> transactions.stream()
>     .gather(mapConcurrent(transaction -> refundHelper(transaction)));
>
>
> It seems undesirable that in such a case all the service.refund() calls
> become non-cancellable, because the only way the outer mapConcurrent()
> cancels the refundHelper() calls is by calling Thread.interrupt() on the
> virtual threads that run refundHelper(), and that interruption would then
> be disabled by the inner mapConcurrent().
>
> Does this example make sense to you? I can further explain if anything
> isn't clear.
>
> But I want to make sure the decision to disable interruption is a deliberate
> judgement call that such nested mapConcurrent() use is unlikely, or not
> important.
>
> Cheers,
>
>
>
> On Mon, Jan 27, 2025 at 6:11 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> Hi!
>
> Please see: https://github.com/openjdk/jdk/pull/23100
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yujige at gmail.com>
> *Sent:* Sunday, 26 January 2025 23:03
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* [External] : Re: mapConcurrent() with InterruptedException
>
> Checking in on what you've found out, Viktor.
>
> From where we left off, I understand that you were looking at alternatives
> instead of silent truncation?
>
> Have you reached any conclusion?
>
> We touched on disallowing interruption during mapConcurrent(). I still
> have concerns with disabling cancellation, because it basically undoes this
> API note from the javadoc
> <https://cr.openjdk.org/~alanb/sc-20240503/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)>
> :
>
> API Note: In progress tasks will be attempted to be cancelled, on a
> best-effort basis, in situations where the downstream no longer wants to
> receive any more elements.
>
> In reality, people will use mapConcurrent() to fan out rpcs. Sometimes
> these rpcs are just a single blocking call; yet sometimes they may
> themselves be a Structured Concurrency scope, with 2 or 3 rpcs that
> constitute a single logical operation. Under two conditions, cancellation
> is imho an important semantic:
>
> 1. The downstream code uses filter().findFirst(), and when it sees an
> element, it will return and no longer needs the other pending rpcs to
> complete. If cancellation is disabled, these unnecessary rpcs will waste
> system resources.
> 2. One of the rpcs throws and the Stream pipeline needs to propagate
> the exception. Again, if the other rpcs cannot be cancelled, we'll have
> many zombie rpcs.
>
> Zombie rpcs may or may not be a deal breaker, depending on the specific
> use case. But for a JDK library, losing cancellation would have a negative
> impact on usability.
>
> My 2c,
>
>
> On Fri, Jan 3, 2025 at 9:18 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> Hi Ben,
>
> Thanks for raising these questions—getting feedback is crucial in the
> Preview stage of features.
>
> I wrote a reply to the Reddit thread so I'll just summarize here:
>
> It is important to note that *mapConcurrent()* is not a part of the
> Structured Concurrency JEPs, so it is not designed to join SC scopes.
>
> I'm currently experimenting with ignoring-but-restoring interrupts on the
> "calling thread" for *mapConcurrent()*, as well as capping
> work-in-progress to *maxConcurrency* (not only capping the concurrency
> but also the amount of completed-but-yet-to-be-pushed work). Both of these
> adjustments should increase predictability of behavior in the face of
> blocking operations with variable delays.
>
> Another adjustment I'm looking at right now is to harden/improve the
> cleanup to wait for concurrent tasks to acknowledge cancellation, so that
> once the finisher is done executing the VTs are known to have terminated.
>
> As for not preserving the encounter order, that would be a completely
> different thing, and I'd encourage you to experiment with that if that
> functionality would be interesting for your use-case(s).
>
> Again, thanks for your feedback!
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of Jige
> Yu <yujige at gmail.com>
> *Sent:* Friday, 3 January 2025 17:53
> *To:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* mapConcurrent() with InterruptedException
>
> Hi Java Experts,
>
> I sent this email incorrectly to loom-dev@ and was told on Reddit that
> core-libs-dev is the right list.
>
> The question is about the behavior of mapConcurrent() when the thread is
> interrupted.
>
> Currently mapConcurrent()'s finisher phase will re-interrupt the thread,
> then stop at whichever elements have already been processed and return.
>
> This strikes me as a surprising behavior, because for example if I'm
> running:
>
> Stream.of(1, 2, 3)
> .gather(mapConcurrent(i -> i * 2))
> .toList()
>
> and the thread is being interrupted, the result could be any of [2], [2,
> 4] or [2, 4, 6].
>
> Since thread interruption is cooperative, there is no guarantee that the
> thread being interrupted will just abort. It's quite possible that it'll
> keep going and then will use for example [2] as the result of doubling the
> list of [1, 2, 3], which is imho incorrect.
>
> In the Reddit thread
> <https://www.reddit.com/r/java/comments/1hr8xyu/observations_of_gatherersmapconcurrent/>,
> someone argued that interruption rarely happens so it's more of a
> theoretical issue. But interruption can easily happen in Structured
> Concurrency or in mapConcurrent() itself if any subtask has failed in order
> to cancel/interrupt the other ongoing tasks.
>
> There had been discussion about alternative strategies:
>
> 1. Don't respond to interruption and just keep running to completion.
> 2. Re-interrupt thread and wrap the InterruptedException in a standard
> unchecked exception (StructuredConcurrencyInterruptedException?).
>
>
> I have concerns with option 1 because it disables cancellation propagation
> when mapConcurrent() itself is used in a subtask of a parent
> mapConcurrent() or in a StructuredConcurrencyScope.
>
> Both the equivalent Future-composition async code and C++'s fiber trees
> support cancellation propagation, and imho it's a critical feature; otherwise
> a zombie thread could still be sending RPCs long after the
> main thread has exited (failed, or fallen back to some default action).
>
> My arguments for option 2:
>
> 1. InterruptedException is more error prone than traditional checked
> exceptions for *users* to catch and handle. They can forget to
> re-interrupt the thread. It's so confusing that even seasoned programmers
> may not know they are *supposed to* re-interrupt the thread.
> 2. With Stream API using functional interfaces like Supplier,
> Function, the option of just tacking on "throws IE" isn't available to many
> users.
> 3. With Virtual Threads, it will be more acceptable, or even become
> common to do blocking calls from a stream operation (including but
> not limited to mapConcurrent()). So the chance users are forced to deal with
> IE will become substantially higher.
> 4. Other APIs such as the Structured Concurrency API have already
> started wrapping system checked exceptions like ExecutionException,
> TimeoutException in unchecked exceptions (join()
> <https://download.java.net/java/early_access/loom/docs/api/java.base/java/util/concurrent/StructuredTaskScope.html#join()>
> for example).
> 5. Imho, exceptions that we'd rather users not catch and handle but
> instead should mostly just propagate up as is, should be unchecked.
>
> There is also a side discussion
> <https://www.reddit.com/r/java/comments/1hr8xyu/comment/m4z4f8c/> about
> whether mapConcurrent() is better off preserving input order or pushing
> each element downstream as soon as it is computed. I'd love to discuss that
> topic too but maybe it's better to start a separate thread?
>
> Thank you and cheers!
>
> Ben Yu
>
>