<div dir="ltr"><div class="gmail_default" style="font-family:monospace">Poking the thread in case you are able to answer my previous question Viktor.</div><div class="gmail_default" style="font-family:monospace"><br></div><div class="gmail_default" style="font-family:monospace">Also, another question -- I posted this thread to Reddit, and some good discussion has already started. Can you or someone else answer some of the questions that have popped up there?</div><div class="gmail_default" style="font-family:monospace"><br></div><div class="gmail_default" style="font-family:monospace"><a href="https://old.reddit.com/r/java/comments/1gukzhb/a_surprising_pain_point_regarding_parallel_java/">https://old.reddit.com/r/java/comments/1gukzhb/a_surprising_pain_point_regarding_parallel_java/</a></div><div class="gmail_default" style="font-family:monospace"><br></div><div class="gmail_default" style="font-family:monospace">Ty vm!<br></div></div><br><div class="gmail_quote"><div dir="ltr" class="gmail_attr">On Thu, Nov 14, 2024 at 5:45 PM David Alayachew <<a href="mailto:davidalayachew@gmail.com">davidalayachew@gmail.com</a>> wrote:<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex"><div dir="auto">Oh ok. So it truly is a toss-up depending on each implementation when and where this occurs.<div dir="auto"><br></div><div dir="auto">Ok, then as my final request, I think even informing the user that this CAN occur is worth doing. If nothing else, the user scouring the documentation for documentation of this behaviour will know that it is simply something that can occur. They don't need to know all the details. Simply give it a official term, describe the behaviour, tell that it is implementation specific on when this happens, but that it is only possible during parallelism. Even just knowing the verbiage to describe the problem will enable them to better communicate with each other on what they want vs what they get. That helps searchability, if nothing else.</div></div><br><div class="gmail_quote"><div dir="ltr" class="gmail_attr">On Thu, Nov 14, 2024, 8:45 AM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" target="_blank">viktor.klang@oracle.com</a>> wrote:<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">




<div dir="ltr">
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
I see what you're saying, the problem is that it depends on the Stream implementation (given that Stream is an interface).</div>
<div id="m_3078180526897139191m_-127590508971081847Signature" style="color:inherit">
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
Cheers,<br>
√</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<b><br>
</b></div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<b>Viktor Klang</b></div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
Software Architect, Java Platform Group<br>
Oracle</div>
</div>
<div id="m_3078180526897139191m_-127590508971081847appendonsend"></div>
<hr style="display:inline-block;width:98%">
<div id="m_3078180526897139191m_-127590508971081847divRplyFwdMsg" dir="ltr"><font face="Calibri, sans-serif" style="font-size:11pt" color="#000000"><b>From:</b> David Alayachew <<a href="mailto:davidalayachew@gmail.com" rel="noreferrer" target="_blank">davidalayachew@gmail.com</a>><br>
<b>Sent:</b> Thursday, 14 November 2024 12:36<br>
<b>To:</b> Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer" target="_blank">viktor.klang@oracle.com</a>><br>
<b>Cc:</b> Rob Spoor <<a href="mailto:openjdk@icemanx.nl" rel="noreferrer" target="_blank">openjdk@icemanx.nl</a>>; core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" rel="noreferrer" target="_blank">core-libs-dev@openjdk.org</a>><br>
<b>Subject:</b> Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements</font>
<div> </div>
</div>
<div>
<p dir="ltr">Then let me correct myself again, while simplifying -- I just want that detail, that certain combinations might lead to pre-fetching everything, to be documented on the stream api. Package level, or on the Stream interface itself, seems like a
 good spot.</p>
<br>
<div>
<div dir="ltr">On Thu, Nov 14, 2024, 4:22 AM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer" target="_blank">viktor.klang@oracle.com</a>> wrote:<br>
</div>
<blockquote style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
<div dir="ltr">
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
The issue here is that the operation cannot advertise this, as it depends on the combination of operations.</div>
<div id="m_3078180526897139191m_-127590508971081847x_m_-5387667337063943796Signature" style="color:inherit">
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
Cheers,<br>
√</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<br>
</div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<b><br>
</b></div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
<b>Viktor Klang</b></div>
<div style="font-family:Calibri,Arial,Helvetica,sans-serif;font-size:12pt;color:rgb(0,0,0)">
Software Architect, Java Platform Group<br>
Oracle</div>
</div>
<div id="m_3078180526897139191m_-127590508971081847x_m_-5387667337063943796appendonsend"></div>
<hr style="display:inline-block;width:98%">
<div id="m_3078180526897139191m_-127590508971081847x_m_-5387667337063943796divRplyFwdMsg" dir="ltr"><font face="Calibri, sans-serif" color="#000000" style="font-size:11pt"><b>From:</b> core-libs-dev <<a href="mailto:core-libs-dev-retn@openjdk.org" rel="noreferrer noreferrer" target="_blank">core-libs-dev-retn@openjdk.org</a>>
 on behalf of David Alayachew <<a href="mailto:davidalayachew@gmail.com" rel="noreferrer noreferrer" target="_blank">davidalayachew@gmail.com</a>><br>
<b>Sent:</b> Wednesday, 13 November 2024 14:07<br>
<b>To:</b> Rob Spoor <<a href="mailto:openjdk@icemanx.nl" rel="noreferrer noreferrer" target="_blank">openjdk@icemanx.nl</a>><br>
<b>Cc:</b> core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" rel="noreferrer noreferrer" target="_blank">core-libs-dev@openjdk.org</a>><br>
<b>Subject:</b> Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements</font>
<div> </div>
</div>
<div>
<div dir="auto">
<div>That is a good point Rob.</div>
<div dir="auto"><br>
</div>
<div dir="auto">Then let me correct myself -- I think the terminal operations don't do a great job of advertising whether or not they pre-fetch everything when parallelism is activated. Collector fetches as needed. FindAny pre-fetches everything. I understand
 that later releases might change their behaviour, but I still want to document the current behaviour in the official javadocs so that we can limit undocumented tripping hazards.</div>
<div dir="auto"><br>
<br>
<div dir="auto">
<div dir="ltr">On Wed, Nov 13, 2024, 7:07 AM Rob Spoor <<a href="mailto:openjdk@icemanx.nl" rel="noreferrer noreferrer" target="_blank">openjdk@icemanx.nl</a>> wrote:<br>
</div>
<blockquote style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
distinct() doesn't require everything to be pulled. It can push elements <br>
to the downstream as they come along for the first time. When <br>
downstream.push returns false the gatherer is done.<br>
<br>
As part of some experimentation I've implemented all intermediary <br>
operations using gatherers. Most of them are pretty straightforward and <br>
will stop integrating once the downstream starts rejecting elements <br>
(although some use Gatherer.ofSequential to keep it easy). I only found <br>
two exceptions that don't return the result of downstream.push:<br>
<br>
* mapMulti. The downstream.push is passed as the mapper which is a <br>
Consumer - the return value is ignored. With some more effort it's <br>
probably possible to capture any false return value and return that from <br>
the integrator, but I haven't tried that yet.<br>
<br>
* sorted. Obviously every element needs to be inspected.<br>
<br>
<br>
On 13/11/2024 00:37, David Alayachew wrote:<br>
> Oh sure, I expect something like distinct() to pull everything. In order to<br>
> know if something is distinct, you have to do some variant of "check<br>
> against everyone else". Whether that is holding all instances in memory or<br>
> their hashes, it's clear from a glance that you will need to look at<br>
> everything, and therefore, pre-fetching makes intuitive sense to me.<br>
> <br>
> I 100% did not expect terminal operations like findAny() or reduce() to<br>
> pull the whole data set. That was a complete whiplash for me. The method<br>
> findAny() advertises itself as a short-circuiting operation, so to find out<br>
> that it actually pulls the whole data set anyways was shocking.<br>
> <br>
> And that was my biggest pain point -- looking at the documentation, it is<br>
> not clear to me at all that methods like findAny() would pull in all data<br>
> upon becoming parallel().<br>
> <br>
> Do you think it would make sense to add documentation about this to the<br>
> javadocs for Stream/java.util.stream? Or maybe it is already there and I<br>
> misunderstood it (even after reading through it thoroughly over 5 times).<br>
> <br>
> <br>
> On Tue, Nov 12, 2024, 10:06 AM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>> wrote:<br>
> <br>
>>> We are told how Streams can process unbounded data sets, but when it<br>
>> tries to do a findAny() with parallel(), it runs into an OOME because it<br>
>> fetched all the data ahead of time. In fact, almost of the terminal<br>
>> operations will hit an OOME in the exact same way if they are parallel and<br>
>> have a big enough data set. It's definitely not the end of the world, but<br>
>> it seems that I have to fit everything into a Collector and/or a Gatherer<br>
>> if I want to avoid pre-fetching everything.<br>
>><br>
>> Yeah, I think it is important to distinguish "can process unbounded data<br>
>> sets" from "always able to process unbounded data sets".<br>
>><br>
>> Some operations inherently need the end of the stream, so even something<br>
>> somple like: stream.distinct() or stream.sorted() can end up pulling in all<br>
>> data (which of course won't terminate).<br>
>><br>
>> Fortunately, I think Gatherers can unlock much more situations where<br>
>> unbounded streams can be processed.<br>
>><br>
>> Cheers,<br>
>> √<br>
>><br>
>><br>
>> *Viktor Klang*<br>
>> Software Architect, Java Platform Group<br>
>> Oracle<br>
>> ------------------------------<br>
>> *From:* David Alayachew <<a href="mailto:davidalayachew@gmail.com" rel="noreferrer noreferrer noreferrer" target="_blank">davidalayachew@gmail.com</a>><br>
>> *Sent:* Tuesday, 12 November 2024 15:08<br>
>> *To:* Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>><br>
>> *Cc:* core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" rel="noreferrer noreferrer noreferrer" target="_blank">core-libs-dev@openjdk.org</a>><br>
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and<br>
>> fetching too many elements<br>
>><br>
>><br>
>> Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon<br>
>> as I thought of it.<br>
>><br>
>><br>
>> I hand-waved away the idea because I thought that the method would turn<br>
>> the stream pipeline parallel, thus, recreating the same problem I currently<br>
>> have of parallelism causing all of the elements to be fetched ahead of<br>
>> time, causing an OOME.<br>
>><br>
>><br>
>> It did NOT occur to me that the pipeline would stay sequential, and just<br>
>> kick these off sequentially, but have them executing in parallel. I can't<br>
>> see why I came to that incorrect conclusion. I have read the javadocs of<br>
>> this method several times. Though, to be fair, I came to the same,<br>
>> incorrect conclusion about Collectors.groupingByConcurrent(), and it wasn't<br>
>> until someone pointed out what the documentation was actually saying that I<br>
>> realized it's true properties.<br>
>><br>
>> Thanks. That definitely solves at least part of my problem. Obviously, I<br>
>> would prefer to write to S3 in parallel too, but at the very least, the<br>
>> calculation part is being done in parallel. And worst case scenario, I can<br>
>> be really bad and just do the write to S3 in the mapConcurrent, and then<br>
>> just return the metadata of each write, and just bundle that up with<br>
>> collect.<br>
>><br>
>><br>
>> And that's ignoring the fact that I can just use the workaround too.<br>
>><br>
>><br>
>> Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me<br>
>> from a performance perspective, but is rather unintuitive to me from a<br>
>> usability perspective. We are told how Streams can process unbounded data<br>
>> sets, but when it tries to do a findAny() with parallel(), it runs into an<br>
>> OOME because it fetched all the data ahead of time. In fact, almost of the<br>
>> terminal operations will hit an OOME in the exact same way if they are<br>
>> parallel and have a big enough data set. It's definitely not the end of the<br>
>> world, but it seems that I have to fit everything into a Collector and/or a<br>
>> Gatherer if I want to avoid pre-fetching everything.<br>
>><br>
>> On Tue, Nov 12, 2024, 6:36 AM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>><br>
>> wrote:<br>
>><br>
>> Have you considered Gatherers.mapConcurrent(…)?<br>
>><br>
>><br>
>> Cheers,<br>
>> √<br>
>><br>
>><br>
>> *Viktor Klang*<br>
>> Software Architect, Java Platform Group<br>
>> Oracle<br>
>> ------------------------------<br>
>> *From:* David Alayachew <<a href="mailto:davidalayachew@gmail.com" rel="noreferrer noreferrer noreferrer" target="_blank">davidalayachew@gmail.com</a>><br>
>> *Sent:* Tuesday, 12 November 2024 01:53<br>
>> *To:* Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>><br>
>> *Cc:* core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" rel="noreferrer noreferrer noreferrer" target="_blank">core-libs-dev@openjdk.org</a>><br>
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and<br>
>> fetching too many elements<br>
>><br>
>> Good to know, ty vm.<br>
>><br>
>> At the very least, I have this workaround. This will meet my needs for now.<br>
>><br>
>> I guess my final question would be -- is this type of problem better<br>
>> suited to something besides parallel streams? Maybe an ExecutorService?<br>
>><br>
>> Really, all I am doing is taking a jumbo file, splitting it into batches,<br>
>> and then doing some work on those batches. My IO speeds are pretty fast,<br>
>> and the compute work is non-trivial, so there is performance being left on<br>
>> the table if I give up parallelism. And I am in a position where completion<br>
>> time is very important to us.<br>
>><br>
>> I just naturally assumed parallel streams were the right choice because<br>
>> the compute work is simple. A pure function that I can break out, and then<br>
>> call in a map. Once I do that, I just call forEach to write the batches<br>
>> back out to S3. Maybe I should look into a different part of the std lib<br>
>> instead because I am using the wrong tool for the job? My nose says<br>
>> ExecutorService, but I figure I should ask before I dive too deep in.<br>
>><br>
>><br>
>> On Mon, Nov 11, 2024, 2:34 PM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>><br>
>> wrote:<br>
>><br>
>> You're most welcome!<br>
>><br>
>> In a potential future where all intermediate operations are<br>
>> Gatherer-based, and all terminal operations are Collector-based, it would<br>
>> just work as expected. But with that said, I'm not sure it is practically<br>
>> achievable because some operations might not have the same<br>
>> performance-characteristics as before.<br>
>><br>
>> Cheers,<br>
>> √<br>
>><br>
>><br>
>> *Viktor Klang*<br>
>> Software Architect, Java Platform Group<br>
>> Oracle<br>
>> ------------------------------<br>
>> *From:* David Alayachew <<a href="mailto:davidalayachew@gmail.com" rel="noreferrer noreferrer noreferrer" target="_blank">davidalayachew@gmail.com</a>><br>
>> *Sent:* Monday, 11 November 2024 18:32<br>
>> *To:* Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>><br>
>> *Cc:* core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" rel="noreferrer noreferrer noreferrer" target="_blank">core-libs-dev@openjdk.org</a>><br>
>> *Subject:* [External] : Re: Question about Streams, Gatherers, and<br>
>> fetching too many elements<br>
>><br>
>><br>
>> Thanks for the workaround. It's running beautifully.<br>
>><br>
>> Is there a future where this island concept is extended to the rest of<br>
>> streams? Tbh, I don't fully understand it.<br>
>><br>
>> On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" rel="noreferrer noreferrer noreferrer" target="_blank">viktor.klang@oracle.com</a>><br>
>> wrote:<br>
>><br>
>> Hi David,<br>
>><br>
>> This is the effect of how parallel streams are implemented, where<br>
>> different stages, which are not representible as a join-less Spliterator<br>
>> are executed as a series of "islands" where the next isn't started until<br>
>> the former has completed.<br>
>><br>
>> If you think about it, parallelization of a Stream works best when the<br>
>> entire data set can be split amongst a set of worker threads, and that sort<br>
>> of implies that you want eager pre-fetch of data, so if your dataset does<br>
>> not fit in memory, that is likely to lead to less desirable outcomes.<br>
>><br>
>> What I was able to do for Gatherers is to implement "gather(…) +<br>
>> collect(…)"-fusion so any number of consecutive gather(…)-operations<br>
>> immediately followed by a collect(…) is run in the same "island".<br>
>><br>
>> So with that said, you could try something like the following:<br>
>><br>
>> static <T> Collector<T, ?, Void> *forEach*(Consumer<? *super* T> *each*) {<br>
>>      *return* Collector.of(() -> null, (*v*, *e*) -> each.accept(e), (*l*,<br>
>> *r*) -> l, (*v*) -> null, Collector.Characteristics.IDENTITY_FINISH);<br>
>> }<br>
>><br>
>><br>
>> stream<br>
>> .parallel()<br>
>> .unordered()<br>
>> .gather(Gatherers.windowFixed(BATCH_SIZE))<br>
>> .collect(forEach(eachList -> println(eachList.getFirst())));<br>
>><br>
>><br>
>> Cheers,<br>
>> √<br>
>><br>
>><br>
>> *Viktor Klang*<br>
>> Software Architect, Java Platform Group<br>
>> Oracle<br>
>> ------------------------------<br>
>> *From:* core-libs-dev <<a href="mailto:core-libs-dev-retn@openjdk.org" rel="noreferrer noreferrer noreferrer" target="_blank">core-libs-dev-retn@openjdk.org</a>> on behalf of David<br>
>> Alayachew <<a href="mailto:davidalayachew@gmail.com" rel="noreferrer noreferrer noreferrer" target="_blank">davidalayachew@gmail.com</a>><br>
>> *Sent:* Monday, 11 November 2024 14:52<br>
>> *To:* core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" rel="noreferrer noreferrer noreferrer" target="_blank">core-libs-dev@openjdk.org</a>><br>
>> *Subject:* Re: Question about Streams, Gatherers, and fetching too many<br>
>> elements<br>
>><br>
>> And just to avoid the obvious question, I can hold about 30 batches in<br>
>> memory before the Out of Memory error occurs. So this is not an issue of my<br>
>> batch size being too high.<br>
>><br>
>> But just to confirm, I set the batch size to 1, and it still ran into an<br>
>> out of memory error. So I feel fairly confident saying that the Gatherer is<br>
>> trying to grab all available data before sending any of it downstream.<br>
>><br>
>> On Mon, Nov 11, 2024, 8:46 AM David Alayachew <<a href="mailto:davidalayachew@gmail.com" rel="noreferrer noreferrer noreferrer" target="_blank">davidalayachew@gmail.com</a>><br>
>> wrote:<br>
>><br>
>> Hello Core Libs Dev Team,<br>
>><br>
>> I was trying out Gatherers for a project at work, and ran into a rather<br>
>> sad scenario.<br>
>><br>
>> I need to process a large file in batches. Each batch is small enough that<br>
>> I can hold it in memory, but I cannot hold the entire file (and thus, all<br>
>> of the batches) in memory at once.<br>
>><br>
>> Looking at the Gatherers API, I saw windowFixed and thought that it would<br>
>> be a great match for my use case.<br>
>><br>
>> However, when trying it out, I was disappointed to see that it ran out of<br>
>> memory very quickly. Here is my attempt at using it.<br>
>><br>
>> stream<br>
>> .parallel()<br>
>> .unordered()<br>
>> .gather(Gatherers.windowFixed(BATCH_SIZE))<br>
>> .forEach(eachList -> println(eachList.getFirst()))<br>
>> ;<br>
>><br>
>> As you can see, I am just splitting the file into batches, and printing<br>
>> out the first of each batch. This is purely for example's sake, of course.<br>
>> I had planned on building even more functionality on top of this, but I<br>
>> couldn't even get past this example.<br>
>><br>
>> But anyways, not even a single one of them printed out. Which leads me to<br>
>> believe that it's pulling all of them in the Gatherer.<br>
>><br>
>> I can get it to run successfully if I go sequentially, but not parallel.<br>
>> Parallel gives me that out of memory error.<br>
>><br>
>> Is there any way for me to be able to have the Gatherer NOT pull in<br>
>> everything while still remaining parallel and unordered?<br>
>><br>
>> Thank you for your time and help.<br>
>> David Alayachew<br>
</blockquote>
</div>
</div>
</div>
</div>
</div>
</blockquote>
</div>
</div>
</div>

</blockquote></div>
</blockquote></div>