"Cancelable" streams

Brian Goetz brian.goetz at oracle.com
Sat Dec 8 11:27:33 PST 2012


And another subject that we need to close on -- "cancelable" streams. 
The primary use case for this is parallel processing on infinite 
streams, such as streams of event data.  Here, you might want to process 
until some threshold has been reached (find the first N results), until 
some external event has occurred (process for five minutes and take the 
best result; process until asked to shut down).

As with .parallel(), the recent stabilization of the 
Stream.spliterator() escape hatch provides us a low-overhead way to 
support this without introducing new abstractions like CancelableStream 
or StreamFuture.  Not surprisingly, the answer is a stream op:

   stream.cancelOn(BooleanSupplier shouldCancel,
                   Runnable onCancel)
         .filter(...)
         .forEach(...)

The way this works is that it polls the supplied BooleanSupplier to ask 
"should we cancel now?"  Once canceled, it acts as a gate shutting: no 
more elements are sent downstream, so downstream processing completes as 
if the stream were truncated.  When cancelation occurs, the onCancel 
Runnable is invoked, giving the client a way to know that the pipeline 
completed due to cancelation rather than normal completion.

A typical use might be:

   stream.cancelOn(() -> (System.currentTimeMillis() >= endTime),
                   () -> cancelFlag.set(true))
         .filter(...)
         .forEach(...)

The implementation is simple:

   Stream<T> cancelOn(...) {
     return Streams.stream(cancelingSpliterator(spliterator()),
                           getStreamFlags());
   }
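For illustration, here is a rough sketch of what the sequential form of 
such a canceling spliterator might look like, written against the 
java.util.stream API as it eventually shipped (StreamSupport rather than 
the pre-release Streams.stream / getStreamFlags).  The class name 
CancelingSpliterator and the static cancelOn helper are hypothetical 
names for this sketch, not part of any proposed API:

```java
import java.util.Spliterator;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class CancelingSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> source;
    private final BooleanSupplier shouldCancel;
    private final Runnable onCancel;
    private boolean canceled;

    CancelingSpliterator(Spliterator<T> source,
                         BooleanSupplier shouldCancel,
                         Runnable onCancel) {
        this.source = source;
        this.shouldCancel = shouldCancel;
        this.onCancel = onCancel;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!canceled && shouldCancel.getAsBoolean()) {
            canceled = true;
            onCancel.run();        // signal completion-by-cancelation
        }
        // Once the gate shuts, no further elements go downstream
        return !canceled && source.tryAdvance(action);
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;               // sequential-only sketch: never split
    }

    @Override
    public long estimateSize() {
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
        // Truncation may occur, so SIZED/SUBSIZED can no longer be promised
        return source.characteristics() & ~(SIZED | SUBSIZED);
    }

    // Hypothetical static form of the cancelOn op described above
    static <T> Stream<T> cancelOn(Stream<T> s,
                                  BooleanSupplier shouldCancel,
                                  Runnable onCancel) {
        return StreamSupport.stream(
                new CancelingSpliterator<>(s.spliterator(),
                                           shouldCancel, onCancel),
                false);
    }
}
```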

The cancelation model is not "stop abruptly when the cancelation signal 
comes", but a more cooperative "use the cancelation signal to indicate 
that we should not start any more work."  So if you're looking to stop 
after finding 10 candidate matches, it might actually find 11 or 12 
before it stops -- but that's something the client code can deal with.
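As a present-day illustration of that cooperative model (using the 
takeWhile operation that later shipped in Java 9, since cancelOn itself 
was not adopted), the client can count matches to drive the cancel 
signal and then trim any overshoot itself with limit:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CooperativeCancelDemo {
    // Find the first 10 multiples of 7 from an infinite stream.  The
    // takeWhile gate is the cooperative cancel signal ("start no more
    // work once 10 matches have been seen"); limit(10) trims whatever
    // overshoot slipped through before the signal was observed.
    static List<Integer> firstTenMultiplesOfSeven() {
        AtomicInteger matches = new AtomicInteger();
        return Stream.iterate(0, i -> i + 1)
                     .takeWhile(i -> matches.get() < 10)  // cancel signal
                     .filter(i -> i % 7 == 0)
                     .peek(i -> matches.incrementAndGet())
                     .limit(10)                           // client-side trim
                     .collect(Collectors.toList());
    }
}
```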


For sequential streams, the semantics and implementation of the 
canceling spliterator are simple -- once the cancel signal comes, no 
more elements are dispensed from the iterator.  For parallel streams 
WITHOUT a defined encounter order, it is similarly simple -- once the 
signal comes, no more elements are dispensed to any subtask, and no more 
splits are produced.  For parallel streams WITH a defined encounter 
order, some more work is needed to define the semantics.  A reasonable 
semantics would be: identify the latest chunk of input in the encounter 
order that has started processing, let any earlier chunks complete 
normally, and don't start any later chunks.
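The unordered-parallel rule could be sketched as follows: a single 
canceled flag is shared by every split, so once it is set, no subtask 
dispenses another element and no further splits are produced.  (The 
class name is hypothetical, and the onCancel callback is omitted for 
brevity.)

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

final class UnorderedCancelingSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> source;
    private final BooleanSupplier shouldCancel;
    private final AtomicBoolean canceled;      // shared across all splits

    UnorderedCancelingSpliterator(Spliterator<T> source,
                                  BooleanSupplier shouldCancel,
                                  AtomicBoolean canceled) {
        this.source = source;
        this.shouldCancel = shouldCancel;
        this.canceled = canceled;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (canceled.get())
            return false;
        if (shouldCancel.getAsBoolean()) {
            canceled.set(true);                // visible to every subtask
            return false;
        }
        return source.tryAdvance(action);
    }

    @Override
    public Spliterator<T> trySplit() {
        if (canceled.get())
            return null;                       // no more splits after cancel
        Spliterator<T> prefix = source.trySplit();
        return prefix == null
             ? null
             : new UnorderedCancelingSpliterator<>(prefix, shouldCancel,
                                                   canceled);
    }

    @Override
    public long estimateSize() {
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
        // Truncation breaks SIZED/SUBSIZED; this variant also gives up order
        return source.characteristics() & ~(SIZED | SUBSIZED | ORDERED);
    }
}
```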


This seems simple to spec and implement, unobtrusive, and reasonably 
intuitive.



More information about the lambda-libs-spec-observers mailing list