Re: Structured Concurrency vs the Streams API

Arkadiusz Gasiński jigga at jigga.pl
Tue May 10 21:09:20 UTC 2022


Thanks Ron - your implementation was one semicolon away from compiling and
one statement reorder away from working correctly. I had to move the call to
scope.shutdown() after the call to result.compareAndSet; otherwise I got the
output below. I assume there was a race condition between the thread setting
the result and the scope owner checking the result at the end: with shutdown
called first, the owner could return from join and read the result before
the winning thread had stored its socket.

VirtualThread[#28]/runnable at ForkJoinPool-1-worker-1 - connecting to:
amazon.com/176.32.103.205
Successfully connected to: amazon.com/176.32.103.205
Socket set successfully for address: amazon.com/176.32.103.205
Exception in thread "main" java.io.IOException: All connection attempts
failed
        at HappyEyeballsRon.connect(HappyEyeballsRon.java:65)
        at HappyEyeballsRon.main(HappyEyeballsRon.java:71)
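
The ordering issue can be illustrated without the scope API. What follows is
a minimal sketch, not the actual HappyEyeballsRon code: FakeSocket and this
connect method are hypothetical names, and a CountDownLatch stands in for
s.shutdown() and s.join(), on the assumption that all that matters here is
that the owner may wake up as soon as the signal is raised.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class PublishBeforeSignal {
    /** Hypothetical stand-in for a connected Socket; counts how many were closed. */
    public static final class FakeSocket implements AutoCloseable {
        public static final AtomicInteger closed = new AtomicInteger();
        public final String address;
        FakeSocket(String address) { this.address = address; }
        @Override public void close() { closed.incrementAndGet(); }
    }

    public static FakeSocket connect(String... addresses) throws InterruptedException {
        var result = new AtomicReference<FakeSocket>();
        var done = new CountDownLatch(1); // assumption: stands in for shutdown/join
        var workers = new Thread[addresses.length];
        for (int i = 0; i < addresses.length; i++) {
            String address = addresses[i];
            workers[i] = new Thread(() -> {
                var socket = new FakeSocket(address); // pretend the connect succeeded
                // 1. Publish first: only one thread wins the compareAndSet.
                if (!result.compareAndSet(null, socket))
                    socket.close(); // lost the race; don't leak the connection
                // 2. Signal second: the owner wakes up only after a result is set.
                done.countDown();
            });
            workers[i].start();
        }
        done.await();                      // the "scope owner" waits for the signal
        FakeSocket winner = result.get();  // non-null because of the ordering above
        for (Thread t : workers) t.join(); // let the losers finish cleaning up
        return winner;
    }

    public static void main(String[] args) throws InterruptedException {
        FakeSocket s = connect("a.example", "b.example", "c.example");
        System.out.println("winner: " + s.address
            + ", closed: " + FakeSocket.closed.get());
    }
}
```

If the two numbered steps are swapped, the owner can pass done.await() while
result is still null, which matches the "All connection attempts failed"
symptom above. Which address wins is not deterministic.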

I'm also wondering whether, in general, it's good practice for tasks to hold
a reference to the (parent) scope that created them (excluding the case where
the task creates its own scope). My argument against it is that if we don't
do this, we can have a clear separation between task logic and task
management. Any thoughts on this?
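
To make the question concrete, here is a rough sketch of the separation I
have in mind. It uses a plain ExecutorService rather than StructuredTaskScope
so that it is self-contained; lookup, firstWins and race are hypothetical
names, and the latch plus shutdownNow are only assumed stand-ins for what a
scope would do.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class ScopeFreeTasks {
    /** Task logic: knows nothing about who runs it or how completion is signalled. */
    public static Callable<String> lookup(String host) {
        return () -> "connected:" + host; // placeholder for the real connection attempt
    }

    /** Task management: wraps any task so the first successful result is kept. */
    public static <T> Callable<T> firstWins(Callable<T> task,
                                            AtomicReference<T> result,
                                            CountDownLatch done) {
        return () -> {
            T value = task.call();             // an exception skips the lines below
            result.compareAndSet(null, value); // publish before signalling
            done.countDown();
            return value;
        };
    }

    /** Runs one task per host and returns the first result (hosts must be non-empty). */
    public static String race(String... hosts) throws InterruptedException {
        var result = new AtomicReference<String>();
        var done = new CountDownLatch(1);
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (String host : hosts)
                pool.submit(firstWins(lookup(host), result, done));
            done.await();
            return result.get();
        } finally {
            pool.shutdownNow(); // assumption: stands in for the scope cancelling the rest
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(race("a.example", "b.example"));
    }
}
```

Here only firstWins and race know about the shared result and the completion
signal; lookup could be reused under a different policy unchanged. Note that
this sketch waits forever if every task fails, so it is not a replacement for
the failure handling in Ron's version.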

Thanks,
Arek

On Mon, May 9, 2022 at 12:25 AM Ron Pressler <ron.pressler at oracle.com>
wrote:

> Hi.
>
> We’ve put some thought into whether it is worthwhile to add more built-in
> policies (or enhance the existing ones) to support advanced use-cases such
> as happy eyeballs, but decided against it (for now?). More on the
> complications below.
>
> In terms of style, I think you’re depending too much on Future and not
> enough on the scope’s shutdown. In fact, my conjecture is that the only
> Future method that needs to be used in a StructuredTaskScope is resultNow
> (in which case, we could replace the returned Futures with Suppliers), but
> we’re returning Futures to easily support implementing things similar to
> ExecutorService.invokeAll.
>
> However, the biggest problem with your implementation as far as I could
> see is not the style, but a subtle bug. Multiple connections could succeed,
> in which case you may return while leaving some sockets open.
>
> Here is my sketch of a possible implementation that doesn’t use any
> built-in policy, just the basic operations, fork, shutdown, and join (I
> didn’t even try to compile it):
>
> private static Socket connect(String hostname)
>         throws InterruptedException, IOException {
>     var addresses = InetAddress.getAllByName(hostname);
>     var source = new ArrayBlockingQueue<InetAddress>(
>         addresses.length, false, Arrays.asList(addresses));
>     var work = new ArrayBlockingQueue<InetAddress>(addresses.length);
>
>     var result = new AtomicReference<Socket>();
>
>     try (var s = new StructuredTaskScope<Void>()) {
>         // try an address every 250 ms
>         s.fork(() -> {
>             while (true) {
>                 work.put(source.remove());
>                 Thread.sleep(250);
>             }
>         });
>
>         // read addresses from the work queue and fork attempts
>         s.fork(() -> {
>             for (int i=0; i < addresses.length; i++) {
>                 boolean first = i == 0; // only later attempts are delayed
>                 var address = work.take();
>                 s.fork(() -> {
>                     Socket socket = null;
>                     try {
>                         System.out.println(Thread.currentThread()
>                             + " - connecting to: " + address);
>                         if (!first)
>                             Thread.sleep(address instanceof Inet4Address ? 300 : 100);
>                         socket = new Socket(address, 443);
>                     } catch (IOException e) {
>                         System.out.println("Got " + e + " when connecting to " + address);
>                         // immediately try another address
>                         work.put(source.remove());
>                         throw e;
>                     }
>                     System.out.println("Successfully connected to: " + address);
>                     s.shutdown();
>                     if (!result.compareAndSet(null, socket))
>                         socket.close(); // someone else beat us to it; clean up
>                     return null;
>                 });
>             }
>             return null;
>         })
>
>         s.join();
>
>         if (result.get() == null)
>             throw new IOException("All connection attempts failed");
>         return result.get();
>     }
> }
>
> — Ron
>
> On 7 May 2022, at 23:14, Arkadiusz Gasiński <jigga at jigga.pl> wrote:
>
> Thanks Ron.
>
> I'm actually working on the "Structured Concurrency in Java" presentation
> that I plan to give at a local JUG meeting at the end of May, and after
> playing a bit with the SC APIs in the EA Loom builds, I wonder whether
> Java's intent is purely about containing the threads in a well-defined
> scope, or whether there will be more to it.
>
> Practically all presentations on the subject I've seen so far start by
> going down the "goto statement considered harmful" memory lane and try to
> prove that thread spawning, futures, promises, etc. are "not even the
> modern domesticated goto, but the old-testament fire-and-brimstone goto"
> of concurrency. Do you agree with this?
>
> Also, one of Dijkstra's remarks in "A Case against the GO TO Statement"
> was that we should do our best to shorten the gap between how the program
> is spread out in text and its runtime (process) representation. Do you
> think this is a feasible thing to achieve in Structured Concurrency?
>
> One last thing - I made an attempt at implementing a rather naive (it
> doesn't take IPv4 vs IPv6 into consideration) version of the Happy Eyeballs
> algorithm with the StructuredTaskScope API
> (https://github.com/jigga/classes/blob/main/HappyEyeballs.java),
> similar to what N. J. Smith did with his Trio library in this talk
> (https://youtu.be/oLkfnc_UMcE).
> Would you be so kind as to have a look and give some feedback? While it
> works (starts the next task when the previous one times out or fails and
> finishes as soon as the first succeeds), I think it's still lacking in
> cleanliness. Right now I'm wondering whether it wouldn't be clearer/more
> idiomatic to encapsulate the delay between task submissions in some custom
> policy (e.g. StructuredTaskScope.ShutdownOnSuccessForkWithDelay), where
> you'd fork all tasks immediately (ideally a call to fork would return
> instantaneously and not after the timeout passes) and then just join like
> in other use cases. Any thoughts?
>
> Thanks,
> Arek
>
> On Fri, May 6, 2022, 12:09 PM Ron Pressler <ron.pressler at oracle.com>
> wrote:
>
>> Hi.
>>
>> Parallel streams are indeed structured, but they’re (currently) focused
>> on pure processing and data parallelism, rather than “concurrency,” which
>> often involves a lot of I/O, failure handling, and cancellation.
>>
>> It is also true that the Stream API could be modified to implement some
>> flavour of structured concurrency, and it is an idea that we’re thinking
>> about.
>>
>> — Ron
>>
>> > On 6 May 2022, at 09:53, Arkadiusz Gasiński <jigga at jigga.pl> wrote:
>> >
>> > Hi all,
>> >
>> > Just a random thought of mine that I'd like loom-dev to comment on.
>> >
>> > Do you guys think it is a valid claim to call the (parallel) stream API
>> > an implementation of the Structured Concurrency paradigm?
>> >
>> > The more I think about it the more I'm convinced that this is indeed the
>> > case, but would like to hear some more opinions.
>> >
>> > Thanks,
>> > Arek
>>
>>
>


More information about the loom-dev mailing list