<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<style type="text/css" style="display:none;"> P {margin-top:0;margin-bottom:0;} </style>
</head>
<body dir="ltr">
<div class="elementToProof" style="font-family: Aptos, Aptos_EmbeddedFont, Aptos_MSFontService, Calibri, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);">
The issue here is that an individual operation cannot advertise this, because the behaviour depends on the combination of operations in the pipeline.</div>
<div id="Signature" class="elementToProof" style="color: inherit;">
<div style="font-family: Aptos, Aptos_EmbeddedFont, Aptos_MSFontService, Calibri, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);">
<br>
</div>
<div style="font-family: Aptos, Aptos_EmbeddedFont, Aptos_MSFontService, Calibri, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);">
<br>
</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);">
Cheers,<br>
√</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);">
<br>
</div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);">
<b><br>
</b></div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);">
<b>Viktor Klang</b></div>
<div style="font-family: Calibri, Arial, Helvetica, sans-serif; font-size: 12pt; color: rgb(0, 0, 0);">
Software Architect, Java Platform Group<br>
Oracle</div>
</div>
<div id="appendonsend"></div>
<hr style="display:inline-block;width:98%" tabindex="-1">
<div id="divRplyFwdMsg" dir="ltr"><font face="Calibri, sans-serif" style="font-size:11pt" color="#000000"><b>From:</b> core-libs-dev <core-libs-dev-retn@openjdk.org> on behalf of David Alayachew <davidalayachew@gmail.com><br>
<b>Sent:</b> Wednesday, 13 November 2024 14:07<br>
<b>To:</b> Rob Spoor <openjdk@icemanx.nl><br>
<b>Cc:</b> core-libs-dev <core-libs-dev@openjdk.org><br>
<b>Subject:</b> Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements</font>
<div> </div>
</div>
<div>
<div dir="auto">
<div>That is a good point, Rob.</div>
<div dir="auto"><br>
</div>
<div dir="auto">Then let me correct myself -- I think the terminal operations don't do a great job of advertising whether or not they pre-fetch everything when parallelism is activated. collect() with a Collector fetches as needed, whereas findAny() pre-fetches everything. I understand
that later releases might change their behaviour, but I would still like the current behaviour documented in the official javadocs so that we can limit undocumented tripping hazards.</div>
<div dir="auto"><br>
<br>
<div class="x_gmail_quote" dir="auto">
<div dir="ltr" class="x_gmail_attr">On Wed, Nov 13, 2024, 7:07 AM Rob Spoor <<a href="mailto:openjdk@icemanx.nl">openjdk@icemanx.nl</a>> wrote:<br>
</div>
<blockquote class="x_gmail_quote" style="margin:0 0 0 .8ex; border-left:1px #ccc solid; padding-left:1ex">
distinct() doesn't require everything to be pulled. It can push elements <br>
to the downstream as they come along for the first time. When <br>
downstream.push returns false the gatherer is done.<br>
<br>
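A minimal sketch of the point above, assuming JDK 24 or later (where java.util.stream.Gatherer is final). The class name and the helper methods are hypothetical and this is not the JDK's own distinct() implementation; it only illustrates returning the result of downstream.push so that integration stops once the downstream rejects elements.<br>

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class DistinctGatherer {
    // Hypothetical helper: a sequential distinct() expressed as a Gatherer.
    static <T> Gatherer<T, ?, T> distinct() {
        Gatherer.Integrator<Set<T>, T, T> integrator = (seen, element, downstream) ->
            // A duplicate is skipped and integration continues; a new element is
            // pushed, and a false result from push ends the gatherer.
            !seen.add(element) || downstream.push(element);
        return Gatherer.ofSequential(HashSet::new, integrator);
    }

    // Reads from an infinite source; limit(3) makes the downstream reject
    // further elements, which in turn stops the gatherer.
    static List<Integer> firstThreeDistinct() {
        return Stream.iterate(0, i -> i + 1)
            .map(i -> i % 5)
            .gather(distinct())
            .limit(3)
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstThreeDistinct());
    }
}
```

Because the integrator is not declared greedy, the sequential pipeline can short-circuit even though the source never ends.<br>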
As part of some experimentation I've implemented all intermediate <br>
operations using gatherers. Most of them are pretty straightforward and <br>
will stop integrating once the downstream starts rejecting elements <br>
(although some use Gatherer.ofSequential to keep it easy). I only found <br>
two exceptions that don't return the result of downstream.push:<br>
<br>
* mapMulti. downstream::push is passed to the mapper as its Consumer <br>
argument - the return value is ignored. With some more effort it's <br>
probably possible to capture any false return value and return that from <br>
the integrator, but I haven't tried that yet.<br>
<br>
* sorted. Obviously every element needs to be inspected.<br>
<br>
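One possible way to capture the false return value mentioned in the mapMulti bullet above, sketched under the assumption of JDK 24 or later. The class, the method names and the boolean-array flag are illustrative choices, not the implementation described in this thread.<br>

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class MapMultiGatherer {
    // Hypothetical helper: mapMulti as a stateless Gatherer that remembers
    // whether the downstream has rejected an element.
    static <T, R> Gatherer<T, ?, R> mapMulti(
            BiConsumer<? super T, ? super Consumer<R>> mapper) {
        Gatherer.Integrator<Void, T, R> integrator = (unused, element, downstream) -> {
            boolean[] proceed = {true};
            // The mapper only sees a Consumer, so the result of push is
            // recorded here instead of being lost.
            Consumer<R> sink = r -> {
                if (proceed[0]) {
                    proceed[0] = downstream.push(r);
                }
            };
            mapper.accept(element, sink);
            return proceed[0];
        };
        return Gatherer.of(integrator);
    }

    // Emits i and -i for each i of an infinite source, then stops after three.
    static List<Integer> firstThree() {
        return Stream.iterate(1, i -> i + 1)
            .gather(MapMultiGatherer.<Integer, Integer>mapMulti((i, sink) -> {
                sink.accept(i);
                sink.accept(-i);
            }))
            .limit(3)
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstThree());
    }
}
```

Elements handed to the sink after the first rejection are dropped, and the integrator then returns false so that no further upstream elements are requested.<br>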
<br>
On 13/11/2024 00:37, David Alayachew wrote:<br>
> Oh sure, I expect something like distinct() to pull everything. In order to<br>
> know if something is distinct, you have to do some variant of "check<br>
> against everyone else". Whether that is holding all instances in memory or<br>
> their hashes, it's clear from a glance that you will need to look at<br>
> everything, and therefore, pre-fetching makes intuitive sense to me.<br>
> <br>
> I 100% did not expect terminal operations like findAny() or reduce() to<br>
> pull the whole data set. That was a complete whiplash for me. The method<br>
> findAny() advertises itself as a short-circuiting operation, so to find out<br>
> that it actually pulls the whole data set anyways was shocking.<br>
> <br>
> And that was my biggest pain point -- looking at the documentation, it is<br>
> not clear to me at all that methods like findAny() would pull in all data<br>
> upon becoming parallel().<br>
> <br>
> Do you think it would make sense to add documentation about this to the<br>
> javadocs for Stream/java.util.stream? Or maybe it is already there and I<br>
> misunderstood it (even after reading through it thoroughly over 5 times).<br>
> <br>
> <br>
> On Tue, Nov 12, 2024, 10:06 AM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" target="_blank" rel="noreferrer">viktor.klang@oracle.com</a>> wrote:<br>
> <br>
>>> We are told how Streams can process unbounded data sets, but when it<br>
>> tries to do a findAny() with parallel(), it runs into an OOME because it<br>
>> fetched all the data ahead of time. In fact, almost all of the terminal<br>
>> operations will hit an OOME in the exact same way if they are parallel and<br>
>> have a big enough data set. It's definitely not the end of the world, but<br>
>> it seems that I have to fit everything into a Collector and/or a Gatherer<br>
>> if I want to avoid pre-fetching everything.<br>
>><br>
>> Yeah, I think it is important to distinguish "can process unbounded data<br>
>> sets" from "always able to process unbounded data sets".<br>
>><br>
>> Some operations inherently need the end of the stream, so even something<br>
>> simple like: stream.distinct() or stream.sorted() can end up pulling in all<br>
>> data (which of course won't terminate).<br>
>><br>
>> Fortunately, I think Gatherers can unlock many more situations where<br>
>> unbounded streams can be processed.<br>
>><br>
>> Cheers,<br>
>> √<br>
>><br>
>><br>
>> *Viktor Klang*<br>
>> Software Architect, Java Platform Group<br>
>> Oracle<br>
>> ------------------------------<br>
>> *From:* David Alayachew <<a href="mailto:davidalayachew@gmail.com" target="_blank" rel="noreferrer">davidalayachew@gmail.com</a>><br>
>> *Sent:* Tuesday, 12 November 2024 15:08<br>
>> *To:* Viktor Klang <<a href="mailto:viktor.klang@oracle.com" target="_blank" rel="noreferrer">viktor.klang@oracle.com</a>><br>
>> *Cc:* core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" target="_blank" rel="noreferrer">core-libs-dev@openjdk.org</a>><br>
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and<br>
>> fetching too many elements<br>
>><br>
>><br>
>> Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon<br>
>> as I thought of it.<br>
>><br>
>><br>
>> I hand-waved away the idea because I thought that the method would turn<br>
>> the stream pipeline parallel, thus, recreating the same problem I currently<br>
>> have of parallelism causing all of the elements to be fetched ahead of<br>
>> time, causing an OOME.<br>
>><br>
>><br>
>> It did NOT occur to me that the pipeline would stay sequential, and just<br>
>> kick these off sequentially, but have them executing in parallel. I can't<br>
>> see why I came to that incorrect conclusion. I have read the javadocs of<br>
>> this method several times. Though, to be fair, I came to the same,<br>
>> incorrect conclusion about Collectors.groupingByConcurrent(), and it wasn't<br>
>> until someone pointed out what the documentation was actually saying that I<br>
>> realized its true properties.<br>
>><br>
>> Thanks. That definitely solves at least part of my problem. Obviously, I<br>
>> would prefer to write to S3 in parallel too, but at the very least, the<br>
>> calculation part is being done in parallel. And worst case scenario, I can<br>
>> be really bad and just do the write to S3 in the mapConcurrent, and then<br>
>> just return the metadata of each write, and just bundle that up with<br>
>> collect.<br>
>><br>
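A small sketch of the idea described above: the pipeline stays sequential while Gatherers.mapConcurrent runs the per-batch work concurrently, and each task returns only metadata. It assumes JDK 24 or later; processBatch is a hypothetical stand-in for the real compute-and-write-to-S3 step, and the batch size and concurrency limit are arbitrary.<br>

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class MapConcurrentSketch {
    // Hypothetical stand-in for "compute on a batch, write it out, return metadata".
    static String processBatch(List<Integer> batch) {
        int sum = batch.stream().mapToInt(Integer::intValue).sum();
        return "batch of " + batch.size() + ", sum " + sum;
    }

    static List<String> run() {
        return IntStream.rangeClosed(1, 10).boxed()
            // Sequential stream: batches are formed one after another.
            .gather(Gatherers.windowFixed(3))
            // At most 4 batches are processed concurrently; results are
            // emitted downstream in encounter order.
            .gather(Gatherers.mapConcurrent(4, MapConcurrentSketch::processBatch))
            .toList();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The concurrency limit also bounds how many batches are in flight at once, which is the property that matters when the whole input does not fit in memory.<br>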
>><br>
>> And that's ignoring the fact that I can just use the workaround too.<br>
>><br>
>><br>
>> Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me<br>
>> from a performance perspective, but is rather unintuitive to me from a<br>
>> usability perspective. We are told how Streams can process unbounded data<br>
>> sets, but when it tries to do a findAny() with parallel(), it runs into an<br>
>> OOME because it fetched all the data ahead of time. In fact, almost all of the<br>
>> terminal operations will hit an OOME in the exact same way if they are<br>
>> parallel and have a big enough data set. It's definitely not the end of the<br>
>> world, but it seems that I have to fit everything into a Collector and/or a<br>
>> Gatherer if I want to avoid pre-fetching everything.<br>
>><br>
>> On Tue, Nov 12, 2024, 6:36 AM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" target="_blank" rel="noreferrer">viktor.klang@oracle.com</a>><br>
>> wrote:<br>
>><br>
>> Have you considered Gatherers.mapConcurrent(…)?<br>
>><br>
>><br>
>> Cheers,<br>
>> √<br>
>><br>
>><br>
>> *Viktor Klang*<br>
>> Software Architect, Java Platform Group<br>
>> Oracle<br>
>> ------------------------------<br>
>> *From:* David Alayachew <<a href="mailto:davidalayachew@gmail.com" target="_blank" rel="noreferrer">davidalayachew@gmail.com</a>><br>
>> *Sent:* Tuesday, 12 November 2024 01:53<br>
>> *To:* Viktor Klang <<a href="mailto:viktor.klang@oracle.com" target="_blank" rel="noreferrer">viktor.klang@oracle.com</a>><br>
>> *Cc:* core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" target="_blank" rel="noreferrer">core-libs-dev@openjdk.org</a>><br>
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and<br>
>> fetching too many elements<br>
>><br>
>> Good to know, ty vm.<br>
>><br>
>> At the very least, I have this workaround. This will meet my needs for now.<br>
>><br>
>> I guess my final question would be -- is this type of problem better<br>
>> suited to something besides parallel streams? Maybe an ExecutorService?<br>
>><br>
>> Really, all I am doing is taking a jumbo file, splitting it into batches,<br>
>> and then doing some work on those batches. My IO speeds are pretty fast,<br>
>> and the compute work is non-trivial, so there is performance being left on<br>
>> the table if I give up parallelism. And I am in a position where completion<br>
>> time is very important to us.<br>
>><br>
>> I just naturally assumed parallel streams were the right choice because<br>
>> the compute work is simple. A pure function that I can break out, and then<br>
>> call in a map. Once I do that, I just call forEach to write the batches<br>
>> back out to S3. Maybe I should look into a different part of the std lib<br>
>> instead because I am using the wrong tool for the job? My nose says<br>
>> ExecutorService, but I figure I should ask before I dive too deep in.<br>
>><br>
>><br>
>> On Mon, Nov 11, 2024, 2:34 PM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" target="_blank" rel="noreferrer">viktor.klang@oracle.com</a>><br>
>> wrote:<br>
>><br>
>> You're most welcome!<br>
>><br>
>> In a potential future where all intermediate operations are<br>
>> Gatherer-based, and all terminal operations are Collector-based, it would<br>
>> just work as expected. But with that said, I'm not sure it is practically<br>
>> achievable because some operations might not have the same<br>
>> performance-characteristics as before.<br>
>><br>
>> Cheers,<br>
>> √<br>
>><br>
>><br>
>> *Viktor Klang*<br>
>> Software Architect, Java Platform Group<br>
>> Oracle<br>
>> ------------------------------<br>
>> *From:* David Alayachew <<a href="mailto:davidalayachew@gmail.com" target="_blank" rel="noreferrer">davidalayachew@gmail.com</a>><br>
>> *Sent:* Monday, 11 November 2024 18:32<br>
>> *To:* Viktor Klang <<a href="mailto:viktor.klang@oracle.com" target="_blank" rel="noreferrer">viktor.klang@oracle.com</a>><br>
>> *Cc:* core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" target="_blank" rel="noreferrer">core-libs-dev@openjdk.org</a>><br>
>> *Subject:* [External] : Re: Question about Streams, Gatherers, and<br>
>> fetching too many elements<br>
>><br>
>><br>
>> Thanks for the workaround. It's running beautifully.<br>
>><br>
>> Is there a future where this island concept is extended to the rest of<br>
>> streams? Tbh, I don't fully understand it.<br>
>><br>
>> On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <<a href="mailto:viktor.klang@oracle.com" target="_blank" rel="noreferrer">viktor.klang@oracle.com</a>><br>
>> wrote:<br>
>><br>
>> Hi David,<br>
>><br>
>> This is an effect of how parallel streams are implemented: stages which<br>
>> are not representable as a join-less Spliterator are executed as a series<br>
>> of "islands", where the next island isn't started until the previous one<br>
>> has completed.<br>
>><br>
>> If you think about it, parallelization of a Stream works best when the<br>
>> entire data set can be split amongst a set of worker threads, and that sort<br>
>> of implies that you want eager pre-fetch of data, so if your dataset does<br>
>> not fit in memory, that is likely to lead to less desirable outcomes.<br>
>><br>
>> What I was able to do for Gatherers is to implement "gather(…) +<br>
>> collect(…)"-fusion so any number of consecutive gather(…)-operations<br>
>> immediately followed by a collect(…) is run in the same "island".<br>
>><br>
>> So with that said, you could try something like the following:<br>
>><br>
>> static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {<br>
>> return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l,<br>
>> (v) -> null, Collector.Characteristics.IDENTITY_FINISH);<br>
>> }<br>
>><br>
>><br>
>> stream<br>
>> .parallel()<br>
>> .unordered()<br>
>> .gather(Gatherers.windowFixed(BATCH_SIZE))<br>
>> .collect(forEach(eachList -> println(eachList.getFirst())));<br>
>><br>
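For experimenting with the workaround above, here is a self-contained variant. It assumes JDK 24 or later; the in-memory source, the batch size of 3 and the counters are hypothetical substitutes for the real file and the println call.<br>

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class ForEachCollectorSketch {
    // The collector from the message above: it keeps no state and only runs
    // the given action for each element it receives.
    static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
        return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l,
            (v) -> null, Collector.Characteristics.IDENTITY_FINISH);
    }

    // Returns {number of batches seen, number of elements seen}.
    static int[] run() {
        AtomicInteger batches = new AtomicInteger();
        AtomicInteger elements = new AtomicInteger();
        IntStream.rangeClosed(1, 10).boxed()
            .parallel()
            .unordered()
            .gather(Gatherers.windowFixed(3))
            // gather(...) followed directly by collect(...) is what the
            // message describes as running in the same "island".
            .collect(forEach(batch -> {
                batches.incrementAndGet();
                elements.addAndGet(batch.size());
            }));
        return new int[] {batches.get(), elements.get()};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " batches, " + counts[1] + " elements");
    }
}
```

Thread-safe counters are used because the action may be invoked from a worker thread of the parallel stream.<br>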
>><br>
>> Cheers,<br>
>> √<br>
>><br>
>><br>
>> *Viktor Klang*<br>
>> Software Architect, Java Platform Group<br>
>> Oracle<br>
>> ------------------------------<br>
>> *From:* core-libs-dev <<a href="mailto:core-libs-dev-retn@openjdk.org" target="_blank" rel="noreferrer">core-libs-dev-retn@openjdk.org</a>> on behalf of David<br>
>> Alayachew <<a href="mailto:davidalayachew@gmail.com" target="_blank" rel="noreferrer">davidalayachew@gmail.com</a>><br>
>> *Sent:* Monday, 11 November 2024 14:52<br>
>> *To:* core-libs-dev <<a href="mailto:core-libs-dev@openjdk.org" target="_blank" rel="noreferrer">core-libs-dev@openjdk.org</a>><br>
>> *Subject:* Re: Question about Streams, Gatherers, and fetching too many<br>
>> elements<br>
>><br>
>> And just to avoid the obvious question, I can hold about 30 batches in<br>
>> memory before the Out of Memory error occurs. So this is not an issue of my<br>
>> batch size being too high.<br>
>><br>
>> But just to confirm, I set the batch size to 1, and it still ran into an<br>
>> out of memory error. So I feel fairly confident saying that the Gatherer is<br>
>> trying to grab all available data before sending any of it downstream.<br>
>><br>
>> On Mon, Nov 11, 2024, 8:46 AM David Alayachew <<a href="mailto:davidalayachew@gmail.com" target="_blank" rel="noreferrer">davidalayachew@gmail.com</a>><br>
>> wrote:<br>
>><br>
>> Hello Core Libs Dev Team,<br>
>><br>
>> I was trying out Gatherers for a project at work, and ran into a rather<br>
>> sad scenario.<br>
>><br>
>> I need to process a large file in batches. Each batch is small enough that<br>
>> I can hold it in memory, but I cannot hold the entire file (and thus, all<br>
>> of the batches) in memory at once.<br>
>><br>
>> Looking at the Gatherers API, I saw windowFixed and thought that it would<br>
>> be a great match for my use case.<br>
>><br>
>> However, when trying it out, I was disappointed to see that it ran out of<br>
>> memory very quickly. Here is my attempt at using it.<br>
>><br>
>> stream<br>
>> .parallel()<br>
>> .unordered()<br>
>> .gather(Gatherers.windowFixed(BATCH_SIZE))<br>
>> .forEach(eachList -> println(eachList.getFirst()))<br>
>> ;<br>
>><br>
>> As you can see, I am just splitting the file into batches, and printing<br>
>> out the first of each batch. This is purely for example's sake, of course.<br>
>> I had planned on building even more functionality on top of this, but I<br>
>> couldn't even get past this example.<br>
>><br>
>> But anyways, not even a single one of them printed out. Which leads me to<br>
>> believe that it's pulling all of them in the Gatherer.<br>
>><br>
>> I can get it to run successfully if I go sequentially, but not parallel.<br>
>> Parallel gives me that out of memory error.<br>
>><br>
>> Is there any way for me to be able to have the Gatherer NOT pull in<br>
>> everything while still remaining parallel and unordered?<br>
>><br>
>> Thank you for your time and help.<br>
>> David Alayachew<br>
</blockquote>
</div>
</div>
</div>
</div>
</body>
</html>