[External] : API proposal for StructuredTaskScope

forax at univ-mlv.fr forax at univ-mlv.fr
Thu Jan 5 15:22:51 UTC 2023


----- Original Message -----
> From: "Ron Pressler" <ron.pressler at oracle.com>
> To: "Remi Forax" <forax at univ-mlv.fr>
> Cc: "loom-dev" <loom-dev at openjdk.java.net>
> Sent: Thursday, January 5, 2023 3:37:06 PM
> Subject: Re: [External] : API proposal for StructuredTaskScope

> We have considered such approaches.
> 
> One glaring problem is the case of “heterogenous tasks”, i.e. tasks that each
> return a result of a different type. This is a very common scenario for
> ShutdownOnFailure.

I believe that using a static factory instead of a constructor (here StructuredTaskScope.of()) solves that pretty well.
If no type is explicitly provided, inference defaults to Object, which is exactly what you want in the case of a shutdown on failure.
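
A minimal sketch of that inference behavior. The Scope class below is a hypothetical stand-in, not the real StructuredTaskScope, and it runs tasks inline just to keep the example self-contained:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class InferenceSketch {
  // Hypothetical stand-in for the proposed scope; it runs tasks inline, no threads.
  static final class Scope<T> {
    private final List<T> results = new ArrayList<>();

    // Static factory: T is inferred at the call site.
    static <T> Scope<T> of() {
      return new Scope<>();
    }

    void fork(Callable<? extends T> task) throws Exception {
      results.add(task.call());
    }

    List<T> results() {
      return results;
    }
  }

  static List<Object> heterogeneous() throws Exception {
    var scope = Scope.of();      // no type witness: T is inferred as Object
    scope.fork(() -> "user");    // a task returning String is accepted
    scope.fork(() -> 42);        // a task returning Integer is accepted too
    return scope.results();
  }

  static List<Integer> homogeneous() throws Exception {
    var scope = Scope.<Integer>of();  // explicit type witness
    scope.fork(() -> 3);
    return scope.results();
  }
}
```

With a constructor, `new Scope<>()` assigned to `var` would also infer Object, but the factory gives one place to pass a reducer and an optional type witness.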

> 
> In the homogenous case, we already have a similar API/concept we can exploit:
> Stream. I.e. the Stream API could be used for simple and common homogenous
> structured concurrency operations. This is now being actively explored.

Yes, I've written a prototype of that too.
I may be wrong, but using a Stream requires an intermediary queue, something you don't need if you reduce the task values one at a time.
Also, a stream of tuples (result, exception) is not something that is currently easy to use.
Sure, you can add new Collectors to make it easier to use ... at that point, using a reducer/collector directly starts to become appealing :)
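
To make the "one at a time" point concrete, here is a minimal sketch in which each task folds its value into a shared accumulator as soon as it finishes, with no queue in between. The class and method names are hypothetical, it uses plain threads rather than a scope, and failed tasks are simply ignored:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;

public class ReduceAsYouGo {
  // Runs each task in its own thread and folds every successful value into one
  // accumulator when the task completes; nothing is buffered between the two.
  static <T> T reduce(List<Callable<T>> tasks, BinaryOperator<T> op)
      throws InterruptedException {
    AtomicReference<T> acc = new AtomicReference<>();  // null means "no value yet"
    Thread[] threads = new Thread[tasks.size()];
    for (int i = 0; i < threads.length; i++) {
      Callable<T> task = tasks.get(i);
      threads[i] = new Thread(() -> {
        try {
          T value = task.call();
          // op must be side-effect free: it may be re-applied under contention
          acc.accumulateAndGet(value, (old, v) -> old == null ? v : op.apply(old, v));
        } catch (Exception e) {
          // a failed task contributes nothing in this sketch
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    return acc.get();
  }
}
```

Calling `reduce(tasks, Integer::max)` on tasks that return 3, throw, and return 42 yields 42 regardless of completion order.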

> 
> — Ron

Rémi

> 
> 
>> On 5 Jan 2023, at 10:46, Remi Forax <forax at univ-mlv.fr> wrote:
>> 
>> Hi all,
>> this is a proposal of a slightly different API for StructuredTaskScope
>> 
>> The idea is to pass to the constructor of StructuredTaskScope a "reducer"
>> that gets the value | error from each task and tries to compute a result
>> from it.
>> A reducer is quite similar in intent to a stream Collector but it's a different
>> API.
>> 
>> Before deep diving into the details, here are some examples of how to use it.
>> Like the current API, a task is submitted with fork() but unlike the current
>> API, fork() returns void instead of a Future.
>> Then there is a method result() that works like join() but also returns the
>> computed result of the tasks by the reducer.
>> 
>> If you ask for a Reducer.toList(), the method result() returns a List of all
>> results (a Result is a record with three components: a state (SUCCEED or
>> FAILED), an element if SUCCEED, and a suppressed exception (which itself can
>> have suppressed exceptions) for all the exceptions that you may or may not
>> care about).
>> 
>> public static void toList() throws InterruptedException {
>>    try(var scope = StructuredTaskScope.of(Reducer.<Integer>toList())) {
>>      scope.fork(() -> 3);
>>      scope.fork(() -> {
>>        throw new IOException();
>>      });
>> 
>>      List<Result<Integer>> list = scope.result();
>>      System.out.println(list);
>>      // [Result[state=FAILED, element=null, suppressed=java.io.IOException],
>>      //  Result[state=SUCCEED, element=3, suppressed=null]]
>>    }
>>  }
>> 
>> When you run the code above several times, the two results exchange location in
>> the list which is normal.
>> 
>> You may want only one result. For that you have Reducer.max(comparator),
>> which gives you an Optional.empty() if there is no call to fork(), a failed
>> result if no task succeeds, or the maximum result once at least one
>> succeeded result is available.
>> 
>> 
>>  public static void max() throws InterruptedException {
>>    try(var scope = StructuredTaskScope.of(Reducer.max(Integer::compareTo))) {
>>      scope.fork(() -> 3);
>>      scope.fork(() -> {
>>        throw new IOException();
>>      });
>>      scope.fork(() -> 42);
>> 
>>      Optional<Result<Integer>> max = scope.result();
>>      System.out.println(max);
>>      // Optional[Result[state=SUCCEED, element=42,
>>      //   suppressed=java.io.IOException]]
>>    }
>>  }
>> 
>> When you run this code several times, you always have the same max value.
>> 
>> 
>> You may also want the first result that succeeds. For that you have
>> Reducer.first(), which returns Optional.empty() if there is no fork(), a
>> failed result if no task succeeds, or the first result that succeeds (the
>> previously suppressed exceptions are also available for logging if you want).
>> 
>>  public static void first() throws InterruptedException {
>>    try(var scope = StructuredTaskScope.of(Reducer.<Integer>first())) {
>>      scope.fork(() -> {
>>        throw new IOException();
>>      });
>>      scope.fork(() -> 3);
>>      scope.fork(() -> 42);
>> 
>>      Optional<Result<Integer>> first = scope.result();
>>      System.out.println(first);
>>      // Optional[Result[state=SUCCEED, element=3,
>>      //   suppressed=java.io.IOException]]
>>    }
>>  }
>> 
>> When you run this code several times, the returned result can also be 42 and the
>> suppressed exception may or may not be present.
>> 
>> 
>> So what is a Reducer? It's a record with two functions, a combiner and a
>> finisher. The combiner combines the different results and the finisher
>> provides a nicer object like java.util.List or java.util.Optional as the
>> result at the end.
>> 
>>  @FunctionalInterface
>>  public interface Combiner<T, A> {
>>    A apply(A oldValue, Result<T> result, Runnable shouldShutdown);
>>  }
>> 
>>  public record Reducer<T, A, V>(Combiner<T, A> combiner,
>>      Function<? super A, ? extends V> finisher) {}
>> 
>> The Reducer API is a little bit complex because it hides the fact that several
>> virtual threads may want to combine their results at the same time.
>> So the combiner function can be called several times with the same result
>> (it's called in the loop that does a CAS).
>> The API assumes that neither function has side effects, otherwise there
>> will be concurrency issues.
>> 
>> The combiner also has a third parameter that asks to gently stop the
>> computation (by calling shouldShutdown.run()); this interrupts all other
>> threads if a result is good enough.
>> 
>> The prototype is here
>>  https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/reducer/StructuredAsyncScope.java
>> 
>> If you want to see the gritty details :)
>> 
>> regards,
>> Rémi
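
For readers who do not want to open the prototype, the CAS loop described in the quoted proposal can be sketched roughly as follows. Only Combiner and the three components of Result come from the message; the Accumulator class, the State enum and this first() combiner are hypothetical, and the actual prototype may differ (for instance in how it chains suppressed exceptions):

```java
import java.util.concurrent.atomic.AtomicReference;

public class CasCombine {
  enum State { SUCCEED, FAILED }

  // Same three components as the Result described in the proposal.
  record Result<T>(State state, T element, Throwable suppressed) {}

  @FunctionalInterface
  interface Combiner<T, A> {
    A apply(A oldValue, Result<T> result, Runnable shouldShutdown);
  }

  // Hypothetical holder for the value shared by all tasks.
  static final class Accumulator<T, A> {
    private final AtomicReference<A> ref = new AtomicReference<>();  // starts at null
    private final Combiner<T, A> combiner;
    private volatile boolean shutdownRequested;

    Accumulator(Combiner<T, A> combiner) {
      this.combiner = combiner;
    }

    // Called by each task when it completes.
    void accept(Result<T> result) {
      for (;;) {
        boolean[] wantsShutdown = { false };
        A oldValue = ref.get();
        // May run several times for the same result if another task wins the race,
        // which is why the combiner must not have side effects.
        A newValue = combiner.apply(oldValue, result, () -> wantsShutdown[0] = true);
        if (ref.compareAndSet(oldValue, newValue)) {
          if (wantsShutdown[0]) {
            shutdownRequested = true;  // honor the request only once the CAS succeeded
          }
          return;
        }
      }
    }

    A value() { return ref.get(); }

    boolean shutdownRequested() { return shutdownRequested; }
  }

  // Hypothetical "first success" combiner: keeps the first succeeded result
  // and asks for a shutdown as soon as it sees one.
  static <T> Combiner<T, Result<T>> first() {
    return (old, result, shouldShutdown) -> {
      if (old != null && old.state() == State.SUCCEED) {
        return old;  // already have a winner
      }
      if (result.state() == State.SUCCEED) {
        shouldShutdown.run();
      }
      return result;
    };
  }
}
```

In this sketch the shutdown request is recorded per attempt and applied only after a successful compareAndSet, so a retried combiner call cannot trigger a shutdown on its own.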

