Remark on the StructuredTaskScope API of Java 25
forax at univ-mlv.fr
Thu Sep 25 04:55:52 UTC 2025
> From: "David Alayachew" <davidalayachew at gmail.com>
> To: "Remi Forax" <forax at univ-mlv.fr>
> Cc: "loom-dev" <loom-dev at openjdk.java.net>, "Alan Bateman"
> <Alan.Bateman at oracle.com>
> Sent: Wednesday, September 24, 2025 9:04:44 PM
> Subject: Re: Remark on the StructuredTaskScope API of Java 25
> Hello Rémi,
> > allSuccessfulOrThrow() should use List instead
> > of Stream
> This is a good suggestion. No need to pay for instances of Stream that you won't
> use, especially if what you wanted was a List in the first place.
> > awaitAllSuccessfulOrThrow() should use <R>
> > instead of Void
> I don't see how this would be useful. When would you care about the type of
> null? I'm not even sure what that means.
My use case is a list of STSs, one using a joiner that returns null and one using a joiner that returns a List of subtasks.
When I call join() on all the STSs by iterating over the list, I want them to have the same return type.
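To illustrate the typing issue, here is a minimal sketch. It does not use the real StructuredTaskScope API: MiniJoiner and MiniJoiners are hypothetical stand-ins that model only result(), under the assumption that the joiner's second type parameter is what join() returns.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StructuredTaskScope.Joiner<T, R>; only result() is modeled.
interface MiniJoiner<T, R> {
    R result();
}

final class MiniJoiners {
    private MiniJoiners() {}

    // Proposed shape: R is chosen by the caller, the value is always null.
    static <T, R> MiniJoiner<T, R> awaitAll() {
        return () -> null;
    }

    // Shape of a joiner that returns the collected items as a List.
    static <T> MiniJoiner<T, List<T>> collecting(List<T> items) {
        return () -> List.copyOf(items);
    }

    // Both joiners share the type MiniJoiner<String, List<String>>,
    // so their results can be read in one loop without casts.
    static List<List<String>> joinAll() {
        List<MiniJoiner<String, List<String>>> joiners = List.of(
            MiniJoiners.<String, List<String>>awaitAll(),
            collecting(List.of("a", "b")));
        List<List<String>> results = new ArrayList<>();
        for (MiniJoiner<String, List<String>> joiner : joiners) {
            results.add(joiner.result()); // null for the first joiner, a list for the second
        }
        return results;
    }
}
```

With a joiner fixed to Void, the two joiners would have no common parameterization other than a wildcard, and the loop would need a cast or an Object-typed result.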
> > Can we get a joiner that returns subtasks
> > ordered by onComplete(), and not in the order
> > of onFork()?
> Maybe Subtask can track onFork() and onComplete() timestamps? Then you could
> order yourself.
Yes, I can.
I was wondering whether it would be a good idea to have this kind of behavior in a joiner provided by the JDK.
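As a rough sketch of the behavior in question, the class below records items in the order in which their completion is reported. CompletionOrderCollector is a hypothetical name, not a JDK class, and it deliberately avoids the preview API; in a real joiner the same logic would presumably live in onComplete(), which is why a thread-safe queue is used here.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper, not part of the JDK: collects items in completion order.
final class CompletionOrderCollector<T> {
    // Thread-safe FIFO queue, because completions may be reported from several threads.
    private final Queue<T> completed = new ConcurrentLinkedQueue<>();

    // Plays the role of Joiner.onComplete(); returning false means "do not cancel the scope".
    boolean onComplete(T subtask) {
        completed.add(subtask);
        return false;
    }

    // Plays the role of Joiner.result(); returns an immutable snapshot in completion order.
    List<T> result() {
        return List.copyOf(completed);
    }
}
```

The order of the resulting list then depends only on when onComplete() is called, not on the order in which the subtasks were forked.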
regards,
Rémi
> On Wed, Sep 24, 2025 at 11:37 AM Remi Forax <forax at univ-mlv.fr> wrote:
>> Hello,
>> I know that the API will change a bit in 26, but this is a review of the API of
>> 25.
>> I find the new API (joiners + configuration) to be cleaner than the previous
>> iteration (composition vs inheritance).
>> In Joiner,
>> - onFork() and onComplete() should take a Subtask<T> and not a Subtask<? extends
>> T>.
>> This is okay because Subtask is sealed, so the type parameter is covariant.
>> Another way to see it is that instead of every implementation of
>> onFork/onComplete doing an unsafe cast,
>> the caller of those methods should do the unsafe cast.
>> - allSuccessfulOrThrow() should return a Joiner<T, List<Subtask<T>>>, so the
>> result is a List and not a stream.
>> In terms of implementation, in result(), the code should be
>> return Collections.unmodifiableList(subtasks);
>> - awaitAllSuccessfulOrThrow should not use Void, but be typed like this
>> <T, R> Joiner<T, R> awaitAllSuccessfulOrThrow()
>> which means that the return type of join() can be any type.
>> This allows users to type the null result value the way they want.
>> - allUntil is missing a ? super; it should use a Subtask<T> (like in
>> onComplete/onFork) and return a List (like allSuccessfulOrThrow()), so it
>> should be:
>> <T> Joiner<T, List<Subtask<T>>> allUntil(Predicate<? super Subtask<T>> isDone)
>> And now two remarks,
>> - Is there a way to remove the limitation that the main thread (the one that
>> created the STS) cannot call Subtask.get() before join()?
>> There is at least one case where I know that the task is finished before
>> join() is called (see below).
>> - Is there a way to get a joiner that returns the list of subtasks in the order
>> of their completion, not in the order of onFork()?
>> regards,
>> Rémi
>> ---
>> The following code works, but I do not understand why I have to create a virtual
>> thread to run the Runnable.
>> final class StreamJoiner<T> implements StructuredTaskScope.Joiner<T, Void> {
>>   private int counter;
>>   private volatile boolean done;
>>   private final LinkedBlockingDeque<StructuredTaskScope.Subtask<T>> queue = new LinkedBlockingDeque<>();
>>
>>   @Override
>>   public boolean onFork(StructuredTaskScope.Subtask<? extends T> subtask) {
>>     StructuredTaskScope.Joiner.super.onFork(subtask);
>>     counter++;
>>     return false;
>>   }
>>
>>   @Override
>>   @SuppressWarnings("unchecked")
>>   public boolean onComplete(StructuredTaskScope.Subtask<? extends T> subtask) {
>>     StructuredTaskScope.Joiner.super.onComplete(subtask);
>>     if (done) {
>>       return true;
>>     }
>>     try {
>>       queue.put((StructuredTaskScope.Subtask<T>) subtask);
>>     } catch (InterruptedException e) {
>>       throw new RuntimeException(e);
>>     }
>>     return false;
>>   }
>>
>>   @Override
>>   public Void result() {
>>     return null;
>>   }
>>
>>   public <R> R compute(Function<? super Stream<T>, ? extends R> function) throws InterruptedException {
>>     var spliterator = new Spliterator<T>() {
>>       private int remaining = counter;
>>
>>       @Override
>>       public boolean tryAdvance(Consumer<? super T> action) {
>>         if (remaining == 0) {
>>           return false;
>>         }
>>         StructuredTaskScope.Subtask<T> subtask;
>>         try {
>>           subtask = queue.take();
>>         } catch (InterruptedException e) {
>>           throw new RuntimeException(e);
>>         }
>>         if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
>>           action.accept(subtask.get());
>>         }
>>         remaining--;
>>         return true;
>>       }
>>
>>       @Override
>>       public Spliterator<T> trySplit() {
>>         return null;
>>       }
>>
>>       @Override
>>       public long estimateSize() {
>>         return remaining;
>>       }
>>
>>       @Override
>>       public int characteristics() {
>>         return 0;
>>       }
>>     };
>>     var stream = StreamSupport.stream(spliterator, false);
>>     var runnable = new Runnable() {
>>       private R result;
>>
>>       @Override
>>       public void run() {
>>         result = function.apply(stream);
>>       }
>>     };
>>     Thread.ofVirtual().start(runnable).join();
>>     done = true;
>>     return runnable.result;
>>   }
>> }
>>
>> Callable<WeatherResponse> task(LatLong latLong) {
>>   return () -> OpenMeteo.getWeatherResponse(latLong);
>> }
>>
>> void main() throws InterruptedException {
>>   var paris = new LatLong(48.864716, 2.349014); // use 30_000
>>   var nantes = new LatLong(47.2181, -1.5528);
>>   var marseille = new LatLong(43.2964, 5.37);
>>   var latlongs = List.of(paris, nantes, marseille);
>>   var joiner = new StreamJoiner<WeatherResponse>();
>>   try (var scope = StructuredTaskScope.open(joiner)) {
>>     var callables = latlongs.stream()
>>         .map(this::task)
>>         .toList();
>>     callables.forEach(scope::fork);
>>     var response = joiner.compute(Stream::toList);
>>     //var response = joiner.compute(s -> s.findFirst().orElseThrow());
>>     scope.join();
>>     IO.println(response);
>>   }
>> }