Remark on the StructuredTaskScope API of Java 25
Remi Forax
forax at univ-mlv.fr
Wed Sep 24 15:37:41 UTC 2025
Hello,
I know that the API will change a bit in 26, but this is a review of the API of 25.
I find the new API (joiners + configuration) to be cleaner than the previous iteration (composition vs inheritance).
In Joiner,
- onFork() and onComplete() should take a SubTask<T> and not a SubTask<? extends T>.
This is Okay because SubTask is sealed, so the type parameter is covariant.
Another way to see it is that instead of every implementation of onFork/onComplete doing an unsafe cast,
the caller of those methods should do the unsafe cast.
- allSuccessfulOrThrow() should return a Joiner<T, List<Subtask<T>>>, so the result is a List and not a stream.
In terms of implementation, in result(), the code should be
return Collections.unmodifiableList(subtasks);
- awaitAllSuccessfulOrThrow should not use Void, but be typed like this
<T, R> Joiner<T, R> awaitAllSuccessfulOrThrow()
which means that the return type of join() can be any type.
This allow users to type the result value null the way they want.
- allUntil is missing a ? super, it should use a SubTask<T> (like in onComplete/onFork) and returns a List (like allSuccessfulOrThrow()), so it should be:
<T> Joiner<T, List<Subtask<T>>> allUntil(Predicate<? extends Subtask<T>> isDone) {
And now two remarks,
- is there a way to remove the limitation that the main thread (the one that have created the STS) can not access to SubTask.get(),
because there is at least a case where i know that the task is finished before join() is called (see below).
- is there a way to get a joiner that returns the list of subtask in the order if their completeness, not in the order of onFork() ?
regards,
Rémi
---
The following code works but I do not understand why i have to create a virtual thread to run the Runnable ?
final class StreamJoiner<T> implements StructuredTaskScope.Joiner<T, Void> {
private int counter;
private volatile boolean done;
private final LinkedBlockingDeque<StructuredTaskScope.Subtask<T>> queue = new LinkedBlockingDeque<>();
@Override
public boolean onFork(StructuredTaskScope.Subtask<? extends T> subtask) {
StructuredTaskScope.Joiner.super.onFork(subtask);
counter++;
return false;
}
@Override
@SuppressWarnings("unchecked")
public boolean onComplete(StructuredTaskScope.Subtask<? extends T> subtask) {
StructuredTaskScope.Joiner.super.onComplete(subtask);
if (done) {
return true;
}
try {
queue.put((StructuredTaskScope.Subtask<T>) subtask);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return false;
}
@Override
public Void result() {
return null;
}
public <R> R compute(Function<? super Stream<T>, ? extends R> function) throws InterruptedException {
var spliterator = new Spliterator<T>() {
private int remaining = counter;
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (remaining == 0) {
return false;
}
StructuredTaskScope.Subtask<T> subtask;
try {
subtask = queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
action.accept(subtask.get());
}
remaining--;
return true;
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return remaining;
}
@Override
public int characteristics() {
return 0;
}
};
var stream = StreamSupport.stream(spliterator, false);
var runnable = new Runnable() {
private R result;
@Override
public void run() {
result = function.apply(stream);
}
};
Thread.ofVirtual().start(runnable).join();
done = true;
return runnable.result;
}
}
Callable<WeatherResponse> task(LatLong latLong) {
return () -> OpenMeteo.getWeatherResponse(latLong);
}
void main() throws InterruptedException {
var paris = new LatLong(48.864716, 2.349014); // use 30_000
var nantes = new LatLong(47.2181, -1.5528);
var marseille = new LatLong(43.2964, 5.37);
var latlongs = List.of(paris, nantes, marseille);
var joiner = new StreamJoiner<WeatherResponse>();
try(var scope = StructuredTaskScope.open(joiner)) {
var callables = latlongs.stream()
.map(this::task)
.toList();
callables.forEach(scope::fork);
var response = joiner.compute(Stream::toList);
//var response = joiner.compute(s -> s.findFirst().orElseThrow());
scope.join();
IO.println(response);
}
}
More information about the loom-dev
mailing list