Structured Concurrency API ?
forax at univ-mlv.fr
Fri Apr 8 20:40:30 UTC 2022
----- Original Message -----
> From: "Ron Pressler" <ron.pressler at oracle.com>
> To: "Remi Forax" <forax at univ-mlv.fr>
> Cc: "loom-dev" <loom-dev at openjdk.java.net>
> Sent: Friday, April 8, 2022 9:38:31 PM
> Subject: Re: Structured Concurrency API ?
>> On 8 Apr 2022, at 20:14, Ron Pressler <ron.pressler at oracle.com> wrote:
>>
>> I have been toying with a different idea for functional composition of
>> structured concurrency.
>>
>> Stream.of("abc", "cde", "efg").__threads().filter(s ->
>> s.contains("c")).findAny()
>>
>> where __threads() will spawn a new thread for each element in the stream, the
>> filter operation will run concurrently in each of the threads, and findAny()
>> will shut down all remaining threads after finding an element (limit() would
>> behave analogously). Operations that require order will join all the threads
>> (or as many of them as necessary, to ensure that findFirst() would return “abc”
>> and not “cde”), but we could also have an explicit join() method to continue
>> the rest of the pipeline on the current thread. Of course, relevant operations
>> will probably include some blocking IO.
so something like
static<T, V> Stream<T> runTasks(Stream<? extends V> values, Function<? super Stream<V>, ? extends Stream<T>> asyncApplier)
From a stream of values, we start one virtual thread per value; each virtual thread creates a stream and applies the asyncApplier, and all the resulting streams are joined into a single stream, which is returned.
(I'm using a static method here; it could be integrated as an instance method on Stream later.)
With your example
runTasks(
    Stream.of("abc", "cde", "efg"),
    stream -> stream.filter(s -> s.contains("c"))
).findAny();
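To make the description above concrete, here is a minimal sketch of what runTasks could look like. It is an illustration under assumptions, not the prototype discussed in this thread: the extra ThreadFactory parameter and the class name RunTasksSketch are hypothetical, each per-value stream is collected eagerly inside its own thread, and all threads are joined in encounter order, so a short-circuiting operation such as findAny() does not cancel the remaining threads here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class RunTasksSketch {
    // Hypothetical variant of runTasks; the ThreadFactory parameter is an assumption.
    static <T, V> Stream<T> runTasks(ThreadFactory factory,
                                     Stream<? extends V> values,
                                     Function<? super Stream<V>, ? extends Stream<T>> asyncApplier) {
        List<FutureTask<List<T>>> futures = new ArrayList<>();
        values.forEachOrdered(value -> {
            // One thread per value: build a one-element stream, apply the pipeline,
            // and collect eagerly so the work really happens on that thread.
            FutureTask<List<T>> future = new FutureTask<List<T>>(() -> {
                Stream<T> out = asyncApplier.apply(Stream.<V>of(value));
                return out.collect(Collectors.toList());
            });
            futures.add(future);
            factory.newThread(future).start();
        });

        // Join every thread in encounter order and concatenate the results.
        List<T> joined = new ArrayList<>();
        for (FutureTask<List<T>> future : futures) {
            try {
                joined.addAll(future.get());
            } catch (ExecutionException e) {
                // The applier can only fail with unchecked exceptions.
                throw new IllegalStateException(e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return joined.stream();
    }

    public static void main(String[] args) {
        System.out.println(
            runTasks(Thread::new,
                     Stream.of("abc", "cde", "efg"),
                     stream -> stream.filter(s -> s.contains("c")))
                .findAny());
    }
}
```

Passing Thread::new keeps the sketch runnable on older JDKs; on a JDK with virtual threads (21 or later), Thread.ofVirtual().factory() could be passed instead.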
The problem with using the Stream API, or a stream-like API, is that there is no way to propagate checked exceptions correctly with the generics we have.
That's why intermediate stream operations cannot throw checked exceptions such as IOException or InterruptedException.
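A small illustration of that limitation, as a sketch only: fetch is a hypothetical blocking call invented for the example. Because Function.apply declares no checked exception, the call has to be wrapped and the IOException rethrown as an unchecked one, so it disappears from the method signature.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CheckedInStream {
    // Hypothetical blocking operation that declares a checked exception.
    static String fetch(String name) throws IOException {
        if (name.isEmpty()) {
            throw new IOException("empty name");
        }
        return name.toUpperCase();
    }

    static List<String> fetchAll(Stream<String> names) {
        // names.map(CheckedInStream::fetch) would not compile:
        // Function.apply cannot throw IOException.
        return names.map(name -> {
            try {
                return fetch(name);
            } catch (IOException e) {
                // The only option is to tunnel it through an unchecked exception.
                throw new UncheckedIOException(e);
            }
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Stream.of("abc", "cde")));
    }
}
```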
But the raison d'être of virtual threads is to deal with blocking operations during a computation; otherwise parallel streams are enough.
That's why I've chosen a hybrid approach: a computation is defined as a Task/Callable instead of a java.util.function.Function, but a Stream is used when dealing with the results of the computations.
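A rough sketch of that hybrid shape, under stated assumptions: HybridSketch, Result and runAll are hypothetical names made up for this illustration and are not the API under discussion. Each computation is a Callable, so it may throw a checked exception, and the outcomes are handed to the caller as a Stream once every thread has been joined.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import java.util.stream.Stream;

final class HybridSketch {
    // Hypothetical holder for one outcome: a value, or the exception the task threw.
    static final class Result<T> {
        final T value;
        final Throwable failure; // null when the task succeeded

        Result(T value, Throwable failure) {
            this.value = value;
            this.failure = failure;
        }

        boolean isSuccess() {
            return failure == null;
        }
    }

    static <T, R> R runAll(ThreadFactory factory,
                           List<Callable<T>> tasks,
                           Function<Stream<Result<T>>, R> reducer) throws InterruptedException {
        // Start one thread per task; Callable.call may throw checked exceptions.
        List<FutureTask<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            FutureTask<T> future = new FutureTask<>(task);
            futures.add(future);
            factory.newThread(future).start();
        }

        // Join in task order, recording failures instead of rethrowing them.
        List<Result<T>> results = new ArrayList<>();
        for (FutureTask<T> future : futures) {
            try {
                results.add(new Result<>(future.get(), null));
            } catch (ExecutionException e) {
                results.add(new Result<>(null, e.getCause()));
            }
        }
        // The results, not the computations, are processed as a stream.
        return reducer.apply(results.stream());
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = List.of(
            () -> "abc",
            () -> { throw new IOException("simulated I/O failure"); },
            () -> "cde");
        Optional<String> first = runAll(Thread::new, tasks,
            results -> results.filter(Result::isSuccess).map(r -> r.value).findFirst());
        System.out.println(first);
    }
}
```

This version waits for every task and does not cancel the others if the calling thread is interrupted; a real scope would need to handle shutdown and cancellation.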
>>
>> — Ron
>
> P.S.
>
> Overloads of __threads can take a ThreadFactory and/or a timeout.
>
> I have a working prototype, but the design is still in its infancy. The proposed
> StructuredTaskScope API is not intended to be the last word on structured
> concurrency; indeed, it is only the first.
Yes, designing APIs is hard; the first iterations are just there to understand the problems and trade-offs.
Rémi