Remark on the StructuredTaskScope API of Java 25

forax at univ-mlv.fr forax at univ-mlv.fr
Thu Sep 25 04:55:52 UTC 2025


> From: "David Alayachew" <davidalayachew at gmail.com>
> To: "Remi Forax" <forax at univ-mlv.fr>
> Cc: "loom-dev" <loom-dev at openjdk.java.net>, "Alan Bateman"
> <Alan.Bateman at oracle.com>
> Sent: Wednesday, September 24, 2025 9:04:44 PM
> Subject: Re: Remark on the StructuredTaskScope API of Java 25

> Hello Rémi,

> > allSuccessfulOrThrow() should use List instead
> > of Stream

> This is a good suggestion. No need to pay for instances of Stream that you won't
> use, especially if what you wanted was a List in the first place.

> > awaitAllSuccessfulOrThrow() should use <R>
> > instead of Void

> I don't see how this would be useful. When would you care about the type of
> null? I'm not even sure what that means.

My use case is a list of STSs, one using a joiner that returns null and one using a joiner that returns a List of subtasks.
When I call join() on all the STSs by iterating over the list, I want the same return type.
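
To illustrate the typing issue, here is a minimal sketch in plain Java. MiniJoiner, awaitAll() and collectAll() are hypothetical stand-ins I made up for the example, not the JDK API; only the shape of the generic signature matters. Because such a joiner only ever returns null, letting the caller choose R is type-safe, and both joiners can then be stored in the same list.

```java
import java.util.List;

public class NullTypedJoinerSketch {
    // Hypothetical stand-in for StructuredTaskScope.Joiner<T, R>; only result() is modeled.
    interface MiniJoiner<T, R> {
        R result();
    }

    // Typed like the proposed awaitAllSuccessfulOrThrow(): the caller picks R,
    // which is safe because the only value ever returned is null.
    static <T, R> MiniJoiner<T, R> awaitAll() {
        return () -> null;
    }

    // Stand-in for a joiner that really returns a list.
    static <T> MiniJoiner<T, List<T>> collectAll(List<T> results) {
        return () -> List.copyOf(results);
    }

    // Both joiners share one static type, so they fit in the same list.
    static List<MiniJoiner<String, List<String>>> joiners() {
        MiniJoiner<String, List<String>> first = awaitAll();
        MiniJoiner<String, List<String>> second = collectAll(List.of("a", "b"));
        return List.of(first, second);
    }

    public static void main(String[] args) {
        for (MiniJoiner<String, List<String>> joiner : joiners()) {
            List<String> result = joiner.result(); // same return type for both
            System.out.println(result);
        }
    }
}
```

With a Void-typed joiner, the list element type would have to fall back to a wildcard, and every caller would lose the List type.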

> > Can we get a joiner that returns subtasks
> > ordered by onComplete(), and not in the order
> > of onFork()?

> Maybe Subtask can track onFork() and onComplete() timestamps? Then you could
> order yourself.

Yes, I can.
I was wondering if it would be a good idea to have this kind of behavior in a joiner provided by the JDK.
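
A rough sketch of the behavior I have in mind. CompletionOrderCollector is a hypothetical stand-in for a Joiner, and a plain thread pool replaces the scope so that the example runs without preview features; none of these names come from the JDK. The idea is simply to record each result in onComplete() into a thread-safe queue, since onComplete() can be called from several threads.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletionOrderSketch {
    // Hypothetical stand-in for a Joiner that keeps results in completion order.
    static final class CompletionOrderCollector<T> {
        private final ConcurrentLinkedQueue<T> completed = new ConcurrentLinkedQueue<>();

        // Called by the thread that finished the task, so the queue must be thread-safe.
        void onComplete(T value) {
            completed.add(value);
        }

        List<T> result() {
            return List.copyOf(completed);
        }
    }

    static List<String> run() throws InterruptedException {
        var collector = new CompletionOrderCollector<String>();
        var fastDone = new CountDownLatch(1);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // Forked first, but forced to complete last by waiting on the latch.
        executor.execute(() -> {
            try {
                fastDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            collector.onComplete("slow");
        });
        // Forked second, completes first.
        executor.execute(() -> {
            collector.onComplete("fast");
            fastDone.countDown();
        });
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return collector.result();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints [fast, slow]
    }
}
```

The result follows completion order, not fork order: the task forked first appears last.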

regards, 
Rémi 

> On Wed, Sep 24, 2025 at 11:37 AM Remi Forax <forax at univ-mlv.fr> wrote:

>> Hello,
>> I know that the API will change a bit in 26, but this is a review of the API of
>> 25.

>> I find the new API (joiners + configuration) to be cleaner than the previous
>> iteration (composition vs inheritance).

>> In Joiner,
>> - onFork() and onComplete() should take a Subtask<T> and not a Subtask<? extends
>> T>.
>> This is okay because Subtask is sealed, so the type parameter is covariant.

>> Another way to see it is that instead of every implementation of
>> onFork/onComplete doing an unsafe cast,
>> the caller of those methods should do the unsafe cast.

>> - allSuccessfulOrThrow() should return a Joiner<T, List<Subtask<T>>>, so the
>> result is a List and not a stream.
>> In terms of implementation, in result(), the code should be
>> return Collections.unmodifiableList(subtasks);

>> - awaitAllSuccessfulOrThrow should not use Void, but be typed like this
>> <T, R> Joiner<T, R> awaitAllSuccessfulOrThrow()

>> which means that the return type of join() can be any type.
>> This allows users to type the null result value the way they want.

>> - allUntil is missing a ? super, it should use a Subtask<T> (like in
>> onComplete/onFork) and return a List (like allSuccessfulOrThrow()), so it
>> should be:

>> <T> Joiner<T, List<Subtask<T>>> allUntil(Predicate<? super Subtask<T>> isDone)
>> {
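
A plain-Java sketch of the variance argument in the list above. Result, Value and widen() are hypothetical stand-ins, not JDK types: Result plays the role of Subtask, with T appearing only in return position, and widen() is the single unsafe cast that the caller would do instead of every onFork()/onComplete() implementation.

```java
public class CovariantCastSketch {
    // Hypothetical stand-in for Subtask<T>: sealed, and T is only ever produced, never consumed.
    sealed interface Result<T> permits Value {
        T get();
    }

    record Value<T>(T value) implements Result<T> {
        @Override
        public T get() {
            return value;
        }
    }

    // The one unchecked cast, done on the caller side. It cannot pollute the heap
    // because Result has no method that takes a T as a parameter.
    @SuppressWarnings("unchecked")
    static <T> Result<T> widen(Result<? extends T> result) {
        return (Result<T>) result;
    }

    // A callback typed with Result<T> directly, as proposed for onFork()/onComplete().
    static Number readAsNumber(Result<Number> result) {
        return result.get();
    }

    public static void main(String[] args) {
        Result<Integer> integerResult = new Value<>(42);
        Number number = readAsNumber(widen(integerResult));
        System.out.println(number); // prints 42
    }
}
```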

>> And now two remarks,
>> - is there a way to remove the limitation that the main thread (the one that
>> created the STS) cannot call Subtask.get()?
>> There is at least one case where I know that the task is finished before
>> join() is called (see below).
>> - is there a way to get a joiner that returns the list of subtasks in the order
>> of their completion, not in the order of onFork()?

>> regards,
>> Rémi

>> ---

>> The following code works, but I do not understand why I have to create a virtual
>> thread to run the Runnable.

>> final class StreamJoiner<T> implements StructuredTaskScope.Joiner<T, Void> {
>>   private int counter;
>>   private volatile boolean done;
>>   private final LinkedBlockingDeque<StructuredTaskScope.Subtask<T>> queue =
>>       new LinkedBlockingDeque<>();

>>   @Override
>>   public boolean onFork(StructuredTaskScope.Subtask<? extends T> subtask) {
>>     StructuredTaskScope.Joiner.super.onFork(subtask);
>>     counter++;
>>     return false;
>>   }

>>   @Override
>>   @SuppressWarnings("unchecked")
>>   public boolean onComplete(StructuredTaskScope.Subtask<? extends T> subtask) {
>>     StructuredTaskScope.Joiner.super.onComplete(subtask);
>>     if (done) {
>>       return true;
>>     }
>>     try {
>>       queue.put((StructuredTaskScope.Subtask<T>) subtask);
>>     } catch (InterruptedException e) {
>>       throw new RuntimeException(e);
>>     }
>>     return false;
>>   }

>>   @Override
>>   public Void result() {
>>     return null;
>>   }

>>   public <R> R compute(Function<? super Stream<T>, ? extends R> function)
>>       throws InterruptedException {
>>     var spliterator = new Spliterator<T>() {
>>       private int remaining = counter;

>>       @Override
>>       public boolean tryAdvance(Consumer<? super T> action) {
>>         if (remaining == 0) {
>>           return false;
>>         }
>>         StructuredTaskScope.Subtask<T> subtask;
>>         try {
>>           subtask = queue.take();
>>         } catch (InterruptedException e) {
>>           throw new RuntimeException(e);
>>         }
>>         if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
>>           action.accept(subtask.get());
>>         }
>>         remaining--;
>>         return true;
>>       }

>>       @Override
>>       public Spliterator<T> trySplit() {
>>         return null;
>>       }

>>       @Override
>>       public long estimateSize() {
>>         return remaining;
>>       }

>>       @Override
>>       public int characteristics() {
>>         return 0;
>>       }
>>     };
>>     var stream = StreamSupport.stream(spliterator, false);
>>     var runnable = new Runnable() {
>>       private R result;

>>       @Override
>>       public void run() {
>>         result = function.apply(stream);
>>       }
>>     };
>>     Thread.ofVirtual().start(runnable).join();
>>     done = true;
>>     return runnable.result;
>>   }
>> }

>> Callable<WeatherResponse> task(LatLong latLong) {
>>   return () -> OpenMeteo.getWeatherResponse(latLong);
>> }

>> void main() throws InterruptedException {
>>   var paris = new LatLong(48.864716, 2.349014); // use 30_000
>>   var nantes = new LatLong(47.2181, -1.5528);
>>   var marseille = new LatLong(43.2964, 5.37);

>>   var latlongs = List.of(paris, nantes, marseille);
>>   var joiner = new StreamJoiner<WeatherResponse>();
>>   try (var scope = StructuredTaskScope.open(joiner)) {
>>     var callables = latlongs.stream()
>>         .map(this::task)
>>         .toList();
>>     callables.forEach(scope::fork);

>>     var response = joiner.compute(Stream::toList);
>>     //var response = joiner.compute(s -> s.findFirst().orElseThrow());
>>     scope.join();

>>     IO.println(response);
>>   }
>> }