API proposal for StructuredTaskScope
Remi Forax
forax at univ-mlv.fr
Thu Jan 5 10:46:06 UTC 2023
Hi all,
this is a proposal of a slightly different API for StructuredTaskScope
The idea is to provide as parameter of the constructor of StructuredTaskScope a "reducer" that get the value | error from each task and try to compute a result from it.
A reducer is quite similar in intent to a stream Collector but it's a different API.
Before deep diving into the details, here are some examples of how to use it.
Like the current API, as task is submitted with fork() but unlike the current API, fork() returns void instead of a Future.
Then there is a method result() that works like join() but also returns the computed result of the tasks by the reducer.
If you ask for a Reducer.toList(), the method result() returns a List of all results (a Result is a record with three components, a state (SUCCEED or FAILED), an element is SUCCEED and a suppressed exception (which itself can has suppressed exceptions) for all the exceptions that you may care or not).
public static void toList() throws InterruptedException {
try(var scope = StructuredTaskScope.of(Reducer.<Integer>toList())) {
scope.fork(() -> 3);
scope.fork(() -> {
throw new IOException();
});
List<Result<Integer>> list = scope.result();
System.out.println(list); // [Result[state=FAILED, element=null, suppressed=java.io.IOException], Result[state=SUCCEED, element=3, suppressed=null]]
}
}
When you run the code above several times, the two results exchange location in the list which is normal.
You may want only one result, for that you have Reducer.max(comparator) that will give you either an Optional.empty() if there is no call to fork(), or a failed result if no results succeed or the maximum result once at least one succeeded result is available
public static void max() throws InterruptedException {
try(var scope = StructuredTaskScope.of(Reducer.max(Integer::compareTo))) {
scope.fork(() -> 3);
scope.fork(() -> {
throw new IOException();
});
scope.fork(() -> 42);
Optional<Result<Integer>> max = scope.result();
System.out.println(max); // Optional[Result[state=SUCCEED, element=42, suppressed=java.io.IOException]]
}
}
When you run this code several times, you always have the same max value.
You may also want the first result that in success, for that you have Reducer.first() that returns Optional.empty if there is no fork(), a failed result if no task succeed or the first result that succeed (the previous suppressed exception are also available for logging if you want).
public static void first() throws InterruptedException {
try(var scope = StructuredTaskScope.of(Reducer.<Integer>first())) {
scope.fork(() -> {
throw new IOException();
});
scope.fork(() -> 3);
scope.fork(() -> 42);
Optional<Result<Integer>> first = scope.result();
System.out.println(first); // Optional[Result[state=SUCCEED, element=3, suppressed=java.io.IOException]]
}
}
When you run this code several times, the returned result can also be 42 and the suppressed exception may or may not be present.
So what is a Reducer, it's a record with two functions, a combiner and a finisher, the combiner combines the different results and the finisher provides a nicer object like java.util.List or java.util.Optional as result at the end.
@FunctionalInterface
public interface Combiner<T, A> {
A apply(A oldValue, Result<T> result, Runnable shouldShutdown);
}
public record Reducer<T, A, V>(Combiner<T, A> combiner, Function<? super A, ? extends V> finisher) {}
The Reducer API is a little bit complex because it hides the fact that several virtual threads may want to combine their results at the same time.
So the combiner function can be called several time with the same result (it's called in the loop that does a CAS).
The API supposes that both functions do not do any side effects, otherwise there will be concurrency issues.
The combiner also have a third parameter that ask to gently stop the computation (by calling shouldShutdown.run()), this interrupts all other threads if a result is good enough.
The prototype is here
https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/reducer/StructuredAsyncScope.java
If you want to see the gritty details :)
regards,
Rémi
More information about the loom-dev
mailing list