Remark on the StructuredTaskScope API of Java 25
David Alayachew
davidalayachew at gmail.com
Wed Sep 24 19:04:44 UTC 2025
Hello Rémi,
> allSuccessfulOrThrow() should use List instead
> of Stream
This is a good suggestion. No need to pay for instances of Stream that you
won't use, especially if what you wanted was a List in the first place.
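To make the cost concrete, here is a minimal sketch of the two shapes side by side. It does not use the real StructuredTaskScope types: `ListVsStreamSketch`, `resultAsStream()` and `resultAsList()` are hypothetical names, and the `String` elements only stand in for subtasks. The List variant uses the `Collections.unmodifiableList` call you suggested for result().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

final class ListVsStreamSketch {
    // Hypothetical stand-in for the list of subtasks a joiner collects.
    private final List<String> subtasks = new ArrayList<>(List.of("a", "b", "c"));

    // Stream-returning shape: every call allocates a new Stream.
    Stream<String> resultAsStream() {
        return subtasks.stream();
    }

    // List-returning shape: a read-only view over the same list, no copy.
    List<String> resultAsList() {
        return Collections.unmodifiableList(subtasks);
    }

    public static void main(String[] args) {
        var sketch = new ListVsStreamSketch();

        // A caller who wants a List has to go through a Stream and copy.
        List<String> viaStream = sketch.resultAsStream().toList();

        // With the List shape the caller gets it directly...
        List<String> direct = sketch.resultAsList();

        // ...and can still ask for a Stream when one is really needed.
        List<String> upper = direct.stream().map(String::toUpperCase).toList();

        System.out.println(viaStream.equals(direct));
        System.out.println(upper);
    }
}
```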
> awaitAllSuccessfulOrThrow() should use <R>
> instead of Void
I don't see how this would be useful. When would you care about the type of
null? I'm not even sure what that means.
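For what it's worth, here is a minimal sketch of what I understand the proposal to mean. The names `NullTypingSketch`, `awaitAsVoid()` and `awaitAsAny()` are hypothetical and are not part of StructuredTaskScope; they only mirror the two return-type shapes.

```java
final class NullTypingSketch {
    // Shape of the Java 25 method: the result type is fixed to Void.
    static Void awaitAsVoid() {
        return null;
    }

    // Shape of the proposal: the caller picks R, the value is still null.
    static <R> R awaitAsAny() {
        return null;
    }

    public static void main(String[] args) {
        Void v = awaitAsVoid();
        String s = awaitAsAny();   // R is inferred as String
        Integer i = awaitAsAny();  // R is inferred as Integer
        System.out.println(v == null && s == null && i == null); // prints true
    }
}
```

As far as I can tell, the only difference is that the null can be assigned to whatever type the surrounding code expects. The value itself carries no more information than before.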
> Can we get a joiner that returns subtasks
> ordered by onComplete(), and not in the order
> of onFork()?
Maybe Subtask can track onFork() and onComplete() timestamps? Then you
could order them yourself.
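A rough sketch of the idea, assuming the timestamps were available. Subtask does not expose anything like this today, so the `Timed` record, `timed()`, `byCompletion()` and `sleepThen()` below are all hypothetical, and a plain ExecutorService stands in for the scope.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class TimestampOrderSketch {
    // Hypothetical: what a subtask could expose if it tracked both timestamps.
    record Timed<T>(T value, long forkedAtNanos, long completedAtNanos) {}

    // Wraps a task; records one timestamp when wrapped (approximating the
    // fork) and another one when the task finishes.
    static <T> Callable<Timed<T>> timed(Callable<T> task) {
        long forkedAt = System.nanoTime();
        return () -> {
            T value = task.call();
            return new Timed<>(value, forkedAt, System.nanoTime());
        };
    }

    // Re-orders results from fork order to completion order.
    // nanoTime values are compared through their difference, as its Javadoc advises.
    static <T> List<Timed<T>> byCompletion(List<Timed<T>> inForkOrder) {
        return inForkOrder.stream()
            .sorted((a, b) -> Long.signum(a.completedAtNanos() - b.completedAtNanos()))
            .toList();
    }

    // Hypothetical task: sleeps, then returns a label.
    static Callable<String> sleepThen(long millis, String label) {
        return () -> {
            Thread.sleep(millis);
            return label;
        };
    }

    public static void main(String[] args) throws Exception {
        var executor = Executors.newFixedThreadPool(3);
        try {
            // Submitted ("forked") in the order slow, fast, medium.
            List<Future<Timed<String>>> futures = List.of(
                executor.submit(timed(sleepThen(300, "slow"))),
                executor.submit(timed(sleepThen(100, "fast"))),
                executor.submit(timed(sleepThen(200, "medium"))));

            var inForkOrder = new ArrayList<Timed<String>>();
            for (var future : futures) {
                inForkOrder.add(future.get());
            }

            // Printed in completion order instead of fork order.
            for (var timed : byCompletion(inForkOrder)) {
                System.out.println(timed.value());
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

This only reorders after everything has finished, so it would not help when you want to react to each result as soon as it arrives.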
On Wed, Sep 24, 2025 at 11:37 AM Remi Forax <forax at univ-mlv.fr> wrote:
> Hello,
> I know that the API will change a bit in 26, but this is a review of the
> API of 25.
>
> I find the new API (joiners + configuration) to be cleaner than the
> previous iteration (composition vs inheritance).
>
> In Joiner,
> - onFork() and onComplete() should take a Subtask<T> and not a Subtask<?
> extends T>.
> This is okay because Subtask is sealed, so the type parameter is
> covariant.
>
> Another way to see it is that instead of every implementation of
> onFork/onComplete doing an unsafe cast,
> the caller of those methods should do the unsafe cast.
>
> - allSuccessfulOrThrow() should return a Joiner<T, List<Subtask<T>>>, so
> the result is a List and not a stream.
> In terms of implementation, in result(), the code should be
> return Collections.unmodifiableList(subtasks);
>
> - awaitAllSuccessfulOrThrow should not use Void, but be typed like this
> <T, R> Joiner<T, R> awaitAllSuccessfulOrThrow()
>
> which means that the return type of join() can be any type.
> This allows users to type the null result value the way they want.
>
> - allUntil is missing a ? super; it should use a Subtask<T> (like in
> onComplete/onFork) and return a List (like allSuccessfulOrThrow()), so it
> should be:
>
> <T> Joiner<T, List<Subtask<T>>> allUntil(Predicate<? super Subtask<T>> isDone) {
>
> And now two remarks,
> - is there a way to remove the limitation that the main thread (the one
> that created the STS) cannot call Subtask.get()? There is at least one
> case where I know that the task is finished before join() is called
> (see below).
> - is there a way to get a joiner that returns the list of subtasks in
> the order of their completion, not in the order of onFork()?
>
>
> regards,
> Rémi
>
> ---
>
> The following code works, but I do not understand why I have to create a
> virtual thread to run the Runnable.
>
> final class StreamJoiner<T> implements StructuredTaskScope.Joiner<T, Void> {
>   private int counter;
>   private volatile boolean done;
>   private final LinkedBlockingDeque<StructuredTaskScope.Subtask<T>> queue =
>       new LinkedBlockingDeque<>();
>
>   @Override
>   public boolean onFork(StructuredTaskScope.Subtask<? extends T> subtask) {
>     StructuredTaskScope.Joiner.super.onFork(subtask);
>     counter++;
>     return false;
>   }
>
>   @Override
>   @SuppressWarnings("unchecked")
>   public boolean onComplete(StructuredTaskScope.Subtask<? extends T> subtask) {
>     StructuredTaskScope.Joiner.super.onComplete(subtask);
>     if (done) {
>       return true;
>     }
>     try {
>       queue.put((StructuredTaskScope.Subtask<T>) subtask);
>     } catch (InterruptedException e) {
>       throw new RuntimeException(e);
>     }
>     return false;
>   }
>
>   @Override
>   public Void result() {
>     return null;
>   }
>
>   public <R> R compute(Function<? super Stream<T>, ? extends R> function)
>       throws InterruptedException {
>     var spliterator = new Spliterator<T>() {
>       private int remaining = counter;
>
>       @Override
>       public boolean tryAdvance(Consumer<? super T> action) {
>         if (remaining == 0) {
>           return false;
>         }
>         StructuredTaskScope.Subtask<T> subtask;
>         try {
>           subtask = queue.take();
>         } catch (InterruptedException e) {
>           throw new RuntimeException(e);
>         }
>         if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
>           action.accept(subtask.get());
>         }
>         remaining--;
>         return true;
>       }
>
>       @Override
>       public Spliterator<T> trySplit() {
>         return null;
>       }
>
>       @Override
>       public long estimateSize() {
>         return remaining;
>       }
>
>       @Override
>       public int characteristics() {
>         return 0;
>       }
>     };
>     var stream = StreamSupport.stream(spliterator, false);
>     var runnable = new Runnable() {
>       private R result;
>
>       @Override
>       public void run() {
>         result = function.apply(stream);
>       }
>     };
>     Thread.ofVirtual().start(runnable).join();
>     done = true;
>     return runnable.result;
>   }
> }
>
> Callable<WeatherResponse> task(LatLong latLong) {
>   return () -> OpenMeteo.getWeatherResponse(latLong);
> }
>
> void main() throws InterruptedException {
>   var paris = new LatLong(48.864716, 2.349014); // use 30_000
>   var nantes = new LatLong(47.2181, -1.5528);
>   var marseille = new LatLong(43.2964, 5.37);
>
>   var latlongs = List.of(paris, nantes, marseille);
>   var joiner = new StreamJoiner<WeatherResponse>();
>   try (var scope = StructuredTaskScope.open(joiner)) {
>     var callables = latlongs.stream()
>         .map(this::task)
>         .toList();
>     callables.forEach(scope::fork);
>
>     var response = joiner.compute(Stream::toList);
>     //var response = joiner.compute(s -> s.findFirst().orElseThrow());
>     scope.join();
>
>     IO.println(response);
>   }
> }
>