Structured Concurrency yet again

forax at univ-mlv.fr forax at univ-mlv.fr
Wed May 10 21:46:24 UTC 2023


>> On 10 May 2023, at 12:34, Alan Bateman <alan.bateman at oracle.com> wrote:

>> :
>> The problem is that unlike any other classes in java.util.concurrent, STS is not
>> thread safe by default, it requires to carefully implement any subclasses in a
>> thread safe way.
> 
> STS is thread safe, as are the ShutdownOnXXX subclasses for the common
> cases. When you go down the more advanced route, in this case to have a
> policy to short circuit when there are 3 results or a task fails with
> UHE, then it means extending STS and being careful. I think we will need
> to get real world usage to see if this is really a problem or whether
> additional support is required to make it easier to implement policy in
> a thread safe way.

I think we can have three implementations: the two existing ShutdownOnXXX classes and a third that internally uses a blocking queue + a spliterator, so that a user who wants specific logic does not have to deal with the concurrency issues.

> 
> 
>> :
>>
>> - results() can not be protected by ensureOwnerAndJoined given it is accessed by
>> the implementation of the ShutdownStrategy
> The intention is that methods for processing the outcome after join use
> ensureOwnerAndJoined so they are restricted to the main task/owner.

This works well only when you wait for all the asynchronous calls to finish before doing anything with their results.

> 
> 
>> :
>>
>> I would prefer to provide an API easier to use. I believe that having a method
>> join() that takes a function that takes a stream is easier to use correctly.
> We were down that road before when the prototype was based on ES. It's a
> different style of API, also it involves the main task doing O(N)
> wakeups rather than one.

Yes, but Go channels also do O(N) wakeups in the worst case (sometimes you can pump several results in one wakeup) and nobody complains.
What I'm proposing is just a glorified blocking queue + a spliterator, but it makes it easier to implement any logic.
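
To make the "blocking queue + a spliterator" idea concrete, here is a minimal sketch under stated assumptions. QueueScope, its Result record, and the async/await methods are hypothetical names chosen for illustration; this is not a JDK API and not a definitive implementation. It uses a plain cached thread pool instead of virtual threads so that it does not depend on preview features, and close() only interrupts the remaining tasks without waiting for them.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical sketch: all concurrency is confined to the queue,
// the user-supplied function only sees a sequential stream.
final class QueueScope<T> implements AutoCloseable {
    // Outcome of one task: either a value or the exception it threw.
    record Result<T>(T value, Exception failure) {
        boolean isFailed() { return failure != null; }
    }

    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final BlockingQueue<Result<T>> queue = new LinkedBlockingQueue<>();
    private int forked;  // only touched by the owner thread

    // Start a task; its outcome is pushed to the queue when it completes.
    void async(Callable<T> task) {
        forked++;
        executor.execute(() -> {
            Result<T> result;
            try {
                result = new Result<>(task.call(), null);
            } catch (Exception e) {
                result = new Result<>(null, e);
            }
            queue.add(result);
        });
    }

    // Run fn on the owner thread over a stream of results in completion order.
    <R> R await(Function<Stream<Result<T>>, R> fn) {
        var spliterator = new Spliterators.AbstractSpliterator<Result<T>>(forked, Spliterator.NONNULL) {
            int remaining = forked;

            @Override
            public boolean tryAdvance(Consumer<? super Result<T>> action) {
                if (remaining == 0) {
                    return false;  // every forked task has been consumed
                }
                Result<T> next;
                try {
                    next = queue.take();  // one wakeup per consumed result
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
                remaining--;
                action.accept(next);
                return true;
            }
        };
        return fn.apply(StreamSupport.stream(spliterator, false));
    }

    // Interrupt tasks that are still running, e.g. after a short circuit.
    @Override
    public void close() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        try (var scope = new QueueScope<Integer>()) {
            for (int i = 1; i <= 5; i++) {
                int n = i;
                scope.async(() -> n * 10);
            }
            // Keep the first three successful results, whichever finish first.
            List<Integer> firstThree = scope.await(stream -> stream
                .filter(r -> !r.isFailed())
                .limit(3)
                .map(r -> r.value())
                .collect(Collectors.toList()));
            System.out.println(firstThree);
        }
    }
}
```

In this sketch a short-circuiting operation such as limit(3) stops pulling from the queue once it has enough results, so the policy is expressed with ordinary stream operations and no user code runs on the worker threads.
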


---
> From: "Ron Pressler" <ron.pressler at oracle.com>
> To: "Alan Bateman" <alan.bateman at oracle.com>, "Remi Forax" <forax at univ-mlv.fr>
> Cc: "loom-dev" <loom-dev at openjdk.java.net>
> Sent: Wednesday, May 10, 2023 9:36:32 PM
> Subject: Re: Structured Concurrency yet again

>> On 10 May 2023, at 12:34, Alan Bateman <alan.bateman at oracle.com> wrote:


>>> I would prefer to provide an API easier to use. I believe that having a method
>>> join() that takes a function that takes a stream is easier to use correctly.
>> We were down that road before when the prototype was based on ES.
> 
> Exactly. Since we tried quite a few things and STS is expected to start Preview,
> I think we should let actual problems people encounter with the API inform its
> evolution (as was the case with ditching Future as fork’s return type),
> especially because SC is not quite mainstream yet. I think STS is a good API to
> preview because I find it to strike a nice balance between simplicity and
> flexibility; it’s neither too high-level nor too low-level. So I think it can
> teach us about how people may use SC in the field while actually being useful
> to them.

You're aware that I have demonstrated the Java 19 version of STS to numerous conference attendees. However, the requirement to create custom subclasses is often a point of confusion for them. Many existing APIs attempt to separate the concurrency aspect, which is managed by the implementation, from the business logic that users should focus on.
I believe the current incarnation of STS is falling short.


> 
> We have ideas for other structured-concurrency APIs (e.g. stream-based) that
> could be provided in addition to STS to make some common cases even more
> pleasant, so it’s quite possible that STS will not be the last word the JDK
> says on the subject of structured concurrency.

Cool !


BTW, here is how to write the code if scope.join() (scope.await() here) takes a function that takes a stream of results:

  try(var scope = new AsyncScope<Integer, IOException>()) {
      for(...) {
          scope.async(() -> {
              ...
          });
      }
      var box = new Object() { int counter; };
      Result<List<Integer>, IOException> result =
          scope.await(stream -> stream
               .peek(r -> {
                   if (r.isFailed() && r.failure() instanceof UnknownHostException e) {
                       throw new UncheckedIOException(e);
                   }
               })
               .filter(r -> isValid(r))
               .takeWhile(r -> r.isFailed() || box.counter++ < 3)
               .collect(Result.toResult(Collectors.toList())));
      ...
  }

The code is full of side effects, more than I would like, but I think it's more readable than creating a subclass of STS and hoping to get the concurrency right.

> 
> — Ron

Rémi

