Cancelable streams

Brian Goetz brian.goetz at oracle.com
Fri Nov 2 13:01:40 PDT 2012


We've got a reasonable start at support in the library for infinite 
streams.  Of course, there is the risk that you'll do something that 
never terminates, like:

   infiniteStream.forEach(...)

If you have a short-circuit operation (either an intermediate operation 
like limit(n), or a terminal operation like findFirst), everything works 
fine -- and should work fine even in parallel (once we have parallel 
implementations for limit/findFirst and do a little more implementation 
work on building Spliterators on infinite iterators).
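For comparison, the short-circuit case can be sketched with the `java.util.stream` API as it eventually shipped in Java 8 (after this message was written). The class and method names are illustrative only. Both pipelines draw from an infinite source but terminate, because `limit` and `findFirst` stop pulling elements once they have what they need:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ShortCircuitDemo {
    // An infinite source: 1, 2, 3, ...
    static Stream<Integer> naturals() {
        return Stream.iterate(1, x -> x + 1);
    }

    // Intermediate short-circuit op: limit(n) truncates the infinite stream.
    static List<Integer> firstSquares(int n) {
        return naturals().map(x -> x * x).limit(n).collect(Collectors.toList());
    }

    // Terminal short-circuit op: findFirst stops at the first match.
    static Optional<Integer> firstMultipleAbove(int k, int lowerBound) {
        return naturals().filter(x -> x > lowerBound && x % k == 0).findFirst();
    }

    public static void main(String[] args) {
        System.out.println(firstSquares(5));             // [1, 4, 9, 16, 25]
        System.out.println(firstMultipleAbove(7, 100));  // Optional[105]
    }
}
```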

The other use case that we want to support is where you have an infinite 
data feed and you want to do parallel operations on it "until you're 
tired."  This could mean "chew on it for five minutes and give me what 
you've got", or "chew until a shutdown is requested", etc.  Several
folks (Sam?) have explicitly requested this, so that they can do
parallel processing on infinite event streams.

So, something like:

  infiniteIOStream.while(e -> !cancelationRequested)
                  .filter(...)
                  .map(...)
                  .forEach(...)
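In the serial case, the proposed `while` op corresponds closely to `Stream.takeWhile`, which Java 9 later added (`while` itself is a reserved word in Java, so it cannot be a method name). A minimal sketch, assuming an `AtomicBoolean` stands in for `cancelationRequested`. Purely to keep the demo deterministic, the terminal consumer requests cancelation itself after a fixed number of results; a real feed would be canceled from another thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class TakeWhileDemo {
    static List<Integer> run(int maxResults) {
        // Stands in for the externally set cancelationRequested flag.
        AtomicBoolean cancelationRequested = new AtomicBoolean(false);
        List<Integer> results = new ArrayList<>();

        Stream.iterate(0, x -> x + 1)                       // infinite source
              .takeWhile(e -> !cancelationRequested.get())  // the "while" op
              .filter(x -> x % 3 == 0)
              .map(x -> x * 2)
              .forEach(x -> {
                  results.add(x);
                  // Demo only: request cancelation from inside the pipeline.
                  if (results.size() == maxResults) {
                      cancelationRequested.set(true);
                  }
              });
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(4)); // [0, 6, 12, 18]
    }
}
```

This relies on sequential execution: each element flows through the whole pipeline before the next one is pulled, so the flag is checked between elements. In parallel, other threads may already be working on later elements when the flag flips.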


The obvious idea is to have a "while" op.  And this works great for the 
serial case, but I'm not sure it works all that well in the parallel 
case.  The other alternative is to have an abstraction for "cancelable 
stream":

   CancelableStream<T> stream
      = Streams.cancelable(generator, () -> !cancelationRequested);
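A source-level version can be sketched by wrapping the generator in a `Spliterator` that checks the condition before producing each element. This is a minimal serial sketch on the Java 8 `StreamSupport` API; `cancelable` here is a hypothetical helper, not the proposed `Streams.cancelable`, and the demo again cancels from inside the consumer only to keep the output deterministic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CancelableGeneratorDemo {
    // Hypothetical helper: the source checks the condition before producing
    // each element and reports "no more data" once it returns false.
    static <T> Stream<T> cancelable(Supplier<T> generator, BooleanSupplier keepGoing) {
        Spliterator<T> source = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!keepGoing.getAsBoolean()) {
                    return false;          // canceled: behave as if out of data
                }
                action.accept(generator.get());
                return true;
            }
        };
        return StreamSupport.stream(source, false); // serial only in this sketch
    }

    static List<Integer> run() {
        AtomicBoolean cancelationRequested = new AtomicBoolean(false);
        AtomicInteger counter = new AtomicInteger();
        Supplier<Integer> generator = counter::getAndIncrement; // 0, 1, 2, ...
        List<Integer> results = new ArrayList<>();

        cancelable(generator, () -> !cancelationRequested.get())
                .filter(x -> x % 2 == 0)
                .map(x -> x * 10)
                .forEach(x -> {
                    results.add(x);
                    // Demo only: cancel after three results.
                    if (results.size() == 3) {
                        cancelationRequested.set(true);
                    }
                });
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [0, 20, 40]
    }
}
```

One caveat with this shortcut: `Spliterators.AbstractSpliterator.trySplit` buffers elements into array batches, so in a parallel stream elements already buffered would still be processed after cancelation is requested. A parallel-friendly version would need its own splitting strategy.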

A few questions:
  - How important is it to be able to find out why the stream processing 
terminated -- because it ran out of data, or because it was canceled?

If that matters, then we could do:

   CancelableStream<T> stream
      = Streams.cancelable(ioStream, () -> !cancelationRequested);
   stream.filter(...).map(...).forEach(...);
   if (stream.wasCanceled()) { ... }
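Recording why the source stopped makes `wasCanceled()` straightforward. A minimal serial sketch, assuming a hypothetical `CancelableSource` class that wraps an `Iterator` (standing in for `ioStream`) and a no-argument condition; in the demo, a counter plays the role of `!cancelationRequested`:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical source wrapper that remembers why it stopped (serial only).
public class CancelableSource<T> {
    private final Iterator<T> input;
    private final BooleanSupplier keepGoing;
    private volatile boolean canceled = false;

    CancelableSource(Iterator<T> input, BooleanSupplier keepGoing) {
        this.input = input;
        this.keepGoing = keepGoing;
    }

    Stream<T> stream() {
        Spliterator<T> s = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!keepGoing.getAsBoolean()) {
                    canceled = true;   // stopped by the cancelation condition
                    return false;
                }
                if (!input.hasNext()) {
                    return false;      // ran out of data
                }
                action.accept(input.next());
                return true;
            }
        };
        return StreamSupport.stream(s, false);
    }

    boolean wasCanceled() {
        return canceled;
    }

    // Demo: the counter stands in for !cancelationRequested.
    static boolean demo(int cancelAfter) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6);
        AtomicInteger seen = new AtomicInteger();
        CancelableSource<Integer> src =
                new CancelableSource<>(data.iterator(), () -> seen.get() < cancelAfter);
        src.stream().map(x -> x * 2).forEach(x -> seen.incrementAndGet());
        return src.wasCanceled();
    }

    public static void main(String[] args) {
        System.out.println(demo(3));   // true: condition failed after three elements
        System.out.println(demo(100)); // false: the data ran out first
    }
}
```

The condition is checked before `hasNext()`, so a canceled I/O source is not asked to block waiting for more input.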

It seems to me that this is more a property of the source than of the
pipeline.  Otherwise, for operations that consume all input (like
sorted), there is a real possibility of misuse:

   is.filter(...).sorted(...).while(...)  // definitely bad
   is.filter(...).while(...).sorted(...)  // possibly OK
   is.while(...).filter(...).sorted(...)  // definitely OK
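The safe ordering can be sketched with Java 9's `takeWhile` playing the role of `while` (the class name is illustrative). Cutting off the infinite source first means `sorted()` only ever sees a finite stream. The comment shows the "definitely bad" ordering, which never completes because `sorted()` must consume all of its input before emitting anything:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WhileBeforeSortedDemo {
    // Analogous to is.while(...).filter(...).sorted(..): the infinite
    // source is cut off first, so sorted() sees a finite stream.
    static List<Integer> whileThenSort() {
        return Stream.iterate(10, x -> x - 1)   // infinite: 10, 9, 8, ...
                     .takeWhile(x -> x > 0)     // stops at the first x <= 0
                     .filter(x -> x % 2 == 1)   // 9, 7, 5, 3, 1
                     .sorted()
                     .collect(Collectors.toList());
    }

    // The "definitely bad" order would be:
    //   Stream.iterate(10, x -> x - 1).filter(...).sorted().takeWhile(...)
    // sorted() buffers its entire (infinite) input before emitting, so in
    // practice that pipeline runs until it exhausts memory.

    public static void main(String[] args) {
        System.out.println(whileThenSort()); // [1, 3, 5, 7, 9]
    }
}
```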

  - Do we want to support cancelability by content (i.e., stop when you
see an element that matches this predicate), or is that excessively
conflating one functionality with another?  (My thought: these are two
different functionalities; keep them separate.)

  - How does cancelation interact with encounter ordering?

This is largely about whether it's OK to process elements that follow
the one that triggered the cancelation, or whether we have to stop
immediately once we find that the cancelation criterion holds.
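For the content-based variant, Java 9's `takeWhile` later took a position on this: on an ordered stream it returns the longest prefix of matching elements in encounter order, even in parallel, while on an unordered stream it may return any subset of matching elements. Its documentation also notes that the ordered-parallel case can be quite expensive. A small sketch of the ordered case (the class name is illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OrderedCancelDemo {
    // Ordered parallel stream: the result is exactly the prefix that
    // precedes the first element failing the predicate.
    static List<Integer> orderedPrefix() {
        return IntStream.range(0, 100_000)
                        .parallel()
                        .takeWhile(x -> x != 42)   // content-based stop
                        .boxed()
                        .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> prefix = orderedPrefix();
        System.out.println(prefix.size());                 // 42
        System.out.println(prefix.get(prefix.size() - 1)); // 41
    }
}
```

Calling `.unordered()` before `takeWhile` would relax this guarantee, which is the trade-off the question above is asking about.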




More information about the lambda-libs-spec-observers mailing list