Loom Evades Exception Handling

Paul Bjorkstrand paul.bjorkstrand at gmail.com
Fri Nov 26 02:35:47 UTC 2021


I could see StructuredExecutor.fork() throwing an IllegalStateException
when called after .join(), but that might unnecessarily preclude running
some tasks in the executor, joining, then reusing the same executor for
another set of tasks. If StructuredExecutor is intended to be cheap to
create, maybe it is the right approach to disallow .fork() after .join().

I may be a bit naive, but I can't think of a way to detect the
join-before-fork situation without the above or some pretty heroic (and
possibly error prone) detection logic in StructuredExecutor.

-Paul


On Thu, Nov 25, 2021 at 6:41 PM Eric Kolotyluk <eric at kolotyluk.net> wrote:

> I have been reflecting more on my previous dysfunctional experiment and
> what led to my dysfunctional thinking...
>
>    1. Having spent too long in idiomatic Scala, with Scala Collections, I
>    am still not familiar with idiomatic Java, where you need to convert
>    Collections to Streams and back to Collections. An Akka/Scala Stream is
> a
>    whole different beast... suffice it to say different idioms have led me
> to
>    being an idiot.
>       - Cache invalidation is an easier problem to solve than naming
>       things... ��
>    2. I thought I was clever using Streams to spawn Tasks. I forgot that
>    clever programming leads to clever problems.
>       - Possibly for(...; ...; ...) or while() is the better model, but
>       neither return a collection as a result.
>       3. I suspect that Rémi is right, that perhaps join() should throw the
>    exception sooner, with better context than waiting until calling
>    Future::resultNow.
>       - However, this may be impossible because it's not until later when
>       the Stream is terminated that Tasks are spawned, so technically Loom
> is
>       doing the right thing, doing the best it can.
>       - Maybe a better Exception message such as "Task has not started yet,
>       and join() has already been called, are you doing things in the wrong
>       order?" Not sure how hard it would be to detect that situation?
>       4. It is frustrating to me that I cannot do with collections, as in
>    Kotlin/Scala, that I must do with Streams.
>       - In Kotlin and Scala, I can just do collection.map{ item ->
>       spawn(item) }, but in Java, I have to convert it to a Stream and
>       remember to terminate the Stream.
>       - At least I know how to use Project Loom from Kotlin, but that has
>       other issues too because Kotlin does not have try-with-resources, so
> you
>       need to hack it
>       - Scala does have try-with-resources, so maybe I should try Loom in
>       Scala... ��
>    5. I know it's not nice to raise problems without solutions, but I
>    really don't have a solution for this little trap I fell into...
>       - Maybe this is a good topic for the next Java Puzzlers book ��
>
> Again, just thinking out loud again...
>
> Cheers, Eric
>
> On Thu, Nov 25, 2021 at 2:48 PM Eric Kolotyluk <eric at kolotyluk.net> wrote:
>
> > Correction, the Stream had not terminated and needed to be terminated
> > before calling join()...
> >
> > Cheers, Eric
> >
> > On Thu, Nov 25, 2021 at 2:46 PM Eric Kolotyluk <eric at kolotyluk.net>
> wrote:
> >
> >> Ahhhhhh..... now I understand the fix...
> >>
> >>         try (var structuredExecutor =
> StructuredExecutor.open("Experiment00", virtualThreadFactory)) {
> >>             var completionHandler = new
> StructuredExecutor.ShutdownOnFailure();
> >>             var futureStream = IntStream.range(0, 15).mapToObj(item -> {
> >>                 System.out.printf("item = %d, Thread ID = %s\n", item,
> Thread.currentThread());
> >>                 return structuredExecutor.fork(() -> {
> >>                     System.out.printf("\ttask = %d, Thread ID = %s\n",
> item, Thread.currentThread());
> >>                     return item;
> >>                 }, completionHandler);
> >>             });
> >>             var futureList = futureStream.toList(); // wait for the
> Stream to finish
> >>             structuredExecutor.join();
> >>             completionHandler.throwIfFailed();
> >>             var completedResults =
> futureList.stream().map(Future::resultNow).toList();
> >>             completedResults.forEach(System.out::println);
> >>             System.out.println("Finished Processing");
> >>         }
> >>         catch  (InterruptedException e) {
> >>             e.printStackTrace();
> >>         } catch (ExecutionException e) {
> >>             e.printStackTrace();
> >>         } catch (IllegalStateException e) {
> >>             e.printStackTrace();
> >>         }
> >>         finally {
> >>             System.out.println("Finished Finally");
> >>         }
> >>
> >>
> >> Thanks so much, Rémi, that was a really subtle point for me... There was
> >> more concurrency going on than I expected...
> >>
> >> We learn more from our failures than our successes...
> >>
> >> Cheers, Eric
> >>
> >>
> >> On Thu, Nov 25, 2021 at 2:25 PM <forax at univ-mlv.fr> wrote:
> >>
> >>>
> >>>
> >>> ------------------------------
> >>>
> >>> *From: *"Eric Kolotyluk" <eric at kolotyluk.net>
> >>> *To: *"Remi Forax" <forax at univ-mlv.fr>
> >>> *Cc: *"loom-dev" <loom-dev at openjdk.java.net>
> >>> *Sent: *Jeudi 25 Novembre 2021 23:16:11
> >>> *Subject: *Re: Loom Evades Exception Handling
> >>>
> >>> Thanks for the suggestion Remi, but that does not fix it, it does
> >>> however result in different Exception Handling...
> >>>
> >>> [image: image.png]
> >>>
> >>> Actually, my concern is more about the stack traces, and messages, I
> >>> cannot really pinpoint the root cause of the exception. It's like
> there is
> >>> missing information about the root cause.
> >>>
> >>> Through another piece of experimental code, I have isolated it to
> >>> something like
> >>>
> >>> structuredExecutor.join();
> >>> completionHandler.throwIfFailed();
> >>> var completedResults = futureResults.map(Future::resultNow).toList();
> >>>
> >>> where the last statement produces
> >>>
> >>> java.lang.IllegalStateException: Task has not completed
> >>> at
> >>>
> java.base/java.util.concurrent.FutureTask.resultNow(FutureTask.java:220)
> >>> at
> >>>
> java.base/java.util.concurrent.StructuredExecutor$FutureImpl.resultNow(StructuredExecutor.java:726)
> >>> at
> >>>
> java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
> >>> at
> >>> java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
> >>> at
> >>>
> java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
> >>> at
> >>>
> java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:711)
> >>> at
> >>>
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
> >>> at
> >>>
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
> >>> at
> >>>
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575)
> >>> at
> >>>
> java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> >>> at
> >>>
> java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)
> >>> at
> >>>
> java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622)
> >>> at
> >>>
> java.base/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627)
> >>> at net.kolotyluk.loom.Experiment00.main(Experiment00.java:160)
> >>>
> >>> Which begs the question, how can this throw an exception after the
> >>> previous two statements?
> >>>
> >>>
> >>> Again, a stream delay the calculation so you call fork() when toList is
> >>> called after calling join() not before,
> >>> You have to call toList() before calling throwIfFailed().
> >>>
> >>> For Alan or Ron, fork() should throw an exception if called after
> join()
> >>> ?
> >>>
> >>>
> >>> Cheers, Eric
> >>>
> >>>
> >>> Rémi
> >>>
> >>>
> >>>
> >>> On Thu, Nov 25, 2021 at 1:06 PM Remi Forax <forax at univ-mlv.fr> wrote:
> >>>
> >>>> Hi Eric,
> >>>> getRemoteString() returns a Stream, and a Stream delays the call to
> the
> >>>> intermediary operations to the point where the terminal operation,
> here
> >>>> forEach(), is called.
> >>>> When forEach() is called, the executor is already closed, so forEach()
> >>>> calls map() that calls fork() that throws an IllegalStateException.
> >>>>
> >>>> I see no issue here, if you return a List instead of a Stream, you
> will
> >>>> have the behavior I believe you want.
> >>>>
> >>>> regards,
> >>>> Rémi
> >>>>
> >>>> ----- Original Message -----
> >>>> > From: "Eric Kolotyluk" <eric at kolotyluk.net>
> >>>> > To: "loom-dev" <loom-dev at openjdk.java.net>
> >>>> > Sent: Jeudi 25 Novembre 2021 21:27:04
> >>>> > Subject: Loom Evades Exception Handling
> >>>>
> >>>> > Yes, that subject line is click-bait, but this is not intuitive to
> >>>> me... Am
> >>>> > I just seeing an incomplete implementation of Project Loom, and this
> >>>> will
> >>>> > be resolved in the future?
> >>>> >
> >>>> > public class Experiment06 {
> >>>> >
> >>>> >    public static void main(String args[]) {
> >>>> >        Context.printHeader(Experiment06.class);
> >>>> >
> >>>> >        try {
> >>>> >            getRemoteStrings().forEach(System.out::println);
> >>>> >        } catch (ExperimentException e) {
> >>>> >            e.printStackTrace();
> >>>> >        }
> >>>> >    }
> >>>> >
> >>>> >    static Stream<String> getRemoteStrings() throws
> >>>> ExperimentException {
> >>>> >
> >>>> >        try (var structuredExecutor =
> >>>> StructuredExecutor.open("Experiment06")) {
> >>>> >
> >>>> >            // We want complete results, so we won't tolerate
> failure.
> >>>> >            var completionHandler = new
> >>>> StructuredExecutor.ShutdownOnFailure();
> >>>> >
> >>>> >            var futureResults = IntStream.range(0, 15).mapToObj(item
> >>>> -> {
> >>>> >                try {
> >>>> >                    System.out.printf("item = %d, Thread ID = %s\n",
> >>>> > item, Thread.currentThread());
> >>>> >                    return structuredExecutor.fork(() -> {
> >>>> >                        try {
> >>>> >                            System.out.printf("\ttask = %d, Thread ID
> >>>> > = %s\n", item, Thread.currentThread());
> >>>> >                            return getRemoteString(item, new
> >>>> > URI("https://server1/foobar.com/item"));
> >>>> >                        }
> >>>> >                        catch (Throwable t) {
> >>>> >                            System.out.printf("TASK EXCEPTION
> >>>> > %s\n\t%s\n\n", t.getMessage(), t.getCause());
> >>>> >                            t.printStackTrace();
> >>>> >                            throw t;
> >>>> >                        }
> >>>> >                    }, completionHandler);
> >>>> >                } catch (Throwable t) {
> >>>> >                    System.out.printf("SPAWN EXCEPTION %s\n\t%s\n\n",
> >>>> > t.getMessage(), t.getCause());
> >>>> >                    t.printStackTrace();
> >>>> >                    throw t;
> >>>> >                }
> >>>> >            });
> >>>> >
> >>>> structuredExecutor.joinUntil(Instant.now().plusSeconds(10));
> >>>> >            completionHandler.throwIfFailed();
> >>>> >            return futureResults.map(Future::resultNow);
> >>>> >        }
> >>>> >        catch  (InterruptedException e) {
> >>>> >            e.printStackTrace();
> >>>> >            throw new ExperimentException(e);
> >>>> >        } catch (ExecutionException e) {
> >>>> >            e.printStackTrace();
> >>>> >            throw new ExperimentException(e);
> >>>> >        } catch (TimeoutException e) {
> >>>> >            e.printStackTrace();
> >>>> >            throw new ExperimentException(e);
> >>>> >        } catch (IllegalStateException e) {
> >>>> >            e.printStackTrace();
> >>>> >            throw new ExperimentException(e);
> >>>> >        }
> >>>> >        catch (Throwable t) {
> >>>> >            t.printStackTrace();
> >>>> >            throw new ExperimentException(t);
> >>>> >        }
> >>>> >    }
> >>>> >
> >>>> >    static String getRemoteString(int item, URI from) {
> >>>> >        return "Item %d from %s".formatted(item, from);
> >>>> >    }
> >>>> >
> >>>> >    static class ExperimentException extends Exception {
> >>>> >        ExperimentException(Throwable cause) {
> >>>> >            super(cause);
> >>>> >        }
> >>>> >    }
> >>>> > }
> >>>> >
> >>>> > where I get
> >>>> >
> >>>> > [image: image.png]
> >>>> >
> >>>> >
> >>>> > (Experiment06.java:50) is
> >>>> >
> >>>> > return structuredExecutor.fork(() -> {
> >>>> >
> >>>> > (Experiment06.java:34) is
> >>>> >
> >>>> > getRemoteStrings().forEach(System.out::println);
> >>>> >
> >>>> > Is there some way I can actually find out more from the Exception
> >>>> handling,
> >>>> > or other techniques?
> >>>> >
> >>>> > Cheers, Eric
> >>>>
> >>>
> >>>
>


More information about the loom-dev mailing list