Withdrawn: Add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream

duke duke at openjdk.org
Fri Feb 9 16:20:10 UTC 2024


On Fri, 1 Sep 2023 13:18:01 GMT, Rémi Forax <forax at openjdk.org> wrote:

> This is a minimal patch that adds a new subclass of StructuredTaskScope named Stremable (better name needed) pushing failed/succceding subtasks into a Stream.
> 
> This subclass aim to:
> - make easier for users to use STS without having to override handleCompleted, which is called concurrently so hard to get right, at a price of being a little less efficient
> - ease the implementation of shortcuited stream semantics like get the first two values, get the first value greater than a threshold, etc by auto shutdowning the STS once the condition is true
> 
> The Streamable STS adds two new methods joinWhile/joinUntilWhile(function) that takes a function that takes a Stream and return a value
> 
>   public <U> U joinWhile(Function<? super Stream<Subtask<T>>, ? extends U> mapper) throws InterruptedException {
> 
> When this method is called, each finished subtask (with state SUCCESS or FAILED) are pushed into the Stream until there is no more subtasks, the stream has finished (has been short-circuited), the scope has been shutdown, interrupted or the dealine occurs. If some tasks are still pending because the stream has been short-cirtuited, they are shutdown.
> 
> Here are two examples:
> - get a list of all the values that suceeed
> 
>     try(var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>             streamable.fork(() -> {
>                 Thread.sleep(200);
>                 return 12;
>             });
>             streamable.fork(() -> {
>                 Thread.sleep(100);
>                 return 17;
>             });
>             List<Integer> list = streamable.joinWhile(stream -> stream.filter(task -> task.state() == State.SUCCESS).map(Subtask::get).toList());
>             System.out.println(list);  // [17, 12]
>         }
> 
> - find the first subtask (that suceed or fail)
> 
>     try(var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>             streamable.fork(() -> {
>                 Thread.sleep(1_000);
>                 return 12;
>             });
>             streamable.fork(() -> {
>                 Thread.sleep(100);
>                 return 17;
>             });
>             Optional<Subtask<Integer>> first = streamable.joinWhile(Stream::findFirst);
>             System.out.println(first);  // Optional[PlainSubTask[state=SUCCESS, result=17, exception=null]]
>         }
> 
> Internally, handleCompleted post each subtask into a queue which is read by the Stream spliterator inside joinWhile.
> 
> The current implementation uses thread flock methods ThreadFlock.awaitAll()/ThreadFlock.w...

This pull request has been closed without being integrated.

-------------

PR: https://git.openjdk.org/loom/pull/202


More information about the loom-dev mailing list