Cancelable streams

Joe Bowbeer joe.bowbeer at gmail.com
Sun Nov 4 00:08:36 PDT 2012


I'm familiar with infinite streams (I was a tech. reviewer for the original
Wizard Book) but I'm not familiar with your approach to cancellation.  Is
there any precedent for it?

Usually I just lazily generate a range of the infinite stream and operate
on that.
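
A minimal sketch of what I mean by taking a range, assuming the
java.util.stream API in the form it later shipped in Java 8 (the names
under discussion in this thread were still in flux). The class and method
names here are hypothetical, chosen only for illustration:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyRange {
    // Squares of the sub-range [from, from + count) of the natural numbers.
    // Stream.iterate is infinite, but skip/limit bound the work that is done.
    static List<Integer> squaresOfRange(long from, long count) {
        return Stream.iterate(0, n -> n + 1)   // 0, 1, 2, ... generated lazily
                     .skip(from)               // drop everything before the range
                     .limit(count)             // short-circuit after `count` elements
                     .map(n -> n * n)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(squaresOfRange(3, 4)); // prints [9, 16, 25, 36]
    }
}
```

Only the elements inside the requested range are ever mapped, so the
pipeline terminates even though the source does not.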

To handle cancellation, I would signal the generator to stop returning
elements, or to generate a (null) marker to signal end of stream.

In other words, cancellation should be handled at the generator, not by
Streams in general.
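
One way this could look, as a rough sketch rather than a proposal: the
generator owns a cancel flag and simply reports "no more elements" once
the flag is set, so the stream machinery needs no cancellation support of
its own. CancelableGenerator, cancel() and sumUntil() are hypothetical
names; StreamSupport and Spliterators are assumed in their Java 8 form.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class CancelableGenerator implements Iterator<Integer> {
    private final AtomicBoolean canceled = new AtomicBoolean(false);
    private int next = 0;

    // Ask the generator to stop; may be called from any thread.
    void cancel() { canceled.set(true); }

    // Cancellation is observed here: a canceled generator looks exhausted.
    @Override public boolean hasNext() { return !canceled.get(); }

    // Always yields the next value, so a cancel that arrives between
    // hasNext() and next() cannot make next() fail.
    @Override public Integer next() { return next++; }

    // Sequential stream over this generator.
    Stream<Integer> stream() {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(this, Spliterator.ORDERED), false);
    }

    // Sums 0..stopAt; the cancel() call stands in for an external
    // shutdown request arriving while the stream is being consumed.
    static int sumUntil(int stopAt) {
        CancelableGenerator gen = new CancelableGenerator();
        int[] sum = {0};
        gen.stream().forEach(n -> {
            sum[0] += n;
            if (n == stopAt) gen.cancel();
        });
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(sumUntil(4)); // prints 10
    }
}
```

This sketch is sequential only. It says nothing about how elements that
are already in flight should be treated in a parallel pipeline.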

Some of this discussion of cancellation reminds me of the Parallel Loop
support in .NET:

Parallel.ForEach returns a ParallelLoopResult, and ParallelLoopState is
accessible inside a loop.  These provide completion information and
cancellation control (aka Break).

I don't think we want every stream to have these capabilities.  In fact, I
think "forEach" should be renamed "each" in order to distance it from the
sequential expectations associated with a "for-loop".

Joe

On Fri, Nov 2, 2012 at 1:01 PM, Brian Goetz wrote:

> We've got a reasonable start at support in the library for infinite
> streams.  Of course, there is the risk that you'll do something that never
> terminates, like:
>
>   infiniteStream.forEach(...)
>
> If you have a short-circuit operation (either an intermediate operation
> like limit(n), or a terminal operation like findFirst), everything works
> fine -- and should work fine even in parallel (once we have parallel
> implementations for limit/findFirst and do a little more implementation
> work on building Spliterators on infinite iterators.)
>
> The other use case that we want to support is where you have an infinite
> data feed and you want to do parallel operations on it "until you're
> tired."  This could mean "chew on it for five minutes and give me what
> you've got", or "chew until a shutdown is requested", etc.  This is
> something that has been explicitly requested by several folks (Sam?) to do
> parallel processing on infinite event streams.
>
> So, something like:
>
>  infiniteIOStream.while(e -> !cancelationRequested)
>                  .filter(...)
>                  .map(....)
>                  .forEach(...)
>
>
> The obvious idea is to have a "while" op.  And this works great for the
> serial case, but I'm not sure it works all that well in the parallel case.
>  The other alternative is to have an abstraction for "cancelable stream":
>
>   CancelableStream<T> stream
>      = Streams.cancelable(generator, () -> !cancelationRequested);
>
> A few questions:
>  - How important is it to be able to find out why the stream processing
> terminated -- because it ran out of data, or because it was canceled?
>
> Then we could do:
>
>   CancelableStream<T> stream
>      = Streams.cancelable(ioStream, e -> !cancelationRequested);
>   stream.filter(...).map(...).forEach(...)
>   if (stream.wasCanceled()) { .... }
>
> Seems to me that this is something that is more a property of the source
> than of the pipeline.  Otherwise, for operations that consume all input
> (like sorted), there is a real possibility for misuse:
>
>   is.filter(...).sorted(..).while(...)  // definitely bad
>   is.filter(...).while(...).sorted(..)  // possibly ok
>   is.while(...).filter(...).sorted(..)  // definitely ok
>
>  - Do we want to support cancelability by content (i.e., stop when you see
> an element that matches this predicate), or is that excessively conflating
> one functionality with another (my thought: these are two different
> functionalities, keep them separate)
>
>  - How does cancelation interact with encounter ordering?
>
> This is largely about whether it's OK to process elements that follow the
> one that triggers the cancelation, or whether we have to stop immediately on
> finding that the cancelation criteria hold true.
>
>
>


More information about the lambda-libs-spec-observers mailing list