[External] : Re: Structured Concurrency API ?

Alex Otenko oleksandr.otenko at gmail.com
Mon Apr 11 14:57:05 UTC 2022


...for finite Streams


Alex

On Mon, 11 Apr 2022, 15:55 Ron Pressler, <ron.pressler at oracle.com> wrote:

> Maybe we could add overloads of Stream.parallel() to subsume this, but as
> things stand, Stream.parallel() will only use as many threads as there are
> *processors*; in these cases, we want as many threads as there are *items*.
>
> — Ron
>
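To make the processors-versus-items distinction concrete, here is a minimal sketch. It assumes plain platform threads (Java 8+ APIs only) and a hypothetical helper `mapThreadPerItem`, which is not part of any JDK API. The helper starts one thread per element and joins them all before returning. The `rendezvous` demo only finishes because every element really does get its own thread running at the same time, and a pool sized to the processor count does not guarantee that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;

public class ThreadPerItem {
    // Hypothetical helper (not a JDK API): applies f to each item in its own
    // platform thread, so concurrency equals the number of items rather than
    // the number of processors. Intended for small, finite lists only.
    static <T, R> List<R> mapThreadPerItem(List<T> items, Function<T, R> f)
            throws InterruptedException {
        AtomicReferenceArray<R> results = new AtomicReferenceArray<>(items.size());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            int index = i;
            Thread thread = new Thread(() -> results.set(index, f.apply(items.get(index))));
            threads.add(thread);
            thread.start();
        }
        // Join every thread before returning, so none outlives this call.
        for (Thread thread : threads) {
            thread.join();
        }
        List<R> list = new ArrayList<>();
        for (int i = 0; i < results.length(); i++) {
            list.add(results.get(i));
        }
        return list;
    }

    // Each task waits until all n tasks have started, so this call can only
    // terminate if all n tasks are running at the same time.
    static List<Integer> rendezvous(int n) throws InterruptedException {
        CountDownLatch allStarted = new CountDownLatch(n);
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            items.add(i);
        }
        return mapThreadPerItem(items, item -> {
            allStarted.countDown();
            try {
                allStarted.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return item * 2;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> doubled = rendezvous(64);
        System.out.println(doubled.get(10)); // prints 20
    }
}
```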
> On 10 Apr 2022, at 10:12, Alex Otenko <oleksandr.otenko at gmail.com> wrote:
>
> How do these AsyncLoop constructs and __threads() differ from
> Stream.parallel()?
>
> Alex
>
> On Sat, 9 Apr 2022, 15:21 , <forax at univ-mlv.fr> wrote:
>
>> ----- Original Message -----
>> > From: "Ron Pressler" <ron.pressler at oracle.com>
>> > To: "Remi Forax" <forax at univ-mlv.fr>
>> > Cc: "loom-dev" <loom-dev at openjdk.java.net>
>> > Sent: Friday, April 8, 2022 9:38:31 PM
>> > Subject: Re: Structured Concurrency API ?
>>
>> >> On 8 Apr 2022, at 20:14, Ron Pressler <ron.pressler at oracle.com> wrote:
>> >>
>> >> I have been toying with a different idea for functional composition of
>> >> structured concurrency.
>> >>
>> >>    Stream.of("abc", "cde", "efg").__threads()
>> >>          .filter(s -> s.contains("c"))
>> >>          .findAny()
>> >>
>> >> where __threads() will spawn a new thread for each element in the stream,
>> >> the filter operation will run concurrently in each of the threads, and
>> >> findAny() will shut down all remaining threads after finding an element
>> >> (limit() would behave analogously). Operations that require order will
>> >> join all the threads (or as many of them as necessary, to ensure that
>> >> findFirst() would return "abc" and not "cde"), but we could also have an
>> >> explicit join() method to continue the rest of the pipeline on the
>> >> current thread. Of course, relevant operations will probably include
>> >> some blocking IO.
>> >>
>> >> — Ron
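A rough approximation of these semantics can be built from existing `java.util.concurrent` APIs. This is a sketch under stated assumptions, not Ron's prototype: `findAny` and `findFirst` here are hypothetical static helpers over a `List`, and they use a cached pool of platform threads instead of a `__threads()` stream stage. `findAny` relies on `ExecutorService.invokeAny`, which returns the result of a task that completed normally and cancels the tasks still running. `findFirst` starts every task at once but joins the per-element results in source order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public class ThreadsFilter {
    // Hypothetical approximation of __threads().filter(p).findAny():
    // one task per element. A task whose element does not match throws, so
    // invokeAny returns a matching element and cancels the remaining tasks.
    static <T> Optional<T> findAny(List<T> items, Predicate<T> p) throws InterruptedException {
        if (items.isEmpty()) {
            return Optional.empty(); // invokeAny rejects an empty task list
        }
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Callable<T>> tasks = new ArrayList<>();
            for (T item : items) {
                tasks.add(() -> {
                    if (!p.test(item)) {
                        throw new IllegalStateException("no match");
                    }
                    return item;
                });
            }
            return Optional.of(executor.invokeAny(tasks));
        } catch (ExecutionException e) {
            return Optional.empty(); // every task threw: nothing matched
        } finally {
            shutdownAndWait(executor);
        }
    }

    // Hypothetical approximation of findFirst(): all tasks start at once,
    // but results are joined in source order, stopping at the first match.
    static <T> Optional<T> findFirst(List<T> items, Predicate<T> p)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Future<Boolean>> matches = new ArrayList<>();
            for (T item : items) {
                matches.add(executor.submit(() -> p.test(item)));
            }
            for (int i = 0; i < items.size(); i++) {
                if (matches.get(i).get()) {
                    return Optional.of(items.get(i));
                }
            }
            return Optional.empty();
        } finally {
            shutdownAndWait(executor);
        }
    }

    // No thread outlives the call: interrupt remaining tasks, then wait.
    private static void shutdownAndWait(ExecutorService executor) throws InterruptedException {
        executor.shutdownNow();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws Exception {
        List<String> words = List.of("abc", "cde", "efg");
        System.out.println(findAny(words, s -> s.contains("c")));   // Optional[abc] or Optional[cde]
        System.out.println(findFirst(words, s -> s.contains("c"))); // Optional[abc]
    }
}
```

One limitation of this sketch: in `findAny`, a predicate that itself throws cannot be told apart from a non-match.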
>> >
>> > P.S.
>> >
>> > Overrides of __threads can take a ThreadFactory and/or a timeout.
>> >
>> > I have a working prototype, but the design is still in its infancy. The
>> > proposed StructuredTaskScope API is not intended to be the last word on
>> > structured concurrency; indeed, it is only the first.
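To illustrate what such a ThreadFactory and timeout could control, here is a hypothetical `firstSuccess` helper (not part of the proposed API or of Ron's prototype). It is built only on `Executors.newCachedThreadPool(ThreadFactory)` and `ExecutorService.invokeAny(tasks, timeout, unit)`. The factory decides how each per-item thread is created (here, just its name), and the timeout bounds the whole operation, after which a `TimeoutException` is thrown and the remaining tasks are cancelled.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadsWithOptions {
    // Hypothetical helper: returns the result of whichever task succeeds first,
    // creating threads with the given factory, or throws TimeoutException if
    // none succeeds within the deadline. Remaining tasks are interrupted.
    static <T> T firstSuccess(List<Callable<T>> tasks, ThreadFactory factory,
                              long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executor = Executors.newCachedThreadPool(factory);
        try {
            return executor.invokeAny(tasks, timeout, unit);
        } finally {
            executor.shutdownNow();              // interrupt tasks still running
            executor.awaitTermination(timeout, unit);
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        // The factory only names the threads; it could also pick virtual threads.
        ThreadFactory named = r -> new Thread(r, "item-" + counter.incrementAndGet());
        List<Callable<String>> tasks = List.of(
                () -> { Thread.sleep(50); return "fast"; },
                () -> { Thread.sleep(5_000); return "slow"; });
        System.out.println(firstSuccess(tasks, named, 1, TimeUnit.SECONDS)); // prints fast
    }
}
```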
>>
>> I've played a little with that idea. First, we cannot return a stream
>> directly, otherwise we have to wrap it in a try-with-resources too,
>> something like this:
>>
>>   try (var stream = Stream.of("abc", "cde", "efg").__threads()) {
>>     return stream.filter(s -> s.contains("c")).findAny();
>>   }
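Why the try-with-resources is needed can be sketched as follows. The sketch assumes a hypothetical `mapInThreads` helper (not Remi's or Ron's code) that starts one task per item and hands back a `Stream` of results. Because the threads outlive the method call, stopping them is tied to `Stream.onClose`, so correctness depends on the caller closing the stream. The `join` helper also shows the checked-exception friction: `Future.get` throws `InterruptedException` and `ExecutionException`, which a lambda passed to `Stream.map` cannot propagate, so they must be wrapped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Stream;

public class ClosableThreadsStream {
    // Hypothetical helper: starts one task per item and returns a Stream of
    // their results. The tasks are only interrupted when the stream is closed
    // (shutdownNow interrupts them but does not wait for them to finish).
    static <T, R> Stream<R> mapInThreads(List<T> items, Function<T, R> f) {
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<R>> futures = new ArrayList<>();
        for (T item : items) {
            futures.add(executor.submit(() -> f.apply(item)));
        }
        return futures.stream()
                .map(ClosableThreadsStream::join)
                .onClose(executor::shutdownNow);
    }

    // Stream.map cannot propagate checked exceptions, so they are wrapped
    // in unchecked ones here.
    private static <R> R join(Future<R> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Forgetting the try-with-resources would leave the executor running.
        try (Stream<String> upper = mapInThreads(List.of("abc", "cde", "efg"), String::toUpperCase)) {
            System.out.println(upper.filter(s -> s.contains("C")).findFirst()); // Optional[ABC]
        }
    }
}
```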
>>
>> Then there is still the issue with checked exceptions I spoke about in a
>> sibling email.
>>
>> What is possible is to have a single method call delimit the whole
>> computation instead of a try-with-resources. A single method call is
>> obviously started and completed on the same thread, so it is harder to
>> misuse, but it perhaps becomes a little too lispy (there are a lot of
>> parentheses).
>>
>> Anyway, here is an example:
>>
>>   var times = Stream.of(200, 100);
>>   var list = AsyncLoops.asyncLoop(times,
>>       time -> {
>>           Thread.sleep(time);
>>           return time;
>>       },
>>       Stream::toList);
>>   System.out.println(list);   // [200, 100]
>>
>> The method asyncLoop takes 3 parameters: a stream source, a computation
>> and a mapper (a function that takes a stream of the results of the
>> computations and returns the result of the whole loop).
>> Because the last parameter is a lambda, the stream stays confined to the
>> call, so we can guarantee that no virtual thread is still running at the
>> end of asyncLoop.
>>
>> If we remove the wildcards, the signature of asyncLoop is:
>>   public static <V, R, T, E extends Exception> T asyncLoop(Stream<V> source,
>>       Computation<V, R, E> computation, Function<Stream<R>, T> mapper)
>>       throws E, InterruptedException;
>>
>> By default, the stream of computation results receives the results in
>> order if the source stream is ordered, and unordered otherwise.
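The following is a minimal sketch of an `asyncLoop` with the wildcard-free signature above. It is not the code from Remi's repository. The `Computation` interface is reconstructed from the signature, platform threads from `Executors.newCachedThreadPool()` stand in for virtual threads (on Java 21+, `Executors.newVirtualThreadPerTaskExecutor()` could be substituted), and Java 16+ is assumed for `Stream::toList`. The ORDERED characteristic of the source's spliterator selects between source order and completion order (the latter via `ExecutorCompletionService`), and the `finally` block waits for every thread before returning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;

public class AsyncLoopSketch {
    // Reconstructed from the signature in the thread (an assumption).
    @FunctionalInterface
    interface Computation<V, R, E extends Exception> {
        R compute(V value) throws E;
    }

    static <V, R, T, E extends Exception> T asyncLoop(
            Stream<V> source, Computation<V, R, E> computation, Function<Stream<R>, T> mapper)
            throws E, InterruptedException {
        Spliterator<V> spliterator = source.spliterator();
        boolean ordered = spliterator.hasCharacteristics(Spliterator.ORDERED);
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletionService<R> completion = new ExecutorCompletionService<>(executor);
            List<Future<R>> futures = new ArrayList<>();
            // One task per source element.
            spliterator.forEachRemaining(value ->
                    futures.add(completion.submit(() -> computation.compute(value))));
            List<R> results = new ArrayList<>();
            for (int i = 0; i < futures.size(); i++) {
                // Ordered source: source order. Otherwise: completion order.
                Future<R> future = ordered ? futures.get(i) : completion.take();
                results.add(AsyncLoopSketch.<R, E>await(future));
            }
            return mapper.apply(results.stream());
        } finally {
            // Guarantee that no thread is still running when asyncLoop returns.
            executor.shutdownNow();
            executor.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

    // Unwraps ExecutionException so the computation's own exception type E
    // reaches the caller (an unchecked cast, acceptable for a sketch).
    @SuppressWarnings("unchecked")
    private static <R, E extends Exception> R await(Future<R> future)
            throws E, InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) throw (RuntimeException) cause;
            if (cause instanceof Error) throw (Error) cause;
            throw (E) cause;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> list = asyncLoop(Stream.of(200, 100),
                time -> { Thread.sleep(time); return time; },
                Stream::toList);
        System.out.println(list); // [200, 100]

        Optional<Integer> first = asyncLoop(Stream.of(500, 100).unordered(),
                time -> { Thread.sleep(time); return time; },
                Stream::findFirst);
        System.out.println(first.orElseThrow()); // 100
    }
}
```

Unlike a lazy implementation, this sketch collects every result before calling the mapper. So `Stream::findFirst` on an unordered source still yields the first result to complete, but only after all computations have finished.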
>>
>> One can use source.unordered() to forget about the ordering, for example:
>>   var times = Stream.of(500, 100).unordered();
>>   var result = AsyncLoops.asyncLoop(times,
>>       time -> {
>>           Thread.sleep(time);
>>           return time;
>>       },
>>       Stream::findFirst)
>>    .orElseThrow();
>>   System.out.println(result); // 100
>>
>> Here, findFirst() will return the result of the first computation to
>> finish, not the first computation in the order of the source stream.
>>
>> And a last example using IO operations, to find the most recently modified
>> file in a folder:
>>   Optional<Path> lastModifiedFile;
>>   try(var paths = Files.list(Path.of("."))) {
>>       record PathAndTime(Path path, FileTime time) {}
>>       lastModifiedFile = AsyncLoops.asyncLoop(paths,
>>           path -> {
>>               var time = Files.getLastModifiedTime(path);
>>               return new PathAndTime(path, time);
>>           },
>>           stream -> stream.max(Comparator.comparing(PathAndTime::time))
>>       ).map(PathAndTime::path);
>>   }
>>   System.out.println(lastModifiedFile);
>>
>>
>> The API is here
>>
>> https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/monad/AsyncLoops.java
>>
>> and there are some tests here
>>
>> https://github.com/forax/loom-fiber/blob/master/src/test/java/fr/umlv/loom/monad/AsyncLoopsTest.java
>>
>> Rémi
>>
>
>