draft Carrier API
Alex Otenko
oleksandr.otenko at gmail.com
Fri Mar 6 23:06:06 UTC 2020
Can the methods return something more useful than void, e.g. something that
can be used to test progress? (I am afraid my imagination is limited to
returning a ticket number, plus a sequencer API to inspect whether a send /
close has a matching receive / await for that event.)
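To make the ticket idea concrete, here is a minimal sketch built on an ordinary LinkedBlockingQueue rather than on Carrier. Every name in it (TicketedQueue, tryReceive, isReceived) is hypothetical and not part of the draft; it only illustrates how a returned ticket could be used to test progress, and it assumes an unbounded FIFO queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; none of these names appear in the Carrier draft.
class TicketedQueue<T> {
    // Unbounded FIFO queue, so add() never blocks or fails for lack of space.
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private long sent; // guarded by "this"
    private final AtomicLong received = new AtomicLong();

    // Enqueue the item and return its 1-based ticket number.
    // Numbering and enqueueing share one lock so ticket order matches queue order.
    synchronized long send(T item) {
        queue.add(item);
        return ++sent;
    }

    // Block until an item is available, then count it as received.
    T receive() throws InterruptedException {
        T item = queue.take();
        received.incrementAndGet();
        return item;
    }

    // Non-blocking variant: returns null if the queue is empty.
    T tryReceive() {
        T item = queue.poll();
        if (item != null) {
            received.incrementAndGet();
        }
        return item;
    }

    // True once the item sent under this ticket has been taken by a receiver.
    // Relies on FIFO order: if at least `ticket` items were taken, the first
    // `ticket` items are all gone from the queue.
    boolean isReceived(long ticket) {
        return received.get() >= ticket;
    }
}
```

A sender would keep the value returned by send and poll isReceived(ticket) later. Whether such a check belongs on the carrier itself or on a separate sequencer object is exactly the open question.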
sendSynchronously with a timeout is ambiguous. If the timeout occurs, was the
item placed in the buffer but not yet received, or was it not even buffered?
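For comparison, the existing LinkedTransferQueue.tryTransfer(e, timeout, unit) resolves this by never leaving the element enqueued when the wait times out. The small demo below (class and method names are mine, purely illustrative) shows that behaviour; whether Carrier should adopt the same rule is an assumption on my part, not something the draft states.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

// Illustrative demo of an existing JDK API; not part of the Carrier draft.
class TransferTimeoutDemo {
    // Try to hand the item to a consumer, waiting at most 50 ms.
    // Returns true only if a consumer actually received it.
    static boolean attempt(LinkedTransferQueue<String> q, String item) {
        try {
            return q.tryTransfer(item, 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        LinkedTransferQueue<String> q = new LinkedTransferQueue<>();
        // No consumer is waiting, so the hand-off times out.
        boolean handedOff = attempt(q, "msg");
        System.out.println("handed off: " + handedOff);
        // After the timeout the element has been removed again.
        System.out.println("still queued: " + q.contains("msg"));
    }
}
```

With this rule a TimeoutException would always mean "not sent at all", which removes the ambiguity at the cost of never leaving a timed-out item buffered.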
Alex
On Fri, 6 Mar 2020, 15:22 Doug Lea, <dl at cs.oswego.edu> wrote:
>
> [Cross-posting concurrency-interest and loom-dev.]
>
> To continue improving java.util.concurrent support for increasingly
> diverse programming styles (while still avoiding arguments about whether
> any of them are best!), it would be helpful to provide "BlockingQueues
> meet AutoCloseable" APIs that are loom-friendly, but not loom-specific.
> A sketch is pasted below. To avoid mail-reader glitches, you might want
> to read updated snapshots at gee.cs.oswego.edu/dl/wwwtmp/Carrier.java
>
> Suggestions and comments are welcome. An initial implementation class
> (LinkedCarrier) should be available shortly after API issues settle;
> others later.
>
> ...
>
> // API sketches, with "public" omitted throughout
>
> /**
> * A component for sending and receiving messages. Carriers support
> * usages similar to those of BlockingQueues, but additionally
> * implement AutoCloseable, and may be explicitly closed for sending,
> * receiving, or both. Carriers also provide policy-based control for
> * responses to Thread.interrupt while blocked (ignoring, cancelling
> * the current operation only, or closing the carrier). Concrete
> * implementation classes may be created with a given capacity (after
> * which method send will block waiting for available space), or
> * effectively unbounded, in which case method send will never block
> * but may fail with an OutOfMemoryError.
> *
> * Design notes:
> *
> * (1) Both send and receive methods are declared here, but allowing
> * either side to be permanently (vs eventually) closed for send-only
> * or receive-only components. This loses some static type checking
> * opportunities of separate send and receive APIs. However the class
> * includes methods (in the style of Collections.unmodifiableX) to
> * produce views that provide dynamic directionality enforcement.
> *
> * (2) This is an abstract class (rather than interface) providing
> * uniform Observer-style methods for Selectors and related
> * classes. The alternative is some sort of SPI.
> *
> * (3) To control interactions between Thread interrupts and state,
> * rather than throwing InterruptedExceptions, potentially blocking
> * methods rely on a provided policy to distinguish cancelling the
> * operation vs closing the carrier vs ignoring the interrupt. The
> * default is CANCEL, because it is the least constraining; for
> * example some mixed usages can catch CancellationException to then
> * close only when desired.
> *
> * (4) To broaden coverage of channel-based programming styles,
> * implementations support sendSynchronously, which is otherwise
> * available in BlockingQueues only as the poorly-named and underused
> * method LinkedTransferQueue.transfer.
> */
> abstract class Carrier<T> implements AutoCloseable {
> Carrier(OnInterrupt policy);
> Carrier() { this(OnInterrupt.CANCEL); } // default
>
> // Basic messaging
>
> /**
> * Consume item, throw if isClosedForReceiving, block if empty.
> * May cancel or close on interrupt, depending on OnInterrupt policy.
> */
> T receive() throws ClosedException, CancellationException;
>
> /**
> * Send item, throw if isClosedForSending, block if full.
> * May cancel or close on interrupt, depending on OnInterrupt policy.
> */
> void send(T item) throws ClosedException, CancellationException;
>
> /** Send and block until item received */
> void sendSynchronously(T item) throws ClosedException,
> CancellationException;
>
> // Timeout versions
> T receive(Duration timeout)
> throws ClosedException, CancellationException, TimeoutException;
> void send(T item, Duration timeout)
> throws ClosedException, CancellationException, TimeoutException;
> void sendSynchronously(T item, Duration timeout)
> throws ClosedException, CancellationException, TimeoutException;
>
> // Non-blocking access
> boolean trySend(T item); // false if closed or full
> T tryReceive(T resultIfAbsent); // absent if closed or empty
> T peek(T resultIfAbsent); // may false-positive
>
> // Termination
> void closeForSending(); // fully close when isClosedForReceiving
> void closeForReceiving(); // vice-versa
> void close(); // immediate close
> void awaitClose() throws InterruptedException;
> void onClose(Runnable closeHandler); // run by thread triggering close
>
> // Status
> boolean isClosedForSending();
> boolean isClosedForReceiving();
> boolean isClosed(); // true if both sides closed
> boolean isOpen() { return !isClosed(); }
> boolean isEmpty();
> boolean isFull(); // never true if unbounded
> long capacity(); // Long.MAX_VALUE if unbounded
> OnInterrupt interruptPolicy(); // return policy
>
> // linkage support, noops here; locators are opaque cookie-like identifiers
> protected void registerSource(Carrier<? super T> c, long locator) {}
> // notification of send or close by registered carrier
> protected void sourceEvent(long locator, boolean isClosed) {}
>
> // views to disable one direction; similar to Collections.unmodifiableX
> static <E> Carrier<E> sendOnlyCarrier(Carrier<E> c);
> static <E> Carrier<E> receiveOnlyCarrier(Carrier<E> c);
>
> // other possible utilities
> Stream<T> stream(); // destructive (consume-on-traverse)
> static <E> Carrier<E> discardingCarrier(); // /dev/null analog
> // TBD: selector as static factory method vs class (as below)
> // TBD: Flow (reactive stream) adaptors
> }
>
> class LinkedCarrier<T> extends Carrier<T> {
> // main linked implementation
> // coming soon, based on LinkedTransferQueue algorithms
> }
>
> class BufferedCarrier<T> extends Carrier<T> {
> // main array-based implementation(s)
> // coming later, with single- vs multiple- sink/source options
> }
>
> /**
> * A Carrier that aggregates sources established in its constructor.
> * The receive method blocks waiting for any to become available, then
> * returns the corresponding item. Selectors are always closed for
> * sending, and may become fully closed when all sources close.
> */
> class Selector<T> extends Carrier<T> { // possibly a more specific name
> Selector(Carrier<? extends T> c, ...) {
> // for each c { c.registerSource(this, locatorFor(c)); }
> }
> boolean isClosedForSending() { return true; }
> // ...
> }
>
> /**
> * A policy for responding to Thread.interrupt in blocking methods in
> * classes implementing AutoCloseable
> */
> static enum OnInterrupt {
> IGNORE, // continue waiting
> CANCEL, // throw CancellationException
> CLOSE // close and throw ClosedException
> }
>
> // This could be placed in java.lang for use with any AutoCloseable
> class ClosedException extends IllegalStateException {
> ClosedException(AutoCloseable c); // the closed component
> // ...
> }
>
>