[External] : Re: Question about Streams, Gatherers, and fetching too many elements

David Alayachew davidalayachew at gmail.com
Fri Nov 22 01:08:46 UTC 2024


👍

On Thu, Nov 21, 2024 at 9:12 AM Viktor Klang <viktor.klang at oracle.com>
wrote:

> Hi David,
>
> I might respond to the Reddit thread later; I need to wrap up some other
> tasks first.
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* David Alayachew <davidalayachew at gmail.com>
> *Sent:* Thursday, 21 November 2024 15:01
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* Rob Spoor <openjdk at icemanx.nl>; core-libs-dev <
> core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
>
> Thanks for the context, Viktor. I think I got to see that breadth vs depth
> threshold myself when working with danielaveryj on Reddit. They explained it
> largely in the way that you did, and we experimented with batch sizes until
> we found the cutoff point where things broke down.
>
> But thanks again for the context. If you have more thoughts or you ever do
> make a decision on this, I would love to know!
>
> On Tue, Nov 19, 2024, 7:42 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> Another thing which might add some detail:
>
> This isn't really about short-circuiting under parallelization but rather
> a trade-off between depth-first vs breadth-first processing.
>
> For sequential evaluation you typically want to perform depth-first
> processing since that means that you occupy ~constant space.
>
> For parallel evaluation you *need to* perform [some level of]
> breadth-first processing (since otherwise there is no parallelism). So you
> need to decide what the evaluation strategy will be and where the
> join/merge-points are. For some operations which are inherently
> encounter-ordered you can "place a bet" (think: findFirst() where if you
> have multiple candidates each found in different chunks, you only keep the
> one which has the "earliest" encounter order), but for others, like
> *foldLeft*, each subsequent value depends on the previous one.
>
> In the *foldLeft* case what Gatherers provide is ofSequential, which
> essentially instructs the processing that there's no use trying to
> parallelize this at all, so under parallel evaluation this stage needs to
> run sequentially—and that means that it must run to completion before
> downstream operations can continue, and by implication this also means that
> the output needs intermediate storage, so you are now in a breadth-first
> scenario.
>
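> To make the foldLeft case concrete, here is a minimal sketch of such a
> sequential fold (just to illustrate the shape; the built-in
> Gatherers.fold(…) is the supported way to do this):
>
> static <T, R> Gatherer<T, ?, R> foldLeft(R initial, BiFunction<R, T, R> f) {
>     class State { R acc = initial; }
>     return Gatherer.ofSequential(
>         State::new,
>         (State state, T element, Gatherer.Downstream<? super R> downstream) -> {
>             state.acc = f.apply(state.acc, element); // depends on the previous value
>             return true;                             // a fold cannot short-circuit
>         },
>         (state, downstream) -> downstream.push(state.acc)); // emit once, at the end
> }
>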
> In my experience, parallel streams are better suited for CPU-bound
> workloads where operations are trivially parallelizable (i.e. no
> encounter-order dependencies). Importantly, it's common to need a rather
> large set of elements in the stream before the benefit of parallelization
> outweighs its overhead relative to sequential evaluation. The only way to
> know what makes sense is to benchmark using realistic workloads.
>
> Personally, I wouldn't be surprised if sequential streams plus the
> occasional Gatherers.mapConcurrent() covers > 90% of all Stream use-cases.
>
> As a side-note, it is important to remember that Java Streams are
> push-style streams. (Push-style vs Pull-style vs Push-Pull-style is a
> longer conversation, but all of these strategies come with trade-offs.)
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of
> Viktor Klang <viktor.klang at oracle.com>
> *Sent:* Tuesday, 19 November 2024 13:19
> *To:* David Alayachew <davidalayachew at gmail.com>
> *Cc:* Rob Spoor <openjdk at icemanx.nl>; core-libs-dev <
> core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
> Hi David,
>
> I've been thinking about this topic for a few days and haven't arrived at
> a satisfactory solution. Keep in mind that this "situation" has existed
> ever since Streams were released; perhaps Gatherers are a catalyst that
> makes it more noticeable. I'll keep thinking about what could be done to
> make it more predictable (besides thinking about what rearrangements might
> make the situation go away).
>
> In general though—short-circuiting in combination with parallelization
> requires a lot of tuning to make sure that the cost of processing more data
> doesn't overtake the benefit of "exiting early".
>
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* David Alayachew <davidalayachew at gmail.com>
> *Sent:* Thursday, 14 November 2024 23:45
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* Rob Spoor <openjdk at icemanx.nl>; core-libs-dev <
> core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
> Oh ok. So it truly is a toss-up depending on each implementation when and
> where this occurs.
>
> Ok, then as my final request: I think even informing the user that this
> CAN occur is worth doing. If nothing else, a user scouring the
> documentation for this behaviour will know that it is simply something
> that can occur. They don't need to know all the details. Simply give it an
> official term, describe the behaviour, and note that when it happens is
> implementation-specific, but that it is only possible under parallelism.
> Even just knowing the verbiage to describe the problem will enable users
> to better communicate with each other about what they want vs what they
> get. That helps searchability, if nothing else.
>
> On Thu, Nov 14, 2024, 8:45 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> I see what you're saying, the problem is that it depends on the Stream
> implementation (given that Stream is an interface).
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* David Alayachew <davidalayachew at gmail.com>
> *Sent:* Thursday, 14 November 2024 12:36
> *To:* Viktor Klang <viktor.klang at oracle.com>
> *Cc:* Rob Spoor <openjdk at icemanx.nl>; core-libs-dev <
> core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
>
> Then let me correct myself again, while simplifying -- I just want that
> detail, that certain combinations might lead to pre-fetching everything, to
> be documented in the Stream API. The package level, or the Stream interface
> itself, seems like a good spot.
>
> On Thu, Nov 14, 2024, 4:22 AM Viktor Klang <viktor.klang at oracle.com>
> wrote:
>
> The issue here is that the operation cannot advertise this, as it depends
> on the combination of operations.
>
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of David
> Alayachew <davidalayachew at gmail.com>
> *Sent:* Wednesday, 13 November 2024 14:07
> *To:* Rob Spoor <openjdk at icemanx.nl>
> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> fetching too many elements
>
> That is a good point Rob.
>
> Then let me correct myself -- I think the terminal operations don't do a
> great job of advertising whether or not they pre-fetch everything when
> parallelism is activated. collect() fetches as needed; findAny()
> pre-fetches everything. I understand that later releases might change their
> behaviour, but I still want to document the current behaviour in the
> official javadocs so that we can limit undocumented tripping hazards.
>
>
> On Wed, Nov 13, 2024, 7:07 AM Rob Spoor <openjdk at icemanx.nl> wrote:
>
> distinct() doesn't require everything to be pulled. It can push elements
> to the downstream as they come along for the first time. When
> downstream.push returns false, the gatherer is done.
>
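> For illustration, a minimal (sequential, untested) sketch of a
> short-circuit-friendly distinct():
>
> static <T> Gatherer<T, ?, T> distinct() {
>     return Gatherer.<T, Set<T>, T>ofSequential(
>         HashSet::new,                       // remember what has been seen
>         (seen, element, downstream) ->
>             // push first occurrences only; stop once the downstream rejects
>             !seen.add(element) || downstream.push(element));
> }
>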
> As part of some experimentation I've implemented all intermediate
> operations using gatherers. Most of them are pretty straightforward and
> will stop integrating once the downstream starts rejecting elements
> (although some use Gatherer.ofSequential to keep it easy). I only found
> two exceptions that don't return the result of downstream.push:
>
> * mapMulti. downstream::push is passed to the mapper as a Consumer, so
> the return value is ignored. With some more effort it's probably possible
> to capture any false return value and return that from the integrator,
> but I haven't tried that yet (a rough sketch of this idea follows the
> list).
>
> * sorted. Obviously every element needs to be inspected.
>
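> Here is that rough sketch of the mapMulti capture idea (untested, and my
> own experiment rather than JDK code):
>
> static <T, R> Gatherer<T, ?, R> mapMulti(
>         BiConsumer<? super T, ? super Consumer<R>> mapper) {
>     return Gatherer.of((unused, element, downstream) -> {
>         boolean[] more = { true };                     // mutable capture
>         Consumer<R> sink = r -> {
>             if (more[0]) more[0] = downstream.push(r); // record any rejection
>         };
>         mapper.accept(element, sink);
>         return more[0];                                // false stops integration
>     });
> }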
>
> On 13/11/2024 00:37, David Alayachew wrote:
> > Oh sure, I expect something like distinct() to pull everything. In order
> > to know if something is distinct, you have to do some variant of "check
> > against everyone else". Whether that is holding all instances in memory
> > or their hashes, it's clear from a glance that you will need to look at
> > everything, and therefore, pre-fetching makes intuitive sense to me.
> >
> > I 100% did not expect terminal operations like findAny() or reduce() to
> > pull the whole data set. That was a complete whiplash for me. The method
> > findAny() advertises itself as a short-circuiting operation, so to find
> > out that it actually pulls the whole data set anyway was shocking.
> >
> > And that was my biggest pain point -- looking at the documentation, it is
> > not clear to me at all that methods like findAny() would pull in all data
> > upon becoming parallel().
> >
> > Do you think it would make sense to add documentation about this to the
> > javadocs for Stream/java.util.stream? Or maybe it is already there and I
> > misunderstood it (even after reading through it thoroughly over 5 times).
> >
> >
> > On Tue, Nov 12, 2024, 10:06 AM Viktor Klang <viktor.klang at oracle.com>
> > wrote:
> >
> >>> We are told how Streams can process unbounded data sets, but when it
> >>> tries to do a findAny() with parallel(), it runs into an OOME because it
> >>> fetched all the data ahead of time. In fact, almost all of the terminal
> >>> operations will hit an OOME in the exact same way if they are parallel
> >>> and have a big enough data set. It's definitely not the end of the
> >>> world, but it seems that I have to fit everything into a Collector
> >>> and/or a Gatherer if I want to avoid pre-fetching everything.
> >>
> >> Yeah, I think it is important to distinguish "can process unbounded data
> >> sets" from "always able to process unbounded data sets".
> >>
> >> Some operations inherently need the end of the stream, so even something
> >> simple like stream.distinct() or stream.sorted() can end up pulling in
> >> all data (which of course won't terminate).
> >>
> >> Fortunately, I think Gatherers can unlock many more situations where
> >> unbounded streams can be processed.
> >>
> >> Cheers,
> >> √
> >>
> >>
> >> *Viktor Klang*
> >> Software Architect, Java Platform Group
> >> Oracle
> >> ------------------------------
> >> *From:* David Alayachew <davidalayachew at gmail.com>
> >> *Sent:* Tuesday, 12 November 2024 15:08
> >> *To:* Viktor Klang <viktor.klang at oracle.com>
> >> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
> >> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> >> fetching too many elements
> >>
> >>
> >> Oh woah. I certainly did not. Or rather, I had dismissed the idea as
> >> soon as I thought of it.
> >>
> >>
> >> I hand-waved away the idea because I thought that the method would turn
> >> the stream pipeline parallel, thus recreating the same problem I
> >> currently have of parallelism causing all of the elements to be fetched
> >> ahead of time, causing an OOME.
> >>
> >>
> >> It did NOT occur to me that the pipeline would stay sequential, and just
> >> kick these off sequentially, but have them executing in parallel. I
> >> can't see why I came to that incorrect conclusion. I have read the
> >> javadocs of this method several times. Though, to be fair, I came to the
> >> same, incorrect conclusion about Collectors.groupingByConcurrent(), and
> >> it wasn't until someone pointed out what the documentation was actually
> >> saying that I realized its true properties.
> >>
> >> Thanks. That definitely solves at least part of my problem. Obviously, I
> >> would prefer to write to S3 in parallel too, but at the very least, the
> >> calculation part is being done in parallel. And worst case scenario, I
> >> can be really bad and just do the write to S3 in the mapConcurrent, and
> >> then just return the metadata of each write, and just bundle that up
> >> with collect.
> >>
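> >> Roughly this shape, I mean (putBatchToS3 being a hypothetical stand-in
> >> for my actual write, returning its metadata):
> >>
> >> var writeMetadata = stream                        // sequential this time
> >>     .gather(Gatherers.windowFixed(BATCH_SIZE))
> >>     .gather(Gatherers.mapConcurrent(30, batch -> putBatchToS3(batch)))
> >>     .toList();                                    // bundle up the metadata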
> >>
> >> And that's ignoring the fact that I can just use the workaround too.
> >>
> >>
> >> Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me
> >> from a performance perspective, but is rather unintuitive to me from a
> >> usability perspective. We are told how Streams can process unbounded
> >> data sets, but when it tries to do a findAny() with parallel(), it runs
> >> into an OOME because it fetched all the data ahead of time. In fact,
> >> almost all of the terminal operations will hit an OOME in the exact same
> >> way if they are parallel and have a big enough data set. It's definitely
> >> not the end of the world, but it seems that I have to fit everything
> >> into a Collector and/or a Gatherer if I want to avoid pre-fetching
> >> everything.
> >>
> >> On Tue, Nov 12, 2024, 6:36 AM Viktor Klang <viktor.klang at oracle.com>
> >> wrote:
> >>
> >> Have you considered Gatherers.mapConcurrent(…)?
> >>
> >>
> >> Cheers,
> >> √
> >>
> >>
> >> *Viktor Klang*
> >> Software Architect, Java Platform Group
> >> Oracle
> >> ------------------------------
> >> *From:* David Alayachew <davidalayachew at gmail.com>
> >> *Sent:* Tuesday, 12 November 2024 01:53
> >> *To:* Viktor Klang <viktor.klang at oracle.com>
> >> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
> >> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
> >> fetching too many elements
> >>
> >> Good to know, ty vm.
> >>
> >> At the very least, I have this workaround. This will meet my needs for
> >> now.
> >>
> >> I guess my final question would be -- is this type of problem better
> >> suited to something besides parallel streams? Maybe an ExecutorService?
> >>
> >> Really, all I am doing is taking a jumbo file, splitting it into
> >> batches, and then doing some work on those batches. My IO speeds are
> >> pretty fast, and the compute work is non-trivial, so there is
> >> performance being left on the table if I give up parallelism. And I am
> >> in a position where completion time is very important to us.
> >>
> >> I just naturally assumed parallel streams were the right choice because
> >> the compute work is simple. A pure function that I can break out, and
> >> then call in a map. Once I do that, I just call forEach to write the
> >> batches back out to S3. Maybe I should look into a different part of the
> >> std lib instead because I am using the wrong tool for the job? My nose
> >> says ExecutorService, but I figure I should ask before I dive too deep
> >> in.
> >>
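> >> For reference, the ExecutorService shape I have in mind is roughly this
> >> (untested; N_WORKERS, compute, and writeToS3 are stand-ins for my actual
> >> setup, and the semaphore caps how many batches are in memory at once):
> >>
> >> static void processInBatches(Path jumboFile) throws Exception {
> >>     try (var pool = Executors.newFixedThreadPool(N_WORKERS);
> >>          var lines = Files.lines(jumboFile)) {
> >>         var inFlight = new Semaphore(30);       // cap batches held in memory
> >>         var batch = new ArrayList<String>(BATCH_SIZE);
> >>         for (var it = lines.iterator(); it.hasNext(); ) {
> >>             batch.add(it.next());
> >>             if (batch.size() == BATCH_SIZE || !it.hasNext()) {
> >>                 var work = List.copyOf(batch);
> >>                 batch.clear();
> >>                 inFlight.acquire();             // block instead of buffering
> >>                 pool.submit(() -> {
> >>                     try { writeToS3(compute(work)); }
> >>                     finally { inFlight.release(); }
> >>                 });
> >>             }
> >>         }
> >>     }
> >> }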
> >>
> >> On Mon, Nov 11, 2024, 2:34 PM Viktor Klang <viktor.klang at oracle.com>
> >> wrote:
> >>
> >> You're most welcome!
> >>
> >> In a potential future where all intermediate operations are
> >> Gatherer-based, and all terminal operations are Collector-based, it
> >> would just work as expected. But with that said, I'm not sure it is
> >> practically achievable because some operations might not have the same
> >> performance characteristics as before.
> >>
> >> Cheers,
> >> √
> >>
> >>
> >> *Viktor Klang*
> >> Software Architect, Java Platform Group
> >> Oracle
> >> ------------------------------
> >> *From:* David Alayachew <davidalayachew at gmail.com>
> >> *Sent:* Monday, 11 November 2024 18:32
> >> *To:* Viktor Klang <viktor.klang at oracle.com>
> >> *Cc:* core-libs-dev <core-libs-dev at openjdk.org>
> >> *Subject:* [External] : Re: Question about Streams, Gatherers, and
> >> fetching too many elements
> >>
> >>
> >> Thanks for the workaround. It's running beautifully.
> >>
> >> Is there a future where this island concept is extended to the rest of
> >> streams? Tbh, I don't fully understand it.
> >>
> >> On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <viktor.klang at oracle.com>
> >> wrote:
> >>
> >> Hi David,
> >>
> >> This is the effect of how parallel streams are implemented, where
> >> different stages, which are not representable as a join-less
> >> Spliterator, are executed as a series of "islands" where the next isn't
> >> started until the former has completed.
> >>
> >> If you think about it, parallelization of a Stream works best when the
> >> entire data set can be split amongst a set of worker threads, and that
> >> sort of implies that you want eager pre-fetch of data, so if your
> >> dataset does not fit in memory, that is likely to lead to less desirable
> >> outcomes.
> >>
> >> What I was able to do for Gatherers is to implement "gather(…) +
> >> collect(…)"-fusion so any number of consecutive gather(…)-operations
> >> immediately followed by a collect(…) is run in the same "island".
> >>
> >> So with that said, you could try something like the following:
> >>
> >> static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
> >>     return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l,
> >>                         v -> null, Collector.Characteristics.IDENTITY_FINISH);
> >> }
> >>
> >>
> >> stream
> >> .parallel()
> >> .unordered()
> >> .gather(Gatherers.windowFixed(BATCH_SIZE))
> >> .collect(forEach(eachList -> println(eachList.getFirst())));
> >>
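> >> (The Collector-based forEach is the point here: a plain terminal
> >> forEach(…) after a gather(…) runs in a separate "island" under parallel
> >> evaluation, whereas gather(…) immediately followed by collect(…) is
> >> fused into a single island, so windows are consumed as they are
> >> produced instead of being buffered in between.)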
> >>
> >> Cheers,
> >> √
> >>
> >>
> >> *Viktor Klang*
> >> Software Architect, Java Platform Group
> >> Oracle
> >> ------------------------------
> >> *From:* core-libs-dev <core-libs-dev-retn at openjdk.org> on behalf of
> >> David Alayachew <davidalayachew at gmail.com>
> >> *Sent:* Monday, 11 November 2024 14:52
> >> *To:* core-libs-dev <core-libs-dev at openjdk.org>
> >> *Subject:* Re: Question about Streams, Gatherers, and fetching too many
> >> elements
> >>
> >> And just to avoid the obvious question, I can hold about 30 batches in
> >> memory before the Out of Memory error occurs. So this is not an issue
> >> of my batch size being too high.
> >>
> >> But just to confirm, I set the batch size to 1, and it still ran into an
> >> out of memory error. So I feel fairly confident saying that the
> >> Gatherer is trying to grab all available data before sending any of it
> >> downstream.
> >>
> >> On Mon, Nov 11, 2024, 8:46 AM David Alayachew
> >> <davidalayachew at gmail.com> wrote:
> >>
> >> Hello Core Libs Dev Team,
> >>
> >> I was trying out Gatherers for a project at work, and ran into a rather
> >> sad scenario.
> >>
> >> I need to process a large file in batches. Each batch is small enough
> >> that I can hold it in memory, but I cannot hold the entire file (and
> >> thus, all of the batches) in memory at once.
> >>
> >> Looking at the Gatherers API, I saw windowFixed and thought that it
> >> would be a great match for my use case.
> >>
> >> However, when trying it out, I was disappointed to see that it ran out
> >> of memory very quickly. Here is my attempt at using it.
> >>
> >> stream
> >> .parallel()
> >> .unordered()
> >> .gather(Gatherers.windowFixed(BATCH_SIZE))
> >> .forEach(eachList -> println(eachList.getFirst()))
> >> ;
> >>
> >> As you can see, I am just splitting the file into batches, and printing
> >> out the first of each batch. This is purely for example's sake, of
> >> course. I had planned on building even more functionality on top of
> >> this, but I couldn't even get past this example.
> >>
> >> But anyway, not even a single one of them printed out, which leads me
> >> to believe that it's pulling all of them in the Gatherer.
> >>
> >> I can get it to run successfully if I go sequentially, but not in
> >> parallel. Parallel gives me that out of memory error.
> >>
> >> Is there any way for me to be able to have the Gatherer NOT pull in
> >> everything while still remaining parallel and unordered?
> >>
> >> Thank you for your time and help.
> >> David Alayachew
>
>