RFR: WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream

Remi Forax forax at univ-mlv.fr
Fri Sep 1 13:22:26 UTC 2023


----- Original Message -----
> From: "Robert Engels" <rengels at ix.netcom.com>
> To: "Rémi Forax" <forax at openjdk.org>
> Cc: "loom-dev" <loom-dev at openjdk.org>
> Sent: Friday, September 1, 2023 3:17:39 PM
> Subject: Re: RFR: WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream

> Why use streams for this at all?
> 
> Why not add methods like getFirst() and getAll()? That is a much easier API to work with,
> especially if you want to expose exceptions.

getFirst() and getAll() are already there: they are named ShutdownOnSuccess and ShutdownOnFailure.
This third subclass is for use cases where the semantics are a little more complex than getFirst() or getAll(), such as getting the first two results (using Stream.limit()).
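
To make the "first two" case concrete without the proposed class, here is a minimal sketch that uses only existing JDK APIs (Java 16+ for Stream.toList()). It emulates the Stream.limit(2) semantics with an ExecutorCompletionService; it is not the Streamable implementation, and FirstTwo and firstN are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

public class FirstTwo {
    // Hypothetical helper: returns the first n results in completion order,
    // then interrupts whatever is still running.
    static <T> List<T> firstN(List<Callable<T>> tasks, int n) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletionService<T> completed = new ExecutorCompletionService<>(executor);
            for (Callable<T> task : tasks) {
                completed.submit(task);
            }
            // Each generated element blocks until the next task completes;
            // limit() stops pulling once n results have been seen.
            return Stream.generate(() -> takeResult(completed))
                    .limit(Math.min(n, tasks.size()))
                    .toList();
        } finally {
            executor.shutdownNow(); // cancels the tasks that lost the race
        }
    }

    private static <T> T takeResult(CompletionService<T> completed) {
        try {
            return completed.take().get();
        } catch (InterruptedException | ExecutionException e) {
            // Assumption for this sketch: any failure aborts the whole query.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Callable<Integer> slow = () -> { Thread.sleep(2_000); return 12; };
        Callable<Integer> fast = () -> { Thread.sleep(100); return 17; };
        Callable<Integer> medium = () -> { Thread.sleep(500); return 42; };
        System.out.println(firstN(List.of(slow, fast, medium), 2));  // [17, 42]
    }
}
```

Unlike this sketch, the proposed joinWhen hands the Subtask objects themselves to the stream, so the caller decides how failures are treated.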

regards,
Rémi

> 
>> On Sep 1, 2023, at 8:12 AM, Rémi Forax <forax at openjdk.org> wrote:
>> 
>> This is a rough minimal patch that adds a new subclass of StructuredTaskScope
>> named Streamable (better name needed) that pushes failed or succeeded subtasks into
>> a Stream.
>> 
>> This subclass aims to:
>> - make it easier for users to use STS without having to override handleComplete,
>> which is called concurrently and so is hard to get right, at the price of being a little
>> less efficient
>> - ease the implementation of short-circuit semantics, such as getting the first two values
>> or the first value greater than a threshold, by automatically shutting down the STS
>> once the condition is true
>> 
>> The Streamable STS adds a new method joinWhen(function) that takes a function
>> that takes a Stream and returns a value:
>> 
>>  public <U> U joinWhen(Function<? super Stream<Subtask<T>>, ? extends U> mapper)
>>  throws InterruptedException {
>> 
>> 
>> When this method is called, the subtasks whose state is not UNAVAILABLE are pushed into the
>> Stream as they finish, until either there are no more subtasks or the
>> stream has finished (has been short-circuited). If some subtasks are still pending
>> because the stream has been short-circuited, they are shut down.
>> 
>> Here are two examples:
>> - get a list of all the values that succeed
>> 
>>    try(var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>>            streamable.fork(() -> {
>>                Thread.sleep(200);
>>                return 12;
>>            });
>>            streamable.fork(() -> {
>>                Thread.sleep(100);
>>                return 17;
>>            });
>>            List<Integer> list = streamable.joinWhen(stream -> stream
>>                .filter(task -> task.state() == State.SUCCESS)
>>                .map(Subtask::get)
>>                .toList());
>>            System.out.println(list);  // [17, 12]
>>        }
>> 
>> - find the first subtask to complete (whether it succeeds or fails)
>> 
>>    try(var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>>            streamable.fork(() -> {
>>                Thread.sleep(1_000);
>>                return 12;
>>            });
>>            streamable.fork(() -> {
>>                Thread.sleep(100);
>>                return 17;
>>            });
>>            Optional<Subtask<Integer>> first = streamable.joinWhen(Stream::findFirst);
>>            System.out.println(first);
>>            // Optional[PlainSubTask[state=SUCCESS, result=17, exception=null]]
>>        }
>> 
>> 
>> Internally, handleComplete posts each subtask into a queue, which is read by the
>> Stream spliterator inside joinWhen.
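
The queue-plus-spliterator idea described above can be sketched with plain JDK classes. This is only an illustration under stated assumptions, not the code from the patch: QueueStreamSketch, completionStream, and firstAbove are hypothetical names, plain threads stand in for forked subtasks, and the stream ends after a fixed number of elements rather than by consulting the scope's thread count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class QueueStreamSketch {
    // Hypothetical helper: a sequential Stream that yields `expected` elements
    // in the order producers post them to `queue`.
    static <T> Stream<T> completionStream(BlockingQueue<T> queue, int expected) {
        Spliterator<T> spliterator = new Spliterators.AbstractSpliterator<T>(
                expected, Spliterator.ORDERED | Spliterator.NONNULL) {
            private int remaining = expected;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (remaining == 0) {
                    return false;              // every producer has been consumed
                }
                try {
                    T next = queue.take();     // blocks until a producer posts
                    remaining--;
                    action.accept(next);
                    return true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;              // end the stream on interruption
                }
            }
        };
        return StreamSupport.stream(spliterator, false);
    }

    // Short-circuit example: first value greater than `threshold`,
    // then interrupt the producers that are still running.
    static Optional<Integer> firstAbove(int threshold) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        int[][] tasks = { {300, 12}, {100, 5}, {600, 17} };  // {delay in ms, value}
        List<Thread> threads = new ArrayList<>();
        for (int[] task : tasks) {
            Thread thread = new Thread(() -> {
                try {
                    Thread.sleep(task[0]);
                    queue.add(task[1]);        // plays the role of handleComplete
                } catch (InterruptedException e) {
                    // cancelled because the stream was short-circuited
                }
            });
            thread.start();
            threads.add(thread);
        }
        try {
            return completionStream(queue, tasks.length)
                    .filter(value -> value > threshold)
                    .findFirst();
        } finally {
            threads.forEach(Thread::interrupt);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstAbove(10));  // Optional[12]
    }
}
```

Because findFirst() stops pulling after the first match, the producer that would have posted 17 is interrupted before it finishes, which mirrors the automatic shutdown described for joinWhen.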
>> 
>> The current implementation is not the right one. Instead of introducing a new
>> method into the Flock that can wait on shutdown, on threadCount == 0, and on the queue
>> receiving a new subtask, this implementation shuts down the flock early and does not
>> implement shutdown(), so it only has to check whether the number of tasks is zero.
>> 
>> -------------
>> 
>> Commit messages:
>> - WIP: add a new subclass of StructuredTaskScope that shows the finished
>> subtasks as a Stream
>> 
>> Changes: https://git.openjdk.org/loom/pull/201/files
>> Webrev: https://webrevs.openjdk.org/?repo=loom&pr=201&range=00
>>  Stats: 226 lines in 1 file changed: 220 ins; 0 del; 6 mod
>>  Patch: https://git.openjdk.org/loom/pull/201.diff
>>  Fetch: git fetch https://git.openjdk.org/loom.git pull/201/head:pull/201
>> 
>> PR: https://git.openjdk.org/loom/pull/201

