[External] : Re: Structured Concurrency vs the Streams API
Ron Pressler
ron.pressler at oracle.com
Sun May 8 22:25:41 UTC 2022
Hi.
We’ve put some thought into whether it is worthwhile to add more built-in policies (or enhance the existing ones) to support advanced use-cases such as happy eyeballs, but decided against it (for now?). More on the complications below.
In terms of style, I think you’re depending too much on Future and not enough on the scope’s shutdown. In fact, my conjecture is that the only Future method that needs to be used in a StructuredTaskScope is resultNow (in which case, we could replace the returned Futures with Suppliers), but we’re returning Futures to easily support implementing things similar to ExecutorService.invokeAll.
However, the biggest problem with your implementation as far as I could see is not the style, but a subtle bug. Multiple connections could succeed, in which case you may return while leaving some sockets open.
Here is my sketch of a possible implementation that doesn’t use any built-in policy, just the basic operations, fork, shutdown, and join (I didn’t even try to compile it):
private static Socket connect(String hostname) throws InterruptedException, IOException {
var addresses = InetAddress.getAllByName(hostname);
var source = new ArrayBlockingQueue<InetAddress>(addresses.length, false, Arrays.asList(addresses));
var work = new ArrayBlockingQueue<InetAddress>(addresses.length);
var result = new AtomicReference<Socket>();
try (var s = new StructuredTaskScope<Void>()) {
// try an address every 250 ms
s.fork(() -> {
while (true) {
work.put(source.remove());
Thread.sleep(250);
}
});
// read addresses from the qork queue and fork attempts
s.fork(() -> {
for (int i=0; i < addresses.length; i++) {
boolean first = i > 0;
var address = work.take();
s.fork(() -> {
Socket socket = null;
try {
System.out.println(Thread.currentThread() + " - connecting to: " + address);
if (!first)
Thread.sleep(address instanceof Inet4Address ? 300 : 100);
socket = new Socket(address, 443);
} catch (IOException e) {
System.out.println("Got " + e + " when connecting to " + address);
work.put(source.remove()); // immediately try another address
throw e;
}
System.out.println("Successfully connected to: " + address);
s.shutdown();
if (!result.compareAndSet(null, socket))
socket.close(); // someone else beat us to it; clean up
return null;
});
}
return null;
})
s.join();
if (result.get() == null)
throw new IOException("All connection attempts failed");
return result.get();
}
}
— Ron
On 7 May 2022, at 23:14, Arkadiusz Gasiński <jigga at jigga.pl<mailto:jigga at jigga.pl>> wrote:
Thanks Ron.
I'm actually working on the "Structured Concurrency in Java" presentation that I plan to give at a local JUG meeting at the end of May and after playing a bit with the SC APIs in the EA Loom builds, I wonder if Java's intent is purely about containing the threads in a well defined scope or will there be more to it?
Practically all presentations on the subject I've seen so far start with going down the "goto statement considered harmful" memory lane and are trying to prove that thread spawning, futures, promises, etc. are "not even the modern domesticated goto, but the old-testament fire-and-brimstone goto" of concurrency. Do you agree with this?
Also, one of Dijkstra's remarks in the "Case against the GO TO statement" was that we should do our best to shorten the gap between how the program is spread out in text and its runtime (process) representation. Do you think this is feasible thing to achieve in structured Concurrency?
One last thing - I made an attempt at implementing a rather dummy (don't take IPv4 vs IPv6 into consideration) version of the Happy Eyeballs algorithm with the StructuredTaskScope API (https://github.com/jigga/classes/blob/main/HappyEyeballs.java<https://urldefense.com/v3/__https://github.com/jigga/classes/blob/main/HappyEyeballs.java__;!!ACWV5N9M2RV99hQ!OS9ViAHAqF7jAdF8Z9Ll6ls2BjMBLmJd-sdIlkjssH2vvLKgoSoFIaSSP5l5yUMAwus_X5DeDygauFgm$>) - similar to what N. J. Smith did with his Trio library in this (https://youtu.be/oLkfnc_UMcE<https://urldefense.com/v3/__https://youtu.be/oLkfnc_UMcE__;!!ACWV5N9M2RV99hQ!OS9ViAHAqF7jAdF8Z9Ll6ls2BjMBLmJd-sdIlkjssH2vvLKgoSoFIaSSP5l5yUMAwus_X5DeD67wPhJD$>) talk. Would you be so kind to have a look and give some feedback? While it works (starts subsequent task when the previous one times out or fails and finishes as soon the first succeeds), I think it's still lacking in cleanliness. Right now I'm thinking if it wouldn't be clearer/more idiomatic to encapsulate the delay between tasks submissions in some custom policy (e.g. StructuredTaskScope.ShutdownOnSuccessForkWithDelay), where you'd fork all tasks immediately (ideally a call to fork would return instantaneously and not after timeout passes) and then just join like in other use cases. Any thoughts?
Thanks,
Arek
On Fri, May 6, 2022, 12:09 PM Ron Pressler <ron.pressler at oracle.com<mailto:ron.pressler at oracle.com>> wrote:
Hi.
Parallel streams are indeed structured, but they’re (currently) focused on pure processing and data parallelism, rather than “concurrency,” which often involves a lot of I/O, failure handling, and cancellation.
It is also true that the Stream API could be modified to implement some flavour of structued concurrency, and it is an idea that we’re thinking about.
— Ron
> On 6 May 2022, at 09:53, Arkadiusz Gasiński <jigga at jigga.pl<mailto:jigga at jigga.pl>> wrote:
>
> Hi all,
>
> Just a random thought of mine that I'd like loom-dev to comment on.
>
> Do you guys think it is a valid claim to call the (parallel) stream API an
> implementation of the Structured Concurrency paradigm?
>
> The more I think about it the more I'm convinced that this is indeed the
> case, but would like to hear some more opinions.
>
> Thanks,
> Arek
More information about the loom-dev
mailing list