Idiomatic Windowing on Streams
Stuart Marks
stuart.marks at oracle.com
Fri Oct 25 21:34:07 PDT 2013
On 10/25/13 3:49 PM, Richard Warburton wrote:
> Sometimes it's useful to be able to operate on a time-window over a stream
> of data with a defined encounter order. So for an element at index i, I
> want to perform an operation on a stream of that element and the next n
> elements. The most trivial example I can think of is a simple moving
> average of data points.
>
> I'm currently struggling to find a way to idiomatically express this in the
> Streams framework. My current best effort is to take a stream of indices
> and map those indices to a substream of my original stream of data and I
> can then do some reduction over those elements. So here's an SMA for
> example:
Yeah, this is hard with the current streams framework. The problem is that
streams aren't indexed, so a stream operation can't reference an element's
neighbors, nor can it find the "current" position in order to pull particular
values out of the source data.
> double[] means =
>     IntStream.range(0, numbers.size() - N)
>         .mapToDouble(i ->
>             numbers.stream()
>                 .substream(i, i + N)
>                 .mapToInt(x -> x)
>                 .average()
>                 .getAsDouble())
>         .toArray();
Driving the stream using indexes helps in some cases, as you've done here, so
the stream contains positions instead of the values from the source. Of course,
this depends on having random access to the source.
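For readers on the released Java 8 API, where substream() is not available,
the same index-driven idea can be sketched with skip() and limit(). This is
only an illustration: the class and method names are made up here, the window
size n is assumed to be at least 1, and unlike the quoted snippet it uses
rangeClosed() so that the last window is included.

```java
import java.util.List;
import java.util.stream.IntStream;

final class IndexDrivenAverage {
    // Hypothetical helper: one mean per window start index.
    // Assumes n >= 1; returns an empty array when n exceeds the list size.
    static double[] movingAverage(List<Integer> numbers, int n) {
        return IntStream.rangeClosed(0, numbers.size() - n)
            .mapToDouble(i ->
                numbers.stream()
                    .skip(i)          // drop the elements before the window
                    .limit(n)         // keep exactly the n window elements
                    .mapToInt(Integer::intValue)
                    .average()
                    .getAsDouble())
            .toArray();
    }
}
```

Each window still re-reads the source from the start, so this keeps the
redundant work of the original approach; it only shows the shape of the
index-driven pipeline.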
Unfortunately doing a moving average this way ends up performing a lot of
redundant operations. Each time the window moves it does a complete sum of all
the values in the window, instead of adding in the value entering the window and
subtracting off the value leaving the window.
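For comparison, the incremental update described above can be sketched as a
plain sequential loop. The class and method names are illustrative only, and
the running sum is kept in a long as an assumption to reduce the risk of
overflow.

```java
final class SlidingWindowAverage {
    // Hypothetical helper: keeps a running window sum instead of re-summing.
    static double[] movingAverage(int[] numbers, int n) {
        if (n <= 0 || n > numbers.length) {
            return new double[0];
        }
        double[] means = new double[numbers.length - n + 1];
        long windowSum = 0;
        // Sum the first window once.
        for (int i = 0; i < n; i++) {
            windowSum += numbers[i];
        }
        means[0] = (double) windowSum / n;
        // Slide the window: add the entering value, subtract the leaving one.
        for (int i = n; i < numbers.length; i++) {
            windowSum += numbers[i];
            windowSum -= numbers[i - n];
            means[i - n + 1] = (double) windowSum / n;
        }
        return means;
    }
}
```

This does a constant amount of work per element, but each step depends on the
previous one, so it does not map directly onto a stream pipeline.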
I think what you really want is something like vector operations. Java 8 adds
some array-based parallel operations to java.util.Arrays. A key operation is
prefix sum, which for each element computes the running sum (or the running
result of any other associative operation) of all elements up to and including
this one. A moving sum is the prefix sum vector shifted by the window size,
minus the original prefix sum vector. Finally you divide this vector by the
window size to get the moving average. Thus:
// assume that input is: int[] numbers, and that N is the window size
// sums[i] = numbers[0] + ... + numbers[i]
int[] sums = Arrays.copyOf(numbers, numbers.length);
Arrays.parallelPrefix(sums, Integer::sum);
// movsums[i] = sums[i + N] - sums[i] = numbers[i+1] + ... + numbers[i+N]
int[] movsums = Arrays.copyOfRange(sums, N, sums.length);
Arrays.parallelSetAll(movsums, i -> movsums[i] - sums[i]);
double[] means = new double[movsums.length];
Arrays.parallelSetAll(means, i -> (double)movsums[i] / N);
Unfortunately these operations are done in-place so we have to do some copying.
Not as elegant as the streams API.
Note that my code snippet omits the first moving average, whereas your example
omits the last one.
Finally, you have to watch out for overflow, since the last element of the sums
array will be the sum of all elements in the input array. It would be nice if
there were a way to use a prefix operation to do the running sum and subtract
off the element leaving the window, but I wasn't able to figure out how to do that.
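One way to reduce the overflow risk, sketched here under assumptions rather
than as a definitive answer, is to widen the prefix sums to long. This does
not solve the problem of subtracting the leaving element inside the prefix
operation; it only makes the full running sum much harder to overflow. The
class and method names are illustrative, and a leading zero is added to the
prefix array so that the first window is included as well.

```java
import java.util.Arrays;

final class PrefixSumAverage {
    // Hypothetical helper: moving average via long prefix sums.
    static double[] movingAverage(int[] numbers, int n) {
        if (n <= 0 || n > numbers.length) {
            return new double[0];
        }
        // prefix[k] will hold the sum of the first k inputs; prefix[0] stays 0.
        long[] prefix = new long[numbers.length + 1];
        Arrays.parallelSetAll(prefix, k -> k == 0 ? 0L : numbers[k - 1]);
        Arrays.parallelPrefix(prefix, Long::sum);
        // Window starting at i covers numbers[i] .. numbers[i + n - 1].
        double[] means = new double[numbers.length - n + 1];
        Arrays.parallelSetAll(means, i -> (double) (prefix[i + n] - prefix[i]) / n);
        return means;
    }
}
```

With long sums the total would have to exceed Long.MAX_VALUE before wrapping,
which is not reachable from int inputs in a single Java array.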
s'marks
> Is there a better approach than this?
>
> regards,
>
> Dr. Richard Warburton
>
> http://insightfullogic.com
> @RichardWarburto <http://twitter.com/richardwarburto>
>
More information about the lambda-dev
mailing list