AsynchronousFilter / AsynchronousSSLChannel proposal

libman at terabit.com.au libman at terabit.com.au
Wed Jan 7 08:21:13 PST 2009


Dear NIO2 developers and users,

We would like to propose an addition to NIO2: a generic
AsynchronousFilter (an abstract class) and, as an example of it, a
concrete AsynchronousSSLChannel implementation.

The idea of AsynchronousFilter is similar to the
FilterInputStream/FilterOutputStream classes.
An AsynchronousFilter contains another AsynchronousByteChannel instance,
which it uses as its basic source and sink of data, possibly transforming
the data along the way or providing additional functionality.

AsynchronousFilters could be used as various encoders/decoders,
formatters, parsers, readers, loggers, tracers, etc.  Probably the most
interesting and useful example of an AsynchronousFilter is
AsynchronousSSLChannel.

Because AsynchronousFilter itself implements the AsynchronousByteChannel
interface (and can therefore be used wherever an AsynchronousByteChannel
is expected), its use is transparent to the application developer.
Developers do not have to know anything about AsynchronousFilter and can
treat it as a plain AsynchronousByteChannel.

As mentioned, an AsynchronousFilter encapsulates another
AsynchronousByteChannel instance, which in turn can itself be an
AsynchronousFilter implementation. Therefore, it is possible to
dynamically build a stack or chain of AsynchronousFilters.

When speaking about an AsynchronousFilter, we have to distinguish between
the AsynchronousByteChannel represented by the AsynchronousFilter itself
and the embedded AsynchronousByteChannel.  For simplicity, let's call the
AsynchronousFilter itself the "Outer" channel and the encapsulated
(embedded) instance the "Inner" channel.

For example, let's consider a simple filter that does basic data
encoding/decoding.

When a read/write operation is invoked on the "Outer" channel, the filter
creates a Future<Integer> result, stores it, performs some data processing
(the encoding phase for a write operation), initiates a read/write
operation on the "Inner" channel, and returns the created result to the
caller. The AsynchronousFilter provides internal completion handlers for
the "Inner" channel. When the "Inner" read/write completes, the filter
picks up the previously stored Future<Integer> result for the "Outer"
channel, fills it with the completion data (the decoding phase for a read
operation) and calls the user-defined completion handler.
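
The flow described above could be sketched roughly as follows. This is
only an illustration, not code from the proposed framework: it is written
against the AsynchronousByteChannel interface as it appears in
java.nio.channels today (which may differ from the draft API discussed
here), it uses CompletableFuture as the Future<Integer> result, and the
names SimpleFilterSketch, BaseChannel, MemoryChannel and XorFilter are
hypothetical. The sketch is single-threaded and ignores thread safety.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

public class SimpleFilterSketch {

    // Internal handler that turns a completion into a Future<Integer> result.
    static final CompletionHandler<Integer, CompletableFuture<Integer>> TO_FUTURE =
        new CompletionHandler<Integer, CompletableFuture<Integer>>() {
            @Override public void completed(Integer n, CompletableFuture<Integer> f) { f.complete(n); }
            @Override public void failed(Throwable t, CompletableFuture<Integer> f) { f.completeExceptionally(t); }
        };

    // Shared plumbing: the Future-style calls are built on the handler-style ones.
    abstract static class BaseChannel implements AsynchronousByteChannel {
        private volatile boolean open = true;
        @Override public CompletableFuture<Integer> read(ByteBuffer dst) {
            CompletableFuture<Integer> result = new CompletableFuture<>();
            read(dst, result, TO_FUTURE);
            return result;
        }
        @Override public CompletableFuture<Integer> write(ByteBuffer src) {
            CompletableFuture<Integer> result = new CompletableFuture<>();
            write(src, result, TO_FUTURE);
            return result;
        }
        @Override public void close() throws IOException { open = false; }
        @Override public boolean isOpen() { return open; }
    }

    // Hypothetical in-memory "Inner" channel: written bytes can be read back.
    static class MemoryChannel extends BaseChannel {
        final ArrayDeque<Byte> data = new ArrayDeque<>();
        @Override public <A> void write(ByteBuffer src, A att, CompletionHandler<Integer, ? super A> h) {
            int n = src.remaining();
            while (src.hasRemaining()) data.add(src.get());
            h.completed(n, att);
        }
        @Override public <A> void read(ByteBuffer dst, A att, CompletionHandler<Integer, ? super A> h) {
            int n = 0;
            while (dst.hasRemaining() && !data.isEmpty()) { dst.put(data.poll()); n++; }
            h.completed((n == 0 && data.isEmpty()) ? -1 : n, att); // -1 signals end of stream
        }
    }

    // "Outer" channel: one Outer operation maps to exactly one Inner operation.
    static class XorFilter extends BaseChannel {
        private final AsynchronousByteChannel inner;
        private final byte key;
        XorFilter(AsynchronousByteChannel inner, byte key) { this.inner = inner; this.key = key; }

        @Override public <A> void write(ByteBuffer src, A att, CompletionHandler<Integer, ? super A> h) {
            // Encoding phase: transform a copy so the caller's bytes stay untouched.
            ByteBuffer encoded = ByteBuffer.allocate(src.remaining());
            for (int i = src.position(); i < src.limit(); i++) encoded.put((byte) (src.get(i) ^ key));
            encoded.flip();
            inner.<Void>write(encoded, null, new CompletionHandler<Integer, Void>() {
                @Override public void completed(Integer n, Void v) {
                    src.position(src.position() + n); // report progress on the Outer buffer
                    h.completed(n, att);              // call the user-defined handler
                }
                @Override public void failed(Throwable t, Void v) { h.failed(t, att); }
            });
        }

        @Override public <A> void read(ByteBuffer dst, A att, CompletionHandler<Integer, ? super A> h) {
            int start = dst.position();
            inner.<Void>read(dst, null, new CompletionHandler<Integer, Void>() {
                @Override public void completed(Integer n, Void v) {
                    // Decoding phase: transform the bytes that just arrived, in place.
                    for (int i = start; i < start + n; i++) dst.put(i, (byte) (dst.get(i) ^ key));
                    h.completed(n, att);
                }
                @Override public void failed(Throwable t, Void v) { h.failed(t, att); }
            });
        }

        @Override public void close() throws IOException { super.close(); inner.close(); }
    }

    public static void main(String[] args) {
        MemoryChannel inner = new MemoryChannel();
        XorFilter outer = new XorFilter(inner, (byte) 0x5A);
        outer.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII))).join();
        ByteBuffer back = ByteBuffer.allocate(16);
        int n = outer.read(back).join();
        System.out.println(new String(back.array(), 0, n, StandardCharsets.US_ASCII)); // prints "hello"
    }
}
```

Because XorFilter takes any AsynchronousByteChannel as its Inner channel,
another XorFilter can be passed in its place, which gives the stack or
chain of filters mentioned earlier.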

In the above example, each read/write operation on the "Outer" channel
corresponds to one read/write operation on the "Inner" channel. That is
why we call it a "simple" filter.
But in general, there is no rule that "Outer" and "Inner" operations map
1:1. In the case of the SSL filter (AsynchronousSSLChannel), a single
read/write operation on the "Outer" channel can produce a sequence of
read/write operations on the "Inner" channel, as an SSL handshake can be
requested at any time.

We can consider an AsynchronousFilter as a state machine driven by five
main signals:
a) Asynchronous read on "Outer" channel is invoked
b) Asynchronous write on "Outer" channel is invoked
c) Asynchronous read on "Inner" channel completes
d) Asynchronous write on "Inner" channel completes
e) "Outer" channel is closed

We should keep in mind that the filter is a state machine that can
receive several signals at the same time from parallel threads, i.e. a
filter must always be thread safe.
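
One possible way to keep such a state machine thread safe without locks
is to hold the state as bit flags in an AtomicInteger and to change it
only through compare-and-set. The sketch below is an assumption made for
illustration and is not taken from the proposed framework; the class
name, the flag names and the rule that only one read and one write may be
outstanding at a time are all hypothetical choices.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the five signals (a)-(e) acting on one atomic state word.
public class FilterSignalsSketch {
    static final int READ_PENDING  = 0x01;
    static final int WRITE_PENDING = 0x02;
    static final int CLOSED        = 0x04;

    private final AtomicInteger state = new AtomicInteger();

    // Signals (a) and (b): accept the operation only if the channel is open
    // and no operation of the same kind is already outstanding.
    public boolean outerReadInvoked()  { return tryBegin(READ_PENDING); }
    public boolean outerWriteInvoked() { return tryBegin(WRITE_PENDING); }

    // Signals (c) and (d): the Inner operation finished, so the slot is free again.
    public void innerReadCompleted()  { clear(READ_PENDING); }
    public void innerWriteCompleted() { clear(WRITE_PENDING); }

    // Signal (e): returns true only for the single caller that performed the close.
    public boolean outerClosed() {
        for (;;) {
            int s = state.get();
            if ((s & CLOSED) != 0) return false;
            if (state.compareAndSet(s, s | CLOSED)) return true;
        }
    }

    private boolean tryBegin(int bit) {
        for (;;) {
            int s = state.get();
            if ((s & (bit | CLOSED)) != 0) return false;   // busy or closed
            if (state.compareAndSet(s, s | bit)) return true;
            // another thread changed the state in between: re-read and retry
        }
    }

    private void clear(int bit) {
        for (;;) {
            int s = state.get();
            if (state.compareAndSet(s, s & ~bit)) return;
        }
    }
}
```

Each method either wins its compare-and-set or retries against the fresh
value, so concurrent signals never overwrite each other's flags.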

To make writing an AsynchronousFilter easier, we propose a small
framework, which can be found at
http://jproactor.svn.sourceforge.net/viewvc/jproactor/trunk/nio2/src/au/com/terabit/nio2/filter/

The package testfilter contains a bunch of tests for NIO2 and
AsynchronousFilter:
http://jproactor.svn.sourceforge.net/viewvc/jproactor/trunk/nio2/src/au/com/terabit/nio2/testfilter/



The framework consists of the following classes:

AsynchronousFilter
    Contains the filter skeleton and implements the Outer
    AsynchronousByteChannel. This class is also responsible for creating
    the Future<Integer> results and defines the abstract callback methods
    that should be overridden in a concrete filter.

AsynchronousFilterFuture
    Represents the Future<Integer> for the Outer channel and is visible
    only to the filter developer. This class allows thread-safe
    manipulation of the Future and stores all information about the
    operation and the associated Outer completion handler.

AsynchronousFilterFutureImpl<A>
    Derived from AsynchronousFilterFuture; contains the template-dependent
    information about the Attachment and CompletionHandler for operations
    on the Outer channel. Used only from AsynchronousFilter. The filter
    developer does not need to know about this class.

AsynchronousHandler
    Convenience class encapsulating a completion handler and operations
    on the "Inner" channel.

AtomicIntegerExt
    Helper class based on AtomicInteger. Contains useful primitives for
    maintaining state atomically: getAndSetBits, getAndClearBits, setMax,
    setMin, setIfLess, setIfGreater, etc.

AsynchronousSSLChannel
    Asynchronous SSL channel as an example of AsynchronousFilter.



To develop a filter of your own, derive your class from the abstract
class AsynchronousFilter and override its 10 methods:

public abstract class AsynchronousFilter
    implements AsynchronousByteChannel
{
    /*
     * events for Outer channel
     * called when read/write/close are invoked for Outer channel
     */
    protected abstract void startOuterRead (AsynchronousFilterFuture future);
    protected abstract void startOuterWrite (AsynchronousFilterFuture future);
    protected abstract void onOuterChannelClosed ();

    /*
     * events for Inner channel - read completions
     */
    protected abstract void onInnerReadCompleted (ByteBuffer buffer,
                                                  int bytesCompleted);
    protected abstract void onInnerReadFailed  (ByteBuffer buffer,
                                                Throwable xc);
    protected abstract void onInnerReadCancelled (ByteBuffer buffer);


    /*
     * events for Inner channel - write completions
     */
    protected abstract void onInnerWriteCompleted (ByteBuffer buffer,
                                                   int bytesCompleted);
    protected abstract void onInnerWriteFailed  (ByteBuffer buffer,
                                                 Throwable xc);
    protected abstract void onInnerWriteCancelled (ByteBuffer buffer);

    /*
     * called when the inner channel is closed and
     * all pending inner operations are finished,
     * i.e. completed, cancelled or failed
     */
    protected abstract void onInnerChannelClosed ();
}


public abstract class AsynchronousFilterFuture
    implements Future<Integer>
{
    private static final int  ST_PENDING     = 0x0000;
    private static final int  ST_RUNNING     = 0x0001;
    private static final int  ST_DONE        = 0x0080;

    private static final int  ST_COMPLETED = ST_DONE | 0x0100;
    private static final int  ST_FAILED    = ST_DONE | 0x0200;
    private static final int  ST_CANCELLED = ST_DONE | 0x0400;


    private AtomicIntegerExt                     m_state;
    private AsynchronousFilter                   m_filter;
    private AtomicReference<ByteBuffer>          m_buffer;
    private int                                  m_initialPosition;
    private int                                  m_bytesCompleted;
    private Throwable                            m_error;


    // created on demand only when a get method is invoked and
    // the state is ST_PENDING or ST_RUNNING
    private AtomicReference<ReentrantLock>       m_lock;
    private Condition                            m_condition;

    AsynchronousFilterFuture (AsynchronousFilter filter,
                              ByteBuffer  buffer);

    public ByteBuffer buffer();

    @Override
    public Integer get()
          throws InterruptedException, ExecutionException ;

    @Override
    public Integer get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException,
                   TimeoutException;

    @Override
    public boolean isCancelled()
    {
        return m_state.get() == ST_CANCELLED;
    }

    @Override
    public boolean isDone();

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) ;

    // filter should call this method to prevent cancellation
    public boolean setRunning ();

    // filter should call this method to allow cancellation
    public boolean setPending ();


    // setting the result and calling Outer completion handler

    public boolean setCancelled (boolean enableDirectCallback);

    public boolean setCompleted (int bytesCompleted,
                                 boolean enableDirectCallback);

    public boolean setFailed (Throwable xc,
                               boolean enableDirectCallback);

}
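
The interplay of setRunning, setPending and cancel in the outline above
could work roughly as in the following sketch. It reuses the ST_*
constants shown above, but the transition rules are our reading of the
comments ("prevent cancellation" / "allow cancellation") rather than the
actual implementation, and the class name FilterFutureStateSketch is
hypothetical. Completion handlers, results and blocking get() are left
out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the state transitions only.
public class FilterFutureStateSketch {
    static final int ST_PENDING   = 0x0000;
    static final int ST_RUNNING   = 0x0001;
    static final int ST_DONE      = 0x0080;
    static final int ST_COMPLETED = ST_DONE | 0x0100;
    static final int ST_FAILED    = ST_DONE | 0x0200;
    static final int ST_CANCELLED = ST_DONE | 0x0400;

    private final AtomicInteger state = new AtomicInteger(ST_PENDING);

    // PENDING -> RUNNING: while running, cancel() is refused.
    public boolean setRunning() { return state.compareAndSet(ST_PENDING, ST_RUNNING); }

    // RUNNING -> PENDING: cancellation becomes possible again.
    public boolean setPending() { return state.compareAndSet(ST_RUNNING, ST_PENDING); }

    // Assumption: cancellation succeeds only from the PENDING state.
    public boolean cancel() { return state.compareAndSet(ST_PENDING, ST_CANCELLED); }

    public boolean setCompleted() { return finish(ST_COMPLETED); }
    public boolean setFailed()    { return finish(ST_FAILED); }

    // Moves to a terminal state once; later attempts return false.
    private boolean finish(int terminalState) {
        for (;;) {
            int s = state.get();
            if ((s & ST_DONE) != 0) return false;
            if (state.compareAndSet(s, terminalState)) return true;
        }
    }

    public boolean isDone()      { return (state.get() & ST_DONE) != 0; }
    public boolean isCancelled() { return state.get() == ST_CANCELLED; }
}
```

Since every terminal state includes the ST_DONE bit, isDone() needs only
a single mask test, whichever way the operation ended.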


Any feedback and discussion is greatly appreciated.
Thanks,
Alex Libman
Yevgeny Libman



