RFR: WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream [v5]

Rémi Forax forax at openjdk.org
Fri Sep 1 21:56:51 UTC 2023


On Fri, 1 Sep 2023 21:28:27 GMT, Rémi Forax <forax at openjdk.org> wrote:

>> This is a minimal patch that adds a new subclass of StructuredTaskScope named Streamable (better name needed) that pushes failed or succeeding subtasks into a Stream.
>> 
>> This subclass aims to:
>> - make it easier for users to use STS without having to override handleCompleted, which is called concurrently and is therefore hard to get right, at the price of being a little less efficient
>> - ease the implementation of short-circuiting stream semantics, such as getting the first two values or the first value greater than a threshold, by automatically shutting down the STS once the condition is true
>> 
>> The Streamable STS adds a new method joinWhen(function) that takes a function that takes a Stream and returns a value
>> 
>>   public <U> U joinWhen(Function<? super Stream<Subtask<T>>, ? extends U> mapper) throws InterruptedException {
>> 
>> When this method is called, each finished subtask (with state SUCCESS or FAILED) is pushed into the Stream until there are no more subtasks, the stream has finished (has been short-circuited), or the scope has been shut down or interrupted. If some subtasks are still pending because the stream has been short-circuited, they are shut down.
>> 
>> Here are two examples:
>> - get a list of all the values that succeed
>> 
>>     try (var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>>         streamable.fork(() -> {
>>             Thread.sleep(200);
>>             return 12;
>>         });
>>         streamable.fork(() -> {
>>             Thread.sleep(100);
>>             return 17;
>>         });
>>         List<Integer> list = streamable.joinWhen(stream -> stream.filter(task -> task.state() == State.SUCCESS).map(Subtask::get).toList());
>>         System.out.println(list);  // [17, 12]
>>     }
>> 
>> - find the first subtask that completes (whether it succeeds or fails)
>> 
>>     try (var streamable = new StructuredTaskScope.Streamable<Integer>()) {
>>         streamable.fork(() -> {
>>             Thread.sleep(1_000);
>>             return 12;
>>         });
>>         streamable.fork(() -> {
>>             Thread.sleep(100);
>>             return 17;
>>         });
>>         Optional<Subtask<Integer>> first = streamable.joinWhen(Stream::findFirst);
>>         System.out.println(first);  // Optional[PlainSubTask[state=SUCCESS, result=17, exception=null]]
>>     }
>> 
>> Internally, handleCompleted posts each subtask into a queue, which is read by the Stream spliterator inside joinWhen.
>> 
>> The current implementation uses threa...
>
> Rémi Forax has updated the pull request incrementally with one additional commit since the last revision:
> 
>   Fix dangling comment

I've updated the implementation to use ThreadFlock.wakeup()/ThreadFlock.awaitAll() so the semantics of joinWhen() is now fully compatible with join()/joinUntil().
- you can call joinWhen() several times,
- you can call shutdown() when you want,
- a subtask can spawn another subtask,
- etc

I've not exposed all the methods I need as protected methods because I'm still not convinced that the implementation of joinWhen() is not too specific. I also still need to add a variant of joinWhen() that takes a timeout as a parameter.
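
For readers who want to experiment with the idea without building the loom branch, the queue-plus-spliterator mechanism mentioned in the quoted description can be approximated with stable JDK classes only. This is a minimal sketch, not the code of the patch: the class name QueueScopeSketch is hypothetical, CompletableFuture stands in for Subtask, a plain ExecutorService stands in for the scope, and the ThreadFlock-based wakeup used by the real implementation is not modelled at all.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for illustration only; NOT StructuredTaskScope.Streamable.
public final class QueueScopeSketch {

    // Runs the tasks, exposes them to the mapper in completion order, then interrupts leftovers.
    public static <T, U> U joinWhen(List<Callable<T>> tasks,
                                    Function<? super Stream<CompletableFuture<T>>, ? extends U> mapper)
            throws InterruptedException {
        BlockingQueue<CompletableFuture<T>> queue = new LinkedBlockingQueue<>();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            for (Callable<T> task : tasks) {
                executor.execute(() -> {
                    // Plays the role of handleCompleted: post the finished task into the queue.
                    CompletableFuture<T> done = new CompletableFuture<>();
                    try {
                        done.complete(task.call());
                    } catch (Throwable t) {
                        done.completeExceptionally(t);
                    }
                    queue.add(done);
                });
            }
            Spliterator<CompletableFuture<T>> spliterator =
                new Spliterators.AbstractSpliterator<CompletableFuture<T>>(tasks.size(), Spliterator.ORDERED) {
                    private int remaining = tasks.size();

                    @Override
                    public boolean tryAdvance(Consumer<? super CompletableFuture<T>> action) {
                        if (remaining == 0) {
                            return false;               // every task has been delivered
                        }
                        try {
                            action.accept(queue.take()); // blocks until the next task finishes
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return false;               // end the stream, report the interrupt below
                        }
                        remaining--;
                        return true;
                    }
                };
            U result = mapper.apply(StreamSupport.stream(spliterator, false));
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            return result;
        } finally {
            executor.shutdownNow();                     // interrupt tasks the stream no longer needs
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Callable<Integer> slow = () -> { Thread.sleep(5_000); return 12; };
        Callable<Integer> fast = () -> { Thread.sleep(100); return 17; };
        Optional<CompletableFuture<Integer>> first = joinWhen(Arrays.asList(slow, fast), Stream::findFirst);
        // The 100 ms task finishes first, so this prints Optional[17] well before 5 s have passed.
        System.out.println(first.map(CompletableFuture::join));
    }
}
```

Because findFirst stops pulling after one element, the finally block interrupts the slow task instead of waiting for it, which mirrors the "shut down pending subtasks once the stream is short-circuited" behaviour described above. Unlike the real joinWhen(), this sketch cannot be called several times and does not support subtasks forking other subtasks.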

As a bonus, here is an example that uses partitioningBy to store all the failed tasks in one List and all the tasks that succeed in another:

try (var streamable = new StructuredTaskScope.Streamable<Integer>()) {
    streamable.fork(() -> { throw new IOException("oops"); });
    streamable.fork(() -> 17);

    Map<Boolean, List<Subtask<Integer>>> map =
        streamable.joinWhen(stream -> stream.collect(partitioningBy(task -> task.state() == State.SUCCESS)));
    System.out.println(map);
        // {false=[PlainSubTask[state=FAILED, result=null, exception=java.io.IOException: oops]],
        //   true=[PlainSubTask[state=SUCCESS, result=17, exception=null]]}
}
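
For comparison, roughly the same partitioning can be written today without the proposed subclass by draining an ExecutorCompletionService. The sketch below is only an approximation under stated assumptions: PartitionSketch and its methods are hypothetical names, Future stands in for Subtask, and there is no structured scope, so nothing here is confined or cancelled the way an STS would do it.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; it does not use StructuredTaskScope at all.
public final class PartitionSketch {

    // True when the already-completed future holds a result rather than an exception.
    static boolean succeeded(Future<?> future) {
        try {
            future.get();
            return true;
        } catch (ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Runs every task, waits for all of them, and splits them into succeeded (true) and failed (false).
    public static <T> Map<Boolean, List<Future<T>>> partition(List<Callable<T>> tasks)
            throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CompletionService<T> completed = new ExecutorCompletionService<>(executor);
            for (Callable<T> task : tasks) {
                completed.submit(task);
            }
            List<Future<T>> done = new ArrayList<>();
            for (int i = 0; i < tasks.size(); i++) {
                done.add(completed.take());   // futures arrive in completion order
            }
            return done.stream().collect(Collectors.partitioningBy(PartitionSketch::succeeded));
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> { throw new IOException("oops"); });
        tasks.add(() -> 17);
        Map<Boolean, List<Future<Integer>>> map = partition(tasks);
        // Prints "1 succeeded, 1 failed".
        System.out.println(map.get(true).size() + " succeeded, " + map.get(false).size() + " failed");
    }
}
```

The explicit take() loop and the helper that probes each Future are the kind of plumbing that joinWhen() is meant to hide behind a single stream expression.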

-------------

PR Comment: https://git.openjdk.org/loom/pull/202#issuecomment-1703355684

