Stream API: Fuse sorted().limit(n) into single operation
Hello!

One of the popular bulk data operations is to find a given number of least or greatest elements. Currently the Stream API provides no dedicated operation to do this. Of course, it could be implemented with a custom collector, and some third-party libraries already provide it. However, it would be quite natural to use the existing API:

stream.sorted().limit(k) - k least elements
stream.sorted(Comparator.reverseOrder()).limit(k) - k greatest elements.

In fact, people are already doing this. Some samples can be found on GitHub:

https://github.com/search?l=java&q=%22sorted%28%29.limit%28%22&type=Code&utf...

Unfortunately, the current implementation of such a sequence of operations is suboptimal: first the whole stream content is dumped into an intermediate array, then it is fully sorted, and after that the k least elements are selected. On the other hand, it's possible to provide a special implementation for this particular case which takes O(k) additional memory and in many cases works significantly faster.

I wrote a proof-of-concept implementation, which can be found here:

http://cr.openjdk.java.net/~tvaleev/patches/sortedLimit/webrev/

The implementation switches to the new algorithm if the limit is less than 1000, which is quite common for such a scenario (supporting bigger values is also possible, but would require more testing). The new algorithm allocates an array of 2*limit elements. When its size is reached, it sorts the array (using Arrays.sort) and discards the second half. After that, only those elements which are less than the worst element found so far are accumulated. When the array is filled again, the second half is sorted and merged with the first half.

Here's a JMH test with results which covers several input patterns:

http://cr.openjdk.java.net/~tvaleev/patches/sortedLimit/jmh/

You may check the summary first:

http://cr.openjdk.java.net/~tvaleev/patches/sortedLimit/jmh/summary.txt

Speedup values bigger than 1 are good.

The most significant regression for the new implementation in sequential mode is ever-decreasing input (especially with a low limit value). Still, it's not that bad (given that the old implementation processes such input very fast). On the other hand, for random input the new implementation can be an order of magnitude faster. Even for ever-ascending input a noticeable speedup (around 40%) can be achieved.

For parallel streams the new implementation is almost always faster, especially if you ignore the cases when a parallel stream is unprofitable.

What do you think about this improvement? Could it be included into JDK 9? Are there any issues I'm unaware of? I would be really happy to complete this work if it is supported by the JDK team. The current implementation has no primitive specializations and does not optimize the sorting away if the input is known to be sorted, but it's not very hard to add these features as well if you find my idea useful.

With best regards,
Tagir Valeev.
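The accumulation scheme described above can be sketched roughly as follows. This is a simplified int-based sketch for illustration only, not the actual webrev code: the real implementation sorts only the tail and merges it with the kept half, rather than re-sorting the whole buffer, and handles a generic Comparator. It assumes k >= 1.

```java
import java.util.Arrays;

public class LeastK {
    // Keep a buffer of 2*k elements; when full, sort it and keep the k least,
    // then accept only elements smaller than the current k-th least element.
    public static int[] least(int[] input, int k) {
        int[] buf = new int[2 * k];
        int size = 0;
        boolean initial = true; // true until the buffer has been sorted once
        for (int t : input) {
            // Once sorted at least once, buf[k-1] is the worst kept element.
            if (!initial && t >= buf[k - 1]) continue;
            if (size == buf.length) {
                Arrays.sort(buf); // simplified: webrev code sorts tail + merges
                size = k;         // discard the second half
                initial = false;
                if (t >= buf[k - 1]) continue;
            }
            buf[size++] = t;
        }
        // Final pass: sort the kept elements plus the pending tail.
        Arrays.sort(buf, 0, size);
        return Arrays.copyOf(buf, Math.min(k, size));
    }

    public static void main(String[] args) {
        int[] data = {5, 1, 9, 3, 7, 2, 8, 4, 6, 0};
        System.out.println(Arrays.toString(least(data, 3))); // prints [0, 1, 2]
    }
}
```

Because every element surviving the filter is strictly less than the worst kept element, each 2k-sized sort can only improve the kept half, so O(k) extra memory suffices regardless of stream length.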
Worth noting: Guava uses a similar implementation for Ordering.leastOf <http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Ordering.html#leastOf(java.lang.Iterable,%20int)>, but instead of sorting the array when it's filled, it does a quickselect pass to do it in O(k) time instead of O(k log k). We had been planning to put together a Collector implementation for it, since it's actually pretty amenable to Collectorification (clearly a word).

On Sat, Mar 5, 2016 at 12:32 PM Tagir F. Valeev <amaembo@gmail.com> wrote:
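A "least k" Collector along the lines Louis mentions could be sketched in plain JDK code with a bounded max-heap. This is a hypothetical sketch, not Guava's implementation (Guava's leastOf uses a quickselect pass instead), and note that a heap, like quickselect, does not preserve encounter order among equal elements:

```java
import java.util.*;
import java.util.stream.*;

public class LeastKCollector {
    // Collect the k least elements using a max-heap capped at k entries:
    // the heap root is always the worst kept element, evicted when beaten.
    public static <T extends Comparable<T>> Collector<T, ?, List<T>> least(int k) {
        return Collector.of(
            () -> new PriorityQueue<T>(Comparator.<T>naturalOrder().reversed()),
            (heap, t) -> {
                heap.offer(t);
                if (heap.size() > k) heap.poll(); // evict the current largest
            },
            (a, b) -> {
                b.forEach(t -> { a.offer(t); if (a.size() > k) a.poll(); });
                return a;
            },
            heap -> {
                List<T> out = new ArrayList<>(heap);
                Collections.sort(out); // heap order is not sorted order
                return out;
            });
    }

    public static void main(String[] args) {
        List<Integer> r = Stream.of(5, 1, 9, 3, 7, 2).collect(least(3));
        System.out.println(r); // prints [1, 2, 3]
    }
}
```

Unlike the fused sorted().limit(k) pipeline, this runs in O(n log k) comparisons with O(k) memory but gives no stability guarantee, which is the trade-off discussed below.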
Hello!

LW> Worth noting: Guava uses a similar implementation for
LW> Ordering.leastOf, but instead of sorting the array when it's
LW> filled, does a quickselect pass to do it in O(k) time instead of O(k log k).

Thank you for mentioning Guava. Unfortunately, quickselect is not stable, while sorted().limit(n) must produce a stable result. Quickselect might be good for primitive specializations, though.

With best regards,
Tagir Valeev.
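The stability requirement can be seen with a small illustration (a hypothetical snippet, not from the thread): sorted() is a stable sort, so among elements that compare equal, encounter order must survive limit(k), which a quickselect-based shortcut would not guarantee.

```java
import java.util.*;
import java.util.stream.*;

public class StabilityDemo {
    public static void main(String[] args) {
        // Sort by first character only; "a1", "a2", "a3" compare equal,
        // so a conforming implementation must keep their encounter order.
        List<String> r = Stream.of("b1", "a1", "a2", "b2", "a3")
                .sorted(Comparator.comparing((String s) -> s.charAt(0)))
                .limit(2)
                .collect(Collectors.toList());
        System.out.println(r); // prints [a1, a2] - never [a2, a1]
    }
}
```

Any fused sorted().limit(k) implementation has to preserve this behavior for objects, which is why the proposal sticks to stable sorting and merging rather than selection.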
Hi Tagir,

Nice work. I looked at the implementation and have two comments.

- in Limiter.put:

 127     final boolean put(T t) {
 128         int l = limit;
 129         T[] d = data;
 130         if (l == 1) {
 131             // limit == 1 is the special case: exactly one least element is stored,
 132             // no sorting is performed
 133             if (initial) {
 134                 initial = false;
 135                 size = 1;
 136             } else if (comparator.compare(t, d[0]) >= 0)
 137                 return false;
 138             d[0] = t;
 139             return true;
 140         }
 141         if (initial) {
 142             if (size == d.length) {
 143                 Arrays.sort(d, comparator);
 144                 initial = false;
 145                 size = l;
 146                 put(t);
 147             } else {
 148                 d[size++] = t;
 149             }
 150             return true;
 151         }
 152         if (size == d.length) {
 153             sortTail(d, l, size, comparator);
 154             size = limit;
 155         }
 156         if (comparator.compare(t, d[l - 1]) < 0) {
 157             d[size++] = t;
 158             return true;
 159         }
 160         return false;
 161     }

...couldn't the nested call to put in line 146 just be skipped, letting the code fall through to the "if" in line 152 (with the return in line 150 moved between lines 148 and 149)? This would also fix the return value of put(), which is ignored when you make the nested call and replaced with true.

Also, what do you think of the following merging strategy, which doesn't need to allocate a temporary array each time you perform a sortTail()?

"first" phase:
- accumulate elements data[0] ... data[limit-1] and, when reaching limit, sort them and set first = false (this differs from your logic, which accumulates up to data.length elements at first, and is a better strategy because it starts the second phase as soon as possible, and the second phase is more optimal since it already filters the elements it accumulates)

"second" phase:
- accumulate elements < data[limit-1] into data[limit] ... data[data.length-1] and, when reaching length, sort the tail and perform a merge which looks like this:
  - simulate a merge of data[0] ... data[limit-1] with data[limit] ... data[size-1], deriving the end indices i and j of each sub-sequence: data[0] ... data[i-1] and data[limit] ... data[j-1];
  - move the elements data[0] ... data[i-1] to positions data[limit-i] ... data[limit-1];
  - perform an in-place merge of data[limit-i] ... data[limit-1] and data[limit] ... data[j-1] into data[0] ... data[limit-1].

This, I think, halves the additional copying operations on average and eliminates the allocation of a temporary array for merging, at the cost of a pre-merge step which just derives the end indices. There's a chance that this might improve performance because it trades memory writes for reads.

What do you think?

Regards, Peter

On 03/05/2016 06:35 PM, Tagir F. Valeev wrote:
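Peter's pre-merge index derivation plus in-place merge can be sketched for int data as follows (a hypothetical sketch of the described strategy, not code from the thread; the real Limiter works with a generic Comparator):

```java
import java.util.Arrays;

public class InPlaceMerge {
    // data[0..limit) and data[limit..size) are each sorted; keep the 'limit'
    // smallest elements, in order, in data[0..limit) with no temporary buffer.
    static void mergeKeepLeast(int[] data, int limit, int size) {
        // 1. Simulate the merge to derive end indices i and j: the 'limit'
        //    smallest elements are exactly data[0..i) plus data[limit..j).
        int i = 0, j = limit;
        for (int taken = 0; taken < limit; taken++) {
            if (j >= size || (i < limit && data[i] <= data[j])) i++;
            else j++;
        }
        // 2. Move data[0..i) up against the second run.
        System.arraycopy(data, 0, data, limit - i, i);
        // 3. Merge the two runs forward into data[0..limit); writes never
        //    overtake reads because each write consumes one source element.
        int p = limit - i, q = limit;
        for (int k = 0; k < limit; k++) {
            if (q >= j || (p < limit && data[p] <= data[q])) data[k] = data[p++];
            else data[k] = data[q++];
        }
    }

    public static void main(String[] args) {
        int[] data = {1, 3, 5, 7, 2, 4, 6, 8};
        mergeKeepLeast(data, 4, 8);
        System.out.println(Arrays.toString(Arrays.copyOf(data, 4))); // prints [1, 2, 3, 4]
    }
}
```

Only the i surviving elements of the first run are copied (step 2), instead of copying a full limit-sized block out to a scratch array, which is where the claimed halving of copy traffic comes from.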
Hello!

Thank you for your comments!

PL> - in Limiter.put:

Nice catch! A good example of how a series of minor code refactorings can lead to something strange. The webrev is updated in-place: http://cr.openjdk.java.net/~tvaleev/patches/sortedLimit/webrev/

PL> Also, what do you think of the following merging strategy that
PL> doesn't need to allocate a temporary array each time you perform a sortTail():

I think the main goal of such algorithms is to reduce comparator calls. Allocating an additional buffer and some copying operations should not be very expensive (especially given that we don't know the comparator call cost, and it could be pretty high).

Actually, I have a couple of additional optimizations in mind which may speed up some input patterns. But before working on those I would like to get the green light for this feature. I have already spent quite a lot of time working on the proof-of-concept implementation.

Paul, could you please comment on this? If some time is necessary for the evaluation, no problem, I will wait. If additional clarifications are necessary from my side, I would be happy to answer any questions.

With best regards,
Tagir Valeev.
Hi Tagir,

On 03/07/2016 04:30 PM, Tagir F. Valeev wrote:
I think the main goal of such algorithms is to reduce comparator calls. Allocating an additional buffer and some copying operations should not be very expensive (especially given that we don't know the comparator call cost, and it could be pretty high).
You are right about that. The comparator can be specified by the user and may be expensive.
Actually, I have a couple of additional optimizations in mind which may speed up some input patterns. But before working on those I would like to get the green light for this feature. I have already spent quite a lot of time working on the proof-of-concept implementation.
Right. Then maybe, instead of repeatedly allocating a scratch array of size 'limit', you could allocate a 3*limit-sized array (instead of 2*limit) for the whole fused operation and use the last third as scratch space for merging. Even better, use 3 distinct arrays of size 'limit' and use them interchangeably:

"first" phase:
- collect elements into targetArray, sort it, set first = false

"second" phase, while there are more elements:
- set primaryArray = targetArray
- collect elements < primaryArray[limit-1] into secondaryArray, sort it
- merge primaryArray and secondaryArray into targetArray

No copying necessary. I'm sure you have something like that in mind already...

Regards, Peter
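The buffer rotation Peter describes could look roughly like this (a hypothetical int-based sketch of the suggestion, assuming limit >= 1; the real operation would use a generic Comparator):

```java
import java.util.Arrays;

public class TripleBuffer {
    final int k;
    int[] primary, secondary, target; // three k-sized buffers, rotated
    int secSize = 0, firstSize = 0;
    boolean first = true;

    TripleBuffer(int k) {
        this.k = k;
        primary = new int[k];
        secondary = new int[k];
        target = new int[k];
    }

    void accept(int t) {
        if (first) {                       // phase 1: fill target, sort once
            target[firstSize++] = t;
            if (firstSize == k) {
                Arrays.sort(target);
                first = false;
                int[] tmp = primary; primary = target; target = tmp;
            }
            return;
        }
        if (t >= primary[k - 1]) return;   // filter by the worst kept element
        secondary[secSize++] = t;
        if (secSize == k) merge();
    }

    // Merge primary (sorted best-k) with secondary into target, then rotate,
    // so no temporary array is allocated and nothing is copied back.
    void merge() {
        Arrays.sort(secondary, 0, secSize);
        int p = 0, q = 0;
        for (int i = 0; i < k; i++)
            target[i] = (q >= secSize || (p < k && primary[p] <= secondary[q]))
                    ? primary[p++] : secondary[q++];
        secSize = 0;
        int[] tmp = primary; primary = target; target = tmp;
    }

    int[] result() {
        if (first) {                       // stream shorter than k
            Arrays.sort(target, 0, firstSize);
            return Arrays.copyOf(target, firstSize);
        }
        if (secSize > 0) merge();
        return Arrays.copyOf(primary, k);
    }
}
```

Usage: call accept for each stream element and result at the end; for example, feeding 5, 1, 9, 3, 7, 2, 8, 0 with k = 3 yields [0, 1, 2]. Only primary and target actually swap roles; secondary is simply refilled, so three buffers suffice.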
participants (3):
- Louis Wasserman
- Peter Levart
- Tagir F. Valeev