Re: Structured Concurrency vs the Streams API
Ron Pressler
ron.pressler at oracle.com
Tue May 10 22:56:38 UTC 2022
On 10 May 2022, at 22:09, Arkadiusz Gasiński <jigga at jigga.pl> wrote:
Thanks Ron - your implementation was one semicolon away from successful compilation and one statement reorder away from working correctly - had to move the call to scope.shutdown() after the call to result.compareAndSet, otherwise I was getting the below - I assume there was a race condition between the thread setting the result and the scope owner checking the result at the end.
Right.
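The ordering issue can be reproduced without the Loom API. The sketch below is a plain-JDK analogy, not the StructuredTaskScope code itself: a CountDownLatch (an assumption, standing in for the scope's shutdown/join signal) is released only after the worker has published its value, so the owner cannot wake up and read null. The class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class PublishBeforeSignal {
    // Hypothetical helper: runs one worker and returns what the owner observes.
    static String runOnce() throws InterruptedException {
        var result = new AtomicReference<String>();
        var done = new CountDownLatch(1); // stands in for shutdown()/join()

        Thread worker = new Thread(() -> {
            result.compareAndSet(null, "connected"); // 1. publish the result
            done.countDown();                        // 2. only then wake the owner
        });
        worker.start();

        done.await();        // the owner resumes here, like join() after shutdown()
        worker.join();       // tidy up the worker thread
        return result.get(); // not null, because step 1 happens before step 2
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1_000; i++) {
            if (runOnce() == null) throw new AssertionError("owner saw null");
        }
        System.out.println("owner always saw the published result");
    }
}
```

Swapping steps 1 and 2 would let the owner occasionally observe null, which matches the failure described above.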
I'm also wondering whether, in general, it's good practice for tasks to hold a reference to the (parent) scope that created them (excluding the case where a task creates its own scope). My argument against it is that, if we don't do this, we get a clear separation between task logic and task management. Any thoughts on this?
My feeling is that the part of the task that’s defined in the same method as the scope is no different from a loop body, and should be considered part of that method even though it runs in a different thread, so it is perfectly reasonable to refer to the scope at least there. I agree that letting the scope escape to some other method requires more justification.
Thanks,
Arek
— Ron
On Mon, May 9, 2022 at 12:25 AM Ron Pressler <ron.pressler at oracle.com> wrote:
Hi.
We’ve put some thought into whether it is worthwhile to add more built-in policies (or enhance the existing ones) to support advanced use-cases such as happy eyeballs, but decided against it (for now?). More on the complications below.
In terms of style, I think you’re depending too much on Future and not enough on the scope’s shutdown. In fact, my conjecture is that the only Future method that needs to be used in a StructuredTaskScope is resultNow (in which case, we could replace the returned Futures with Suppliers), but we’re returning Futures to easily support implementing things similar to ExecutorService.invokeAll.
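To illustrate the point about resultNow, here is a small sketch that uses a plain ExecutorService rather than a StructuredTaskScope (a substitution made only so the example runs without preview or incubator flags; Future.resultNow itself needs JDK 19 or later). Once every task is known to have completed, each Future is used purely as a supplier of its result. The class name and task values are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

class ResultNowDemo {
    static List<String> fetchAll() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = List.of(() -> "user", () -> "order");
            // invokeAll returns only after every task has completed,
            // and the futures are in the same order as the tasks
            List<Future<String>> futures = executor.invokeAll(tasks);

            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                // a completed Future viewed as nothing more than a Supplier
                Supplier<String> supplier = future::resultNow;
                results.add(supplier.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fetchAll());
    }
}
```

Note that resultNow throws IllegalStateException if the task has not completed successfully, so this view is only safe after the point at which all tasks are known to be done.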
However, the biggest problem with your implementation as far as I could see is not the style, but a subtle bug. Multiple connections could succeed, in which case you may return while leaving some sockets open.
Here is my sketch of a possible implementation that doesn’t use any built-in policy, just the basic operations, fork, shutdown, and join (I didn’t even try to compile it):
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
// StructuredTaskScope is in jdk.incubator.concurrent in JDK 19 and 20,
// and in java.util.concurrent (preview) in JDK 21.

private static Socket connect(String hostname) throws InterruptedException, IOException {
    var addresses = InetAddress.getAllByName(hostname);
    // addresses that have not been handed out yet
    var source = new ArrayBlockingQueue<InetAddress>(addresses.length, false, Arrays.asList(addresses));
    // addresses that are due for a connection attempt
    var work = new ArrayBlockingQueue<InetAddress>(addresses.length);
    var result = new AtomicReference<Socket>();
    try (var s = new StructuredTaskScope<Void>()) {
        // try an address every 250 ms; remove() throws once source is empty, which ends this task
        s.fork(() -> {
            while (true) {
                work.put(source.remove());
                Thread.sleep(250);
            }
        });
        // read addresses from the work queue and fork attempts
        s.fork(() -> {
            for (int i = 0; i < addresses.length; i++) {
                boolean first = i == 0;
                var address = work.take();
                s.fork(() -> {
                    Socket socket = null;
                    try {
                        System.out.println(Thread.currentThread() + " - connecting to: " + address);
                        if (!first)
                            Thread.sleep(address instanceof Inet4Address ? 300 : 100);
                        socket = new Socket(address, 443);
                    } catch (IOException e) {
                        System.out.println("Got " + e + " when connecting to " + address);
                        work.put(source.remove()); // immediately try another address
                        throw e;
                    }
                    System.out.println("Successfully connected to: " + address);
                    // publish the result before shutting down, so that the owner
                    // cannot return from join() and still see a null result
                    if (!result.compareAndSet(null, socket))
                        socket.close(); // someone else beat us to it; clean up
                    s.shutdown();
                    return null;
                });
            }
            return null;
        });
        s.join();
        if (result.get() == null)
            throw new IOException("All connection attempts failed");
        return result.get();
    }
}
— Ron
On 7 May 2022, at 23:14, Arkadiusz Gasiński <jigga at jigga.pl> wrote:
Thanks Ron.
I'm actually working on a "Structured Concurrency in Java" presentation that I plan to give at a local JUG meeting at the end of May. After playing a bit with the SC APIs in the EA Loom builds, I wonder whether Java's intent is purely to contain threads in a well-defined scope, or whether there will be more to it.
Practically all presentations on the subject I've seen so far start by going down the "goto statement considered harmful" memory lane and try to prove that thread spawning, futures, promises, etc. are "not even the modern domesticated goto, but the old-testament fire-and-brimstone goto" of concurrency. Do you agree with this?
Also, one of Dijkstra's remarks in the "Case against the GO TO statement" was that we should do our best to shorten the gap between how the program is spread out in text and its runtime (process) representation. Do you think this is a feasible thing to achieve with structured concurrency?
One last thing - I made an attempt at implementing a rather naive (it doesn't take IPv4 vs IPv6 into consideration) version of the Happy Eyeballs algorithm with the StructuredTaskScope API (https://github.com/jigga/classes/blob/main/HappyEyeballs.java), similar to what N. J. Smith did with his Trio library in this talk (https://youtu.be/oLkfnc_UMcE). Would you be so kind as to have a look and give some feedback? While it works (it starts the subsequent task when the previous one times out or fails, and finishes as soon as the first one succeeds), I think it's still lacking in cleanliness. Right now I'm wondering whether it would be clearer and more idiomatic to encapsulate the delay between task submissions in some custom policy (e.g. StructuredTaskScope.ShutdownOnSuccessForkWithDelay), where you'd fork all tasks immediately (ideally a call to fork would return instantaneously rather than after the timeout passes) and then just join as in other use cases. Any thoughts?
Thanks,
Arek
On Fri, May 6, 2022, 12:09 PM Ron Pressler <ron.pressler at oracle.com> wrote:
Hi.
Parallel streams are indeed structured, but they’re (currently) focused on pure processing and data parallelism, rather than “concurrency,” which often involves a lot of I/O, failure handling, and cancellation.
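As a small illustration of the sense in which parallel streams are structured: the worker threads are an implementation detail of the terminal operation, and the call does not return until all of their work has finished. The sketch below is pure data parallelism with no I/O, failure handling, or cancellation; the class and method names are made up for the example.

```java
import java.util.stream.IntStream;

class ParallelSumDemo {
    static long sumOfSquares(int n) {
        // All parallel work is confined to this one expression: by the time
        // sum() returns, no subtask of the pipeline is still running.
        return IntStream.rangeClosed(1, n)
                .parallel()
                .mapToLong(i -> (long) i * i)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(1_000));
    }
}
```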
It is also true that the Stream API could be modified to implement some flavour of structured concurrency, and it is an idea that we’re thinking about.
— Ron
> On 6 May 2022, at 09:53, Arkadiusz Gasiński <jigga at jigga.pl> wrote:
>
> Hi all,
>
> Just a random thought of mine that I'd like loom-dev to comment on.
>
> Do you guys think it is a valid claim to call the (parallel) stream API an
> implementation of the Structured Concurrency paradigm?
>
> The more I think about it the more I'm convinced that this is indeed the
> case, but would like to hear some more opinions.
>
> Thanks,
> Arek