Cancelable streams

Paul Sandoz paul.sandoz at oracle.com
Mon Nov 5 02:03:40 PST 2012


On Nov 2, 2012, at 9:01 PM, Brian Goetz <brian.goetz at Oracle.COM> wrote:

> We've got a reasonable start at support in the library for infinite streams.  Of course, there is the risk that you'll do something that never terminates, like:
> 
>  infiniteStream.forEach(...)
> 
> If you have a short-circuit operation (either an intermediate operation like limit(n), or a terminal operation like findFirst), everything works fine -- and should work fine even in parallel (once we have parallel implementations for limit/findFirst and do a little more implementation work on building Spliterators on infinite iterators.)
> 
> The other use case that we want to support is where you have an infinite data feed and you want to do parallel operations on it "until you're tired."  This could mean "chew on it for five minutes and give me what you've got", or "chew until a shutdown is requested", etc.  This is something that has been explicitly requested by several folks (Sam?) to do parallel processing on infinite event streams.
> 
> So, something like:
> 
> infiniteIOStream.while(e -> !cancelationRequested)
>                 .filter(...)
>                 .map(....)
>                 .forEach(...)
> 
> 
> The obvious idea is to have a "while" op.  And this works great for the serial case, but I'm not sure it works all that well in the parallel case.  The other alternative is to have an abstraction for "cancelable stream":
> 

I wonder how different that is from a limit op or a while op in terms of parallel evaluation.

Potentially, upstream elements might be eagerly evaluated and the results discarded if the short-circuit occurs while tasks are running.

Is that ok if a stream is cancelled?
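
To make the eager-evaluation concern concrete, here is a minimal sketch. It assumes the java.util.stream API as it later shipped in Java 8, which postdates this thread; the class name, method name and counter are hypothetical. The counter records how many upstream elements the filter evaluated, which may be more than the ten that limit keeps when the pipeline runs in parallel. The exact count depends on the runtime and the number of cores, so none is claimed here.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EagerEvaluationDemo {
    // Counts how many upstream elements the filter actually evaluated.
    static final AtomicInteger evaluated = new AtomicInteger();

    // Returns the first ten even numbers in encounter order.
    static List<Integer> firstTenEvens() {
        evaluated.set(0);
        return IntStream.range(0, 100_000)
                .parallel()
                .filter(i -> {
                    evaluated.incrementAndGet(); // side effect for illustration only
                    return i % 2 == 0;
                })
                .limit(10)                       // short-circuiting intermediate op
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> kept = firstTenEvens();
        System.out.println("kept      = " + kept.size());
        System.out.println("evaluated = " + evaluated.get());
    }
}
```

Any evaluated elements beyond those needed for the result are work that was done and then thrown away, which is the behaviour the question above is asking about.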

I tend to think cancelation is associated with the source rather than with an operation, i.e. something to turn off the tap given an external event, or a source value, or both:

- Once a stream is cancelled no more elements may be pulled from the source (I am avoiding the un-cancel use-case for now). Given that a flatmap op may produce two or more output elements for a given input element, cancelling the source does not mean there are no more output elements.

- The limit/while/skip operations have leeway to be greedy and throw away results that are eagerly calculated and are no longer valid.

- If cancellation were an operation then I am struggling to see what the fundamental difference would be (beyond the capability of reporting a reason for cancellation) from that of short-circuiting ops such as limit/while [*].
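
The first point above can be sketched as follows. This is an illustration under stated assumptions, not a proposed API: CancelableSource, cancel() and wasCanceled() are hypothetical names, the stream plumbing uses StreamSupport and Spliterators as they later shipped in Java 8, and the sketch is sequential only (it ignores the race between hasNext() and next() that a concurrent cancel would introduce). The source stops yielding once cancelled, yet the flatMap stage still emits both outputs for the last element that was pulled.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CancelableSourceSketch {

    /** Hypothetical infinite source that stops yielding once cancelled. */
    static final class CancelableSource<T> implements Iterator<T> {
        private final Supplier<T> generator;
        private final AtomicBoolean canceled = new AtomicBoolean(false);

        CancelableSource(Supplier<T> generator) { this.generator = generator; }

        void cancel() { canceled.set(true); }          // turn off the tap
        boolean wasCanceled() { return canceled.get(); }

        @Override public boolean hasNext() { return !canceled.get(); }
        @Override public T next() { return generator.get(); }

        Stream<T> stream() {
            return StreamSupport.stream(
                    Spliterators.spliteratorUnknownSize(this, Spliterator.ORDERED),
                    false); // sequential
        }
    }

    static List<String> run() {
        AtomicInteger n = new AtomicInteger();
        CancelableSource<Integer> source = new CancelableSource<>(n::incrementAndGet);
        return source.stream()
                // Cancel on a source value; an external event would call cancel() the same way.
                .peek(i -> { if (i == 3) source.cancel(); })
                // Each input yields two outputs, including the input that triggered the cancel.
                .flatMap(i -> Stream.of(i + "a", i + "b"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1a, 1b, 2a, 2b, 3a, 3b]
    }
}
```

Because the cancel flag lives in the source, the downstream ops need no knowledge of it, and wasCanceled() could report why the stream ended.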

Paul.

[*] If we include a while op, I think we should rename limit to take, and have takeWhile as the one that accepts a predicate.
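
A small sketch of the count-based versus predicate-based distinction in this footnote. It assumes the java.util.stream API of later JDKs, where the count-based op kept the name limit and a predicate-based takeWhile was added in Java 9; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TakeVersusTakeWhile {

    // Count-based short-circuit: keep the first five elements.
    static List<Integer> byCount() {
        return Stream.iterate(1, i -> i * 2)
                .limit(5)
                .collect(Collectors.toList());
    }

    // Predicate-based short-circuit: keep elements until one fails the test.
    static List<Integer> byPredicate() {
        return Stream.iterate(1, i -> i * 2)
                .takeWhile(i -> i < 20)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(byCount());     // [1, 2, 4, 8, 16]
        System.out.println(byPredicate()); // [1, 2, 4, 8, 16]
    }
}
```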


>  CancelableStream<T> stream
>     = Streams.cancelable(generator, () -> !cancelationRequested);
> 
> A few questions:
> - How important is it to be able to find out why the stream processing terminated -- because it ran out of data, or because it was canceled?
> 
> Then we could do:
> 
>  CancelableStream<T> stream
>     = Streams.cancelable(ioStream, e -> !cancelationRequested);
>  stream.filter(...).map(...).forEach(...)
>  if (stream.wasCanceled()) { .... }
> 
> Seems to me that this is something that is more a property of the source than of the pipeline.  Otherwise, for operations that consume all input (like sorted), there is a real possibility for misuse:
> 
>  is.filter(...).sorted(..).while(...)  // definitely bad
>  is.filter(...).while(...).sorted(..)  // possibly ok
>  is.while(...).filter(...).sorted(..)  // definitely ok
> 
> - Do we want to support cancelability by content (i.e., stop when you see an element that matches this predicate), or is that excessively conflating one functionality with another (my thought: these are two different functionalities, keep them separate)
> 
> - How does cancelation interact with encounter ordering?
> 
> This is largely about whether it's OK to process elements that follow the one that triggers the cancelation, or whether we have to stop immediately on finding that the cancelation criteria hold true.
> 
> 



More information about the lambda-libs-spec-observers mailing list