Stream API: Fuse sorted().limit(n) into single operation

Tagir F. Valeev amaembo at
Mon Mar 7 15:30:06 UTC 2016


Thank you for your comments!

PL> - in Limiter.put:

Nice catch! A good example of how a series of minor code refactorings
can lead to something strange. The webrev is updated in-place:

PL> Also, what do you think of the following merging strategy that   
PL> doesn't need to allocate a temporary array each time you perform a sortTail():

I think the main goal of such algorithms is to reduce the number of
comparator calls. Allocating an additional buffer and doing some extra
copying should not be very expensive (especially given that we don't
know the comparator call cost, and it could be quite high). Actually, I
have a couple of additional optimizations in mind which may speed up
some input patterns. But before working on those I would like to get
the green light for this feature; I have already spent quite a lot of
time on the proof-of-concept implementation.
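To illustrate the point about comparator cost: the number of comparator
calls the current sorted().limit(k) pipeline performs is easy to observe
with a counting comparator. This is a toy measurement of my own, not part
of the patch; the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class ComparatorCost {
    // Counts how many comparator calls stream.sorted(cmp).limit(k) performs.
    static long countComparisons(List<Integer> input, int k) {
        AtomicLong calls = new AtomicLong();
        Comparator<Integer> counting = (a, b) -> {
            calls.incrementAndGet();          // count every comparison
            return Integer.compare(a, b);
        };
        input.stream().sorted(counting).limit(k).collect(Collectors.toList());
        return calls.get();
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) data.add(i);
        Collections.shuffle(data, new Random(42)); // deterministic shuffle

        // The current implementation sorts all n elements (O(n log n)
        // comparisons) even though limit(10) keeps only 10 of them.
        System.out.println("comparator calls: " + countComparisons(data, 10));
    }
}
```

On shuffled input the full sort performs on the order of n log n
comparisons, each of which may be arbitrarily expensive, so saving
comparisons matters more than saving array copies.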

Paul, could you please comment on this? If some time is necessary for
the evaluation, no problem, I will wait. If additional clarifications
are necessary from my side, I would be happy to answer any questions.

With best regards,
Tagir Valeev.

PL> "first" phase:

PL> - accumulate elements data[0] ... data[limit-1] and, when reaching
PL> limit, sort them and set first = false (this differs from your
PL> logic, which accumulates up to data.length elements at first, and is
PL> a better strategy, because it starts the second phase as soon as
PL> possible, and the second phase is more optimal since it already
PL> filters the elements it accumulates)

PL> "second" phase:

PL> - accumulate elements < data[limit-1] into data[limit] ...
PL> data[data.length-1] and when reaching length, sort the tail and
PL> perform a merge which looks like this:
PL>   - simulate the merge of data[0] ... data[limit-1] with data[limit]
PL> ... data[size-1], deriving the end indices i and j of each
PL> sub-sequence: data[0] ... data[i-1] and data[limit] ... data[j-1];
PL>   - move elements data[0] ... data[i-1] to positions data[limit-i]
PL> ... data[limit-1]
PL>   - perform an in-place merge of data[limit-i] ... data[limit-1] and
PL> data[limit] ... data[j-1] into data[0] ... data[limit-1]

PL> This, I think, halves the additional copying operations on average
PL> and eliminates the allocation of a temporary array for merging, at
PL> the cost of a pre-merge step which just derives the end indices.
PL> There's a chance that this might improve performance because it
PL> trades memory writes for reads.
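As I read the proposal, the three steps could look roughly like the
sketch below. It uses an int[] with natural ordering to keep things
short (the real code would be generic over a Comparator), and the class
and method names are mine.

```java
public class InPlaceMerge {
    /**
     * Merges sorted data[0..limit) with sorted data[limit..size) so that
     * the `limit` smallest elements end up in data[0..limit), without a
     * temporary merge buffer.
     */
    static void mergeTail(int[] data, int limit, int size) {
        // Step 1: simulate the merge to derive the end indices i and j,
        // i.e. how many elements each sub-sequence contributes.
        int i = 0, j = limit;
        for (int k = 0; k < limit; k++) {
            if (j == size || (i < limit && data[i] <= data[j])) {
                i++;
            } else {
                j++;
            }
        }
        // Step 2: move the contributing head elements data[0..i) to
        // positions data[limit-i .. limit).
        System.arraycopy(data, 0, data, limit - i, i);
        // Step 3: merge data[limit-i..limit) with data[limit..j) into
        // data[0..limit). The write index never overtakes either read
        // index, so the merge is safe in place.
        int a = limit - i, b = limit;
        for (int k = 0; k < limit; k++) {
            if (b == j || (a < limit && data[a] <= data[b])) {
                data[k] = data[a++];
            } else {
                data[k] = data[b++];
            }
        }
    }
}
```

For example, with limit = 3 and data = {1, 4, 7, 2, 3, 9}, the
simulation yields i = 1, j = 5, and the head ends up as {1, 2, 3}.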

PL> What do you think?

PL> Regards, Peter

PL> On 03/05/2016 06:35 PM, Tagir F. Valeev wrote:
PL> Hello!

PL> One of the popular bulk data operation is to find given number of
PL> least or greatest elements. Currently Stream API provides no dedicated
PL> operation to do this. Of course, it could be implemented by custom
PL> collector and some third-party libraries already provide it. However
PL> it would be quite natural to use existing API:

PL> stream.sorted().limit(k) - k least elements
PL> stream.sorted(Comparator.reverseOrder()).limit(k) - k greatest elements.
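A minimal, self-contained illustration of the two patterns (the class
name and sample data are made up for the example):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class TopK {
    // k least elements: sorted().limit(k)
    static List<Integer> least(List<Integer> data, int k) {
        return data.stream().sorted().limit(k).collect(Collectors.toList());
    }

    // k greatest elements: sorted(Comparator.reverseOrder()).limit(k)
    static List<Integer> greatest(List<Integer> data, int k) {
        return data.stream()
                   .sorted(Comparator.reverseOrder())
                   .limit(k)
                   .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(5, 1, 9, 3, 7, 2, 8);
        System.out.println(least(data, 3));    // [1, 2, 3]
        System.out.println(greatest(data, 3)); // [9, 8, 7]
    }
}
```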

PL> In fact, people are already doing this. Some samples can be found on
PL> GitHub:

PL> Unfortunately the current implementation of such a sequence of
PL> operations is suboptimal: first the whole stream content is dumped
PL> into an intermediate array, then it is sorted fully, and after that
PL> the k least elements are selected. On the other hand, it's possible
PL> to provide a special implementation for this particular case which
PL> takes O(k) additional memory and in many cases works significantly
PL> faster.

PL> I wrote a proof-of-concept implementation, which can be found here:
PL> The implementation switches to the new algorithm if the limit is
PL> less than 1000, which is quite common for such a scenario
PL> (supporting bigger values is also possible, but would require more
PL> testing). The new algorithm allocates an array of 2*limit elements.
PL> When its size is reached, it sorts the array (using Arrays.sort) and
PL> discards the second half. After that, only those elements are
PL> accumulated which are less than the worst element found so far. When
PL> the array is filled again, the second half is sorted and merged with
PL> the first half.
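The scheme described above can be sketched as follows. This is a
simplified int-based sketch under my own naming, not the actual webrev:
the real patch is generic, takes a Comparator, and plugs into the
stream pipeline rather than being a standalone class.

```java
import java.util.Arrays;

// Keeps the `limit` smallest ints seen so far in a 2*limit buffer.
public class LimitSorter {
    private final int limit;
    private final int[] data;       // 2 * limit working buffer
    private int size = 0;
    private boolean initial = true; // no full sort performed yet

    LimitSorter(int limit) {
        this.limit = limit;
        this.data = new int[2 * limit];
    }

    void accept(int t) {
        // After the first sort, data[limit-1] is the worst retained
        // element; anything not smaller than it can be discarded.
        if (!initial && t >= data[limit - 1]) {
            return;
        }
        data[size++] = t;
        if (size == data.length) {
            sortTail();
        }
    }

    private void sortTail() {
        if (initial) {
            // First fill: sort everything, keep the best `limit` elements.
            Arrays.sort(data);
            initial = false;
        } else {
            // Sort the newly accumulated second half, then merge it with
            // the sorted first half through a temporary buffer.
            Arrays.sort(data, limit, size);
            int[] buf = new int[limit];
            int i = 0, j = limit;
            for (int k = 0; k < limit; k++) {
                buf[k] = (j == size || data[i] <= data[j]) ? data[i++] : data[j++];
            }
            System.arraycopy(buf, 0, data, 0, limit);
        }
        size = limit; // discard the second half
    }

    int[] result() {
        if (initial) {
            Arrays.sort(data, 0, size);
            return Arrays.copyOf(data, Math.min(size, limit));
        }
        if (size > limit) {
            sortTail(); // merge the partially filled tail
        }
        return Arrays.copyOf(data, limit);
    }
}
```

The filter in accept() is what makes the second phase cheap: once the
buffer has been sorted once, most elements of a random input fail the
`t >= data[limit - 1]` test and never touch the buffer at all.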

PL> Here's a JMH test, with results, which covers several input patterns:

PL> You may check the summary first:
PL> Speedup values bigger than 1 are good.

PL> The most significant regression for the sequential mode of the new
PL> implementation is ever-decreasing input (especially with a low limit
PL> value). Still, it's not that bad (given that the old implementation
PL> processes such input very fast). On the other hand, for random input
PL> the new implementation can be an order of magnitude faster. Even for
PL> ever-ascending input a notable speedup (around 40%) can be achieved.

PL> For parallel streams the new implementation is almost always faster,
PL> especially if you ignore the cases where a parallel stream is
PL> unprofitable anyway.

PL> What do you think about this improvement? Could it be included in
PL> JDK 9? Are there any issues I'm unaware of? I would be really happy
PL> to complete this work if it is supported by the JDK team. The
PL> current implementation has no primitive specializations and does not
PL> skip the sorting when the input is already known to be sorted, but
PL> it's not very hard to add these features as well if you find the
PL> idea useful.

PL> With best regards,
PL> Tagir Valeev.

