Structured Concurrency yet again

forax at univ-mlv.fr forax at univ-mlv.fr
Tue May 9 22:42:46 UTC 2023


> From: "Brian Goetz" <brian.goetz at oracle.com>
> To: "Remi Forax" <forax at univ-mlv.fr>, "loom-dev" <loom-dev at openjdk.java.net>
> Sent: Tuesday, May 9, 2023 10:38:35 PM
> Subject: Re: Structured Concurrency yet again

> I would suggest to start with what problem you are trying to solve, rather than
> a proposed competing solution; otherwise, it is hard to compare one solution
> with another.

> For example, one problem you could be addressing here is "Future sucks". But
> that doesn't require a whole new API, it only requires asking "is Future the
> best thing for fork to return, or should we define another abstraction?" (As it
> turns out, the updated proposal Ron and Alan have does exactly that.)
yes, Future does not suck, it's more oriented as something to store result of a pure computation than as something able to run IO/Network operations. 

> Are there other problems you are trying to solve here, that the incubating JEP
> does not do well enough ?
See my message to Ron. 

regards, 
Rémi 

> On 5/9/2023 12:33 PM, Remi Forax wrote:

>> I think we should restart the work on the structured concurrency API, here is a
>> proposal.

>> If Java had async/await keyword, computing the sum of two asynchronous call, it
>> will be something like

>>   int value = await async () -> {
>>       Thread.sleep(100);
>>       return 10;
>>   };
>>   int value2 = await async () -> {
>>       Thread.sleep(300);
>>       return 30;
>>   };
>>   assertEquals(40, value + value2);

>> Apart of the colored function problem, another problem of this API is that the
>> asynchronous calls are executed sequentially and not at the same time.

>> If we use the Executor/Future API instead, the calls are called in parallel

>>   var executor = ...
>>   var future = executor.submit(() -> {
>>       Thread.sleep(100);
>>       return 10;
>>   });
>>   var future2 = executor.submit(() -> {
>>       Thread.sleep(300);
>>       return 30;
>>   });
>>   int value = future.get();
>>   int value2 = future2.get();
>>   assertEquals(40, value + value2);

>> But there are several other issues with this API
>> - if "future.get()" throws an exception, future2.get() is never called so the
>> second task is now a runaway task. It should be cancelled.
>> - submit returns a Future which is only parameterized by the type of the result,
>> all exceptions are stored as a Throwable, there is no exception transparency
>> (something the async/await approach provides).
>> - A future has 4 states, RUNNING, SUCCESS, FAILED and CANCELLED, but as a user
>> we are only need to know if the computation succeed or failed. We should not be
>> able to access to the state of the computation if the computation is not
>> finished (Future.resultNow()/exceptionnNow() already does that) and a cancelled
>> task should be a sub-category of a failed task.

>> To solve the first problem, we introduce a try-with-resources that guarantees
>> that all asynchronous tasks can not escape the scope and a method awaitAll()
>> that guarantee that all computations are finished at that point.
>> (If you wonder why we need both, see later).

>> To solve the two other problems, we need to replace the Callable/Future pair by
>> a functional interface Computation and a class AsyncTask that correctly
>> propagates the exceptions and provides a simpler API, with only two methods,
>> getNow() that either return the result or throw the exception and result() that
>> provides a result object that either represent a SUCCESS with a value or FAILED
>> with an exception.

>>   public interface Computation<R, E extends Exception> {
>>     /**
>>      * Compute the computation.
>>      * @return a result
>>      * @throws E an exception
>>      * @throws InterruptedException if the computation is interrupted
>>      */
>>     R compute() throws E, InterruptedException;
>>   }

>>   public interface AsyncTask<R, E extends Exception> {
>>     /**
>>     * Returns a result object corresponding to the computation if the computation is
>>      done.
>>     * @return a result object corresponding to the computation if the computation is
>>      done.
>>      * @throws IllegalStateException if the computation is not done.
>>      */
>>     Result<R, E> result();

>>     /**
>>      * Returns the value of the computation
>>      * @return the value of the computation
>>      * @throws E the exception thrown by the computation
>>      * @throws InterruptedException if the task was cancelled.
>>      * @throws IllegalStateException if the computation is not done.
>>      */
>>     R getNow() throws E, InterruptedException;
>>   }

>>   public static final class Result<R, E extends Exception> {
>>     public enum State {
>>       /**
>>        * if the computation succeed.
>>        */
>>       SUCCESS,
>>       /**
>>        * If the computation failed because an exception is thrown
>>        */
>>       FAILED
>>     }

>>     /**
>>      * Returns the state of the result.
>>      * @return the state of the result.
>>      */
>>     public State state() {
>>       return state;
>>     }

>>     /**
>>      * Returns the result of the computation.
>>      * @throws IllegalStateException if the state is not {@link State#SUCCESS}.
>>      * @return the result of the computation.
>>      */
>>     public R result() {
>>       if (state != State.SUCCESS) {
>>         throw new IllegalStateException("state not a success");
>>       }
>>       return result;
>>     }

>>     /**
>>      * Returns the failure thrown by the computation.
>>      * @throws IllegalStateException if the state is not {@link State#FAILED}.
>>     * @return the failure thrown by the computation or null if the task has been
>>      cancelled.
>>      */
>>     public E failure() {
>>       if (state != State.FAILED) {
>>         throw new IllegalStateException("state not a failure");
>>       }
>>       return failure;
>>     }
>>     ...
>>   }

>> If we put everything together, we get the following code

>>   try(var scope = new AsyncScope<Integer, RuntimeException>()) {
>>       AsyncTask<Integer, RuntimeException> task = scope.async(() -> {
>>           Thread.sleep(100);
>>           return 10;
>>       });
>>       var task2 = scope.async(() -> {
>>           Thread.sleep(300);
>>           return 30;
>>       });

>>       scope.awaitAll();

>>       int value = task.getNow();
>>       int value2 = task2.getNow();
>>       assertEquals(40, value + value2);
>>   }

>> We may also want the results as a stream, for that we introduce a method await()
>> that takes a function that take a stream of the results and return a value.
>> Using a stream is interesting because it's a way to abstract a loop over the
>> results at the same time the results are produced, so if the stream is
>> short-circuited, we do not need to wait all results to be computed and the
>> results are ordered by completion order.

>> This API is far better than the current StructuredTaskScope.handleComplete()
>> which requires to subclass StructuredTaskScope and deal with the concurrency if
>> you want to get access to the future in completion order.

>> But let starts, with a simple example, we can gather all results using
>> Stream.toList().

>>   try(var scope = new AsyncScope<Integer, RuntimeException>()) {
>>       var task = scope.async(() -> {
>>           Thread.sleep(300);
>>           return 30;
>>       });
>>       var task2 = scope.async(() -> {
>>           Thread.sleep(100);
>>           return 10;
>>       });

>>       List<Result<Integer, RuntimeException>> results =
>>           scope.await(Stream::toList);
>>       var sum = 0;
>>       for(var result: results) {
>>           sum += result.getNow();
>>       }
>>       assertEquals(40, sum);
>>   }

>> We can filter to only keep the success result using Result::keepOnlySuccess
>> which either return a stream with the result value in case of a SUCCESS and an
>> empty stream if the computation FAILED.

>>   try(var scope = new AsyncScope<Integer, RuntimeException>()) {
>>       var task = scope.async(() -> {
>>           Thread.sleep(300);
>>           return 30;
>>       });
>>       var task2 = scope.async(() -> {
>>           Thread.sleep(100);
>>           return 10;
>>       });

>>       List<Integer> values =
>>           scope.await(stream -> stream.flatMap(Result::keepOnlySuccess).toList());
>>       assertEquals(List.of(10, 30), values);
>>   }

>> And we can reduce the results into one unique result using Result.merger() which
>> takes a binary function to combine the results (failures are combined using
>> addSuppressed).

>>   try(var scope = new AsyncScope<Integer, RuntimeException>()) {
>>       var task = scope.async(() -> {
>>           Thread.sleep(100);
>>           return 10;
>>       });
>>       var task2 = scope.async(() -> {
>>           Thread.sleep(300);
>>           return 30;
>>       });

>>       Result<Integer, RuntimeException> result =
>>           scope.await(stream -> stream.reduce(Result.merger(Integer::sum)))
>>                .orElseThrow();
>>       switch (result.state()) {
>>         case SUCCESS -> assertEquals(40, result.result());
>>         case FAILED -> fail();
>>       }
>>     }

>> And we can use Stream.findFirst() (or any other short circuited terminal
>> operations) if we do not need all results of the computations. In that case,
>> the tasks still running will be cancelled asynchronously with the guarantee
>> that all computation have finished when exiting the scope.

>>   try(var scope = new AsyncScope<Integer, RuntimeException>()) {
>>       var task = scope.async(() -> {
>>           Thread.sleep(100);
>>           return 10;
>>       });
>>       var task2 = scope.async(() -> {
>>           Thread.sleep(1_000);
>>           return 30;
>>       });

>>       int value = scope
>>           .await(stream -> stream.flatMap(Result::keepOnlySuccess).findFirst())
>>           .orElseThrow();
>>       assertEquals(10, value);
>>       assertEquals(10, task.getNow());
>>       assertTrue(task2.result().isCancelled());
>>   }

>> There are other examples on Github [1].

>> To summarize, an async scope defines a scope for asynchronous calls that will
>> run in parallel by default. the method async() executes an asynchronous
>> computation in a fresh virtual thread and returns a task object that is only
>> available after one of the method await() is called. awaitAll() waits for all
>> computations to complete (normally or abnormally). await(stream -> ...) process
>> the results of computation in completion order and in case of early returns,
>> all still running computations are automatically cancelled.

>> Rémi

>> [1] [
>> https://github.com/forax/loom-fiber/blob/master/src/test/java/fr/umlv/loom/structured/AsyncScopeTest.java
>> |
>> https://github.com/forax/loom-fiber/blob/master/src/test/java/fr/umlv/loom/structured/AsyncScopeTest.java
>> ]
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://mail.openjdk.org/pipermail/loom-dev/attachments/20230510/335f18c9/attachment-0001.htm>


More information about the loom-dev mailing list