[External] : API proposal for StructuredTaskScope

Ron Pressler ron.pressler at oracle.com
Thu Jan 5 14:37:06 UTC 2023


We have considered such approaches.

One glaring problem is the case of “heterogeneous tasks”, i.e. tasks that each return a result of a different type. This is a very common scenario for ShutdownOnFailure.
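
To make the heterogeneous case concrete, here is a minimal sketch. It uses a plain ExecutorService rather than StructuredTaskScope, and the class name, the Page record and the values are made up for illustration. Each Future keeps its own static type, whereas a single reducer over Result<T> would need one T for both subtasks (in practice Object plus casts).

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example; none of these names come from the proposal.
final class HeterogeneousTasks {
    record Page(String user, int orderCount) {}

    static Page load() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Two subtasks whose results have unrelated types.
            Future<String> user = executor.submit(() -> "alice");
            Future<Integer> orders = executor.submit(() -> 3);
            // Each handle is typed individually, so no cast is needed here.
            return new Page(user.get(), orders.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(load());
    }
}
```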

In the homogeneous case, we already have a similar API/concept we can exploit: Stream. That is, the Stream API could be used for simple and common homogeneous structured concurrency operations. This is now being actively explored.
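
As a rough illustration only, and not a description of what is being explored, the sketch below shows why the homogeneous case composes naturally with Stream: when every task returns the same type, the results can be handled as a stream of that type. It assumes a plain ExecutorService and tasks that all succeed; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example; this is not a proposed API.
final class HomogeneousTasks {
    static List<Integer> runAll(List<Callable<Integer>> tasks) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // invokeAll blocks until every task has completed, and returns
            // the futures in the same order as the submitted tasks.
            return executor.invokeAll(tasks).stream()
                    .map(HomogeneousTasks::valueOf)
                    .toList();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Unwraps an already completed future, rethrowing failures unchecked.
    private static Integer valueOf(Future<Integer> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    static List<Integer> demo() {
        List<Callable<Integer>> tasks = List.of(() -> 3, () -> 42, () -> 7);
        return runAll(tasks);
    }
}
```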

— Ron


> On 5 Jan 2023, at 10:46, Remi Forax <forax at univ-mlv.fr> wrote:
> 
> Hi all,
> this is a proposal of a slightly different API for StructuredTaskScope
> 
> The idea is to provide as parameter of the constructor of StructuredTaskScope a "reducer" that gets the value | error from each task and tries to compute a result from them.
> A reducer is quite similar in intent to a stream Collector but it's a different API.
> 
> Before deep diving into the details, here are some examples of how to use it.
> Like the current API, a task is submitted with fork(), but unlike the current API, fork() returns void instead of a Future.
> Then there is a method result() that works like join() but also returns the result computed by the reducer from the tasks.
> 
> If you ask for a Reducer.toList(), the method result() returns a List of all results. A Result is a record with three components: a state (SUCCEED or FAILED), an element if the state is SUCCEED, and a suppressed exception (which can itself have suppressed exceptions) that collects all the exceptions, whether you care about them or not.
> 
>  public static void toList() throws InterruptedException {
>    try(var scope = StructuredTaskScope.of(Reducer.<Integer>toList())) {
>      scope.fork(() -> 3);
>      scope.fork(() -> {
>        throw new IOException();
>      });
> 
>      List<Result<Integer>> list = scope.result();
>      System.out.println(list);  // [Result[state=FAILED, element=null, suppressed=java.io.IOException], Result[state=SUCCEED, element=3, suppressed=null]]
>    }
>  }
> 
> When you run the code above several times, the two results may swap positions in the list, which is expected.
> 
> You may want only one result. For that there is Reducer.max(comparator), which gives you an Optional.empty() if there was no call to fork(), a failed result if no task succeeded, or the maximum result as soon as at least one task has succeeded.
> 
> 
>  public static void max() throws InterruptedException {
>    try(var scope = StructuredTaskScope.of(Reducer.max(Integer::compareTo))) {
>      scope.fork(() -> 3);
>      scope.fork(() -> {
>        throw new IOException();
>      });
>      scope.fork(() -> 42);
> 
>      Optional<Result<Integer>> max = scope.result();
>      System.out.println(max);  // Optional[Result[state=SUCCEED, element=42, suppressed=java.io.IOException]]
>    }
>  }
> 
> When you run this code several times, you always get the same max value.
> 
> 
> You may also want the first result that succeeds. For that there is Reducer.first(), which returns Optional.empty() if there is no fork(), a failed result if no task succeeds, or the first result that succeeds (the previously suppressed exceptions are also available for logging if you want).
> 
>  public static void first() throws InterruptedException {
>    try(var scope = StructuredTaskScope.of(Reducer.<Integer>first())) {
>      scope.fork(() -> {
>        throw new IOException();
>      });
>      scope.fork(() -> 3);
>      scope.fork(() -> 42);
> 
>      Optional<Result<Integer>> first = scope.result();
>      System.out.println(first);  // Optional[Result[state=SUCCEED, element=3, suppressed=java.io.IOException]]
>    }
>  }
> 
> When you run this code several times, the returned result can also be 42 and the suppressed exception may or may not be present.
> 
> 
> So what is a Reducer? It's a record with two functions, a combiner and a finisher. The combiner combines the different results, and the finisher provides a nicer object, like a java.util.List or a java.util.Optional, as the result at the end.
> 
>  @FunctionalInterface
>  public interface Combiner<T, A> {
>    A apply(A oldValue, Result<T> result, Runnable shouldShutdown);
>  }
> 
>  public record Reducer<T, A, V>(Combiner<T, A> combiner, Function<? super A, ? extends V> finisher) {}
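
To show how the two functions fit together, here is a minimal sketch of what a toList()-style reducer could look like on top of these types. It is not taken from the prototype: the State enum, the shape of Result, and the convention that a null accumulator means "no result combined yet" are all assumptions. The combiner copies the list instead of mutating it, so calling it again with the same arguments is harmless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch; only Combiner and Reducer mirror the declarations above.
final class ToListReducerSketch {
    enum State { SUCCEED, FAILED }

    // Assumed shape, following the three components described earlier.
    record Result<T>(State state, T element, Throwable suppressed) {}

    @FunctionalInterface
    interface Combiner<T, A> {
        A apply(A oldValue, Result<T> result, Runnable shouldShutdown);
    }

    record Reducer<T, A, V>(Combiner<T, A> combiner, Function<? super A, ? extends V> finisher) {}

    static <T> Reducer<T, List<Result<T>>, List<Result<T>>> toList() {
        // Side-effect free: builds a new immutable list on every call.
        Combiner<T, List<Result<T>>> combiner = (oldValue, result, shouldShutdown) -> {
            List<Result<T>> copy = new ArrayList<>(oldValue == null ? List.of() : oldValue);
            copy.add(result);
            return List.copyOf(copy);
        };
        // Assumption: a null accumulator (no fork() at all) finishes as an empty list.
        Function<List<Result<T>>, List<Result<T>>> finisher =
                list -> list == null ? List.of() : list;
        return new Reducer<>(combiner, finisher);
    }

    // Folds two results sequentially, the way a scope would as tasks complete.
    static List<Result<Integer>> demo() {
        Reducer<Integer, List<Result<Integer>>, List<Result<Integer>>> reducer = toList();
        Runnable ignoreShutdown = () -> {};
        List<Result<Integer>> acc = null;
        acc = reducer.combiner().apply(acc, new Result<>(State.SUCCEED, 3, null), ignoreShutdown);
        acc = reducer.combiner().apply(
                acc, new Result<>(State.FAILED, null, new IllegalStateException("boom")), ignoreShutdown);
        return reducer.finisher().apply(acc);
    }
}
```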
> 
> The Reducer API is a little bit complex because it hides the fact that several virtual threads may want to combine their results at the same time.
> So the combiner function can be called several times with the same result (it's called in a loop that does a CAS).
> The API assumes that neither function has side effects; otherwise there will be concurrency issues.
> 
> The combiner also has a third parameter that lets it ask to gently stop the computation (by calling shouldShutdown.run()); this interrupts all the other threads once a result is good enough.
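
The CAS loop described above can be sketched roughly as follows. This is not the prototype's code: the Accumulator class, the FIRST_SUCCESS combiner and the choice to honor a shutdown request only on the attempt that wins the CAS are assumptions made for illustration, and the sketch merely records the request instead of interrupting threads.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of combining results with a compare-and-set retry loop.
final class CasCombineSketch {
    enum State { SUCCEED, FAILED }

    record Result<T>(State state, T element, Throwable suppressed) {}

    @FunctionalInterface
    interface Combiner<T, A> {
        A apply(A oldValue, Result<T> result, Runnable shouldShutdown);
    }

    // Shared accumulator; null means "nothing combined yet" (an assumption).
    static final class Accumulator<T, A> {
        private final AtomicReference<A> value = new AtomicReference<>();
        private final AtomicBoolean shutdownRequested = new AtomicBoolean();
        private final Combiner<T, A> combiner;

        Accumulator(Combiner<T, A> combiner) {
            this.combiner = combiner;
        }

        // Called by each task thread once its result is known.
        void publish(Result<T> result) {
            while (true) {
                A oldValue = value.get();
                boolean[] wantsShutdown = {false};
                // Re-run on every retry, so the combiner may see the same result several times.
                A newValue = combiner.apply(oldValue, result, () -> wantsShutdown[0] = true);
                if (value.compareAndSet(oldValue, newValue)) {
                    // Honor the request only for the attempt that won the race.
                    if (wantsShutdown[0]) {
                        shutdownRequested.set(true); // a real scope would interrupt its other threads here
                    }
                    return;
                }
            }
        }

        A value() { return value.get(); }

        boolean shutdownRequested() { return shutdownRequested.get(); }
    }

    // Keeps the first successful element and asks for shutdown when it arrives.
    static final Combiner<Integer, Integer> FIRST_SUCCESS = (oldValue, result, shouldShutdown) -> {
        if (oldValue != null || result.state() == State.FAILED) {
            return oldValue;
        }
        shouldShutdown.run();
        return result.element();
    };

    static Accumulator<Integer, Integer> demo() {
        Accumulator<Integer, Integer> acc = new Accumulator<>(FIRST_SUCCESS);
        List<Thread> threads = List.of(
                new Thread(() -> acc.publish(new Result<>(State.FAILED, null, new IOException()))),
                new Thread(() -> acc.publish(new Result<>(State.SUCCEED, 3, null))),
                new Thread(() -> acc.publish(new Result<>(State.SUCCEED, 42, null))));
        threads.forEach(Thread::start);
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return acc;
    }
}
```

Depending on scheduling, demo() ends with either 3 or 42 in the accumulator, which matches the behavior described for Reducer.first().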
> 
> The prototype is here
>  https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/reducer/StructuredAsyncScope.java
> 
> If you want to see the gritty details :)
> 
> regards,
> Rémi


