[External] : Re: Structured Concurrency API ?
forax at univ-mlv.fr
Mon Apr 11 15:37:14 UTC 2022
----- Original Message -----
> From: "Ron Pressler" <ron.pressler at oracle.com>
> To: "Remi Forax" <forax at univ-mlv.fr>
> Cc: "loom-dev" <loom-dev at openjdk.java.net>
> Sent: Monday, April 11, 2022 5:02:11 PM
> Subject: Re: [External] : Re: Structured Concurrency API ?
> I specifically didn’t choose an example with checked exceptions because I didn’t
> want to get into that, but they don’t pose a big problem here. The reason is
> that simply propagating exceptions is not what we want here, and we always need
> to specify what we want to do with them — do we want to abort or to ignore?
I recently changed my mind on that subject.
The API should not allow exceptions to be ignored, mostly because I want the semantics to be the same as in the synchronous case.
In a plain old loop, throwing an exception aborts the loop. As a user, if you do not want to abort the loop when an exception occurs, you add a try/catch.
In the same way, if a user wants to ignore the exception, they can add a try/catch in the callable.
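To make the "try/catch in the callable" idea concrete, here is a minimal sketch, not the proposed API. The names IgnoreInCallable, parsePositive, ignoring and runAll are made up for illustration, and a cached thread pool stands in for virtual threads so that it runs on older JDKs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class IgnoreInCallable {
  // Hypothetical task: fails for negative input, doubles anything else.
  static Integer parsePositive(int value) throws Exception {
    if (value < 0) {
      throw new Exception("negative: " + value);
    }
    return value * 2;
  }

  // The user-side try/catch: a failure becomes an empty Optional instead of an exception.
  static <T> Callable<Optional<T>> ignoring(Callable<T> task) {
    return () -> {
      try {
        return Optional.of(task.call());
      } catch (Exception e) {
        return Optional.empty(); // the caller explicitly chose to ignore the failure
      }
    };
  }

  static List<Optional<Integer>> runAll(List<Integer> inputs)
      throws InterruptedException, ExecutionException {
    // Assumption: a cached thread pool stands in for one virtual thread per element.
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      List<Callable<Optional<Integer>>> tasks = new ArrayList<>();
      for (int input : inputs) {
        tasks.add(ignoring(() -> parsePositive(input)));
      }
      List<Optional<Integer>> results = new ArrayList<>();
      // invokeAll waits for every task and returns the futures in task order
      for (Future<Optional<Integer>> future : executor.invokeAll(tasks)) {
        results.add(future.get());
      }
      return results;
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(runAll(List.of(1, -1, 3))); // [Optional[2], Optional.empty, Optional[6]]
  }
}
```

Without the ignoring(...) wrapper, the failure of the second task would surface from future.get() and abort the whole operation, which is the behaviour I would like to be the default.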
> Also, since we spawn a thread for each element, not for “tasks”, callables are
> just values, and we could treat them as such. For example:
>
> addresses.stream().__threads()
>     .map(address -> address::connect) // this is now a stream of Callables
>     .flatMap(Callables.runAndStreamSuccessful())
>     .findAny()
>
> This is not a proposal, though; just an idea.
The problem with using flatMap(), i.e. letting a computation produce several values, is that we are entering reactive territory: we need a back-pressure mechanism when a thread produces values faster than the rest of the stream is able to consume them. It's also easy to shoot yourself in the foot in terms of concurrency, because some intermediate operations will be run by the current thread and some will be run by the virtual threads. I think we should stick to the rule: one element -> one virtual thread -> one result.
If you want to return multiple values, either return a List, so it's clear that the creation of the values is part of the computation run by a virtual thread, or execute the flatMap on the results of the computations in the current thread.
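A small sketch of that rule, under the same caveats: OneResultPerThread, splitWords and allWords are hypothetical names, and a cached thread pool again stands in for virtual threads. Each element gets one thread and produces exactly one result (a List), and the flattening is done afterwards by the calling thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

final class OneResultPerThread {
  // Hypothetical computation that yields several values; the whole List is its single result.
  static List<String> splitWords(String line) {
    return Arrays.asList(line.split(" "));
  }

  static List<String> allWords(List<String> lines)
      throws InterruptedException, ExecutionException {
    // Assumption: a cached thread pool stands in for one virtual thread per element.
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      List<Callable<List<String>>> tasks = new ArrayList<>();
      for (String line : lines) {
        tasks.add(() -> splitWords(line)); // one element -> one thread -> one List
      }
      List<List<String>> results = new ArrayList<>();
      for (Future<List<String>> future : executor.invokeAll(tasks)) {
        results.add(future.get());
      }
      // The flatMap runs here, on the calling thread, after all computations are done.
      return results.stream().flatMap(List::stream).collect(Collectors.toList());
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(allWords(List.of("a b", "c d"))); // [a, b, c, d]
  }
}
```

Since every worker hands over a finished List, no producer can outrun the consumer, so there is nothing to apply back pressure to.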
>
> But the most important thing is that there are many choices to make, and we'll
> want to wait and see what you and others come up with in libraries, and what
> ideas become well-liked. Libraries are on the same footing as the JDK here in
> terms of capabilities.
Yes, there is no point in rushing that API.
>
> Structured programming is still young, and we generally don’t like adopting new
> ideas, but realised we had to do something minimal to help herd lots of
> threads. We’re relatively early adopters in this case despite ourselves, so we
> opted to do something that is simple and at least somewhat familiar on the one
> hand, and yet serves as an introduction to structured programming. As it
> matures and, I hope, becomes popular, we’ll be able to better understand what
> problems are most important, and what solutions prove to work best.
>
> — Ron
Rémi
>
>> On 9 Apr 2022, at 15:21, forax at univ-mlv.fr wrote:
>>
>> ----- Original Message -----
>>> From: "Ron Pressler" <ron.pressler at oracle.com>
>>> To: "Remi Forax" <forax at univ-mlv.fr>
>>> Cc: "loom-dev" <loom-dev at openjdk.java.net>
>>> Sent: Friday, April 8, 2022 9:38:31 PM
>>> Subject: Re: Structured Concurrency API ?
>>
>>>> On 8 Apr 2022, at 20:14, Ron Pressler <ron.pressler at oracle.com> wrote:
>>>>
>>>> I have been toying with a different idea for functional composition of
>>>> structured concurrency.
>>>>
>>>> Stream.of("abc", "cde", "efg").__threads().filter(s -> s.contains("c")).findAny()
>>>>
>>>> where __threads() will spawn a new thread for each element in the stream, the
>>>> filter operation will run concurrently in each of the threads, and findAny(),
>>>> will shut down all remaining threads after finding an element (limit() would
>>>> behave analogously). Operations that require order will join all the threads
>>>> (or as many of them as necessary, to ensure that findFirst() would return “abc”
>>>> and not “cde”), but we could also have an explicit join() method to continue
>>>> the rest of the pipeline on the current thread. Of course, relevant operations
>>>> will probably include some blocking IO.
>>>>
>>>> — Ron
>>>
>>> P.S.
>>>
>>> Overloads of __threads can take a ThreadFactory and/or a timeout.
>>>
>>> I have a working prototype, but the design is still in its infancy. The proposed
>>> StructuredTaskScope API is not intended to be the last word on structured
>>> concurrency; indeed, it is only the first.
>>
>> I've played a little with that idea. First, we cannot return a stream directly;
>> otherwise we have to wrap it in a try-with-resources too,
>> something like this:
>>
>> try (var stream = Stream.of("abc", "cde", "efg").__threads()) {
>>   return stream.filter(s -> s.contains("c")).findAny();
>> }
>>
>> Then there is still the issue with the checked exceptions I spoke about in a
>> sibling email.
>>
>> What is possible is to have one method call delimiting the whole calculation
>> instead of a try-with-resources.
>> One method call is obviously executed by a single thread, so it is harder to
>> misuse, but it becomes perhaps a little too lispy (there are a lot of
>> parentheses).
>>
>> Anyway, here is an example:
>>
>> var times = Stream.of(200, 100);
>> var list = AsyncLoops.asyncLoop(times,
>>     time -> {
>>       Thread.sleep(time);
>>       return time;
>>     },
>>     Stream::toList);
>> System.out.println(list); // [200, 100]
>>
>> The method asyncLoop takes 3 parameters: a stream source, a computation and a
>> mapper (a function that takes a stream of the results of the computations and
>> returns the result of the whole loop).
>> Because the last parameter is a lambda, the stream stays bounded, so we can
>> guarantee that no virtual thread is still running when asyncLoop returns.
>>
>> If we remove the wildcards, the signature of asyncLoop is:
>> public static <V, R, T, E extends Exception> T asyncLoop(Stream<V> source,
>> Computation<V, R, E> computation, Function<Stream<R>, T> mapper) throws E,
>> InterruptedException;
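For readers who want to experiment with that signature, here is a simplified sketch of how such a method could be written. It is not the code from the repository linked below: the Computation interface and the AsyncLoopSketch class are stand-ins written for this example, a cached thread pool replaces virtual threads so that it runs on older JDKs, and it is eager (the mapper only sees the results once every computation has finished, always in source order, so short-circuiting and unordered() are not handled).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for the Computation type named in the signature.
interface Computation<V, R, E extends Exception> {
  R compute(V value) throws E;
}

final class AsyncLoopSketch {
  static <V, R, T, E extends Exception> T asyncLoop(
      Stream<V> source, Computation<V, R, E> computation, Function<Stream<R>, T> mapper)
      throws E, InterruptedException {
    // Assumption: a cached thread pool stands in for one virtual thread per element.
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      List<Future<R>> futures = new ArrayList<>();
      for (V value : (Iterable<V>) source::iterator) {
        Callable<R> task = () -> computation.compute(value);
        futures.add(executor.submit(task));
      }
      List<R> results = new ArrayList<>();
      for (Future<R> future : futures) {
        try {
          results.add(future.get()); // joins in source order
        } catch (ExecutionException e) {
          // Rethrow the original failure so the loop aborts like a synchronous one.
          Throwable cause = e.getCause();
          if (cause instanceof RuntimeException) throw (RuntimeException) cause;
          if (cause instanceof Error) throw (Error) cause;
          @SuppressWarnings("unchecked")
          E checked = (E) cause;
          throw checked;
        }
      }
      // The stream never escapes: it is consumed by the mapper inside this call.
      return mapper.apply(results.stream());
    } finally {
      executor.shutdownNow();                          // interrupt anything still running
      executor.awaitTermination(1, TimeUnit.MINUTES);  // wait (up to a minute) for workers to exit
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<Integer> list = asyncLoop(Stream.of(200, 100),
        time -> { Thread.sleep(time); return time; },
        stream -> stream.collect(Collectors.toList()));
    System.out.println(list); // [200, 100]
  }
}
```

The type parameter E is what lets a checked exception thrown by the computation propagate out of asyncLoop with its static type intact, here InterruptedException from Thread.sleep.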
>>
>> By default, the stream of the computation results receives the results in order
>> if the source stream is ordered, and unordered otherwise.
>>
>> One can use source.unordered() to forget about the ordering, for example:
>> var times = Stream.of(500, 100).unordered();
>> var result = AsyncLoops.asyncLoop(times,
>>     time -> {
>>       Thread.sleep(time);
>>       return time;
>>     },
>>     Stream::findFirst)
>>   .orElseThrow();
>> System.out.println(result); // 100
>>
>> Here, findFirst() will return the result of the first computation to finish, not
>> the first computation in the order of the source stream.
>>
>> And a last example using IO operations, to find the most recently modified
>> file in a folder:
>> Optional<Path> lastModifiedFile;
>> try (var paths = Files.list(Path.of("."))) {
>>   record PathAndTime(Path path, FileTime time) {}
>>   lastModifiedFile = AsyncLoops.asyncLoop(paths,
>>       path -> {
>>         var time = Files.getLastModifiedTime(path);
>>         return new PathAndTime(path, time);
>>       },
>>       stream -> stream.max(Comparator.comparing(PathAndTime::time))
>>   ).map(PathAndTime::path);
>> }
>> System.out.println(lastModifiedFile);
>>
>>
>> The API is here
>> https://github.com/forax/loom-fiber/blob/master/src/main/java/fr/umlv/loom/monad/AsyncLoops.java
>>
>> and there are some tests here
>> https://github.com/forax/loom-fiber/blob/master/src/test/java/fr/umlv/loom/monad/AsyncLoopsTest.java
>>
>> Rémi