Idiomatic Windowing on Streams

Stuart Marks stuart.marks at oracle.com
Fri Oct 25 21:34:07 PDT 2013



On 10/25/13 3:49 PM, Richard Warburton wrote:
> Sometimes it's useful to be able to operate on a time-window over a stream
> of data with a defined encounter order. So for an element at index i, I
> want to perform an operation on a stream of that element and the next n
> elements. The most trivial example I can think of is a simple moving
> average of data points.
>
> I'm currently struggling to find a way to idiomatically express this in the
> Streams framework.  My current best effort is to take a stream of indices
> and map those indices to a substream of my original stream of data and I
> can then do some reduction over those elements.  So here's an SMA for
> example:

Yeah, this is hard with the current streams framework. The problem is that
streams aren't indexed, so an operation on one element can't reference that
element's neighbors, nor can it find the "current" position in order to pull
particular values out of the source data.

> double[] means =
>      IntStream.range(0, numbers.size() - N)
>               .mapToDouble(i ->
>                  numbers.stream()
>                         .substream(i, i + N)
>                         .mapToInt(x -> x)
>                         .average()
>                         .getAsDouble())
>               .toArray();

Driving the stream using indexes helps in some cases, as you've done here, so 
the stream contains positions instead of the values from the source. Of course, 
this depends on having random access to the source.
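As a rough sketch of the index-driven approach against the API as it shipped in Java 8 (where substream is not available), the version below assumes the data is an int[] so that Arrays.stream(array, from, to) can provide the random access. The class and method names are made up for illustration, and unlike the quoted example it also includes the last full window.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Hypothetical names; a sketch, not part of the original thread.
public class IndexedMovingAverage {

    // Mean of every full window of size n; window i covers numbers[i .. i+n-1].
    // Assumes n >= 1; if n > numbers.length the result is empty.
    static double[] movingAverage(int[] numbers, int n) {
        return IntStream.rangeClosed(0, numbers.length - n)
                // Each index re-sums its whole window, so the cost is O(length * n).
                .mapToDouble(i -> Arrays.stream(numbers, i, i + n)
                                        .average()
                                        .getAsDouble())
                .toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(movingAverage(new int[] {1, 2, 3, 4, 5}, 2)));
    }
}
```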

Unfortunately doing a moving average this way ends up performing a lot of 
redundant operations. Each time the window moves it does a complete sum of all 
the values in the window, instead of adding in the value entering the window and 
subtracting off the value leaving the window.
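For comparison, the incremental update described above can be sketched as a plain sequential loop. This is only an illustration with hypothetical names; it gives up the streams style (and any parallelism) in exchange for doing one addition and one subtraction per step.

```java
import java.util.Arrays;

// Hypothetical names; a sequential sketch of the "add entering, subtract leaving" idea.
public class SlidingWindowAverage {

    static double[] movingAverage(int[] numbers, int n) {
        if (n <= 0 || n > numbers.length) {
            return new double[0];
        }
        double[] means = new double[numbers.length - n + 1];

        // Sum the first window once; a long accumulator avoids int overflow.
        long windowSum = 0;
        for (int i = 0; i < n; i++) {
            windowSum += numbers[i];
        }
        means[0] = (double) windowSum / n;

        // Slide the window: add the value entering, subtract the value leaving.
        for (int i = n; i < numbers.length; i++) {
            windowSum += numbers[i];
            windowSum -= numbers[i - n];
            means[i - n + 1] = (double) windowSum / n;
        }
        return means;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(movingAverage(new int[] {1, 2, 3, 4, 5}, 2)));
    }
}
```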

I think what you really want is something like vector operations. Java 8 adds
some new array-based methods to java.util.Arrays that do parallel vector
operations. A key operation is prefix sum, which for each element computes the
running sum (or other associative combination) of all elements up to and
including that one. A moving sum is the prefix sum vector shifted by the window
size, minus the original prefix sum vector. Finally you divide this vector by
the window size to get the moving average. Thus:

         // assume that input is: int[] numbers

         int[] sums = Arrays.copyOf(numbers, numbers.length);
         Arrays.parallelPrefix(sums, Integer::sum);
         int[] movsums = Arrays.copyOfRange(sums, N, sums.length);
         Arrays.parallelSetAll(movsums, i -> movsums[i] - sums[i]);
         double[] means = new double[movsums.length];
         Arrays.parallelSetAll(means, i -> (double)movsums[i] / N);

Unfortunately these operations work in place, so we have to do some copying.
It's not as elegant as the streams API.

Note that my code snippet omits the first moving average (the window starting
at index 0), whereas your example omits the last one.
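One possible way to keep every window, sketched here with hypothetical names, is to give the prefix-sum array a leading zero so that sums[k] holds the sum of the first k inputs. Each window sum is then sums[i + n] - sums[i], and no window is dropped. It uses the same Arrays methods as the snippet above and, like it, accumulates in int.

```java
import java.util.Arrays;

// Hypothetical names; a variant of the prefix-sum approach that keeps all windows.
public class PrefixSumMovingAverage {

    // Assumes 1 <= n <= numbers.length.
    static double[] movingAverage(int[] numbers, int n) {
        // sums[0] stays 0; after the prefix pass sums[k] = numbers[0] + ... + numbers[k-1].
        int[] sums = new int[numbers.length + 1];
        System.arraycopy(numbers, 0, sums, 1, numbers.length);
        Arrays.parallelPrefix(sums, Integer::sum);

        // Window i covers numbers[i .. i+n-1], whose sum is sums[i+n] - sums[i].
        double[] means = new double[numbers.length - n + 1];
        Arrays.parallelSetAll(means, i -> (double) (sums[i + n] - sums[i]) / n);
        return means;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(movingAverage(new int[] {1, 2, 3, 4, 5}, 2)));
    }
}
```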

Finally, you have to watch out for overflow, since the last element of the sums 
array will be the sum of all elements in the input array. It would be nice if 
there were a way to use a prefix operation to do the running sum and subtract 
off the element leaving the window, but I wasn't able to figure out how to do that.
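A simple way to sidestep the overflow concern, sketched below with hypothetical names, is to widen the running sums to long, since the sum of any int[] fits in a long. (Java int arithmetic wraps around, so the int differences would still come out right whenever each individual window sum fits in an int, but the long version does not depend on that.) The layout otherwise follows the snippet above, including the omitted first window.

```java
import java.util.Arrays;

// Hypothetical names; the same steps as the snippet above, accumulated in long.
public class LongPrefixMovingAverage {

    static double[] movingAverage(int[] numbers, int n) {
        if (n <= 0 || n > numbers.length) {
            return new double[0];
        }
        // Widen each element to long, then compute running sums in place.
        long[] sums = Arrays.stream(numbers).asLongStream().toArray();
        Arrays.parallelPrefix(sums, Long::sum);

        // movsums[i] = sums[i+n] - sums[i] = numbers[i+1] + ... + numbers[i+n].
        long[] movsums = Arrays.copyOfRange(sums, n, sums.length);
        Arrays.parallelSetAll(movsums, i -> movsums[i] - sums[i]);

        double[] means = new double[movsums.length];
        Arrays.parallelSetAll(means, i -> (double) movsums[i] / n);
        return means;
    }

    public static void main(String[] args) {
        int big = Integer.MAX_VALUE;
        System.out.println(Arrays.toString(movingAverage(new int[] {big, big, big}, 2)));
    }
}
```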

s'marks


> Is there a better approach than this?
>
> regards,
>
>    Dr. Richard Warburton
>
>    http://insightfullogic.com
>    @RichardWarburto <http://twitter.com/richardwarburto>
>


More information about the lambda-dev mailing list