Structured Concurrency API ?
Alex Otenko
oleksandr.otenko at gmail.com
Sun Apr 10 09:12:49 UTC 2022
How do these AsyncLoop constructs and __threads() differ from
Stream.parallel()?
Alex
On Sat, 9 Apr 2022, 15:21 , <forax at univ-mlv.fr> wrote:
> ----- Original Message -----
> > From: "Ron Pressler" <ron.pressler at oracle.com>
> > To: "Remi Forax" <forax at univ-mlv.fr>
> > Cc: "loom-dev" <loom-dev at openjdk.java.net>
> > Sent: Friday, April 8, 2022 9:38:31 PM
> > Subject: Re: Structured Concurrency API ?
>
> >> On 8 Apr 2022, at 20:14, Ron Pressler <ron.pressler at oracle.com> wrote:
> >>
> >> I have been toying with a different idea for functional composition of
> >> structured concurrency.
> >>
> >> Stream.of("abc", "cde", "efg").__threads().filter(s ->
> >>     s.contains("c")).findAny()
> >>
> >> where __threads() will spawn a new thread for each element in the stream,
> >> the filter operation will run concurrently in each of the threads, and
> >> findAny() will shut down all remaining threads after finding an element
> >> (limit() would behave analogously). Operations that require order will
> >> join all the threads (or as many of them as necessary, to ensure that
> >> findFirst() would return "abc" and not "cde"), but we could also have an
> >> explicit join() method to continue the rest of the pipeline on the current
> >> thread. Of course, relevant operations will probably include some
> >> blocking IO.
> >>
> >> — Ron
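The findAny() behaviour described above can be approximated today with a plain ExecutorService. The following is only a sketch under stated assumptions: findAnyConcurrently is a hypothetical helper, not the prototype's __threads(), and it uses a cached pool of platform threads so that it runs without Loom (a virtual-thread executor could be swapped in). A non-matching element is modelled as a failed task, so a filter that throws is also treated as "no match".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

public class ThreadPerElementFindAny {
    // Hypothetical helper (an assumption for illustration, not a proposed API).
    // Elements are assumed to be non-null.
    static <T> Optional<T> findAnyConcurrently(List<T> elements, Predicate<? super T> filter)
            throws InterruptedException {
        if (elements.isEmpty()) {
            return Optional.empty(); // invokeAny rejects an empty task list
        }
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Callable<T>> tasks = new ArrayList<>();
            for (T element : elements) {
                tasks.add(() -> {
                    if (filter.test(element)) {
                        return element;
                    }
                    // A failed task tells invokeAny to keep waiting for another one.
                    throw new IllegalStateException("no match");
                });
            }
            // Returns the result of one task that completed successfully
            // and cancels the tasks that have not completed.
            return Optional.of(executor.invokeAny(tasks));
        } catch (ExecutionException e) {
            return Optional.empty(); // no task completed successfully
        } finally {
            executor.shutdownNow(); // interrupt anything still running
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints an Optional holding either "abc" or "cde".
        System.out.println(findAnyConcurrently(List.of("abc", "cde", "efg"), s -> s.contains("c")));
    }
}
```

Which matching element is returned depends on scheduling, which matches the findAny() contract; an ordered findFirst() would need the joining behaviour described above instead.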
> >
> > P.S.
> >
> > Overloads of __threads can take a ThreadFactory and/or a timeout.
> >
> > I have a working prototype, but the design is still in its infancy. The
> > proposed StructuredTaskScope API is not intended to be the last word on
> > structured concurrency; indeed, it is only the first.
>
> I've played a little with that idea. First, we cannot return a stream
> directly; otherwise we have to wrap it in a try-with-resources too,
> something like this:
>
> try (var stream = Stream.of("abc", "cde", "efg").__threads()) {
>   return stream.filter(s -> s.contains("c")).findAny();
> }
>
> Then there is still the issue with checked exceptions that I mentioned in
> a sibling email.
>
> What is possible is to have a single method call delimit the whole
> calculation instead of a try-with-resources.
> A single method call is obviously executed by one thread, so it is harder
> to misuse, but it becomes perhaps a little too Lispy (there are a lot of
> parentheses).
>
> Anyway, here is an example:
>
> var times = Stream.of(200, 100);
> var list = AsyncLoops.asyncLoop(times,
>     time -> {
>       Thread.sleep(time);
>       return time;
>     },
>     Stream::toList);
> System.out.println(list); // [200, 100]
>
> The method asyncLoop takes 3 parameters: a stream source, a computation,
> and a mapper (a function that takes a stream of the results of the
> computations and returns the result of the whole loop).
> Because the last parameter is a lambda, the stream stays bounded, so we can
> guarantee that no virtual thread is still running at the end of
> asyncLoop.
>
> If we remove the wildcards, the signature of asyncLoop is:
> public static <V, R, T, E extends Exception> T asyncLoop(Stream<V> source,
> Computation<V, R, E> computation, Function<Stream<R>, T> mapper) throws E,
> InterruptedException;
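To make the signature above concrete, here is a minimal sketch of how a method of that shape could be written. It is not the implementation from the linked repository: the Computation interface is reconstructed from the signature (its method name compute is an assumption), results are always delivered in source order, and platform threads stand in for virtual threads so that the code runs without Loom.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Stream;

public class AsyncLoopSketch {
    // Assumed shape: a function allowed to throw E and InterruptedException.
    @FunctionalInterface
    interface Computation<V, R, E extends Exception> {
        R compute(V value) throws E, InterruptedException;
    }

    @SuppressWarnings("unchecked")
    static <V, R, T, E extends Exception> T asyncLoop(
            Stream<V> source,
            Computation<V, R, E> computation,
            Function<Stream<R>, T> mapper) throws E, InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Start one task per element of the source stream.
            List<Future<R>> futures = new ArrayList<>();
            Iterator<V> iterator = source.iterator();
            while (iterator.hasNext()) {
                V value = iterator.next();
                Callable<R> task = () -> computation.compute(value);
                futures.add(executor.submit(task));
            }
            // Join in source order, so the mapper sees results in that order.
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                try {
                    results.add(future.get());
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof InterruptedException) {
                        throw (InterruptedException) cause;
                    }
                    if (cause instanceof Error) {
                        throw (Error) cause;
                    }
                    throw (E) cause; // unchecked: E or a RuntimeException
                }
            }
            // The mapper runs inside this call, so the stream cannot escape
            // before every task has been joined.
            return mapper.apply(results.stream());
        } finally {
            executor.shutdownNow(); // nothing keeps running after asyncLoop returns
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> list = asyncLoop(Stream.of(200, 100),
                time -> {
                    Thread.sleep(time);
                    return time;
                },
                Stream::toList);
        System.out.println(list); // [200, 100]
    }
}
```

Unlike the prototype, this sketch waits for every computation before calling the mapper, so a short-circuiting mapper such as Stream::findFirst would not cancel the remaining work early.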
>
> By default, the stream of computation results receives the results in
> order if the source stream is ordered, and unordered otherwise.
>
> One can use source.unordered() to forget about the ordering, for example:
> var times = Stream.of(500, 100).unordered();
> var result = AsyncLoops.asyncLoop(times,
>     time -> {
>       Thread.sleep(time);
>       return time;
>     },
>     Stream::findFirst)
>   .orElseThrow();
> System.out.println(result); // 100
>
> Here, findFirst() will return the result of the first computation to
> finish, not the first computation in the order of the source stream.
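The "first computation to finish" behaviour can be illustrated with the JDK's ExecutorCompletionService, which hands back tasks in completion order rather than submission order. This is a sketch of the idea only, not how AsyncLoops is implemented; firstToFinish is a hypothetical helper, and platform threads are used so it runs without Loom.

```java
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderSketch {
    // Hypothetical helper: each task sleeps for its own value (in milliseconds)
    // and returns it; the value of the first task to finish is returned.
    // The list is assumed to be non-empty.
    static int firstToFinish(List<Integer> times)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletionService<Integer> completed = new ExecutorCompletionService<>(executor);
            for (Integer time : times) {
                completed.submit(() -> {
                    Thread.sleep(time);
                    return time;
                });
            }
            // take() blocks until some task has completed, whichever it is.
            return completed.take().get();
        } finally {
            executor.shutdownNow(); // interrupt the tasks that are still sleeping
        }
    }

    public static void main(String[] args) throws Exception {
        // With a 400 ms gap, the shorter sleep is expected to finish first.
        System.out.println(firstToFinish(List.of(500, 100)));
    }
}
```

The result is timing-dependent by design: with sleeps that are close together, either task could win, which is the trade-off accepted by calling unordered().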
>
> And a last example using IO operations, to find the last modified file of
> a folder:
> Optional<Path> lastModifiedFile;
> try (var paths = Files.list(Path.of("."))) {
>   record PathAndTime(Path path, FileTime time) {}
>   lastModifiedFile = AsyncLoops.asyncLoop(paths,
>       path -> {
>         var time = Files.getLastModifiedTime(path);
>         return new PathAndTime(path, time);
>       },
>       stream -> stream.max(Comparator.comparing(PathAndTime::time))
>   ).map(PathAndTime::path);
> }
> System.out.println(lastModifiedFile);
>
>
> The API is here
>
> https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/monad/AsyncLoops.java
>
> and there are some tests here
>
> https://github.com/forax/loom-fiber/blob/master/src/test/java/fr/umlv/loom/monad/AsyncLoopsTest.java
>
> Rémi
>