AsynchronousFilter / AsynchronousSSLChannel proposal

libman at libman at
Wed Jan 7 08:21:13 PST 2009

Dear NIO2 developers and users,

We would like to introduce an addition to the NIO2 - generic
AsynchronousFilter (an abstract class) and as it's example
AsynchronousSSLChannel concrete implementation.

The idea of AsynchronousFilter is similar to the
FilterInputStream/FilterOutputStream classes.
The AsynchronousFilter contains some other AsynchronousByteChannel
instance, which it uses as its basic source and sink of data, possibly
transforming the data along the way or providing additional functionality.

The AsynchronousFilters could be used as various encoders/decoders,
formatters, parsers, readers, loggers, tracers, etc.  Probably the most
interesting and useful example of AsynchronousFilter is

Assuming that AsynchronousFilter itself implements AsynchronousByteChannel
interface (and therefore, it can be used everywhere where
AsynchronousByteChannel is expected), its usage becomes transparent for
application developer.  They do not have to know anything about
AsynchronousFilter and can think that they deal with

As it was mentioned AsynchronousFilter encapsulates other
AsynchrnousByteChannel instance, which in its turn can be
AsynchronousFilter implementation. Therefore, it is possible to
dynamically build a stack or chain of AsynchronousFilters.

Speaking about AsynchronousFilter, we have to distinguish cases whether we
refer to AsynchronousByteChannel respresented by AsynchrnousFilter itself
and whether we assume the embedded AsynchronousByteChannel.  For
simplicity, lets name the AsynchronousFilter itself as  "Outer" channel
and encapsulated instance or embedded channel as "Inner" channel.

For example lets consider simple filter that does basic data

When read/write operation is invoked on "Outer" channel, filter creates a
Future<Interger> result, stores it somewhere, performs some data
processing (encoding phase for write operation) and initiates read/write
operation on "Inner" channel and returns created result to the caller. 
The AsynchronousFilter provides internal completion handlers for "Inner"
channel. When "Inner" read/write completes, filters picks up previously
stored Future<Integer> result for the "Outer" channel, fills with
completion data (decoding phase for read operation) and calls user-defined
completion handler.

In above example each read/write operation on "Outer" channel corresponds
to one read/write operation on "Inner" Channel. That is why we call it
"simple" filter.
But in general, there is no rule to map 1:1 "Outer" and "Inner" operations.
In case of SSL Filter (AsynchronousSSLChannel) single read/write operation
on "Outer" channel can produce a sequence of read/write operations on
"Inner" channel as SSL handshake can be requested at any time.

We can consider AsynchrnousFilter as state machine driven by five main
a) Asynchronous read on "Outer" channel is invoked
b) Asynchronous write on "Outer" channel is invoked
c) Asynchronous read on "Inner" channel completes
d) Asynchronous write on "Inner" channel completes
e) "Outer" channel is closed

We should keep in mind that filter is a state machine that can receive at
same time several signals in parallel threads, i.e. filter is always must
be thread safe.

To make the job of AsynchronousFilter easy, we propose a small framework,
which can be found at

The package testfilter contains bunch of tests for NIO2 and

The framework consisting of following classes:

      Contains filter skeleton, implements Outer AsynchronousBytesChannel.
This class is also responsible for creation of Future<Integer>
results and defines abstact methods-callbacks that should be
overridden in concrete filter.

AsynchronousFilterFuture represents Future<Integer> for Outer channel and
visible only for the filter developer. This class allows thread-safe
manipulations on the Future and stores all information about operation and
associated Outer completion handler.

	Derived from AsynchronousFilterFuture and contains template dependent
information about Attachment and CompletionHandler for the operations on
Outer channel. Used only from AsynchronousFilter. Filters developer is
free of knowledge this class.

    - Convenient class encapsulating completion handler and operations
with           "Inner" channel

- Helper class based on AtomicInteger. Contains useful primitives for
maintaining stat atomically: getAndSetBits, getAndClearBits, setMax,
setMin, setIfLess, setIfGreater, etc.

AsynchronousSSLChannel -
    -  Asynchronous SSL channel as example of AsynchronousFilter.

To develop your own filter the developer should derive his class from
abstract class AsynchrnousFilter and override 10 methods:

public abstract class AsynchronousFilter
    implements AsynchronousByteChannel
     * events for Outer channel
     * called when read/write/close are invoked for Outer channel
    protected abstract void startOuterRead (AsynchronousFilterFuture future);
    protected abstract void startOuterWrite (AsynchronousFilterFuture
    protected abstract void onOuterChannelClosed ();

     * events for Inner channel - read completions
    protected abstract void onInnerReadCompleted (ByteBuffer buffer,
                                                  int bytesCompleted);
    protected abstract void onInnerReadFailed  (ByteBuffer buffer,
                                                Throwable xc);
    protected abstract void onInnerReadCancelled (ByteBuffer buffer);

     * events for Inner channel - write completions
    protected abstract void onInnerWriteCompleted (ByteBuffer buffer,
                                                   int bytesCompleted);
    protected abstract void onInnerWriteFailed  (ByteBuffer buffer,
                                                 Throwable xc);
    protected abstract void onInnerWriteCancelled (ByteBuffer buffer);

      * called when inner channel is closed and
      * all pending inner operations are finished,
      * i.e. (completed/cancelled/failed)
    protected abstract void onInnerChannelClosed ();

public abstract class AsynchronousFilterFuture
    implements Future<Integer>
    private static final int  ST_PENDING     = 0x0000;
    private static final int  ST_RUNNING     = 0x0001;
    private static final int  ST_DONE        = 0x0080;

    private static final int  ST_COMPLETED = ST_DONE | 0x0100;
    private static final int  ST_FAILED    = ST_DONE | 0x0200;
    private static final int  ST_CANCELLED = ST_DONE | 0x0400;

    private AtomicIntegerExt                     m_state;
    private AsynchronousFilter                   m_filter;
    private AtomicReference<ByteBuffer>          m_buffer;
    private int                                  m_initialPosition;
    private int                                  m_bytesCompleted;
    private Throwable                            m_error;

    // created on demand only when get methods is invoked and
    // the state is ST_PENDING or ST_RUNNING
    private AtomicReference<ReentrantLock>       m_lock;
    private Condition                            m_condition;

    AsynchronousFilterFuture (AsynchronousFilter filter,
                              ByteBuffer  buffer);

    public ByteBuffer buffer();

    public Integer get()
          throws InterruptedException, ExecutionException ;

    public Integer get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException,

    public boolean isCancelled()
        return m_state.get() == ST_CANCELLED;

    public boolean isDone();

    public boolean cancel(boolean mayInterruptIfRunning) ;

    // filter should call this method to prevent cancellation
    public boolean setRunning ();

    // filter should call this method to allow cancellation
    public boolean setPending ()

    // setting the result and calling Outer completion handler

    public boolean setCancelled (boolean enableDirectCallback);

    public boolean setCompleted (int bytesCompleted,
                                 boolean enableDirectCallback);

    public boolean setFailed (Throwable xc,
                               boolean enableDirectCallback);


Any feedbacks and discussions are greatly appreciated.
Alex Libman
Yevgeny Libman

More information about the nio-dev mailing list