[External] : Re: Question about Streams, Gatherers, and fetching too many elements

David Alayachew davidalayachew at gmail.com
Tue Nov 19 08:02:02 UTC 2024


Poking the thread in case you are able to answer my previous question,
Viktor.

Also, another question -- I posted this thread to Reddit, and some good
discussion has already started. Can you or someone else answer some of the
questions that have popped up there?

https://old.reddit.com/r/java/comments/1gukzhb/a_surprising_pain_point_regarding_parallel_java/

Ty vm!

On Thu, Nov 14, 2024 at 5:45 PM David Alayachew <davidalayachew at gmail.com>
wrote:

> Oh ok. So it truly is a toss-up depending on each implementation when and
> where this occurs.
>
> Ok, then as my final request, I think even informing the user that this
> CAN occur is worth doing. If nothing else, the user scouring the
> documentation for documentation of this behaviour will know that it is
> simply something that can occur. They don't need to know all the details.
> Simply give it an official term, describe the behaviour, and state that when
> it happens is implementation-specific, but that it is only possible
> during parallelism. Even just knowing the verbiage to describe the problem
> will enable them to better communicate with each other on what they want vs
> what they get. That helps searchability, if nothing else.
>
> On Thu, Nov 14, 2024, 8:45 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
>> I see what you're saying, the problem is that it depends on the Stream
>> implementation (given that Stream is an interface).
>>
>> Cheers,
>> √
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* David Alayachew <davidalayachew at gmail.com>
>> *Sent:* Thursday, 14 November 2024 12:36
>> *To:* Viktor Klang <viktor.klang at oracle.com>
>> *Cc:* Rob Spoor <openjdk at icemanx.nl>; core-libs-dev <
>> core-libs-dev at openjdk.org>
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
>> fetching too many elements
>>
>>
>> Then let me correct myself again, while simplifying -- I just want that
>> detail, that certain combinations might lead to pre-fetching everything, to
>> be documented on the stream api. Package level, or on the Stream interface
>> itself, seems like a good spot.
>>
>> On Thu, Nov 14, 2024, 4:22 AM Viktor Klang <viktor.klang at oracle.com>
>> wrote:
>>
>> The issue here is that the operation cannot advertise this, as it depends
>> on the combination of operations.
>>
>>
>> Cheers,
>> √
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of
>> David Alayachew <davidalayachew at gmail.com>
>> *Sent:* Wednesday, 13 November 2024 14:07
>> *To:* Rob Spoor <openjdk at icemanx.nl>
>> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
>> fetching too many elements
>>
>> That is a good point Rob.
>>
>> Then let me correct myself -- I think the terminal operations don't do a
>> great job of advertising whether or not they pre-fetch everything when
>> parallelism is activated. Collector fetches as needed. FindAny pre-fetches
>> everything. I understand that later releases might change their behaviour,
>> but I still want to document the current behaviour in the official javadocs
>> so that we can limit undocumented tripping hazards.
>>
>>
>> On Wed, Nov 13, 2024, 7:07 AM Rob Spoor <openjdk at icemanx.nl> wrote:
>>
>> distinct() doesn't require everything to be pulled. It can push elements
>> to the downstream as they come along for the first time. When
>> downstream.push returns false the gatherer is done.
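
A minimal sketch of the distinct() behaviour described above, assuming JDK 24 or later (where java.util.stream.Gatherer is a final API). The class and helper names are made up for illustration, and this is not the JDK's own distinct() implementation. The integrator forwards each element the first time it is seen and returns the result of downstream.push, so the gatherer stops as soon as the downstream rejects an element.

```java
import java.util.HashSet;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class DistinctSketch {
    // Hypothetical helper: a sequential, gatherer-based distinct().
    static <T> Gatherer<T, ?, T> distinct() {
        return Gatherer.ofSequential(
            () -> new HashSet<T>(),
            // Duplicate: skip it and keep going.
            // New element: push it and report whether downstream wants more.
            (seen, element, downstream) ->
                !seen.add(element) || downstream.push(element));
    }

    public static void main(String[] args) {
        // Infinite source; limit(3) makes the downstream reject further pushes.
        List<Integer> firstThree = Stream.iterate(0, i -> i + 1)
            .map(i -> i % 5)
            .gather(distinct())
            .limit(3)
            .toList();
        System.out.println(firstThree); // [0, 1, 2]
    }
}
```

Because the state only ever holds elements already seen, a sequential pipeline like this one can finish on an unbounded source as long as something downstream short-circuits.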
>>
>> As part of some experimentation I've implemented all intermediary
>> operations using gatherers. Most of them are pretty straightforward and
>> will stop integrating once the downstream starts rejecting elements
>> (although some use Gatherer.ofSequential to keep it easy). I only found
>> two exceptions that don't return the result of downstream.push:
>>
>> * mapMulti. The downstream.push is passed as the mapper which is a
>> Consumer - the return value is ignored. With some more effort it's
>> probably possible to capture any false return value and return that from
>> the integrator, but I haven't tried that yet.
>>
>> * sorted. Obviously every element needs to be inspected.
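
The mapMulti idea from the list above (capture a false return value from downstream.push and return it from the integrator) could be sketched roughly as follows. This assumes JDK 24 or later; the class name, the helper, and the one-element boolean array used as a flag are illustrative choices, not code from the thread.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class MapMultiSketch {
    // Hypothetical helper: a gatherer-based mapMulti that remembers rejection.
    static <T, R> Gatherer<T, ?, R> mapMulti(
            BiConsumer<? super T, ? super Consumer<R>> mapper) {
        return Gatherer.of((unused, element, downstream) -> {
            boolean[] proceed = {true};
            Consumer<R> sink = result -> {
                // Once the downstream has rejected an element, drop the rest.
                if (proceed[0]) {
                    proceed[0] = downstream.push(result);
                }
            };
            mapper.accept(element, sink);
            // Report the captured answer back to the stream machinery.
            return proceed[0];
        });
    }

    public static void main(String[] args) {
        List<Integer> out = Stream.iterate(1, i -> i + 1)
            .gather(MapMultiSketch.<Integer, Integer>mapMulti((i, sink) -> {
                sink.accept(i);
                sink.accept(i * 10);
            }))
            .limit(3)
            .toList();
        System.out.println(out); // [1, 10, 2]
    }
}
```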
>>
>>
>> On 13/11/2024 00:37, David Alayachew wrote:
>> > Oh sure, I expect something like distinct() to pull everything. In order
>> > to know if something is distinct, you have to do some variant of "check
>> > against everyone else". Whether that is holding all instances in memory or
>> > their hashes, it's clear from a glance that you will need to look at
>> > everything, and therefore, pre-fetching makes intuitive sense to me.
>> >
>> > I 100% did not expect terminal operations like findAny() or reduce() to
>> > pull the whole data set. That was a complete whiplash for me. The method
>> > findAny() advertises itself as a short-circuiting operation, so to find
>> > out that it actually pulls the whole data set anyways was shocking.
>> >
>> > And that was my biggest pain point -- looking at the documentation, it is
>> > not clear to me at all that methods like findAny() would pull in all data
>> > upon becoming parallel().
>> >
>> > Do you think it would make sense to add documentation about this to the
>> > javadocs for Stream/java.util.stream? Or maybe it is already there and I
>> > misunderstood it (even after reading through it thoroughly over 5 times).
>> >
>> >
>> > On Tue, Nov 12, 2024, 10:06 AM Viktor Klang <viktor.klang at oracle.com>
>> > wrote:
>> >
>> >>> We are told how Streams can process unbounded data sets, but when it
>> >>> tries to do a findAny() with parallel(), it runs into an OOME because
>> >>> it fetched all the data ahead of time. In fact, almost all of the
>> >>> terminal operations will hit an OOME in the exact same way if they are
>> >>> parallel and have a big enough data set. It's definitely not the end of
>> >>> the world, but it seems that I have to fit everything into a Collector
>> >>> and/or a Gatherer if I want to avoid pre-fetching everything.
>> >>
>> >> Yeah, I think it is important to distinguish "can process unbounded data
>> >> sets" from "always able to process unbounded data sets".
>> >>
>> >> Some operations inherently need the end of the stream, so even something
>> >> simple like stream.distinct() or stream.sorted() can end up pulling in
>> >> all data (which of course won't terminate).
>> >>
>> >> Fortunately, I think Gatherers can unlock many more situations where
>> >> unbounded streams can be processed.
>> >>
>> >> Cheers,
>> >> √
>> >>
>> >>
>> >> *Viktor Klang*
>> >> Software Architect, Java Platform Group
>> >> Oracle
>> >> ------------------------------
>> >> *From:* David Alayachew <davidalayachew at gmail.com>
>> >> *Sent:* Tuesday, 12 November 2024 15:08
>> >> *To:* Viktor Klang <viktor.klang at oracle.com>
>> >> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
>> >> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
>> >> fetching too many elements
>> >>
>> >>
>> >> Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon
>> >> as I thought of it.
>> >>
>> >>
>> >> I hand-waved away the idea because I thought that the method would turn
>> >> the stream pipeline parallel, thus recreating the same problem I
>> >> currently have of parallelism causing all of the elements to be fetched
>> >> ahead of time, causing an OOME.
>> >>
>> >>
>> >> It did NOT occur to me that the pipeline would stay sequential, and just
>> >> kick these off sequentially, but have them executing in parallel. I can't
>> >> see why I came to that incorrect conclusion. I have read the javadocs of
>> >> this method several times. Though, to be fair, I came to the same
>> >> incorrect conclusion about Collectors.groupingByConcurrent(), and it
>> >> wasn't until someone pointed out what the documentation was actually
>> >> saying that I realized its true properties.
>> >>
>> >> Thanks. That definitely solves at least part of my problem. Obviously, I
>> >> would prefer to write to S3 in parallel too, but at the very least, the
>> >> calculation part is being done in parallel. And worst case scenario, I
>> >> can be really bad and just do the write to S3 in the mapConcurrent, and
>> >> then just return the metadata of each write, and just bundle that up with
>> >> collect.
>> >>
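
A rough sketch of the mapConcurrent approach discussed above, assuming JDK 24 or later. The stream itself stays sequential; only the mapping function runs concurrently. Everything named here is hypothetical: the integer range stands in for the large file, process() stands in for the real per-batch computation, and the batch size of 3 and concurrency limit of 4 are arbitrary.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

class MapConcurrentSketch {
    // Hypothetical stand-in for the non-trivial, pure per-batch computation.
    static Integer process(List<Integer> batch) {
        return batch.stream().mapToInt(Integer::intValue).sum();
    }

    static List<Integer> run() {
        return IntStream.rangeClosed(1, 10).boxed()   // stand-in for the file
            .gather(Gatherers.windowFixed(3))          // batch sequentially
            // Run process() on up to 4 batches at a time.
            .gather(Gatherers.mapConcurrent(4, MapConcurrentSketch::process))
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [6, 15, 24, 10]
    }
}
```

Returning a small result per batch (here a sum, in the real case perhaps the metadata of each write) keeps what reaches the final collection step small even when the batches themselves are large.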
>> >>
>> >> And that's ignoring the fact that I can just use the workaround too.
>> >>
>> >>
>> >> Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me
>> >> from a performance perspective, but is rather unintuitive to me from a
>> >> usability perspective. We are told how Streams can process unbounded data
>> >> sets, but when it tries to do a findAny() with parallel(), it runs into
>> >> an OOME because it fetched all the data ahead of time. In fact, almost
>> >> all of the terminal operations will hit an OOME in the exact same way if
>> >> they are parallel and have a big enough data set. It's definitely not the
>> >> end of the world, but it seems that I have to fit everything into a
>> >> Collector and/or a Gatherer if I want to avoid pre-fetching everything.
>> >>
>> >> On Tue, Nov 12, 2024, 6:36 AM Viktor Klang <viktor.klang at oracle.com>
>> >> wrote:
>> >>
>> >> Have you considered Gatherers.mapConcurrent(…)?
>> >>
>> >>
>> >> Cheers,
>> >> √
>> >>
>> >>
>> >> *Viktor Klang*
>> >> Software Architect, Java Platform Group
>> >> Oracle
>> >> ------------------------------
>> >> *From:* David Alayachew <davidalayachew at gmail.com>
>> >> *Sent:* Tuesday, 12 November 2024 01:53
>> >> *To:* Viktor Klang <viktor.klang at oracle.com>
>> >> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
>> >> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
>> >> fetching too many elements
>> >>
>> >> Good to know, ty vm.
>> >>
>> >> At the very least, I have this workaround. This will meet my needs for
>> >> now.
>> >>
>> >> I guess my final question would be -- is this type of problem better
>> >> suited to something besides parallel streams? Maybe an ExecutorService?
>> >>
>> >> Really, all I am doing is taking a jumbo file, splitting it into batches,
>> >> and then doing some work on those batches. My IO speeds are pretty fast,
>> >> and the compute work is non-trivial, so there is performance being left
>> >> on the table if I give up parallelism. And I am in a position where
>> >> completion time is very important to us.
>> >>
>> >> I just naturally assumed parallel streams were the right choice because
>> >> the compute work is simple. A pure function that I can break out, and
>> >> then call in a map. Once I do that, I just call forEach to write the
>> >> batches back out to S3. Maybe I should look into a different part of the
>> >> std lib instead because I am using the wrong tool for the job? My nose
>> >> says ExecutorService, but I figure I should ask before I dive too deep
>> >> in.
>> >>
>> >>
>> >> On Mon, Nov 11, 2024, 2:34 PM Viktor Klang <viktor.klang at oracle.com>
>> >> wrote:
>> >>
>> >> You're most welcome!
>> >>
>> >> In a potential future where all intermediate operations are
>> >> Gatherer-based, and all terminal operations are Collector-based, it would
>> >> just work as expected. But with that said, I'm not sure it is practically
>> >> achievable because some operations might not have the same
>> >> performance-characteristics as before.
>> >>
>> >> Cheers,
>> >> √
>> >>
>> >>
>> >> *Viktor Klang*
>> >> Software Architect, Java Platform Group
>> >> Oracle
>> >> ------------------------------
>> >> *From:* David Alayachew <davidalayachew at gmail.com>
>> >> *Sent:* Monday, 11 November 2024 18:32
>> >> *To:* Viktor Klang <viktor.klang at oracle.com>
>> >> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
>> >> *Subject:* [External] : Re: Question about Streams, Gatherers, and
>> >> fetching too many elements
>> >>
>> >>
>> >> Thanks for the workaround. It's running beautifully.
>> >>
>> >> Is there a future where this island concept is extended to the rest of
>> >> streams? Tbh, I don't fully understand it.
>> >>
>> >> On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <viktor.klang at oracle.com>
>> >> wrote:
>> >>
>> >> Hi David,
>> >>
>> >> This is the effect of how parallel streams are implemented, where
>> >> different stages, which are not representable as a join-less Spliterator,
>> >> are executed as a series of "islands" where the next isn't started until
>> >> the former has completed.
>> >>
>> >> If you think about it, parallelization of a Stream works best when the
>> >> entire data set can be split amongst a set of worker threads, and that
>> >> sort of implies that you want eager pre-fetch of data, so if your
>> >> dataset does not fit in memory, that is likely to lead to less desirable
>> >> outcomes.
>> >>
>> >> What I was able to do for Gatherers is to implement "gather(…) +
>> >> collect(…)"-fusion so any number of consecutive gather(…)-operations
>> >> immediately followed by a collect(…) is run in the same "island".
>> >>
>> >> So with that said, you could try something like the following:
>> >>
>> >> static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
>> >>     return Collector.of(() -> null, (v, e) -> each.accept(e),
>> >>             (l, r) -> l, (v) -> null,
>> >>             Collector.Characteristics.IDENTITY_FINISH);
>> >> }
>> >>
>> >>
>> >> stream
>> >> .parallel()
>> >> .unordered()
>> >> .gather(Gatherers.windowFixed(BATCH_SIZE))
>> >> .collect(forEach(eachList -> println(eachList.getFirst())));
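
For experimenting with this workaround outside the original project, here is a self-contained sketch assuming JDK 24 or later. The collector is the one from the message above; the class name, the integer range standing in for the file, and the two counters are assumptions added so the effect can be observed. It only demonstrates that every batch reaches the consumer through the fused gather + collect path; how much is pre-fetched remains implementation-specific, as discussed in this thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

class ForEachCollectorSketch {
    // Stateless collector that simply hands each element to the consumer.
    static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
        return Collector.of(() -> null, (v, e) -> each.accept(e),
                (l, r) -> l, (v) -> null,
                Collector.Characteristics.IDENTITY_FINISH);
    }

    // Returns {number of batches, number of elements} seen by the consumer.
    static int[] run(int elementCount, int batchSize) {
        AtomicInteger batches = new AtomicInteger();
        AtomicInteger elements = new AtomicInteger();
        IntStream.range(0, elementCount).boxed()   // stand-in for the file
            .parallel()
            .unordered()
            .gather(Gatherers.windowFixed(batchSize))
            .collect(forEach(batch -> {
                batches.incrementAndGet();
                elements.addAndGet(batch.size());
            }));
        return new int[] {batches.get(), elements.get()};
    }

    public static void main(String[] args) {
        int[] seen = run(10, 3);
        System.out.println(seen[0] + " batches, " + seen[1] + " elements");
    }
}
```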
>> >>
>> >>
>> >> Cheers,
>> >> √
>> >>
>> >>
>> >> *Viktor Klang*
>> >> Software Architect, Java Platform Group
>> >> Oracle
>> >> ------------------------------
>> >> *From:* core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of
>> >> David Alayachew <davidalayachew at gmail.com>
>> >> *Sent:* Monday, 11 November 2024 14:52
>> >> *To:* core-libs-dev <core-libs-dev at openjdk.org>
>> >> *Subject:* Re: Question about Streams, Gatherers, and fetching too many
>> >> elements
>> >>
>> >> And just to avoid the obvious question, I can hold about 30 batches in
>> >> memory before the Out of Memory error occurs. So this is not an issue of
>> >> my batch size being too high.
>> >>
>> >> But just to confirm, I set the batch size to 1, and it still ran into an
>> >> out of memory error. So I feel fairly confident saying that the Gatherer
>> >> is trying to grab all available data before sending any of it downstream.
>> >>
>> >> On Mon, Nov 11, 2024, 8:46 AM David Alayachew <davidalayachew at gmail.com>
>> >> wrote:
>> >>
>> >> Hello Core Libs Dev Team,
>> >>
>> >> I was trying out Gatherers for a project at work, and ran into a rather
>> >> sad scenario.
>> >>
>> >> I need to process a large file in batches. Each batch is small enough
>> >> that I can hold it in memory, but I cannot hold the entire file (and
>> >> thus, all of the batches) in memory at once.
>> >>
>> >> Looking at the Gatherers API, I saw windowFixed and thought that it would
>> >> be a great match for my use case.
>> >>
>> >> However, when trying it out, I was disappointed to see that it ran out of
>> >> memory very quickly. Here is my attempt at using it.
>> >>
>> >> stream
>> >> .parallel()
>> >> .unordered()
>> >> .gather(Gatherers.windowFixed(BATCH_SIZE))
>> >> .forEach(eachList -> println(eachList.getFirst()))
>> >> ;
>> >>
>> >> As you can see, I am just splitting the file into batches, and printing
>> >> out the first of each batch. This is purely for example's sake, of
>> >> course. I had planned on building even more functionality on top of this,
>> >> but I couldn't even get past this example.
>> >>
>> >> But anyways, not even a single one of them printed out. Which leads me to
>> >> believe that it's pulling all of them in the Gatherer.
>> >>
>> >> I can get it to run successfully if I go sequentially, but not parallel.
>> >> Parallel gives me that out of memory error.
>> >>
>> >> Is there any way for me to be able to have the Gatherer NOT pull in
>> >> everything while still remaining parallel and unordered?
>> >>
>> >> Thank you for your time and help.
>> >> David Alayachew
>>
>>