Structured Concurrency API ?

forax at univ-mlv.fr
Sat Apr 9 14:21:06 UTC 2022


----- Original Message -----
> From: "Ron Pressler" <ron.pressler at oracle.com>
> To: "Remi Forax" <forax at univ-mlv.fr>
> Cc: "loom-dev" <loom-dev at openjdk.java.net>
> Sent: Friday, April 8, 2022 9:38:31 PM
> Subject: Re: Structured Concurrency API ?

>> On 8 Apr 2022, at 20:14, Ron Pressler <ron.pressler at oracle.com> wrote:
>> 
>> I have been toying with a different idea for functional composition of
>> structured concurrency.
>> 
>>    Stream.of("abc", "cde", "efg").__threads().filter(s ->
>>    s.contains("c")).findAny()
>> 
>> where __threads() will spawn a new thread for each element in the stream, the
>> filter operation will run concurrently in each of the threads, and findAny()
>> will shut down all remaining threads after finding an element (limit() would
>> behave analogously). Operations that require order will join all the threads
>> (or as many of them as necessary to ensure that findFirst() returns "abc"
>> and not "cde"), but we could also have an explicit join() method to continue
>> the rest of the pipeline on the current thread. Of course, relevant operations
>> will probably include some blocking IO.
>> 
>> — Ron
> 
> P.S.
> 
> Overloads of __threads can take a ThreadFactory and/or a timeout.
> 
> I have a working prototype, but the design is still in its infancy. The proposed
> StructuredTaskScope API is not intended to be the last word on structured
> concurrency; indeed, it is only the first.
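The filter(...).findAny() behaviour described above can be approximated with java.util.concurrent today. This is only a sketch under stated assumptions: findAnyConcurrently is a hypothetical name (there is no __threads() method), it runs one platform thread per element from a fixed pool, and it takes a ThreadFactory in the spirit of the P.S. (on JDK 21, Thread.ofVirtual().factory() would give virtual threads instead).

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public class ThreadsFindAny {
  // Hypothetical helper approximating elements.__threads().filter(filter).findAny():
  // one thread per element, the predicate runs concurrently, the first match wins,
  // and the threads still running are interrupted before the method returns.
  public static <T> Optional<T> findAnyConcurrently(List<T> elements, Predicate<? super T> filter,
                                                    ThreadFactory factory) throws InterruptedException {
    if (elements.isEmpty()) {
      return Optional.empty();
    }
    // a fixed pool of size n starts a new thread for each of the first n tasks
    ExecutorService executor = Executors.newFixedThreadPool(elements.size(), factory);
    try {
      var completion = new ExecutorCompletionService<Optional<T>>(executor);
      for (T element : elements) {
        // each task yields its element if it passes the filter, empty otherwise
        completion.submit(() -> filter.test(element) ? Optional.of(element) : Optional.<T>empty());
      }
      for (int i = 0; i < elements.size(); i++) {
        try {
          Optional<T> result = completion.take().get(); // next task to finish, in any order
          if (result.isPresent()) {
            return result;
          }
        } catch (ExecutionException e) {
          throw new IllegalStateException(e.getCause()); // the predicate itself failed
        }
      }
      return Optional.empty();
    } finally {
      executor.shutdownNow();                         // interrupt the remaining threads
      executor.awaitTermination(1, TimeUnit.MINUTES); // and wait for them to stop
    }
  }

  public static void main(String[] args) throws InterruptedException {
    var found = findAnyConcurrently(List.of("abc", "cde", "efg"), s -> s.contains("c"),
        Executors.defaultThreadFactory());
    System.out.println(found); // Optional[abc] or Optional[cde], whichever check finishes first
  }
}
```

Because the pool is shut down and awaited in a finally block, no thread started by the call outlives it, which is the structured-concurrency property such a pipeline is meant to provide.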

I've played a little with that idea. First, we cannot return a stream directly, otherwise we have to wrap it in a try-with-resources too,
something like this:

  try (var stream = Stream.of("abc", "cde", "efg").__threads()) {
    return stream.filter(s -> s.contains("c")).findAny();
  }

Then there is still the issue with checked exceptions that I spoke about in a sibling email.
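To make the try-with-resources point concrete, here is a sketch of a stream that owns the threads computing its elements. The helper mapConcurrently is hypothetical and a cached thread pool stands in for virtual threads. Closing the stream interrupts the tasks that are still running, and because Future.get() throws checked exceptions that stream lambdas cannot propagate, they have to be wrapped in an unchecked exception, which is the checked-exception issue in miniature.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Stream;

public class ScopedStream {
  // Hypothetical: starts one task per element and returns a stream of their results
  // in source order. The stream owns the threads, so the caller must close it.
  public static <T, R> Stream<R> mapConcurrently(List<T> elements, Function<? super T, R> mapper) {
    ExecutorService executor = Executors.newCachedThreadPool();
    List<Future<R>> futures = elements.stream()
        .map(element -> executor.submit(() -> mapper.apply(element)))
        .toList();
    return futures.stream()
        .map(ScopedStream::join)          // waits for each result lazily, in source order
        .onClose(executor::shutdownNow);  // closing the stream interrupts leftover tasks
  }

  // Future.get() throws checked exceptions that a stream lambda cannot propagate,
  // so they are wrapped in an unchecked exception here.
  private static <R> R join(Future<R> future) {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  public static void main(String[] args) {
    try (var stream = mapConcurrently(List.of("abc", "cde", "efg"), s -> s.toUpperCase())) {
      Optional<String> found = stream.filter(s -> s.contains("C")).findAny();
      System.out.println(found);
    }
  }
}
```

Nothing forces the caller to write the try-with-resources: forgetting it leaks the pool, which is the kind of misuse a single delimiting method call avoids.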

What is possible is to have a single method call delimit the whole computation instead of a try-with-resources.
A single method call is obviously executed by a single thread, so there are fewer ways to misuse it, but it perhaps becomes a little too Lispy (there are a lot of parentheses).

Anyway, here is an example

  var times = Stream.of(200, 100);
  var list = AsyncLoops.asyncLoop(times,
      time -> {
          Thread.sleep(time);
          return time;
      },
      Stream::toList);
  System.out.println(list);   // [200, 100]

The method asyncLoop takes 3 parameters: a source stream, a computation, and a mapper (a function that takes a stream of the results of the computations and returns the result of the whole loop).
Because the last parameter is a lambda, the stream of results stays confined to the call, so we can guarantee that no virtual thread is still running when asyncLoop returns.

If we remove the wildcards, the signature of asyncLoop is:

  public static <V, R, T, E extends Exception> T asyncLoop(Stream<V> source,
      Computation<V, R, E> computation, Function<Stream<R>, T> mapper)
      throws E, InterruptedException;

By default, the mapper receives the results of the computations in source order if the source stream is ordered, and in no particular order otherwise.
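A minimal sketch of a method with the signature above, assuming a hypothetical Computation interface whose single method is R compute(V value) throws E (the real interface in AsyncLoops may differ). A cached pool of platform threads stands in for virtual threads, and for simplicity results always reach the mapper in source order, even for unordered sources. The finally block interrupts any task the mapper did not consume and waits for all threads to stop, which is what lets the method guarantee that nothing is still running when it returns.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;

public class AsyncLoopSketch {
  // Hypothetical stand-in for the Computation interface of AsyncLoops.
  @FunctionalInterface
  public interface Computation<V, R, E extends Exception> {
    R compute(V value) throws E;
  }

  // Carries a failure through the mapper's stream, whose lambdas cannot throw checked exceptions.
  private static final class Failure extends RuntimeException {
    Failure(Throwable cause) { super(cause); }
  }

  public static <V, R, T, E extends Exception> T asyncLoop(
      Stream<V> source, Computation<V, R, E> computation, Function<Stream<R>, T> mapper)
      throws E, InterruptedException {
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      // start one task per element of the source
      List<Future<R>> futures = source
          .map(value -> executor.submit(() -> computation.compute(value)))
          .toList();
      // results are handed to the mapper lazily and in source order
      Stream<R> results = futures.stream().map(future -> {
        try {
          return future.get();
        } catch (ExecutionException e) {
          throw new Failure(e.getCause());
        } catch (InterruptedException e) {
          throw new Failure(e);
        }
      });
      try {
        return mapper.apply(results);
      } catch (Failure failure) {
        Throwable cause = failure.getCause();
        if (cause instanceof InterruptedException ie) throw ie;
        if (cause instanceof RuntimeException re) throw re;
        if (cause instanceof Error error) throw error;
        throw AsyncLoopSketch.<E>asE(cause);
      }
    } finally {
      // tasks the mapper did not consume (e.g. after findFirst()) are interrupted,
      // and the call does not return before every thread has stopped
      executor.shutdownNow();
      executor.awaitTermination(1, TimeUnit.MINUTES);
    }
  }

  @SuppressWarnings("unchecked")
  private static <E extends Exception> E asE(Throwable cause) {
    return (E) cause; // unchecked: only a checked E thrown by compute() can reach this point
  }

  public static void main(String[] args) throws InterruptedException {
    var list = asyncLoop(Stream.of(200, 100),
        time -> {
          Thread.sleep(time);
          return time;
        },
        Stream::toList);
    System.out.println(list); // [200, 100]
  }
}
```

The type variable E is inferred from what the computation lambda throws (InterruptedException in main, RuntimeException for a lambda that throws nothing checked), which is how the checked exception of the computation surfaces at the call site.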

One can use source.unordered() to forget about the ordering, for example:
  var times = Stream.of(500, 100).unordered();
  var result = AsyncLoops.asyncLoop(times,
      time -> {
          Thread.sleep(time);
          return time;
      },
      Stream::findFirst)
      .orElseThrow();
  System.out.println(result); // 100

Here, findFirst() will return the result of the first computation to finish, not the first computation in the order of the source stream.
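For unordered sources, results can be delivered in completion order with ExecutorCompletionService. The sketch below is hypothetical (inCompletionOrder is not part of AsyncLoops), uses a cached pool of platform threads, and takes a list of Callable tasks so that checked exceptions such as the InterruptedException of Thread.sleep can be thrown by the tasks directly.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;

public class CompletionOrder {
  // Hypothetical helper: runs every task concurrently and hands the mapper a stream
  // of results in completion order rather than submission order.
  public static <R, T> T inCompletionOrder(List<Callable<R>> tasks, Function<Stream<R>, T> mapper)
      throws InterruptedException {
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      var completion = new ExecutorCompletionService<R>(executor);
      tasks.forEach(completion::submit);
      Stream<R> results = Stream.generate(() -> {
        try {
          return completion.take().get(); // blocks until the next task, whichever it is, finishes
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(e);
        } catch (ExecutionException e) {
          throw new IllegalStateException(e.getCause());
        }
      }).limit(tasks.size());
      return mapper.apply(results);
    } finally {
      executor.shutdownNow();                         // interrupt tasks nobody waited for
      executor.awaitTermination(1, TimeUnit.MINUTES);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<Callable<Integer>> tasks = Stream.of(500, 100)
        .<Callable<Integer>>map(time -> () -> {
          Thread.sleep(time);
          return time;
        })
        .toList();
    Optional<Integer> first = inCompletionOrder(tasks, Stream::findFirst);
    System.out.println(first.orElseThrow()); // 100: the shorter sleep completes first
  }
}
```

Since findFirst() pulls a single element, only one take() happens; the 500 ms task is then interrupted by shutdownNow() instead of being waited for.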

And a last example using IO operations, to find the most recently modified file in a folder:
  Optional<Path> lastModifiedFile;
  try(var paths = Files.list(Path.of("."))) {
      record PathAndTime(Path path, FileTime time) {}
      lastModifiedFile = AsyncLoops.asyncLoop(paths,
          path -> {
              var time = Files.getLastModifiedTime(path);
              return new PathAndTime(path, time);
          },
          stream -> stream.max(Comparator.comparing(PathAndTime::time))
      ).map(PathAndTime::path);
  }
  System.out.println(lastModifiedFile);


The API is here
  https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/monad/AsyncLoops.java

and there are some tests here
  https://github.com/forax/loom-fiber/blob/master/src/test/java/fr/umlv/loom/monad/AsyncLoopsTest.java

Rémi

