Loom Evades Exception Handling
Eric Kolotyluk
eric at kolotyluk.net
Thu Nov 25 22:48:27 UTC 2021
Correction, the Stream had not terminated and needed to be terminated
before calling join()...
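
A minimal sketch of that ordering issue, using only standard JDK streams.
The class name LazyStreamDemo and the events list are hypothetical, and the
"fork" and "join" strings merely stand in for the StructuredExecutor calls;
this is not the Loom API itself. It only shows that the mapToObj() lambda
does not run until a terminal operation is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LazyStreamDemo {
    // Records the order in which things happen (hypothetical helper for this sketch).
    static final List<String> events = new ArrayList<>();

    static List<String> run() {
        events.clear();
        // Building the pipeline runs nothing: mapToObj() is an intermediate operation.
        Stream<Integer> pending = IntStream.range(0, 3).mapToObj(i -> {
            events.add("fork " + i);   // stands in for structuredExecutor.fork(...)
            return i;
        });
        events.add("join");            // stands in for structuredExecutor.join()
        // The terminal operation is what finally executes the lambda above.
        List<Integer> results = pending.collect(Collectors.toList());
        events.add("results " + results);
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch "join" is recorded before any "fork", which mirrors the
failing version; moving the terminal operation ahead of the join step
restores the expected order.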
Cheers, Eric
On Thu, Nov 25, 2021 at 2:46 PM Eric Kolotyluk <eric at kolotyluk.net> wrote:
> Ahhhhhh..... now I understand the fix...
>
> try (var structuredExecutor = StructuredExecutor.open("Experiment00", virtualThreadFactory)) {
>     var completionHandler = new StructuredExecutor.ShutdownOnFailure();
>     var futureStream = IntStream.range(0, 15).mapToObj(item -> {
>         System.out.printf("item = %d, Thread ID = %s\n", item, Thread.currentThread());
>         return structuredExecutor.fork(() -> {
>             System.out.printf("\ttask = %d, Thread ID = %s\n", item, Thread.currentThread());
>             return item;
>         }, completionHandler);
>     });
>     var futureList = futureStream.toList(); // terminal operation: every fork() runs here, before join()
>     structuredExecutor.join();
>     completionHandler.throwIfFailed();
>     var completedResults = futureList.stream().map(Future::resultNow).toList();
>     completedResults.forEach(System.out::println);
>     System.out.println("Finished Processing");
> }
> catch (InterruptedException e) {
>     e.printStackTrace();
> } catch (ExecutionException e) {
>     e.printStackTrace();
> } catch (IllegalStateException e) {
>     e.printStackTrace();
> }
> finally {
>     System.out.println("Finished Finally");
> }
>
>
> Thanks so much, Rémi, that was a really subtle point for me... There was
> more concurrency going on than I expected...
>
> We learn more from our failures than our successes...
>
> Cheers, Eric
>
>
> On Thu, Nov 25, 2021 at 2:25 PM <forax at univ-mlv.fr> wrote:
>
>>
>>
>> ------------------------------
>>
>> *From: *"Eric Kolotyluk" <eric at kolotyluk.net>
>> *To: *"Remi Forax" <forax at univ-mlv.fr>
>> *Cc: *"loom-dev" <loom-dev at openjdk.java.net>
>> *Sent: *Thursday, 25 November 2021 23:16:11
>> *Subject: *Re: Loom Evades Exception Handling
>>
>> Thanks for the suggestion Remi, but that does not fix it, it does however
>> result in different Exception Handling...
>>
>> [image: image.png]
>>
>> Actually, my concern is more about the stack traces, and messages, I
>> cannot really pinpoint the root cause of the exception. It's like there is
>> missing information about the root cause.
>>
>> Through another piece of experimental code, I have isolated it to
>> something like
>>
>> structuredExecutor.join();
>> completionHandler.throwIfFailed();
>> var completedResults = futureResults.map(Future::resultNow).toList();
>>
>> where the last statement produces
>>
>> java.lang.IllegalStateException: Task has not completed
>>     at java.base/java.util.concurrent.FutureTask.resultNow(FutureTask.java:220)
>>     at java.base/java.util.concurrent.StructuredExecutor$FutureImpl.resultNow(StructuredExecutor.java:726)
>>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
>>     at java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
>>     at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
>>     at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:711)
>>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575)
>>     at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>>     at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)
>>     at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622)
>>     at java.base/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627)
>>     at net.kolotyluk.loom.Experiment00.main(Experiment00.java:160)
>>
>> Which begs the question, how can this throw an exception after the
>> previous two statements?
>>
>>
>> Again, a stream delays the computation, so fork() is only called when
>> toList() is called, which is after join(), not before.
>> You have to call toList() before calling join() and throwIfFailed().
>>
>> For Alan or Ron: should fork() throw an exception if called after join()?
>>
>>
>> Cheers, Eric
>>
>>
>> Rémi
>>
>>
>>
>> On Thu, Nov 25, 2021 at 1:06 PM Remi Forax <forax at univ-mlv.fr> wrote:
>>
>>> Hi Eric,
>>> getRemoteStrings() returns a Stream, and a Stream delays the
>>> intermediate operations until the terminal operation, here
>>> forEach(), is called.
>>> When forEach() is called, the executor is already closed, so forEach()
>>> runs the mapToObj() lambda, which calls fork(), which throws an
>>> IllegalStateException.
>>>
>>> I see no issue here: if you return a List instead of a Stream, you will
>>> have the behavior I believe you want.
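
A minimal sketch of the List-versus-Stream point above. It assumes a plain
ExecutorService as a stand-in for StructuredExecutor, so the class and method
names (StreamVersusList, lazyTasks, eagerResults) are hypothetical, and the
failure shows up as a RejectedExecutionException from submit() rather than
the IllegalStateException that fork() throws in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class StreamVersusList {

    // Lazy: submit() runs only when the caller consumes the stream,
    // by which time the executor has already been shut down.
    static Stream<Future<Integer>> lazyTasks() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            return IntStream.range(0, 3).mapToObj(i -> {
                Callable<Integer> task = () -> i * 10;
                return executor.submit(task);
            });
        } finally {
            executor.shutdown();
        }
    }

    // Eager: the terminal operation and the waiting both happen
    // while the executor is still open.
    static List<Integer> eagerResults() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Future<Integer>> futures = IntStream.range(0, 3).mapToObj(i -> {
                Callable<Integer> task = () -> i * 10;
                return executor.submit(task);
            }).collect(Collectors.toList());   // all tasks are submitted here
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get());     // wait for each task to complete
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(eagerResults());
        try {
            lazyTasks().collect(Collectors.toList());
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: executor already shut down");
        }
    }
}
```

The design choice is simply to finish all stream work inside the scope that
owns the executor and hand the caller a materialized List.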
>>>
>>> regards,
>>> Rémi
>>>
>>> ----- Original Message -----
>>> > From: "Eric Kolotyluk" <eric at kolotyluk.net>
>>> > To: "loom-dev" <loom-dev at openjdk.java.net>
>>> > Sent: Thursday, 25 November 2021 21:27:04
>>> > Subject: Loom Evades Exception Handling
>>>
>>> > Yes, that subject line is click-bait, but this is not intuitive to me...
>>> > Am I just seeing an incomplete implementation of Project Loom, and this
>>> > will be resolved in the future?
>>> >
>>> > public class Experiment06 {
>>> >
>>> >     public static void main(String args[]) {
>>> >         Context.printHeader(Experiment06.class);
>>> >
>>> >         try {
>>> >             getRemoteStrings().forEach(System.out::println);
>>> >         } catch (ExperimentException e) {
>>> >             e.printStackTrace();
>>> >         }
>>> >     }
>>> >
>>> >     static Stream<String> getRemoteStrings() throws ExperimentException {
>>> >
>>> >         try (var structuredExecutor = StructuredExecutor.open("Experiment06")) {
>>> >
>>> >             // We want complete results, so we won't tolerate failure.
>>> >             var completionHandler = new StructuredExecutor.ShutdownOnFailure();
>>> >
>>> >             var futureResults = IntStream.range(0, 15).mapToObj(item -> {
>>> >                 try {
>>> >                     System.out.printf("item = %d, Thread ID = %s\n", item, Thread.currentThread());
>>> >                     return structuredExecutor.fork(() -> {
>>> >                         try {
>>> >                             System.out.printf("\ttask = %d, Thread ID = %s\n", item, Thread.currentThread());
>>> >                             return getRemoteString(item, new URI("https://server1/foobar.com/item"));
>>> >                         }
>>> >                         catch (Throwable t) {
>>> >                             System.out.printf("TASK EXCEPTION %s\n\t%s\n\n", t.getMessage(), t.getCause());
>>> >                             t.printStackTrace();
>>> >                             throw t;
>>> >                         }
>>> >                     }, completionHandler);
>>> >                 } catch (Throwable t) {
>>> >                     System.out.printf("SPAWN EXCEPTION %s\n\t%s\n\n", t.getMessage(), t.getCause());
>>> >                     t.printStackTrace();
>>> >                     throw t;
>>> >                 }
>>> >             });
>>> >             structuredExecutor.joinUntil(Instant.now().plusSeconds(10));
>>> >             completionHandler.throwIfFailed();
>>> >             return futureResults.map(Future::resultNow);
>>> >         }
>>> >         catch (InterruptedException e) {
>>> >             e.printStackTrace();
>>> >             throw new ExperimentException(e);
>>> >         } catch (ExecutionException e) {
>>> >             e.printStackTrace();
>>> >             throw new ExperimentException(e);
>>> >         } catch (TimeoutException e) {
>>> >             e.printStackTrace();
>>> >             throw new ExperimentException(e);
>>> >         } catch (IllegalStateException e) {
>>> >             e.printStackTrace();
>>> >             throw new ExperimentException(e);
>>> >         }
>>> >         catch (Throwable t) {
>>> >             t.printStackTrace();
>>> >             throw new ExperimentException(t);
>>> >         }
>>> >     }
>>> >
>>> >     static String getRemoteString(int item, URI from) {
>>> >         return "Item %d from %s".formatted(item, from);
>>> >     }
>>> >
>>> >     static class ExperimentException extends Exception {
>>> >         ExperimentException(Throwable cause) {
>>> >             super(cause);
>>> >         }
>>> >     }
>>> > }
>>> >
>>> > where I get
>>> >
>>> > [image: image.png]
>>> >
>>> >
>>> > (Experiment06.java:50) is
>>> >
>>> > return structuredExecutor.fork(() -> {
>>> >
>>> > (Experiment06.java:34) is
>>> >
>>> > getRemoteStrings().forEach(System.out::println);
>>> >
>>> > Is there some way I can actually find out more from the Exception
>>> > handling, or other techniques?
>>> >
>>> > Cheers, Eric
>>>
>>
>>