Remark on the StructuredTaskScope API of Java 25

Remi Forax forax at univ-mlv.fr
Wed Sep 24 15:37:41 UTC 2025


Hello,
I know that the API will change a bit in 26, but this is a review of the API of 25.

I find the new API (joiners + configuration) to be cleaner than the previous iteration (composition vs inheritance).

In Joiner,
  - onFork() and onComplete() should take a Subtask<T> and not a Subtask<? extends T>.
    This is okay because Subtask is sealed and T only appears as a return type, so the type parameter is effectively covariant.

    Another way to see it is that instead of every implementation of onFork/onComplete doing an unsafe cast,
    the caller of those methods should do the unsafe cast.

  - allSuccessfulOrThrow() should return a Joiner<T, List<Subtask<T>>>, so the result is a List and not a stream.
    In terms of implementation, in result(), the code should be
      return Collections.unmodifiableList(subtasks);

  - awaitAllSuccessfulOrThrow() should not use Void, but be typed like this:
      <T, R> Joiner<T, R> awaitAllSuccessfulOrThrow()

    which means that the return type of join() can be any type.
    This allows users to type the null result the way they want.
    
  - allUntil() is missing a ? super; it should use a Subtask<T> (like onComplete/onFork) and return a List (like allSuccessfulOrThrow()), so it should be:

     <T> Joiner<T, List<Subtask<T>>> allUntil(Predicate<? super Subtask<T>> isDone)
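
To make the typing arguments above concrete without depending on the preview API, here is a minimal sketch using stand-in types. Task, Done, VarianceSketch, widen, takeUntil and nullResult are all hypothetical names invented for this illustration; they only mirror the shapes discussed above (a sealed read-only type, a Predicate with ? super, and a result typed as any R) and are not JDK code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

final class VarianceSketch {
  // The caller performs the one unchecked cast, so callbacks can be declared with Task<T>.
  @SuppressWarnings("unchecked")
  static <T> Task<T> widen(Task<? extends T> task) {
    return (Task<T>) task;
  }

  // Same parameter shape as the proposed allUntil: "? super" also accepts a Predicate<Object>.
  // Collects tasks up to and including the first one for which isDone returns true.
  static <T> List<Task<T>> takeUntil(List<Task<T>> tasks, Predicate<? super Task<T>> isDone) {
    var seen = new ArrayList<Task<T>>();
    for (var task : tasks) {
      seen.add(task);
      if (isDone.test(task)) {
        break;
      }
    }
    return Collections.unmodifiableList(seen);
  }

  // Same shape as the proposed awaitAllSuccessfulOrThrow: the null result can take any type R.
  static <R> R nullResult() {
    return null;
  }

  public static void main(String[] args) {
    Task<Integer> source = new Done<>(42);
    Task<Number> widened = VarianceSketch.<Number>widen(source);
    System.out.println(widened.get());

    Predicate<Object> always = o -> true;
    List<Task<String>> tasks = List.of(new Done<>("a"), new Done<>("b"));
    System.out.println(takeUntil(tasks, always).size());

    String asString = nullResult();
    Void asVoid = nullResult();
    System.out.println(asString == null && asVoid == null);
  }
}

// Hypothetical stand-in for Subtask: sealed, and T is only ever returned, never consumed.
sealed interface Task<T> permits Done {
  T get();
}

// The single permitted implementation; nothing in it takes a T after construction.
record Done<T>(T value) implements Task<T> {
  @Override
  public T get() {
    return value;
  }
}
```

The cast in widen() is unchecked but harmless here, because a Task<Integer> can only ever hand out values that are also valid Numbers. With Predicate<Task<T>> instead of Predicate<? super Task<T>>, the call passing a Predicate<Object> would not compile.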

And now two remarks:
 - is there a way to remove the limitation that the main thread (the one that has created the STS) cannot call Subtask.get(),
   because there is at least one case where I know that the task is finished before join() is called (see below).
 - is there a way to get a joiner that returns the list of subtasks in the order of their completion, not in the order of onFork()?
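
To illustrate the second remark, here is a minimal sketch of a hand-written joiner that returns subtasks in completion order. It assumes the Java 25 preview API (compile and run with --enable-preview); CompletionOrderJoiner and demo() are hypothetical names, not part of the JDK. The question above is whether something equivalent could be provided out of the box.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

// Hypothetical joiner (not part of the JDK): records subtasks as they complete.
final class CompletionOrderJoiner<T> implements StructuredTaskScope.Joiner<T, List<Subtask<T>>> {
  // onComplete may be called concurrently from several subtask threads.
  private final Queue<Subtask<T>> completed = new ConcurrentLinkedQueue<>();

  @Override
  @SuppressWarnings("unchecked")
  public boolean onComplete(Subtask<? extends T> subtask) {
    completed.add((Subtask<T>) subtask);
    return false;  // never cancel the scope, wait for every subtask
  }

  @Override
  public List<Subtask<T>> result() {
    return List.copyOf(completed);
  }

  // Hypothetical demo: the slow subtask is forked first but completes last.
  static List<String> demo() throws InterruptedException {
    try (var scope = StructuredTaskScope.open(new CompletionOrderJoiner<String>())) {
      scope.fork(() -> { Thread.sleep(300); return "slow"; });
      scope.fork(() -> "fast");
      // join() returns the list built by result(); get() is allowed after join().
      return scope.join().stream().map(Subtask::get).toList();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(demo());
  }
}
```

In demo(), the subtask forked second is expected to appear first in the list, since it completes well before the one that sleeps. Failed subtasks are kept in the list too, so a real user would have to check state() before calling get().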


regards,
Rémi

---

The following code works, but I do not understand why I have to create a virtual thread to run the Runnable.

final class StreamJoiner<T> implements StructuredTaskScope.Joiner<T, Void> {
  private int counter;
  private volatile boolean done;
  private final LinkedBlockingDeque<StructuredTaskScope.Subtask<T>> queue = new LinkedBlockingDeque<>();

  @Override
  public boolean onFork(StructuredTaskScope.Subtask<? extends T> subtask) {
    StructuredTaskScope.Joiner.super.onFork(subtask);
    counter++;
    return false;
  }

  @Override
  @SuppressWarnings("unchecked")
  public boolean onComplete(StructuredTaskScope.Subtask<? extends T> subtask) {
    StructuredTaskScope.Joiner.super.onComplete(subtask);
    if (done) {
      return true;
    }
    try {
      queue.put((StructuredTaskScope.Subtask<T>) subtask);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    return false;
  }

  @Override
  public Void result() {
    return null;
  }

  public <R> R compute(Function<? super Stream<T>, ? extends R> function) throws InterruptedException {
    var spliterator = new Spliterator<T>() {
      private int remaining = counter;

      @Override
      public boolean tryAdvance(Consumer<? super T> action) {
        if (remaining == 0) {
          return false;
        }
        StructuredTaskScope.Subtask<T> subtask;
        try {
          subtask = queue.take();
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
        if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
          action.accept(subtask.get());
        }
        remaining--;
        return true;
      }

      @Override
      public Spliterator<T> trySplit() {
        return null;
      }

      @Override
      public long estimateSize() {
        return remaining;
      }

      @Override
      public int characteristics() {
        return 0;
      }
    };
    var stream = StreamSupport.stream(spliterator, false);
    var runnable = new Runnable() {
      private R result;

      @Override
      public void run() {
        result = function.apply(stream);
      }
    };
    Thread.ofVirtual().start(runnable).join();
    done = true;
    return runnable.result;
  }
}

Callable<WeatherResponse> task(LatLong latLong) {
  return () -> OpenMeteo.getWeatherResponse(latLong);
}

void main() throws InterruptedException {
  var paris = new LatLong(48.864716, 2.349014);  // use 30_000
  var nantes = new LatLong(47.2181, -1.5528);
  var marseille = new LatLong(43.2964, 5.37);

  var latlongs = List.of(paris, nantes, marseille);
  var joiner = new StreamJoiner<WeatherResponse>();
  try(var scope = StructuredTaskScope.open(joiner)) {
    var callables = latlongs.stream()
        .map(this::task)
        .toList();
    callables.forEach(scope::fork);

    var response = joiner.compute(Stream::toList);
    //var response = joiner.compute(s -> s.findFirst().orElseThrow());
    scope.join();

    IO.println(response);
  }
}

