draft Carrier API
Alex Otenko
oleksandr.otenko at gmail.com
Sat Mar 7 23:25:57 UTC 2020
send is declared to throw when the carrier is closed for sending. Is there a
good reason not to throw when it is closed for receiving? If not, what is the
intended behavior in that case, given that send may block when the carrier is full?
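
To make the question concrete, here is a minimal sketch of one possible answer: send fails fast once the receiving side is closed, and a sender blocked on a full buffer is woken by that close. SketchCarrier and SketchClosedException are hypothetical stand-ins, not the draft's classes; only the sender side is modelled, and interrupts are handled as under the CANCEL policy.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CancellationException;

// Hypothetical stand-in for the draft's ClosedException.
class SketchClosedException extends IllegalStateException {
    SketchClosedException(String message) { super(message); }
}

// Toy bounded carrier for illustration only; not the draft implementation.
class SketchCarrier<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;
    private boolean closedForSending;
    private boolean closedForReceiving;

    SketchCarrier(int capacity) { this.capacity = capacity; }

    // Blocks while full; throws if either side is (or becomes) closed.
    synchronized void send(T item) {
        for (;;) {
            if (closedForSending)
                throw new SketchClosedException("closed for sending");
            // Assumption: nobody can ever receive the item, so fail rather than block.
            if (closedForReceiving)
                throw new SketchClosedException("closed for receiving");
            if (items.size() < capacity)
                break;
            try {
                wait(); // full: wait for space or for a close
            } catch (InterruptedException e) {
                throw new CancellationException("interrupted while sending");
            }
        }
        items.add(item);
        notifyAll();
    }

    synchronized int size() { return items.size(); }

    synchronized void closeForSending() { closedForSending = true; notifyAll(); }

    synchronized void closeForReceiving() { closedForReceiving = true; notifyAll(); }
}

class ClosedSendDemo {
    public static void main(String[] args) {
        SketchCarrier<String> c = new SketchCarrier<>(1);
        c.send("a");            // fills the carrier
        c.closeForReceiving();  // no receiver will ever drain it
        try {
            c.send("b");        // without the extra check this would block forever
        } catch (SketchClosedException e) {
            System.out.println("send failed: " + e.getMessage());
        }
    }
}
```

Without the closedForReceiving check, the second send in the demo would wait for space that can never appear.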
Alex
On Sat, 7 Mar 2020, 16:30 Doug Lea, <dl at cs.oswego.edu> wrote:
>
> Collecting replies/responses, with updated sketches pasted below and at
> http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java
>
> On 3/6/20 3:56 PM, John Rose wrote:
>
> > This design puts both endpoints on one type, as opposed to two
> > similar types, like InputStream and OutputStream. This leads to
> > fewer types and objects (good) but broader ones. Broader is a little
> > less good, since most use points only care about 1/2 of the methods;
> > the other 1/2 is then noise.
>
> The main API design issue here is that there are three (not two) views
> of a Carrier: the protocol state (closed, empty, etc), sender-side, and
> receiver-side operations. If you split them, you need at least four
> interfaces/classes total (one to combine them). Doing this in a way that
> does not result in nearly all usages needing the combined interface
> requires other tradeoffs (like declaring some bookkeeping methods public
> and breaking read-only-ness of base interface). But given the reaction
> so far, I'm back to thinking this can be done in a way that more people
> will prefer. (Aside: I've waffled on this many, many times, including
> pre-j.u.c,
> http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
> and several previous Carrier drafts.)
>
> So, back to a refreshed 4-interface version.
>
> On 3/6/20 1:04 PM, Dávid Karnok wrote:
>
> > onClose(Runnable): will this allow one handler ...
>
> Thanks for the prod. It is much better to define:
> /** Returns a CompletionStage that completes when closed. */
> CompletionStage<Carriable<T>> onClose();
> In which case these questions and others are already answered, and we
> can also omit the awaitClose method.
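
A minimal sketch of how a stage-returning onClose() could cover multiple handlers, late registration, and waiting for close, assuming Java 9 or later for minimalCompletionStage(). ClosableSketch is a hypothetical class, not part of the draft.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical illustration of the onClose() idea; not the draft's Carriable.
class ClosableSketch implements AutoCloseable {
    private final CompletableFuture<ClosableSketch> closed = new CompletableFuture<>();

    boolean isClosed() { return closed.isDone(); }

    // Only the first close (normal or exceptional) takes effect.
    @Override
    public void close() { closed.complete(this); }

    void closeExceptionally(Throwable cause) { closed.completeExceptionally(cause); }

    // A minimal stage lets callers attach handlers but not complete it themselves.
    CompletionStage<ClosableSketch> onClose() { return closed.minimalCompletionStage(); }
}

class OnCloseDemo {
    public static void main(String[] args) {
        ClosableSketch c = new ClosableSketch();
        c.onClose().thenRun(() -> System.out.println("handler A"));
        c.onClose().thenRun(() -> System.out.println("handler B"));
        c.close();
        // Registered after close: runs immediately.
        c.onClose().thenRun(() -> System.out.println("late handler"));
        // Plays the role of an awaitClose method.
        c.onClose().toCompletableFuture().join();
    }
}
```

Any number of handlers can be attached, and the relative order in which handlers A and B run is not something this sketch guarantees.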
>
> >
> > tryReceive(T def): it is always somewhat troublesome to expect a default
> > value of T; perhaps return Optional<T> if T is planned to be non-null.
>
> I think it is best to keep this, but also add a variant of your other suggestion:
> boolean tryConsume(Consumer<? super T> proc); // false if closed or empty
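
A small sketch of the difference between the two forms, assuming non-null items and a plain ConcurrentLinkedQueue as the buffer. ReceiveSketch is a hypothetical class, and close handling is left out.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical receiver-side sketch; not the draft's CarrierReceiver.
class ReceiveSketch<T> {
    private final Queue<T> items = new ConcurrentLinkedQueue<>();

    void add(T item) { items.add(item); }

    // Returns the next item, or resultIfAbsent when empty.
    // The caller cannot tell "absent" from an item equal to the default.
    T tryReceive(T resultIfAbsent) {
        T item = items.poll();
        return item != null ? item : resultIfAbsent;
    }

    // Passes the next item to proc and returns true, or returns false when empty.
    boolean tryConsume(Consumer<? super T> proc) {
        T item = items.poll();
        if (item == null)
            return false;
        proc.accept(item);
        return true;
    }
}

class ReceiveDemo {
    public static void main(String[] args) {
        ReceiveSketch<String> r = new ReceiveSketch<>();
        r.add("none");
        System.out.println(r.tryReceive("none")); // a real item that equals the default
        System.out.println(r.tryReceive("none")); // absent, same result
        r.add("x");
        System.out.println(r.tryConsume(s -> System.out.println("got " + s)));
        System.out.println(r.tryConsume(s -> System.out.println("got " + s)));
    }
}
```

The boolean result removes the ambiguity without requiring an Optional allocation per call.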
>
> > + closeExceptionally(Throwable) :
>
> Yes, thanks; for the same reasons we added it to SubmissionPublisher. Also
> adding a getClosedException() method.
>
> > + onSenderReady(Runnable r) receiveAsPublisher(Executor)
> > sendAsSubscriber(Executor)
>
> ... among other possibilities. I'm leaving interplay with Flow as TBD
> for now, in part because...
>
> On 3/6/20 5:14 PM, Thomas May wrote:
>
> > It also could introduce interesting patterns like multiplexing… (IE,
> > having multiple receivers getting the same message)
>
> We already have a good multicaster, SubmissionPublisher. But I'm still
> not sure of the best linkages.
>
> On 3/6/20 6:06 PM, Alex Otenko wrote:
>
> > Can methods return something more useful than void? Eg something
> > that can be used to test progress? (I am afraid my imagination is
> > limited to returning a ticket number, and a sequencer API to inspect
> > whether send / close has a matching receive / await for such an
> > event)
>
> I can't think of enough use cases to justify the added cost. Can you?
>
> >
> > SendSynchronously with timeout is ambiguous. If a timeout occurs, was
> > the item placed in the buffer but not yet received, or not even buffered?
> >
>
> The only thing you know is that upon exception, the item cannot have
> been (and never will be) received. This is no different from other
> methods. (Although you are right that bounded+synchronous+timeout is
> the most complicated to implement.)
>
> ... pasting updated draft ...
>
>
> // snapshot: Sat Mar 7 11:13:22 2020 Doug Lea (dl at nuc40)
>
> // API sketches, with "public" omitted throughout
>
> /**
> * A component for sending and receiving data. Carriers support
> * usages similar to those of BlockingQueues, but additionally
> * implement AutoCloseable, and may be explicitly closed for sending,
> * receiving, or both.
> *
> * This interface combines three sets of methods, defined in three
> * interfaces: Carriable methods access protocol state and
> * configuration. Interfaces CarrierSender and CarrierReceiver extend
> * Carriable with sender- and receiver- side views. Finally, this
> * interface combines these views.
> *
> * To control interactions between Thread interrupts and state, rather
> * than throwing InterruptedExceptions, potentially blocking methods
> * rely on a provided policy to distinguish cancelling the operation
> * vs closing the carrier vs ignoring the interrupt. The default for
> * current implementations is CANCEL, because it is the least
> * constraining; for example some mixed usages can catch
> * CancellationException to then close only when desired.
> *
> * Concrete implementation classes may enforce a given capacity (after
> * which method send will block waiting for available space), or be
> * effectively unbounded, in which case method send will never block
> * but may fail with an OutOfMemoryError.
> */
> interface Carrier<T> extends CarrierSender<T>, CarrierReceiver<T> {
> // TBD: factory methods for jdk implementations
> // some utility methods, such as...
> static <E> CarrierReceiver<E> discardingCarrier(); // /dev/null analog
> // TBD: Flow (reactive stream) adaptors
> }
>
> /**
> * Methods accessing the protocol state and configuration of a
> * Carrier.
> */
> interface Carriable<T> extends AutoCloseable {
> boolean isClosedForSending();
> boolean isClosedForReceiving();
> boolean isClosed(); // true if both sides closed
> boolean isOpen(); // { return !isClosed(); }
> boolean isEmpty();
> boolean isFull(); // never true if unbounded
> long capacity(); // Long.MAX_VALUE if unbounded
> OnInterrupt interruptPolicy(); // return policy
>
> void close(); // immediate close both sides
> void closeExceptionally(Throwable cause); // record as cause
> Throwable getClosedException();
>
> /** Returns a CompletionStage that completes when closed. */
> CompletionStage<Carriable<T>> onClose();
> }
>
> /**
> * Methods defining the sender-side view of a Carrier.
> */
> interface CarrierSender<T> extends Carriable<T> {
> /**
> * Send item, throw if isClosedForSending, block if full.
> * May cancel or close on interrupt, depending on OnInterrupt policy.
> */
> void send(T item) throws ClosedException, CancellationException;
>
> /** Send and block until item received */
> void sendSynchronously(T item) throws ClosedException,
> CancellationException;
>
> /** Try to send; upon timeout, the item is no longer available. */
> void send(T item, Duration timeout)
> throws ClosedException, CancellationException, TimeoutException;
> void sendSynchronously(T item, Duration timeout)
> throws ClosedException, CancellationException, TimeoutException;
>
> boolean trySend(T item); // false if closed or full
> void closeForSending(); // fully close when isClosedForReceiving
>
> // linkage support; locators are opaque cookie-like identifiers
> void registerSource(CarrierSender<? super T> c, long locator);
> }
>
> /**
> * Methods defining the receiver-side view of a Carrier.
> */
> interface CarrierReceiver<T> extends Carriable<T> {
> /**
> * Consume item, throw if isClosedForReceiving, block if empty.
> * May cancel or close on interrupt, depending on OnInterrupt policy.
> */
> T receive() throws ClosedException, CancellationException;
> T receive(Duration timeout)
> throws ClosedException, CancellationException, TimeoutException;
>
> T tryReceive(T resultIfAbsent); // absent if closed or empty
> T peek(T resultIfAbsent); // may false-positive
>
> void closeForReceiving(); // fully close when isClosedForSending
>
> boolean tryConsume(Consumer<? super T> proc); // false if closed or empty
> Stream<T> stream(); // destructive (consume-on-traverse)
>
> // notification of send or close by registered source
> void sourceSent(CarrierSender<? extends T> source, long locator, T item);
> void sourceClosed(CarrierSender<? extends T> source, long locator);
> }
>
> // TBD: provide abstract class AbstractCarrier<T>.
>
> class LinkedCarrier<T> implements Carrier<T> {
> LinkedCarrier(OnInterrupt policy);
> LinkedCarrier() { this(OnInterrupt.CANCEL); } // default
> // main linked implementation
> // coming soon, based on LinkedTransferQueue algorithms
> }
>
> class BufferedCarrier<T> implements Carrier<T> {
> // main array-based implementation(s)
> // coming later, with single- vs multiple- sink/source options
> }
>
> /**
> * A Carrier that aggregates sources established in its constructor.
> * The receive method blocks waiting for any to become available, then
> * returns the corresponding item.
> */
> class CarrierSelector<T> implements CarrierReceiver<T> {
> CarrierSelector(CarrierSender<? extends T> c, ...) {
> // for each c { c.registerSource(this, locatorFor(c)); }
> }
> }
>
> /**
> * A policy for responding to Thread.interrupt in blocking methods in
> * classes implementing AutoCloseable.
> */
> static enum OnInterrupt {
> IGNORE, // continue waiting
> CANCEL, // throw CancellationException
> CLOSE // close and throw ClosedException
> }
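
A sketch of how a blocking wait might apply such a policy, under stated assumptions: InterruptPolicySketch, PolicyClosedException, and PolicyWaiter are hypothetical names, a CountDownLatch stands in for "space or item available", and restoring the interrupt status after IGNORE is a choice made here, not something the draft specifies.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;

// Hypothetical mirror of the draft's OnInterrupt enum.
enum InterruptPolicySketch { IGNORE, CANCEL, CLOSE }

// Hypothetical stand-in for the draft's ClosedException.
class PolicyClosedException extends IllegalStateException {
    PolicyClosedException(String message) { super(message); }
}

class PolicyWaiter implements AutoCloseable {
    private final InterruptPolicySketch policy;
    private volatile boolean closed;

    PolicyWaiter(InterruptPolicySketch policy) { this.policy = policy; }

    boolean isClosed() { return closed; }

    @Override
    public void close() { closed = true; }

    // Waits for the latch, translating interrupts according to the policy.
    void await(CountDownLatch ready) {
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    ready.await();
                    return;
                } catch (InterruptedException e) {
                    switch (policy) {
                        case IGNORE:
                            interrupted = true; // remember it, keep waiting
                            break;
                        case CANCEL:
                            throw new CancellationException("interrupted");
                        case CLOSE:
                            close();
                            throw new PolicyClosedException("closed on interrupt");
                    }
                }
            }
        } finally {
            // Assumption: IGNORE re-asserts the interrupt for the caller.
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }
}

class PolicyDemo {
    public static void main(String[] args) {
        PolicyWaiter w = new PolicyWaiter(InterruptPolicySketch.CLOSE);
        Thread.currentThread().interrupt(); // simulate a pending interrupt
        try {
            w.await(new CountDownLatch(1));
        } catch (PolicyClosedException e) {
            System.out.println("closed: " + w.isClosed());
        }
    }
}
```

Note that java.util.concurrent.CancellationException is itself a subclass of IllegalStateException, so callers that distinguish the two outcomes need to catch the more specific type first.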
>
> // This could be placed in java.lang for use with any AutoCloseable
> class ClosedException extends IllegalStateException {
> ClosedException(AutoCloseable c); // the closed component
> // copied from ExecutionException:
> /**
> * Constructs a {@code ClosedException} with the specified cause.
> * The detail message is set to {@code (cause == null ? null :
> * cause.toString())} (which typically contains the class and
> * detail message of {@code cause}).
> *
> * @param cause the cause (which is saved for later retrieval by the
> * {@link #getCause()} method)
> */
> public ClosedException(Throwable cause) {
> super(cause);
> }
> // ...
> }
>