Question about StructuredTaskScope.joinUntil(Instant)
Werner Randelshofer
werner.randelshofer at fibermail.ch
Sun Jul 28 16:20:44 UTC 2024
Hi Sten,
I had to revise my solution. The original version kept waiting until the deadline, even if all tasks had finished earlier.
The revised solution is quite complex now. See the code below.
Having a public method in StructuredTaskScope that waits until a timeout elapses,
for example something like StructuredTaskScope.join(Duration timeout),
would be very helpful for my use case.
With best regards,
Werner
Revised solution:
package org.example.structuredconcurrency;

import java.util.concurrent.Callable;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;

/**
 * A {@link StructuredTaskScope} that supports a deadline given in {@link System#nanoTime()}.
 *
 * This code is public domain.
 *
 * @param <T> the result type of the tasks executed in the task scope
 */
public class StructuredTaskScopeNanoTime<T> extends StructuredTaskScope<T> {
    /** Counts the running subtasks and lets a thread wait until the count drops to zero. */
    private static class Sem {
        private volatile int threadCount;

        public synchronized void inc() {
            threadCount++;
        }

        public synchronized void dec() {
            threadCount--;
            if (threadCount == 0) {
                notifyAll();
            }
        }

        public synchronized void awaitAll(long deadlineNanos) throws InterruptedException {
            long timeout;
            // Subtract first, then convert the remaining nanoseconds to the
            // milliseconds that Object.wait(long) expects.
            while (threadCount > 0
                    && (timeout = (deadlineNanos - System.nanoTime()) / 1_000_000L) > 0) {
                wait(timeout);
            }
        }
    }

    private final Sem sem = new Sem();

    /**
     * Creates an unnamed structured task scope that creates virtual threads. The task
     * scope is owned by the current thread.
     */
    public StructuredTaskScopeNanoTime() {
    }

    /**
     * Creates a structured task scope with the given name and thread factory.
     *
     * @param name    the name of the task scope, can be null
     * @param factory the thread factory
     */
    public StructuredTaskScopeNanoTime(String name, ThreadFactory factory) {
        super(name, factory);
    }

    @Override
    public <U extends T> Subtask<U> fork(Callable<? extends U> task) {
        sem.inc();
        return super.fork(() -> {
            try {
                return task.call();
            } finally {
                sem.dec();
            }
        });
    }

    /**
     * Wait for all subtasks started in this task scope to finish or the task scope to
     * shut down, up to the given deadline.
     *
     * @param deadlineNanoTime the deadline given in {@link System#nanoTime()} units
     * @return this task scope
     * @throws IllegalStateException if this task scope is closed
     * @throws WrongThreadException  if the current thread is not the task scope owner
     * @throws InterruptedException  if interrupted while waiting
     * @throws TimeoutException      if the deadline is reached while waiting
     */
    public StructuredTaskScopeNanoTime<T> joinUntilNanoTime(long deadlineNanoTime) throws InterruptedException, TimeoutException {
        // Watcher subtask: bypasses the counting fork() above, waits until all
        // counted subtasks are done or the deadline passes, then shuts the scope down.
        super.fork(() -> {
            sem.awaitAll(deadlineNanoTime);
            shutdown();
            return null;
        });
        join();
        return this;
    }
}
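
The timed wait at the heart of the Sem class can be tried out without StructuredTaskScope. The following is only a minimal sketch and not part of the posted solution: PendingCounter, awaitZero and deadlineAfter are hypothetical names. Unlike the class above, awaitZero reports whether the count reached zero before the deadline, and it passes the sub-millisecond remainder to Object.wait(long, int). deadlineAfter shows how a Duration timeout, as in the join(Duration) wished for above, could be mapped to a System.nanoTime() deadline.

```java
import java.time.Duration;

/** Hypothetical stand-alone counter, shown for illustration only. */
final class PendingCounter {
    private int pending;

    synchronized void inc() {
        pending++;
    }

    synchronized void dec() {
        if (--pending == 0) {
            notifyAll();
        }
    }

    /** Converts a relative timeout into a deadline in System.nanoTime() units. */
    static long deadlineAfter(Duration timeout) {
        return System.nanoTime() + timeout.toNanos();
    }

    /**
     * Waits until the count is zero or the deadline has passed.
     *
     * @return true if the count reached zero, false if the deadline passed first
     */
    synchronized boolean awaitZero(long deadlineNanos) throws InterruptedException {
        while (pending > 0) {
            // nanoTime values must be compared by subtraction, never with < or >.
            long remaining = deadlineNanos - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            // Milliseconds plus the nanosecond remainder (0..999999).
            wait(remaining / 1_000_000L, (int) (remaining % 1_000_000L));
        }
        return true;
    }
}
```

Because the loop recomputes the remaining time after every wake-up, spurious wake-ups and early notifications do not extend or shorten the deadline.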
> On 28 Jul 2024, at 15:10, Sten Nordstrom <stnordstrom at gmail.com> wrote:
>
> This was surprising to me as well, as this can lead to some surprising results in environments with unreliable time synchronization. Are there any pointers re discussion why this approach was chosen instead of the more common way of specifying a maximum duration to wait?
>
> Glad to hear that you found a solution, we will need to do something similar when we start using StructuredTaskScopes.
>
> Best Regards,
> Sten
>
>
>
> On Tue, Jun 11, 2024 at 18.33 Werner Randelshofer <werner.randelshofer at fibermail.ch> wrote:
> I have solved my problem. :-)
>
> I have now implemented a subclass of StructuredTaskScope, which allows specifying a deadline in System.nanoTime() units.
> With this design, the deadline is independent from external resources (like NTP time servers) over which the Java process has no control.
>
> See code below.
>
> With best regards,
> Werner
> ...