<div dir="auto"><p dir="ltr">Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon as I thought of it.</p><p dir="ltr"><br></p><p dir="ltr">I hand-waved the idea away because I assumed the method would turn the stream pipeline parallel, thus recreating the problem I currently have: parallelism causing all of the elements to be fetched ahead of time, which leads to an OOME.</p><p dir="ltr"><br></p><p dir="ltr">It did NOT occur to me that the pipeline would stay sequential, kicking off each mapping task in order while letting them execute in parallel. I'm not sure how I came to that incorrect conclusion; I have read the javadocs of this method several times. Though, to be fair, I came to the same incorrect conclusion about Collectors.groupingByConcurrent(), and it wasn't until someone pointed out what the documentation actually says that I realized its true properties.</p>
<p dir="ltr">Thanks. That definitely solves at least part of my problem. Obviously, I would prefer to write to S3 in parallel too, but at the very least, the calculation part is being done in parallel. And worst-case scenario, I can be really bad and do the write to S3 inside mapConcurrent, return the metadata of each write, and bundle that up with collect.</p><p dir="ltr"><br></p><p dir="ltr">And that's ignoring the fact that I can just use the workaround too.</p><p dir="ltr"><br></p><p dir="ltr">Yeah, the whole "pre-fetch all the data ahead of time" behavior makes sense to me from a performance perspective, but is rather unintuitive from a usability perspective. We are told that Streams can process unbounded data sets, but a parallel() stream doing findAny() runs into an OOME because it fetched all the data ahead of time. In fact, almost all of the terminal operations will hit an OOME in the exact same way if they are parallel and the data set is big enough. It's definitely not the end of the world, but it seems that I have to fit everything into a Collector and/or a Gatherer if I want to avoid pre-fetching everything.</p></div>
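<p dir="ltr">A minimal sketch of that fallback, assuming JDK 24 or later (where Gatherers is final; it was a preview API in JDK 22 and 23). The stream stays sequential, windowFixed forms the batches, and mapConcurrent runs the hypothetical writeBatch step, which does a placeholder computation plus a placeholder "upload" and returns only a small WriteResult record. The in-memory store is a hypothetical stand-in for S3; a real version would call an S3 client at that point.</p>

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

class WriteInMapConcurrent {
    // Hypothetical metadata describing one completed write.
    record WriteResult(String key, int itemCount) {}

    // Hypothetical stand-in for the S3 bucket; thread-safe because writes run concurrently.
    static final Map<String, List<Integer>> store = new ConcurrentHashMap<>();

    // Hypothetical "compute, then write" step; only the small metadata record is returned.
    static WriteResult writeBatch(List<Integer> batch) {
        List<Integer> computed = batch.stream().map(x -> x * 2).toList(); // placeholder computation
        String key = "batch-" + batch.getFirst();
        store.put(key, computed);                                          // placeholder "upload"
        return new WriteResult(key, computed.size());
    }

    static List<WriteResult> run(int elements, int batchSize, int maxConcurrency) {
        return IntStream.range(0, elements)
                .boxed()                                   // sequential stream: no parallel()
                .gather(Gatherers.windowFixed(batchSize))  // batches of batchSize elements
                .gather(Gatherers.mapConcurrent(maxConcurrency, WriteInMapConcurrent::writeBatch))
                .toList();                                 // only metadata is accumulated here
    }

    public static void main(String[] args) {
        run(25, 10, 4).forEach(System.out::println);
    }
}
```

<p dir="ltr">Since mapConcurrent preserves encounter order and caps the number of concurrent mapper invocations at maxConcurrency, the final list holds only the small metadata records, not the batches themselves.</p>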
<br><div class="gmail_quote"><div dir="ltr" class="gmail_attr">On Tue, Nov 12, 2024, 6:36 AM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" target="_blank" rel="noreferrer">viktor.klang@oracle.com</a>> wrote:<br></div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<div dir="ltr">
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
Have you considered Gatherers.mapConcurrent(…)?</div>
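<div dir="ltr">For illustration, a sketch of that suggestion applied to the batching pipeline, assuming JDK 24 or later. The stream itself stays sequential, windowFixed builds the batches, and mapConcurrent runs at most maxConcurrency calls of the mapper at a time on virtual threads while emitting results in encounter order. heavyCompute is a hypothetical stand-in for the per-batch work, and the two counters exist only to observe the concurrency limit.</div>

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

class MapConcurrentSketch {
    // Track how many mapper calls run at the same time (illustration only).
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxObserved = new AtomicInteger();

    // Hypothetical stand-in for the expensive per-batch computation.
    static int heavyCompute(List<Integer> batch) {
        int now = inFlight.incrementAndGet();
        maxObserved.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(20); // simulate work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inFlight.decrementAndGet();
        }
        return batch.stream().mapToInt(Integer::intValue).sum();
    }

    static List<Integer> run(int elements, int batchSize, int maxConcurrency) {
        return IntStream.range(0, elements)
                .boxed()                                   // sequential stream: no parallel()
                .gather(Gatherers.windowFixed(batchSize))  // List<Integer> batches
                .gather(Gatherers.mapConcurrent(maxConcurrency, MapConcurrentSketch::heavyCompute))
                .toList();                                 // results arrive in encounter order
    }

    public static void main(String[] args) {
        System.out.println(run(100, 10, 4));
        System.out.println("max concurrent mapper calls: " + maxObserved.get());
    }
}
```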
<div id="m_4859356468357508064m_-6943530765427357859Signature" style="color:inherit">
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
Cheers,<br>
√</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<b><br>
</b></div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<b>Viktor Klang</b></div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
Software Architect, Java Platform Group<br>
Oracle</div>
</div>
<div id="m_4859356468357508064m_-6943530765427357859appendonsend"></div>
<hr style="display:inline-block;width:98%">
<div id="m_4859356468357508064m_-6943530765427357859divRplyFwdMsg" dir="ltr"><font face="Calibri, sans-serif" style="font-size:11pt" color="#000000"><b>From:</b> David Alayachew <<a href="mailto:davidalayachew@gmail.com" rel="noreferrer noreferrer" target="_blank">davidalayachew@gmail.com</a>><br>
<b>Sent:</b> Tuesday, 12 November 2024 01:53<br>
<b>To:</b> Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>><br>
<b>Cc:</b> core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" rel="noreferrer noreferrer" target="_blank">core-libs-dev@openjdk.org</a>><br>
<b>Subject:</b> Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements</font>
<div> </div>
</div>
<div>
<div dir="auto">Good to know, ty vm.
<div dir="auto"><br>
</div>
<div dir="auto">At the very least, I have this workaround. This will meet my needs for now.</div>
<div dir="auto"><br>
</div>
<div dir="auto">I guess my final question would be: is this type of problem better suited to something other than parallel streams? Maybe an ExecutorService?</div>
<div dir="auto"><br>
</div>
<div dir="auto">Really, all I am doing is taking a jumbo file, splitting it into batches, and then doing some work on those batches. My IO speeds are pretty fast, and the compute work is non-trivial, so I would be leaving performance on the table if I gave up parallelism. And completion time is very important to us.</div>
<div dir="auto"><br>
</div>
<div dir="auto">I just naturally assumed parallel streams were the right choice because the compute work is simple in shape: a pure function that I can break out and then call in a map. Once I do that, I just call forEach to write the batches back out to S3. Maybe I should look into a different part of the standard library instead, since I may be using the wrong tool for the job? My nose says ExecutorService, but I figured I should ask before diving in too deep.</div>
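<div dir="auto">For comparison, a rough ExecutorService-based sketch of the same read-batch-process loop. It assumes the input is exposed as a lazy Iterator (for example Files.lines(path).iterator()), uses a Semaphore so that at most maxInFlight batches are submitted but unfinished at any time (plus the one currently being filled), and relies on ExecutorService being AutoCloseable (JDK 19+). The process callback is a hypothetical placeholder for the compute-and-write step, and error handling is deliberately minimal.</div>

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class BoundedBatchRunner {
    // Pulls batches from a lazy source and processes them on a fixed-size pool.
    // The semaphore caps submitted-but-unfinished batches, so the reading loop
    // blocks instead of buffering the whole input.
    static <T> void run(Iterator<T> source, int batchSize, int threads, int maxInFlight,
                        Consumer<List<T>> process) throws InterruptedException {
        Semaphore permits = new Semaphore(maxInFlight);
        // close() at the end of the try block waits for all submitted tasks to finish.
        try (ExecutorService pool = Executors.newFixedThreadPool(threads)) {
            while (source.hasNext()) {
                List<T> batch = new ArrayList<>(batchSize);
                while (batch.size() < batchSize && source.hasNext()) {
                    batch.add(source.next());
                }
                permits.acquire(); // wait while maxInFlight batches are still pending
                pool.execute(() -> {
                    try {
                        process.accept(batch);
                    } finally {
                        permits.release(); // free a slot even if process throws
                    }
                });
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong total = new AtomicLong();
        // IntStream.range stands in for a lazy reader such as Files.lines(path).iterator().
        run(IntStream.range(0, 1_000).iterator(), 100, 4, 8,
            batch -> total.addAndGet(batch.size()));
        System.out.println("processed " + total.get() + " elements");
    }
}
```

<div dir="auto">The trade-off versus a stream is that batching, back-pressure, and shutdown become explicit code you maintain yourself, but the memory bound is easy to see.</div>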
<br>
<br>
<div dir="auto">
<div dir="ltr">On Mon, Nov 11, 2024, 2:34 PM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>> wrote:<br>
</div>
<blockquote style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<div dir="ltr">
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
You're most welcome!<br>
<br>
In a potential future where all intermediate operations are Gatherer-based and all terminal operations are Collector-based, it would just work as expected. That said, I'm not sure it is practically achievable, because some operations might not have the same performance characteristics as before.</div>
<div id="m_4859356468357508064m_-6943530765427357859x_m_-4243356549772704639Signature" style="color:inherit">
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
Cheers,<br>
√</div>
</div>
<div id="m_4859356468357508064m_-6943530765427357859x_m_-4243356549772704639appendonsend"></div>
<hr style="display:inline-block;width:98%">
<div id="m_4859356468357508064m_-6943530765427357859x_m_-4243356549772704639divRplyFwdMsg" dir="ltr"><font face="Calibri, sans-serif" color="#000000" style="font-size:11pt"><b>From:</b> David Alayachew <<a href="mailto:davidalayachew@gmail.com" rel="noreferrer noreferrer noreferrer" target="_blank">davidalayachew@gmail.com</a>><br>
<b>Sent:</b> Monday, 11 November 2024 18:32<br>
<b>To:</b> Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>><br>
<b>Cc:</b> core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" rel="noreferrer noreferrer noreferrer" target="_blank">core-libs-dev@openjdk.org</a>><br>
<b>Subject:</b> [External] : Re: Question about Streams, Gatherers, and fetching too many elements</font>
<div> </div>
</div>
<div>
<p dir="ltr">Thanks for the workaround. It's running beautifully.</p>
<p dir="ltr">Is there a future where this island concept is extended to the rest of streams? Tbh, I don't fully understand it.</p>
<br>
<div>
<div dir="ltr">On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>> wrote:<br>
</div>
<blockquote style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<div dir="ltr">
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
Hi David,<br>
<br>
This is a consequence of how parallel streams are implemented: stages which are not representable as a join-less Spliterator are executed as a series of "islands", where the next island isn't started until the previous one has completed.</div>
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
If you think about it, parallelization of a Stream works best when the entire data set can be split amongst a set of worker threads, and that more or less implies eager pre-fetching of the data. So if your dataset does not fit in memory, that is likely to lead to less desirable outcomes, such as running out of memory.</div>
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
What I was able to do for Gatherers is to implement "gather(…) + collect(…)"-fusion so any number of consecutive gather(…)-operations immediately followed by a collect(…) is run in the same "island".<br>
<br>
</div>
<div style="direction:ltr;text-align:left;text-indent:0px;line-height:normal;background-color:rgb(255,255,255);margin:0px;font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
So with that said, you could try something like the following:<br>
<br>
</div>
<div style="direction:ltr;text-align:left;text-indent:0px;line-height:normal;background-color:rgb(255,255,255);margin:0px;font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {</div>
<div style="direction:ltr;text-align:left;text-indent:0px;line-height:normal;background-color:rgb(255,255,255);margin:0px;font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l, (v) -> null, Collector.Characteristics.IDENTITY_FINISH);<br>
}</div>
<div style="direction:ltr;text-align:left;text-indent:0px;margin:0px;color:rgb(0,0,0)">
<span style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt"><br>
<br>
</span><span style="font-family:"Segoe UI","Segoe UI Web (West European)",-apple-system,"system-ui",Roboto,"Helvetica Neue",sans-serif;font-size:15px">stream</span></div>
<div style="direction:ltr;text-align:left;text-indent:0px;background-color:rgb(255,255,255);margin:0px;font-family:"Segoe UI","Segoe UI Web (West European)",-apple-system,"system-ui",Roboto,"Helvetica Neue",sans-serif;font-size:15px;color:rgb(0,0,0)">
.parallel()</div>
<div style="direction:ltr;text-align:left;text-indent:0px;background-color:rgb(255,255,255);margin:0px;font-family:"Segoe UI","Segoe UI Web (West European)",-apple-system,"system-ui",Roboto,"Helvetica Neue",sans-serif;font-size:15px;color:rgb(0,0,0)">
.unordered()</div>
<div style="direction:ltr;text-align:left;text-indent:0px;background-color:rgb(255,255,255);margin:0px;font-family:"Segoe UI","Segoe UI Web (West European)",-apple-system,"system-ui",Roboto,"Helvetica Neue",sans-serif;font-size:15px;color:rgb(0,0,0)">
.gather(Gatherers.windowFixed(BATCH_SIZE))</div>
<div style="direction:ltr;text-align:left;text-indent:0px;background-color:rgb(255,255,255);margin:0px;font-family:"Segoe UI","Segoe UI Web (West European)",-apple-system,"system-ui",Roboto,"Helvetica Neue",sans-serif;font-size:15px;color:rgb(0,0,0)">
.collect(forEach(eachList -> println(eachList.getFirst())));</div>
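<div dir="ltr">A self-contained version of the snippet above, as a sketch assuming JDK 24 or later. IntStream.range stands in for the real file-backed stream, BATCH_SIZE is a placeholder value, and System.out.println replaces the unqualified println. Because gather(…) is immediately followed by collect(…), both run in the same "island"; the run method counts elements instead of printing so the result is easy to check.</div>

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

class FusedForEach {
    static final int BATCH_SIZE = 100; // placeholder value

    // The Collector from the message above: each element goes straight to the Consumer.
    // The accumulation container is always null, so there is nothing to combine or finish.
    static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
        return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l, v -> null,
                Collector.Characteristics.IDENTITY_FINISH);
    }

    // Runs the fused pipeline and returns how many elements reached the consumer.
    static long run(int elements) {
        AtomicLong seen = new AtomicLong(); // thread-safe: the consumer may run on pool threads
        IntStream.range(0, elements)
                .boxed()
                .parallel()
                .unordered()
                .gather(Gatherers.windowFixed(BATCH_SIZE))
                .collect(forEach(eachList -> seen.addAndGet(eachList.size())));
        return seen.get();
    }

    public static void main(String[] args) {
        IntStream.range(0, 1_000)
                .boxed()
                .parallel()
                .unordered()
                .gather(Gatherers.windowFixed(BATCH_SIZE))
                .collect(forEach(eachList -> System.out.println(eachList.getFirst())));
        System.out.println("elements seen: " + run(1_000));
    }
}
```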
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div id="m_4859356468357508064m_-6943530765427357859x_m_-4243356549772704639x_m_7527700827726692131Signature" style="color:inherit">
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
Cheers,<br>
√</div>
</div>
<div id="m_4859356468357508064m_-6943530765427357859x_m_-4243356549772704639x_m_7527700827726692131appendonsend"></div>
<hr style="display:inline-block;width:98%">
<div id="m_4859356468357508064m_-6943530765427357859x_m_-4243356549772704639x_m_7527700827726692131divRplyFwdMsg" dir="ltr">
<font face="Calibri, sans-serif" color="#000000" style="font-size:11pt"><b>From:</b> core-libs-dev <<a href="mailto:core-libs-dev-retn@openjdk.org" rel="noreferrer noreferrer noreferrer noreferrer" target="_blank">core-libs-dev-retn@openjdk.org</a>> on behalf of David Alayachew
<<a href="mailto:davidalayachew@gmail.com" rel="noreferrer noreferrer noreferrer noreferrer" target="_blank">davidalayachew@gmail.com</a>><br>
<b>Sent:</b> Monday, 11 November 2024 14:52<br>
<b>To:</b> core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" rel="noreferrer noreferrer noreferrer noreferrer" target="_blank">core-libs-dev@openjdk.org</a>><br>
<b>Subject:</b> Re: Question about Streams, Gatherers, and fetching too many elements</font>
<div> </div>
</div>
<div>
<div dir="auto">
<div>And just to avoid the obvious question, I can hold about 30 batches in memory before the Out of Memory error occurs. So this is not an issue of my batch size being too high.</div>
<div dir="auto"><br>
</div>
<div dir="auto">But just to confirm, I set the batch size to 1, and it still ran into an out of memory error. So I feel fairly confident saying that the Gatherer is trying to grab all available data before sending any of it downstream.<br>
<br>
<div dir="auto">
<div dir="ltr">On Mon, Nov 11, 2024, 8:46 AM David Alayachew <<a href="mailto:davidalayachew@gmail.com" rel="noreferrer noreferrer noreferrer noreferrer" target="_blank">davidalayachew@gmail.com</a>> wrote:<br>
</div>
<blockquote style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<div dir="auto">Hello Core Libs Dev Team,
<div dir="auto"><br>
</div>
<div dir="auto">I was trying out Gatherers for a project at work, and ran into a rather sad scenario.</div>
<div dir="auto"><br>
</div>
<div dir="auto">I need to process a large file in batches. Each batch is small enough that I can hold it in memory, but I cannot hold the entire file (and thus, all of the batches) in memory at once.</div>
<div dir="auto"><br>
</div>
<div dir="auto">Looking at the Gatherers API, I saw windowFixed and thought that it would be a great match for my use case.</div>
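<div dir="auto">As a quick illustration of windowFixed on its own (a sketch assuming JDK 24 or later): it groups consecutive elements into lists of the given size, and the last window may be shorter.</div>

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class WindowFixedDemo {
    // Groups the values into consecutive windows of `size` elements.
    static List<List<Integer>> windows(int size, Integer... values) {
        return Stream.of(values)
                .gather(Gatherers.windowFixed(size))
                .toList();
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(windows(3, 1, 2, 3, 4, 5, 6, 7));
    }
}
```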
<div dir="auto"><br>
</div>
<div dir="auto">However, when trying it out, I was disappointed to see that it ran out of memory very quickly. Here is my attempt at using it.</div>
<div dir="auto"><br>
</div>
<div dir="auto">stream</div>
<div dir="auto">.parallel()</div>
<div dir="auto">.unordered()</div>
<div dir="auto">.gather(Gatherers.windowFixed(BATCH_SIZE))</div>
<div dir="auto">.forEach(eachList -> println(eachList.getFirst()))</div>
<div dir="auto">;</div>
<div dir="auto"><br>
</div>
<div dir="auto">As you can see, I am just splitting the file into batches, and printing out the first of each batch. This is purely for example's sake, of course. I had planned on building even more functionality on top of this, but I couldn't even get past
this example.</div>
<div dir="auto"><br>
</div>
<div dir="auto">But anyway, not a single one of them printed out, which leads me to believe that the Gatherer is pulling in all of them before emitting anything.</div>
<div dir="auto">
<div dir="auto"><br>
</div>
<div dir="auto">I can get it to run successfully if I run it sequentially, but not in parallel. In parallel, it runs into that out-of-memory error.</div>
<div dir="auto"><br>
</div>
</div>
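<div dir="auto">For reference, a sketch of the sequential variant that does work, assuming JDK 24 or later and a line-oriented text file. Files.lines reads lazily, so in a sequential stream windowFixed only needs to hold the window currently being filled before handing it to forEach. The default input.txt path and the BATCH_SIZE value are hypothetical placeholders.</div>

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class SequentialBatches {
    static final int BATCH_SIZE = 1_000; // placeholder batch size

    // Streams the file lazily and hands each batch of lines to `handler`.
    static void processFile(Path file, int batchSize, Consumer<List<String>> handler) throws IOException {
        try (Stream<String> lines = Files.lines(file)) { // closes the underlying reader
            lines.gather(Gatherers.windowFixed(batchSize))
                 .forEach(handler);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Path.of(args.length > 0 ? args[0] : "input.txt"); // hypothetical input path
        processFile(file, BATCH_SIZE, batch -> System.out.println(batch.getFirst()));
    }
}
```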
<div dir="auto">Is there any way to have the Gatherer NOT pull in everything while the stream remains parallel and unordered?</div>
<div dir="auto"><br>
</div>
<div dir="auto">Thank you for your time and help.</div>
<div dir="auto">David Alayachew</div>
</div>
</blockquote>
</div>
</div>
</div>
</div>
</div>
</blockquote>
</div>
</div>
</div>
</blockquote>
</div>
</div>
</div>
</div>
</blockquote></div>