Structured Concurrency yet again
Remi Forax
forax at univ-mlv.fr
Tue May 9 16:33:12 UTC 2023
I think we should restart the work on the structured concurrency API, here is a proposal.
If Java had async/await keyword, computing the sum of two asynchronous call, it will be something like
int value = await async () -> {
Thread.sleep(100);
return 10;
};
int value2 = await async () -> {
Thread.sleep(300);
return 30;
};
assertEquals(40, value + value2);
Apart of the colored function problem, another problem of this API is that the asynchronous calls are executed sequentially and not at the same time.
If we use the Executor/Future API instead, the calls are called in parallel
var executor = ...
var future = executor.submit(() -> {
Thread.sleep(100);
return 10;
});
var future2 = executor.submit(() -> {
Thread.sleep(300);
return 30;
});
int value = future.get();
int value2 = future2.get();
assertEquals(40, value + value2);
But there are several other issues with this API
- if "future.get()" throws an exception, future2.get() is never called so the second task is now a runaway task. It should be cancelled.
- submit returns a Future which is only parameterized by the type of the result, all exceptions are stored as a Throwable, there is no exception transparency (something the async/await approach provides).
- A future has 4 states, RUNNING, SUCCESS, FAILED and CANCELLED, but as a user we are only need to know if the computation succeed or failed. We should not be able to access to the state of the computation if the computation is not finished (Future.resultNow()/exceptionnNow() already does that) and a cancelled task should be a sub-category of a failed task.
To solve the first problem, we introduce a try-with-resources that guarantees that all asynchronous tasks can not escape the scope and a method awaitAll() that guarantee that all computations are finished at that point.
(If you wonder why we need both, see later).
To solve the two other problems, we need to replace the Callable/Future pair by a functional interface Computation and a class AsyncTask that correctly propagates the exceptions and provides a simpler API, with only two methods, getNow() that either return the result or throw the exception and result() that provides a result object that either represent a SUCCESS with a value or FAILED with an exception.
public interface Computation<R, E extends Exception> {
/**
* Compute the computation.
* @return a result
* @throws E an exception
* @throws InterruptedException if the computation is interrupted
*/
R compute() throws E, InterruptedException;
}
public interface AsyncTask<R, E extends Exception> {
/**
* Returns a result object corresponding to the computation if the computation is done.
* @return a result object corresponding to the computation if the computation is done.
* @throws IllegalStateException if the computation is not done.
*/
Result<R, E> result();
/**
* Returns the value of the computation
* @return the value of the computation
* @throws E the exception thrown by the computation
* @throws InterruptedException if the task was cancelled.
* @throws IllegalStateException if the computation is not done.
*/
R getNow() throws E, InterruptedException;
}
public static final class Result<R, E extends Exception> {
public enum State {
/**
* if the computation succeed.
*/
SUCCESS,
/**
* If the computation failed because an exception is thrown
*/
FAILED
}
/**
* Returns the state of the result.
* @return the state of the result.
*/
public State state() {
return state;
}
/**
* Returns the result of the computation.
* @throws IllegalStateException if the state is not {@link State#SUCCESS}.
* @return the result of the computation.
*/
public R result() {
if (state != State.SUCCESS) {
throw new IllegalStateException("state not a success");
}
return result;
}
/**
* Returns the failure thrown by the computation.
* @throws IllegalStateException if the state is not {@link State#FAILED}.
* @return the failure thrown by the computation or null if the task has been cancelled.
*/
public E failure() {
if (state != State.FAILED) {
throw new IllegalStateException("state not a failure");
}
return failure;
}
...
}
If we put everything together, we get the following code
try(var scope = new AsyncScope<Integer, RuntimeException>()) {
AsyncTask<Integer, RuntimeException> task = scope.async(() -> {
Thread.sleep(100);
return 10;
});
var task2 = scope.async(() -> {
Thread.sleep(300);
return 30;
});
scope.awaitAll();
int value = task.getNow();
int value2 = task2.getNow();
assertEquals(40, value + value2);
}
We may also want the results as a stream, for that we introduce a method await() that takes a function that take a stream of the results and return a value. Using a stream is interesting because it's a way to abstract a loop over the results at the same time the results are produced, so if the stream is short-circuited, we do not need to wait all results to be computed and the results are ordered by completion order.
This API is far better than the current StructuredTaskScope.handleComplete() which requires to subclass StructuredTaskScope and deal with the concurrency if you want to get access to the future in completion order.
But let starts, with a simple example, we can gather all results using Stream.toList().
try(var scope = new AsyncScope<Integer, RuntimeException>()) {
var task = scope.async(() -> {
Thread.sleep(300);
return 30;
});
var task2 = scope.async(() -> {
Thread.sleep(100);
return 10;
});
List<Result<Integer, RuntimeException>> results =
scope.await(Stream::toList);
var sum = 0;
for(var result: results) {
sum += result.getNow();
}
assertEquals(40, sum);
}
We can filter to only keep the success result using Result::keepOnlySuccess which either return a stream with the result value in case of a SUCCESS and an empty stream if the computation FAILED.
try(var scope = new AsyncScope<Integer, RuntimeException>()) {
var task = scope.async(() -> {
Thread.sleep(300);
return 30;
});
var task2 = scope.async(() -> {
Thread.sleep(100);
return 10;
});
List<Integer> values =
scope.await(stream -> stream.flatMap(Result::keepOnlySuccess).toList());
assertEquals(List.of(10, 30), values);
}
And we can reduce the results into one unique result using Result.merger() which takes a binary function to combine the results (failures are combined using addSuppressed).
try(var scope = new AsyncScope<Integer, RuntimeException>()) {
var task = scope.async(() -> {
Thread.sleep(100);
return 10;
});
var task2 = scope.async(() -> {
Thread.sleep(300);
return 30;
});
Result<Integer, RuntimeException> result =
scope.await(stream -> stream.reduce(Result.merger(Integer::sum)))
.orElseThrow();
switch (result.state()) {
case SUCCESS -> assertEquals(40, result.result());
case FAILED -> fail();
}
}
And we can use Stream.findFirst() (or any other short circuited terminal operations) if we do not need all results of the computations. In that case, the tasks still running will be cancelled asynchronously with the guarantee that all computation have finished when exiting the scope.
try(var scope = new AsyncScope<Integer, RuntimeException>()) {
var task = scope.async(() -> {
Thread.sleep(100);
return 10;
});
var task2 = scope.async(() -> {
Thread.sleep(1_000);
return 30;
});
int value = scope
.await(stream -> stream.flatMap(Result::keepOnlySuccess).findFirst())
.orElseThrow();
assertEquals(10, value);
assertEquals(10, task.getNow());
assertTrue(task2.result().isCancelled());
}
There are other examples on Github [1].
To summarize, an async scope defines a scope for asynchronous calls that will run in parallel by default. the method async() executes an asynchronous computation in a fresh virtual thread and returns a task object that is only available after one of the method await() is called. awaitAll() waits for all computations to complete (normally or abnormally). await(stream -> ...) process the results of computation in completion order and in case of early returns, all still running computations are automatically cancelled.
Rémi
[1] https://github.com/forax/loom-fiber/blob/master/src/test/java/fr/umlv/loom/structured/AsyncScopeTest.java
More information about the loom-dev
mailing list