Request/discussion: BufferedReader reading using async API while providing sync API

Brunoais brunoaiss at gmail.com
Mon Oct 24 14:59:38 UTC 2016


Attached and sending!


On 24/10/2016 13:48, Pavel Rappo wrote:
> Could you please send a new email on this list with the source attached as a
> text file?
>
>> On 23 Oct 2016, at 19:14, Brunoais <brunoaiss at gmail.com> wrote:
>>
>> Here's my poc/prototype:
>> http://pastebin.com/WRpYWDJF
>>
>> I've implemented the bare minimum of a class that follows the same contract as BufferedReader, while signaling in comments all issues I think it may have or has.
>> I also wrote some javadoc to help guide readers through the class.
>>
>> I could have used more fields from BufferedReader, but the names were so minimalistic that they were confusing me. I intend to change them before sending this to OpenJDK.
>>
>> One of the major problems this has is long overflow. It is major because it is hidden, it will be extremely rare, and it takes a really long time to reproduce. There are different ways of dealing with it, from just documenting it to actually writing code that accounts for it.
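One way to sidestep the overflow, sketched below under an assumption the poc does not currently make (an internal buffer whose capacity is a power of two): two's-complement subtraction keeps `writePos - readPos` correct across the wrap, and a bit mask replaces the `%` mapping, which would otherwise go wrong once the positions turn negative.

```java
public class OverflowSafeIndex {

    // Overflow-safe: two's-complement subtraction yields the true gap as
    // long as the two counters are never more than 2^63 - 1 apart.
    static int available(long readPos, long writePos) {
        return (int) (writePos - readPos);
    }

    // Mapping a virtual position to a slot survives overflow only when the
    // capacity is a power of two, so a mask can replace the % operator
    // (Java's % returns negative results for negative positions).
    static int slot(long pos, int powerOfTwoCapacity) {
        return (int) (pos & (powerOfTwoCapacity - 1));
    }

    public static void main(String[] args) {
        long before = Long.MAX_VALUE;  // writePos just before overflow
        long after = before + 5;       // wraps to a negative long
        System.out.println(available(before, after)); // prints 5
        System.out.println(slot(after, 8));           // prints 4
    }
}
```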
>>
>> I built some simple test code for it to get an idea of its performance and correctness.
>>
>> http://pastebin.com/eh6LFgwT
>>
>> This isn't a thorough test of whether it is actually working correctly, but I see no reason for it not to work correctly after fixing the two bugs that test found.
>>
>> I'll also leave here some conclusions I found about speed and resource consumption.
>>
>> I made tests with the default buffer size, 5 000 B, 15 000 B and 500 000 B. I noticed that, with my hardware and a 1 530 000 000 B file, I was getting around:
>>
>> In all buffers and fake work: 10~15s speed improvement ( from 90% HDD speed to 100% HDD speed)
>> In all buffers and no fake work: 1~2s speed improvement ( from 90% HDD speed to 100% HDD speed)
>>
>> Changing the buffer size gave different reading speeds, but both versions changed by roughly the same amount when the buffer size changed.
>> Finally, I could always confirm that I/O was the slowest part while this code was running.
>>
>> For those wondering about the file size: it is both to avoid the OS cache and to exercise the main use case these objects are for (large streams of bytes).
>>
>> @Pavel, are you open for discussion now ;)? Need anything else?
>>
>> On 21/10/2016 19:21, Pavel Rappo wrote:
>>> Just to append to my previous email: BufferedReader wraps any Reader out there,
>>> not specifically FileReader, while you're talking about the case of efficient
>>> reading from a file.
>>>
>>> I guess there's one existing possibility to provide exactly what you need (as I
>>> understand it) under this method:
>>>
>>> /**
>>>   * Opens a file for reading, returning a {@code BufferedReader} to read text
>>>   * from the file in an efficient manner...
>>>     ...
>>>   */
>>> java.nio.file.Files#newBufferedReader(java.nio.file.Path)
>>>
>>> It can return _anything_ as long as it is a BufferedReader. We can do it, but it
>>> needs to be investigated not only for your favorite OS but for other OSes as
>>> well. Feel free to prototype this and we can discuss it on the list later.
>>>
>>> Thanks,
>>> -Pavel
>>>
>>>> On 21 Oct 2016, at 18:56, Brunoais <brunoaiss at gmail.com> wrote:
>>>>
>>>> Pavel is right.
>>>>
>>>> In reality, I was expecting such a BufferedReader to use only a single buffer and have that buffer filled asynchronously, not in a different thread.
>>>> Additionally, I don't intend to have a larger buffer than before unless stated through the API (the constructor).
>>>>
>>>> In my idea, internally, it is supposed to use java.nio.channels.AsynchronousFileChannel or equivalent.
>>>>
>>>> It does not prevent having two buffers, and I do not intend to change BufferedReader itself. I'd do a BufferedAsyncReader of sorts (any name suggestion is welcome, as I'm awful at naming).
>>>>
>>>>
>>>> On 21/10/2016 18:38, Roger Riggs wrote:
>>>>> Hi Pavel,
>>>>>
>>>>> I think Brunoais is asking for a double-buffering scheme in which the implementation of
>>>>> BufferedReader fills a second buffer in parallel with the application reading from the first buffer,
>>>>> managing the swaps and async reads transparently.
>>>>> It would not change the API but would change the interactions between the buffered reader
>>>>> and the underlying stream.  It would also increase memory requirements and processing
>>>>> by introducing or using a separate thread and the necessary synchronization.
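A rough, thread-based sketch of the swap scheme described above (class name, queue depth, and chunk size are illustrative, not a proposed implementation): a filler thread reads ahead while the application drains already-filled chunks, and the bounded queue supplies the "necessary synchronization".

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DoubleBufferSketch {

    static final byte[] POISON = new byte[0]; // marks end of stream

    // The "filler" side: reads ahead into fresh chunks while the caller
    // drains previously filled ones; put() blocks once the reader falls
    // two chunks behind, which bounds memory like two fixed buffers would.
    static BlockingQueue<byte[]> startFiller(InputStream in, int bufSize) {
        BlockingQueue<byte[]> filled = new ArrayBlockingQueue<>(2);
        Thread filler = new Thread(() -> {
            byte[] buf = new byte[bufSize];
            try {
                int n;
                while ((n = in.read(buf)) != -1) {
                    filled.put(Arrays.copyOf(buf, n));
                }
                filled.put(POISON);
            } catch (IOException | InterruptedException e) {
                // a real implementation would propagate this to the reader
            }
        });
        filler.setDaemon(true);
        filler.start();
        return filled;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<byte[]> queue =
                startFiller(new ByteArrayInputStream("hello world".getBytes()), 4);
        int total = 0;
        byte[] chunk;
        while ((chunk = queue.take()) != POISON) {
            total += chunk.length; // the application-side "read from the 1st buffer"
        }
        System.out.println(total); // prints 11
    }
}
```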
>>>>>
>>>>> Though I think the formal interface semantics could be maintained, I have doubts
>>>>> about compatibility and its unintended consequences on existing subclasses,
>>>>> applications and libraries.
>>>>>
>>>>> $.02, Roger
>>>>>
>>>>> On 10/21/16 1:22 PM, Pavel Rappo wrote:
>>>>>> Off the top of my head, I would say it's not possible to change the design of an
>>>>>> _extensible_ type that has been out there for 20 or so years. All these I/O
>>>>>> streams from java.io were designed for simple synchronous use cases.
>>>>>>
>>>>>> It's not that their design is flawed in some way; it's that it doesn't seem to
>>>>>> suit your needs. Have you considered using java.nio.channels.AsynchronousFileChannel
>>>>>> in your applications?
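For reference, the Future-flavoured half of that API looks roughly like this (the file name and sizes are illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

public class AsyncReadDemo {

    // Issues the read and returns immediately with a Future; the caller can
    // overlap other work with the OS-level read, then collect the result.
    static int readFirstChunk(Path file, int size) throws Exception {
        try (AsynchronousFileChannel channel =
                 AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(size);
            Future<Integer> pending = channel.read(buffer, 0); // non-blocking
            // ... do other work here while the read is in flight ...
            return pending.get(); // bytes actually read, or -1 at EOF
        }
    }

    public static void main(String[] args) throws Exception {
        // "TestFile" is illustrative
        System.out.println(readFirstChunk(Paths.get("TestFile"), 8_192));
    }
}
```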
>>>>>>
>>>>>> -Pavel
>>>>>>
>>>>>>> On 21 Oct 2016, at 17:08, Brunoais <brunoaiss at gmail.com> wrote:
>>>>>>>
>>>>>>> Any feedback on this? I'm really interested in implementing such a BufferedReader/BufferedStreamReader to allow speeding up my applications without having to think asynchronously or about multi-threading while programming with it.
>>>>>>>
>>>>>>> That's why I'm asking this here.
>>>>>>>
>>>>>>>
>>>>>>> On 13/10/2016 14:45, Brunoais wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I looked at the BufferedReader source code for Java 9, along with the source code of the channels/streams it uses. I noticed that, as in Java 7, BufferedReader does not use an async API to load data from files; instead, the data loading is all done synchronously, even when the OS allows requesting that a file be read and being notified later when the read completes.
>>>>>>>>
>>>>>>>> Why is BufferedReader not async internally while providing a sync API?
>>>>>>>>
>

-------------- next part --------------
package pocs.java;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;

public class BufferedNonBlockStream extends BufferedInputStream {

	private static final int DEFAULT_BUFFER_SIZE = 8_192;
	
	/**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

	private static final int MIN_READ_BUFFER_SIZE = 10;
	/**
     * The maximum size of the read buffer if set automatically.
     * Unless explicitly told, there's no need to starve the VM out of memory
     */
	private static final int MAX_READ_BUFFER_SIZE = 1_000_000;
	
	/**
     * The virtual index of the next position where data will be read.
     * Every operation requires executing a mathematical mod ({@code %}) operation on it against {@code buf.length}
     * to get the correct buffer position to work on.
     */
    protected long readPos;
    
    /**
     * The virtual index of the next position where data will be written.
     * Every operation requires executing a mathematical mod ({@code %}) operation on it against {@code buf.length}
     * to get the correct buffer position to work on.
     * 
     * At all times, {@code readPos <= writePos} must hold.
     * 
     */
    // Note to reviewers: What to do when long overflows? It will happen but does a documented warning suffice or... do I need to work with the overflows?
    protected long writePos;
	
	ReadableByteChannel inputChannel;
	ByteBuffer readBuffer;
	
	boolean isClosed;
	boolean eof;

	public BufferedNonBlockStream(InputStream inputStream) {
		super(inputStream);
	}
	
	public BufferedNonBlockStream(ReadableByteChannel inputChannel) {
		this(inputChannel, DEFAULT_BUFFER_SIZE);
	}
	
	public BufferedNonBlockStream(ReadableByteChannel inputChannel, int bufferSize) {
		this(inputChannel, bufferSize, Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
	}
	
	public BufferedNonBlockStream(ReadableByteChannel inputChannel, int bufferSize, int readBufferSize) {
		super(null, bufferSize);
		Objects.requireNonNull(inputChannel, "The byte channel must not be null");
		this.inputChannel = inputChannel;
		
		if(readBufferSize < 1){
			throw new IllegalArgumentException("Read buffer must be at least 1 byte");
		}
		
		if(readBufferSize > bufferSize){
			throw new IllegalArgumentException("readBufferSize must not be larger than bufferSize");
		}
		
		// The read buffer must not exceed the maximum array size
		readBufferSize = Math.min(readBufferSize, MAX_BUFFER_SIZE - 1);
		this.readBuffer = ByteBuffer.allocateDirect(readBufferSize);
		isClosed = false;
	}
	
	int readSpaceAvailable(){
		// This fits in an int because buf.length can never exceed Integer.MAX_VALUE
		return (int) (writePos - readPos);
	}
	int writeSpaceAvailable(){
		return buf.length - readSpaceAvailable();
	}
	
	void fill() throws IOException {
		fill(false);
	}
    void blockingFill(boolean forced) throws IOException {
		fill(forced);
		// Also stop waiting once the stream has ended, or this would spin forever
		while(readPos == writePos && !eof){
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				// An interrupt may mean more data is available
			}
			fill(forced);
    	}
    }
    
    synchronized void fill(boolean forced) throws IOException {
    	isClosed = !inputChannel.isOpen();
        
        if(!eof){
        	// Keep eof sticky: a read() into a full readBuffer returns 0, not -1
        	eof = inputChannel.read(readBuffer) == -1;
        }
        
        // For the poc, markers are not coded
        
        long freeBufSpace = writeSpaceAvailable();
        
        if(	(forced && readBuffer.position() > 0) ||
    		// It's no use reading a very small quantity from the buffer
            // Please review that.
        	(freeBufSpace > readBuffer.capacity() / 2 && freeBufSpace >= readBuffer.position())
        ){
        	readBuffer.flip();
            // An array can only be indexed using an int, so narrow deliberately
            int writeBufferFrom = (int) (writePos % buf.length);
            int canReadAmount =  Math.min((int) freeBufSpace, readBuffer.remaining());
            int canWriteUntil = (writeBufferFrom + canReadAmount) % buf.length;
            
        	// This can only be done like this because ReadBufferSize is smaller than bufferSize
            // and overflows are ignored
        	if(canWriteUntil < writeBufferFrom){
        		// Read in 2 parts
        		readBuffer.get(buf, writeBufferFrom, buf.length - writeBufferFrom);
        		readBuffer.get(buf, 0, canWriteUntil);
        	} else {
        		readBuffer.get(buf, writeBufferFrom, canWriteUntil - writeBufferFrom);
        	}

        	writePos += canReadAmount;
        	// Reset the buffer for more reading
        	readBuffer.clear();
        }
        
    }
	
	@Override
	public int read() throws IOException {
		if(inputChannel == null){
			return super.read();			
		}
		fill(false);
		if(readPos == writePos){
			// Block until data arrives or the stream ends
			blockingFill(true);
			if(readPos == writePos && eof){
				return -1;
			}
		}
		
		return buf[(int)(readPos++ % buf.length)] & 0xff;
	}

	@Override
	public int read(byte[] b, int off, int len) throws IOException {
		if(inputChannel == null){
			return super.read(b, off, len);
		}
		if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
		// Always fill, even when the request would be rejected
		fill();
		if(eof && readSpaceAvailable() == 0){
			// Buffer emptied and the stream ended
			return -1;
		}
		
		int bytesRead = 0;
		
		// For a ReadableChannel, assume that it has always read as much as it could
		if(readSpaceAvailable() == 0){
			fill(true);
			if(readSpaceAvailable() == 0){
				return eof ? -1 : 0;
			}
		}

		// Note for reviewing: Some optimizations that take some time were not made here.
		// E.g.
		// -> If b consumes the whole buff, copy directly from readBuffer to this buff
		// -> As long as reading buffs are large enough (requires benchmarking), 
		// 	  not do use the buf array at all (late initialization too)

        int readBufferFrom = (int) (readPos % buf.length);
        int canReadAmount = Math.min(readSpaceAvailable(), len);
        int canReadBufferUntil = (readBufferFrom + canReadAmount) % buf.length;
        
		if(canReadBufferUntil < readBufferFrom){
			// The readable region wraps around the end of buf: copy in two parts
			int firstPart = buf.length - readBufferFrom;
			System.arraycopy(buf, readBufferFrom, b, off, firstPart);
			System.arraycopy(buf, 0, b, off + firstPart, canReadBufferUntil);
			readPos += canReadAmount;
			return canReadAmount;
    	} else {
			System.arraycopy(buf, readBufferFrom, b, off, canReadAmount);
			readPos += canReadAmount;
			return canReadAmount;
    	}
	}

	@Override
	public long skip(long arg0) throws IOException {
		if(inputChannel == null){
			return super.skip(arg0);
		}
		
		// There's no skip in this poc.
		// For the real implementation, while not enough bytes have been skipped, I'd:
		// "jump" readPos accordingly (while keeping readPos <= writePos);
		// then either reposition (.position(int)) within the buffer
		// or clear the buffer;
		// then, if the channel is a SeekableByteChannel,
		// use (sort of) .position(.position() + skipLeft),
		// else do not seek.
		
		
		throw new UnsupportedOperationException();
		
	}

	@Override
	public int available() throws IOException {
		if(inputChannel == null){
			return super.available();
		}
		fill();
		// For now, just return what I am sure I have
		return readSpaceAvailable();
		
	}

	@Override
	public void mark(int arg0) {
		if(inputChannel == null){
			super.mark(arg0);
			return;
		}
		
		// This is not hard to do but it requires quite some time!
		// I'll wait first to see if the poc passes without this method
	}

	@Override
	public void reset() throws IOException {
		if(inputChannel == null){
			super.reset();
			return;
		}
		throw new IOException("The mark was lost");
	}

	@Override
	public boolean markSupported() {
		if(inputChannel == null){
			return super.markSupported();
		}
		// false for now, at least.
		return false;
	}

	@Override
	public void close() throws IOException {
		if(inputChannel != null){
			inputChannel.close();
		}
		super.close();
	}

}
-------------- next part --------------
package pocs.java;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.channels.FileChannel;
import java.util.zip.CRC32;

public class Tests {
	
	private static final int PROGRAM_BUFFER = 200;
	private static final int READ_BUFFER = 500_000;
	private static final boolean EXECUTE_WORK = true;

	static void buildTestFile() throws Exception{

		BufferedOutputStream outputing = new BufferedOutputStream(new FileOutputStream("TestFile"), 30000);
		
		for (int i = 0; i < 6_000_000; i++) {
			for (int j = 0; j < 255; j++) {
				outputing.write(j);
			}
		}
		
		outputing.close();
	}
	
	static void theRead(BufferedInputStream inputing) throws Exception{
		byte[] myBuf = new byte[PROGRAM_BUFFER];
		int readAmount = 0;
		byte expectingByte = 0x00;
		int strikes = 0;
		
		int readCount = 0;
		
		while((readAmount = inputing.read(myBuf)) != -1){
			readCount++;
			for (int i = 0; i < readAmount; i++) {
				if(expectingByte == -1){
					expectingByte = 0;
				}
				if(expectingByte != myBuf[i]){
					System.out.println("For byte " + (int) myBuf[i] + " expected " + (int) expectingByte );
					if(strikes++ > 50){
						return;
					}
				}
				expectingByte += 1;
			}
			if(EXECUTE_WORK){
				// doing fake work on only the bytes actually read
				for (int i = 0; i < 30; i++) {
					new CRC32().update(myBuf, 0, readAmount);	
				}
			}
		}
	}
	
	static void normalRead() throws Exception {

		try(BufferedInputStream inputing = new BufferedInputStream(new FileInputStream("TestFile"), READ_BUFFER);){
			theRead(inputing);
		}
	}
	

	static void myRead() throws Exception {

		try(BufferedInputStream inputing = new BufferedNonBlockStream(
					FileChannel.open(new File("TestFile").toPath()), READ_BUFFER
				);
			){
			theRead(inputing);
		}
	}
	
	public static void main(String[] args) throws Exception {

//		buildTestFile();
		long start = System.currentTimeMillis();
//		normalRead();
		myRead();
		System.out.println("Time! " + (System.currentTimeMillis() - start));
		
		
	}
}



More information about the core-libs-dev mailing list