Stream API: Fuse sorted().limit(n) into single operation

Peter Levart peter.levart at
Mon Mar 7 16:52:04 UTC 2016

Hi Tagir,

On 03/07/2016 04:30 PM, Tagir F. Valeev wrote:
> Hello!
> Thank you for your comments!
> PL> - in Limiter.put:
> Nice catch! A good example of how a series of minor code refactorings
> can lead to something strange. Webrev is updated in-place:
> PL> Also, what do you think of the following merging strategy that
> PL> doesn't need to allocate a temporary array each time you perform a sortTail():
> I think the main goal of such algorithms is to reduce comparator calls.
> Allocating an additional buffer and some copying operations should not
> be very expensive (especially given that we don't know the cost of a
> comparator call, and it could be pretty high).

You are right about that. Comparator can be specified by user and may be 

> Actually I have a
> couple of additional optimizations in mind which may speed up some
> input patterns. But before working on that I would like to get the
> green light for this feature. I have already spent quite a lot of time
> working on the proof-of-concept implementation.

Right. Then maybe instead of repeatedly allocating a scratch array of
size 'limit', you could allocate a 3*limit sized array (instead of
2*limit) for the whole fused operation and use the last third as
scratch space for merging. Even better, use 3 distinct arrays of size
'limit' and use them interchangeably:

"first" phase:

- collect elements into targetArray, sort it, set first = false

"second" phase:

while there's more:
     - set primaryArray = targetArray, and let targetArray be the array that is currently unused
     - collect elements < primaryArray[limit-1] into secondaryArray, sort it
     - merge primaryArray and secondaryArray into targetArray
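
To make the buffer rotation concrete, here is a rough Java sketch of the two phases above. It is only an illustration under my own assumptions, not the webrev code: the class name ThreeBufferLimiter and the method leastK are hypothetical, the input is taken from a plain Iterator, and the result is returned as a List.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; names are not taken from the webrev.
final class ThreeBufferLimiter {

    // Returns the 'limit' least elements (or fewer, if the input is shorter), sorted.
    @SuppressWarnings("unchecked")
    static <T> List<T> leastK(Iterator<? extends T> it, int limit, Comparator<? super T> cmp) {
        if (limit <= 0) throw new IllegalArgumentException("limit must be positive");
        T[] target = (T[]) new Object[limit];
        T[] primary = (T[]) new Object[limit];
        T[] secondary = (T[]) new Object[limit];

        // "first" phase: collect up to 'limit' elements and sort them.
        int size = 0;
        while (size < limit && it.hasNext()) target[size++] = it.next();
        Arrays.sort(target, 0, size, cmp);

        // "second" phase: only entered when 'target' holds exactly 'limit' elements.
        while (it.hasNext()) {
            // Rotate roles: the previous result becomes primary, the unused buffer becomes target.
            T[] unused = primary;
            primary = target;
            target = unused;

            // Collect only elements that beat the current worst element.
            T worst = primary[limit - 1];
            int n = 0;
            while (n < limit && it.hasNext()) {
                T e = it.next();
                if (cmp.compare(e, worst) < 0) secondary[n++] = e;
            }
            Arrays.sort(secondary, 0, n, cmp);

            // Merge the 'limit' least elements of primary and secondary into target.
            // Since i + j == k < limit, index i never runs past the end of primary.
            int i = 0, j = 0;
            for (int k = 0; k < limit; k++) {
                if (j < n && cmp.compare(secondary[j], primary[i]) < 0) {
                    target[k] = secondary[j++];
                } else {
                    target[k] = primary[i++]; // ties prefer the earlier (primary) element
                }
            }
        }
        return new ArrayList<>(Arrays.asList(target).subList(0, size));
    }
}
```

The merge writes straight into the spare buffer, so nothing is copied back and no scratch array is allocated per round. The price is one more 'limit'-sized array for the lifetime of the operation.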

No copying necessary. I'm sure you have something like that in your mind 

Regards, Peter

> Paul, could you please comment on this? If some time is necessary for
> the evaluation, no problem, I will wait. If additional clarifications
> are necessary from my side, I would be happy to answer any questions.
> With best regards,
> Tagir Valeev.
> PL> "first" phase:
> PL> - accumulate elements data[0] ... data[limit-1] and when reaching
> PL> limit, sort them and set first = false (this differs from your
> PL> logic, which accumulates up to data.length elements at first,
> PL> and is a better strategy, because it starts the second phase
> PL> as soon as possible and the second phase is more optimal since it
> PL> already filters the elements it accumulates)
> PL> "second" phase:
> PL> - accumulate elements < data[limit-1] into data[limit] ...
> PL> data[data.length-1] and when reaching length, sort the tail and
> PL> perform merge which looks like this:
> PL>   - simulate merge of data[0] ... data[limit-1] with data[limit]
> PL> ... data[size-1] deriving end indices i and j of each
> PL> sub-sequence: data[0] ... data[i-1] and data[limit] ... data[j-1];
> PL>   - move elements data[0] ... data[i-1] to positions
> PL> data[limit-i] ... data[limit-1]
> PL>   - perform in-place merge of data[limit-i] ... data[limit-1] and
> PL> data[limit] ... data[j-1] into data[0] ... data[limit-1]
> PL> This, I think, results in dividing the additional copying
> PL> operations by 2 on average and eliminates allocation of a
> PL> temporary array for merging at the cost of a pre-merge step
> PL> which just derives the end indices. There's a chance that this
> PL> might improve performance because it trades memory writes for reads.
> PL> What do you think?
> PL> Regards, Peter
> PL> On 03/05/2016 06:35 PM, Tagir F. Valeev wrote:
> PL>
> PL> Hello!
> PL> One of the popular bulk data operations is finding a given number of
> PL> least or greatest elements. Currently the Stream API provides no
> PL> dedicated operation to do this. Of course, it could be implemented by
> PL> a custom collector, and some third-party libraries already provide it.
> PL> However, it would be quite natural to use the existing API:
> PL> stream.sorted().limit(k) - k least elements
> PL> stream.sorted(Comparator.reverseOrder()).limit(k) - k greatest elements.
> PL> In fact people are already doing this. Some samples can be found on
> PL> GitHub:
> PL>
> PL> Unfortunately the current implementation of such a sequence of
> PL> operations is suboptimal: first the whole stream content is dumped
> PL> into an intermediate array, then sorted fully, and after that the k
> PL> least elements are selected. On the other hand, it's possible to
> PL> provide a special implementation for this particular case which takes
> PL> O(k) additional memory and in many cases works significantly faster.
> PL> I wrote a proof-of-concept implementation, which can be found here:
> PL>
> PL> The implementation switches to the new algorithm if the limit is less
> PL> than 1000, which is quite common for such a scenario (supporting bigger
> PL> values is also possible, but would require more testing). The new
> PL> algorithm allocates an array of 2*limit elements. When the array is
> PL> full, it sorts the array (using Arrays.sort) and discards the second
> PL> half. After that, only those elements are accumulated which are less
> PL> than the worst element found so far. When the array is filled again,
> PL> the second half is sorted and merged with the first half.
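
For readers without access to the webrev, the strategy quoted above might look roughly like this in Java. This is a sketch under stated assumptions, not the proof-of-concept itself: the class name TwoHalvesLimiter and the methods put and finish are hypothetical, and the merge uses a freshly allocated temporary array of size 'limit', which is the per-merge allocation discussed earlier in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; names and structure are not taken from the webrev.
final class TwoHalvesLimiter<T> {
    private final int limit;
    private final Comparator<? super T> cmp;
    private final T[] data;       // 2*limit slots: kept half + accumulation half
    private int size;
    private boolean first = true; // true until the first full sort has happened

    @SuppressWarnings("unchecked")
    TwoHalvesLimiter(int limit, Comparator<? super T> cmp) {
        if (limit <= 0) throw new IllegalArgumentException("limit must be positive");
        this.limit = limit;
        this.cmp = cmp;
        this.data = (T[]) new Object[2 * limit];
    }

    void put(T e) {
        // After the first sort, skip anything not better than the worst kept element.
        if (!first && cmp.compare(e, data[limit - 1]) >= 0) return;
        data[size++] = e;
        if (size == data.length) {
            if (first) {
                Arrays.sort(data, 0, size, cmp);     // sort everything once
                first = false;
            } else {
                Arrays.sort(data, limit, size, cmp); // sort only the second half
                mergeHalves();
            }
            size = limit;                            // discard the second half
        }
    }

    // Returns the least min(limit, count) elements seen so far, sorted.
    List<T> finish() {
        if (first) {
            Arrays.sort(data, 0, size, cmp);
        } else if (size > limit) {
            Arrays.sort(data, limit, size, cmp);
            mergeHalves();
        }
        int n = Math.min(size, limit);
        return new ArrayList<>(Arrays.asList(data).subList(0, n));
    }

    // Merges sorted data[0..limit) and data[limit..size), keeping the 'limit' least elements.
    @SuppressWarnings("unchecked")
    private void mergeHalves() {
        T[] tmp = (T[]) new Object[limit];
        int i = 0, j = limit;
        for (int k = 0; k < limit; k++) {
            if (j < size && cmp.compare(data[j], data[i]) < 0) tmp[k] = data[j++];
            else tmp[k] = data[i++];
        }
        System.arraycopy(tmp, 0, data, 0, limit);
    }
}
```

Once the first sort has happened, most elements of a random input are rejected by a single comparison in put, which is where the reduction in comparator calls would come from.
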
> PL> Here's a JMH test with results which covers several input patterns:
> PL>
> PL> You may check summary first:
> PL>
> PL> Speedup values bigger than 1 are good.
> PL> The most significant regression in the sequential mode of the new
> PL> implementation is for ever-decreasing input (especially with a low
> PL> limit value). Still, it's not that bad (given that the old
> PL> implementation processes such input very fast). On the other hand, for
> PL> random input the new implementation can be an order of magnitude
> PL> faster. Even for ever-ascending input a notable speedup (like 40%)
> PL> can be achieved.
> PL> For parallel streams the new implementation is almost always faster,
> PL> especially if you ignore the cases when a parallel stream is
> PL> unprofitable.
> PL> What do you think about this improvement? Could it be included in
> PL> JDK-9? Are there any issues I'm unaware of? I would be really happy to
> PL> complete this work if it is supported by the JDK team. The current
> PL> implementation has no primitive specialization and does not optimize
> PL> the sorting out if the input is known to be sorted, but it's not very
> PL> hard to add these features as well if you find my idea useful.
> PL> With best regards,
> PL> Tagir Valeev.

More information about the core-libs-dev mailing list