RFR: WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream

Rémi Forax forax at openjdk.org
Fri Sep 1 13:11:31 UTC 2023


This is a rough minimal patch that adds a new subclass of StructuredTaskScope named Stremable (better name needed) pushing failed or succceding subtasks into a Stream.

This subclass aim to:
- make easier for users to use STS without having to override handleCompleted, which is called concurrently so hard to get right, at a price of being a little less efficient
- ease the implementation of shortcut semantics like get the first two values, get the first value greater than a threshold, etc by auto shutdowning the STS once the condition is true

The Streamable STS adds a new method joinWhen(function) that takes a function that takes a Stream and return a value 

  public <U> U joinWhen(Function<? super Stream<Subtask<T>>, ? extends U> mapper) throws InterruptedException {


When this method is called, the non NONAVAILABLE subtask are pushed into the Stream once they have finished until either there is no more subtasks or the stream has finished (has been short-circuited). If some tasks are still pending because the stream has been short-cirtuited, they are shutdown.

Here are two examples:
- get a list of all the values that suceeed

    try(var streamable = new StructuredTaskScope.Streamable<Integer>()) {
            streamable.fork(() -> {
                Thread.sleep(200);
                return 12;
            });
            streamable.fork(() -> {
                Thread.sleep(100);
                return 17;
            });
            List<Integer> list = streamable.joinWhen(stream -> stream.filter(task -> task.state() == State.SUCCESS).map(Subtask::get).toList());
            System.out.println(list);  // [17, 12]
        }

- find the first subtask (that suceed or fail)

    try(var streamable = new StructuredTaskScope.Streamable<Integer>()) {
            streamable.fork(() -> {
                Thread.sleep(1_000);
                return 12;
            });
            streamable.fork(() -> {
                Thread.sleep(100);
                return 17;
            });
            Optional<Subtask<Integer>> first = streamable.joinWhen(Stream::findFirst);
            System.out.println(first);  // Optional[PlainSubTask[state=SUCCESS, result=17, exception=null]]
        }


Internally, handleCompleted  post each subtask into a queue which is read by the Stream spliterator inside joinWhen.

The current implementation is not the right one, instead of introducing a new method into the Flock that can wait on shutdown, threadCount == 0 and the queue has a new subtask, this implementation shutdown the flock early and do not implement shutdown() so it only have to check if the number of tasks is zero.

-------------

Commit messages:
 - WIP: add a new subclass of StructuredTaskScope that shows the finished subtasks as a Stream

Changes: https://git.openjdk.org/loom/pull/201/files
 Webrev: https://webrevs.openjdk.org/?repo=loom&pr=201&range=00
  Stats: 226 lines in 1 file changed: 220 ins; 0 del; 6 mod
  Patch: https://git.openjdk.org/loom/pull/201.diff
  Fetch: git fetch https://git.openjdk.org/loom.git pull/201/head:pull/201

PR: https://git.openjdk.org/loom/pull/201


More information about the loom-dev mailing list