Structured Concurrency yet again
forax at univ-mlv.fr
Tue May 9 22:42:46 UTC 2023
> From: "Brian Goetz" <brian.goetz at oracle.com>
> To: "Remi Forax" <forax at univ-mlv.fr>, "loom-dev" <loom-dev at openjdk.java.net>
> Sent: Tuesday, May 9, 2023 10:38:35 PM
> Subject: Re: Structured Concurrency yet again
> I would suggest to start with what problem you are trying to solve, rather than
> a proposed competing solution; otherwise, it is hard to compare one solution
> with another.
> For example, one problem you could be addressing here is "Future sucks". But
> that doesn't require a whole new API, it only requires asking "is Future the
> best thing for fork to return, or should we define another abstraction?" (As it
> turns out, the updated proposal Ron and Alan have does exactly that.)
Yes. Future does not suck; it is just better suited to storing the result of a pure computation than to running I/O or network operations.
> Are there other problems you are trying to solve here, that the incubating JEP
> does not do well enough ?
See my message to Ron.
regards,
Rémi
> On 5/9/2023 12:33 PM, Remi Forax wrote:
>> I think we should restart the work on the structured concurrency API, here is a
>> proposal.
>> If Java had async/await keywords, computing the sum of two asynchronous calls
>> would look something like this:
>> int value = await async () -> {
>>   Thread.sleep(100);
>>   return 10;
>> };
>> int value2 = await async () -> {
>>   Thread.sleep(300);
>>   return 30;
>> };
>> assertEquals(40, value + value2);
>> Apart from the colored function problem, another problem with this code is
>> that the asynchronous calls are executed sequentially and not at the same time.
>> If we use the Executor/Future API instead, the calls run in parallel:
>> var executor = ...
>> var future = executor.submit(() -> {
>>   Thread.sleep(100);
>>   return 10;
>> });
>> var future2 = executor.submit(() -> {
>>   Thread.sleep(300);
>>   return 30;
>> });
>> int value = future.get();
>> int value2 = future2.get();
>> assertEquals(40, value + value2);
>> But there are several other issues with this API:
>> - if future.get() throws an exception, future2.get() is never called, so the
>> second task becomes a runaway task. It should be cancelled.
>> - submit() returns a Future that is parameterized only by the type of the
>> result; all exceptions are stored as a Throwable, so there is no exception
>> transparency (something the async/await approach provides).
>> - A future has 4 states, RUNNING, SUCCESS, FAILED and CANCELLED, but as a user
>> we only need to know whether the computation succeeded or failed. We should not
>> be able to access the state of the computation before it has finished
>> (Future.resultNow()/exceptionNow() already enforce that), and a cancelled
>> task should be a sub-category of a failed task.
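The first issue in the list above (the runaway task) can be reproduced with the plain JDK executor API. The following is a minimal sketch, not code from the proposal: the class name RunawayTaskDemo, the method name and the timings are assumptions chosen for illustration, and a cached thread pool stands in for the elided "var executor = ...".

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, used only to illustrate the runaway-task issue.
class RunawayTaskDemo {
    // Returns true if the second task ran to completion even though the
    // caller had already given up because the first task failed.
    static boolean secondTaskOutlivesFailure() {
        ExecutorService executor = Executors.newCachedThreadPool();
        AtomicBoolean secondFinished = new AtomicBoolean(false);
        Callable<Integer> failing = () -> {
            Thread.sleep(100);
            throw new IllegalStateException("boom");
        };
        Callable<Integer> slow = () -> {
            Thread.sleep(300);
            secondFinished.set(true);
            return 30;
        };
        try {
            Future<Integer> future = executor.submit(failing);
            Future<Integer> future2 = executor.submit(slow);
            try {
                future.get();   // throws ExecutionException after about 100 ms
                future2.get();  // never reached
                return false;
            } catch (ExecutionException e) {
                // Nobody cancelled future2: give it time and observe that it
                // kept running after the failure was reported.
                Thread.sleep(400);
                return secondFinished.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTaskOutlivesFailure());
    }
}
```

Note that the failure also reaches the caller wrapped in an ExecutionException whose cause is typed as Throwable, which is the second issue in the list.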
>> To solve the first problem, we introduce a try-with-resources scope that
>> guarantees that no asynchronous task can escape the scope, and a method
>> awaitAll() that guarantees that all computations have finished at that point.
>> (If you wonder why we need both, see later.)
>> To solve the two other problems, we need to replace the Callable/Future pair
>> with a functional interface Computation and an AsyncTask type that correctly
>> propagates the exceptions and provides a simpler API with only two methods:
>> getNow(), which either returns the result or throws the exception, and
>> result(), which returns a result object that represents either a SUCCESS with
>> a value or a FAILED with an exception.
>> public interface Computation<R, E extends Exception> {
>>   /**
>>    * Compute the computation.
>>    * @return a result
>>    * @throws E an exception
>>    * @throws InterruptedException if the computation is interrupted
>>    */
>>   R compute() throws E, InterruptedException;
>> }
>> public interface AsyncTask<R, E extends Exception> {
>>   /**
>>    * Returns a result object corresponding to the computation if the
>>    * computation is done.
>>    * @return a result object corresponding to the computation if the
>>    *         computation is done.
>>    * @throws IllegalStateException if the computation is not done.
>>    */
>>   Result<R, E> result();
>>   /**
>>    * Returns the value of the computation
>>    * @return the value of the computation
>>    * @throws E the exception thrown by the computation
>>    * @throws InterruptedException if the task was cancelled.
>>    * @throws IllegalStateException if the computation is not done.
>>    */
>>   R getNow() throws E, InterruptedException;
>> }
>> public static final class Result<R, E extends Exception> {
>>   public enum State {
>>     /**
>>      * If the computation succeeded.
>>      */
>>     SUCCESS,
>>     /**
>>      * If the computation failed because an exception was thrown.
>>      */
>>     FAILED
>>   }
>>   /**
>>    * Returns the state of the result.
>>    * @return the state of the result.
>>    */
>>   public State state() {
>>     return state;
>>   }
>>   /**
>>    * Returns the result of the computation.
>>    * @throws IllegalStateException if the state is not {@link State#SUCCESS}.
>>    * @return the result of the computation.
>>    */
>>   public R result() {
>>     if (state != State.SUCCESS) {
>>       throw new IllegalStateException("state not a success");
>>     }
>>     return result;
>>   }
>>   /**
>>    * Returns the failure thrown by the computation.
>>    * @throws IllegalStateException if the state is not {@link State#FAILED}.
>>    * @return the failure thrown by the computation or null if the task has
>>    *         been cancelled.
>>    */
>>   public E failure() {
>>     if (state != State.FAILED) {
>>       throw new IllegalStateException("state not a failure");
>>     }
>>     return failure;
>>   }
>>   ...
>> }
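To make "exception transparency" concrete, here is a small sketch of what the extra type parameter E buys the caller. Only the Computation interface is taken from the text above; the run() helper, the class name and the IOException example are assumptions made for illustration, and the helper runs the computation in the caller's thread rather than asynchronously.

```java
import java.io.IOException;

// Hypothetical demo class; only the Computation shape comes from the proposal.
class ExceptionTransparencyDemo {
    // Same shape as the Computation interface quoted above.
    interface Computation<R, E extends Exception> {
        R compute() throws E, InterruptedException;
    }

    // Hypothetical helper: runs the computation synchronously and lets the
    // checked exception type E flow through to the caller unchanged.
    static <R, E extends Exception> R run(Computation<R, E> computation)
            throws E, InterruptedException {
        return computation.compute();
    }

    static String describe() {
        Computation<Integer, IOException> failing = () -> {
            throw new IOException("disk unavailable");
        };
        try {
            return "value " + run(failing);
        } catch (IOException e) {
            // The compiler knows the precise exception type: no unwrapping of
            // an ExecutionException and no cast from Throwable is needed.
            return "io failure: " + e.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```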
>> If we put everything together, we get the following code
>> try(var scope = new AsyncScope<Integer, RuntimeException>()) {
>>   AsyncTask<Integer, RuntimeException> task = scope.async(() -> {
>>     Thread.sleep(100);
>>     return 10;
>>   });
>>   var task2 = scope.async(() -> {
>>     Thread.sleep(300);
>>     return 30;
>>   });
>>   scope.awaitAll();
>>   int value = task.getNow();
>>   int value2 = task2.getNow();
>>   assertEquals(40, value + value2);
>> }
>> We may also want the results as a stream. For that we introduce a method
>> await() that takes a function that takes a stream of the results and returns a
>> value. Using a stream is interesting because it abstracts a loop over the
>> results while the results are being produced: the results are ordered by
>> completion order, and if the stream is short-circuited, we do not need to wait
>> for all results to be computed.
>> This API is far better than the current StructuredTaskScope.handleComplete(),
>> which requires subclassing StructuredTaskScope and dealing with the concurrency
>> yourself if you want to get access to the futures in completion order.
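For readers who want to try out what "completion order" means with today's JDK, the existing java.util.concurrent.ExecutorCompletionService hands back futures as they finish. The sketch below uses that class as a stand-in; it is not the await(stream -> ...) method proposed here, and the class name CompletionOrderDemo and the timings are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class built on the existing JDK completion service.
class CompletionOrderDemo {
    static List<Integer> resultsInCompletionOrder() {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletionService<Integer> completion =
                new ExecutorCompletionService<>(executor);
            // Submitted slow-first, so submission order and completion order differ.
            completion.submit(() -> { Thread.sleep(300); return 30; });
            completion.submit(() -> { Thread.sleep(100); return 10; });
            List<Integer> values = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                // take() blocks until the next task has completed.
                values.add(completion.take().get());
            }
            return values;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(resultsInCompletionOrder());
    }
}
```

The explicit loop and counter are exactly the bookkeeping that a stream of results would hide.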
>> But let's start with a simple example: we can gather all results using
>> Stream.toList().
>> try(var scope = new AsyncScope<Integer, RuntimeException>()) {
>>   var task = scope.async(() -> {
>>     Thread.sleep(300);
>>     return 30;
>>   });
>>   var task2 = scope.async(() -> {
>>     Thread.sleep(100);
>>     return 10;
>>   });
>>   List<Result<Integer, RuntimeException>> results =
>>       scope.await(Stream::toList);
>>   var sum = 0;
>>   for(var result: results) {
>>     sum += result.getNow();
>>   }
>>   assertEquals(40, sum);
>> }
>> We can keep only the successful results using Result::keepOnlySuccess, which
>> returns a stream containing the result value in case of a SUCCESS and an
>> empty stream if the computation FAILED.
>> try(var scope = new AsyncScope<Integer, RuntimeException>()) {
>>   var task = scope.async(() -> {
>>     Thread.sleep(300);
>>     return 30;
>>   });
>>   var task2 = scope.async(() -> {
>>     Thread.sleep(100);
>>     return 10;
>>   });
>>   List<Integer> values =
>>       scope.await(stream -> stream.flatMap(Result::keepOnlySuccess).toList());
>>   assertEquals(List.of(10, 30), values);
>> }
>> And we can reduce the results into a single result using Result.merger(),
>> which takes a binary function to combine the results (failures are combined
>> using addSuppressed).
>> try(var scope = new AsyncScope<Integer, RuntimeException>()) {
>>   var task = scope.async(() -> {
>>     Thread.sleep(100);
>>     return 10;
>>   });
>>   var task2 = scope.async(() -> {
>>     Thread.sleep(300);
>>     return 30;
>>   });
>>   Result<Integer, RuntimeException> result =
>>       scope.await(stream -> stream.reduce(Result.merger(Integer::sum)))
>>            .orElseThrow();
>>   switch (result.state()) {
>>     case SUCCESS -> assertEquals(40, result.result());
>>     case FAILED -> fail();
>>   }
>> }
>> And we can use Stream.findFirst() (or any other short-circuiting terminal
>> operation) if we do not need the results of all computations. In that case,
>> the tasks still running are cancelled asynchronously, with the guarantee
>> that all computations have finished when exiting the scope.
>> try(var scope = new AsyncScope<Integer, RuntimeException>()) {
>>   var task = scope.async(() -> {
>>     Thread.sleep(100);
>>     return 10;
>>   });
>>   var task2 = scope.async(() -> {
>>     Thread.sleep(1_000);
>>     return 30;
>>   });
>>   int value = scope
>>       .await(stream -> stream.flatMap(Result::keepOnlySuccess).findFirst())
>>       .orElseThrow();
>>   assertEquals(10, value);
>>   assertEquals(10, task.getNow());
>>   assertTrue(task2.result().isCancelled());
>> }
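As a point of comparison for the "first successful result, cancel the rest" pattern, the existing ExecutorService.invokeAny() offers similar behaviour for a fixed collection of tasks: it returns the result of one task that completed successfully and cancels the tasks that have not completed. The sketch below uses that JDK method, not the proposed AsyncScope; the class name FirstResultDemo and the timings are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class built on the existing ExecutorService.invokeAny().
class FirstResultDemo {
    static int firstSuccessfulResult() {
        ExecutorService executor = Executors.newCachedThreadPool();
        Callable<Integer> fast = () -> { Thread.sleep(100); return 10; };
        Callable<Integer> slow = () -> { Thread.sleep(1_000); return 30; };
        List<Callable<Integer>> tasks = Arrays.asList(fast, slow);
        try {
            // Blocks until one task succeeds; tasks that have not completed
            // by then are cancelled.
            return executor.invokeAny(tasks);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccessfulResult());
    }
}
```

Unlike a short-circuited stream, invokeAny() only covers the "first success" case; it cannot express filters or reductions over the results.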
>> There are other examples on GitHub [1].
>> To summarize, an async scope defines a scope for asynchronous calls that run
>> in parallel by default. The method async() executes an asynchronous
>> computation in a fresh virtual thread and returns a task object whose result
>> is only available after one of the await methods has been called. awaitAll()
>> waits for all computations to complete (normally or abnormally).
>> await(stream -> ...) processes the results of the computations in completion
>> order and, in case of an early return, all computations still running are
>> automatically cancelled.
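The scoping guarantee summarized above (no task outlives the try-with-resources block) can be sketched in a few dozen lines. This is a deliberately reduced illustration and not the AsyncScope implementation from the repository: the name MiniScope is hypothetical, it returns plain Futures instead of AsyncTask, it uses a cached pool of platform threads instead of virtual threads so that it runs on older JDKs, and it has no await(stream -> ...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical MiniScope: meant to be used by a single owner thread.
final class MiniScope<R> implements AutoCloseable {
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final List<Future<R>> tasks = new ArrayList<>();

    // Starts the computation in another thread and remembers its future.
    Future<R> async(Callable<R> computation) {
        Future<R> task = executor.submit(computation);
        tasks.add(task);
        return task;
    }

    // Waits until every started computation has completed, normally or not.
    void awaitAll() throws InterruptedException {
        for (Future<R> task : tasks) {
            try {
                task.get();
            } catch (ExecutionException | CancellationException e) {
                // The failure stays inside the Future for the caller to inspect.
            }
        }
    }

    // Cancels whatever is still running, then waits for the worker threads,
    // so no computation can escape the scope.
    @Override
    public void close() {
        for (Future<R> task : tasks) {
            task.cancel(true);
        }
        executor.shutdown();
        boolean interrupted = false;
        while (true) {
            try {
                if (executor.awaitTermination(1, TimeUnit.SECONDS)) {
                    break;
                }
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

class MiniScopeDemo {
    static int sum() {
        try (MiniScope<Integer> scope = new MiniScope<>()) {
            Future<Integer> task = scope.async(() -> { Thread.sleep(100); return 10; });
            Future<Integer> task2 = scope.async(() -> { Thread.sleep(300); return 30; });
            scope.awaitAll();
            return task.get() + task2.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Leaving the scope early cancels the computation that is still running.
    static boolean earlyExitCancels() {
        MiniScope<Integer> scope = new MiniScope<>();
        Future<Integer> slow = scope.async(() -> { Thread.sleep(5_000); return 1; });
        scope.close();
        return slow.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(sum());
        System.out.println(earlyExitCancels());
    }
}
```

The sketch also shows why both pieces are useful: awaitAll() is the point after which results may be read, while close() is the safety net that runs even when the block is left early or by an exception.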
>> Rémi
>> [1] https://github.com/forax/loom-fiber/blob/master/src/test/java/fr/umlv/loom/structured/AsyncScopeTest.java