Streams, parallelization, and OOME.
David Alayachew
davidalayachew at gmail.com
Sat Oct 19 13:36:36 UTC 2024
So to be clear, I added a logger to my BufferedReader. So I know for a fact
that it is reading data.
And as for the code, it is a very simple parallel forEach.
someStream.parallel().forEach(**work**);
I only wanted to change the execution from sequential to parallel.
So I have millions and millions of lines being read from the source (1
million lines is several batches alone) and none of it is entering the
forEach loop. And I know this because the very first thing my forEach loop
does is print logs.
As for the iterator, it is literally just a hand rolled iterator (that I
copied from StackOverflow) where there is an instance field holding a list
of elements, sized to match my batch size. hasNext() prepares the next
batch and stores it into that instance field, then returns true/false if
there is data to send, then next() returns that instance field's prepared
batch. Preparing is just making a new list, and storing a batch size's
worth of elements in it.
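A minimal sketch of an iterator like the one described above. The actual code is not shown in this thread, so the class name BatchingIterator, its fields, and the sample data are assumptions made for illustration. Unlike the description, this version makes hasNext() safe to call repeatedly by preparing a batch only when none is pending.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical wrapper: groups a source iterator into lists of at most batchSize elements. */
final class BatchingIterator<T> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final int batchSize;
    private List<T> prepared; // batch that the next call to next() returns, or null

    BatchingIterator(Iterator<T> source, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        // Prepare a new batch only if none is pending and the source has data.
        if (prepared == null && source.hasNext()) {
            List<T> batch = new ArrayList<>(batchSize);
            while (batch.size() < batchSize && source.hasNext()) {
                batch.add(source.next());
            }
            prepared = batch;
        }
        return prepared != null;
    }

    @Override
    public List<T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> batch = prepared;
        prepared = null; // hand the batch over and forget it
        return batch;
    }

    public static void main(String[] args) {
        // Stand-in for BufferedReader.lines().iterator().
        Iterator<String> lines = Arrays.asList("a", "b", "c", "d", "e").iterator();
        Iterator<List<String>> batchIterator = new BatchingIterator<>(lines, 2);
        Stream<List<String>> batches = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(batchIterator, Spliterator.ORDERED),
                false); // sequential
        batches.forEach(System.out::println); // prints [a, b] then [c, d] then [e]
    }
}
```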
So this means that it is just grabbing batches and batches, but not letting
any of them through into the forEach loop. And when I turn off parallelism,
they suddenly go through just fine, one after the other.
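The behaviour described above can be reproduced with a small sketch. It assumes the stream is built with Spliterators.spliteratorUnknownSize (the actual code is not shown in this thread), and the class EagerPullDemo and its method are made up for illustration. In OpenJDK's implementation, as far as can be told from its sources, that spliterator's trySplit copies elements from the iterator into an array before handing them to a worker, starting at 1024 elements and growing with each split. When every element is itself a batch, on the order of a thousand batches are read before the first one reaches forEach.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class EagerPullDemo {
    /** Returns how many elements had been pulled from the iterator when forEach first ran. */
    static int pulledBeforeFirstWork(final int total) {
        final AtomicInteger pulled = new AtomicInteger();
        final AtomicInteger pulledAtFirstWork = new AtomicInteger(-1);

        // Stand-in for the batching iterator: each Integer represents one batch.
        Iterator<Integer> source = new Iterator<Integer>() {
            private int next = 0;

            @Override
            public boolean hasNext() {
                return next < total;
            }

            @Override
            public Integer next() {
                if (next >= total) {
                    throw new NoSuchElementException();
                }
                pulled.incrementAndGet(); // count every read from the source
                return next++;
            }
        };

        Stream<Integer> stream = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(source, Spliterator.ORDERED), false);

        // Record the pull count the first time the forEach body executes.
        stream.parallel().forEach(batch ->
                pulledAtFirstWork.compareAndSet(-1, pulled.get()));
        return pulledAtFirstWork.get();
    }

    public static void main(String[] args) {
        System.out.println("pulled before first forEach call: "
                + pulledBeforeFirstWork(100_000));
    }
}
```

The exact number printed depends on the JDK and on thread timing, so none is shown here.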
On Sat, Oct 19, 2024, 7:12 AM Olexandr Rotan <rotanolexandr842 at gmail.com>
wrote:
> Hi David. I am not on the core libs team, but I guess I can offer some clues :).
>
> It is hard to tell without the code, but I assume that there are a few
> layers to it.
>
> 1. Stalling. I would assume it is caused mostly by GC pauses taking too
> long (possibly forever) if the GC has no computational resources to run on.
> There is a fairly common GC-pause-related issue where a database connection
> fails with an exception saying "Broken pipe", which under the hood is
> caused by the database connection timing out during a long GC pause when
> running on low memory. I am not saying this is your case, but if I were to
> guess, I would assume that the stall is caused by low memory.
>
> 2. The root cause of the out-of-memory error may be too much splitting of
> your data source input. You may try to limit it by modifying the behaviour
> of the trySplit method of your spliterator.
>
> Alternatively, if you don't mind taking up some disk space, you can try to
> stream the data into a file, save it, and then use memory-mapped buffers
> (java.nio.MappedByteBuffer) to process the accepted data. I am not sure this
> will work, but memory-mapped files are a common tool for dealing with data
> that can't fit into RAM.
>
> Regards
>
>
> On Sat, Oct 19, 2024 at 8:54 AM David Alayachew <davidalayachew at gmail.com>
> wrote:
>
>> Hello Core Libs Dev Team,
>>
>> I have a file that I am streaming from a service, and I am trying to
>> split into multiple parts based on a certain attribute found on each line.
>> I am sending each part up to a different service.
>>
>> I am using BufferedReader.lines(). However, I cannot read the whole file
>> into memory because it is larger than the amount of RAM that I have on the
>> machine. So, since I don't have access to Java 22's Preview Gatherers Fixed
>> Window, I used the iterator() method on my stream, wrapped that in another
>> iterator that can grab my batch size worth of data, then built a
>> spliterator from that, which I then used to create a new stream. In short,
>> this wrapper iterator isn't Iterator<T>, it's Iterator<List<T>>.
>>
>> When I ran this sequentially, everything worked well. However, my CPU usage
>> was low and we definitely have a performance problem -- our team needs this
>> number as fast as we can get it. Plus, we had plenty of network bandwidth to
>> spare, so I had (imo) good reason to use parallelism.
>>
>> As soon as I turned on parallelism, the stream's behaviour changed
>> completely. Instead of fetching the batch and processing, it started
>> grabbing SEVERAL BATCHES and processing NONE OF THEM. Or at the very least,
>> it grabbed so many batches that it ran out of memory before it could get to
>> processing them.
>>
>> To give some numbers, this is a 4 core machine. And we can safely hold
>> about 30-40 batches worth of data in memory before crashing. But again,
>> when running sequentially, this thing only grabs 1 batch, processes that
>> one batch, sends out the results, and then starts the next one, all as
>> expected. I thought that adding parallelism would simply make it so that we
>> have this happening 4 or 8 times at once.
>>
>> After a very long period of digging, I managed to find this link.
>>
>>
>> https://stackoverflow.com/questions/30825708/java-8-using-parallel-in-a-stream-causes-oom-error
>>
>> Tagir Valeev gives an answer which doesn't go very deep into the "why" at
>> all. And the answer is more directed to the user's specific question as
>> opposed to solving this particular problem.
>>
>> After digging through a bunch of other solutions (plus my own testing),
>> it seems that the answer is that the engine that does parallelization for
>> Streams tries to grab a large enough "buffer" before doing any parallel
>> processing. I could be wrong, and I have no idea how large that buffer is.
>>
>> Regardless, that's about where I gave up and went sequential, since the
>> clock was ticking.
>>
>> But I still have a performance problem. How would one suggest going about
>> this in Java 8?
>>
>> Thank you for your time and help.
>> David Alayachew
>>
>>
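For the Java 8 question that closes the original message, one possible approach is sketched below. It is not something proposed in this thread, and it swaps parallel streams for a plain fixed thread pool plus a Semaphore, so that the reader blocks once a chosen number of batches are pending. The class BoundedBatchRunner, the method run, and the parameter names are all hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class BoundedBatchRunner {
    /**
     * Runs work on each batch using a pool of `threads` workers, keeping at most
     * `maxInFlight` batches read from the iterator but not yet finished.
     */
    static <T> void run(Iterator<List<T>> batches, int threads, int maxInFlight,
                        Consumer<List<T>> work) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Semaphore permits = new Semaphore(maxInFlight);
        try {
            while (true) {
                // Block the reading thread while too many batches are pending.
                // The permit is taken before hasNext() because hasNext() may read data.
                permits.acquire();
                if (!batches.hasNext()) {
                    permits.release();
                    break;
                }
                final List<T> batch = batches.next();
                pool.execute(() -> {
                    try {
                        work.accept(batch);
                    } finally {
                        permits.release(); // free a slot even if work throws
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status for the caller
        } finally {
            pool.shutdownNow(); // no effect after normal termination
        }
    }

    public static void main(String[] args) {
        List<List<String>> sample = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c", "d"), Arrays.asList("e"));
        // 4 workers, at most 8 batches held in memory at once.
        run(sample.iterator(), 4, 8, batch -> System.out.println(batch));
    }
}
```

With 4 cores and room for 30-40 batches in memory, something like 4 threads and a limit of 8 would stay well inside that budget, though the right numbers would need measuring. An exception thrown by work is not propagated to the caller in this sketch.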