Request/discussion: BufferedReader reading using async API while providing sync API
Brunoais
brunoaiss at gmail.com
Sat Oct 29 18:10:29 UTC 2016
Here's my try on going async.
On my tests on my local Windows 10 machine (I don't have access to the
Linux one without a VM at the moment), now with a 1 GB file, I noticed:
~2s of speed improvement from BufferedInputStream to
BufferedNonBlockStream when BufferedNonBlockStream is at its most
advantageous state (with CPU work between reads). Otherwise, it has a
~0.3s speed improvement, maybe less.
~0.8s of speed improvement from BufferedNonBlockStream to
BufferedAsyncStream when BufferedNonBlockStream/BufferedAsyncStream is
at its most advantageous state. Otherwise, ~0 speed improvement.
I noticed: as long as you process data at least as fast as you read it,
both BufferedNonBlockStream and BufferedAsyncStream gain an advantage
over BufferedInputStream. From the point where processing takes longer
than the I/O, BufferedNonBlockStream and BufferedAsyncStream tend to
keep the advantage.
If the file is in cache, BufferedInputStream takes ~1.6-1.9s to read the
file, BufferedNonBlockStream takes a steady ~2.05-2.1s to read the file
and BufferedAsyncStream ~1.2s.
BufferedNonBlockStream and BufferedAsyncStream win more when given more
buffer memory than BufferedInputStream, but only up to a certain limit.
On my hardware, the best speed I got was with 655,360 B for the new
classes. Anything larger produced no visible difference. I guess that is
due to the rate at which the test processes the data.
On 28/10/2016 09:16, Brunoais wrote:
> I'll try going back to a previous version I worked on which used the
> java7's AsynchronousFileChannel and work from there. My small research
> shows it can also work with AsynchronousFileChannel mostly without
> changes.
>
> For now, one question:
> Is Thread.sleep() an acceptable way of meeting the blocking requirements of
> read()? Or do I need to use LockSupport.park() or something like that?
>
> I'll call back here when it is done.
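[Editorial aside: the question above, of how to block read() without Thread.sleep(), can be sketched with LockSupport, the approach the final BufferedAsyncStream attachment ends up using. This is a minimal standalone model, not code from the thread; the names and the simulated producer are made up.]

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    static final AtomicInteger available = new AtomicInteger(); // bytes ready to read
    static volatile Thread parkedReader;                        // reader waiting for data

    public static void main(String[] args) throws Exception {
        Thread reader = new Thread(() -> {
            // Block until at least one byte is available, without Thread.sleep() polling
            while (available.get() == 0) {
                parkedReader = Thread.currentThread();
                LockSupport.parkNanos(1_000_000_000L); // bounded wait: 1s safety timeout
                parkedReader = null;
            }
            System.out.println("read " + available.get() + " bytes");
        });
        reader.start();

        Thread.sleep(100);          // simulate I/O latency on the "completion" side
        available.set(42);          // publish data...
        LockSupport.unpark(reader); // ...and wake the parked reader immediately
        reader.join();
    }
}
```

Unlike sleep(100), unpark() wakes the reader as soon as data arrives, and the permit semantics make the unpark-before-park race harmless.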
>
>
> On 27/10/2016 22:09, David Holmes wrote:
>> You might try discussing on net-dev rather than core-libs-dev, to get
>> additional historical info related to the io and nio file APIs.
>>
>> David
>>
>> On 28/10/2016 5:08 AM, Brunoais wrote:
>>> You are right. Even on Windows it does not set the flags for async
>>> reads. It seems that it is Windows itself that makes the decision to
>>> buffer the contents based on its own heuristics.
>>>
>>> But... why? Why is that? Why is there no API for it? How am I
>>> getting 100% HDD use and faster times when I fake work to delay getting
>>> more data, yet only a fluctuating 60-90% (always going up and down)
>>> when I use an InputStream?
>>> Is it related to how both classes cache and to how frequently and how
>>> much each one asks for data?
>>>
>>> I really would prefer not having to read the source code because it
>>> takes a really long time T.T.
>>>
>>> I end up reiterating... and wondering...
>>>
>>> Why doesn't Java provide a single-threaded non-blocking API for file
>>> reads on all OSes that support it? I simply cannot find that
>>> information no matter how much I search on Google, Bing, DuckDuckGo...
>>> Can any of you point me to whoever knows?
>>>
>>> On 27/10/2016 14:11, Vitaly Davidovich wrote:
>>>> I don't know about Windows specifically, but generally file systems
>>>> across major OSes implement readahead in their I/O scheduler when
>>>> they detect sequential scans.
>>>>
>>>> On Linux, you can also strace your test to confirm which syscalls are
>>>> emitted (you should be seeing plain read()s there, with both
>>>> FileInputStream and FileChannel).
>>>>
>>>> On Thu, Oct 27, 2016 at 9:06 AM, Brunoais <brunoaiss at gmail.com
>>>> <mailto:brunoaiss at gmail.com>> wrote:
>>>>
>>>> Thanks for the heads up.
>>>>
>>>> I'll try that later. These tests are still useful then. Meanwhile,
>>>> I'll end up also checking how FileChannel queries the OS on
>>>> Windows. I'm getting 100% HDD reads... Could it be that the OS
>>>> reads the file ahead on its own?... Anyway, I'll look into it.
>>>>
>>>>
>>>> On 27/10/2016 13:53, Vitaly Davidovich wrote:
>>>>>
>>>>>
>>>>> On Thu, Oct 27, 2016 at 8:34 AM, Brunoais <brunoaiss at gmail.com
>>>>> <mailto:brunoaiss at gmail.com>> wrote:
>>>>>
>>>>> Oh... I see. In that case, it means something is terribly
>>>>> wrong. It could be my initial tests, though.
>>>>>
>>>>> I'm testing on both linux and windows and I'm getting
>>>>> performance gains from using the FileChannel compared to
>>>>> using FileInputStream... The tests also make sense based on
>>>>> my predictions O_O...
>>>>>
>>>>> FileInputStream requires copying the native buffers holding the read
>>>>> data into the Java byte[]. If you're using a direct ByteBuffer for
>>>>> FileChannel, that whole memcpy is skipped. Try comparing
>>>>> FileChannel with a HeapByteBuffer instead.
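[Editorial aside: the heap-versus-direct comparison suggested above is easy to set up. This sketch, not from the thread, reads the same temporary file once with each buffer type through the same FileChannel code path; the file name and sizes are arbitrary.]

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DirectVsHeapRead {
    static long read(Path file, ByteBuffer buf) throws IOException {
        long total = 0;
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            int n;
            while ((n = ch.read(buf)) != -1) { // same channel API either way
                total += n;
                buf.clear();
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("direct-vs-heap", ".bin");
        Files.write(tmp, "hello buffered world".getBytes(StandardCharsets.UTF_8));
        // Heap buffer: the JDK copies from an internal native buffer into the byte[]
        long heap = read(tmp, ByteBuffer.allocate(8));
        // Direct buffer: the read lands in native memory the channel uses directly
        long direct = read(tmp, ByteBuffer.allocateDirect(8));
        System.out.println(heap + " " + direct);
        Files.delete(tmp);
    }
}
```

Both variants read the same bytes; only the copy count behind the scenes differs, which is what a benchmark of the two would measure.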
>>>>>
>>>>>
>>>>> On 27/10/2016 11:47, Vitaly Davidovich wrote:
>>>>>>
>>>>>>
>>>>>> On Thursday, October 27, 2016, Brunoais <brunoaiss at gmail.com
>>>>>> <mailto:brunoaiss at gmail.com>> wrote:
>>>>>>
>>>>>> Did you read the C code?
>>>>>>
>>>>>> I looked at the Linux code in the JDK.
>>>>>>
>>>>>> Have you got any idea how many functions Windows or
>>>>>> Linux (nearly all flavors) have for the read operation
>>>>>> towards a file?
>>>>>>
>>>>>> I do.
>>>>>>
>>>>>>
>>>>>> I have already done that homework myself. I may not have
>>>>>> read the JVM's source code but I know well that there are
>>>>>> functions on both Windows and Linux that provide the
>>>>>> interface I mentioned, although they require slightly
>>>>>> different treatment (and different constants).
>>>>>>
>>>>>> You should read the JDK (native) source code instead of
>>>>>> guessing/assuming. On Linux, it doesn't use aio facilities
>>>>>> for files. The kernel I/O scheduler may issue readahead
>>>>>> behind the scenes, but there's no non-blocking file I/O of
>>>>>> the sort that's at the heart of your premise.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 27/10/2016 00:06, Vitaly Davidovich wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wednesday, October 26, 2016, Brunoais
>>>>>> <brunoaiss at gmail.com <mailto:brunoaiss at gmail.com>>
>>>>>> wrote:
>>>>>>
>>>>>> It is actually based on the premise that:
>>>>>>
>>>>>> 1. The first call to ReadableByteChannel.read(ByteBuffer)
>>>>>> sets the OS buffer to fill in with the same size as the
>>>>>> ByteBuffer.
>>>>>>
>>>>>> Why do you say that? AFAICT, it issues a read
>>>>>> syscall and that will block if the data isn't in
>>>>>> page cache.
>>>>>>
>>>>>> 2. Consecutive calls to ReadableByteChannel.read(ByteBuffer)
>>>>>> order the JVM to have the OS execute a memcpy() from its
>>>>>> memory to the shared memory created at ByteBuffer
>>>>>> instantiation (in Java 8) using Unsafe, and then the JVM
>>>>>> updates the ByteBuffer fields.
>>>>>>
>>>>>> I think subsequent reads just invoke the same read
>>>>>> syscall, passing the current file offset maintained
>>>>>> by the file channel instance.
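[Editorial aside: the point that the channel itself maintains the file offset, so each read() simply continues where the last one stopped, can be seen directly. A small standalone illustration, not from the thread; file name and sizes are arbitrary.]

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ChannelOffsetDemo {
    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("offset", ".bin");
        Files.write(tmp, "abcdefghij".getBytes(StandardCharsets.UTF_8));
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(4);
            ch.read(buf);                      // blocking read of the first 4 bytes
            System.out.println(ch.position()); // the channel tracked the offset: 4
            buf.clear();
            ch.read(buf);                      // continues from where we stopped
            System.out.println(ch.position()); // 8
        }
        Files.delete(tmp);
    }
}
```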
>>>>>>
>>>>>> 3. The call will not block waiting for I/O, and it won't
>>>>>> take longer than the JNI interface if no new data exists.
>>>>>> However, it will block waiting for the OS to execute
>>>>>> memcpy() to the shared memory.
>>>>>>
>>>>>> So why do you think it won't block?
>>>>>>
>>>>>>
>>>>>> Is my premise wrong?
>>>>>>
>>>>>> If I read correctly, if I don't use a DirectBuffer, there
>>>>>> would be yet another intermediate buffer to copy data into
>>>>>> before giving it to the "user", which would be useless.
>>>>>>
>>>>>> If you use a HeapByteBuffer, then there's an extra
>>>>>> copy from the native buffer to the Java buffer.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 26/10/2016 11:57, Pavel Rappo wrote:
>>>>>>
>>>>>> I believe I see where you're coming from.
>>>>>> Please correct me if I'm wrong.
>>>>>>
>>>>>> Your implementation is based on the premise that a call to
>>>>>> ReadableByteChannel.read() _initiates_ the operation and
>>>>>> returns immediately. The OS then continues to fill the
>>>>>> buffer while there's free space in the buffer and the
>>>>>> channel hasn't encountered EOF.
>>>>>>
>>>>>> Is that right?
>>>>>>
>>>>>> On 25 Oct 2016, at 22:16, Brunoais
>>>>>> <brunoaiss at gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Thank you for your time. I'll try to explain it. I hope I
>>>>>> can clear it up.
>>>>>> First of all, I mixed up the meanings of asynchronous and
>>>>>> non-blocking. This implementation uses a non-blocking
>>>>>> algorithm internally while providing a blocking-like
>>>>>> algorithm on the surface. It is single-threaded, not
>>>>>> multi-threaded with one thread fetching data and blocking
>>>>>> while the other accumulates it and provides it to whoever
>>>>>> wants it.
>>>>>>
>>>>>> Second, I made the mistake of going after BufferedReader
>>>>>> instead of BufferedInputStream. If you want me to go after
>>>>>> BufferedReader that's ok, but I thought that going after
>>>>>> BufferedInputStream would be more generically useful than
>>>>>> BufferedReader when I started the poc.
>>>>>>
>>>>>> On to my code:
>>>>>> Short answers:
>>>>>> • The sleep(int) exists because I don't know how to wait
>>>>>> until more data exists in the buffer, which is part of
>>>>>> read()'s contract.
>>>>>> • The ByteBuffer gives a buffer that is filled by the OS
>>>>>> (what I believe Channels do) instead of getting data only
>>>>>> on demand (what I believe Streams do).
>>>>>> Full answers:
>>>>>> The blockingFill(boolean) method is a busy wait for a fill,
>>>>>> used exclusively by the read() method. All other methods use
>>>>>> the version that does not sleep (fill(boolean)).
>>>>>> blockingFill(boolean) exists like that only because the
>>>>>> read() method must not return unless either:
>>>>>>
>>>>>> • The stream ended.
>>>>>> • The next byte is ready for reading.
>>>>>> Additionally, statistically, that while loop will rarely
>>>>>> evaluate to true, as reads are in chunks, so readPos will be
>>>>>> behind writePos most of the time.
>>>>>> I have no idea if an interrupt will ever happen, to be
>>>>>> honest. The main reasons I'm using a sleep are that I didn't
>>>>>> want to hog the CPU in a full busy wait and that I didn't
>>>>>> find any way of putting the thread to sleep so it wakes up
>>>>>> later when the buffer managed by native code has more data.
>>>>>> The non-blocking part is managed by the buffer the OS keeps
>>>>>> filling most, if not all, of the time. That buffer is the
>>>>>> field
>>>>>>
>>>>>> ByteBuffer readBuffer
>>>>>>
>>>>>> That's the gaining part against the plain old Buffered
>>>>>> classes.
>>>>>>
>>>>>>
>>>>>> Did that make sense to you? Feel free to
>>>>>> ask anything else
>>>>>> you need.
>>>>>>
>>>>>> On 25/10/2016 20:52, Pavel Rappo wrote:
>>>>>>
>>>>>> I've skimmed through the code and I'm not sure I can
>>>>>> see any asynchronicity
>>>>>> (you were pointing at the lack of it in BufferedReader).
>>>>>> And the mechanics of this are very puzzling to me, to
>>>>>> be honest:
>>>>>>
>>>>>> void blockingFill(boolean forced) throws IOException {
>>>>>>     fill(forced);
>>>>>>     while (readPos == writePos) {
>>>>>>         try {
>>>>>>             Thread.sleep(100);
>>>>>>         } catch (InterruptedException e) {
>>>>>>             // An interrupt may mean more data is available
>>>>>>         }
>>>>>>         fill(forced);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> I thought you were suggesting that we should utilize
>>>>>> the tools which the OS provides more efficiently.
>>>>>> Instead we have something that looks very similar to a
>>>>>> "busy loop" and... also, who is supposed to interrupt
>>>>>> Thread.sleep(), and when?
>>>>>> Sorry, I'm not following. Could you please explain how
>>>>>> this is supposed to work?
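[Editorial aside: one conventional way to avoid the sleep-based polling questioned here is a lock/condition handshake: the fill side signals, the reader awaits with a bounded timeout. A stripped-down sketch, not from the thread; names and the simulated I/O thread are illustrative.]

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionFillDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition dataReady = lock.newCondition();
    private int readPos, writePos;

    // Called by the I/O completion side when bytes were appended
    void onFill(int bytes) {
        lock.lock();
        try {
            writePos += bytes;
            dataReady.signal(); // wake the blocked reader instead of letting it poll
        } finally {
            lock.unlock();
        }
    }

    // Blocks until at least one byte is readable, with no Thread.sleep() polling
    int blockingRead() throws InterruptedException {
        lock.lock();
        try {
            while (readPos == writePos) {
                dataReady.await(1, TimeUnit.SECONDS); // bounded wait, re-checks condition
            }
            readPos++;
            return 1;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionFillDemo demo = new ConditionFillDemo();
        Thread io = new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) {}
            demo.onFill(3); // simulate a completed asynchronous fill
        });
        io.start();
        System.out.println("got " + demo.blockingRead() + " byte");
        io.join();
    }
}
```

The reader wakes as soon as onFill() signals, rather than on the next 100 ms tick.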
>>>>>>
>>>>>> On 24 Oct 2016, at 15:59,
>>>>>> Brunoais
>>>>>> <brunoaiss at gmail.com>
>>>>>> wrote:
>>>>>> Attached and sending!
>>>>>> On 24/10/2016 13:48, Pavel Rappo
>>>>>> wrote:
>>>>>>
>>>>>> Could you please send a new
>>>>>> email on this list
>>>>>> with the source attached
>>>>>> as a
>>>>>> text file?
>>>>>>
>>>>>> On 23 Oct 2016, at
>>>>>> 19:14, Brunoais
>>>>>> <brunoaiss at gmail.com>
>>>>>> wrote:
>>>>>> Here's my poc/prototype:
>>>>>>
>>>>>> http://pastebin.com/WRpYWDJF
>>>>>>
>>>>>> I've implemented the bare minimum of the class that follows
>>>>>> the same contract as BufferedReader, while signaling in
>>>>>> comments all the issues I think it may have or has.
>>>>>> I also wrote some javadoc to help guide you through the class.
>>>>>> I could have used more fields from BufferedReader but the
>>>>>> names were so minimalistic that they were confusing me. I
>>>>>> intend to change them before sending this to OpenJDK.
>>>>>> One of the major problems this has is long overflowing. It is
>>>>>> major because it is hidden, it will be extremely rare and it
>>>>>> takes a really long time to reproduce. There are different
>>>>>> ways of dealing with it, from just documenting it to actually
>>>>>> making code that works with it.
>>>>>> I built some simple test code for it to get some ideas about
>>>>>> performance and correctness.
>>>>>>
>>>>>> http://pastebin.com/eh6LFgwT
>>>>>>
>>>>>> This doesn't thoroughly test whether it is actually working
>>>>>> correctly, but I see no reason for it not to work correctly
>>>>>> after fixing the 2 bugs that test found.
>>>>>> I'll also leave here some conclusions about speed and
>>>>>> resource consumption I found.
>>>>>> I made tests with buffer sizes of 5,000 B, 15,000 B and
>>>>>> 500,000 B besides the default. I noticed that, with my
>>>>>> hardware, with the 1,530,000,000 B file, I was getting around:
>>>>>> With all buffer sizes and fake work: 10~15s speed improvement
>>>>>> (from 90% HDD speed to 100% HDD speed)
>>>>>> With all buffer sizes and no fake work: 1~2s speed improvement
>>>>>> (from 90% HDD speed to 100% HDD speed)
>>>>>> Changing the buffer size gave different reading speeds, but
>>>>>> both classes changed by similar amounts as the buffer size
>>>>>> changed.
>>>>>> Finally, I could always confirm that I/O was the slowest
>>>>>> thing while this code was running.
>>>>>> For the ones wondering about the file size; it is both to
>>>>>> avoid the OS cache and to exercise the main use-case these
>>>>>> objects are for (large streams of bytes).
>>>>>> @Pavel, are you open for discussion now ;)? Need anything else?
>>>>>> On 21/10/2016 19:21,
>>>>>> Pavel Rappo wrote:
>>>>>>
>>>>>> Just to append to my previous email.
>>>>>> BufferedReader wraps any Reader out there, not specifically
>>>>>> FileReader, while you're talking about the case of effective
>>>>>> reading from a file.
>>>>>> I guess there's one existing possibility to provide exactly
>>>>>> what you need (as I understand it) under this method:
>>>>>>
>>>>>> /**
>>>>>>  * Opens a file for reading, returning a {@code BufferedReader} to read text
>>>>>>  * from the file in an efficient manner...
>>>>>>  ...
>>>>>>  */
>>>>>> java.nio.file.Files#newBufferedReader(java.nio.file.Path)
>>>>>>
>>>>>> It can return _anything_ as long as it is a BufferedReader.
>>>>>> We can do it, but it needs to be investigated not only for
>>>>>> your favorite OS but for other OSes as well. Feel free to
>>>>>> prototype this and we can discuss it on the list later.
>>>>>> Thanks,
>>>>>> -Pavel
>>>>>>
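[Editorial aside: the factory method Pavel points at is simple to exercise; the key property is that callers only see the BufferedReader contract, so the returned subclass is free to change. A standalone sketch, not from the thread; file name and contents are arbitrary.]

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class NewBufferedReaderDemo {
    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("nbr", ".txt");
        Files.write(tmp, List.of("first line", "second line"), StandardCharsets.UTF_8);
        // The factory may return any BufferedReader subclass; callers only see the contract
        try (BufferedReader reader = Files.newBufferedReader(tmp)) {
            System.out.println(reader.readLine());
            System.out.println(reader.readLine());
        }
        Files.delete(tmp);
    }
}
```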
>>>>>> On 21 Oct 2016, at 18:56, Brunoais
>>>>>> <brunoaiss at gmail.com>
>>>>>> wrote:
>>>>>> Pavel is right.
>>>>>> In reality, I was expecting such a BufferedReader to use only
>>>>>> a single buffer and have that buffer filled asynchronously,
>>>>>> not in a different thread.
>>>>>> Additionally, I don't intend to use a larger buffer than
>>>>>> before unless stated through the API (the constructor).
>>>>>> In my idea, internally, it is supposed to use
>>>>>> java.nio.channels.AsynchronousFileChannel or equivalent.
>>>>>> It does not prevent having two buffers and I do not intend to
>>>>>> change BufferedReader itself. I'd do a BufferedAsyncReader of
>>>>>> sorts (any name suggestion is welcome as I'm an awful namer).
>>>>>> On 21/10/2016 18:38, Roger Riggs
>>>>>> wrote:
>>>>>>
>>>>>> Hi Pavel,
>>>>>> I think Brunoais is asking for a double buffering scheme in
>>>>>> which the implementation of BufferedReader fills (a second
>>>>>> buffer) in parallel with the application reading from the 1st
>>>>>> buffer, managing the swaps and async reads transparently.
>>>>>> It would not change the API but would change the interactions
>>>>>> between the buffered reader and the underlying stream. It
>>>>>> would also increase memory requirements and processing by
>>>>>> introducing or using a separate thread and the necessary
>>>>>> synchronization.
>>>>>> Though I think the formal interface semantics could be
>>>>>> maintained, I have doubts about compatibility and its
>>>>>> unintended consequences on existing subclasses, applications
>>>>>> and libraries.
>>>>>> $.02, Roger
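[Editorial aside: the double-buffering scheme Roger describes can be modeled compactly: a filler thread loads one buffer while the consumer drains the other, and the two swap. This sketch is not from the thread; the class, the 4-slot buffers, and the fake fill are all illustrative.]

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DoubleBufferDemo {
    // Two fixed buffers circulate between a filler and a consumer
    private final BlockingQueue<int[]> empty = new ArrayBlockingQueue<>(2);
    private final BlockingQueue<int[]> full = new ArrayBlockingQueue<>(2);

    DoubleBufferDemo() {
        empty.add(new int[4]);
        empty.add(new int[4]);
    }

    // Filler: pretends to do async I/O, filling whichever buffer is free
    void fill(int base) throws InterruptedException {
        int[] buf = empty.take();
        for (int i = 0; i < buf.length; i++) {
            buf[i] = base + i;
        }
        full.put(buf); // hand the filled buffer to the consumer
    }

    // Consumer: drains a filled buffer, then recycles it for the filler
    int drain() throws InterruptedException {
        int[] buf = full.take();
        int sum = 0;
        for (int v : buf) {
            sum += v;
        }
        empty.put(buf); // swap: the buffer goes back for refilling
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        DoubleBufferDemo demo = new DoubleBufferDemo();
        Thread filler = new Thread(() -> {
            try {
                demo.fill(0);  // fills [0,1,2,3]
                demo.fill(10); // fills [10,11,12,13] while the first may be draining
            } catch (InterruptedException ignored) {}
        });
        filler.start();
        System.out.println(demo.drain() + " " + demo.drain());
        filler.join();
    }
}
```

The blocking queues also illustrate Roger's caveat: the extra thread plus the synchronization needed to pass buffers safely between the two sides.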
>>>>>> On 10/21/16 1:22 PM, Pavel
>>>>>> Rappo wrote:
>>>>>>
>>>>>> Off the top of my head, I would say it's not possible to
>>>>>> change the design of an _extensible_ type that has been out
>>>>>> there for 20 or so years. All these I/O streams from java.io
>>>>>> were designed for a simple synchronous use case.
>>>>>> It's not that their design is flawed in some way, it's that
>>>>>> they don't seem to suit your needs. Have you considered using
>>>>>> java.nio.channels.AsynchronousFileChannel in your applications?
>>>>>> -Pavel
>>>>>>
>>>>>> On 21 Oct 2016, at
>>>>>> 17:08, Brunoais
>>>>>> <brunoaiss at gmail.com>
>>>>>> wrote:
>>>>>> Any feedback on this?
>>>>>> I'm really interested in implementing such a
>>>>>> BufferedReader/BufferedStreamReader to allow speeding up my
>>>>>> applications without having to think in an asynchronous way
>>>>>> or about multi-threading while programming with it.
>>>>>> That's why I'm asking this here.
>>>>>> On 13/10/2016 14:45,
>>>>>> Brunoais wrote:
>>>>>>
>>>>>> Hi,
>>>>>> I looked at the BufferedReader source code for Java 9 along
>>>>>> with the source code of the channels/streams used. I noticed
>>>>>> that, like in Java 7, BufferedReader does not use an async
>>>>>> API to load data from files; instead, the data loading is all
>>>>>> done synchronously, even when the OS allows requesting a file
>>>>>> to be read and getting a notification later when the file has
>>>>>> effectively been read.
>>>>>> Why is BufferedReader not async while providing a sync API?
>>>>>>
>>>>>> <BufferedNonBlockStream.java><Tests.java>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> -- Sent from my phone
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
-------------- next part --------------
package org.sample.BufferedNon_BlockIO;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.locks.LockSupport;

public class BufferedAsyncStream extends BufferedInputStream {

    private static final int DEFAULT_BUFFER_SIZE = 8_192;

    /**
     * The maximum size of array to allocate.
     * Some VMs reserve some header words in an array.
     * Attempts to allocate larger arrays may result in
     * OutOfMemoryError: Requested array size exceeds VM limit
     */
    private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

    private static final int MIN_READ_BUFFER_SIZE = 10;

    /**
     * The maximum size of the read buffer if set automatically.
     * Unless explicitly told, there's no need to starve the VM out of memory.
     */
    private static final int MAX_READ_BUFFER_SIZE = 1_000_000;

    /**
     * The virtual index of the next position where data will be read.
     * Every operation requires executing a mathematical mod ({@code %}) operation on it against <code>buf.length</code>
     * to get the correct buffer position to work on.
     */
    protected long readPos;

    /**
     * The virtual index of the next position where data will be written.
     * Every operation requires executing a mathematical mod ({@code %}) operation on it against <code>buf.length</code>
     * to get the correct buffer position to work on.
     *
     * At every time, {@code readPos <= writePos} must be true.
     */
    // Note to reviewers: What to do when long overflows? It will happen but does a documented warning suffice or... do I need to work with the overflows?
    protected long writePos;

    ChannelReader readFrom;
    ByteBuffer readBuffer;
    FileReadCallback<Object> readCallback;
    IOException nextException;
    transient boolean isReading;
    Thread parkingThread;
    boolean isClosed;
    boolean eof;

    interface ChannelReader extends Closeable {
        void read(ByteBuffer dst);
    }
    class AsyncFileChannelReader implements ChannelReader, CompletionHandler<Integer, ByteBuffer> {
        AsynchronousFileChannel inChannel;
        long currentPosition;

        AsyncFileChannelReader(AsynchronousFileChannel inChannel) {
            this.inChannel = inChannel;
            currentPosition = 0;
        }

        @Override
        public void read(ByteBuffer dst) {
            isReading = true;
            inChannel.read(dst, currentPosition, dst, this);
        }

        @Override
        public void close() throws IOException {
            inChannel.close();
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (result > 0) {
                currentPosition += result;
            }
            readCallback.completed(result, attachment);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            readCallback.failed(exc, attachment);
        }
    }

    class AsyncByteChannelReader implements ChannelReader {
        AsynchronousByteChannel inChannel;

        AsyncByteChannelReader(AsynchronousByteChannel inChannel) {
            this.inChannel = inChannel;
        }

        @Override
        public void read(ByteBuffer dst) {
            isReading = true;
            inChannel.read(dst, dst, readCallback);
        }

        @Override
        public void close() throws IOException {
            inChannel.close();
        }
    }
    class FileReadCallback<A> implements CompletionHandler<Integer, A> {
        @Override
        public void completed(Integer resultI, A attachment) {
            // unbox
            int result = resultI;
            if (result == -1) {
                eof = true;
            }
            long freeBufSpace = writeSpaceAvailable();
            if (freeBufSpace < readBuffer.position()) {
                // Not enough space in buf to copy from readBuffer.
                // Just send it back to fill even more
                readFrom.read(readBuffer);
                return;
            }
            if (readBuffer.position() > readBuffer.capacity() / 2 || (eof && readBuffer.position() > 0)) {
                readBuffer.flip();
                // Ints are faster and an array can only be indexed using an int
                int writeBufferFrom = (int) (writePos % buf.length);
                int canReadAmount = Math.min((int) freeBufSpace, readBuffer.remaining());
                int canWriteUntil = (writeBufferFrom + canReadAmount) % buf.length;
                // This can only be done like this because readBufferSize is smaller than bufferSize
                // and overflows are ignored
                if (canWriteUntil < writeBufferFrom) {
                    // Read in 2 parts
                    readBuffer.get(buf, writeBufferFrom, buf.length - writeBufferFrom);
                    readBuffer.get(buf, 0, canWriteUntil);
                } else {
                    readBuffer.get(buf, writeBufferFrom, canWriteUntil - writeBufferFrom);
                }
                writePos += canReadAmount;
                // Reset the buffer for more reading
                readBuffer.clear();
                LockSupport.unpark(parkingThread);
            }
            if (eof) {
                isReading = false;
                return;
            }
            readFrom.read(readBuffer);
        }

        @Override
        public void failed(Throwable exc, A attachment) {
            if (exc instanceof IOException) {
                nextException = (IOException) exc;
            } else {
                nextException = new IOException("Failed to read more data to the buffer", exc);
            }
            isReading = false;
        }
    }
    public BufferedAsyncStream(InputStream inputStream) {
        super(inputStream);
    }

    public BufferedAsyncStream(AsynchronousFileChannel inputChannel) {
        this(inputChannel, DEFAULT_BUFFER_SIZE);
    }

    public BufferedAsyncStream(AsynchronousFileChannel inputChannel, int bufferSize) {
        this(inputChannel, bufferSize, Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
    }

    public BufferedAsyncStream(AsynchronousFileChannel inputChannel, int bufferSize, int readBufferSize) {
        super(null, Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1));
        // Objects.requireNonNull(inputChannel, "The file channel must not be null");
        init(new AsyncFileChannelReader(inputChannel), Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1), readBufferSize);
    }

    public BufferedAsyncStream(AsynchronousByteChannel inputChannel) {
        this(inputChannel, DEFAULT_BUFFER_SIZE);
    }

    public BufferedAsyncStream(AsynchronousByteChannel inputChannel, int bufferSize) {
        this(inputChannel, bufferSize, Math.max(Math.min(bufferSize / 4, MAX_READ_BUFFER_SIZE), MIN_READ_BUFFER_SIZE));
    }

    public BufferedAsyncStream(AsynchronousByteChannel inputChannel, int bufferSize, int readBufferSize) {
        super(null, Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1));
        // Objects.requireNonNull(inputChannel, "The byte channel must not be null");
        init(new AsyncByteChannelReader(inputChannel), Math.max(bufferSize, MIN_READ_BUFFER_SIZE + 1), readBufferSize);
    }

    private void init(ChannelReader inputChannel, int bufferSize, int readBufferSize) {
        this.readFrom = inputChannel;
        if (readBufferSize < 10) {
            throw new IllegalArgumentException("Read buffer must be at least 10 bytes");
        }
        if (readBufferSize > bufferSize) {
            throw new IllegalArgumentException("readBufferSize must be smaller than bufferSize");
        }
        // The read buffer must not be larger than the internal buffer.
        readBufferSize = Math.min(readBufferSize, MAX_BUFFER_SIZE - 1);
        this.readBuffer = ByteBuffer.allocateDirect(readBufferSize);
        this.readCallback = new FileReadCallback<>();
        isClosed = false;
        readFrom.read(readBuffer);
    }

    int readSpaceAvailable() {
        // This can be an int because buf can never be larger than an int allows
        return (int) (writePos - readPos);
    }

    int writeSpaceAvailable() {
        return buf.length - readSpaceAvailable();
    }
    /**
     * Blocks waiting for a write.
     * After leaving the block, it is guaranteed that at least 1 byte is readable from the buffer
     * or that EOF was reached.
     * @throws IOException
     */
    // This is /* synchronized */ to make sure only up to 1 thread is parked inside
    void waitForFill() throws IOException {
        while (readSpaceAvailable() < 1 && !eof) {
            try {
                parkingThread = Thread.currentThread();
                // wait for 1 second tops. File/web reading really should be faster than that
                LockSupport.parkNanos(this, 1_000_000_000);
            } finally {
                parkingThread = null;
            }
        }
    }

    private int readingFormalities() throws IOException {
        if (!isReading) {
            if (nextException != null) {
                try {
                    throw nextException;
                } finally {
                    nextException = null;
                }
            }
            if (eof) {
                // EOF may have been reached with data still buffered
                return readSpaceAvailable() > 0 ? readSpaceAvailable() : -1;
            }
            readFrom.read(readBuffer);
        }
        return readSpaceAvailable();
    }
    @Override
    public /* synchronized */ int read() throws IOException {
        if (readFrom == null) {
            return super.read();
        }
        switch (readingFormalities()) {
            case -1:
                return -1;
            case 0:
                waitForFill();
                if (eof && readSpaceAvailable() == 0) {
                    return -1;
                }
                /* No Default */
        }
        return buf[(int) (readPos++ % buf.length)] & 0xff;
    }

    @Override
    public /* synchronized */ int read(byte[] b, int off, int len) throws IOException {
        if (readFrom == null) {
            return super.read(b, off, len);
        }
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
        switch (readingFormalities()) {
            case -1:
                return -1;
            case 0:
                waitForFill();
                if (eof && readSpaceAvailable() == 0) {
                    return -1;
                }
                /* No Default */
        }
        // Note for reviewing: Some optimizations that take some time were not made here.
        // E.g.
        // -> If b consumes the whole buf, copy directly from the ByteBuffer to b
        // -> As long as reading buffers are large enough (requires benchmarking),
        //    do not use the buf array at all (late initialization too)
        int byteBufferFrom = (int) (readPos % buf.length);
        int canReadAmount = Math.min(readSpaceAvailable(), len);
        int canByteBufferUntil = (byteBufferFrom + canReadAmount) % buf.length;
        if (canByteBufferUntil < byteBufferFrom) {
            // For reviewer: Should it always read once? Or should it read twice?
            System.arraycopy(buf, byteBufferFrom, b, off, buf.length - byteBufferFrom);
            int ndStartAt = off + (buf.length - byteBufferFrom);
            // This if should never evaluate "true". It's here just to make sure and to set breakpoints
            if (ndStartAt - off >= len) {
                readPos += buf.length - byteBufferFrom;
                return ndStartAt - off;
            }
            System.arraycopy(buf, 0, b, ndStartAt, canByteBufferUntil);
            readPos += (buf.length - byteBufferFrom) + canByteBufferUntil;
            return (buf.length - byteBufferFrom) + canByteBufferUntil;
        } else {
            System.arraycopy(buf, byteBufferFrom, b, off, canByteBufferUntil - byteBufferFrom);
            readPos += canByteBufferUntil - byteBufferFrom;
            return canByteBufferUntil - byteBufferFrom;
        }
    }
    @Override
    public long skip(long arg0) throws IOException {
        if (readFrom == null) {
            return super.skip(arg0);
        }
        // There's no skip in this poc
        // For the real implementation, I'd do (while I didn't skip enough):
        //   Just "jump" readPos accordingly (while keeping readPos <= writePos)
        //   Then
        //     reposition (.position(int)) in the buffer
        //   OR
        //     clear the buffer
        //   Then
        //     If SeekableChannel -> use (sort of) .position(.position() + skipLeft)
        //     else -> do not seek.
        throw new UnsupportedOperationException();
    }

    @Override
    public int available() throws IOException {
        if (readFrom == null) {
            return super.available();
        }
        return readSpaceAvailable();
    }

    @Override
    public void mark(int arg0) {
        if (readFrom == null) {
            super.mark(arg0);
        }
        // This is not hard to do but it requires quite some time!
        // I'll wait first to see if the poc passes without this method
    }

    @Override
    public void reset() throws IOException {
        if (readFrom == null) {
            super.reset();
            return;
        }
        throw new IOException("The mark was lost");
    }

    @Override
    public boolean markSupported() {
        if (readFrom == null) {
            return super.markSupported();
        }
        // false for now, at least.
        return false;
    }

    @Override
    public void close() throws IOException {
        if (readFrom != null) {
            readFrom.close();
        }
        super.close();
    }
}
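[Editorial aside: the readPos/writePos scheme above treats the backing array as a ring addressed by ever-growing virtual indices, which is also where the long-overflow concern in the reviewer notes comes from. A stripped-down model of just that arithmetic, with hypothetical names and no I/O:]

```java
public class VirtualRingDemo {
    private final byte[] buf = new byte[4]; // tiny ring to force wrap-around
    private long readPos, writePos;         // ever-growing virtual indices

    int readable() { return (int) (writePos - readPos); } // bytes ready to read
    int writable() { return buf.length - readable(); }    // free space left

    void write(byte b) {
        if (writable() == 0) throw new IllegalStateException("full");
        buf[(int) (writePos++ % buf.length)] = b; // mod maps the virtual index to a slot
    }

    byte read() {
        if (readable() == 0) throw new IllegalStateException("empty");
        return buf[(int) (readPos++ % buf.length)];
    }

    public static void main(String[] args) {
        VirtualRingDemo ring = new VirtualRingDemo();
        StringBuilder out = new StringBuilder();
        // Push 10 bytes through a 4-slot ring, reading as we go
        for (byte i = 0; i < 10; i++) {
            ring.write(i);
            out.append(ring.read());
        }
        System.out.println(out); // the indices wrapped several times, data stayed in order
    }
}
```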
-------------- next part --------------
package org.sample.BufferedNon_BlockIO;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

/**
 * A sample file reading benchmark
 */
@BenchmarkMode(Mode.SingleShotTime)
@Fork(value = 1)
@Warmup(iterations = 0, time = 2, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 2)
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class BufferedNonBlockStreamBenchmark_try1 {

    public static boolean isWindows = System.getProperty("os.name").contains("Windows");

    @Param({"1gig_file"})
    String file;

    // @Param({"4100", "16400", "131100", "1048600"})
    @Param({"4100", "131100", "1048600"})
    int javaBufSize;

    @Param({"100", "1000"})
    int readSize;

    // @Param({"0", "1000", "100000"})
    @Param({"0", "1000"})
    long cpuWork;

    // This MUST be smaller than javaBufSize
    // @Param({"4096", "16384", "131072", "1048576"})
    @Param({"BufferedInputStream", "BufferedNonBlockStream|4096", "BufferedNonBlockStream|131072", "BufferedNonBlockStream|1048576",
            "BufferedAsyncStream|4096", "BufferedAsyncStream|131072", "BufferedAsyncStream|1048576"})
    String implType;

    interface ReadOp {
        int read(byte[] buf) throws IOException;
    }

    private ReadOp read;
    private Closeable close;
    private byte[] buf;
    @Setup(Level.Iteration)
    public void setup() throws IOException, InterruptedException {
        clearCache();
        String[] type_readSize = implType.split("\\|");
        switch (type_readSize[0]) {
            case "BufferedInputStream": {
                BufferedInputStream in = new BufferedInputStream(
                        new FileInputStream(file), javaBufSize);
                read = in::read;
                close = in::close;
            }
            break;
            case "BufferedNonBlockStream": {
                BufferedNonBlockStream in = new BufferedNonBlockStream(
                        FileChannel.open(new File(file).toPath()), javaBufSize, Integer.parseInt(type_readSize[1]));
                read = in::read;
                close = in::close;
            }
            break;
            case "BufferedAsyncStream": {
                BufferedAsyncStream in = new BufferedAsyncStream(
                        AsynchronousFileChannel.open(new File(file).toPath()), javaBufSize, Integer.parseInt(type_readSize[1]));
                read = in::read;
                close = in::close;
            }
            break;
            default:
                throw new IllegalArgumentException(
                        "Invalid parameter 'implType': " + implType);
        }
        buf = new byte[readSize];
    }

    @TearDown(Level.Iteration)
    public void tearDown() throws IOException {
        close.close();
    }
    /**
     * For Linux:
     *
     * Compile the following C program into a clear_cache executable:
     * <pre>{@code
     *
     * #include <stdio.h>
     * #include <string.h>
     * #include <errno.h>
     *
     * #define PROC_FILE "/proc/sys/vm/drop_caches"
     *
     * int main(int argc, char *argv[]) {
     *     FILE *f;
     *     f = fopen(PROC_FILE, "w");
     *     if (f) {
     *         fprintf(f, "3\n");
     *         fclose(f);
     *         return 0;
     *     } else {
     *         fprintf(stderr, "Can't write to: %s: %s\n", PROC_FILE, strerror(errno));
     *         return 1;
     *     }
     * }
     *
     * }</pre>
     * ... and make it SUID root!
     *
     * For Windows:
     * Use the provided clear_cache.exe
     * (source code and original author:)
     * https://gist.github.com/bitshifter/c87aa396446bbebeab29
     * Then run this program with administrator privileges
     */
    private static void clearCache() throws IOException, InterruptedException {
        // spawn an OS command to clear the FS cache...
        if (isWindows) {
            new ProcessBuilder("clear_cache.exe").start().waitFor();
        } else {
            new ProcessBuilder("clear_cache").start().waitFor();
        }
    }

    @Benchmark
    public int testRead() throws IOException {
        int nread = 0;
        int n;
        while ((n = read.read(buf)) >= 0) {
            nread += n;
            Blackhole.consumeCPU(cpuWork);
        }
        return nread;
    }
}
More information about the core-libs-dev
mailing list