Structured Concurrency Feedback

Filip Egeric egericf at gmail.com
Tue Oct 21 07:40:05 UTC 2025


Hello,

I would like to follow up on this one, because I also found myself creating
a similar wrapper, but for a different reason: I want to limit concurrency
so that I do not exceed a rate limit somewhere.
Example:

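Stream<Article> summarize(Stream<Article> articles) throws InterruptedException {
    try (var scope = MyTaskScope.withLimitedConcurrency(10)) {
        // Streams are lazy: collect to a list so that every subtask is
        // forked while the scope is still open.
        var subtasks = articles
                .map(article -> scope.fork(() -> llm.summarize(article)))
                .toList();

        // Subtask::get is only valid once the owner has joined.
        scope.join();
        return subtasks.stream().map(Subtask::get);
    }
}

And `MyTaskScope` looks like this:

import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

class MyTaskScope<T> implements AutoCloseable {
    private final StructuredTaskScope<T, Void> sts;
    private final Semaphore semaphore;

    private MyTaskScope(StructuredTaskScope<T, Void> sts, int n) {
        this.sts = sts;
        this.semaphore = new Semaphore(n, true);
    }

    static <T> MyTaskScope<T> withLimitedConcurrency(int n) {
        return new MyTaskScope<T>(StructuredTaskScope.open(), n);
    }

    <U extends T> Subtask<U> fork(Callable<? extends U> task) {
        return sts.fork(() -> {
            // Acquire outside the try block: if acquire() is interrupted,
            // no permit is held, so none must be released.
            semaphore.acquire();
            try {
                return task.call();
            } finally {
                semaphore.release();
            }
        });
    }

    // Delegates to the underlying scope so the owner can wait for all subtasks.
    void join() throws InterruptedException {
        sts.join();
    }

    @Override
    public void close() {
        sts.close();
    }
}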

For this use case, it would make more sense to implement
`StructuredTaskScope` instead of `AutoCloseable`, but it's sealed.
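
The limiting idea itself does not depend on the preview API. As a minimal
sketch (not the wrapper above), the same semaphore pattern can be tried with a
plain `ExecutorService`. The names `LimitedConcurrencyDemo`, `runAll` and
`observedPeak` are made up for illustration, and the cached thread pool is an
assumption that stands in for the scope:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class LimitedConcurrencyDemo {

    /** Runs all tasks, at most {@code limit} at a time, and returns the results in submission order. */
    static <T> List<T> runAll(List<Callable<T>> tasks, int limit) throws Exception {
        Semaphore semaphore = new Semaphore(limit, true);
        // Assumption: a cached thread pool replaces StructuredTaskScope here.
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Future<T>> futures = new ArrayList<>();
            for (Callable<T> task : tasks) {
                futures.add(executor.submit(() -> {
                    // Acquire before the try block so that a failed acquire
                    // never releases a permit it does not hold.
                    semaphore.acquire();
                    try {
                        return task.call();
                    } finally {
                        semaphore.release();
                    }
                }));
            }
            List<T> results = new ArrayList<>();
            for (Future<T> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdownNow();
        }
    }

    /** Returns the highest number of tasks that were observed running at the same time. */
    static int observedPeak(int taskCount, int limit) throws Exception {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            tasks.add(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                Thread.sleep(20); // simulate a slow, rate-limited call
                running.decrementAndGet();
                return id;
            });
        }
        runAll(tasks, limit);
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak concurrency: " + observedPeak(20, 3));
    }
}
```

The observed peak should never exceed the limit passed in, no matter how many
tasks are submitted. That is the property I rely on for the rate limit.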

Kind regards,
Filip Egeric


On Mon, Oct 20, 2025 at 8:04 PM Josiah Noel <josiahnoel at gmail.com> wrote:

> Long time no see,
>
> So I moved one of my applications to the new API, and I don't really know
> what else to say other than it works for me. The only thing I'm not a fan
> of is the checked InterruptedException thrown by join. Because of it, I
> find myself creating a wrapper class like this:
>
> public class STSWrapper implements AutoCloseable {
>
>     private final StructuredTaskScope<Object, Void> scope =
>             StructuredTaskScope.open();
>
>     public <T> Supplier<T> fork(Callable<? extends T> task) {
>         return scope.fork(task);
>     }
>
>     public <T> Supplier<T> fork(Runnable task) {
>         return scope.fork(task);
>     }
>
>     public void join() {
>         try {
>             scope.join();
>         } catch (final InterruptedException e) {
>             Thread.currentThread().interrupt();
>             throw new IllegalStateException(e);
>         }
>     }
>
>     @Override
>     public void close() {
>         scope.close();
>     }
> }
>
>
> I assume there are good reasons to not make it like CompletableFuture's
> unchecked join method, so if nothing can be done I'll just leave it at that.
>
> The application itself is nothing but an orchestration API where we make
> downstream calls to other services to get and combine data. I didn't test
> anything other than the default joiner because if any of the calls fail we
> want to terminate the request immediately (they were also all different
> types). I didn't need to check the state of the individual tasks, so I used
> a supplier for the wrapper.
>
> Most of the other services I work on follow similar requirements, so I
> don't think I'll be able to test any joiners other than the default on
> real services.
>
> --
> Cheers, Josiah.
>


More information about the loom-dev mailing list