[External] : Re: Structured Concurrency API ?

Ron Pressler ron.pressler at oracle.com
Mon Apr 11 15:02:11 UTC 2022


I specifically didn’t choose an example with checked exceptions because I didn’t want to get into that, but they don’t pose a big problem here. The reason is that simply propagating exceptions is not what we want here, and we always need to specify what we want to do with them — do we want to abort or to ignore? Also, since we spawn a thread for each element, not for “tasks”, callables are just values, and we could treat them as such. For example:

    addresses.stream().__threads().map(address -> address::connect) // this is now a stream of Callables
      .flatMap(Callables.runAndStreamSuccessful()).findAny()

This is not a proposal, though; just an idea.

But the most importand thing is that there are many choices to make, and we'll want to wait and see what you and others come up with in libraries, and what ideas become well-liked. Libraries are on the same footing as the JDK here in terms of capabilities.

Structured programming is still young, and we generally don’t like adopting new ideas, but realised we had to do something minimal to help herd lots of threads. We’re relatively early adopters in this case despite ourselves, so we opted to do something that is simple and at least somewhat familiar on the one hand, and yet serves as an introduction to structured programming. As it matures and, I hope, becomes popular, we’ll be able to better understand what problems are most important, and what solutions prove to work best.

— Ron

> On 9 Apr 2022, at 15:21, forax at univ-mlv.fr wrote:
> 
> ----- Original Message -----
>> From: "Ron Pressler" <ron.pressler at oracle.com>
>> To: "Remi Forax" <forax at univ-mlv.fr>
>> Cc: "loom-dev" <loom-dev at openjdk.java.net>
>> Sent: Friday, April 8, 2022 9:38:31 PM
>> Subject: Re: Structured Concurrency API ?
> 
>>> On 8 Apr 2022, at 20:14, Ron Pressler <ron.pressler at oracle.com> wrote:
>>> 
>>> I have been toying with a different idea for functional composition of
>>> structured concurrency.
>>> 
>>>   Stream.of(“abc”, “cde”, “efg”).__threads().filter(s ->
>>>   s.contains(“c”)).findAny()
>>> 
>>> where __threads() will spawn a new thread for each element in the stream, the
>>> filter operation will run concurrently in each of the threads, and findAny(),
>>> will shutdown all remaning threads after finding an element (limit() would
>>> behave analogously). Operations that require order will join all the threads
>>> (or as many of them as necessary, to ensure that findFirst() would return “abc”
>>> and not “cde”), but we could also have an explicit join() method to continue
>>> the rest of the pipeline on the current thread. Of course, relevant operations
>>> will probably include some blocking IO.
>>> 
>>> — Ron
>> 
>> P.S.
>> 
>> Overrides of __threads can take a ThreadFactory and/or a timeout.
>> 
>> I have a working prototype, but the design is still in its infancy. The proposed
>> StructuredTaskScope API is not intended to be the last word on structured
>> concurrency; indeed, it is only the first.
> 
> I've played a little with that idea, first we can not returns a stream directly, otherwise we have to wrap it into a try-with-resources too,
> something like this:
> 
>  try(var stream = Stream.of(“abc”, “cde”, “efg”).__threads()) {
>    return stream.filter(s -> s.contains(“c”)).findAny()
>  }
> 
> then there is still the issue with the checked exceptions i spoke about in an sibling email.
> 
> What is possible is to have one method call delimiting the whole calculation instead of a try-with-resources,
> one method call obviously is executed by the same thread, so it's less way to be miss used but it becomes perhaps a little too lispy (there a lot of parenthesis).
> 
> Anyway, here is an example
> 
>  var times = Stream.of(200, 100);
>  var list = AsyncLoops.asyncLoop(times,
>      time -> {
>          Thread.sleep(time);
>          return time;
>      },
>      Stream::toList);
> System.out.println(list);   // [200, 100]
> 
> The method asyncLoop takes 3 parameters: a stream source, a computation and a mapper (a function that a stream of the results of the computations and return the result of the whole loop).
> Because the last parameter is a lambda, the stream stay bounded so we can guarantee that all virtual threads do not run anymore at the end of asyncLoop.
> 
> If we remove the wildcards, the signature of asyncLoop is:
> public static <V, R, T, E extends Exception> T asyncLoop(Stream<V> source, Computation<V, R, E> computation, Function<Stream<R>, T> mapper) throws E, InterruptedException;
> 
> By default, the stream of the computation results receive the result in order if the source stream is ordered, and unordered otherwise.
> 
> One can use source.unordered(), to forget about the ordering, by example:
>  var times = Stream.of(500, 100).unordered();
>  var result = AsyncLoops.asyncLoop(times,
>      time -> {
>          Thread.sleep(time);
>          return time;
>      },
>      Stream::findFirst)
>   .orElseThrow();
> System.out.println(result); // 100
> 
> Here, findFirst() will return the result of the first computation to finish, not the first computation in the order of the source stream.
> 
> And a last example using IO operations, to find the last modified file of a folder
>  Optional<Path> lastModifiedFile;
>  try(var paths = Files.list(Path.of("."))) {
>      record PathAndTime(Path path, FileTime time) {}
>      lastModifiedFile = AsyncLoops.asyncLoop(paths,
>          path -> {
>              var time = Files.getLastModifiedTime(path);
>             return new PathAndTime(path, time);
>          },
>          stream -> stream.max(Comparator.comparing(PathAndTime::time))
>      ).map(PathAndTime::path);
>  }
>  System.out.println(lastModifiedFile);
> 
> 
> The API is here
>  https://urldefense.com/v3/__https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/monad/AsyncLoops.java__;!!ACWV5N9M2RV99hQ!eOmKxfx-Gxa_qvunOMOvzZ13_VLAsNzW5vPkLy0nX8SAnmcvdgUefo99cbKh3QTfwQ$ 
> 
> and there are some tests here
>  https://urldefense.com/v3/__https://github.com/forax/loom-fiber/blob/master/src/test/java/fr/umlv/loom/monad/AsyncLoopsTest.java__;!!ACWV5N9M2RV99hQ!eOmKxfx-Gxa_qvunOMOvzZ13_VLAsNzW5vPkLy0nX8SAnmcvdgUefo99cbJb7nd2cQ$ 
> 
> Rémi



More information about the loom-dev mailing list