Structured Concurrency yet again

Holo The Sage Wolf holo3146 at gmail.com
Wed May 10 06:47:15 UTC 2023


The way to handle these cases is to put the logic that runs after a task
ends into the scope itself.

For example:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

class ContinuableScope<T, K> extends StructuredTaskScope<T> {
    @FunctionalInterface
    public interface FanLogic<T, K> {
        K get(TaskHandle<T> handle) throws Throwable;
    }

    private final FanLogic<T, K> next;
    private final ShutdownStrategy<K> strategy;
    private final Queue<K> results = new ConcurrentLinkedQueue<>();
    private final Queue<Throwable> exceptions = new ConcurrentLinkedQueue<>();

    ContinuableScope(FanLogic<T, K> next, ShutdownStrategy<K> strategy) {
        super(null, Thread.ofVirtual().factory()); // super(...) must come first
        this.next = next;
        this.strategy = strategy;
    }

    // May be invoked by several threads concurrently, hence the concurrent queues.
    @Override
    protected void handleComplete(TaskHandle<T> handle) {
        try {
            var nr = next.get(handle);
            // using null to not make this more complicated in the mail
            if (nr == null) return;
            if (strategy.testResult(nr, results, exceptions)) {
                shutdown(); // Kill the scope
                return;
            }
            results.add(nr);
        } catch (Throwable e) {
            if (strategy.testFailure(e, results, exceptions)) {
                shutdown(); // Kill the scope
                return;
            }
            exceptions.add(e);
        }
    }

    public Stream<K> results() {
        super.ensureOwnerAndJoined();
        return results.stream();
    }

    public Stream<Throwable> exceptions() {
        super.ensureOwnerAndJoined();
        return exceptions.stream();
    }
}

---

public void main() throws InterruptedException {
    // Shut down on unknown host, and on results length == 3
    var strategy = new ShutdownStrategy<String>(...);
    ContinuableScope.FanLogic<Result, String> fanAction = th -> {
        var r = transform(th);
        if (validate(r)) return r;
        return null; // again, using null for brevity
    };
    try (var scope = new ContinuableScope<>(fanAction, strategy)) {
        for (...) {
            scope.fork(() -> call(...));
        }
        scope.join();
        // A Stream has no isEmpty(), so collect the results first
        var results = scope.results().toList();
        if (results.isEmpty()) {
            throw ...
        }
        // Return an object storing both results and scope.exceptions()
    }
}
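
The example above uses ShutdownStrategy without defining it. One possible shape, as a minimal sketch: the interface below only mirrors the two calls made from handleComplete (testResult and testFailure), and FirstNOrUnknownHost is a hypothetical name for the policy "stop on UnknownHostException, or once three results are stored". Neither is part of any JDK API. Because the scope tests the strategy before storing the new result, this sketch reports "shut down" only when the limit has already been reached.

```java
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical interface: it only mirrors the calls made in handleComplete.
interface ShutdownStrategy<K> {
    // Return true to shut the scope down instead of storing `result`.
    boolean testResult(K result, Queue<K> results, Queue<Throwable> exceptions);

    // Return true to shut the scope down instead of recording `failure`.
    boolean testFailure(Throwable failure, Queue<K> results, Queue<Throwable> exceptions);
}

// Hypothetical policy: stop on a DNS error, or once `limit` results are stored.
final class FirstNOrUnknownHost<K> implements ShutdownStrategy<K> {
    private final int limit;

    FirstNOrUnknownHost(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean testResult(K result, Queue<K> results, Queue<Throwable> exceptions) {
        // Called before `result` is stored, so this is true only when
        // the queue already holds `limit` results.
        return results.size() >= limit;
    }

    @Override
    public boolean testFailure(Throwable failure, Queue<K> results, Queue<Throwable> exceptions) {
        // An UnknownHostException stops everything; any other failure
        // (for example a plain IOException) is only recorded by the caller.
        return failure instanceof UnknownHostException;
    }
}
```

With this shape, the strategy in main would be created as new FirstNOrUnknownHost<String>(3). Since handleComplete can run concurrently, the size check is only approximate: two tasks finishing at the same moment may both pass it, so the caller should still truncate the results if exactly three are required.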



On Wed, May 10, 2023, 01:39 <forax at univ-mlv.fr> wrote:

> ----- Original Message -----
> > From: "Ron Pressler" <ron.pressler at oracle.com>
> > To: "Remi Forax" <forax at univ-mlv.fr>
> > Cc: "Alan Bateman" <alan.bateman at oracle.com>, "loom-dev" <loom-dev at openjdk.java.net>
> > Sent: Tuesday, May 9, 2023 10:33:25 PM
> > Subject: Re: Structured Concurrency yet again
>
> >> On 9 May 2023, at 15:29, forax at univ-mlv.fr wrote:
> >>
> >>
> >> It is interesting to note that we seem to agree that fork() should not
> >> return a Future. I can identify two logical consequences.
> >>
> >> Firstly, as mentioned in the JEP, get() or state() should only be called
> >> after joinAll(). Calling task.state() on a running computation would
> >> throw an IllegalStateException, making State.RUNNING unnecessary.
> >> Furthermore, when scope.shutdown() is invoked, a cancelled task can be in
> >> state FAILED or CANCELLED, as explained in the documentation, which is a
> >> pain to deal with as a user. I propose to simplify the design and remove
> >> the state CANCELLED (perhaps making it a sub-state of FAILED with a
> >> method isCancelled() if we want to keep it). This way, users will have
> >> only to deal with two states SUCCESS or FAILED. It is worth noting that
> >> existing reactive APIs do not differentiate between CANCELLED and FAILED
> >> too.
> >>
> >> The second consequence concerns the TaskHandle.exception() method. It
> >> should not return a Throwable, since Callable throws an Exception and
> >> storing an Error in a field is not a good practice. Exception handling
> >> is a significant concern in Java, and we have the opportunity to provide
> >> a transparent way to address it without being constrained by the design
> >> of Future. A TaskHandle can be parameterized by both the result type and
> >> the exception type. When calling get(), exceptions are transparently
> >> re-spawned. However, this means that TaskHandle cannot be a Supplier.
> >>
> >> Then I prefer to use a Stream to deal with the different semantics of a
> >> scope instead of asking to subclass it and hoping that users will
> >> correctly manage the concurrency when overriding handleComplete().
> >>
> >>>
> >>> -Alan.
> >>>
> >>> [1] https://openjdk.org/jeps/8306641
> >>
> >> regards,
> >> Rémi
> >
> > When a policy is used, there is no need for the scope to ever call either
> > `state` or `exception` on subtasks’ handles; these methods are intended to
> > be used only by handleComplete (or from the scope only if no policy is
> > used, but preferably one should be). As the JEP describes, all exceptions
> > should be handled centrally, as part of the policy, before calling
> > TaskHandle.get(), the only TaskHandle method the scope should call when
> > using a policy. Could you show what problems you’ve run into with
> > exceptions when policies are used ?
>
> Usually when you do something synchronous with some network calls, you mix
> an algorithm (I need at most the first three valid values) with some
> IO/Network specific concerns (I want to stop if there is an
> UnknownHostException (aka DNS error) but an IOException (aka unknown
> network error) is fine).
>
> In terms of synchronous code, it's something like
>
>   var list = new ArrayList<Result>();
>   var error = (IOException) null;
>   for(...) {
>     Result result;
>     try {
>       result = synchronousCall(...);
>     } catch(UnknownHostException e) {
>       throw ... (e);
>     } catch(IOException e) {
>       if (error == null) {
>         error = e;  // just record the error
>       }
>       continue;
>     }
>     if (valid(result)) {
>       list.add(result);
>       if (list.size() == 3) {
>         break;
>       }
>     }
>   }
>   if (list.isEmpty()) {
>     throw ... (error);
>   }
>   ...
>
> How can I write this kind of stuff as a reusable policy that centralizes
> all the exceptions ?
>
> >
> > — Ron
>
> Rémi
>