Structured Concurrency API ?
Remi Forax
forax at univ-mlv.fr
Wed Apr 6 15:49:29 UTC 2022
Hi all,
this is a proposal for an API to enable structured concurrency in Java. Its goal is similar to StructuredTaskScope, but I believe it is easier to use because it is more high level.
The idea of structured concurrency is to use a control flow block to delimit the asynchronous computation.
It fits well with the idea behind Loom: write the code in a synchronous style, have it run asynchronously.
Conceptually, I think there is a missing piece in the StructuredTaskScope API: it delimits the computation but does not control how the async computations are run and combined.
And now I show my age: I have always kind of liked the OpenMP parallel loop abstraction.
I think that is what is missing in the current API, there is no notion of a loop.
In Java, we already have an abstraction (we even have several) for a loop: the Stream API. It can represent a classical ordered loop, but also an unordered loop (because a stream can be parallel) and a loop that short-circuits. I believe we should try to have an API that combines a try-with-resources for the delimitation and a stream to express how the results of the different async computations are combined.
I also want to get rid of Futures, which are too low-level, do not track the type of the exception, and have too many states to represent an async computation (I would like to limit the state of a computation to either a success or an exception, like in the synchronous case).
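To make the "either a success or an exception" idea concrete, here is a minimal sketch of a two-state outcome type that keeps the checked exception type in its signature. It is not taken from the AsyncMonad code: the names ResultSketch, Task, Result, capture and getOrThrow are hypothetical, and everything runs synchronously to keep the example small.

```java
import java.io.IOException;
import java.util.Objects;

// Hypothetical sketch (not the AsyncMonad API): a task outcome with exactly two states.
final class ResultSketch {
  // Hypothetical task type whose checked exception type E is tracked by the compiler.
  interface Task<T, E extends Exception> {
    T compute() throws E;
  }

  // Either a value or an exception of type E, nothing else.
  static final class Result<T, E extends Exception> {
    private final T value;
    private final E failure;

    private Result(T value, E failure) {
      this.value = value;
      this.failure = failure;
    }

    static <T, E extends Exception> Result<T, E> success(T value) {
      return new Result<>(value, null);
    }

    static <T, E extends Exception> Result<T, E> failure(E failure) {
      return new Result<>(null, Objects.requireNonNull(failure));
    }

    boolean isSuccess() {
      return failure == null;
    }

    // Returns the value or rethrows the recorded exception with its static type E.
    T getOrThrow() throws E {
      if (failure != null) {
        throw failure;
      }
      return value;
    }
  }

  // Runs the task in the current thread and captures its outcome.
  static <T, E extends Exception> Result<T, E> capture(Task<T, E> task) {
    try {
      return Result.success(task.compute());
    } catch (RuntimeException e) {
      throw e;  // choice made for this sketch: unchecked exceptions are not captured
    } catch (Exception e) {
      @SuppressWarnings("unchecked")  // compute() declares only E as a checked exception
      E failure = (E) e;
      return Result.failure(failure);
    }
  }

  public static void main(String[] args) {
    Result<Integer, RuntimeException> ok =
        ResultSketch.<Integer, RuntimeException>capture(() -> 42);
    Result<Integer, IOException> ko =
        ResultSketch.<Integer, IOException>capture(() -> { throw new IOException("boom"); });
    System.out.println(ok.isSuccess() + " " + ok.getOrThrow());  // true 42
    try {
      ko.getOrThrow();
    } catch (IOException e) {  // the compiler knows the exact exception type
      System.out.println("failed with " + e.getMessage());  // failed with boom
    }
  }
}
```

Compared to a Future, there is no "running" or "cancelled" state to inspect, and the caller of getOrThrow() has to deal with E and nothing more.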
I see 3 phases for an API that controls asynchronous computations:
- the fork phase, where the tasks are spawned
- the semantics phase, where we define how the whole computation behaves: whether we want only the first result, shutdown on error, a timeout, how exceptions are handled, etc.
- the collection of the results
Note that the default semantics should be the same as for a synchronous loop, so
- the results are available in order (like with a for loop).
- an exception should be propagated and stop the whole computation (so cancel all the remaining tasks).
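As an illustration of these default semantics, here is a minimal sketch built only on java.util.concurrent. It is not the AsyncMonad implementation: the class OrderedSemanticsSketch and the method runAll are hypothetical names, and a cached pool of platform threads stands in for Loom's virtual threads so that the sketch does not depend on a Loom build.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the default semantics, not the AsyncMonad implementation.
final class OrderedSemanticsSketch {
  // Runs all the tasks concurrently and returns their results in fork order.
  // The first failure seen in fork order is rethrown and the other tasks are cancelled.
  static <T> List<T> runAll(List<Callable<T>> tasks) throws Exception {
    // assumption: platform threads stand in for virtual threads here
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      List<Future<T>> futures = new ArrayList<>();
      for (Callable<T> task : tasks) {
        futures.add(executor.submit(task));  // fork phase
      }
      List<T> results = new ArrayList<>();
      for (Future<T> future : futures) {  // same order as the forks, like a for loop
        try {
          results.add(future.get());
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof Exception) {
            throw (Exception) cause;  // propagate the exception of the task itself
          }
          throw e;
        }
      }
      return results;
    } finally {
      // nothing is left to cancel on success; on failure this interrupts the remaining tasks
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    List<Callable<Integer>> tasks = new ArrayList<>();
    tasks.add(() -> { Thread.sleep(500); return 500; });
    tasks.add(() -> { Thread.sleep(100); return 100; });
    System.out.println(runAll(tasks));  // [500, 100]
  }
}
```

The second task finishes first, but the results still come back in fork order. One simplification to keep in mind: a failure is only noticed when the loop reaches the failing task, not at the moment it happens.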
I propose the AsyncMonad API (I know the name should be changed).
It works this way:
  int sum;
  try(var asyncMonad = AsyncMonad.<Integer, RuntimeException>of(forker -> {
      forker.fork(() -> {
        System.out.println(Thread.currentThread());
        Thread.sleep(500);
        return 500;
      });
      forker.fork(() -> {
        System.out.println(Thread.currentThread());
        Thread.sleep(100);
        return 100;
      });
    })) {
    sum = asyncMonad
        .unordered()
        .result(stream -> stream.mapToInt(v -> v).sum());
  }
When creating the async monad, the static method of() provides a forker that can be used to spawn tasks; once this is done, the async monad is created.
Then the async monad is configured, here with unordered() to indicate that we will see the results of the computation out of order (in the order in which the tasks complete).
And to finish, the terminal operation result() provides a stream of the results and asks the user how the results should be combined; here the results are summed.
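For readers who want to see what unordered() followed by result() amounts to, here is a rough equivalent written against the JDK's ExecutorCompletionService. It is a sketch, not the code behind the links below: UnorderedSketch and unorderedResult are hypothetical names, platform threads replace virtual threads, and all the results are collected before the stream is created, which a real implementation would probably not do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical sketch of "unordered + result", not the AsyncMonad implementation.
final class UnorderedSketch {
  // Runs the tasks concurrently, then hands their results to the combiner as a stream
  // whose encounter order is the completion order of the tasks.
  static <T, R> R unorderedResult(List<Callable<T>> tasks, Function<Stream<T>, R> combiner)
      throws Exception {
    // assumption: platform threads stand in for virtual threads here
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      CompletionService<T> completion = new ExecutorCompletionService<>(executor);
      for (Callable<T> task : tasks) {
        completion.submit(task);  // fork phase
      }
      List<T> results = new ArrayList<>();
      for (int i = 0; i < tasks.size(); i++) {
        try {
          results.add(completion.take().get());  // take() returns the next completed task
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          if (cause instanceof Exception) {
            throw (Exception) cause;
          }
          throw e;
        }
      }
      // simplification: the stream is created once every task has completed
      return combiner.apply(results.stream());
    } finally {
      executor.shutdownNow();  // cancels the remaining tasks if an exception escaped
    }
  }

  public static void main(String[] args) throws Exception {
    List<Callable<Integer>> tasks = new ArrayList<>();
    tasks.add(() -> { Thread.sleep(500); return 500; });
    tasks.add(() -> { Thread.sleep(100); return 100; });
    Integer sum = unorderedResult(tasks, stream -> stream.mapToInt(v -> v).sum());
    System.out.println(sum);  // 600
  }
}
```

With a sum the order does not matter, which is exactly the case where relaxing the ordering is free for the caller.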
Here is a non-exhaustive list of the intermediate operations:
- unordered() relaxes the ordering constraint
- recover(exceptionHandler) is the equivalent of a try/catch, called when a checked exception is raised, to either wrap the exception or replace it with a value
- timeout(deadline) (not yet implemented)
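The two uses of recover() described above (replace the exception with a value, or wrap it) can be sketched on top of Callable. This only illustrates the idea and is not the signature used by AsyncMonad: RecoverSketch, ExceptionHandler and this static recover method are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Callable;

// Hypothetical sketch of the recover idea, not the AsyncMonad signature.
final class RecoverSketch {
  // Called with the checked exception of a task; returns a replacement value or throws.
  interface ExceptionHandler<T> {
    T handle(Exception exception) throws Exception;
  }

  // Wraps a task so that its checked exceptions go through the handler.
  static <T> Callable<T> recover(Callable<T> task, ExceptionHandler<T> handler) {
    return () -> {
      try {
        return task.call();
      } catch (RuntimeException | InterruptedException e) {
        throw e;  // choice made for this sketch: bugs and cancellation are not recovered
      } catch (Exception e) {
        return handler.handle(e);  // the "catch" part of the try/catch
      }
    };
  }

  public static void main(String[] args) throws Exception {
    Callable<Integer> failing = () -> { throw new IOException("disk error"); };

    // replace the exception with a value
    Callable<Integer> withDefault = recover(failing, e -> -1);
    System.out.println(withDefault.call());  // -1

    // wrap the checked exception into an unchecked one
    Callable<Integer> wrapping =
        recover(failing, e -> { throw new UncheckedIOException((IOException) e); });
    try {
      wrapping.call();
    } catch (UncheckedIOException e) {
      System.out.println("wrapped: " + e.getCause().getMessage());  // wrapped: disk error
    }
  }
}
```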
The API is defined here:
https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/monad/AsyncMonad.java
And there are more examples here:
https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/monad/AsyncMonadMain.java
regards,
Rémi