Remark on the StructuredTaskScope API of Java 25

David Alayachew davidalayachew at gmail.com
Wed Sep 24 19:04:44 UTC 2025


Hello Rémi,

> allSuccessfulOrThrow() should use List instead
> of Stream

This is a good suggestion. No need to pay for instances of Stream that you
won't use, especially if what you wanted was a List in the first place.

> awaitAllSuccessfulOrThrow() should use <R>
> instead of Void

I don't see how this would be useful. When would you care about the type of
null? I'm not even sure what that means.

> Can we get a joiner that returns subtasks
> ordered by onComplete(), and not in the order
> of onFork()?

Maybe Subtask can track onFork() and onComplete() timestamps? Then you
could order yourself.

On Wed, Sep 24, 2025 at 11:37 AM Remi Forax <forax at univ-mlv.fr> wrote:

> Hello,
> I know that the API will change a bit in 26, but this is a review of the
> API of 25.
>
> I find the new API (joiners + configuration) to be cleaner than the
> previous iteration (composition vs inheritance).
>
> In Joiner,
>   - onFork() and onComplete() should take a SubTask<T> and not a SubTask<?
> extends T>.
>     This is Okay because SubTask is sealed, so the type parameter is
> covariant.
>
>     Another way to see it is that instead of every implementation of
> onFork/onComplete doing an unsafe cast,
>     the caller of those methods should do the unsafe cast.
>
>   - allSuccessfulOrThrow() should return a Joiner<T, List<Subtask<T>>>, so
> the result is a List and not a stream.
>     In terms of implementation, in result(), the code should be
>       return Collections.unmodifiableList(subtasks);
>
>   - awaitAllSuccessfulOrThrow should not use Void, but be typed like this
>       <T, R> Joiner<T, R> awaitAllSuccessfulOrThrow()
>
>     which means that the return type of join() can be any type.
>     This allow users to type the result value null the way they want.
>
>   - allUntil is missing a ? super, it should use a SubTask<T> (like in
> onComplete/onFork) and returns a List (like allSuccessfulOrThrow()), so it
> should be:
>
>      <T> Joiner<T, List<Subtask<T>>> allUntil(Predicate<? extends
> Subtask<T>> isDone) {
>
> And now two remarks,
>  - is there a way to remove the limitation that the main thread (the one
> that have created the STS) can not access to SubTask.get(),
>    because there is at least a case where i know that the task is finished
> before join() is called (see below).
>  - is there a way to get a joiner that returns the list of subtask in the
> order if their completeness, not in the order of onFork() ?
>
>
> regards,
> Rémi
>
> ---
>
> The following code works but I do not understand why i have to create a
> virtual thread to run the Runnable ?
>
> final class StreamJoiner<T> implements StructuredTaskScope.Joiner<T, Void>
> {
>   private int counter;
>   private volatile boolean done;
>   private final LinkedBlockingDeque<StructuredTaskScope.Subtask<T>> queue
> = new LinkedBlockingDeque<>();
>
>   @Override
>   public boolean onFork(StructuredTaskScope.Subtask<? extends T> subtask) {
>     StructuredTaskScope.Joiner.super.onFork(subtask);
>     counter++;
>     return false;
>   }
>
>   @Override
>   @SuppressWarnings("unchecked")
>   public boolean onComplete(StructuredTaskScope.Subtask<? extends T>
> subtask) {
>     StructuredTaskScope.Joiner.super.onComplete(subtask);
>     if (done) {
>       return true;
>     }
>     try {
>       queue.put((StructuredTaskScope.Subtask<T>) subtask);
>     } catch (InterruptedException e) {
>       throw new RuntimeException(e);
>     }
>     return false;
>   }
>
>   @Override
>   public Void result() {
>     return null;
>   }
>
>   public <R> R compute(Function<? super Stream<T>, ? extends R> function)
> throws InterruptedException {
>     var spliterator = new Spliterator<T>() {
>       private int remaining = counter;
>
>       @Override
>       public boolean tryAdvance(Consumer<? super T> action) {
>         if (remaining == 0) {
>           return false;
>         }
>         StructuredTaskScope.Subtask<T> subtask;
>         try {
>           subtask = queue.take();
>         } catch (InterruptedException e) {
>           throw new RuntimeException(e);
>         }
>         if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
>           action.accept(subtask.get());
>         }
>         remaining--;
>         return true;
>       }
>
>       @Override
>       public Spliterator<T> trySplit() {
>         return null;
>       }
>
>       @Override
>       public long estimateSize() {
>         return remaining;
>       }
>
>       @Override
>       public int characteristics() {
>         return 0;
>       }
>     };
>     var stream = StreamSupport.stream(spliterator, false);
>     var runnable = new Runnable() {
>       private R result;
>
>       @Override
>       public void run() {
>         result = function.apply(stream);
>       }
>     };
>     Thread.ofVirtual().start(runnable).join();
>     done = true;
>     return runnable.result;
>   }
> }
>
> Callable<WeatherResponse> task(LatLong latLong) {
>   return () -> OpenMeteo.getWeatherResponse(latLong);
> }
>
> void main() throws InterruptedException {
>   var paris = new LatLong(48.864716, 2.349014);  // use 30_000
>   var nantes = new LatLong(47.2181, -1.5528);
>   var marseille = new LatLong(43.2964, 5.37);
>
>   var latlongs = List.of(paris, nantes, marseille);
>   var joiner = new StreamJoiner<WeatherResponse>();
>   try(var scope = StructuredTaskScope.open(joiner)) {
>     var callables = latlongs.stream()
>         .map(this::task)
>         .toList();
>     callables.forEach(scope::fork);
>
>     var response = joiner.compute(Stream::toList);
>     //var response = joiner.compute(s -> s.findFirst().orElseThrow());
>     scope.join();
>
>     IO.println(response);
>   }
> }
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://mail.openjdk.org/pipermail/loom-dev/attachments/20250924/9f5b4c4d/attachment-0001.htm>


More information about the loom-dev mailing list