Issue with Windows IOCP not ***really*** writing data

Joel Buckley leojava at comcast.net
Sat Dec 26 11:22:55 PST 2009


Alan Bateman wrote:
> Joel,
>
> I'm not aware of any bugs that would cause the problem you are seeing. Do 
> you think you could create a small test case to demonstrate the issue? 
> Also can you say which jdk7 build this is, and the edition of Windows?
Sure... see below.
Java 1.7.0 build 77
Windows 2003 & 2008 & 7
>
> As regards the note on keep alive - that is only a concern if you are 
> using your own thread pool and are on an older version of Windows 
> (Windows Server 2003 and Windows XP). Even there it's not really an 
> issue for AsynchronousFileChannel because I/O operations tend to 
> complete very quickly. In any case, ThreadPoolExecutor defines the 
> setKeepAliveTime method to set the time that threads can be idle before 
> they terminate.
>
> -Alan.
>

public class TestAIO {
public static void main(String[] args) throws Throwable {
    System.err.println("USAGE: TestAIO basefilename count ioSize fileSize");
    String base = (args.length > 0 ? args[0] : "/tmp/junk");
    int count = (args.length > 1 && args[1].matches("\\d+" ? 
Integer.parseInt(args[1]) : 200);
    int ioSize = (args.length > 2 && args[2].matches("\\d+" ? 
Integer.parseInt(args[2]) : 8 * 1024);
    ioSize = (Math.max(ioSize, 512) / 512 * 512);
    long fileSize = (args.length > 3 && args[3].matches("\\d+" ? 
Long.parseLong(args[3]) : 128 * 1024);
    fileSize = (Math.max(ioSize, fileSize) / 512 * 512);
    ConcurrentLinkedQueue < Context > aio = new ConcurrentLinkedQueue < 
Context > ();
    ByteBuffer buffy;
    AsychronousFileChannel afc;
    while (--count >= 0) {
        afc = AsynchronousFileChannel.open(
            new File(base + count).toPath(), OPTIONS, null, ATTRIB);
        buffy = ByteBuffer.allocateDirect(ioSize);
        context = new Context(afc, buffy, fileSize);
        handler = new Handler();
        afc.write(buffy, 0, context, handler);
        aio.offer(context);
    }
    while (aio.peek() != null) {
        try {Thread.currentThread().sleep(1);
        } catch (InterruptedException ie) {}
        for (Context context : aio) {
             if (context.isDone()) {
                 aio.remove(context);
                 context.channel().force(false);
                 context.channel().close();
             }
        }
    }
}
} /* End TestAIO. */

/**
 * Completion handler that drives a chain of asynchronous writes for one
 * {@link Context}: each completion either finishes the current buffer,
 * starts the next buffer, or marks the context done.
 */
public final class Handler implements CompletionHandler<Integer, Context> {
/** Shared exception recorded on a context when its operation is cancelled. */
private static final CancellationException CANCELLED;
static {
    CANCELLED = new CancellationException("Operation Cancelled");
}

/**
 * Handles a completed write.
 *
 * <p>A negative transfer count marks the context done with an end-of-file
 * message. Otherwise the context's position is advanced by the number of
 * bytes transferred; when {@code adjustPosition} returns a negative value
 * the context is marked done, and in all other cases another write is
 * issued at the new position (after re-stamping the buffer if it had been
 * fully written). Anything thrown along the way is routed to
 * {@link #failed(Throwable, Context)}.
 *
 * @param ntransferred number of bytes transferred by the completed write
 * @param context      per-file state attached to the operation
 */
@Override
public void completed(Integer ntransferred, Context context) {
    try {
        int n = ntransferred;
        if (n < 0) {        /* End Of File. */
            context.setMessage("End-of-File Reached");
            context.done();
            return;
        }
        ByteBuffer buffer = context.buffer();
        long position = context.adjustPosition(n);
        // adjustPosition() returns a negative sentinel when there is nothing
        // further to write. Check it on BOTH paths: a partial write can also
        // land on the limit, and a negative position passed to write() is
        // rejected with IllegalArgumentException.
        if (position < 0) {
            context.setMessage("End-of-Cycles Reached");
            context.done();
            return;
        }
        if (!buffer.hasRemaining()) {
            // Buffer fully written: prepare it for the next chunk.
            context.updateBuffer(position);
        }
        // Either finish the partially written buffer or start the next one.
        context.channel().write(buffer, position, context, this);
    } catch (Throwable exc) {
        failed(exc, context);
    }
}

/**
 * Records a cancellation on the context and marks it done.
 *
 * <p>NOTE(review): no {@code @Override} here on purpose -- whether
 * {@code CompletionHandler} declares {@code cancelled} depends on the JDK 7
 * build in use; confirm against the target JDK.
 *
 * @param context per-file state attached to the cancelled operation
 */
public void cancelled(Context context) {
    context.setMessage("AIO Cancellation Reached");
    context.setFailed(CANCELLED);
    context.done();
}

/**
 * Records a failure on the context and marks it done.
 *
 * @param exc     the failure cause; {@code null} is tolerated and reported
 *                with a fixed message
 * @param context per-file state attached to the failed operation
 */
@Override
public void failed(Throwable exc, Context context) {
    if (exc == null) {
        context.setMessage("Null Failure Exception Reached");
    } else {
        context.setMessage(exc.getMessage());
    }
    context.setFailed(exc);
    context.done();
}
} /* End Handler */

/**
 * Per-file state for a chain of asynchronous writes: the channel, the
 * buffer being written, the running file position, a completion latch, and
 * the last status message / failure recorded for the chain.
 */
public final class Context {
private final AsynchronousFileChannel channel;
private final ByteBuffer buffer;
private final AtomicLong position;
private final CountDownLatch latch;
private final long fileSize;
private final int ioSize;
private volatile String message;
private volatile Throwable exception;
{
    position = new AtomicLong(0);
    latch = new CountDownLatch(1);
    message = "Context Initialized";
}

/**
 * Creates a context and stamps the buffer for file position 0.
 *
 * @param channel  channel the writes are issued on
 * @param buffer   buffer to write; its capacity is taken as the I/O size
 * @param fileSize position limit used by {@link #adjustPosition(long)}
 * @throws InterruptedException declared for callers; nothing in this
 *         constructor's body throws it
 */
public Context(AsynchronousFileChannel channel, ByteBuffer buffer, long
fileSize)
    throws InterruptedException {
    this.channel = channel;
    this.buffer = buffer;
    this.ioSize = buffer.capacity();
    this.fileSize = fileSize;
    updateBuffer(0);
}

/**
 * Advances the file position by {@code increment}.
 *
 * @param increment number of bytes to add to the position
 * @return the new position if it is still below the file size, otherwise
 *         {@code -1}
 */
public final long adjustPosition(long increment) {
    // Use the value returned by the single atomic update; re-reading the
    // counter afterwards could observe a later, unrelated update.
    long updated = position.addAndGet(increment);
    return (updated < fileSize ? updated : -1);
}

/**
 * Rewinds the buffer and writes a marker at every 512-byte step, counting
 * down from {@code ioSize - 512}: each marker is the long value
 * {@code position + offset}, stored with an absolute put so the buffer's
 * own position stays at 0.
 *
 * @param position file position the buffer will be written at
 */
public final void updateBuffer(long position) {
    buffer.rewind();
    int off = ioSize;
    while ((off -= 512) >= 0) {
        buffer.putLong(off, (position + off));
    }
}

/** @return the channel passed to the constructor */
public final AsynchronousFileChannel channel() {return (channel);}
/** @return the buffer passed to the constructor */
public final ByteBuffer buffer() {return (buffer);}
/** @return the recorded failure, or {@code null} if none was set */
public final Throwable exception() {return (exception);}
/** @return the most recently set status message */
public final String message() {return (message);}
/** Records the failure for this context. */
public final void setFailed(Throwable x) {this.exception = x;}
/** Records a status message for this context. */
public final void setMessage(String m) {this.message = m;}
/** @return {@code true} once {@link #done()} has been called */
public final boolean isDone() {return (latch.getCount() == 0);}
/** Marks this context as finished. */
public final void done() {latch.countDown();}
} /* End Context. */



More information about the nio-discuss mailing list