[External] : Re: mapConcurrent() with InterruptedException
Jige Yu
yujige at gmail.com
Wed Feb 5 18:20:37 UTC 2025
Oh good call!
I forgot to check what parallel streams do upon interruption (I didn't think
they make any blocking calls, but at least the main thread must block).
On Wed, Feb 5, 2025 at 8:18 AM Viktor Klang <viktor.klang at oracle.com> wrote:
> Hi Jige,
>
> I opened an issue to track the concern, and I have proposed a change which
> seems to align well with how parallel streams behave under caller thread
> interruption.
>
> I've opened the following PR for review:
> https://github.com/openjdk/jdk/pull/23467
>
> If you are able to make a local OpenJDK build with that solution you could
> check if it addresses your use-cases (or not).
> 8349462: Gatherers.mapConcurrent could support async interrupts by
> viktorklang-ora · Pull Request #23467 · openjdk/jdk
> This change is likely going to need some extra verbiage in the spec for
> mapConcurrent, and thus a CSR. This behavior aligns mapConcurrent with how
> parallel streams work in conjunction with interrup...
>
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yujige at gmail.com>
> *Sent:* Wednesday, 5 February 2025 16:24
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> Thanks Viktor!
>
> I understand the problem.
>
> The main reason I asked is that I want to understand how the core Java
> team thinks about throwing an unchecked exception.
>
> As explained above, I consider losing cancellability a big deal, a deal
> breaker even, and I thought throwing an unchecked exception would be more
> acceptable. The most common reason a mapConcurrent() virtual thread (VT)
> gets interrupted is cancellation from a parent mapConcurrent() or a parent
> Structured Concurrency scope. That cancellation could stem either from an
> organic exception or from the downstream no longer needing elements, for
> example because findFirst() has already received one.
>
> In both cases, since the concurrent operation has already been cancelled
> (its result ignored), which exception surfaces at the top level isn't that
> big of a deal (perhaps only a log record will be seen?).
>
> But if the core Java team considers it a bad idea, I would love to learn
> and adjust.
>
> On Tue, Feb 4, 2025 at 4:41 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> Hi,
>
> The problem is that mapConcurrent cannot throw InterruptedException
> because that is a checked exception, so we cannot clear the interrupted
> flag and throw that exception.
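Background sketch of the convention referred to above, using only the JDK: a method that may declare `throws InterruptedException` conventionally clears the interrupt flag (via `Thread.interrupted()`) and then throws, while a lambda implementing `java.util.function.Function`, such as the mapper passed to mapConcurrent(), cannot declare the checked exception and has to handle it itself. The class and member names are hypothetical.

```java
import java.util.function.Function;

// Hypothetical class illustrating the InterruptedException convention and
// why a java.util.function.Function cannot follow it directly.
final class InterruptConvention {

    // A method that MAY declare the checked exception: Thread.interrupted()
    // reads AND clears the flag, so the exception replaces the flag as the
    // signal that the thread was interrupted.
    static void checkInterrupted() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
    }

    // Function.apply() declares no checked exceptions, so this would NOT compile:
    //   Function<Integer, Integer> f = i -> { checkInterrupted(); return i * 2; };
    // The lambda must catch InterruptedException and decide what to do itself.
    static final Function<Integer, Integer> DOUBLE_OR_KEEP_FLAG = i -> {
        try {
            checkInterrupted();
        } catch (InterruptedException e) {
            // Cannot rethrow the checked exception; restore the flag instead.
            Thread.currentThread().interrupt();
        }
        return i * 2;
    };
}
```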
>
> So the updated semantics are to not cut the stream short, but instead run to
> completion and restore the interrupt flag.
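A minimal sketch of the "run to completion, restore the flag" idea, assuming the waiting is done on a `java.util.concurrent.Future`. `Uninterruptible` and `getIgnoringInterrupts` are hypothetical names, and this is not the code in the JDK change.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper: waits for a Future while ignoring interrupts, then
// restores the interrupt flag before returning ("ignore-but-restore").
final class Uninterruptible {

    static <T> T getIgnoringInterrupts(Future<T> future) throws ExecutionException {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return future.get();
                } catch (InterruptedException e) {
                    // get() cleared the flag; remember it and keep waiting.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // hand the interrupt back to the caller
            }
        }
    }
}
```

The trade-off discussed in this thread is visible here: the caller sees the interrupt only after the work has completed, so an interrupt used as a cancellation signal no longer shortens the wait.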
>
> There are a couple of alternatives to this approach which I am
> contemplating, but they need to be explored further before I consider
> moving forward with any of them.
>
> Cheers,
> √
>
>
> ------------------------------
> *From:* Jige Yu <yujige at gmail.com>
> *Sent:* Monday, 27 January 2025 17:00
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> Thanks Viktor!
>
> It looks like the current fix ignores interruption.
>
> I want to make sure my concern of it defeating cancellation is heard and
> understood.
>
> The scenario I worry about is a mapConcurrent() that fans out to
> another method call, which internally uses mapConcurrent() as an
> implementation detail.
>
> An example:
>
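> // Transaction, service and maxConcurrency are placeholders for this example
> List<RefundResponse> refundHelper(Transaction transaction) {
>   return transaction.creditCardAccounts.stream()
>       .gather(mapConcurrent(maxConcurrency, acct -> service.refund(acct)))
>       .toList();
> }
>
> transactions.stream()
>     .gather(mapConcurrent(maxConcurrency, transaction -> refundHelper(transaction)))
>     .toList();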
>
>
> It seems undesirable that in such a case all the service.refund() calls
> become non-cancellable: the only way the outer mapConcurrent() can cancel
> the refundHelper() calls is by calling Thread.interrupt() on the virtual
> threads running refundHelper(), and that interruption would then be
> swallowed by the inner mapConcurrent().
>
> Does this example make sense to you? I can further explain if anything
> isn't clear.
>
> But I want to make sure the decision to disable interruption is a deliberate
> judgment call that such nested mapConcurrent() usage is unlikely, or not
> important.
>
> Cheers,
>
>
>
> On Mon, Jan 27, 2025 at 6:11 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> Hi!
>
> Please see: https://github.com/openjdk/jdk/pull/23100
>
> Cheers,
> √
>
>
> ------------------------------
> *From:* Jige Yu <yujige at gmail.com>
> *Sent:* Sunday, 26 January 2025 23:03
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* [External] : Re: mapConcurrent() with InterruptedException
>
> Checking in on what you've found out, Viktor.
>
> From where we left off, I understand that you were looking at alternatives
> instead of silent truncation?
>
> Have you reached any conclusion?
>
> We touched on disallowing interruption during mapConcurrent(). I still
> have concerns about disabling cancellation, because it basically undoes this
> API note from the javadoc
> <https://cr.openjdk.org/~alanb/sc-20240503/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)>:
>
> API Note: In progress tasks will be attempted to be cancelled, on a
> best-effort basis, in situations where the downstream no longer wants to
> receive any more elements.
>
> In reality, people will use mapConcurrent() to fan out rpcs. Sometimes
> these rpcs are just a single blocking call; other times they may
> themselves be a Structured Concurrency scope, with 2 or 3 rpcs that
> constitute a single logical operation. Under two conditions, cancellation
> is, imho, an important part of the semantics:
>
> 1. The downstream code uses filter().findFirst(), and when it sees an
> element, it will return and no longer needs the other pending rpcs to
> complete. If cancellation is disabled, these unnecessary rpcs will waste
> system resources.
> 2. One of the rpcs throws, and the Stream pipeline needs to propagate
> the exception. Again, if the other rpcs cannot be cancelled, we'll have
> many zombie rpcs.
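The cancellation behavior the second scenario relies on can be sketched with plain `java.util.concurrent` types. This is a hypothetical fail-fast fan-out, not how mapConcurrent() is implemented: results are consumed in completion order via `ExecutorCompletionService`, and the first failure cancels the remaining calls with `Future.cancel(true)`, which interrupts them. `FailFastFanOut` and `runAll` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical fail-fast fan-out; not the mapConcurrent() implementation.
final class FailFastFanOut {

    static <T> List<T> runAll(List<Callable<T>> calls)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newCachedThreadPool();
        CompletionService<T> completions = new ExecutorCompletionService<>(pool);
        Map<Future<T>, Integer> positions = new HashMap<>();
        try {
            for (int i = 0; i < calls.size(); i++) {
                positions.put(completions.submit(calls.get(i)), i);
            }
            List<T> results = new ArrayList<>(Collections.<T>nCopies(calls.size(), null));
            for (int n = 0; n < calls.size(); n++) {
                Future<T> done = completions.take(); // next call to finish, in completion order
                // A failed call surfaces here as ExecutionException, skipping the rest.
                results.set(positions.get(done), done.get());
            }
            return results; // stored back in input order
        } finally {
            // Interrupt calls that are still running (a no-op for finished ones),
            // so a failure does not leave zombie calls behind.
            for (Future<T> f : positions.keySet()) {
                f.cancel(true);
            }
            pool.shutdown();
        }
    }
}
```

Note that `cancel(true)` only helps if the cancelled call responds to interruption; a call that swallows the interrupt keeps running regardless.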
>
> Zombie rpcs may or may not be a deal breaker, depending on the specific
> use case. But for a JDK library, losing cancellation would have a negative
> impact on usability.
>
> My 2c,
>
>
> On Fri, Jan 3, 2025 at 9:18 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> Hi Ben,
>
> Thanks for raising these questions—getting feedback is crucial in the
> Preview stage of features.
>
> I wrote a reply to the Reddit thread so I'll just summarize here:
>
> It is important to note that *mapConcurrent()* is not a part of the
> Structured Concurrency JEPs, so it is not designed to join SC scopes.
>
> I'm currently experimenting with ignoring-but-restoring interrupts on the
> "calling thread" for *mapConcurrent()*, as well as capping
> work-in-progress to *maxConcurrency* (not only capping the concurrency
> but also the amount of completed-but-yet-to-be-pushed work). Both of these
> adjustments should increase predictability of behavior in the face of
> blocking operations with variable delays.
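A rough sketch of capping work-in-progress in encounter order, under stated assumptions: a fixed thread pool stands in for virtual threads, and a bounded window of futures counts both running tasks and finished-but-not-yet-pushed results against `maxConcurrency`. `BoundedOrderedMap` is a hypothetical name; this illustrates the idea and is not the JDK implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: at most maxConcurrency tasks exist at any time,
// counting running tasks AND completed results not yet pushed downstream.
final class BoundedOrderedMap {

    static <T, R> List<R> map(List<T> input, int maxConcurrency, Function<T, R> fn)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        Deque<Future<R>> window = new ArrayDeque<>();
        List<R> downstream = new ArrayList<>();
        try {
            for (T element : input) {
                if (window.size() == maxConcurrency) {
                    // Window full: push the oldest result before starting more work.
                    downstream.add(window.removeFirst().get());
                }
                window.addLast(pool.submit(() -> fn.apply(element)));
            }
            while (!window.isEmpty()) {
                downstream.add(window.removeFirst().get()); // drain in encounter order
            }
            return downstream;
        } finally {
            for (Future<R> f : window) {
                f.cancel(true); // only non-empty if we exited early (failure)
            }
            pool.shutdownNow();
        }
    }
}
```

Because the window is drained from the head, a slow early element holds back later, already-finished results, which is exactly the completed-but-unpushed work that the cap bounds.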
>
> Another adjustment I'm looking at right now is to harden/improve the
> cleanup to wait for concurrent tasks to acknowledge cancellation, so that
> once the finisher is done executing, the VTs are known to have terminated.
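One way to express "wait for tasks to acknowledge cancellation" with standard executor APIs, offered as an assumption-laden sketch rather than what mapConcurrent() does internally: `shutdownNow()` interrupts running tasks, and `awaitTermination()` returns `true` only once they have actually finished. `CancelAndAwait` is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not the mapConcurrent() finisher.
final class CancelAndAwait {

    // Cancels all outstanding tasks, then waits until they have actually
    // stopped. Returns true only if every task finished within the timeout.
    static boolean cancelAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdownNow(); // interrupts running tasks; queued tasks never start
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }
}
```

A task that ignores interruption would make `awaitTermination` time out, which is why cooperative handling of the interrupt inside each task still matters.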
>
> As for not preserving the encounter order, that would be a completely
> different thing, and I'd encourage you to experiment with that if that
> functionality would be interesting for your use-case(s).
>
> Again, thanks for your feedback!
>
> Cheers,
> √
>
>
> ------------------------------
> *From:* core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of Jige
> Yu <yujige at gmail.com>
> *Sent:* Friday, 3 January 2025 17:53
> *To:* core-libs-dev at openjdk.org <core-libs-dev at openjdk.org>
> *Subject:* mapConcurrent() with InterruptedException
>
> Hi Java Experts,
>
> I sent this email incorrectly to loom-dev@ and was told on Reddit that
> core-libs-dev is the right list.
>
> The question is about the behavior of mapConcurrent() when the thread is
> interrupted.
>
> Currently, mapConcurrent()'s finisher phase will re-interrupt the thread,
> then stop at whatever element has already been processed and return.
>
> This strikes me as surprising behavior because, for example, if I'm
> running:
>
>   Stream.of(1, 2, 3)
>       .gather(mapConcurrent(3, i -> i * 2)) // mapConcurrent(maxConcurrency, mapper)
>       .toList()
>
> and the thread is interrupted, the result could be any of [2], [2, 4]
> or [2, 4, 6].
>
> Since thread interruption is cooperative, there is no guarantee that the
> interrupted thread will just abort. It's quite possible that it will
> keep going and then use, for example, [2] as the result of doubling the
> list [1, 2, 3], which is imho incorrect.
>
> In the Reddit
> <https://www.reddit.com/r/java/comments/1hr8xyu/observations_of_gatherersmapconcurrent/>
> thread, someone argued that interruption rarely happens, so it's more of a
> theoretical issue. But interruption can easily happen in Structured
> Concurrency, or in mapConcurrent() itself, which interrupts the other
> ongoing tasks to cancel them when any subtask fails.
>
> There has been discussion about alternative strategies:
>
> 1. Don't respond to interruption and just keep running to completion.
> 2. Re-interrupt the thread and wrap the InterruptedException in a standard
> unchecked exception (StructuredConcurrencyInterruptedException?).
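Option 2 could look roughly like the following sketch. `UncheckedInterruptedException` is a hypothetical class (no such exception exists in the JDK), as are `Interruptibly` and `BlockingFunction`. The key point is the order of operations: re-interrupt the thread first, then throw the unchecked wrapper so it can propagate through `Function`-based Stream APIs.

```java
import java.util.function.Function;

// Hypothetical unchecked wrapper; NOT an existing JDK class.
final class UncheckedInterruptedException extends RuntimeException {
    UncheckedInterruptedException(InterruptedException cause) {
        super(cause);
    }
}

// Hypothetical adapter from a blocking function to java.util.function.Function.
final class Interruptibly {

    @FunctionalInterface
    interface BlockingFunction<T, R> {
        R apply(T t) throws InterruptedException;
    }

    static <T, R> Function<T, R> unchecked(BlockingFunction<T, R> fn) {
        return t -> {
            try {
                return fn.apply(t);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();         // re-interrupt first...
                throw new UncheckedInterruptedException(e); // ...then propagate unchecked
            }
        };
    }
}
```

With this, something like `stream.map(Interruptibly.unchecked(this::fetch))` would accept a hypothetical blocking `fetch` method where a `Function` is required, and callers that do not care can simply let the exception propagate.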
>
>
> I have concerns about option 1 because it disables cancellation propagation
> when mapConcurrent() itself is used in a subtask of a parent
> mapConcurrent() or in a StructuredTaskScope.
>
> Both equivalent Future-composition async code and C++'s fiber trees
> support cancellation propagation, and imho it's a critical feature;
> otherwise, a zombie thread may still be sending RPCs long after the
> main thread has exited (failed, or fallen back to some default action).
>
> My arguments for option 2:
>
> 1. InterruptedException is more error-prone than traditional checked
> exceptions for *users* to catch and handle. They can forget to
> re-interrupt the thread. It's so confusing that even seasoned programmers
> may not know they are *supposed to* re-interrupt the thread.
> 2. Because the Stream API uses functional interfaces like Supplier and
> Function, the option of just tacking on "throws IE" isn't available to many
> users.
> 3. With Virtual Threads, it will become more acceptable, or even common,
> to make blocking calls from a stream operation (including but not
> limited to mapConcurrent()). So the chance that users are forced to deal
> with IE will become substantially higher.
> 4. Other APIs, such as the Structured Concurrency API, have already
> started wrapping system checked exceptions like ExecutionException and
> TimeoutException in unchecked exceptions (join()
> <https://download.java.net/java/early_access/loom/docs/api/java.base/java/util/concurrent/StructuredTaskScope.html#join()>,
> for example).
> 5. Imho, exceptions that we'd rather users not catch and handle, and that
> should instead mostly just propagate up as is, should be unchecked.
>
> There is also a side discussion
> <https://www.reddit.com/r/java/comments/1hr8xyu/comment/m4z4f8c/> about
> whether mapConcurrent() is better off preserving input order or pushing to
> downstream as soon as an element is computed. I'd love to discuss that
> topic too, but maybe it's better to start a separate thread?
>
> Thank you and cheers!
>
> Ben Yu
>
>