AsynchronousSocketChannel write()
gustav trede
gustav.trede at gmail.com
Thu Aug 19 01:58:00 PDT 2010
On 19 August 2010 10:19, Alan Bateman <Alan.Bateman at oracle.com> wrote:
> Avinash Lakshman wrote:
>
>> Hi All
>>
>> I want to use the write() on the AsynchronousSocketChannel which has no
>> timeout semantics. I also want to pool these connections that I use. Now if
>> I have multiple threads writing into the same connection, even though the
>> writes are synchronized, I get a WritePendingException because a previous
>> write has not completed. What is the general paradigm for handling this? Do
>> I need to keep track of pending writes etc?
>>
> I wonder if this is the right API for what you are doing. If you've got
> multiple threads wanting to write to the same stream then the writing will
> need to be coordinated to avoid "corrupting" the stream. A write to a
> channel is not guaranteed to write all the remaining bytes in the buffer and
> so the buffer prepared by one of the threads may actually require several
> I/O operations. The only thing I can suggest is that you maintain a queue of
> buffers to be written, and use the the completion notification to initiate
> the next write operation (which might be from the same buffer, or the next
> buffer in the queue). I think there are other folks on this mailing list
> that have been through this before and they might be able to advise or
> provide samples from their own usage.
>
> -Alan.
>
Here is an example,
It is part of my websocket impl using many selector threads to handle all
reads,basic frame parsing and a few writes(that don't fit in the socket
native buffer).
iohandler is wrapper for SSL or a plain socketchannel.
Write requests dont block, well the atomic bufferedSendBytes can in theory
cause similar effect by looping delays inside its set methods, but that's
not a real life problem in this case.
The drain method is called from the selector threads if a writeable key is
detected,
or from a context/thread that performed a write operation that completed,
its then likely to be further room in socket native buffer,
hence it tries to drain the queue other threads have caused, queue status is
naturally checked by the post write bufferedSendBytes decrement of the
written size.
Notice the +-1 added to the written size at bufferedSendBytes handling,
Its an by practical means gratis way to keep track of chained iohandlers
internal write buffer status, ssl is an example.
CLQ is used for the per socket writequeue, i know it performs good here due
to im using jdk7 and hence avoid its problematic implementations in older
versions.
......
/**
* Socket native write buffer is not included in this value
*/
private final AtomicInteger bufferedSendBytes = new AtomicInteger();
/**
* Data that did not fit in socket native buffer and is the current buffer
used for writes, more buffers might be in the writeQueue.
*/
private ByteBuffer currentwrite;
private final Queue<ByteBuffer> writeQueue = new
ConcurrentLinkedQueue<>();
@Override
public boolean send(DataFrame dataframe) {//TODO:p1 use @NotNull
return send(dataframe.rawFrameData);
}
@Override
public boolean send(String textUTF8) throws CharacterCodingException{
//TODO:p1 use @NotNull
return send(Utils.encodeWebsocket(CharBuffer.wrap(textUTF8)));
}
/**
* @param rawframe with position at 0
* @return
*/
@Override
public boolean send(final ByteBuffer rawframe){//TODO:p1 @NotNull to
param
try {
final ReadyState rs = readystate;
if (rs == ReadyState.OPEN){
int tosend = rawframe.remaining();
final int buffsize = bufferedSendBytes.addAndGet(tosend);
if (sendbufflimit-buffsize < 0 )
throw new
WebSocketWriteQueueOverflow(buffsize,sendbufflimit);
if (tosend == buffsize){
final boolean notdone = !iohandler.write(rawframe);
if (notdone){
currentwrite = rawframe.slice();
tosend -= rawframe.remaining() + 1;//+1 needed if
buffering iohhandler like ssl or so have unwritten data but we done.
}
if (bufferedSendBytes.addAndGet(-tosend) > 0 && (notdone
||
!drainWriteQueue(20))){//other thread(s) added
data during our write, we can drain some from the current thread to save
writeinterest registration.
iohandler.writeInterestTaskOffer();
}
rawframe.rewind();//frame is ready for another send
return true;
}
writeQueue.add(rawframe.duplicate());
return true;
}
if (rs == ReadyState.CONNECTING)
throw new NotYetConnectedException();
}catch (Throwable ex) {
close(ex,false);
}
rawframe.rewind();//frame is ready for another send
return false;
}
//TODO:p1 perf: evaluate: write multiple bytebuffers using
SocketChannel.write(ByteBuffer[] srcs) when its fixed to not suck.
private boolean drainWriteQueue(int maxwrites) throws IOException {
ByteBuffer bb;
int written = 0;
while(maxwrites-->0 && (bb=getBufferToDrain()) != null){
final int a = bb.position();
final boolean done = iohandler.write(bb);
written += bb.position() - a;
if (done){
if (currentwrite != null){
currentwrite = null;
written++;//+1 indicated that potential internal
iohandler writes is done.
}
continue;
}
if (currentwrite==null){
written--;
currentwrite = bb;
}
break;
}
return bufferedSendBytes.addAndGet(-written) == 0;
}
private ByteBuffer getBufferToDrain(){
return currentwrite != null ? currentwrite : writeQueue.poll();
}
regards
gustav trede
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://mail.openjdk.java.net/pipermail/nio-discuss/attachments/20100819/fc7b525b/attachment.html
More information about the nio-discuss
mailing list