Cancelable streams
Brian Goetz
brian.goetz at oracle.com
Fri Nov 2 13:01:40 PDT 2012
We've got a reasonable start at support in the library for infinite
streams. Of course, there is the risk that you'll do something that
never terminates, like:
    infiniteStream.forEach(...)
If you have a short-circuit operation (either an intermediate operation
like limit(n) or a terminal operation like findFirst), everything works
fine, and it should work fine even in parallel once we have parallel
implementations for limit/findFirst and do a little more implementation
work on building Spliterators on infinite iterators.
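For illustration only, here is how the two short-circuit cases look in the java.util.stream API as it eventually shipped in Java 8 (an assumption relative to the draft library discussed here). Both pipelines start from an infinite source and still terminate, because limit(n) and findFirst stop pulling once they have what they need. The class and method names are made up for the example.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ShortCircuitDemo {
    // An infinite source: 0, 1, 2, 3, ...
    static Stream<Integer> naturals() {
        return Stream.iterate(0, i -> i + 1);
    }

    // Intermediate short-circuit op: limit(n) truncates the infinite stream.
    static List<Integer> firstSquares(int n) {
        return naturals().map(i -> i * i).limit(n).collect(Collectors.toList());
    }

    // Terminal short-circuit op: findFirst() stops at the first match.
    static Optional<Integer> firstMultipleOf7Above(int bound) {
        return naturals().filter(i -> i > bound && i % 7 == 0).findFirst();
    }

    public static void main(String[] args) {
        System.out.println(firstSquares(5));           // [0, 1, 4, 9, 16]
        System.out.println(firstMultipleOf7Above(50)); // Optional[56]
        // By contrast, naturals().forEach(...) would never return.
    }
}
```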
The other use case that we want to support is where you have an infinite
data feed and you want to do parallel operations on it "until you're
tired." This could mean "chew on it for five minutes and give me what
you've got", or "chew until a shutdown is requested", etc. Several folks
(Sam?) have explicitly requested this so that they can do parallel
processing on infinite event streams.
So, something like:
    infiniteIOStream.while(e -> !cancelationRequested)
                    .filter(...)
                    .map(...)
                    .forEach(...)
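A minimal sequential sketch of the "while" pipeline above. Since while is a reserved word in Java, no real method can carry that name; this assumes Java 9's Stream.takeWhile as a stand-in, with a predicate that ignores the element and only reads an external flag. The flag is flipped from inside forEach purely to keep the example self-contained; in practice it would be set by another thread or a shutdown hook. WhileOpSketch and runUntilCanceled are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class WhileOpSketch {
    // Processes an infinite feed until cancelation is requested.
    // takeWhile's predicate ignores the element and only consults the
    // external flag, mirroring while(e -> !cancelationRequested).
    static List<Integer> runUntilCanceled(int stopAfter) {
        AtomicBoolean cancelationRequested = new AtomicBoolean(false);
        List<Integer> results = new ArrayList<>();     // safe: pipeline is sequential
        Stream.iterate(0, i -> i + 1)                   // infinite source
              .takeWhile(e -> !cancelationRequested.get())
              .filter(e -> e % 2 == 0)
              .map(e -> e * 10)
              .forEach(e -> {
                  results.add(e);
                  if (results.size() == stopAfter) {
                      cancelationRequested.set(true);   // stand-in for a shutdown request
                  }
              });
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runUntilCanceled(3)); // [0, 20, 40]
    }
}
```

In a parallel pipeline the same flag would be observed by different worker threads at different moments, and the takeWhile Javadoc warns that the operation can be quite expensive on ordered parallel pipelines, since it must return a prefix in encounter order.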
The obvious idea is to have a "while" op. That works great in the
serial case, but I'm not sure it works all that well in the parallel
case. The alternative is to have an abstraction for a "cancelable
stream":
    CancelableStream<T> stream
        = Streams.cancelable(generator, () -> !cancelationRequested);
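One way to sketch the source-level alternative, assuming nothing beyond the standard Spliterator machinery: wrap the source's spliterator so that each attempt to pull an element first consults a BooleanSupplier. The cancelable method here is a hypothetical stand-in for the proposed Streams.cancelable, not a real library method, and the 50 ms deadline in main stands in for "chew on it for five minutes".

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CancelableSource {
    // Hypothetical stand-in for Streams.cancelable(source, keepGoing).
    // Every pull from the source first checks keepGoing; once it returns
    // false, downstream stages simply see the data run out.
    static <T> Stream<T> cancelable(Stream<T> source, BooleanSupplier keepGoing) {
        boolean parallel = source.isParallel();
        Spliterator<T> inner = source.spliterator();
        // Keep only ORDERED: cancelation makes the final size unknown.
        int traits = inner.characteristics() & Spliterator.ORDERED;
        Spliterator<T> wrapped =
                new Spliterators.AbstractSpliterator<T>(inner.estimateSize(), traits) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                return keepGoing.getAsBoolean() && inner.tryAdvance(action);
            }
        };
        return StreamSupport.stream(wrapped, parallel);
    }

    public static void main(String[] args) {
        // "Chew on it" for about 50 ms, then report what we've got.
        long deadline = System.nanoTime() + 50_000_000L;
        long kept = cancelable(Stream.iterate(0L, i -> i + 1),
                               () -> System.nanoTime() < deadline)
                .filter(i -> i % 3 == 0)
                .count();
        System.out.println("kept " + kept + " multiples of 3 before cancelation");
    }
}
```

Because the check lives in tryAdvance, which AbstractSpliterator's default trySplit also uses to fill its batches, the same wrapper should also bound a parallel pipeline, though how many elements get through is then timing-dependent.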
A few questions:
- How important is it to be able to find out why stream processing
terminated: because it ran out of data, or because it was canceled?
If it matters, we could do:
    CancelableStream<T> stream
        = Streams.cancelable(ioStream, () -> !cancelationRequested);
    stream.filter(...).map(...).forEach(...);
    if (stream.wasCanceled()) { ... }
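If we do want wasCanceled(), the source can record why it stopped. The sketch below is hypothetical (TrackedCancelable, stream() and wasCanceled() are illustrative names, not a real API) and sequential only: the flag is set when the cancelation condition, rather than exhaustion of the underlying data, ends the traversal. If a downstream short-circuit op such as findFirst stops pulling first, neither happens and wasCanceled() stays false.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical source wrapper that remembers why traversal ended.
public class TrackedCancelable<T> {
    private final Spliterator<T> inner;
    private final BooleanSupplier keepGoing;
    private volatile boolean canceled = false;

    public TrackedCancelable(Stream<T> source, BooleanSupplier keepGoing) {
        this.inner = source.spliterator();
        this.keepGoing = keepGoing;
    }

    public Stream<T> stream() {
        Spliterator<T> wrapped = new Spliterators.AbstractSpliterator<T>(
                inner.estimateSize(), inner.characteristics() & Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!keepGoing.getAsBoolean()) {
                    canceled = true;             // stopped by the cancelation condition
                    return false;
                }
                return inner.tryAdvance(action); // false here means "ran out of data"
            }
        };
        return StreamSupport.stream(wrapped, false); // sequential only in this sketch
    }

    public boolean wasCanceled() {
        return canceled;
    }

    public static void main(String[] args) {
        TrackedCancelable<Integer> finite =
                new TrackedCancelable<>(Stream.of(1, 2, 3), () -> true);
        finite.stream().map(i -> i * 2).forEach(System.out::println);
        System.out.println("canceled? " + finite.wasCanceled()); // canceled? false

        int[] budget = {10}; // allow ten pulls, then cancel
        TrackedCancelable<Integer> endless = new TrackedCancelable<>(
                Stream.iterate(0, i -> i + 1), () -> budget[0]-- > 0);
        long kept = endless.stream().filter(i -> i % 2 == 0).count();
        System.out.println(kept + " kept; canceled? " + endless.wasCanceled()); // 5 kept; canceled? true
    }
}
```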
It seems to me that this is more a property of the source than of the
pipeline. Otherwise, for operations that consume all of their input
(like sorted), there is a real possibility of misuse:
    is.filter(...).sorted(...).while(...)  // definitely bad: sorted() must
                                           // consume the whole infinite input
    is.filter(...).while(...).sorted(...)  // possibly OK
    is.while(...).filter(...).sorted(...)  // definitely OK
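The ordering problem can be made concrete by counting how many elements each pipeline pulls from its source. This sketch substitutes a finite 1,000-element source for the infinite one, so that the bad ordering still terminates, and uses Java 9's takeWhile as an assumed stand-in for the proposed while. With sorted() first, every element is pulled before the stopping condition is ever evaluated; with the condition first, the source is abandoned shortly after the condition fails. SortedVsWhile is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SortedVsWhile {
    // Like is.filter(...).sorted(...).while(...): sorted() buffers everything first.
    static int pulledWithSortedFirst() {
        AtomicInteger pulled = new AtomicInteger();
        IntStream.range(0, 1_000).boxed()
                 .peek(e -> pulled.incrementAndGet()) // count source pulls
                 .sorted()                             // barrier: consumes all input
                 .takeWhile(e -> e < 5)                // stand-in for while(...)
                 .collect(Collectors.toList());
        return pulled.get();
    }

    // Like is.while(...).filter(...).sorted(...): the stop condition runs first.
    static int pulledWithWhileFirst() {
        AtomicInteger pulled = new AtomicInteger();
        IntStream.range(0, 1_000).boxed()
                 .peek(e -> pulled.incrementAndGet())
                 .takeWhile(e -> e < 5)                // stops before the barrier
                 .sorted()
                 .collect(Collectors.toList());
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println("sorted first: " + pulledWithSortedFirst() + " pulled"); // 1000
        System.out.println("while first:  " + pulledWithWhileFirst() + " pulled");
    }
}
```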
- Do we want to support cancelability by content (i.e., stop when you
see an element that matches this predicate), or does that conflate two
separate functionalities? (My thought: they are different
functionalities; keep them separate.)
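To illustrate the distinction, a sketch using Java 9's takeWhile (an assumption; the draft library had no such op) for both kinds of stopping. A content-based stop depends only on the elements, so the same input always yields the same output; a signal-based stop depends on state outside the data, so it does not. Note that takeWhile excludes the first non-matching element, which may or may not be what "stop when you see an element that matches" should mean. ContentVsSignal and its methods are hypothetical names.

```java
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ContentVsSignal {
    // Content-based stop: a pure function of the elements, so the same
    // input always produces the same output. i > 0 guards against int overflow.
    static List<Integer> powersOfTwoUpTo(int limit) {
        return Stream.iterate(1, i -> i * 2)
                     .takeWhile(i -> i > 0 && i <= limit)
                     .collect(Collectors.toList());
    }

    // Signal-based stop: ignores the elements and asks an external
    // condition (flag, deadline, shutdown request) instead.
    static long countUntil(BooleanSupplier stopRequested) {
        return Stream.iterate(1, i -> i * 2)
                     .takeWhile(i -> !stopRequested.getAsBoolean())
                     .count();
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwoUpTo(1000)); // [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
        int[] calls = {0};
        System.out.println(countUntil(() -> ++calls[0] > 4)); // 4
    }
}
```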
- How does cancelation interact with encounter ordering?
This is largely about whether it's OK to process elements that follow
the one that triggered the cancelation, or whether we have to stop
immediately once the cancelation criterion holds.
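With an ordered parallel stream, Java 9's takeWhile (used here as an assumed stand-in for cancelation) illustrates the trade-off: the result is guaranteed to be the exact prefix, but elements after the one that ends it may already have been evaluated upstream by other threads. Per its Javadoc, on an unordered stream takeWhile is instead free to return any subset of the matching elements, which corresponds to the cheaper "don't stop immediately" semantics. OrderedCancelDemo is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OrderedCancelDemo {
    static final AtomicInteger evaluated = new AtomicInteger();

    // Ordered parallel takeWhile: the result is always the exact prefix
    // [0, 100), but upstream work on elements past the trigger (100) may
    // already have happened on other threads before the stop is noticed.
    static List<Integer> prefixInParallel() {
        evaluated.set(0);
        return IntStream.range(0, 100_000).parallel().boxed()
                .peek(e -> evaluated.incrementAndGet()) // upstream work per element
                .takeWhile(e -> e < 100)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = prefixInParallel();
        // The evaluated count can vary from run to run.
        System.out.println(result.size() + " kept, " + evaluated.get() + " evaluated upstream");
    }
}
```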
More information about the lambda-libs-spec-observers
mailing list