[concurrency-interest] draft Carrier API

Alex Otenko oleksandr.otenko at gmail.com
Wed Mar 11 06:55:55 UTC 2020


I think the mapping between poll, take of blocking queue and various
versions of receive are fairly obvious. Same for offer, put and various
send, including the versions with timeouts. The substantial difference is
in dealing with "closed" states in blocking send/receive. It is difficult
to signal that state without changing return type. Returning arbitrary
references or null is a source of errors. I see returning Optional as a
necessary deviation from the direct mapping of the blocking put/take, but I
see dealing with Optional can also be awkward. I guess we'll wait and see.

SendSynchronous is not just a wait for send to succeed, it also ensures the
corresponding receive consuming the sent item has occurred. You can see
this as an ack of that item and all the preceding items, although I don't
know if the API goes as far as the guarantee for the preceding receives.


Alex

On Wed, 11 Mar 2020, 03:10 Ghadi Shayban, <gshayban at gmail.com> wrote:

> As an API comparison point, Clojure's CSP library is called core.async. It
> features channels and send/receive operations, and the CSP select operation
> called "alts". The alts op is a plain function (no compiler magic) which
> non-deterministically chooses the first operation that can proceed, whether
> it is a channel put or a take. Channel buffers are polymorphic (fixed,
> sliding and dropping; or no buffer, meaning rendezvous semantics).
>
> Channel put returns whether the put succeeded (false if channel is
> closed).  Critically, the channel isClosed predicate is not exposed
> publicly, as it is hard to use without a TOCTOU bug.  Channel take receives
> the item (which is any reference, including Boolean false!) or receives nil
> when a channel is closed. There is only a single concrete implementation of
> a channel, ManyToManyChannel, but the underlying interfaces are split into
> read/write.
>
> When is it useful to use the exception throwing variants of Carrier, rather
> than trySend/tryReceive? Seems like there is a whole lot of API surface
> area to deal with channels being closed or closing.
>
> From a receivers point of view, the distinction between shutdown & closed
> seems arbitrary. You're not done receiving until a buffer (if present)
> drains. Over the years, I have found from my usage of core.async that the
> most useful ops around termination are: a sender signaling that they're
> done, or a receiver abandoning the interaction early. Generally only one
> side is in charge of closing a channel, but in the consumer abandonment
> scenario, the producer will detect that the channel has closed during its
> next put (then walks away, too.)
>
> I don't understand the distinction between send+timeout and
> sendSynchronously(). Isn't synchronicity more a property of whether a
> buffer is present or not (rendezvous channel)?  Same question re:
> tryReceiveEventually
>
> I love that there is something CSP-like going into the JVM - it will be
> killer when Loom drops.
>
> On Tue, Mar 10, 2020 at 7:49 AM Alex Otenko via Concurrency-interest <
> concurrency-interest at cs.oswego.edu> wrote:
>
> > Thanks, that will probably work. Or some tryReceive version like poll
> with
> > timeout. Passing max_value is essentially blocking until closed or a
> value
> > arrives (or spurious wakeup-like condition).
> >
> > Alex
> >
> > On Tue, 10 Mar 2020, 11:30 Doug Lea via Concurrency-interest, <
> > concurrency-interest at cs.oswego.edu> wrote:
> >
> >> On 3/9/20 3:29 PM, Alex Otenko wrote:
> >> > IllegalStateException is ok if receiver should've known there are no
> >> > more items to receive. This is a good idea in cases with definite
> length
> >> > of stream, and the length being known to the receiver before entering
> >> > receive(). This doesn't seem like a good idea for indefinite length
> >> > cases - like, loop to read all items until eof.
> >> >
> >> This is the reason for:
> >>     Stream<T> stream();             // destructive (consume-on-traverse)
> >> But it is also sensible to provide a simpler forEach analog:
> >>     long consumeEach(Consumer<? super T> proc); // return count
> >>
> >> For those who need stateful loops, we could add "eventually" forms of
> >> tryReceive. With non-value-types, the preferable form that can co-exist
> >> with value-types is usually to return a resultIfAbsent (that is almost
> >> always chosen to be null), and for value types, Optional. To avoid
> >> annoying people, we should probably have both.
> >>
> >>     T tryReceive(T resultIfAbsent); // resultIfAbsent if closed or empty
> >>     Optional<T> tryReceive();       // Optional.empty if closed or empty
> >>
> >>     T tryReceiveEventually(T resultIfAbsent); // resultIfAbsent if
> closed
> >>     Optional<T> tryReceiveEventually(); // Optional.empty if closed
> >>
> >> Maybe there is a better method name.
> >>
> >> (See updates at http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java)
> >>
> >> -Doug
> >>
> >> _______________________________________________
> >> Concurrency-interest mailing list
> >> Concurrency-interest at cs.oswego.edu
> >> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
> >>
> > _______________________________________________
> > Concurrency-interest mailing list
> > Concurrency-interest at cs.oswego.edu
> > http://cs.oswego.edu/mailman/listinfo/concurrency-interest
> >
>


More information about the loom-dev mailing list