Issue with Windows IOCP not ***really*** writing data
Joel Buckley
leojava at comcast.net
Sat Dec 26 11:22:55 PST 2009
Alan Bateman wrote:
> Joel,
>
> I'm aware of any bugs that would cause the problem you are seeing. Do
> you think you could create a small test case to demonstrate the issue?
> Also can you say which jdk7 build this is, and the edition of Windows?
Sure... see below.
Java 1.7.0 build 77
Windows 2003 & 2008 & 7
>
> As regards the note on keep alive - that is only a concern if you are
> using your own thread pool and are on an older version of Windows
> (Windows Server 2003 and Windows XP). Even there isn't not really an
> issue for AsynchronousFileChannel because I/O operations tend to
> complete very quickly. In any case, ThreadPoolExecutor defines the
> setKeepAlive method to set the time that threads can be idle before
> they terminate.
>
> -Alan.
>
public class TestAIO {
public static void main(String[] args) throws Throwable {
System.err.println("USAGE: TestAIO basefilename count ioSize fileSize");
String base = (args.length > 0 ? args[0] : "/tmp/junk");
int count = (args.length > 1 && args[1].matches("\\d+" ?
Integer.parseInt(args[1]) : 200);
int ioSize = (args.length > 2 && args[2].matches("\\d+" ?
Integer.parseInt(args[2]) : 8 * 1024);
ioSize = (Math.max(ioSize, 512) / 512 * 512);
long fileSize = (args.length > 3 && args[3].matches("\\d+" ?
Long.parseLong(args[3]) : 128 * 1024);
fileSize = (Math.max(ioSize, fileSize) / 512 * 512);
ConcurrentLinkedQueue < Context > aio = new ConcurrentLinkedQueue <
Context > ();
ByteBuffer buffy;
AsychronousFileChannel afc;
while (--count >= 0) {
afc = AsynchronousFileChannel.open(
new File(base + count).toPath(), OPTIONS, null, ATTRIB);
buffy = ByteBuffer.allocateDirect(ioSize);
context = new Context(afc, buffy, fileSize);
handler = new Handler();
afc.write(buffy, 0, context, handler);
aio.offer(context);
}
while (aio.peek() != null) {
try {Thread.currentThread().sleep(1);
} catch (InterruptedException ie) {}
for (Context context : aio) {
if (context.isDone()) {
aio.remove(context);
context.channel().force(false);
context.channel().close();
}
}
}
}
} /* End TestAIO. */
public final class Handler implements CompletionHandler < Integer,
Context > {
private final static CancellationException CANCELLED;
static {
CANCELLED = new CancellationException("Operation Cancelled");
}
public void completed(Integer ntransferred, Context context) {
try {
int n = ntransferred;
ByteBuffer buffer = context.buffer();
if (n < 0) { /** End Of File. */
context.setMessage("End-of-File Reached");
context.done();
} else if (buffer.hasRemaining()) { /** Finish Operation. */
long position = context.adjustPosition(n);
context.channel()
.write(buffer, position, context, this);
} else {
long position = context.adjustPosition(n);
if (position < 0) {
context.setMessage("End-of-Cycles Reached");
context.done();
return;
}
context.updateBuffer(position);
context.channel().write(buffer, position, context, this);
}
} catch (Throwable exc) {
failed(exc, context);
}
}
public void cancelled(Context context) {
context.setMessage("AIO Cancellation Reached");
context.setFailed(CANCELLED);
context.done();
}
public void failed(Throwable exc, Context context) {
if (exc == null) {
context.setMessage("Null Failure Exception Reached");
} else {
context.setMessage(exc.getMessage());
}
context.setFailed(exc);
context.done();
}
} /** End Handler */
public final class Context {
private final AsynchronousFileChannel channel;
private ByteBuffer buffer;
private final AtomicLong position;
private final CountDownLatch latch;
private final long fileSize;
private final int ioSize;
private volatile String message;
private volatile Throwable exception;
{
position = new AtomicLong(0);
latch = new CountDownLatch(1);
message = "Context Initialized";
}
public Context(AsynchronousFileChannel channel, ByteBuffer buffer, long
fileSize)
throws InterruptedException {
this.channel = channel;
this.buffer = buffer;
this.ioSize = buffer.capacity();
this.fileSize = fileSize;
updateBuffer(0);
}
public final long adjustPosition(long increment) {
return (position.addAndGet(increment) < fileSize ? position.get() : -1);
}
public final void updateBuffer(long position) {
buffer.rewind();
int off = ioSize;
while ((off -= 512) >= 0) {
buffer.putLong(off, (position + off));
}
}
public final AsynchronousFileChannel channel() {return (channel);}
public final ByteBuffer buffer() {return (buffer);}
public final Throwable exception() {return (exception);}
public final String message() {return (message);}
public final void setFailed(Throwable x) {this.exception = x;}
public final void setMessage(String m) {this.message = m;}
public final boolean isDone() {return (latch.getCount() == 0);}
public final void done() {latch.countDown();}
} /** End Context. */
More information about the nio-discuss
mailing list