Structured Concurrency yet again
Holo The Sage Wolf
holo3146 at gmail.com
Wed May 10 06:47:15 UTC 2023
The way to handle these cases is to put the logic after the task ends into
the scope itself.
For example:
class ContinuableScope<T> extends StructuredTaskScope<T, K> {
@FunctionalInterface
public interface FanLogic<T, K> {
K get(TaskHandle<T> handle) throws Throwable;
}
private final Queue<K> results = new ConcurrentLinkedQueue<>();
private final Queue<Throwable> exceptions = new
ConcurrentLinkedQueue<>();
ContinuableScope(FanLogic<T,K> next, ShutdownStrategy<K> strategy) {
this.next = next;
this.strategt = strategy;
super(null, Thread.ofVirtual().factory());
}
@Override
protected void handleComplete(TaskHandle<T> handle) {
try {
var nr = next.get(handle);
if (nr == null) return; // using null to not make this more
complicated in the mail
if (strategy.testResult(nr, results, exceptions)) {
// Kill the scope
return;
}
results.add(nr);
} catch(Throwable e) {
if (strategy.testFailure(e, results, exceptions)) {
// Kill the scope
return;
}
exceptions.add(nr);
}
}
public Stream<T> results() {
super.ensureOwnerAndJoined();
return results.stream();
}
public Stream<Throwable> exceptions() {
super.ensureOwnerAndJoined();
return exceptions.stream();
}
}
---
public void main() {
var strategy = new ShutdownStrategy<String>(...); // Shut down on
unknown host, and on results length == 3
var fanAction = (th) -> {
var r = transform(th);
if (validate(r)) return r;
return null; //again, using null for briefity
}
try (var scope = new ContinuableScope<Result>(fanAction, strategy)) {
for (...) {
scope.fork(() -> call(...));
}
scope.join();
if (scope.results().isEmpty()) {
throw ...
}
// Return an object storing both scope.results() and
scope.exceptions()
}
}
On Wed, May 10, 2023, 01:39 <forax at univ-mlv.fr> wrote:
> ----- Original Message -----
> > From: "Ron Pressler" <ron.pressler at oracle.com>
> > To: "Remi Forax" <forax at univ-mlv.fr>
> > Cc: "Alan Bateman" <alan.bateman at oracle.com>, "loom-dev" <
> loom-dev at openjdk.java.net>
> > Sent: Tuesday, May 9, 2023 10:33:25 PM
> > Subject: Re: Structured Concurrency yet again
>
> >> On 9 May 2023, at 15:29, forax at univ-mlv.fr wrote:
> >>
> >>
> >> It is interesting to note that we seem to agree that fork() should not
> return a
> >> Future. I can identify two logical consequences.
> >>
> >> Firstly, as mentioned in the JEP, get() or state() should only be
> called after
> >> joinAll(). Calling task.state() on a running computation would throw an
> >> IllegalStateException, making State.RUNNING unnecessary. Furthermore,
> when
> >> scope.shutdown() is invoked, a cancelled task can be in state FAILED or
> >> CANCELLED, as explained in the documentation, which is a pain to deal
> with as a
> >> user. I propose to simplify the design and remove the state CANCELLED
> (perhaps
> >> making it a sub-state of FAILED with a method isCancelled() if we want
> to keep
> >> it). This way, users will have only to deal with two states SUCCESS or
> FAILED.
> >> It is worth noting that existing reactive APIs do not differentiate
> between
> >> CANCELLED and FAILED too.
> >>
> >> The second consequence concerns the TaskHandle.exception() method. It
> should not
> >> return a Throwable, since Callable throws an Exception and storing an
> Error in
> >> a field is not a good practice. Exception handling is a significant
> concern in
> >> Java, and we have the opportunity to provide a transparent way to
> address it
> >> without being constrained by the design of Future. A TaskHandle can be
> >> parameterized by both the result type and the exception type. When
> calling
> >> get(), exceptions are transparently re-spawned. However, this means that
> >> TaskHandle cannot be a Supplier.
> >>
> >> Then I prefer to use a Stream to deal with the different semantics of a
> scope
> >> instead of asking to subclass it and hoping that users will correctly
> manage
> >> the concurrency when overriding handleComplete().
> >>
> >>>
> >>> -Alan.
> >>>
> >>> [1] https://openjdk.org/jeps/8306641
> >>
> >> regards,
> >> Rémi
> >
> > When a policy is used, there is no need for the scope to ever call either
> > `state` or `exception` on subtasks’ handles; these methods are intended
> to be
> > used only by handleComplete (or from the scope only if no policy is
> used, but
> > preferably one should be). As the JEP describes, all exceptions should be
> > handled centrally, as part of the policy, before calling
> TaskHandle.get(), the
> > only TaskHandle method the scope should call when using a policy. Could
> you
> > show what problems you’ve run into with exceptions when policies are
> used ?
>
> Usually when you do something synchronous with some network calls, you mix
> an algorithm (i need at most the first three valid values) with some
> IO/Network specific concerns (i want to stop if the is an
> UnknownHostException (aka DNS error) but an IOException (aka unknown
> network error) is fine).
>
> In term of synchronous code, it's something like
>
> var list = new ArrayList<Result>();
> var error = (IOException) null;
> for(...) {
> Result result;
> try {
> result = synchronousCall(...);
> } catch(UnknownHostException e) {
> throw ... (e);
> } catch(IOException e) {
> if (error == null) {
> error = e; // just record the error
> }
> continue;
> }
> if (valid(result)) {
> list.add(result);
> if (list.size() == 3) {
> break;
> }
> }
> }
> if (list.isEmpty()) {
> throw ... (error);
> }
> ...
>
> How can i write this kind of stuff as a reusable policy that centralizes
> all the exceptions ?
>
> >
> > — Ron
>
> Rémi
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://mail.openjdk.org/pipermail/loom-dev/attachments/20230510/46d44d57/attachment.htm>
More information about the loom-dev
mailing list