[concurrency-interest] draft Carrier API
Dávid Karnok
akarnokd at gmail.com
Fri Mar 6 18:04:31 UTC 2020
Hi. I have a few comments:
onClose(Runnable): will this allow at most one handler or many handlers, or
will calling it multiple times replace the handler and run the old one?
Calling onClose the first time after the carrier was closed runs the
Runnable immediately, right?
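
To make the question concrete, here is a minimal sketch of one possible
contract: many handlers, each run once by the closing thread, and a late
registration run immediately on the caller. CloseHandlers is a hypothetical
standalone helper, not part of the draft, and the draft may well choose
different semantics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not in the Carrier draft): one possible onClose contract.
final class CloseHandlers {
    private final Object lock = new Object();
    private List<Runnable> handlers = new ArrayList<>(); // null once closed

    /** Registers a handler; if already closed, runs it immediately on the caller. */
    void onClose(Runnable handler) {
        synchronized (lock) {
            if (handlers != null) {
                handlers.add(handler);
                return;
            }
        }
        handler.run(); // already closed: run outside the lock
    }

    /** Marks closed and runs each registered handler once, on the closing thread. */
    void close() {
        List<Runnable> toRun;
        synchronized (lock) {
            if (handlers == null) {
                return; // a second close is a no-op
            }
            toRun = handlers;
            handlers = null;
        }
        for (Runnable r : toRun) {
            r.run();
        }
    }
}
```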
tryReceive(T def): having to supply a default value of T is always somewhat
troublesome; perhaps return Optional<T> if T is planned to be non-null.
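
A rough sketch of the Optional-returning shape, using a BlockingQueue as a
stand-in because no Carrier implementation exists yet. OptionalReceive is a
hypothetical name. It relies on BlockingQueue rejecting null elements, so a
null from poll() can only mean "nothing available".

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for an Optional-returning tryReceive (not in the draft).
final class OptionalReceive {
    /** Non-blocking receive: empty Optional if no item is currently available. */
    static <T> Optional<T> tryReceive(BlockingQueue<T> queue) {
        // BlockingQueue never holds null, so null from poll() means "absent".
        return Optional.ofNullable(queue.poll());
    }
}
```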
stream(): I presume closing the stream closes the carrier
+ receive(Consumer): boolean: could allow receiving without making a
default T
+ closeExceptionally(Throwable) : perhaps for both sides? Certainly, one
can send a record of T+Throwable one way, but not the other.
+ onSenderReady(Runnable r): would allow a non-blocking consumer to react
to items or sender-side close.
+ onReceiverReady(Runnable r): would allow a non-blocking producer to react
to buffer slots becoming available or receiver-side close.
+ receiveAsPublisher(Executor):
~ probably should only allow one subscriber and rely on external
multicasting
~ without onSenderReady, it has to rely on blocking and thus run on a
suspendable thread
+ sendAsSubscriber(Executor)
~ a full buffer requires suspending so that when a slot becomes
available, the upstream can then send more items
~ without onReceiverReady and blocking on send, I'm not sure how to link
cancel to a receiver-side close; it depends on whether onClose allows
registering multiple runnables
~ unbounded capacity <-> unbounded request?
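
For the receive(Consumer) suggestion above, a minimal sketch of what I have
in mind, again with a BlockingQueue standing in for a Carrier.
ConsumerReceive is a hypothetical name, and treating the method as
non-blocking is my assumption; a blocking variant would be equally
plausible.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for receive(Consumer): boolean (not in the draft).
final class ConsumerReceive {
    /** Passes one available item to the consumer; returns false if none was available. */
    static <T> boolean receive(BlockingQueue<T> queue, Consumer<? super T> consumer) {
        T item = queue.poll(); // non-blocking; null means empty
        if (item == null) {
            return false;
        }
        consumer.accept(item);
        return true;
    }
}
```

The boolean result carries the "absent" case, so the caller never has to
invent a default T.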
Doug Lea via Concurrency-interest <concurrency-interest at cs.oswego.edu>
wrote (on Fri, Mar 6, 2020, 16:22):
>
> [Cross-posting concurrency-interest and loom-dev.]
>
> To continue improving java.util.concurrent support for increasingly
> diverse programming styles (while still avoiding arguments about whether
> any of them are best!), it would be helpful to provide "BlockingQueues
> meet AutoCloseable" APIs that are loom-friendly, but not loom-specific.
> A sketch is pasted below. To avoid mail-reader glitches, you might want
> to read updated snapshots at gee.cs.oswego.edu/dl/wwwtmp/Carrier.java
>
> Suggestions and comments are welcome. An initial implementation class
> (LinkedCarrier) should be available shortly after API issues settle;
> others later.
>
> ...
>
> // API sketches, with "public" omitted throughout
>
> /**
> * A component for sending and receiving messages. Carriers support
> * usages similar to those of BlockingQueues, but additionally
> * implement AutoCloseable, and may be explicitly closed for sending,
> * receiving, or both. Carriers also provide policy-based control for
> * responses to Thread.interrupt while blocked (ignoring, cancelling
> * the current operation only, or closing the carrier). Concrete
> * implementation classes may be created with a given capacity (after
> * which method send will block waiting for available space), or
> * effectively unbounded, in which case method send will never block
> * but may fail with an OutOfMemoryError.
> *
> * Design notes:
> *
> * (1) Both send and receive methods are declared here, but allowing
> * either side to be permanently (vs eventually) closed for send-only
> * or receive-only components. This loses some static type checking
> * opportunities of separate send and receive APIs. However the class
> * includes methods (in the style of Collections.unmodifiableX) to
> * produce views that provide dynamic directionality enforcement.
> *
> * (2) This is an abstract class (rather than interface) providing
> * uniform Observer-style methods for Selectors and related
> * classes. The alternative is some sort of SPI.
> *
> * (3) To control interactions between Thread interrupts and state,
> * rather than throwing InterruptedExceptions, potentially blocking
> * methods rely on a provided policy to distinguish cancelling the
> * operation vs closing the carrier vs ignoring the interrupt. The
> * default is CANCEL, because it is the least constraining; for
> * example some mixed usages can catch CancellationException to then
> * close only when desired.
> *
> * (4) To broaden coverage of channel-based programming styles,
> * implementations support sendSynchronously, which is otherwise
> * available in BlockingQueues only as the poorly-named and underused
> * method LinkedTransferQueue.transfer.
> */
> abstract class Carrier<T> implements AutoCloseable {
> Carrier(OnInterrupt policy);
> Carrier() { this(OnInterrupt.CANCEL); } // default
>
> // Basic messaging
>
> /**
> * Consume item, throw if isClosedForReceiving, block if empty.
> * May cancel or close on interrupt, depending on OnInterrupt policy.
> */
> T receive() throws ClosedException, CancellationException;
>
> /**
> * Send item, throw if isClosedForSending, block if full.
> * May cancel or close on interrupt, depending on OnInterrupt policy.
> */
> void send(T item) throws ClosedException, CancellationException;
>
> /** Send and block until item received */
> void sendSynchronously(T item) throws ClosedException,
> CancellationException;
>
> // Timeout versions
> T receive(Duration timeout)
> throws ClosedException, CancellationException, TimeoutException;
> void send(T item, Duration timeout)
> throws ClosedException, CancellationException, TimeoutException;
> void sendSynchronously(T item, Duration timeout)
> throws ClosedException, CancellationException, TimeoutException;
>
> // Non-blocking access
> boolean trySend(T item); // false if closed or full
> T tryReceive(T resultIfAbsent); // absent if closed or empty
> T peek(T resultIfAbsent); // may false-positive
>
> // Termination
> void closeForSending(); // fully close when isClosedForReceiving
> void closeForReceiving(); // vice-versa
> void close(); // immediate close
> void awaitClose() throws InterruptedException;
> void onClose(Runnable closeHandler); // run by thread triggering close
>
> // Status
> boolean isClosedForSending();
> boolean isClosedForReceiving();
> boolean isClosed(); // true if both sides closed
> boolean isOpen() { return !isClosed(); }
> boolean isEmpty();
> boolean isFull(); // never true if unbounded
> long capacity(); // Long.MAX_VALUE if unbounded
> OnInterrupt interruptPolicy(); // return policy
>
> // linkage support, noops here; locators are opaque cookie-like identifiers
> protected void registerSource(Carrier<? super T> c, long locator) {}
> // notification of send or close by registered carrier
> protected void sourceEvent(long locator, boolean isClosed) {}
>
> // views to disable one direction; similar to Collections.unmodifiableX
> static <E> Carrier<E> sendOnlyCarrier(Carrier<E> c);
> static <E> Carrier<E> receiveOnlyCarrier(Carrier<E> c);
>
> // other possible utilities
> Stream<T> stream(); // destructive (consume-on-traverse)
> static <E> Carrier<E> discardingCarrier(); // /dev/null analog
> // TBD: selector as static factory method vs class (as below)
> // TBD: Flow (reactive stream) adaptors
> }
>
> class LinkedCarrier<T> extends Carrier<T> {
> // main linked implementation
> // coming soon, based on LinkedTransferQueue algorithms
> }
>
> class BufferedCarrier<T> extends Carrier<T> {
> // main array-based implementation(s)
> // coming later, with single- vs multiple- sink/source options
> }
>
> /**
> * A Carrier that aggregates sources established in its constructor.
> * The receive method blocks waiting for any to become available, then
> * returns the corresponding item. Selectors are always closed for
> * sending, and may become fully closed when all sources close.
> */
> class Selector<T> extends Carrier<T> { // possibly a more specific name
> Selector(Carrier<? extends T> c, ...) {
> // for each c { c.registerSource(this, locatorFor(c)); }
> }
> boolean isClosedForSending() { return true; }
> // ...
> }
>
> /**
> * A policy for responding to Thread.interrupt in blocking methods in
> * classes implementing AutoCloseable
> */
> enum OnInterrupt {
> IGNORE, // continue waiting
> CANCEL, // throw CancellationException
> CLOSE // close and throw ClosedException
> }
>
> // This could be placed in java.lang for use with any AutoCloseable
> class ClosedException extends IllegalStateException {
> ClosedException(AutoCloseable c); // the closed component
> // ...
> }
>
--
Best regards,
David Karnok