Structured Concurrency API ?

Remi Forax forax at univ-mlv.fr
Thu Apr 7 17:39:05 UTC 2022


Okay, same concept, it's still a kind of parallel loop, but this is the next iteration of the API.

I believe the API can be simplified by not visually separating the part that forks the tasks from the part that collects the results.
While that separation is conceptually sound, it creates syntactic noise, so let's go back to an API closer to StructuredTaskScope.

Here is a simple example,

  try(var scope = AsyncScope.<Integer, RuntimeException>of()) {
      scope.fork(() -> 10);
      scope.fork(() -> 20);
      assertEquals(List.of(10, 20), scope.result(Stream::toList));
  }

Again, the concept does not change: the scope models a loop, so by default it returns the results of the tasks in order.
Throwing an exception inside a task cancels all the other tasks, and the exception is propagated by result() automatically.
The try-with-resources guarantees that no task computation can escape the scope.
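
For readers who want to compare these default semantics with what the JDK offers today, here is a rough sketch built only on java.util.concurrent. It is not the AsyncScope implementation; the class OrderedLoopSketch and its methods are hypothetical names used for illustration. It is also weaker than what is described above: a failure is only noticed when the loop reaches the failing task's future, not as soon as it happens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, NOT part of AsyncScope: results in fork order,
// the first failure seen is rethrown and the remaining tasks are interrupted.
class OrderedLoopSketch {
    static <T> List<T> runInOrder(List<Callable<T>> tasks) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Future<T>> futures = new ArrayList<>();
            for (Callable<T> task : tasks) {
                futures.add(executor.submit(task));      // fork
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                try {
                    results.add(future.get());           // join in fork order
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof Exception) {
                        throw (Exception) cause;         // propagate the task's exception
                    }
                    throw e;
                }
            }
            return results;
        } finally {
            executor.shutdownNow();                      // interrupt whatever is still running
            executor.awaitTermination(1, TimeUnit.SECONDS); // best effort: wait for tasks to end
        }
    }

    // Mirrors the two-task example above; checked exceptions are wrapped for brevity.
    static List<Integer> demo() {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 10);
        tasks.add(() -> 20);
        try {
            return runInOrder(tasks);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The try/finally plays the role of the try-with-resources, but note that the exception type is not tracked here, which is one of the things the typed scope is meant to improve.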

The scope can be unordered; in that case, asking for only one element with findFirst()
(or any other short-circuiting operation such as limit(), takeWhile(), etc.) cancels all the other tasks.

  try(var scope = AsyncScope.<Integer, RuntimeException>unordered()) {
      scope.fork(() -> {
          Thread.sleep(200);
          return 10;
      });
      scope.fork(() -> 20);
      assertEquals(20, scope.result(Stream::findFirst).orElseThrow());
  }
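
As a point of comparison, the closest existing JDK call for "take the first result and cancel the rest" is probably ExecutorService.invokeAny. The sketch below uses it with the same two tasks; FirstResultSketch is a hypothetical name, and this is not how AsyncScope is implemented. One difference to keep in mind: invokeAny skips failed tasks as long as one task succeeds, which may not match how the scope treats exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical comparison, NOT part of AsyncScope.
class FirstResultSketch {
    static int firstResult() {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            tasks.add(() -> {
                Thread.sleep(200);   // slow task, expected to lose the race
                return 10;
            });
            tasks.add(() -> 20);
            // Returns the result of one task that completed successfully;
            // tasks that have not completed are cancelled on return.
            return executor.invokeAny(tasks);
        } catch (Exception e) {
            throw new IllegalStateException(e);  // wrapped for brevity
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstResult());
    }
}
```

With a 200 ms head start for the second task, this should return 20 in practice, like the findFirst example, but invokeAny only covers the single-result case and not limit() or takeWhile().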


Before getting the results as a stream, one can specify a deadline or recover from (or wrap) the checked exceptions
raised by the task executions:

  List<Integer> list;
  try(var scope = AsyncScope.<Integer, IOException>unordered()) {
      scope.fork(() -> {
          Thread.sleep(200);
          throw new IOException("boom !");
      });
      scope.fork(() -> 666);
      list = scope
          .recover(ioException -> 333)
          .deadline(Instant.now().plus(1, ChronoUnit.SECONDS))
          .result(Stream::toList);
  }
  assertEquals(List.of(666, 333), list);
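
To make the recover/deadline combination more concrete, here is a minimal sketch of the same behaviour on top of ExecutorCompletionService. Everything in it is an assumption made for illustration: the class RecoverDeadlineSketch, the collect() signature, the use of Function<Exception, T> instead of a typed exception, and the choice of TimeoutException when the deadline passes. It does not describe the actual AsyncScope code.

```java
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical helper, NOT part of AsyncScope: results in completion order,
// failures replaced by a recovery value, overall deadline enforced.
class RecoverDeadlineSketch {
    static <T> List<T> collect(List<Callable<T>> tasks,
                               Function<Exception, T> recover,
                               Instant deadline)
            throws InterruptedException, TimeoutException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            ExecutorCompletionService<T> completion = new ExecutorCompletionService<>(executor);
            for (Callable<T> task : tasks) {
                completion.submit(task);                         // fork
            }
            List<T> results = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                long remaining = Duration.between(Instant.now(), deadline).toMillis();
                // Wait for the next completed task, but never past the deadline.
                Future<T> done = completion.poll(Math.max(remaining, 0), TimeUnit.MILLISECONDS);
                if (done == null) {
                    throw new TimeoutException("deadline reached"); // assumed behaviour
                }
                try {
                    results.add(done.get());
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (!(cause instanceof Exception)) {
                        throw new IllegalStateException(cause);  // do not recover from Errors
                    }
                    results.add(recover.apply((Exception) cause)); // replace failure by a value
                }
            }
            return results;
        } finally {
            executor.shutdownNow();                              // interrupt leftover tasks
        }
    }

    // Same two tasks as the example above; checked exceptions are wrapped for brevity.
    static List<Integer> demo() {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> {
            Thread.sleep(200);
            throw new IOException("boom !");
        });
        tasks.add(() -> 666);
        try {
            return collect(tasks, e -> 333, Instant.now().plusSeconds(1));
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The amount of plumbing needed here (remaining-time arithmetic, unwrapping ExecutionException, untyped exceptions) is the kind of noise the three chained calls above are meant to hide.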


The API is available here
  https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/monad/AsyncScope.java

and tests are available here
  https://github.com/forax/loom-fiber/blob/master/src/test/java/fr/umlv/loom/monad/AsyncScopeTest.java


regards,
Rémi

----- Original Message -----
> From: "Remi Forax" <forax at univ-mlv.fr>
> To: "loom-dev" <loom-dev at openjdk.java.net>
> Sent: Wednesday, April 6, 2022 5:49:29 PM
> Subject: Structured Concurrency API ?

> Hi all,
> this is a proposal for an API to enable structured concurrency in Java. It is
> similar to StructuredTaskScope in its goal but, I believe, easier to use
> because it is higher level.
> 
> The idea of structured concurrency is to use the control flow block to delimit
> the asynchronous computation.
> It fits well with the idea of Loom: code written in a synchronous way that runs
> in an asynchronous way.
> 
> Conceptually, I think there is a missing piece in the StructuredTaskScope API:
> it delimits the computation but fails to control the async computation.
> And now I show my age: I have always kind of liked the OpenMP parallel loop
> abstraction.
> I think that is what is missing in the current API: there is no notion of loop.
> 
> In Java, we already have an abstraction (we even have several) for a loop: the
> Stream API. It can represent a classical ordered loop but also an unordered loop
> (because a stream can be parallel) and a loop that short-circuits. I believe we
> should try to have an API that combines the try-with-resources for the
> delimitation and a stream to express how the different async computation
> results are combined.
> 
> I also want to get rid of Futures, which are too low-level, do not track the type
> of the exception and have too many states to represent the async computation (I
> would like to limit the state of a computation to be either a success or an
> exception, like in the synchronous case).
> 
> I see 3 phases for an API that controls asynchronous computations
> - the fork phase, where the tasks are spawned
> - the semantics of the whole computation phase, where we define if we want only
> the first result, shutdown on error, a timeout, the handling of exceptions, etc.
> - the collection of the results
> 
> Note that the default semantics should be the same as the synchronous loop, so
> - the results are available in order (like with a for loop).
> - an exception should be propagated and stop the whole computation (so cancel
> all the remaining tasks).
> 
> I propose the AsyncMonad API (i know the name should be changed),
> it works that way
> 
>  int sum;
>  try(var asyncMonad = AsyncMonad.<Integer, RuntimeException>of(forker -> {
>      forker.fork(() -> {
>          System.out.println(Thread.currentThread());
>          Thread.sleep(500);
>          return 500;
>      });
>      forker.fork(() -> {
>          System.out.println(Thread.currentThread());
>          Thread.sleep(100);
>          return 100;
>      });
>  })) {
>      sum = asyncMonad
>          .unordered()
>          .result(stream -> stream.mapToInt(v -> v).sum());
>  }
> 
> When creating the async monad, the static method of() provides a forker that can
> be used to spawn tasks; once this is done, the async monad is created.
> Then the async monad is configured, here with unordered() to indicate that we
> will see the results of the computation out of order (in the order of task
> completion).
> And to finish, the terminal operation result() provides a stream of the results
> and asks the user how the results should be combined; here the results are summed.
> 
> Here is a non-exhaustive list of the intermediary operations:
> - unordered() relaxes the ordering constraints
> - recover(exceptionHandler) the equivalent of a try/catch, called when a checked
> exception is raised to either wrap the exception or replace it by a value
> - timeout(deadline) (not yet implemented)
> 
> The API is defined here:
>  https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/monad/AsyncMonad.java
> 
> And there are more examples here:
>  https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/monad/AsyncMonadMain.java
> 
> regards,
> Rémi

