Stream API: Fuse sorted().limit(n) into single operation
Peter Levart
peter.levart at gmail.com
Sun Mar 6 11:05:04 UTC 2016
Hi Tagir,
Nice work. I looked at the implementation and have two comments:
- in Limiter.put:
 127     final boolean put(T t) {
 128         int l = limit;
 129         T[] d = data;
 130         if (l == 1) {
 131             // limit == 1 is the special case: exactly one least element is stored,
 132             // no sorting is performed
 133             if (initial) {
 134                 initial = false;
 135                 size = 1;
 136             } else if (comparator.compare(t, d[0]) >= 0)
 137                 return false;
 138             d[0] = t;
 139             return true;
 140         }
 141         if (initial) {
 142             if (size == d.length) {
 143                 Arrays.sort(d, comparator);
 144                 initial = false;
 145                 size = l;
 146                 put(t);
 147             } else {
 148                 d[size++] = t;
 149             }
 150             return true;
 151         }
 152         if (size == d.length) {
 153             sortTail(d, l, size, comparator);
 154             size = limit;
 155         }
 156         if (comparator.compare(t, d[l - 1]) < 0) {
 157             d[size++] = t;
 158             return true;
 159         }
 160         return false;
 161     }
...couldn't the nested call to put() in line 146 simply be dropped,
letting the code fall through to the "if" in line 152 (with the return
in line 150 moved between lines 148 and 149)? This would also fix the
return value of put(): the result of the nested call is currently
ignored and replaced with true.
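
A minimal sketch of the fall-through variant described above. It is not
the code from the webrev: the class name LimiterSketch, the result()
helper and the body of sortTail() are hypothetical stand-ins (the real
sortTail() is not shown in this message), and the 2*limit array size is
taken from the description in the quoted message below.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class LimiterSketch<T> {
    private final int limit;
    private final Comparator<? super T> comparator;
    private final T[] data;
    private int size;
    private boolean initial = true;

    @SuppressWarnings("unchecked")
    LimiterSketch(int limit, Comparator<? super T> comparator) {
        this.limit = limit;
        this.comparator = comparator;
        // Assumption: 2*limit slots, or a single slot when limit == 1.
        this.data = (T[]) new Object[limit == 1 ? 1 : 2 * limit];
    }

    boolean put(T t) {
        int l = limit;
        T[] d = data;
        if (l == 1) {
            if (initial) {
                initial = false;
                size = 1;
            } else if (comparator.compare(t, d[0]) >= 0) {
                return false;
            }
            d[0] = t;
            return true;
        }
        if (initial) {
            if (size == d.length) {
                Arrays.sort(d, comparator);
                initial = false;
                size = l;
                // No nested put(t): fall through to the checks below,
                // so the real accept/reject result is returned.
            } else {
                d[size++] = t;
                return true;
            }
        }
        if (size == d.length) {
            sortTail(d, l, size, comparator);
            size = l;
        }
        if (comparator.compare(t, d[l - 1]) < 0) {
            d[size++] = t;
            return true;
        }
        return false;
    }

    // Hypothetical stand-in: re-sorts the whole filled prefix so that the
    // l least elements end up, sorted, in d[0] ... d[l-1].
    private static <E> void sortTail(E[] d, int l, int size, Comparator<? super E> c) {
        Arrays.sort(d, 0, size, c);
    }

    // Hypothetical helper: the least elements collected so far, in order.
    List<T> result() {
        Arrays.sort(data, 0, size, comparator);
        return Arrays.asList(Arrays.copyOf(data, Math.min(size, limit)));
    }
}
```

With limit = 2 and input 5, 3, 9, 1, 7 the fifth call triggers the
initial sort and then returns false for 7, whereas the nested-call
version would report true.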
Also, what do you think of the following merging strategy, which doesn't
need to allocate a temporary array each time you perform a sortTail():
"first" phase:
- accumulate elements into data[0] ... data[limit-1] and, on reaching
limit, sort them and set first = false (this differs from your logic,
which first accumulates up to data.length elements; I think it is the
better strategy because it starts the second phase as soon as possible,
and the second phase is more efficient since it already filters the
elements it accumulates)
"second" phase:
- accumulate elements < data[limit-1] into data[limit] ...
data[data.length-1] and when reaching length, sort the tail and perform
merge which looks like this:
- simulate merge of data[0] ... data[limit-1] with data[limit] ...
data[size-1] deriving end indices i and j of each sub-sequence: data[0]
... data[i-1] and data[limit] ... data[j-1];
- move elements data[0] ... data[i-1] to positions data[limit-i] ...
data[limit-1]
- perform in-place merge of data[limit-i] ... data[limit-1] and
data[limit] ... data[j-1] into data[0] ... data[limit-1]
This, I think, halves the additional copying operations on average and
eliminates the allocation of a temporary array for merging, at the cost
of a pre-merge step which just derives the end indices. There's a chance
that this might improve performance because it trades memory writes for
reads.
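
The three merge steps above could look roughly like the following
sketch. The class and method names (InPlaceMergeSketch, mergeLeast) are
hypothetical, and it assumes that data[0] ... data[limit-1] and
data[limit] ... data[size-1] are each already sorted; it is untested
against the actual patch.

```java
import java.util.Comparator;

final class InPlaceMergeSketch {
    // On return, data[0] ... data[limit-1] holds the limit least elements
    // of data[0] ... data[size-1] in sorted order.
    static <T> void mergeLeast(T[] data, int limit, int size, Comparator<? super T> cmp) {
        // Step 1: simulate the merge (reads only) to find how many elements
        // survive from the head (i) and where the used tail part ends (j).
        int i = 0, j = limit;
        for (int k = 0; k < limit; k++) {
            if (j < size && cmp.compare(data[j], data[i]) < 0) {
                j++;
            } else {
                i++;
            }
        }
        // Step 2: move the surviving head elements to the end of the head,
        // overwriting only head elements that are going to be discarded.
        System.arraycopy(data, 0, data, limit - i, i);
        // Step 3: merge data[limit-i .. limit-1] and data[limit .. j-1]
        // forward into data[0 .. limit-1]. The write index never passes
        // the head read index, so no unread element is overwritten.
        int h = limit - i, t = limit, w = 0;
        while (t < j) {
            if (h == limit || cmp.compare(data[t], data[h]) < 0) {
                data[w++] = data[t++];
            } else {
                data[w++] = data[h++];
            }
        }
        // Once the tail part is consumed, w == h, so the remaining head
        // elements are already in their final positions.
    }
}
```

For example, with limit = 4, a head of 1, 4, 6, 9 and a tail of 2, 3, 5,
step 1 yields i = 2 and j = 6, and the array prefix ends up as
1, 2, 3, 4 without any temporary array.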
What do you think?
Regards, Peter
On 03/05/2016 06:35 PM, Tagir F. Valeev wrote:
> Hello!
>
> One of the popular bulk data operations is to find a given number of
> least or greatest elements. Currently the Stream API provides no
> dedicated operation to do this. Of course, it could be implemented by
> a custom collector, and some third-party libraries already provide it.
> However, it would be quite natural to use the existing API:
>
> stream.sorted().limit(k) - k least elements
> stream.sorted(Comparator.reverseOrder()).limit(k) - k greatest elements.
>
> In fact, people are already doing this. Some samples can be found on
> GitHub:
> https://github.com/search?l=java&q=%22sorted%28%29.limit%28%22&type=Code&utf8=%E2%9C%93
>
> Unfortunately, the current implementation of such a sequence of
> operations is suboptimal: first the whole stream content is dumped
> into an intermediate array, then it is sorted fully, and only after
> that are the k least elements selected.
> On the other hand it's possible to provide a special implementation
> for this particular case which takes O(k) additional memory and in
> many cases works significantly faster.
>
> I wrote a proof-of-concept implementation, which can be found here:
> http://cr.openjdk.java.net/~tvaleev/patches/sortedLimit/webrev/
> The implementation switches to new algorithm if limit is less than
> 1000 which is quite common for such scenario (supporting bigger values
> is also possible, but would require more testing). New algorithm
> allocates an array of 2*limit elements. When its size is reached, it
> sorts the array (using Arrays.sort) and discards the second half.
> After that only those elements are accumulated which are less than the
> worst element found so far. When array is filled again, the second
> half is sorted and merged with the first half.
>
> Here's JMH test with results which covers several input patterns:
> http://cr.openjdk.java.net/~tvaleev/patches/sortedLimit/jmh/
>
> You may check summary first:
> http://cr.openjdk.java.net/~tvaleev/patches/sortedLimit/jmh/summary.txt
> Speedup values bigger than 1 are good.
>
> The most significant regression in the sequential mode of the new
> implementation is for ever-decreasing input (especially with a low
> limit value). Still, it's not that bad (given the fact that the old
> implementation processes such input very fast). On the other hand, for
> random input the new implementation could be an order of magnitude
> faster. Even for ever-ascending input a notable speedup (like 40%)
> could be achieved.
>
> For parallel stream the new implementation is almost always faster,
> especially if you ignore the cases when parallel stream is
> unprofitable.
>
> What do you think about this improvement? Could it be included into
> JDK-9? Are there any issues I'm unaware of? I would be really happy to
> complete this work if this is supported by JDK team. Current
> implementation has no primitive specialization and does not optimize
> the sorting out if the input is known to be sorted, but it's not very
> hard to add these features as well if you find my idea useful.
>
> With best regards,
> Tagir Valeev.
>
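
For reference, the two idioms from the quoted message can be written out
as follows. This is only an illustration of the existing API usage under
discussion; the class and method names (SortedLimitIdiom, least,
greatest) are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class SortedLimitIdiom {
    // k least elements, in ascending order
    static List<Integer> least(List<Integer> in, int k) {
        return in.stream().sorted().limit(k).collect(Collectors.toList());
    }

    // k greatest elements, in descending order
    static List<Integer> greatest(List<Integer> in, int k) {
        return in.stream()
                 .sorted(Comparator.reverseOrder())
                 .limit(k)
                 .collect(Collectors.toList());
    }
}
```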