"Cancelable" streams

Sam Pullara sam at sampullara.com
Mon Dec 10 09:09:36 PST 2012


I ran into cancellation quite a few times when I was writing a fairly large codebase using Guava. Here is the typical setup:

1) You are trying to process some subset of the source, but since you are filtering you don't know in advance how many elements you will need to pull, e.g. when paginating.
2) The source of the data can't filter at a fine enough resolution so you are doing client side filtering.
3) The source of the data is a shared resource and you want to use it as little as possible, e.g. database connections.
4) The amount of work done to each row is much more than the cost of grabbing the row, i.e. the source is faster than the sink.

So, as you are processing this stream of data, you find you are done. At that point you want to stop pulling from the source and return the connection to the shared pool of sources.
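The manual pattern this forces on you can be sketched roughly as follows. This is my own illustration, not code from Havrobase; the `Source` class is a hypothetical stand-in for a pooled database connection, and the point is that the early-exit and resource-release logic lives in the client loop rather than in the stream machinery:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the setup described above: pull rows from a shared
// source, filter client-side, and stop as soon as a page is full so the
// underlying resource can be released promptly.
public class PageScan {
    static class Source implements AutoCloseable {
        private int next = 0;
        Iterator<Integer> rows() {                 // unbounded row stream
            return new Iterator<Integer>() {
                public boolean hasNext() { return true; }
                public Integer next() { return next++; }
            };
        }
        public void close() { /* return connection to the pool */ }
    }

    public static void main(String[] args) {
        List<Integer> page = new ArrayList<>();
        int pageSize = 3;
        try (Source src = new Source()) {
            Iterator<Integer> it = src.rows();
            while (it.hasNext() && page.size() < pageSize) {
                int row = it.next();
                if (row % 2 == 0) {                // client-side filter
                    page.add(row);
                }
            }
        }                                          // source released as soon as the page fills
        System.out.println(page);                  // [0, 2, 4]
    }
}
```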

Here is the current gruesome implementation of scanning in Havrobase (on MySQL). It essentially does look-ahead on behalf of the client, ultimately closing the connection and possibly reopening it if the client asks for more. If I had a better interface than Iterator it could have been cleaner:

https://github.com/spullara/havrobase/blob/master/mysql/src/main/java/avrobase/mysql/MysqlAB.java#L631

I think at the end of the day my use case may require more than sink-side cancellation, but it would have helped. In terms of implementation, I can see two alternatives:

1) Stream cancelOn(AtomicBoolean cancelled);
2) Throwing a specific runtime cancellation exception that the framework recognizes and can propagate.

In both of these cases it is obvious to the client that the stream was cancelled.
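The second alternative can be sketched with what exists today, without any framework support (the framework recognizing and propagating the exception is the part being proposed; the catch in the driver below merely stands in for it):

```java
import java.util.concurrent.CancellationException;
import java.util.stream.Stream;

// Sketch of option 2: the sink throws a runtime cancellation exception once it
// has seen enough elements, and the driver catches it. The cancellation is
// obvious to the client because control lands in the catch block.
public class ThrowToCancel {
    public static void main(String[] args) {
        StringBuilder seen = new StringBuilder();
        try {
            Stream.iterate(0, i -> i + 1)          // infinite source
                  .forEach(i -> {
                      if (i >= 3) throw new CancellationException("done");
                      seen.append(i);
                  });
        } catch (CancellationException e) {
            // framework-recognized signal: stop pulling, release the source
        }
        System.out.println(seen);                  // 012
    }
}
```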

Sam

On Dec 8, 2012, at 11:27 AM, Brian Goetz <brian.goetz at oracle.com> wrote:

> And another subject that we need to close on -- "cancelable" streams. The primary use case for this is parallel processing on infinite streams, such as streams of event data.  Here, you might want to process until some threshold has been reached (find the first N results), or until some external event has occurred (process for five minutes and take the best result; process until asked to shut down).
> 
> As with .parallel(), the recent stabilization of the Stream.spliterator() escape hatch provides us a low-overhead way to support this without introducing new abstractions like CancelableStream or StreamFuture.  Not surprisingly, the answer is a stream op:
> 
> stream.cancelOn(BooleanSupplier shouldCancel,
>                 Runnable onCancel)
>       .filter(...)
>       .forEach(...)
> 
> The way this works is that it polls the supplied BooleanSupplier to ask "should we cancel now."  Once canceled, it acts as a gate shutting; no more elements are sent downstream, so downstream processing completes as if the stream were truncated.  When cancelation occurs, it calls the onCancel Runnable so that the client can have a way to know that the pipeline completed due to cancelation rather than normal completion.
> 
> A typical use might be:
> 
> stream.cancelOn(() -> (System.currentTimeMillis() >= endTime),
>                 () -> cancelFlag.set(true))
>       .filter(...)
>       .forEach(...)
> 
> The implementation is simple:
> 
> Stream<T> cancelOn(...) {
>   return Streams.stream(cancelingSpliterator(spliterator()),
>                         getStreamFlags());
> }
> 
> The cancelation model is not "stop abruptly when the cancelation signal comes", but a more cooperative "use the cancelation signal to indicate that we should not start any more work."  So if you're looking to stop after finding 10 candidate matches, it might actually find 11 or 12 before it stops -- but that's something the client code can deal with.
> 
> 
> For sequential streams, the semantics and implementation of the canceling spliterator are simple -- once the cancel signal comes, no more elements are dispensed from the iterator.  For parallel streams WITHOUT a defined encounter order, it is similarly simple -- once the signal comes, no more elements are dispensed to any subtask, and no more splits are produced.  For parallel streams WITH a defined encounter order, some more work is needed to define the semantics.  A reasonable semantics would be: identify the latest chunk of input in the encounter order that has started processing, let any earlier chunks complete normally, and don't start any later chunks.
> 
> 
> This seems simple to spec and implement, unintrusive, and reasonably intuitive.
> 
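A minimal sequential version of the cancelOn op Brian describes can be sketched today against the Stream.spliterator() escape hatch. The names cancelOn, shouldCancel, and onCancel follow the proposal; the wrapper itself is my assumption about one possible shape, not the real implementation, and it ignores the parallel/encounter-order cases discussed above:

```java
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CancelOnSketch {
    // Wrap a stream's spliterator so that, once shouldCancel reports true,
    // no more elements are dispensed downstream (the "gate shutting").
    static <T> Stream<T> cancelOn(Stream<T> source,
                                  BooleanSupplier shouldCancel,
                                  Runnable onCancel) {
        Spliterator<T> split = source.spliterator();
        Spliterator<T> canceling = new Spliterator<T>() {
            private boolean canceled = false;
            @Override public boolean tryAdvance(Consumer<? super T> action) {
                if (canceled) return false;
                if (shouldCancel.getAsBoolean()) {   // poll: "should we cancel now?"
                    canceled = true;
                    onCancel.run();                  // let the client know why we stopped
                    return false;                    // gate shuts; stream looks truncated
                }
                return split.tryAdvance(action);
            }
            @Override public Spliterator<T> trySplit() { return null; } // sequential only
            @Override public long estimateSize() { return split.estimateSize(); }
            @Override public int characteristics() { return split.characteristics(); }
        };
        return StreamSupport.stream(canceling, false);
    }

    public static void main(String[] args) {
        AtomicInteger seen = new AtomicInteger();
        List<Integer> result =
            cancelOn(Stream.iterate(0, i -> i + 1),   // infinite source
                     () -> seen.get() >= 3,           // cancel after three elements
                     () -> System.out.println("canceled"))
                .peek(i -> seen.incrementAndGet())
                .collect(Collectors.toList());
        System.out.println(result);                   // [0, 1, 2]
    }
}
```

Because the poll happens before each element is dispensed, downstream processing simply completes as if the source had ended, which matches the cooperative "don't start any more work" model rather than an abrupt stop.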



More information about the lambda-libs-spec-observers mailing list