Question about Streams, Gatherers, and fetching too many elements

David Alayachew davidalayachew at gmail.com
Mon Nov 11 17:32:04 UTC 2024


Thanks for the workaround. It's running beautifully.

Is there a future where this island concept is extended to the rest of
streams? Tbh, I don't fully understand it.

On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <viktor.klang at oracle.com> wrote:

> Hi David,
>
> This is the effect of how parallel streams are implemented, where
> different stages, which are not representible as a join-less Spliterator
> are executed as a series of "islands" where the next isn't started until
> the former has completed.
>
> If you think about it, parallelization of a Stream works best when the
> entire data set can be split amongst a set of worker threads, and that sort
> of implies that you want eager pre-fetch of data, so if your dataset does
> not fit in memory, that is likely to lead to less desirable outcomes.
>
> What I was able to do for Gatherers is to implement "gather(…) +
> collect(…)"-fusion so any number of consecutive gather(…)-operations
> immediately followed by a collect(…) is run in the same "island".
>
> So with that said, you could try something like the following:
>
> static <T> Collector<T, ?, Void> *forEach*(Consumer<? *super* T> *each*) {
>     *return* Collector.of(() -> null, (*v*, *e*) -> each.accept(e), (*l*,
> *r*) -> l, (*v*) -> null, Collector.Characteristics.IDENTITY_FINISH);
> }
>
>
> stream
> .parallel()
> .unordered()
> .gather(Gatherers.windowFixed(BATCH_SIZE))
> .collect(forEach(eachList -> println(eachList.getFirst())));
>
>
> Cheers,
>>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of David
> Alayachew <davidalayachew at gmail.com>
> *Sent:* Monday, 11 November 2024 14:52
> *To:* core-libs-dev <core-libs-dev at openjdk.org>
> *Subject:* Re: Question about Streams, Gatherers, and fetching too many
> elements
>
> And just to avoid the obvious question, I can hold about 30 batches in
> memory before the Out of Memory error occurs. So this is not an issue of my
> batch size being too high.
>
> But just to confirm, I set the batch size to 1, and it still ran into an
> out of memory error. So I feel fairly confident saying that the Gatherer is
> trying to grab all available data before sending any of it downstream.
>
> On Mon, Nov 11, 2024, 8:46 AM David Alayachew <davidalayachew at gmail.com>
> wrote:
>
> Hello Core Libs Dev Team,
>
> I was trying out Gatherers for a project at work, and ran into a rather
> sad scenario.
>
> I need to process a large file in batches. Each batch is small enough that
> I can hold it in memory, but I cannot hold the entire file (and thus, all
> of the batches) in memory at once.
>
> Looking at the Gatherers API, I saw windowFixed and thought that it would
> be a great match for my use case.
>
> However, when trying it out, I was disappointed to see that it ran out of
> memory very quickly. Here is my attempt at using it.
>
> stream
> .parallel()
> .unordered()
> .gather(Gatherers.windowFixed(BATCH_SIZE))
> .forEach(eachList -> println(eachList.getFirst()))
> ;
>
> As you can see, I am just splitting the file into batches, and printing
> out the first of each batch. This is purely for example's sake, of course.
> I had planned on building even more functionality on top of this, but I
> couldn't even get past this example.
>
> But anyways, not even a single one of them printed out. Which leads me to
> believe that it's pulling all of them in the Gatherer.
>
> I can get it to run successfully if I go sequentially, but not parallel.
> Parallel gives me that out of memory error.
>
> Is there any way for me to be able to have the Gatherer NOT pull in
> everything while still remaining parallel and unordered?
>
> Thank you for your time and help.
> David Alayachew
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://mail.openjdk.org/pipermail/core-libs-dev/attachments/20241111/9e870315/attachment.htm>


More information about the core-libs-dev mailing list