draft Carrier API
Doug Lea
dl at cs.oswego.edu
Sat Mar 7 16:29:22 UTC 2020
Collecting replies/responses, with updated sketches pasted below and at
http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java
On 3/6/20 3:56 PM, John Rose wrote:
> This design puts both endpoints on one type, as opposed to two
> similar types, like InputStream and OutputStream. This leads to
> fewer types and objects (good) but broader ones. Broader is little
> less good, since most use points only care about 1/2 of the methods;
> the other 1/2 is then noise.
The main API design issue here is that there are three (not two) views
of a Carrier: the protocol state (closed, empty, etc), sender-side, and
receiver-side operations. If you split them, you need at least four
interfaces/classes total (one to combine them). Doing this in a way that
does not result in nearly all usages needing the combined interface
requires other tradeoffs (like declaring some bookkeeping methods public
and breaking read-only-ness of base interface). But given the reaction
so far, I'm back to thinking this can be done in a way that more people
will prefer. (Aside: I've waffled on this many many times, including
pre-j.u.c
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
, and several previous Carrier drafts.)
So, back to a refreshed 4-interface version.
On 3/6/20 1:04 PM, Dávid Karnok wrote:
> onClose(Runnable): will this allow one handler ...
Thanks for the prod. It is much better to define:
/** Returns a CompletableFuture that isDone when closed. */
CompletionStage<Carriable<T>> onClose();
In which case these questions and others are already answered, and we
can also omit awaitClose method.
>
> tryReceive(T def): always somewhat trouble to expect a default value
> of T, perhaps return Optional<T> if T is planned to be non-null.
I think best to keep this, but also add a variant of your other suggestion:
boolean tryConsume(Consumer<? super T> proc); // false if closed or empty
> + closeExceptionally(Throwable) :
Yes, thanks; for the same reasons we added to SubmissionPublisher. Also
adding getClosedException() method.
> + onSenderReady(Runnable r) receiveAsPublisher(Executor) sendAsSubscriber(Executor)
... among other possibilities. I'm leaving interplay with Flow as TBD
for now, in part because...
On 3/6/20 5:14 PM, Thomas May wrote:
> It also could introduce interesting patterns like multiplexing… (IE,
> having multiple receivers getting the same message)
We already have a good multicaster, SubmissionPublisher. But I'm still
not sure of the best linkages.
On 3/6/20 6:06 PM, Alex Otenko wrote:
> Can methods return something more useful than void? Eg something
> that can be used to test progress? (I am afraid my imagination is
> limited to returning a ticket number, and a sequencer API to inspect
> whether send / close has a matching receive / await for such an
> event)
I can't think of enough use cases to justified added cost. Can you?
>
> SendSynchronously with timeout is ambiguous. If timeout occurs, was
> it placed in the buffer, and not received yet, or not even buffered?
>
The only thing you know is that upon exception, the item cannot have
been (and never will be) received. This is no different than other
methods. (although you are right that bounded+synchronous+timeout is
the most complicated to implement.)
... pasting updated draft ...
// snapshot: Sat Mar 7 11:13:22 2020 Doug Lea (dl at nuc40)
// API sketches, with "public" omitted throughout
/**
* A component for sending and receiving data. Carriers support
* usages similar to those of BlockingQueues, but additionally
* implement AutoCloseable, and may be explicitly closed for sending,
* receiving, or both.
*
* This interface combines three sets of methods, defined in three
* interfaces: Carriable methods access protocol state and
* configuration. Interfaces CarrierSender and CarrierReceiver extend
* Carriable with sender- and receiver- side views. Finally, this
* interface combines these views.
*
* To control interactions between Thread interrupts and state, rather
* than throwing InterruptedExceptions, potentially blocking methods
* rely on a provided policy to distinguish cancelling the operation
* vs closing the carrier vs ignoring the interrupt. The default for
* current implementations is CANCEL, because it is the least
* constraining; for example some mixed usages can catch
* CancellationException to then close only when desired.
*
* Concrete implementation classes may enforce a given capacity (after
* which method send will block waiting for available space), or be
* effectively unbounded, in which case method send will never block
* but may fail with an OutOfMemoryError.
*/
interface Carrier<T> extends CarrierSender<T>, CarrierReceiver<T> {
// TBD: factory methods for jdk implementations
// some utility methods, such as...
static <E> CarrierReceiver<E> discardingCarrier(); // /dev/null analog
// TBD: Flow (reactive stream) adaptors
}
/**
* Methods accessing the protocol state and configuration of a
* Carrier.
*/
interface Carriable<T> extends AutoCloseable {
boolean isClosedForSending();
boolean isClosedForReceiving();
boolean isClosed(); // true if both sides closed
boolean isOpen(); // { return !isClosed(); }
boolean isEmpty();
boolean isFull(); // never true if unbounded
long capacity(); // Long.MAX_VALUE if unbounded
OnInterrupt interruptPolicy(); // return policy
void close(); // immediate close both sides
void closeExceptionally(Throwable cause); // record as cause
Throwable getClosedException();
/** Returns a CompletableFuture that isDone when closed. */
CompletionStage<Carriable<T>> onClose();
}
/**
* Methods defining the sender-side view of a Carrier.
*/
interface CarrierSender<T> extends Carriable<T> {
/**
* Send item, throw if isClosedForSending, block if full.
* May cancel or close on interrupt, depending on OnInterrupt policy.
*/
void send(T item) throws ClosedException, CancellationException;
/** Send and block until item received */
void sendSynchronously(T item) throws ClosedException,
CancellationException;
/** Try to send, upon timeout, the item is no longer available. */
void send(T item, Duration timeout)
throws ClosedException, CancellationException, TimeoutException;
void sendSynchronously(T item, Duration timeout)
throws ClosedException, CancellationException, TimeoutException;
boolean trySend(T item); // false if closed or full
void closeForSending(); // fully close when isClosedForReceiving
// linkage support; locators are opaque cookie-like identifiers
void registerSource(CarrierSender<? super T> c, long locator);
}
/**
* Methods defining the receiver-side view of a Carrier.
*/
interface CarrierReceiver<T> extends Carriable<T> {
/**
* Consume item, throw if isClosedForReceiving, block if empty.
* May cancel or close on interrupt, depending on OnInterrupt policy.
*/
T receive() throws ClosedException, CancellationException;
T receive(Duration timeout)
throws ClosedException, CancellationException, TimeoutException;
T tryReceive(T resultIfAbsent); // absent if closed or empty
T peek(T resultIfAbsent); // may false-positive
void closeForReceiving(); // fully close when isClosedForSending
boolean tryConsume(Consumer<? super T> proc); // false if closed or
empty
Stream<T> stream(); // destructive (consume-on-traverse)
// notification of send or close by registered source
void sourceSent(CarrierSender<? extends T> source, long locator, T
item);
void sourceClosed(CarrierSender<? extends T> source, long locator);
}
// TBD: provide abstract class AbstractCarrier<T>.
class LinkedCarrier<T> implements Carrier<T> {
LinkedCarrier(OnInterrupt policy);
LinkedCarrier() { this(OnInterrupt.CANCEL); } // default
// main linked implementation
// coming soon, based on LinkedTransferQueue algorithms
}
class BufferedCarrier<T> implemnts Carrier<T> {
// main array-based implementation(s)
// coming later, with single- vs multiple- sink/source options
}
/**
* A Carrier that aggregates sources established in its constructor.
* The receive method blocks waiting for any to become available, then
* returns the corresponding item.
*/
class CarrierSelector<T> implements CarrierReceiver<T> {
Selector(<CarrierSender<? extends T> c, ...) {
// for each c { c.registerSource(this, locatorFor(c)); }
}
}
/**
* A policy for responding to Thread.interrupt in blocking methods in
* classes implementing AutoCloseable
*/
static Enum OnInterrupt {
IGNORE, // continue waiting
CANCEL, // throw CancellationException
CLOSE // close and throw ClosedException
}
// This could be placed in java.lang for use with any AutoCloseable
class ClosedException extends IllegalStateException {
ClosedException(AutoCloseable c); // the closed component
// copied from ExecutionException:
/**
* Constructs an {@code ClosedException} with the specified cause.
* The detail message is set to {@code (cause == null ? null :
* cause.toString())} (which typically contains the class and
* detail message of {@code cause}).
*
* @param cause the cause (which is saved for later retrieval by the
* {@link #getCause()} method)
*/
public ClosedException(Throwable cause) {
super(cause);
}
// ...
}
More information about the loom-dev
mailing list