draft Carrier API

Vsevolod Tolstopyatov qwwdfsad at gmail.com
Wed Mar 11 11:51:13 UTC 2020


Hi,

We have a similar API at Kotlin [1] with both sending and receiving parts;
happy to see a j.u.c is moving toward a similar API shape!

I have a few notes based on our experience with Kotlin Channel's API:

1) isFull method. We used to have it as well and eventually decided to
deprecate it. We didn't find any compelling use-cases, its semantics is
vague (e.g. is "TransferCarrier" always full?), and it is hard to make it
linearizable with the rest of operations for buffered carriers.

2) receive throws ClosedException (that is IllegalStateException). To be
somehow consistent with the Collections API, we've decided that attempt to
receive from a closed carrier/channel is similar to 'remove' from an empty
queue and thus should be NoSuchElementException. Send attempt to a closed
channel, though, is still IllegalStateException. Maybe it is something
worth considering here as well.

3) trySend. We have an analogue called offer, that also returns boolean,
**but** may throw an exception if a channel was closed with an exception
(aka "failure" rather than a normal close). In retrospect, this was not the
best design decision: users do not expect method with a signature 'boolean
offer(value)' to throw. We currently are thinking about migrating users to
'trySend' that never throws.
The question here is whether to return boolean or some value-type like
"Try<Boolean>" or "Either<Boolean|Throwable>"  to avoid check-and-act
misusages. Otherwise, it is impossible to deterministically know whether
'trySend' failed because the source was closed or because buffer was full.

4) Conflation. There are primitives (in Channels, Rx, etc.) that allow
value conflation: if there is no place in a (usually, single-slot) buffer
on send, the current value just gets overwritten.
I am not sure whether such API is in the scope of Carrier API, but if so,
contracts around Carriable methods (capacity, estimatedSize, close etc.)
require a really careful wording to make conflation fit here.

5) Send atomicity along with cancellation. If one sends a closeable
resource via carrier, is there any guarantee on when it is safe to close a
resource on the sending side (because a carrier is closed and there are no
receivers) and when it is not? E.g. if 'send' throws ClosedException or
CancellationException, is it safe to close an item in a catch block?
"upon timeout, the item is no longer available" on send with timeout really
caught my eye here.


[1]
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html
--
Best regards,
Tolstopyatov Vsevolod


On Fri, Mar 6, 2020 at 6:22 PM Doug Lea <dl at cs.oswego.edu> wrote:

>
> [Cross-posting concurrency-interest and loom-dev.]
>
> To continue improving java.util.concurrent support for increasingly
> diverse programming styles (while still avoiding arguments about whether
> any of them are best!), it would be helpful to provide "BlockingQueues
> meet AutoCloseable" APIs that are loom-friendly, but not loom-specific.
> A sketch is pasted below. To avoid mail-reader glitches, you might want
> to read updated snapshots at gee.cs.oswego.edu/dl/wwwtmp/Carrier.java
>
> Suggestions and comments are welcome. An initial implementation class
> (LinkedCarrier) should be available shortly after API issues settle;
> others later.
>
> ...
>
> // API sketches, with "public" omitted throughout
>
> /**
>  * A component for sending and receiving messages. Carriers support
>  * usages similar to those of BlockingQueues, but additionally
>  * implement AutoCloseable, and may be explicitly closed for sending,
>  * receiving, or both. Carriers also provide policy-based control for
>  * responses to Thread.interrupt while blocked (ignoring, cancelling
>  * the current operation only, or closing the carrier). Concrete
>  * implementation classes may be created with a given capacity (after
>  * which method send will block waiting for available space), or
>  * effectively unbounded, in which case method send will never block
>  * but may fail with an OutOfMemoryError.
>  *
>  * Design notes:
>  *
>  * (1) Both send and receive methods are declared here, but allowing
>  * either side to be permanently (vs eventually) closed for send-only
>  * or receive-only components. This loses some static type checking
>  * opportunities of separate send and receive APIs. However the class
>  * includes methods (in the style of Collections.unmodifiableX) to
>  * produce views that provide dynamic directionality enforcement.
>  *
>  * (2) This is an abstract class (rather than interface) providing
>  * uniform Observer-syle methods for Selectors and related
>  * classes. The alternative is some sort of SPI.
>  *
>  * (3) To control interactions between Thread interrupts and state,
>  * rather than throwing InterruptedExceptions, potentially blocking
>  * methods rely on a provided policy to distinguish cancelling the
>  * operation vs closing the carrier vs ignoring the interrupt. The
>  * default is CANCEL, because it is the least constraining; for
>  * example some mixed usages can catch CancellationException to then
>  * close only when desired.
>  *
>  * (4) To broaden coverage of channel-based programming styles,
>  * implementations support sendSynchronously, which is otherwise
>  * available in BlockingQueues only as the poorly-named and underused
>  * method LinkedTransferQueue.transfer.
>  */
> abstract class Carrier<T> implements AutoCloseable {
>     Carrier(OnInterrupt policy);
>     Carrier() { this(OnInterrupt.CANCEL); } // default
>
>     // Basic messaging
>
>     /**
>      * Consume item, throw if isClosedForReceiving, block if empty.
>      * May cancel or close on interrupt, depending on OnInterrupt policy.
>      */
>     T receive() throws ClosedException, CancellationException;
>
>     /**
>      * Send item, throw if isClosedForSending, block if full.
>      * May cancel or close on interrupt, depending on OnInterrupt policy.
>      */
>     void send(T item) throws ClosedException, CancellationException;
>
>     /** Send and block until item received */
>     void sendSynchronously(T item) throws ClosedException,
> CancellationException;
>
>     // Timeout versions
>     T receive(Duration timeout)
>         throws ClosedException, CancellationException, TimeoutException;
>     void send(T item, Duration timeout)
>         throws ClosedException, CancellationException, TimeoutException;
>     void sendSynchronously(T item, Duration timeout)
>         throws ClosedException, CancellationException, TimeoutException;
>
>     // Non-blocking access
>     boolean trySend(T item);        // false if closed or full
>     T tryReceive(T resultIfAbsent); // absent if closed or empty
>     T peek(T resultIfAbsent);       // may false-positive
>
>     // Termination
>     void closeForSending();         // fully close when
> isClosedForReceiving
>     void closeForReceiving();       // vice-versa
>     void close();                   // immediate close
>     void awaitClose() throws interruptedException;
>     void onClose(Runnable closeHandler); // run by thread triggering close
>
>     // Status
>     boolean isClosedForSending();
>     boolean isClosedForReceiving();
>     boolean isClosed();             // true if both sides closed
>     boolean isOpen()                { return !isClosed(); }
>     boolean isEmpty();
>     boolean isFull();               // never true if unbounded
>     long    capacity();             // Long.MAX_VALUE if unbounded
>     OnInterrupt interruptPolicy();  // return policy
>
>     // linkage support, noops here; locators are opaque cookie-like
> identifiers
>     protected void registerSource(Carrier<? super T> c, long locator) {}
>     // notification of send or close by registered carrier
>     protected void sourceEvent(long locator, boolean isClosed) {}
>
>     // views to disable one direction; similar to Collections.unmodifiableX
>     static <E> Carrier<E> sendOnlyCarrier(Carrier<E> c);
>     static <E> Carrier<E> receiveOnlyCarrier(Carrier<E> c);
>
>     // other possible utilities
>     Stream<T> stream();             // destructive (consume-on-traverse)
>     static <E> Carrier<E> discardingCarrier(); // /dev/null analog
>     // TBD: selector as static factory method vs class (as below)
>     // TBD: Flow (reactive stream) adaptors
> }
>
> class LinkedCarrier<T> extends Carrier<T> {
>     // main linked implementation
>     // coming soon, based on LinkedTransferQueue algorithms
> }
>
> class BufferedCarrier<T> extends Carrier<T> {
>     // main array-based implementation(s)
>     // coming later, with single- vs multiple- sink/source options
> }
>
> /**
>  * A Carrier that aggregates sources established in its constructor.
>  * The receive method blocks waiting for any to become available, then
>  * returns the corresponding item. Selectors are always closed for
>  * sending, and may become fully closed when all sources close.
>  */
> class Selector<T> extends Carrier<T> { // possibly a more specific name
>     Selector(<Carrier<? extends T> c, ...) {
>         // for each c { c.registerSource(this, locatorFor(c)); }
>     }
>     boolean isClosedForSending() { return true; }
>     // ...
> }
>
> /**
>  * A policy for responding to Thread.interrupt in blocking methods in
>  * classes implementing AutoCloseable
>  */
> static Enum OnInterrupt {
>     IGNORE,  // continue waiting
>     CANCEL,  // throw CancellationException
>     CLOSE    // close and throw ClosedException
> }
>
> // This could be placed in java.lang for use with any AutoCloseable
> class ClosedException extends IllegalStateException {
>     ClosedException(AutoCloseable c); // the closed component
>     // ...
> }
>
>


More information about the loom-dev mailing list