API proposal for StructuredTaskScope
Ron Pressler
ron.pressler at oracle.com
Thu Jan 5 14:37:06 UTC 2023
We have considered such approaches.
One glaring problem is the case of “heterogeneous tasks”, i.e. tasks that each return a result of a different type. This is a very common scenario for ShutdownOnFailure.
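
To illustrate what "heterogeneous" means here, a minimal sketch. It deliberately uses a plain ExecutorService rather than StructuredTaskScope so that it runs without preview flags; the Page record and the task bodies are made up for the example. The point is only that each submitted task hands back a handle carrying its own result type, which a reducer parameterized by a single T would presumably have to widen to a common supertype.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class HeterogeneousTasks {
    // Hypothetical aggregate built from two results of different types.
    record Page(String user, int orderCount) {}

    static Page load() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Each task has its own result type, and each handle keeps that type.
            Future<String> user = executor.submit(() -> "alice");
            Future<Integer> orderCount = executor.submit(() -> 3);
            // get() waits for the task and rethrows a failure as ExecutionException.
            return new Page(user.get(), orderCount.get());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(load());
    }
}
```

With fork() returning void and a single result() call, there is no per-task handle left to carry String for one task and Integer for the other.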
In the homogeneous case, we already have a similar API/concept we can exploit: Stream. That is, the Stream API could be used for simple and common homogeneous structured concurrency operations. This is now being actively explored.
— Ron
> On 5 Jan 2023, at 10:46, Remi Forax <forax at univ-mlv.fr> wrote:
>
> Hi all,
> this is a proposal of a slightly different API for StructuredTaskScope
>
> The idea is to pass to the constructor of StructuredTaskScope a "reducer" that gets the value | error from each task and tries to compute a result from it.
> A reducer is quite similar in intent to a stream Collector but it's a different API.
>
> Before deep diving into the details, here are some examples of how to use it.
> Like the current API, a task is submitted with fork(), but unlike the current API, fork() returns void instead of a Future.
> Then there is a method result() that works like join() but also returns the computed result of the tasks by the reducer.
>
> If you ask for a Reducer.toList(), the method result() returns a List of all results. A Result is a record with three components: a state (SUCCEED or FAILED), an element if the state is SUCCEED, and a suppressed exception (which can itself have suppressed exceptions) that collects all the exceptions, whether or not you care about them.
>
> public static void toList() throws InterruptedException {
>   try (var scope = StructuredTaskScope.of(Reducer.<Integer>toList())) {
>     scope.fork(() -> 3);
>     scope.fork(() -> {
>       throw new IOException();
>     });
>
>     List<Result<Integer>> list = scope.result();
>     System.out.println(list); // [Result[state=FAILED, element=null, suppressed=java.io.IOException], Result[state=SUCCEED, element=3, suppressed=null]]
>   }
> }
>
> When you run the code above several times, the two results may swap places in the list, which is expected since the tasks run concurrently.
>
> You may want only one result. For that you have Reducer.max(comparator), which gives you an Optional.empty() if there is no call to fork(), a failed result if no task succeeds, or the maximum result once at least one succeeded result is available.
>
>
> public static void max() throws InterruptedException {
>   try (var scope = StructuredTaskScope.of(Reducer.max(Integer::compareTo))) {
>     scope.fork(() -> 3);
>     scope.fork(() -> {
>       throw new IOException();
>     });
>     scope.fork(() -> 42);
>
>     Optional<Result<Integer>> max = scope.result();
>     System.out.println(max); // Optional[Result[state=SUCCEED, element=42, suppressed=java.io.IOException]]
>   }
> }
>
> When you run this code several times, you always have the same max value.
>
>
> You may also want the first result that succeeds. For that you have Reducer.first(), which returns Optional.empty() if there is no fork(), a failed result if no task succeeds, or the first result that succeeds (the previously suppressed exceptions are also available for logging if you want).
>
> public static void first() throws InterruptedException {
>   try (var scope = StructuredTaskScope.of(Reducer.<Integer>first())) {
>     scope.fork(() -> {
>       throw new IOException();
>     });
>     scope.fork(() -> 3);
>     scope.fork(() -> 42);
>
>     Optional<Result<Integer>> first = scope.result();
>     System.out.println(first); // Optional[Result[state=SUCCEED, element=3, suppressed=java.io.IOException]]
>   }
> }
>
> When you run this code several times, the returned result can also be 42 and the suppressed exception may or may not be present.
>
>
> So what is a Reducer? It's a record with two functions, a combiner and a finisher. The combiner combines the different results, and the finisher provides a nicer object, like java.util.List or java.util.Optional, as the result at the end.
>
> @FunctionalInterface
> public interface Combiner<T, A> {
>   A apply(A oldValue, Result<T> result, Runnable shouldShutdown);
> }
>
> public record Reducer<T, A, V>(Combiner<T, A> combiner, Function<? super A, ? extends V> finisher) {}
>
> The Reducer API is a little bit complex because it hides the fact that several virtual threads may want to combine their results at the same time.
> So the combiner function can be called several times with the same result (it's called in the loop that does a CAS).
> The API assumes that neither function has side effects; otherwise there will be concurrency issues.
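
To make the CAS loop concrete, here is a minimal sketch of how a combiner could be applied when several threads publish results at once. It is not taken from the prototype: the Accumulator class, the explicit initial value, the State enum and this toList() are assumptions made for illustration; only the shapes of Result, Combiner and Reducer follow the declarations quoted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class ReducerSketch {
    enum State { SUCCEED, FAILED }
    record Result<T>(State state, T element, Throwable suppressed) {}

    @FunctionalInterface
    interface Combiner<T, A> {
        A apply(A oldValue, Result<T> result, Runnable shouldShutdown);
    }

    record Reducer<T, A, V>(Combiner<T, A> combiner, Function<? super A, ? extends V> finisher) {}

    // Hypothetical toList(): never mutates oldValue, always returns a fresh immutable list,
    // so re-running the combiner after a lost CAS is harmless.
    static <T> Reducer<T, List<Result<T>>, List<Result<T>>> toList() {
        Combiner<T, List<Result<T>>> combiner = (oldValue, result, shouldShutdown) -> {
            var copy = new ArrayList<Result<T>>(oldValue);
            copy.add(result);
            return List.copyOf(copy);
        };
        return new Reducer<T, List<Result<T>>, List<Result<T>>>(combiner, Function.identity());
    }

    // Hypothetical holder of the accumulated value (assumption: starts from an explicit initial value).
    static final class Accumulator<T, A, V> {
        private final Reducer<T, A, V> reducer;
        private final AtomicReference<A> state;

        Accumulator(Reducer<T, A, V> reducer, A initial) {
            this.reducer = reducer;
            this.state = new AtomicReference<>(initial);
        }

        void accept(Result<T> result) {
            while (true) {
                A oldValue = state.get();
                // May run more than once for the same result if another thread wins the race.
                A newValue = reducer.combiner().apply(oldValue, result, () -> {});
                if (state.compareAndSet(oldValue, newValue)) {
                    return;
                }
            }
        }

        V finish() {
            return reducer.finisher().apply(state.get());
        }
    }

    // Publishes 8 results from 8 threads and returns how many ended up in the list.
    static int demo() throws InterruptedException {
        var acc = new Accumulator<>(ReducerSketch.<Integer>toList(), List.<Result<Integer>>of());
        var threads = new ArrayList<Thread>();
        for (int i = 0; i < 8; i++) {
            int value = i;
            var thread = new Thread(() -> acc.accept(new Result<>(State.SUCCEED, value, null)));
            threads.add(thread);
            thread.start();
        }
        for (var thread : threads) {
            thread.join();
        }
        return acc.finish().size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // 8
    }
}
```

Because the combiner builds a new list instead of mutating the old one, a retry after a failed compareAndSet simply discards the stale list, which is why side effects in the combiner would be a problem.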
>
> The combiner also has a third parameter that asks to gently stop the computation (by calling shouldShutdown.run()); this interrupts all other threads once a result is good enough.
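
As a rough sketch of how a combiner might use shouldShutdown, here is a hypothetical "first success" combiner. It is an illustration under assumptions, not the prototype's Reducer.first(): the State enum, the use of Optional as the accumulator type, and the simplified handling of failures (only the first failure is remembered) are all made up for the example.

```java
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

public class FirstSketch {
    enum State { SUCCEED, FAILED }
    record Result<T>(State state, T element, Throwable suppressed) {}

    @FunctionalInterface
    interface Combiner<T, A> {
        A apply(A oldValue, Result<T> result, Runnable shouldShutdown);
    }

    // Hypothetical combiner: keep the first success and ask the scope to stop.
    static <T> Combiner<T, Optional<Result<T>>> first() {
        return (oldValue, result, shouldShutdown) -> {
            if (oldValue.isPresent() && oldValue.get().state() == State.SUCCEED) {
                return oldValue; // a success was already recorded, ignore later results
            }
            if (result.state() == State.SUCCEED) {
                // Good enough: request shutdown. This may run again on a CAS retry,
                // so the Runnable is assumed to be idempotent.
                shouldShutdown.run();
                Throwable earlier = oldValue.map(Result::suppressed).orElse(null);
                return Optional.of(new Result<T>(State.SUCCEED, result.element(), earlier));
            }
            // Failure: remember the first one only (a simplification).
            return oldValue.isPresent() ? oldValue : Optional.of(result);
        };
    }

    // Applies the combiner sequentially to one failure and two successes.
    static String demo() {
        var shutdownRequested = new AtomicBoolean();
        Runnable shouldShutdown = () -> shutdownRequested.set(true);
        Combiner<Integer, Optional<Result<Integer>>> combiner = first();

        Optional<Result<Integer>> acc = Optional.empty();
        acc = combiner.apply(acc, new Result<>(State.FAILED, null, new IOException()), shouldShutdown);
        acc = combiner.apply(acc, new Result<>(State.SUCCEED, 3, null), shouldShutdown);
        acc = combiner.apply(acc, new Result<>(State.SUCCEED, 42, null), shouldShutdown);
        return acc.get().element() + " " + shutdownRequested.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 3 true
    }
}
```

The demo applies the combiner sequentially, so it always keeps 3; in a real scope the winner depends on which task completes first.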
>
> The prototype is here
> https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/reducer/StructuredAsyncScope.java
>
> If you want to see the gritty details :)
>
> regards,
> Rémi