RFR: WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream

Remi Forax forax at univ-mlv.fr
Fri Sep 1 13:30:05 UTC 2023


Sorry, the first PR was not in its own branch, hence this second one.

regards,
Rémi

----- Original Message -----
> From: "Rémi Forax" <forax at openjdk.org>
> To: "loom-dev" <loom-dev at openjdk.org>
> Sent: Friday, September 1, 2023 3:23:10 PM
> Subject: RFR: WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream

> This is a rough, minimal patch that adds a new subclass of StructuredTaskScope
> named Streamable (better name needed) that pushes failed or succeeded subtasks
> into a Stream.
> 
> This subclass aims to:
> - make it easier for users to use STS without having to override
> handleCompleted, which is called concurrently and is therefore hard to get
> right, at the price of being a little less efficient
> - ease the implementation of short-circuit semantics, such as getting the first
> two values or the first value greater than a threshold, by automatically
> shutting down the STS once the condition is true
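
The two conditions named in the second bullet correspond to ordinary short-circuiting Stream operations. As a rough sketch, assuming the stream elements are already the successful results in completion order (a plain Stream<Integer> stands in for the stream of subtasks, and the class and method names below are hypothetical, not part of the patch), the functions one might pass could look like this:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical helper, for illustration only: it is not part of the patch.
final class ShortcutConditions {
    private ShortcutConditions() {}

    // "Get the first two values": limit(2) stops pulling elements after two.
    static Function<Stream<Integer>, List<Integer>> firstTwo() {
        return stream -> stream.limit(2).toList();
    }

    // "Get the first value greater than a threshold": findFirst() stops
    // pulling elements as soon as one value passes the filter.
    static Function<Stream<Integer>, Optional<Integer>> firstGreaterThan(int threshold) {
        return stream -> stream.filter(value -> value > threshold).findFirst();
    }
}
```

Because limit and findFirst are short-circuiting, the stream stops asking for more elements once the condition is met, which is the point at which the proposal would shut down the scope.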
> 
> The Streamable STS adds a new method, joinWhen(function), whose argument is a
> function that takes a Stream and returns a value:
> 
>  public <U> U joinWhen(Function<? super Stream<Subtask<T>>, ? extends U> mapper)
>  throws InterruptedException {
> 
> When this method is called, the subtasks whose state is not UNAVAILABLE are
> pushed into the Stream as they finish, until either there are no more subtasks
> or the stream has finished (has been short-circuited). If some subtasks are
> still pending because the stream has been short-circuited, they are shut down.
> 
> Here are two examples:
> - get a list of the values of all the subtasks that succeed
> 
>    try(var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>            streamable.fork(() -> {
>                Thread.sleep(200);
>                return 12;
>            });
>            streamable.fork(() -> {
>                Thread.sleep(100);
>                return 17;
>            });
>            List<Integer> list = streamable.joinWhen(stream -> stream
>                .filter(task -> task.state() == State.SUCCESS)
>                .map(Subtask::get)
>                .toList());
>            System.out.println(list);  // [17, 12]
>        }
> 
> - find the first subtask to finish (whether it succeeds or fails)
> 
>    try(var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>            streamable.fork(() -> {
>                Thread.sleep(1_000);
>                return 12;
>            });
>            streamable.fork(() -> {
>                Thread.sleep(100);
>                return 17;
>            });
>            Optional<Subtask<Integer>> first = streamable.joinWhen(Stream::findFirst);
>            System.out.println(first);
>            // Optional[PlainSubTask[state=SUCCESS, result=17, exception=null]]
>        }
> 
> Internally, handleCompleted posts each subtask into a queue, which is read by
> the Stream spliterator inside joinWhen.
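
The queue-plus-spliterator idea can be sketched without the preview API. The code below is only an approximation under stated assumptions: a plain ExecutorService stands in for the scope and its Flock, and QueueBackedJoin and Outcome are hypothetical names standing in for Streamable and Subtask. It is not the code of the patch.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for the proposed Streamable scope (assumption, not the patch).
final class QueueBackedJoin {
    private QueueBackedJoin() {}

    // Outcome of one finished task; failure is null when the task succeeded.
    record Outcome<T>(T value, Throwable failure) {
        boolean succeeded() { return failure == null; }
    }

    static <T, U> U joinWhen(List<Callable<T>> tasks,
                             Function<? super Stream<Outcome<T>>, ? extends U> mapper)
            throws InterruptedException {
        BlockingQueue<Outcome<T>> queue = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Each task posts its outcome to the queue when it finishes,
            // playing the role the message gives to handleCompleted.
            for (Callable<T> task : tasks) {
                executor.execute(() -> {
                    try {
                        queue.add(new Outcome<>(task.call(), null));
                    } catch (Throwable t) {
                        queue.add(new Outcome<>(null, t));
                    }
                });
            }
            // The spliterator blocks on the queue until every task has been seen.
            Spliterator<Outcome<T>> spliterator =
                new Spliterators.AbstractSpliterator<Outcome<T>>(tasks.size(), Spliterator.NONNULL) {
                    private int remaining = tasks.size();

                    @Override
                    public boolean tryAdvance(Consumer<? super Outcome<T>> action) {
                        if (remaining == 0) {
                            return false;               // no more tasks: end of stream
                        }
                        try {
                            Outcome<T> outcome = queue.take();
                            remaining--;
                            action.accept(outcome);
                            return true;
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return false;               // end the stream early
                        }
                    }
                };
            U result = mapper.apply(StreamSupport.stream(spliterator, false));
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            return result;
        } finally {
            // Interrupts any task still running, e.g. after a short-circuit.
            executor.shutdownNow();
        }
    }
}
```

In this sketch a short-circuiting function such as `stream -> stream.findFirst()` returns after the first outcome is taken from the queue, and the finally block then interrupts the tasks that are still running. The real proposal would instead rely on the scope's own shutdown mechanism.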
> 
> The current implementation is not the right one: instead of introducing a new
> method into the Flock that can wait on shutdown, on threadCount == 0, and on
> the queue having a new subtask, this implementation shuts down the flock early
> and does not implement shutdown(), so it only has to check whether the number
> of tasks is zero.
> 
> -------------
> 
> Commit messages:
> - WIP: add a new subclass of StructuredTaskScope that shows the finished
> subtasks as a Stream
> 
> Changes: https://git.openjdk.org/loom/pull/202/files
> Webrev: https://webrevs.openjdk.org/?repo=loom&pr=202&range=00
>  Stats: 226 lines in 1 file changed: 220 ins; 0 del; 6 mod
>  Patch: https://git.openjdk.org/loom/pull/202.diff
>  Fetch: git fetch https://git.openjdk.org/loom.git pull/202/head:pull/202
> 
> PR: https://git.openjdk.org/loom/pull/202

