draft Carrier API

Alex Otenko oleksandr.otenko at gmail.com
Sat Mar 7 23:17:14 UTC 2020

> enough use cases to justified added cost

Most primitives we have are really monolithic. You get only two ways of
interacting with them: try or get blocked indefinitely.

A ticket system allows to build nicer things, because essentially you give
away a bit of control what to do during  the wait. You can switch between
try* and blocking after you made the call, choosing when the time to block
is more suitable.


On Sat, 7 Mar 2020, 16:30 Doug Lea, <dl at cs.oswego.edu> wrote:

> Collecting replies/responses, with updated sketches pasted below and at
> http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java
> On 3/6/20 3:56 PM, John Rose wrote:
> > This design puts both endpoints on one type, as opposed to two
> > similar types, like InputStream and OutputStream. This leads to
> > fewer types and objects (good) but broader ones. Broader is little
> > less good, since most use points only care about 1/2 of the methods;
> > the other 1/2 is then noise.
> The main API design issue here is that there are three (not two) views
> of a Carrier: the protocol state (closed, empty, etc), sender-side, and
>  receiver-side operations. If you split them, you need at least four
> interfaces/classes total (one to combine them). Doing this in a way that
> does not result in nearly all usages needing the combined interface
> requires other tradeoffs (like declaring some bookkeeping methods public
> and breaking read-only-ness of base interface). But given the reaction
> so far, I'm back to thinking this can be done in a way that more people
> will prefer. (Aside: I've waffled on this many many times, including
> pre-j.u.c
> http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
> , and several previous Carrier drafts.)
> So, back to a refreshed 4-interface version.
> On 3/6/20 1:04 PM, Dávid Karnok wrote:
> > onClose(Runnable): will this allow one handler ...
> Thanks for the prod. It is much better to define:
>   /** Returns a CompletableFuture that isDone when closed. */
>   CompletionStage<Carriable<T>> onClose();
> In which case these questions and others are already answered, and we
> can also omit awaitClose method.
> >
> > tryReceive(T def): always somewhat trouble to expect a default value
> > of T, perhaps return Optional<T> if T is planned to be non-null.
> I think best to keep this, but also add a variant of your other suggestion:
>   boolean tryConsume(Consumer<? super T> proc); // false if closed or empty
> > + closeExceptionally(Throwable) :
> Yes, thanks; for the same reasons we added to SubmissionPublisher. Also
> adding getClosedException() method.
> > + onSenderReady(Runnable r) receiveAsPublisher(Executor)
> sendAsSubscriber(Executor)
> ... among other possibilities. I'm leaving interplay with Flow as TBD
> for now, in part because...
> On 3/6/20 5:14 PM, Thomas May wrote:
> > It also could introduce interesting patterns like multiplexing… (IE,
> > having multiple receivers getting the same message)
> We already have a good multicaster, SubmissionPublisher. But I'm still
> not sure of the best linkages.
> On 3/6/20 6:06 PM, Alex Otenko wrote:
> > Can methods return something more useful than void? Eg something
> > that can be used to test progress? (I am afraid my imagination is
> > limited to returning a ticket number, and a sequencer API to inspect
> > whether send / close has a matching receive / await for such an
> > event)
> I can't think of enough use cases to justified added cost. Can you?
> >
> > SendSynchronously with timeout is ambiguous. If timeout occurs, was
> > it placed in the buffer, and not received yet, or not even buffered?
> >
> The only thing you know is that upon exception, the item cannot have
> been (and never will be) received. This is no different than other
> methods. (although you are right that bounded+synchronous+timeout is
> the most complicated to implement.)
> ... pasting updated draft ...
> // snapshot: Sat Mar  7 11:13:22 2020  Doug Lea  (dl at nuc40)
> // API sketches, with "public" omitted throughout
> /**
>  * A component for sending and receiving data. Carriers support
>  * usages similar to those of BlockingQueues, but additionally
>  * implement AutoCloseable, and may be explicitly closed for sending,
>  * receiving, or both.
>  *
>  * This interface combines three sets of methods, defined in three
>  * interfaces: Carriable methods access protocol state and
>  * configuration. Interfaces CarrierSender and CarrierReceiver extend
>  * Carriable with sender- and receiver- side views. Finally, this
>  * interface combines these views.
>  *
>  * To control interactions between Thread interrupts and state, rather
>  * than throwing InterruptedExceptions, potentially blocking methods
>  * rely on a provided policy to distinguish cancelling the operation
>  * vs closing the carrier vs ignoring the interrupt. The default for
>  * current implementations is CANCEL, because it is the least
>  * constraining; for example some mixed usages can catch
>  * CancellationException to then close only when desired.
>  *
>  * Concrete implementation classes may enforce a given capacity (after
>  * which method send will block waiting for available space), or be
>  * effectively unbounded, in which case method send will never block
>  * but may fail with an OutOfMemoryError.
>  */
> interface Carrier<T> extends CarrierSender<T>, CarrierReceiver<T> {
>     // TBD: factory methods for jdk implementations
>     // some utility methods, such as...
>     static <E> CarrierReceiver<E> discardingCarrier(); // /dev/null analog
>     // TBD: Flow (reactive stream) adaptors
> }
> /**
>  * Methods accessing the protocol state and configuration of a
>  * Carrier.
>  */
> interface Carriable<T> extends AutoCloseable {
>     boolean isClosedForSending();
>     boolean isClosedForReceiving();
>     boolean isClosed();             // true if both sides closed
>     boolean isOpen();               // { return !isClosed(); }
>     boolean isEmpty();
>     boolean isFull();               // never true if unbounded
>     long    capacity();             // Long.MAX_VALUE if unbounded
>     OnInterrupt interruptPolicy();  // return policy
>     void close();                   // immediate close both sides
>     void closeExceptionally(Throwable cause); // record as cause
>     Throwable getClosedException();
>     /** Returns a CompletableFuture that isDone when closed. */
>     CompletionStage<Carriable<T>> onClose();
> }
> /**
>  * Methods defining the sender-side view of a Carrier.
>  */
> interface CarrierSender<T> extends Carriable<T> {
>     /**
>      * Send item, throw if isClosedForSending, block if full.
>      * May cancel or close on interrupt, depending on OnInterrupt policy.
>      */
>     void send(T item) throws ClosedException, CancellationException;
>     /** Send and block until item received */
>     void sendSynchronously(T item) throws ClosedException,
> CancellationException;
>     /** Try to send, upon timeout, the item is no longer available. */
>     void send(T item, Duration timeout)
>         throws ClosedException, CancellationException, TimeoutException;
>     void sendSynchronously(T item, Duration timeout)
>         throws ClosedException, CancellationException, TimeoutException;
>     boolean trySend(T item);        // false if closed or full
>     void closeForSending();         // fully close when
> isClosedForReceiving
>     // linkage support; locators are opaque cookie-like identifiers
>     void registerSource(CarrierSender<? super T> c, long locator);
> }
> /**
>  * Methods defining the receiver-side view of a Carrier.
>  */
> interface CarrierReceiver<T> extends Carriable<T> {
>     /**
>      * Consume item, throw if isClosedForReceiving, block if empty.
>      * May cancel or close on interrupt, depending on OnInterrupt policy.
>      */
>     T receive() throws ClosedException, CancellationException;
>     T receive(Duration timeout)
>         throws ClosedException, CancellationException, TimeoutException;
>     T tryReceive(T resultIfAbsent); // absent if closed or empty
>     T peek(T resultIfAbsent);       // may false-positive
>     void closeForReceiving();       // fully close when isClosedForSending
>     boolean tryConsume(Consumer<? super T> proc); // false if closed or
> empty
>     Stream<T> stream();             // destructive (consume-on-traverse)
>     // notification of send or close by registered source
>     void sourceSent(CarrierSender<? extends T> source, long locator, T
> item);
>     void sourceClosed(CarrierSender<? extends T> source, long locator);
> }
> // TBD: provide abstract class AbstractCarrier<T>.
> class LinkedCarrier<T> implements Carrier<T> {
>     LinkedCarrier(OnInterrupt policy);
>     LinkedCarrier() { this(OnInterrupt.CANCEL); } // default
>     // main linked implementation
>     // coming soon, based on LinkedTransferQueue algorithms
> }
> class BufferedCarrier<T> implemnts Carrier<T> {
>     // main array-based implementation(s)
>     // coming later, with single- vs multiple- sink/source options
> }
> /**
>  * A Carrier that aggregates sources established in its constructor.
>  * The receive method blocks waiting for any to become available, then
>  * returns the corresponding item.
>  */
> class CarrierSelector<T> implements CarrierReceiver<T> {
>     Selector(<CarrierSender<? extends T> c, ...) {
>         // for each c { c.registerSource(this, locatorFor(c)); }
>     }
> }
> /**
>  * A policy for responding to Thread.interrupt in blocking methods in
>  * classes implementing AutoCloseable
>  */
> static Enum OnInterrupt {
>     IGNORE,  // continue waiting
>     CANCEL,  // throw CancellationException
>     CLOSE    // close and throw ClosedException
> }
> // This could be placed in java.lang for use with any AutoCloseable
> class ClosedException extends IllegalStateException {
>     ClosedException(AutoCloseable c); // the closed component
>     // copied from ExecutionException:
>     /**
>      * Constructs an {@code ClosedException} with the specified cause.
>      * The detail message is set to {@code (cause == null ? null :
>      * cause.toString())} (which typically contains the class and
>      * detail message of {@code cause}).
>      *
>      * @param  cause the cause (which is saved for later retrieval by the
>      *         {@link #getCause()} method)
>      */
>     public ClosedException(Throwable cause) {
>         super(cause);
>     }
>     // ...
> }

