Idiomatic Windowing on Streams

Paul Sandoz paul.sandoz at oracle.com
Sun Oct 27 07:38:32 PDT 2013


On Oct 26, 2013, at 7:45 AM, Stuart Marks <stuart.marks at oracle.com> wrote:

> On 10/25/13 9:34 PM, Stuart Marks wrote:
>>          int[] sums = Arrays.copyOf(numbers, numbers.length);
>>          Arrays.parallelPrefix(sums, Integer::sum);
>>          int[] movsums = Arrays.copyOfRange(sums, N, sums.length);
>>          Arrays.parallelSetAll(movsums, i -> movsums[i] - sums[i]);
>>          double[] means = new double[movsums.length];
>>          Arrays.parallelSetAll(means, i -> (double)movsums[i] / N);
>> 
>> Note that my code snippet omits the first moving average, whereas your example
>> omits the last one.
>> 
>> Finally, you have to watch out for overflow, since the last element of the sums
>> array will be the sum of all elements in the input array. It would be nice if
>> there were a way to use a prefix operation to do the running sum and subtract
>> off the element leaving the window, but I wasn't able to figure out how to do that.
> 
> Of course, right after I posted this I figured out how to do that. :-)
> 
> The trick is, from each element i, to first subtract off the (i-N)th element, 
> and then to do the prefix sum. This gives the moving sums in-place. As a 
> bonus, it saves a copy operation, and as another bonus, it computes all 
> numbers.length - N + 1 moving averages.
> 
>     int[] temp = Arrays.copyOf(numbers, numbers.length);
>     Arrays.parallelSetAll(temp, i -> temp[i] - ((i < N) ? 0 : numbers[i - N]));
>     Arrays.parallelPrefix(temp, Integer::sum);
>     double[] means = new double[numbers.length];
>     Arrays.parallelSetAll(means, i -> (double)temp[i] / N);
> 
> Elements [0..N-2] of the means array are garbage, but the remaining elements are 
> the moving averages.
> 
> I have a feeling I just reinvented something FORTRAN programmers knew about in 
> the 1960s.
> 

:-) neat.
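For reference, here is a self-contained sketch of the subtract-then-prefix trick quoted above, returning only the valid moving averages. The class name `InPlaceMovingAverage`, the method `movingAverages`, the window-size check and the sample data are illustrative assumptions, not code from the thread. Because `parallelSetAll` overwrites every element, the sketch allocates a fresh array instead of copying `numbers`.

```java
import java.util.Arrays;

final class InPlaceMovingAverage {
    // Hypothetical helper; assumes 1 <= n <= numbers.length.
    static double[] movingAverages(int[] numbers, int n) {
        if (n < 1 || n > numbers.length) {
            throw new IllegalArgumentException("window size must be in [1, numbers.length]");
        }
        // temp[i] = numbers[i] - numbers[i - n], with nothing subtracted for i < n
        int[] temp = new int[numbers.length];
        Arrays.parallelSetAll(temp, i -> numbers[i] - ((i < n) ? 0 : numbers[i - n]));
        // After the prefix sum, temp[i] for i >= n - 1 is the sum of numbers[i - n + 1 .. i];
        // temp[0 .. n - 2] are partial sums (the "garbage" entries) and are skipped below.
        Arrays.parallelPrefix(temp, Integer::sum);
        double[] means = new double[numbers.length - n + 1];
        Arrays.parallelSetAll(means, i -> (double) temp[i + n - 1] / n);
        return means;
    }

    public static void main(String[] args) {
        int[] numbers = {1, 2, 3, 4, 5, 6};
        System.out.println(Arrays.toString(movingAverages(numbers, 3)));  // [2.0, 3.0, 4.0, 5.0]
    }
}
```

With six inputs and a window of 3, all 6 - 3 + 1 = 4 averages are produced.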

I believe you could stick to your original use of parallelPrefix, then switch into streams to avoid the array copy and do one parallel pass to get the double array. Off the top of my head:

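  import java.util.Arrays;
  import java.util.stream.IntStream;

  int[] sums = Arrays.copyOf(numbers, numbers.length);
  Arrays.parallelPrefix(sums, Integer::sum);  // sums[i] = numbers[0] + ... + numbers[i]
  double[] means = IntStream.range(N, sums.length)
    .parallel()                               // range() is sequential by default
    .map(i -> sums[i] - sums[i - N])          // sum of the N-element window ending at i
    .mapToDouble(s -> (double)s / N)
    .toArray();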
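A self-contained version of that stream pipeline might look like the following sketch. The class name `StreamMovingAverage`, the method `movingAverages`, the window-size check and the sample data are hypothetical. It calls `.parallel()` so the final pass actually runs in parallel, since `IntStream.range` produces a sequential stream by default; `toArray()` still returns the results in encounter order.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

final class StreamMovingAverage {
    // Hypothetical helper; assumes 1 <= n <= numbers.length.
    // Like Stuart's first snippet, it omits the first window (the one ending at index n - 1).
    static double[] movingAverages(int[] numbers, int n) {
        if (n < 1 || n > numbers.length) {
            throw new IllegalArgumentException("window size must be in [1, numbers.length]");
        }
        int[] sums = Arrays.copyOf(numbers, numbers.length);
        Arrays.parallelPrefix(sums, Integer::sum);       // sums[i] = numbers[0] + ... + numbers[i]
        return IntStream.range(n, sums.length)
                .parallel()                              // range() is sequential by default
                .map(i -> sums[i] - sums[i - n])         // sum of numbers[i - n + 1 .. i]
                .mapToDouble(s -> (double) s / n)
                .toArray();                              // results keep encounter order
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(movingAverages(new int[] {1, 2, 3, 4, 5, 6}, 3)));  // [3.0, 4.0, 5.0]
    }
}
```

The window ending at index n - 1 (average 2.0 for this input) is not produced; if it is needed, its value is simply `(double) sums[n - 1] / n`.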

We removed the cumulate operation (aka prefix sum) for reasons of "What the heck is that?" and YAGNI, but I liked it. It's the kind of thing that could easily be added as a pluggable operation if/when we enhance streams to support an SPI.

Paul.

