"Cancelable" streams

Remi Forax forax at univ-mlv.fr
Sat Dec 8 13:34:16 PST 2012


On 12/08/2012 08:27 PM, Brian Goetz wrote:
> And another subject that we need to close on -- "cancelable" streams. 
> The primary use case for this is parallel processing on infinite 
> streams, such as streams of event data.  Here, you might want to 
> process until some threshold has been reached (find the first N 
> results), until some external event has occurred (process for five 
> minutes and take the best result; process until asked to shut down).
>
> As with .parallel(), the recent stabilization of the 
> Stream.spliterator() escape hatch provides us a low-overhead way to 
> support this without introducing new abstractions like 
> CancelableStream or StreamFuture.  Not surprisingly, the answer is a 
> stream op:
>
>   stream.cancelOn(BooleanSupplier shouldCancel,
>                   Runnable onCancel)
>         .filter(...)
>         .forEach(...)
>
> The way this works is that it polls the supplied BooleanSupplier to 
> ask "should we cancel now."  Once canceled, it acts as a gate 
> shutting; no more elements are sent downstream, so downstream 
> processing completes as if the stream were truncated.  When 
> cancelation occurs, it calls the onCancel Runnable so that the client 
> can have a way to know that the pipeline completed due to cancelation 
> rather than normal completion.
>
> A typical use might be:
>
>   stream.cancelOn(() -> (System.currentTimeMillis() >= endTime),
>                   () -> cancelFlag.set(true))
>         .filter(...)
>         .forEach(...)
>
> The implementation is simple:
>
>   Stream<T> cancelOn(...) {
>     return Streams.stream(cancelingSpliterator(spliterator()),
>                           getStreamFlags());
>   }
>
> The cancelation model is not "stop abruptly when the cancelation 
> signal comes", but a more cooperative "use the cancelation signal to 
> indicate that we should not start any more work."  So if you're 
> looking to stop after finding 10 candidate matches, it might actually 
> find 11 or 12 before it stops -- but that's something the client code 
> can deal with.
>
>
> For sequential streams, the semantics and implementation of the 
> canceling spliterator are simple -- once the cancel signal comes, no 
> more elements are dispensed from the iterator.  For parallel streams 
> WITHOUT a defined encounter order, it is similarly simple -- once the 
> signal comes, no more elements are dispensed to any subtask, and no 
> more splits are produced.  For parallel streams WITH a defined 
> encounter order, some more work is needed to define the semantics.  A 
> reasonable semantics would be: identify the latest chunk of input in 
> the encounter order that has started processing, let any earlier 
> chunks complete normally, and don't start any later chunks.
>
>
> This seems simple to spec and implement, unintrusive, and reasonably 
> intuitive.
>
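
For reference, the sequential case described above can be sketched roughly as follows. This is only an illustration under assumptions: cancelOn here is a hypothetical static helper (the proposal makes it a Stream method), the class name CancelOnSketch is made up, and the gate is built on Spliterators.AbstractSpliterator and StreamSupport rather than on the Streams.stream(...) call shown in the quoted text. It does not attempt the parallel semantics.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CancelOnSketch {

    // Hypothetical helper, NOT part of java.util.stream: wraps the source
    // spliterator in a gate that stops dispensing elements once
    // shouldCancel returns true. Sequential only.
    static <T> Stream<T> cancelOn(Stream<T> source,
                                  BooleanSupplier shouldCancel,
                                  Runnable onCancel) {
        Spliterator<T> inner = source.spliterator();
        // Size is unknown after gating, so report Long.MAX_VALUE and keep
        // only the ORDERED characteristic of the source.
        int flags = inner.characteristics() & Spliterator.ORDERED;
        Spliterator<T> gated =
            new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, flags) {
                private boolean canceled;

                @Override
                public boolean tryAdvance(Consumer<? super T> action) {
                    if (canceled) {
                        return false;
                    }
                    // Poll before starting work on the next element.
                    if (shouldCancel.getAsBoolean()) {
                        canceled = true;
                        onCancel.run(); // tell the client why we stopped
                        return false;
                    }
                    return inner.tryAdvance(action);
                }
            };
        return StreamSupport.stream(gated, false);
    }

    // Runs an infinite stream until `max` elements have passed the gate.
    static List<Integer> firstSeen(int max, AtomicBoolean cancelFlag) {
        AtomicInteger seen = new AtomicInteger();
        return cancelOn(Stream.iterate(1, i -> i + 1),
                        () -> seen.get() >= max,
                        () -> cancelFlag.set(true))
                .peek(i -> seen.incrementAndGet())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AtomicBoolean cancelFlag = new AtomicBoolean();
        System.out.println(firstSeen(3, cancelFlag)); // [1, 2, 3]
        System.out.println(cancelFlag.get());         // true
    }
}
```

Note that the only way the caller learns about the cancelation is through the onCancel callback mutating cancelFlag.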

The main issue is that your example uses a side effect,
   () -> cancelFlag.set(true)
which goes against the model.

 > Here, you might want to process until some threshold has been reached
 > (find the first N results), until some external event has occurred
 > (process for five minutes and take the best result; process until
 > asked to shut down).

find the first N results => use limit()
some external event => don't use a stream, use fork/join directly.
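
A minimal sketch of the first point, assuming an infinite source of integers and "even number" as the stand-in for a match (the class and method names are only for illustration):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LimitExample {

    // Takes the first n matches from an infinite stream. limit() is a
    // short-circuiting op, so the pipeline stops pulling from the source
    // once n elements have passed the filter; no cancel flag is needed.
    static List<Integer> firstEvens(int n) {
        return Stream.iterate(0, i -> i + 1)
                     .filter(i -> i % 2 == 0)
                     .limit(n)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstEvens(5)); // [0, 2, 4, 6, 8]
    }
}
```

Unlike the cancelOn example, the result here is exactly N elements and nothing outside the pipeline is mutated.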

Fitting the whole world and the kitchen sink into the Stream API is not a 
goal.

regards,
Rémi


