Request/discussion: BufferedReader reading using async API while providing sync API

Brunoais brunoaiss at gmail.com
Sat Oct 29 18:10:29 UTC 2016


Here's my try on going async.

On my tests on my local windows 10 machine (I don't have access to the 
linux one without a VM ATM) , now with 1GB file, I noticed:

~2s of speed improvement from BufferedInputStream to 
BufferedNonBlockStream when BufferedNonBlockStream is at its most 
advantageous state (with CPU work between reads). Otherwise, it has 
~0.3s speed improvement; maybe less.

~0.8s of speed improvement from BufferedNonBlockStream to 
BufferedAsyncStream when BufferedNonBlockStream/BufferedAsyncStream is 
at its most advantageous state. Otherwise, ~0 speed improvement.

I noticed: Until you process as fast as you read, both 
BufferedNonBlockStream and BufferedAsyncStream gain advantage towards 
BufferedInputStream. From the point as you take longer to process than 
to I/O, BufferedNonBlockStream and BufferedAsyncStream tend to keep the 
advantage.

If the file is in cache, BufferedInputStream takes ~1.6-1.9s to read the 
file, BufferedNonBlockStream takes a steady ~2.05-2.1s to read the file 
and BufferedAsyncStream ~1.2s.

BufferedNonBlockStream and BufferedAsyncStream win more when given more 
buffer memory than BufferedInputStream but only until a certain limit. 
On my hardware, the best speed I got was with 655360B for the new ones. 
Any more than that was not producing any visible results. I guess it is 
due to the speed data was processing for the test.

On 28/10/2016 09:16, Brunoais wrote:
> I'll try going back to a previous version I worked on which used the 
> java7's AsynchronousFileChannel and work from there. My small research 
> shows it can also work with AsynchronousFileChannel mostly without 
> changes.
>
> For now, 1 question:
> Is Thread.sleep() a possible way of dealing the block requirements of 
> read()? Do I need to use LockSupport.park() or something like that?
>
> I'll call back here when it is done.
>
>
> On 27/10/2016 22:09, David Holmes wrote:
>> You might try discussing on net-dev rather than core-libs-dev, to get 
>> additional historical info related to the io and nio file APIs.
>>
>> David
>>
>> On 28/10/2016 5:08 AM, Brunoais wrote:
>>> You are right. Even in windows it does not set the flags for async
>>> reads. It seems like it is windows itself that does the decision to
>>> buffer the contents based on its own heuristics.
>>>
>>> But... Why? Why won't it be? Why is there no API for it? How am I
>>> getting 100% HDD use and faster times when I fake work to delay getting
>>> more data and I only have a fluctuating 60-90% (always going up and
>>> down) when I use an InputStream?
>>> Is it related to how both classes cache and how frequently and how much
>>> each one asks for data?
>>>
>>> I really would prefer not having to read the source code because it
>>> takes a real long time T.T.
>>>
>>> I end up reinstating... And wondering...
>>>
>>> Why doesn't java provide a single-threaded non-block API for file reads
>>> for all OS that support it? I simply cannot find that information no
>>> matter how much I search on google, bing, duck duck go... Can any of 
>>> you
>>> point me to whomever knows?
>>>
>>> On 27/10/2016 14:11, Vitaly Davidovich wrote:
>>>> I don't know about Windows specifically, but generally file systems
>>>> across major OS's will implement readahead in their IO scheduler when
>>>> they detect sequential scans.
>>>>
>>>> On Linux, you can also strace your test to confirm which syscalls are
>>>> emitted (you should be seeing plain read()'s there, with
>>>> FileInputStream and FileChannel).
>>>>
>>>> On Thu, Oct 27, 2016 at 9:06 AM, Brunoais <brunoaiss at gmail.com
>>>> <mailto:brunoaiss at gmail.com>> wrote:
>>>>
>>>>     Thanks for the heads up.
>>>>
>>>>     I'll try that later. These tests are still useful then. Meanwhile,
>>>>     I'll end up also checking how FileChannel queries the OS on
>>>>     windows. I'm getting 100% HDD reads... Could it be that the OS
>>>>     reads the file ahead on its own?... Anyway, I'll look into it.
>>>>     Thanks for the heads up.
>>>>
>>>>
>>>>     On 27/10/2016 13:53, Vitaly Davidovich wrote:
>>>>>
>>>>>
>>>>>     On Thu, Oct 27, 2016 at 8:34 AM, Brunoais <brunoaiss at gmail.com
>>>>>     <mailto:brunoaiss at gmail.com>> wrote:
>>>>>
>>>>>         Oh... I see. In that case, it means something is terribly
>>>>>         wrong. It can be my initial tests, though.
>>>>>
>>>>>         I'm testing on both linux and windows and I'm getting
>>>>>         performance gains from using the FileChannel compared to
>>>>>         using FileInputStream... The tests also make sense based on
>>>>>         my predictions O_O...
>>>>>
>>>>>     FileInputStream requires copying native buffers holding the read
>>>>>     data to the java byte[].  If you're using direct ByteBuffer for
>>>>>     FileChannel, that whole memcpy is skipped.  Try comparing
>>>>>     FileChannel with HeapByteBuffer instead.
>>>>>
>>>>>
>>>>>         On 27/10/2016 11:47, Vitaly Davidovich wrote:
>>>>>>
>>>>>>
>>>>>>         On Thursday, October 27, 2016, Brunoais <brunoaiss at gmail.com
>>>>>>         <mailto:brunoaiss at gmail.com>> wrote:
>>>>>>
>>>>>>             Did you read the C code?
>>>>>>
>>>>>>         I looked at the Linux code in the JDK.
>>>>>>
>>>>>>             Have you got any idea how many functions Windows or
>>>>>>             Linux (nearly all flavors) have for the read operation
>>>>>>             towards a file?
>>>>>>
>>>>>>         I do.
>>>>>>
>>>>>>
>>>>>>             I have already done that homework myself. I may not have
>>>>>>             read JVM's source code but I know well that there's
>>>>>>             functions on both Windows and Linux that provide such
>>>>>>             interface I mentioned although they require a slightly
>>>>>>             different treatment (and different constants).
>>>>>>
>>>>>>         You should read the JDK (native) source code instead of
>>>>>>         guessing/assuming.  On Linux, it doesn't use aio facilities
>>>>>>         for files.  The kernel io scheduler may issue readahead
>>>>>>         behind the scenes, but there's no nonblocking file io that's
>>>>>>         at the heart of your premise.
>>>>>>
>>>>>>
>>>>>>
>>>>>>             On 27/10/2016 00:06, Vitaly Davidovich wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>>                 On Wednesday, October 26, 2016, Brunoais
>>>>>>                 <brunoaiss at gmail.com <mailto:brunoaiss at gmail.com>>
>>>>>>                 wrote:
>>>>>>
>>>>>>                     It is actually based on the premise that:
>>>>>>
>>>>>>                     1. The first call to
>>>>>>                 ReadableByteChannel.read(ByteBuffer) sets the OS
>>>>>>                        buffer size to fill in as the same size as
>>>>>>                 ByteBuffer.
>>>>>>
>>>>>>                 Why do you say that? AFAICT, it issues a read
>>>>>>                 syscall and that will block if the data isn't in
>>>>>>                 page cache.
>>>>>>
>>>>>>                     2. The consecutive calls to
>>>>>>                 ReadableByteChannel.read(ByteBuffer)
>>>>>>                     orders
>>>>>>                        the JVM to order the OS to execute memcpy()
>>>>>>                 to copy from its memory
>>>>>>                        to the shared memory created at ByteBuffer
>>>>>>                 instantiation (in
>>>>>>                     java 8)
>>>>>>                        using Unsafe and then for the JVM to update
>>>>>>                 the ByteBuffer fields.
>>>>>>
>>>>>>                 I think subsequent reads just invoke the same read
>>>>>>                 syscall, passing the current file offset maintained
>>>>>>                 by the file channel instance.
>>>>>>
>>>>>>                     3. The call will not block waiting for I/O and
>>>>>>                 it won't take longer
>>>>>>                        than the JNI interface if no new data exists.
>>>>>>                 However, it will
>>>>>>                     block
>>>>>>                        waiting for the OS to execute memcpy() to the
>>>>>>                 shared memory.
>>>>>>
>>>>>>                 So why do you think it won't block?
>>>>>>
>>>>>>
>>>>>>                     Is my premise wrong?
>>>>>>
>>>>>>                     If I read correctly, if I don't use a
>>>>>>                 DirectBuffer, there would be
>>>>>>                     even another intermediate buffer to copy data to
>>>>>>                 before giving it
>>>>>>                     to the "user" which would be useless.
>>>>>>
>>>>>>                 If you use a HeapByteBuffer, then there's an extra
>>>>>>                 copy from the native buffer to the Java buffer.
>>>>>>
>>>>>>
>>>>>>
>>>>>>                     On 26/10/2016 11:57, Pavel Rappo wrote:
>>>>>>
>>>>>>                         I believe I see where you coming from.
>>>>>>                 Please correct me if
>>>>>>                         I'm wrong.
>>>>>>
>>>>>>                         Your implementation is based on the premise
>>>>>>                 that a call to
>>>>>>                 ReadableByteChannel.read()
>>>>>>                         _initiates_ the operation and returns
>>>>>>                 immediately. The OS then
>>>>>>                         continues to fill
>>>>>>                         the buffer while there's a free space in the
>>>>>>                 buffer and the
>>>>>>                         channel hasn't encountered EOF.
>>>>>>
>>>>>>                         Is that right?
>>>>>>
>>>>>>                             On 25 Oct 2016, at 22:16, Brunoais
>>>>>>                 <brunoaiss at gmail.com>
>>>>>>                             wrote:
>>>>>>
>>>>>>                             Thank you for your time. I'll try to
>>>>>>                 explain it. I hope I
>>>>>>                             can clear it up.
>>>>>>                             First of it, I made a meaning mistake
>>>>>>                 between asynchronous
>>>>>>                             and non-blocking. This implementation
>>>>>>                 uses a non-blocking
>>>>>>                             algorithm internally while providing a
>>>>>>                 blocking-like
>>>>>>                             algorithm on the surface. It is
>>>>>>                 single-threaded and not
>>>>>>                             multi-threaded where one thread fetches
>>>>>>                 data and blocks
>>>>>>                             waiting and the other accumulates it and
>>>>>>                 provides to
>>>>>>                             whichever wants it.
>>>>>>
>>>>>>                             Second of it, I had made a mistake of
>>>>>>                 going after
>>>>>>                             BufferedReader instead of going after
>>>>>>                 BufferedInputStream.
>>>>>>                             If you want me to go after
>>>>>>                 BufferedReader it's ok but I
>>>>>>                             only thought that going after
>>>>>>                 BufferedInputStream would be
>>>>>>                             more generically useful than
>>>>>>                 BufferedReaderwhen I started
>>>>>>                             the poc.
>>>>>>
>>>>>>                             On to my code:
>>>>>>                             Short answers:
>>>>>>                                     • The sleep(int) exists because
>>>>>>                 I don't know how
>>>>>>                             to wait until more data exists in the
>>>>>>                 buffer which is part
>>>>>>                             of read()'s contract.
>>>>>>                                     • The ByteBuffer gives a buffer
>>>>>>                 that is filled by
>>>>>>                             the OS (what I believe Channels do)
>>>>>>                 instead of getting
>>>>>>                             data only         by demand (what I
>>>>>>                 believe Streams do).
>>>>>>                             Full answers:
>>>>>>                             The blockingFill(boolean) method is a
>>>>>>                 method for a busy
>>>>>>                             wait for a fill which is used
>>>>>>                 exclusively by the read()
>>>>>>                             method. All other methods use the
>>>>>>                 version that does not
>>>>>>                             sleep (fill(boolean)).
>>>>>>                 blockingFill(boolean)'s existance like that is only
>>>>>>                             because the read() method must not
>>>>>>                 return unless either:
>>>>>>
>>>>>>                                     • The stream ended.
>>>>>>                                     • The next byte is ready for
>>>>>>                 reading.
>>>>>>                             Additionally, statistically, that while
>>>>>>                 loop will rarely
>>>>>>                             evaluate to true as reads are in chunks
>>>>>>                 so readPos will be
>>>>>>                             behind writePos most of the time.
>>>>>>                             I have no idea if an interrupt will ever
>>>>>>                 happen, to be
>>>>>>                             honest. The main reasons why I'm using a
>>>>>>                 sleep is because
>>>>>>                             I didn't want a hog onto the CPU in a
>>>>>>                 full thread usage
>>>>>>                             busy wait and because I didn't find any
>>>>>>                 way of doing a
>>>>>>                             thread sleep in order to wake up later
>>>>>>                 when the buffer
>>>>>>                             managed by native code has more data.
>>>>>>                             The Non-blocking part is managed by the
>>>>>>                 buffer the OS
>>>>>>                             keeps filling most if not all the time.
>>>>>>                 That buffer is the
>>>>>>                             field
>>>>>>
>>>>>>                             ByteBuffer readBuffer
>>>>>>                             That's the gaining part against the
>>>>>>                 plain old Buffered
>>>>>>                             classes.
>>>>>>
>>>>>>
>>>>>>                             Did that make sense to you? Feel free to
>>>>>>                 ask anything else
>>>>>>                             you need.
>>>>>>
>>>>>>                             On 25/10/2016 20:52, Pavel Rappo wrote:
>>>>>>
>>>>>>                                 I've skimmed through the code and
>>>>>>                 I'm not sure I can
>>>>>>                                 see any asynchronicity
>>>>>>                                 (you were pointing at the lack of it
>>>>>>                 in BufferedReader).
>>>>>>                                 And the mechanics of this is very
>>>>>>                 puzzling to me, to
>>>>>>                                 be honest:
>>>>>>                                      void blockingFill(boolean
>>>>>>                 forced) throws
>>>>>>                                 IOException {
>>>>>>                  fill(forced);
>>>>>>                                          while (readPos == 
>>>>>> writePos) {
>>>>>>                                              try {
>>>>>>                  Thread.sleep(100);
>>>>>>                                              } catch
>>>>>>                 (InterruptedException e) {
>>>>>>                  // An interrupt may mean more data is
>>>>>>                                 available
>>>>>>                                              }
>>>>>>                  fill(forced);
>>>>>>                                          }
>>>>>>                                      }
>>>>>>                                 I thought you were suggesting that
>>>>>>                 we should utilize
>>>>>>                                 the tools which OS provides
>>>>>>                                 more efficiently. Instead we have
>>>>>>                 something that looks
>>>>>>                                 very similarly to a
>>>>>>                                 "busy loop" and... also who and when
>>>>>>                 is supposed to
>>>>>>                                 interrupt Thread.sleep()?
>>>>>>                                 Sorry, I'm not following. Could you
>>>>>>                 please explain how
>>>>>>                                 this is supposed to work?
>>>>>>
>>>>>>                                     On 24 Oct 2016, at 15:59, 
>>>>>> Brunoais
>>>>>> <brunoaiss at gmail.com>
>>>>>>                                       wrote:
>>>>>>                                     Attached and sending!
>>>>>>                                     On 24/10/2016 13:48, Pavel Rappo
>>>>>>                 wrote:
>>>>>>
>>>>>>                                         Could you please send a new
>>>>>>                 email on this list
>>>>>>                                         with the source attached 
>>>>>> as a
>>>>>>                                         text file?
>>>>>>
>>>>>>                                             On 23 Oct 2016, at
>>>>>>                 19:14, Brunoais
>>>>>> <brunoaiss at gmail.com>
>>>>>>                 wrote:
>>>>>>                 Here's my poc/prototype:
>>>>>>
>>>>>>                 http://pastebin.com/WRpYWDJF
>>>>>>
>>>>>>                                             I've implemented the
>>>>>>                 bare minimum of the
>>>>>>                 class that follows the same contract of
>>>>>>                 BufferedReader while signaling all issues
>>>>>>                                             I think it may have or
>>>>>>                 has in comments.
>>>>>>                                             I also wrote some
>>>>>>                 javadoc to help guiding
>>>>>>                 through the class.
>>>>>>                                             I could have used more
>>>>>>                 fields from
>>>>>>                 BufferedReader but the names were so
>>>>>>                 minimalistic that were confusing me. I
>>>>>>                 intent to change them before sending this
>>>>>>                                             to openJDK.
>>>>>>                                             One of the major
>>>>>>                 problems this has is long
>>>>>>                 overflowing. It is major because it is
>>>>>>                 hidden, it will be extremely rare and it
>>>>>>                 takes a really long time to reproduce.
>>>>>>                 There are different ways of dealing with
>>>>>>                                             it. From just
>>>>>>                 documenting to actually
>>>>>>                 making code that works with it.
>>>>>>                                             I built a simple test
>>>>>>                 code for it to have
>>>>>>                                             some ideas about
>>>>>>                 performance and correctness.
>>>>>>
>>>>>>                 http://pastebin.com/eh6LFgwT
>>>>>>
>>>>>>                                             This doesn't do a
>>>>>>                 through test if it is
>>>>>>                 actually working correctly but I see no
>>>>>>                 reason for it not working correctly after
>>>>>>                 fixing the 2 bugs that test found.
>>>>>>                                             I'll also leave here
>>>>>>                 some conclusions
>>>>>>                 about speed and resource consumption I found.
>>>>>>                                             I made tests with
>>>>>>                 default buffer sizes,
>>>>>>                 5000B 15_000B and 500_000B. I noticed
>>>>>>                 that, with my hardware, with the 1 530 000
>>>>>>                                             000B file, I was getting
>>>>>>                 around:
>>>>>>                                             In all buffers and fake
>>>>>>                 work: 10~15s speed
>>>>>>                 improvement ( from 90% HDD speed to 100%
>>>>>>                                             HDD speed)
>>>>>>                                             In all buffers and no
>>>>>>                 fake work: 1~2s
>>>>>>                 speed improvement ( from 90% HDD speed to
>>>>>>                                             100% HDD speed)
>>>>>>                 Changing the buffer size was giving
>>>>>>                 different reading speeds but both were
>>>>>>                 quite equal in how much they would change
>>>>>>                                             when changing the buffer
>>>>>>                 size.
>>>>>>                 Finally, I could always confirm that I/O
>>>>>>                                             was always the slowest
>>>>>>                 thing while this
>>>>>>                                             code was running.
>>>>>>                                             For the ones wondering
>>>>>>                 about the file
>>>>>>                 size; it is both to avoid OS cache and to
>>>>>>                                             make the reading at the
>>>>>>                 main use-case
>>>>>>                 these objects are for (large streams of
>>>>>>                 bytes).
>>>>>>                 @Pavel, are you open for discussion now
>>>>>>                                             ;)? Need anything else?
>>>>>>                                             On 21/10/2016 19:21,
>>>>>>                 Pavel Rappo wrote:
>>>>>>
>>>>>>                 Just to append to my previous email.
>>>>>>                 BufferedReader wraps any Reader out there.
>>>>>>                 Not specifically FileReader. While
>>>>>>                 you're talking about the case of effective
>>>>>>                 reading from a file.
>>>>>>                 I guess there's one existing
>>>>>>                 possibility to provide exactly what
>>>>>>                 you need (as I
>>>>>>                 understand it) under this method:
>>>>>>                 /**
>>>>>>                   * Opens a file for reading,
>>>>>>                 returning a {@code BufferedReader} to
>>>>>>                 read text
>>>>>>                   * from the file in an efficient
>>>>>>                 manner...
>>>>>>                     ...
>>>>>>                   */
>>>>>>
>>>>>> java.nio.file.Files#newBufferedReader(java.nio.file.Path)
>>>>>>                 It can return _anything_ as long as it
>>>>>>                 is a BufferedReader. We can do it, but it
>>>>>>                 needs to be investigated not only for
>>>>>>                 your favorite OS but for other OSes as
>>>>>>                 well. Feel free to prototype this and
>>>>>>                 we can discuss it on the list later.
>>>>>>                 Thanks,
>>>>>>                 -Pavel
>>>>>>
>>>>>>                     On 21 Oct 2016, at 18:56, Brunoais
>>>>>>                     <brunoaiss at gmail.com>
>>>>>>                       wrote:
>>>>>>                     Pavel is right.
>>>>>>                     In reality, I was expecting such
>>>>>>                     BufferedReader to use only a
>>>>>>                     single buffer and have that Buffer
>>>>>>                     being filled asynchronously, not
>>>>>>                     in a different Thread.
>>>>>>                     Additionally, I don't have the
>>>>>>                     intention of having a larger
>>>>>>                     buffer than before unless stated
>>>>>>                     through the API (the constructor).
>>>>>>                     In my idea, internally, it is
>>>>>>                     supposed to use
>>>>>> java.nio.channels.AsynchronousFileChannel
>>>>>>                     or equivalent.
>>>>>>                     It does not prevent having two
>>>>>>                     buffers and I do not intent to
>>>>>>                     change BufferedReader itself. I'd
>>>>>>                     do an BufferedAsyncReader of sorts
>>>>>>                     (any name suggestion is welcome as
>>>>>>                     I'm an awful namer).
>>>>>>                     On 21/10/2016 18:38, Roger Riggs
>>>>>>                     wrote:
>>>>>>
>>>>>>                         Hi Pavel,
>>>>>>                         I think Brunoais asking for a
>>>>>>                         double buffering scheme in
>>>>>>                         which the implementation of
>>>>>>                         BufferReader fills (a second
>>>>>>                         buffer) in parallel with the
>>>>>>                         application reading from the
>>>>>>                         1st buffer
>>>>>>                         and managing the swaps and
>>>>>>                         async reads transparently.
>>>>>>                         It would not change the API
>>>>>>                         but would change the
>>>>>>                         interactions between the
>>>>>>                         buffered reader
>>>>>>                         and the underlying stream.  It
>>>>>>                         would also increase memory
>>>>>>                         requirements and processing
>>>>>>                         by introducing or using a
>>>>>>                         separate thread and the
>>>>>>                         necessary synchronization.
>>>>>>                         Though I think the formal
>>>>>>                         interface semantics could be
>>>>>>                         maintained, I have doubts
>>>>>>                         about compatibility and its
>>>>>>                         unintended consequences on
>>>>>>                         existing subclasses,
>>>>>>                         applications and libraries.
>>>>>>                         $.02, Roger
>>>>>>                         On 10/21/16 1:22 PM, Pavel
>>>>>>                         Rappo wrote:
>>>>>>
>>>>>>                             Off the top of my head, I
>>>>>>                             would say it's not
>>>>>>                             possible to change the
>>>>>>                             design of an
>>>>>>                             _extensible_ type that has
>>>>>>                             been out there for 20 or
>>>>>>                             so years. All these I/O
>>>>>>                             streams from java.io <http://java.io>
>>>>>>                             <http://java.io> were
>>>>>>                             designed for simple
>>>>>>                             synchronous use case.
>>>>>>                             It's not that their design
>>>>>>                             is flawed in some way,
>>>>>>                             it's that they doesn't seem to
>>>>>>                             suit your needs. Have you
>>>>>>                             considered using
>>>>>> java.nio.channels.AsynchronousFileChannel
>>>>>>                             in your applications?
>>>>>>                             -Pavel
>>>>>>
>>>>>>                                 On 21 Oct 2016, at
>>>>>>                                 17:08, Brunoais
>>>>>> <brunoaiss at gmail.com>
>>>>>>                                   wrote:
>>>>>>                                 Any feedback on this?
>>>>>>                                 I'm really interested
>>>>>>                                 in implementing such
>>>>>>                 BufferedReader/BufferedStreamReader
>>>>>>                                 to allow speeding up
>>>>>>                                 my applications
>>>>>>                                 without having to
>>>>>>                                 think in an
>>>>>>                                 asynchronous way or
>>>>>>                                 multi-threading while
>>>>>>                                 programming with it.
>>>>>>                                 That's why I'm asking
>>>>>>                                 this here.
>>>>>>                                 On 13/10/2016 14:45,
>>>>>>                                 Brunoais wrote:
>>>>>>
>>>>>>                                     Hi,
>>>>>>                                     I looked at
>>>>>>                 BufferedReader
>>>>>>                                     source code for
>>>>>>                                     java 9 long with
>>>>>>                                     the source code of
>>>>>>                                     the
>>>>>>                 channels/streams
>>>>>>                                     used. I noticed
>>>>>>                                     that, like in java
>>>>>>                                     7, BufferedReader
>>>>>>                                     does not use an
>>>>>>                                     Async API to load
>>>>>>                                     data from files,
>>>>>>                                     instead, the data
>>>>>>                                     loading is all
>>>>>>                                     done synchronously
>>>>>>                                     even when the OS
>>>>>>                                     allows requesting
>>>>>>                                     a file to be read
>>>>>>                                     and getting a
>>>>>>                                     warning later when
>>>>>>                                     the file is
>>>>>>                                     effectively read.
>>>>>>                                     Why Is
>>>>>>                 BufferedReader not
>>>>>>                                     async while
>>>>>>                                     providing a sync API?
>>>>>>
>>>>>> <BufferedNonBlockStream.java><Tests.java>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>                 --                 Sent from my phone
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>         --         Sent from my phone
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>

-------------- next part --------------
package org.sample.BufferedNon_BlockIO;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.locks.LockSupport;

public class BufferedAsyncStream extends BufferedInputStream {

	private static final int DEFAULT_BUFFER_SIZE = 8_192;
	
	/**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

	private static final int MIN_READ_BUFFER_SIZE = 10;
	/**
     * The maximum size of the read buffer if set automatically.
     * Unless explicitly told, there's no need to starve the VM out of memory
     */
	private static final int MAX_READ_BUFFER_SIZE = 1_000_000;
	
	/**
     * The virtual index of the next position where data will be read.
     * Every operation requires executing a mathematical mod ({@code %}) operation on it against <code>buf.length</code>
     * to get the correct buffer position to work on.
     */
    protected long readPos;
    
    /**
     * The virtual index of the next position where data will be written
     * Every operation requires executing a mathematical mod ({@code %}) operation on it against <code>buf.length</code>
     * to get the correct buffer position to work on.
     * 
     * At every time, {@code readPos <= writePos %} must be true
     * 
     */
    // Note to reviewers: What to do when long overflows? It will happen but does a documented warning suffice or... do I need to work with the overflows?
    protected long writePos;
	
	ChannelReader readFrom;
	ByteBuffer readBuffer;
	FileReadCallback<Object> readCallback;
	IOException nextException;
	
	transient boolean isReading;
	
	Thread parkingThread;
	
	boolean isClosed;
	boolean eof;

	interface ChannelReader extends Closeable{
		void read(ByteBuffer dst);
	}
	class AsyncFileChannelReader implements ChannelReader, CompletionHandler<Integer, ByteBuffer>{
		AsynchronousFileChannel inChannel;
		long currentPosition;
		AsyncFileChannelReader(AsynchronousFileChannel inChannel) {
			this.inChannel = inChannel;
			currentPosition = 0;
		}

		@Override
		public void read(ByteBuffer dst) {
			isReading = true;
			inChannel.read(dst, currentPosition, dst, this);
		}
		@Override
		public void close() throws IOException{
			inChannel.close();
		}
		
		@Override
		public void completed(Integer result, ByteBuffer attachment) {
			if(result > 0){
				currentPosition += result;
			}
			readCallback.completed(result, attachment);
		}

		@Override
		public void failed(Throwable exc, ByteBuffer attachment) {
			readCallback.failed(exc, attachment);
		}

	}
	
	class AsyncByteChannelReader implements ChannelReader{
		AsynchronousByteChannel inChannel;
		
		AsyncByteChannelReader(AsynchronousByteChannel inChannel) {
			this.inChannel = inChannel;
		}
		
		@Override
		public void read(ByteBuffer dst) {
			isReading = true;
			inChannel.read(dst, dst, readCallback);
		}
		@Override
		public void close() throws IOException {
			inChannel.close();
		}
		
	}

	class FileReadCallback<A> implements CompletionHandler<Integer, A>{

		@Override
		public void completed(Integer resultI, A attachment) {
			// unbox
			int result = resultI;
			if(result == -1){
				eof = true;
			}
			
			long freeBufSpace = writeSpaceAvailable();
			
			if(freeBufSpace < readBuffer.position()){
				// No enough space in buf to copy from readBuffer.
				// Just send it back to fill even more
				readFrom.read(readBuffer);
				return;
			}
			if(readBuffer.position() > readBuffer.capacity() / 2 || (eof && readBuffer.position() > 0)){
				readBuffer.flip();
				// Ints are faster and an array can only be defined using an int
				int writeBufferFrom = (int) (writePos % buf.length);
	            int canReadAmount =  Math.min((int) freeBufSpace, readBuffer.remaining());
	            int canWriteUntil = (writeBufferFrom + canReadAmount) % buf.length;
				
	            // This can only be done like this because ReadBufferSize is smaller than bufferSize
	            // and overflows are ignored
	        	if(canWriteUntil < writeBufferFrom){
	        		// Read in 2 parts
	        		readBuffer.get(buf, writeBufferFrom, buf.length - writeBufferFrom);
	        		readBuffer.get(buf, 0, canWriteUntil);
	        	} else {
	        		readBuffer.get(buf, writeBufferFrom, canWriteUntil - writeBufferFrom);
	        	}

	        	writePos += canReadAmount;
	        	// Reset the buffer for more reading
	        	readBuffer.clear();
	        	
	        	LockSupport.unpark(parkingThread);
			}
			if(eof){
				isReading = false;
				return;
			}
			readFrom.read(readBuffer);
		}

		@Override
		public void failed(Throwable exc, A attachment) {
			if(exc instanceof IOException){
				nextException = (IOException) exc;
			} else {
				nextException = new IOException("Failed to read more data to the buffer", exc);
			}
			isReading = false;
		}
		
	}
	
		
	public BufferedAsyncStream(InputStream inputStream) {
		super(inputStream);
	}
	
	public BufferedAsyncStream(AsynchronousFileChannel inputChannel) {
		this(inputChannel, DEFAULT_BUFFER_SIZE);
	}
	
	public BufferedAsyncStream(AsynchronousFileChannel inputChannel, int bufferSize) {
		this(inputChannel, bufferSize, Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
	}
	
	public BufferedAsyncStream(AsynchronousFileChannel inputChannel,  int bufferSize, int readBufferSize) {
		super(null, Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1));
//		Objects.requireNonNull(inputChannel, "The file channel must not be null");
		init(new AsyncFileChannelReader(inputChannel), Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1), readBufferSize);
	}

	
	public BufferedAsyncStream(AsynchronousByteChannel inputChannel) {
		this(inputChannel, DEFAULT_BUFFER_SIZE);
	}
	
	public BufferedAsyncStream(AsynchronousByteChannel inputChannel, int bufferSize) {
		this(inputChannel, bufferSize, Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
	}
	
	public BufferedAsyncStream(AsynchronousByteChannel inputChannel,  int bufferSize, int readBufferSize) {
		super(null, Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1));
//		Objects.requireNonNull(inputChannel, "The byte channel must not be null");
		init(new AsyncByteChannelReader(inputChannel), Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1), readBufferSize);
	}
	
	private void init(ChannelReader inputChannel, int bufferSize, int readBufferSize){

		this.readFrom = inputChannel;
		
		if(readBufferSize < 10){
			throw new IllegalArgumentException("Read buffer must be at least 10 bytes");
		}
		
		if(readBufferSize > bufferSize){
			throw new IllegalArgumentException("ReadBufferSize must be smaller than bufferSize");
		}
		
		// The read buffer must not be larger than the internal buffer.
		readBufferSize = Math.min(readBufferSize, MAX_BUFFER_SIZE - 1);
		this.readBuffer = ByteBuffer.allocateDirect(readBufferSize);
		
		this.readCallback = new FileReadCallback<>();
		
		isClosed = false;
		
		readFrom.read(readBuffer);
	}
	
	int readSpaceAvailable(){
		// This can be an int because buf size can never be larger than the int
		return (int) (writePos - readPos);
	}
	int writeSpaceAvailable(){
		return buf.length - readSpaceAvailable();
	}
	
	/**
	 * Blocks waiting for a write.
	 * After leaving the block, it is guaranteed that at least 1 byte is readable from the buffer 
	 * @throws IOException
	 */
	// This is /* synchronized */ to make sure only up to 1 thread is parked inside
	void waitForFill() throws IOException {
		while(readSpaceAvailable() < 1 && !eof){
			try{
				parkingThread = Thread.currentThread();
				// wait for 1 second tops. File/web reading really should be faster than that
				LockSupport.parkNanos(this, 1_000_000_000);
			}finally{
				parkingThread = null;			
			}
    	}
    }
 
	
	private int readingFormalities() throws IOException {
		if(!isReading){
			if(nextException != null){
				try{
					throw nextException;
				} finally {
					nextException = null;
				}
			}
			if(eof){
				return -1;
			}
			readFrom.read(readBuffer);
		}
		
		return readSpaceAvailable();
	}
	
	@Override
	public /* synchronized */ int read() throws IOException {
		if(readFrom == null){
			return super.read();			
		}
		
		switch(readingFormalities()){
			case -1:
				return -1;
			case 0:
				waitForFill();
				/* No Default */
		}
		
		return buf[(int)(readPos++ % buf.length)] & 0xff;
	}

	@Override
	public /* synchronized */ int read(byte[] b, int off, int len) throws IOException {
		if(readFrom == null){
			return super.read(b, off, len);
		}
		
		if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
		
		switch(readingFormalities()){
			case -1:
				return -1;
			case 0:
//				waitForFill();
				/* No Default */
		}
		
		// Note for reviewing: Some optimizations that take some time were not made here.
		// E.g.
		// -> If b consumes the whole buff, copy directly from ByteBuffer to this buff
		// -> As long as reading buffs are large enough (requires benchmarking), 
		// 	  not do use the buf array at all (late initialization too)

        int ByteBufferFrom = (int) (readPos % buf.length);
        int canReadAmount =  Math.min((int) readSpaceAvailable(), len - off);
        int canByteBufferUntil = (ByteBufferFrom + canReadAmount) % buf.length;
        
		if(canByteBufferUntil < ByteBufferFrom){
    		// For reviewer: Should it always read once? Or should it read twice?
			System.arraycopy(buf, ByteBufferFrom, b, off, buf.length - ByteBufferFrom);
			int ndStartAt = off + (buf.length - ByteBufferFrom);
			// This if should never evaluate "true". It's here just to make sure and make breakpoints
			if(off - ndStartAt >= len){
				readPos = canByteBufferUntil;
				return off - ndStartAt; 
			}
			System.arraycopy(buf, 0, b, ndStartAt, canByteBufferUntil);
			readPos += (buf.length - ByteBufferFrom) + canByteBufferUntil;
			return (buf.length - ByteBufferFrom) + canByteBufferUntil;
    	} else {
			System.arraycopy(buf, ByteBufferFrom, b, off, canByteBufferUntil - ByteBufferFrom);
			readPos += canByteBufferUntil - ByteBufferFrom;
			return canByteBufferUntil - ByteBufferFrom;
    	}
		
		
	}

	@Override
	public long skip(long arg0) throws IOException {
		if(readFrom == null){
			return super.skip(arg0);
		}
		
		// There's no skip in this poc
		// For the real implementation, I'd do (while I didn't skip enough):
		// I'd just "jump" the readPos accordingly (while keeping readPos <= writePos)
		// Then 
		// reposition (.position(int)) in buffer
		// OR
		// clear the buffer
		// Then
		// If SeekableChannel -> use (sord of) .position(.position()+ skipLeft)
		// else -> do not seek.
		
		
		throw new UnsupportedOperationException();
		
	}

	@Override
	public int available() throws IOException {
		if(readFrom == null){
			return super.available();
		}
		return readSpaceAvailable();
		
	}

	@Override
	public void mark(int arg0) {
		if(readFrom == null){
			super.mark(arg0);
		}
		
		// This is not hard to do but it requires quite some time!
		// I'll wait first if the poc passes without this method
	}

	@Override
	public void reset() throws IOException {
		if(readFrom == null){
			super.reset();
		}
		throw new IOException("The mark was lost");
	}

	@Override
	public boolean markSupported() {
		if(readFrom == null){
			return super.markSupported();
		}
		// false for now, at least.
		return false;
	}

	@Override
	public void close() throws IOException {
		if(readFrom != null){
			readFrom.close();
		}
		super.close();
	}

}
-------------- next part --------------
package org.sample.BufferedNon_BlockIO;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

/**
 * A sample file reading benchmark
 */
@BenchmarkMode(Mode.SingleShotTime)
@Fork(value = 1)
@Warmup(iterations = 0, time = 2, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 2)
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class BufferedNonBlockStreamBenchmark_try1 {

	public static boolean isWindows = System.getProperty("os.name").contains("Windows");
	
    @Param({"1gig_file"})
    String file;

//    @Param({"4100", "16400", "131100", "1048600"})
    @Param({"4100", "131100", "1048600"})
    int javaBufSize;
    
    @Param({"100", "1000"})
    int readSize;

//    @Param({"0", "1000", "100000"})
    @Param({"0", "1000"})
    long cpuWork;

    // 												This MUST be smaller than javaBufSize
    //  											@Param({"4096", "16384", "131072", "1048576"})
    @Param({"BufferedInputStream", "BufferedNonBlockStream|4096", "BufferedNonBlockStream|131072", "BufferedNonBlockStream|1048576",
    							   "BufferedAsyncStream|4096",    "BufferedAsyncStream|131072",    "BufferedAsyncStream|1048576"})
    String implType;

    interface ReadOp {
        int read(byte[] buf) throws IOException;
    }

    private ReadOp read;
    private Closeable close;
    private byte[] buf;

    @Setup(Level.Iteration)
    public void setup() throws IOException, InterruptedException {

        clearCache();
        
        String[] type_readSize = implType.split("\\|");

        switch (type_readSize[0]) {
            case "BufferedInputStream":{
                BufferedInputStream in = new BufferedInputStream(
                    new FileInputStream(file), javaBufSize);
                read = in::read;
                close = in::close;
            }
                break;
            case "BufferedNonBlockStream": {
            	BufferedNonBlockStream in = new BufferedNonBlockStream(
        			FileChannel.open(new File(file).toPath()), javaBufSize, Integer.parseInt(type_readSize[1]));
                read = in::read;
                close = in::close;
            }
            break;
            case "BufferedAsyncStream": {
            	BufferedAsyncStream in = new BufferedAsyncStream(
        			AsynchronousFileChannel.open(new File(file).toPath()), javaBufSize, Integer.parseInt(type_readSize[1]));
                read = in::read;
                close = in::close;
            }
            break;
            default:
                throw new IllegalArgumentException(
                    "Invalid parameter 'implType': " + implType);
        }

        buf = new byte[readSize];
    }

    @TearDown(Level.Iteration)
    public void tearDown() throws IOException {
        close.close();
    }

    /**
     * 
     * For linux:
     * 
     * Compile the following C program into clear_cache executable:
     * <pre>{@code
     *
     * #include <stdio.h>
     * #include <string.h>
     * #include <errno.h>
     *
     * #define PROC_FILE "/proc/sys/vm/drop_caches"
     *
     * int main(char *argv[], int argc) {
     *   FILE *f;
     *   f = fopen(PROC_FILE, "w");
     *   if (f) {
     *     fprintf(f, "3\n");
     *     fclose(f);
     *     return 0;
     *   } else {
     *     fprintf(stderr, "Can't write to: %s: %s\n", PROC_FILE, strerror(errno));
     *     return 1;
     *   }
     * }
     *
     * }</pre>
     * ... and make it SUID root!
     * 
     * For windows:
     * Use the provided clear_cache.exe
     * (source code and original author:)
     * https://gist.github.com/bitshifter/c87aa396446bbebeab29
     * Then run this program with administrator privileges
     */
    private static void clearCache() throws IOException, InterruptedException {
        // spawn an OS command to clear the FS cache...
    	if(isWindows){
    		new ProcessBuilder("clear_cache.exe").start().waitFor();
    	} else {
    		new ProcessBuilder("clear_cache").start().waitFor();    		
    	}
    }

    @Benchmark
    public int testRead() throws IOException {
        int nread = 0;
        int n;
        while ((n = read.read(buf)) >= 0) {
            nread += n;
            Blackhole.consumeCPU(cpuWork);
        }
        return nread;
    }
}


More information about the core-libs-dev mailing list