RFR: WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream
Remi Forax
forax at univ-mlv.fr
Fri Sep 1 13:30:05 UTC 2023
Sorry, the first PR was not in its own branch, hence this second one.
regards,
Rémi
----- Original Message -----
> From: "Rémi Forax" <forax at openjdk.org>
> To: "loom-dev" <loom-dev at openjdk.org>
> Sent: Friday, September 1, 2023 3:23:10 PM
> Subject: RFR: WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream
> This is a rough, minimal patch that adds a new subclass of StructuredTaskScope
> named Streamable (better name needed) that pushes failed or succeeding subtasks
> into a Stream.
>
> This subclass aims to:
> - make it easier for users to use STS without having to override handleComplete,
> which is called concurrently and is therefore hard to get right, at the price of
> being slightly less efficient
> - ease the implementation of short-circuit semantics, such as getting the first
> two values or the first value greater than a threshold, by automatically
> shutting down the STS once the condition is true
>
> The Streamable STS adds a new method, joinWhen(function), whose argument is a
> function that takes a Stream and returns a value:
>
> public <U> U joinWhen(Function<? super Stream<Subtask<T>>, ? extends U> mapper)
> throws InterruptedException {
>
> When this method is called, the subtasks whose state is not UNAVAILABLE are
> pushed into the Stream as they finish, until either there are no more subtasks
> or the stream has finished (has been short-circuited). If some subtasks are
> still pending because the stream has been short-circuited, they are shut down.
>
> Here are two examples:
> - get a list of all the values that succeed
>
> try (var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>     streamable.fork(() -> {
>         Thread.sleep(200);
>         return 12;
>     });
>     streamable.fork(() -> {
>         Thread.sleep(100);
>         return 17;
>     });
>     List<Integer> list = streamable.joinWhen(stream -> stream
>         .filter(task -> task.state() == State.SUCCESS)
>         .map(Subtask::get)
>         .toList());
>     System.out.println(list); // [17, 12]
> }
>
> - find the first subtask to finish (whether it succeeds or fails)
>
> try (var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>     streamable.fork(() -> {
>         Thread.sleep(1_000);
>         return 12;
>     });
>     streamable.fork(() -> {
>         Thread.sleep(100);
>         return 17;
>     });
>     Optional<Subtask<Integer>> first = streamable.joinWhen(Stream::findFirst);
>     // Optional[PlainSubTask[state=SUCCESS, result=17, exception=null]]
>     System.out.println(first);
> }
>
> Internally, handleComplete posts each subtask into a queue, which is read by the
> Stream's spliterator inside joinWhen.
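
The queue-plus-spliterator idea described above can be sketched with plain java.util.concurrent, without StructuredTaskScope or the Flock. This is only an illustration under stated assumptions, not the code of the patch: the class CompletionStreamSketch, the record Outcome, the static joinWhen and the helper sleepThenReturn are all hypothetical names, and an ExecutorService stands in for the scope. Each task posts its outcome into a BlockingQueue when it completes, a Spliterator takes from that queue until every task has reported, and shutdownNow() interrupts whatever is still pending once the mapper returns.

```java
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CompletionStreamSketch {
    // Hypothetical stand-in for Subtask: a result or the exception that was thrown.
    record Outcome<T>(T result, Throwable exception) {}

    // Hypothetical analogue of joinWhen: the mapper sees outcomes in completion order.
    static <T, U> U joinWhen(List<Callable<T>> tasks,
                             Function<Stream<Outcome<T>>, U> mapper) {
        BlockingQueue<Outcome<T>> queue = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            for (Callable<T> task : tasks) {
                executor.execute(() -> {
                    Outcome<T> outcome;
                    try {
                        outcome = new Outcome<>(task.call(), null);
                    } catch (Throwable t) {
                        outcome = new Outcome<>(null, t);
                    }
                    queue.add(outcome); // plays the role of the completion callback
                });
            }
            Spliterator<Outcome<T>> spliterator =
                new Spliterators.AbstractSpliterator<Outcome<T>>(tasks.size(), Spliterator.ORDERED) {
                    private int remaining = tasks.size();

                    @Override
                    public boolean tryAdvance(Consumer<? super Outcome<T>> action) {
                        if (remaining == 0) {
                            return false; // every task has already reported
                        }
                        try {
                            action.accept(queue.take()); // blocks until a task completes
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return false;
                        }
                        remaining--;
                        return true;
                    }
                };
            return mapper.apply(StreamSupport.stream(spliterator, false));
        } finally {
            executor.shutdownNow(); // interrupts tasks still pending after a short-circuit
        }
    }

    // Hypothetical helper used only by the demo.
    static Callable<Integer> sleepThenReturn(long millis, int value) {
        return () -> {
            Thread.sleep(millis);
            return value;
        };
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks =
            List.of(sleepThenReturn(200, 12), sleepThenReturn(100, 17));
        List<Integer> successes = joinWhen(tasks, stream -> stream
            .filter(outcome -> outcome.exception() == null)
            .map(Outcome::result)
            .toList());
        System.out.println(successes);
        Optional<Outcome<Integer>> first = joinWhen(tasks, Stream::findFirst);
        System.out.println(first);
    }
}
```

Because the stream is sequential and findFirst is a short-circuiting operation, the spliterator is asked for a single element and the slower task is interrupted as soon as the mapper returns. The sketch counts remaining tasks instead of waiting on a thread count, which is a simplification of what the scope would have to do.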
>
> The current implementation is not the right one: instead of introducing a new
> method into the Flock that can wait on shutdown, on threadCount == 0 and on the
> queue having a new subtask, this implementation shuts down the flock early and
> does not implement shutdown(), so it only has to check whether the number of
> tasks is zero.
>
> -------------
>
> Commit messages:
> - WIP: add a new subclass of StructuredTaskScope that shows the finished
> subtasks as a Stream
>
> Changes: https://git.openjdk.org/loom/pull/202/files
> Webrev: https://webrevs.openjdk.org/?repo=loom&pr=202&range=00
> Stats: 226 lines in 1 file changed: 220 ins; 0 del; 6 mod
> Patch: https://git.openjdk.org/loom/pull/202.diff
> Fetch: git fetch https://git.openjdk.org/loom.git pull/202/head:pull/202
>
> PR: https://git.openjdk.org/loom/pull/202