Structured Concurrency Feedback
Filip Egeric
egericf at gmail.com
Tue Oct 21 07:40:05 UTC 2025
Hello,
I would like to follow up on this one, because I also found myself creating
a wrapper similar to this one, but for a different reason.
Specifically, when I want to limit concurrency in order to not exceed a
rate limit somewhere.
Example:
Stream<Article> summarize(Stream<Article> articles) {
try (var scope = MyTaskScope.withLimitedConcurrency(10)) {
var subtasks = articles.map(article -> scope.fork(() ->
llm.summarize(article)));
return subtasks.map(Subtask::get);
}
}
And `MyTaskScope` looks like this:
class MyTaskScope<T> implements AutoCloseable {
private final StructuredTaskScope<T, Void> sts;
private final Semaphore semaphore;
private MyTaskScope(StructuredTaskScope<T, Void> sts, int n) {
this.sts = sts;
this.semaphore = new Semaphore(n, true);
}
static <T> MyTaskScope<T> withLimitedConcurrency(int n) {
return new MyTaskScope<T>(StructuredTaskScope.open(), n);
}
<U extends T> Subtask<U> fork(Callable<? extends U> task) {
return sts.fork(() -> {
try {
semaphore.acquire();
return task.call();
} finally {
semaphore.release();
}
});
}
@Override
public void close() {
sts.close();
}
}
For this use case, it would make more sense to implement
`StructuredTaskScope` instead of `AutoCloseable`, but it's sealed.
Kind regards,
Filip Egeric
On Mon, Oct 20, 2025 at 8:04 PM Josiah Noel <josiahnoel at gmail.com> wrote:
> Long time no see,
>
> So I moved one of my applications to the new API, and I don't really know
> what else to say other than it works for me. The only thing I'm not a fan
> of is the interrupted exception when calling join. Because of the
> interrupted exception I find myself creating a wrapper class like this:
>
> public class STSWrapper implements AutoCloseable {
>
>
> private final StructuredTaskScope<Object, Void> scope =
> StructuredTaskScope.open();
>
>
> public <T> Supplier<T> fork(Callable<? extends T> task) {
>
> return scope.fork(task);
>
> }
>
>
> public <T> Supplier<T> fork(Runnable task) {
>
> return scope.fork(task);
>
> }
>
>
> public void join() {
>
> try {
>
> scope.join();
>
> } catch (final InterruptedException e) {
>
> Thread.currentThread().interrupt();
>
> throw new IllegalStateException(e);
>
> }
>
> }
>
>
> @Override
>
> public void close() {
>
> scope.close();
>
> }
>
> }
>
>
> I assume there are good reasons to not make it like CompletableFuture's
> unchecked join method, so if nothing can be done I'll just leave it at that.
>
> The application itself is nothing but an orchestration api where we make
> downstream calls to other services to get and combine data. I didn't test
> anything other than the default joiner because if any of the calls fail we
> want to terminate the request immediately (they were also all different
> types). I didn't need to check the state of the individual tasks, so I used
> a supplier for the wrapper.
>
> Most of the other services I work on follow similar requirements so I
> don't think I'll be able to test any other joiners other than the default
> on real services.
>
> --
> Cheers, Josiah.
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://mail.openjdk.org/pipermail/loom-dev/attachments/20251021/fed75a8e/attachment-0001.htm>
More information about the loom-dev
mailing list