[External] : Re: Question about Streams, Gatherers, and fetching too many elements
Viktor Klang
viktor.klang at oracle.com
Tue Nov 12 11:36:09 UTC 2024
Have you considered Gatherers.mapConcurrent(…)?
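For readers following along, here is a minimal sketch of what that suggestion could look like. It assumes JDK 24 or later (Gatherers were a preview API in JDK 22 and 23). The class name, the processBatch helper, and the sizes are hypothetical stand-ins, not taken from the thread. The stream stays sequential, and only the mapping step runs concurrently, with a cap on how many batches are worked on at once.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

// Sketch only: requires JDK 24+ (Gatherers were a preview API in JDK 22 and 23).
class MapConcurrentSketch {

    // Hypothetical stand-in for the real per-batch compute step.
    static int processBatch(List<Integer> batch) {
        return batch.stream().mapToInt(Integer::intValue).sum();
    }

    static List<Integer> run(int elementCount, int batchSize, int maxConcurrency) {
        return IntStream.range(0, elementCount)
                .boxed()
                // The stream itself is sequential; windowFixed groups elements into batches.
                .gather(Gatherers.windowFixed(batchSize))
                // At most maxConcurrency batches are mapped at once, each on a virtual thread.
                // mapConcurrent preserves the encounter order of its results.
                .gather(Gatherers.mapConcurrent(maxConcurrency, MapConcurrentSketch::processBatch))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(run(10, 4, 2)); // prints [6, 22, 17]
    }
}
```

Because the concurrency limit is an explicit argument, the number of batches in flight is bounded by maxConcurrency rather than by how the source happens to split.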
Cheers,
√
Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: David Alayachew <davidalayachew at gmail.com>
Sent: Tuesday, 12 November 2024 01:53
To: Viktor Klang <viktor.klang at oracle.com>
Cc: core-libs-dev <core-libs-dev at openjdk.org>
Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements
Good to know, ty vm.
At the very least, I have this workaround. This will meet my needs for now.
I guess my final question would be -- is this type of problem better suited to something besides parallel streams? Maybe an ExecutorService?
Really, all I am doing is taking a jumbo file, splitting it into batches, and then doing some work on those batches. My IO speeds are pretty fast, and the compute work is non-trivial, so there is performance being left on the table if I give up parallelism. And I am in a position where completion time is very important to us.
I just naturally assumed parallel streams were the right choice because the compute work is simple. A pure function that I can break out, and then call in a map. Once I do that, I just call forEach to write the batches back out to S3. Maybe I should look into a different part of the std lib instead because I am using the wrong tool for the job? My nose says ExecutorService, but I figure I should ask before I dive too deep in.
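As an illustration of the ExecutorService route, here is a rough sketch under stated assumptions: the source is exposed as an Iterator, and every name here (BoundedBatchRunner, run, sumInBatches, maxInFlight) is hypothetical. A Semaphore caps how many batches exist at once, so the reader cannot run ahead of the workers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class BoundedBatchRunner {

    // Reads batches lazily from source and hands them to a fixed pool.
    // A permit is taken before a batch is built and released when its work ends,
    // so at most maxInFlight batches are held in memory at any time.
    static <T> void run(Iterator<T> source, int batchSize, int maxInFlight,
                        Consumer<List<T>> work) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(maxInFlight);
        Semaphore permits = new Semaphore(maxInFlight);
        try {
            while (source.hasNext()) {
                permits.acquire(); // blocks the reader while maxInFlight batches are pending
                List<T> batch = new ArrayList<>(batchSize);
                while (batch.size() < batchSize && source.hasNext()) {
                    batch.add(source.next());
                }
                pool.execute(() -> {
                    try {
                        work.accept(batch); // exceptions thrown here are not propagated to the caller
                    } finally {
                        permits.release();
                    }
                });
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.HOURS); // arbitrary upper bound for this sketch
        }
    }

    // Small demo: sums 0..n-1 in batches, standing in for the real compute-and-upload step.
    static long sumInBatches(int n, int batchSize, int maxInFlight) {
        AtomicLong total = new AtomicLong();
        try {
            run(IntStream.range(0, n).boxed().iterator(), batchSize, maxInFlight,
                    batch -> { for (int x : batch) total.addAndGet(x); });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(sumInBatches(100, 10, 3)); // prints 4950
    }
}
```

The trade-off compared with a parallel stream is that the batching and back-pressure are written by hand, but the memory bound is explicit instead of depending on how the stream implementation splits and buffers.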
On Mon, Nov 11, 2024, 2:34 PM Viktor Klang <viktor.klang at oracle.com> wrote:
You're most welcome!
In a potential future where all intermediate operations are Gatherer-based, and all terminal operations are Collector-based, it would just work as expected. But with that said, I'm not sure it is practically achievable because some operations might not have the same performance-characteristics as before.
Cheers,
√
Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: David Alayachew <davidalayachew at gmail.com>
Sent: Monday, 11 November 2024 18:32
To: Viktor Klang <viktor.klang at oracle.com>
Cc: core-libs-dev <core-libs-dev at openjdk.org>
Subject: [External] : Re: Question about Streams, Gatherers, and fetching too many elements
Thanks for the workaround. It's running beautifully.
Is there a future where this island concept is extended to the rest of streams? Tbh, I don't fully understand it.
On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <viktor.klang at oracle.com> wrote:
Hi David,
This is the effect of how parallel streams are implemented: stages that are not representable as a join-less Spliterator are executed as a series of "islands", where the next island isn't started until the previous one has completed.
If you think about it, parallelization of a Stream works best when the entire data set can be split amongst a set of worker threads, and that sort of implies that you want eager pre-fetch of data, so if your dataset does not fit in memory, that is likely to lead to less desirable outcomes.
What I was able to do for Gatherers is to implement "gather(…) + collect(…)"-fusion so any number of consecutive gather(…)-operations immediately followed by a collect(…) is run in the same "island".
So with that said, you could try something like the following:
static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
    return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l, (v) -> null, Collector.Characteristics.IDENTITY_FINISH);
}

stream
    .parallel()
    .unordered()
    .gather(Gatherers.windowFixed(BATCH_SIZE))
    .collect(forEach(eachList -> println(eachList.getFirst())));
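To see the forEach collector above in isolation, here is a small self-contained sketch that runs it on a plain parallel stream, without Gatherers, so it does not need JDK 24. The class name and the sumViaCollector helper are hypothetical. It only demonstrates that the collector carries no state and does all of its work in the accumulator; the gather-plus-collect fusion described in this message is not exercised here.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class ForEachCollectorDemo {

    // Same shape as the collector in the message: the container is always null,
    // the accumulator performs the side effect, and the combiner has nothing to merge.
    static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
        return Collector.of(
                () -> null,
                (v, e) -> each.accept(e),
                (l, r) -> l,
                v -> null,
                Collector.Characteristics.IDENTITY_FINISH);
    }

    // Sums 0..n-1 through the side-effecting collector on a parallel, unordered stream.
    static long sumViaCollector(int n) {
        AtomicLong sum = new AtomicLong(); // thread-safe, since the consumer may run on several threads
        IntStream.range(0, n)
                .boxed()
                .parallel()
                .unordered()
                .collect(ForEachCollectorDemo.<Integer>forEach(e -> sum.addAndGet(e)));
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(sumViaCollector(100)); // prints 4950
    }
}
```

As with Stream.forEach on a parallel stream, the consumer passed in has to be safe to call from multiple threads.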
Cheers,
√
Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of David Alayachew <davidalayachew at gmail.com>
Sent: Monday, 11 November 2024 14:52
To: core-libs-dev <core-libs-dev at openjdk.org>
Subject: Re: Question about Streams, Gatherers, and fetching too many elements
And just to avoid the obvious question, I can hold about 30 batches in memory before the Out of Memory error occurs. So this is not an issue of my batch size being too high.
But just to confirm, I set the batch size to 1, and it still ran into an out of memory error. So I feel fairly confident saying that the Gatherer is trying to grab all available data before sending any of it downstream.
On Mon, Nov 11, 2024, 8:46 AM David Alayachew <davidalayachew at gmail.com> wrote:
Hello Core Libs Dev Team,
I was trying out Gatherers for a project at work, and ran into a rather sad scenario.
I need to process a large file in batches. Each batch is small enough that I can hold it in memory, but I cannot hold the entire file (and thus, all of the batches) in memory at once.
Looking at the Gatherers API, I saw windowFixed and thought that it would be a great match for my use case.
However, when trying it out, I was disappointed to see that it ran out of memory very quickly. Here is my attempt at using it.
stream
    .parallel()
    .unordered()
    .gather(Gatherers.windowFixed(BATCH_SIZE))
    .forEach(eachList -> println(eachList.getFirst()));
As you can see, I am just splitting the file into batches, and printing out the first of each batch. This is purely for example's sake, of course. I had planned on building even more functionality on top of this, but I couldn't even get past this example.
But anyway, not a single one of them printed out, which leads me to believe that the Gatherer is pulling all of them in before passing anything downstream.
I can get it to run successfully if I go sequentially, but not parallel. Parallel gives me that out of memory error.
Is there any way for me to be able to have the Gatherer NOT pull in everything while still remaining parallel and unordered?
Thank you for your time and help.
David Alayachew