<div dir="ltr"><div class="gmail_default" style="font-family:monospace">Hello Rémi,<br><br>> allSuccessfulOrThrow() should use List instead<br>> of Stream<br><br>This is a good suggestion. No need to pay for instances of Stream that you won't use, especially if what you wanted was a List in the first place.<br><br>> awaitAllSuccessfulOrThrow() should use <R><br>> instead of Void<br><br>I don't see how this would be useful. When would you care about the type of null? I'm not even sure what that means.<br><br>> Can we get a joiner that returns subtasks<br>> ordered by onComplete(), and not in the order<br>> of onFork()?<br><br>Maybe Subtask can track onFork() and onComplete() timestamps? Then you could order yourself.</div></div><br><div class="gmail_quote gmail_quote_container"><div dir="ltr" class="gmail_attr">On Wed, Sep 24, 2025 at 11:37 AM Remi Forax <<a href="mailto:forax@univ-mlv.fr">forax@univ-mlv.fr</a>> wrote:<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">Hello,<br>
I know that the API will change a bit in 26, but this is a review of the API of 25.<br>
<br>
I find the new API (joiners + configuration) to be cleaner than the previous iteration (composition vs inheritance).<br>
<br>
In Joiner,<br>
  - onFork() and onComplete() should take a SubTask<T> and not a SubTask<? extends T>.<br>
    This is Okay because SubTask is sealed, so the type parameter is covariant.<br>
<br>
    Another way to see it is that instead of every implementation of onFork/onComplete doing an unsafe cast,<br>
    the caller of those methods should do the unsafe cast.<br>
<br>
  - allSuccessfulOrThrow() should return a Joiner<T, List<Subtask<T>>>, so the result is a List and not a stream.<br>
    In terms of implementation, in result(), the code should be<br>
      return Collections.unmodifiableList(subtasks);<br>
<br>
  - awaitAllSuccessfulOrThrow should not use Void, but be typed like this<br>
      <T, R> Joiner<T, R> awaitAllSuccessfulOrThrow()<br>
<br>
    which means that the return type of join() can be any type.<br>
    This allow users to type the result value null the way they want.<br>
<br>
  - allUntil is missing a ? super, it should use a SubTask<T> (like in onComplete/onFork) and returns a List (like allSuccessfulOrThrow()), so it should be:<br>
<br>
     <T> Joiner<T, List<Subtask<T>>> allUntil(Predicate<? extends Subtask<T>> isDone) {<br>
<br>
And now two remarks,<br>
 - is there a way to remove the limitation that the main thread (the one that have created the STS) can not access to SubTask.get(),<br>
   because there is at least a case where i know that the task is finished before join() is called (see below).<br>
 - is there a way to get a joiner that returns the list of subtask in the order if their completeness, not in the order of onFork() ?<br>
<br>
<br>
regards,<br>
Rémi<br>
<br>
---<br>
<br>
The following code works but I do not understand why i have to create a virtual thread to run the Runnable ?<br>
<br>
final class StreamJoiner<T> implements StructuredTaskScope.Joiner<T, Void> {<br>
  private int counter;<br>
  private volatile boolean done;<br>
  private final LinkedBlockingDeque<StructuredTaskScope.Subtask<T>> queue = new LinkedBlockingDeque<>();<br>
<br>
  @Override<br>
  public boolean onFork(StructuredTaskScope.Subtask<? extends T> subtask) {<br>
    StructuredTaskScope.Joiner.super.onFork(subtask);<br>
    counter++;<br>
    return false;<br>
  }<br>
<br>
  @Override<br>
  @SuppressWarnings("unchecked")<br>
  public boolean onComplete(StructuredTaskScope.Subtask<? extends T> subtask) {<br>
    StructuredTaskScope.Joiner.super.onComplete(subtask);<br>
    if (done) {<br>
      return true;<br>
    }<br>
    try {<br>
      queue.put((StructuredTaskScope.Subtask<T>) subtask);<br>
    } catch (InterruptedException e) {<br>
      throw new RuntimeException(e);<br>
    }<br>
    return false;<br>
  }<br>
<br>
  @Override<br>
  public Void result() {<br>
    return null;<br>
  }<br>
<br>
  public <R> R compute(Function<? super Stream<T>, ? extends R> function) throws InterruptedException {<br>
    var spliterator = new Spliterator<T>() {<br>
      private int remaining = counter;<br>
<br>
      @Override<br>
      public boolean tryAdvance(Consumer<? super T> action) {<br>
        if (remaining == 0) {<br>
          return false;<br>
        }<br>
        StructuredTaskScope.Subtask<T> subtask;<br>
        try {<br>
          subtask = queue.take();<br>
        } catch (InterruptedException e) {<br>
          throw new RuntimeException(e);<br>
        }<br>
        if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {<br>
          action.accept(subtask.get());<br>
        }<br>
        remaining--;<br>
        return true;<br>
      }<br>
<br>
      @Override<br>
      public Spliterator<T> trySplit() {<br>
        return null;<br>
      }<br>
<br>
      @Override<br>
      public long estimateSize() {<br>
        return remaining;<br>
      }<br>
<br>
      @Override<br>
      public int characteristics() {<br>
        return 0;<br>
      }<br>
    };<br>
    var stream = StreamSupport.stream(spliterator, false);<br>
    var runnable = new Runnable() {<br>
      private R result;<br>
<br>
      @Override<br>
      public void run() {<br>
        result = function.apply(stream);<br>
      }<br>
    };<br>
    Thread.ofVirtual().start(runnable).join();<br>
    done = true;<br>
    return runnable.result;<br>
  }<br>
}<br>
<br>
Callable<WeatherResponse> task(LatLong latLong) {<br>
  return () -> OpenMeteo.getWeatherResponse(latLong);<br>
}<br>
<br>
void main() throws InterruptedException {<br>
  var paris = new LatLong(48.864716, 2.349014);  // use 30_000<br>
  var nantes = new LatLong(47.2181, -1.5528);<br>
  var marseille = new LatLong(43.2964, 5.37);<br>
<br>
  var latlongs = List.of(paris, nantes, marseille);<br>
  var joiner = new StreamJoiner<WeatherResponse>();<br>
  try(var scope = StructuredTaskScope.open(joiner)) {<br>
    var callables = latlongs.stream()<br>
        .map(this::task)<br>
        .toList();<br>
    callables.forEach(scope::fork);<br>
<br>
    var response = joiner.compute(Stream::toList);<br>
    //var response = joiner.compute(s -> s.findFirst().orElseThrow());<br>
    scope.join();<br>
<br>
    IO.println(response);<br>
  }<br>
}<br>
</blockquote></div>