[External] : Re: Question about Streams, Gatherers, and fetching too many elements

David Alayachew davidalayachew at gmail.com
Tue Nov 12 23:39:21 UTC 2024


Apologies, I did not mean to add reduce(). Please ignore that part.

On Tue, Nov 12, 2024, 6:37 PM David Alayachew <davidalayachew at gmail.com>
wrote:

> Oh sure, I expect something like distinct() to pull everything. In order
> to know if something is distinct, you have to do some variant of "check
> against everyone else". Whether that is holding all instances in memory or
> their hashes, it's clear from a glance that you will need to look at
> everything, and therefore, pre-fetching makes intuitive sense to me.
>
> I 100% did not expect terminal operations like findAny() or reduce() to
> pull the whole data set. That was a complete whiplash for me. The method
> findAny() advertises itself as a short-circuiting operation, so to find out
> that it actually pulls the whole data set anyways was shocking.
>
> And that was my biggest pain point -- looking at the documentation, it is
> not clear to me at all that methods like findAny() would pull in all data
> upon becoming parallel().
>
> Do you think it would make sense to add documentation about this to the
> javadocs for Stream/java.util.stream? Or maybe it is already there and I
> misunderstood it (even after reading through it thoroughly over 5 times).
>
>
> On Tue, Nov 12, 2024, 10:06 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
>> >We are told how Streams can process unbounded data sets, but when it
>> tries to do a findAny() with parallel(), it runs into an OOME because it
>> fetched all the data ahead of time. In fact, almost of the terminal
>> operations will hit an OOME in the exact same way if they are parallel and
>> have a big enough data set. It's definitely not the end of the world, but
>> it seems that I have to fit everything into a Collector and/or a Gatherer
>> if I want to avoid pre-fetching everything.
>>
>> Yeah, I think it is important to distinguish "can process unbounded data
>> sets" from "always able to process unbounded data sets".
>>
>> Some operations inherently need the end of the stream, so even something
>> somple like: stream.distinct() or stream.sorted() can end up pulling in all
>> data (which of course won't terminate).
>>
>> Fortunately, I think Gatherers can unlock much more situations where
>> unbounded streams can be processed.
>>
>> Cheers,
>>>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* David Alayachew <davidalayachew at gmail.com>
>> *Sent:* Tuesday, 12 November 2024 15:08
>> *To:* Viktor Klang <viktor.klang at oracle.com>
>> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
>> fetching too many elements
>>
>>
>> Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon
>> as I thought of it.
>>
>>
>> I hand-waved away the idea because I thought that the method would turn
>> the stream pipeline parallel, thus, recreating the same problem I currently
>> have of parallelism causing all of the elements to be fetched ahead of
>> time, causing an OOME.
>>
>>
>> It did NOT occur to me that the pipeline would stay sequential, and just
>> kick these off sequentially, but have them executing in parallel. I can't
>> see why I came to that incorrect conclusion. I have read the javadocs of
>> this method several times. Though, to be fair, I came to the same,
>> incorrect conclusion about Collectors.groupingByConcurrent(), and it wasn't
>> until someone pointed out what the documentation was actually saying that I
>> realized it's true properties.
>>
>> Thanks. That definitely solves at least part of my problem. Obviously, I
>> would prefer to write to S3 in parallel too, but at the very least, the
>> calculation part is being done in parallel. And worst case scenario, I can
>> be really bad and just do the write to S3 in the mapConcurrent, and then
>> just return the metadata of each write, and just bundle that up with
>> collect.
>>
>>
>> And that's ignoring the fact that I can just use the workaround too.
>>
>>
>> Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me
>> from a performance perspective, but is rather unintuitive to me from a
>> usability perspective. We are told how Streams can process unbounded data
>> sets, but when it tries to do a findAny() with parallel(), it runs into an
>> OOME because it fetched all the data ahead of time. In fact, almost of the
>> terminal operations will hit an OOME in the exact same way if they are
>> parallel and have a big enough data set. It's definitely not the end of the
>> world, but it seems that I have to fit everything into a Collector and/or a
>> Gatherer if I want to avoid pre-fetching everything.
>>
>> On Tue, Nov 12, 2024, 6:36 AM Viktor Klang <viktor.klang at oracle.com>
>> wrote:
>>
>> Have you considered Gatherers.mapConcurrent(…)?
>>
>>
>> Cheers,
>>>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* David Alayachew <davidalayachew at gmail.com>
>> *Sent:* Tuesday, 12 November 2024 01:53
>> *To:* Viktor Klang <viktor.klang at oracle.com>
>> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
>> fetching too many elements
>>
>> Good to know, ty vm.
>>
>> At the very least, I have this workaround. This will meet my needs for
>> now.
>>
>> I guess my final question would be -- is this type of problem better
>> suited to something besides parallel streams? Maybe an ExecutorService?
>>
>> Really, all I am doing is taking a jumbo file, splitting it into batches,
>> and then doing some work on those batches. My IO speeds are pretty fast,
>> and the compute work is non-trivial, so there is performance being left on
>> the table if I give up parallelism. And I am in a position where completion
>> time is very important to us.
>>
>> I just naturally assumed parallel streams were the right choice because
>> the compute work is simple. A pure function that I can break out, and then
>> call in a map. Once I do that, I just call forEach to write the batches
>> back out to S3. Maybe I should look into a different part of the std lib
>> instead because I am using the wrong tool for the job? My nose says
>> ExecutorService, but I figure I should ask before I dive too deep in.
>>
>>
>> On Mon, Nov 11, 2024, 2:34 PM Viktor Klang <viktor.klang at oracle.com>
>> wrote:
>>
>> You're most welcome!
>>
>> In a potential future where all intermediate operations are
>> Gatherer-based, and all terminal operations are Collector-based, it would
>> just work as expected. But with that said, I'm not sure it is practically
>> achievable because some operations might not have the same
>> performance-characteristics as before.
>>
>> Cheers,
>>>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* David Alayachew <davidalayachew at gmail.com>
>> *Sent:* Monday, 11 November 2024 18:32
>> *To:* Viktor Klang <viktor.klang at oracle.com>
>> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
>> *Subject:* [External] : Re: Question about Streams, Gatherers, and
>> fetching too many elements
>>
>>
>> Thanks for the workaround. It's running beautifully.
>>
>> Is there a future where this island concept is extended to the rest of
>> streams? Tbh, I don't fully understand it.
>>
>> On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <viktor.klang at oracle.com>
>> wrote:
>>
>> Hi David,
>>
>> This is the effect of how parallel streams are implemented, where
>> different stages, which are not representible as a join-less Spliterator
>> are executed as a series of "islands" where the next isn't started until
>> the former has completed.
>>
>> If you think about it, parallelization of a Stream works best when the
>> entire data set can be split amongst a set of worker threads, and that sort
>> of implies that you want eager pre-fetch of data, so if your dataset does
>> not fit in memory, that is likely to lead to less desirable outcomes.
>>
>> What I was able to do for Gatherers is to implement "gather(…) +
>> collect(…)"-fusion so any number of consecutive gather(…)-operations
>> immediately followed by a collect(…) is run in the same "island".
>>
>> So with that said, you could try something like the following:
>>
>> static <T> Collector<T, ?, Void> *forEach*(Consumer<? *super* T> *each*)
>> {
>>     *return* Collector.of(() -> null, (*v*, *e*) -> each.accept(e), (*l*,
>> *r*) -> l, (*v*) -> null, Collector.Characteristics.IDENTITY_FINISH);
>> }
>>
>>
>> stream
>> .parallel()
>> .unordered()
>> .gather(Gatherers.windowFixed(BATCH_SIZE))
>> .collect(forEach(eachList -> println(eachList.getFirst())));
>>
>>
>> Cheers,
>>>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of
>> David Alayachew <davidalayachew at gmail.com>
>> *Sent:* Monday, 11 November 2024 14:52
>> *To:* core-libs-dev <core-libs-dev at openjdk.org>
>> *Subject:* Re: Question about Streams, Gatherers, and fetching too many
>> elements
>>
>> And just to avoid the obvious question, I can hold about 30 batches in
>> memory before the Out of Memory error occurs. So this is not an issue of my
>> batch size being too high.
>>
>> But just to confirm, I set the batch size to 1, and it still ran into an
>> out of memory error. So I feel fairly confident saying that the Gatherer is
>> trying to grab all available data before sending any of it downstream.
>>
>> On Mon, Nov 11, 2024, 8:46 AM David Alayachew <davidalayachew at gmail.com>
>> wrote:
>>
>> Hello Core Libs Dev Team,
>>
>> I was trying out Gatherers for a project at work, and ran into a rather
>> sad scenario.
>>
>> I need to process a large file in batches. Each batch is small enough
>> that I can hold it in memory, but I cannot hold the entire file (and thus,
>> all of the batches) in memory at once.
>>
>> Looking at the Gatherers API, I saw windowFixed and thought that it would
>> be a great match for my use case.
>>
>> However, when trying it out, I was disappointed to see that it ran out of
>> memory very quickly. Here is my attempt at using it.
>>
>> stream
>> .parallel()
>> .unordered()
>> .gather(Gatherers.windowFixed(BATCH_SIZE))
>> .forEach(eachList -> println(eachList.getFirst()))
>> ;
>>
>> As you can see, I am just splitting the file into batches, and printing
>> out the first of each batch. This is purely for example's sake, of course.
>> I had planned on building even more functionality on top of this, but I
>> couldn't even get past this example.
>>
>> But anyways, not even a single one of them printed out. Which leads me to
>> believe that it's pulling all of them in the Gatherer.
>>
>> I can get it to run successfully if I go sequentially, but not parallel.
>> Parallel gives me that out of memory error.
>>
>> Is there any way for me to be able to have the Gatherer NOT pull in
>> everything while still remaining parallel and unordered?
>>
>> Thank you for your time and help.
>> David Alayachew
>>
>>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://mail.openjdk.org/pipermail/core-libs-dev/attachments/20241112/7674eca1/attachment-0001.htm>


More information about the core-libs-dev mailing list