"Cancelable" streams
Remi Forax
forax at univ-mlv.fr
Sat Dec 8 13:34:16 PST 2012
On 12/08/2012 08:27 PM, Brian Goetz wrote:
> And another subject that we need to close on -- "cancelable" streams.
> The primary use case for this is parallel processing on infinite
> streams, such as streams of event data. Here, you might want to
> process until some threshold has been reached (find the first N
> results), until some external event has occurred (process for five
> minutes and take the best result; process until asked to shut down).
>
> As with .parallel(), the recent stabilization of the
> Stream.spliterator() escape hatch provides us a low-overhead way to
> support this without introducing new abstractions like
> CancelableStream or StreamFuture. Not surprisingly, the answer is a
> stream op:
>
>     stream.cancelOn(BooleanSupplier shouldCancel,
>                     Runnable onCancel)
>           .filter(...)
>           .forEach(...)
>
> The way this works is that it polls the supplied BooleanSupplier to
> ask "should we cancel now." Once canceled, it acts as a gate
> shutting; no more elements are sent downstream, so downstream
> processing completes as if the stream were truncated. When
> cancelation occurs, it calls the onCancel Runnable so that the client
> has a way to know that the pipeline completed due to cancelation
> rather than normal completion.
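
For concreteness: the proposal doesn't spell out cancelingSpliterator, but a
minimal sketch of such a gate, written as a wrapper over the final
java.util.Spliterator API, could look like the following. The class name and
the details are guesses here, not the actual prototype code.

    import java.util.Spliterator;
    import java.util.function.BooleanSupplier;
    import java.util.function.Consumer;

    // Sketch: wraps a source spliterator and stops dispensing elements
    // (and splits) once shouldCancel first reports true.
    final class CancelingSpliterator<T> implements Spliterator<T> {
        private final Spliterator<T> source;
        private final BooleanSupplier shouldCancel;
        private final Runnable onCancel;
        private boolean canceled;   // per-split memo of the cancel signal

        CancelingSpliterator(Spliterator<T> source,
                             BooleanSupplier shouldCancel,
                             Runnable onCancel) {
            this.source = source;
            this.shouldCancel = shouldCancel;
            this.onCancel = onCancel;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (!canceled && shouldCancel.getAsBoolean()) {
                canceled = true;
                onCancel.run();   // may run once per split in a parallel pipeline
            }
            // gate shut: nothing more goes downstream from this split
            return !canceled && source.tryAdvance(action);
        }

        @Override
        public Spliterator<T> trySplit() {
            if (canceled || shouldCancel.getAsBoolean()) {
                return null;      // once canceled, produce no more splits
            }
            Spliterator<T> prefix = source.trySplit();
            return prefix == null ? null
                    : new CancelingSpliterator<>(prefix, shouldCancel, onCancel);
        }

        @Override
        public long estimateSize() { return source.estimateSize(); }

        @Override
        public int characteristics() {
            // cancelation can truncate the stream, so any size is only an estimate
            return source.characteristics() & ~(SIZED | SUBSIZED);
        }
    }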
>
> A typical use might be:
>
>     stream.cancelOn(() -> System.currentTimeMillis() >= endTime,
>                     () -> cancelFlag.set(true))
>           .filter(...)
>           .forEach(...)
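
Assuming the sketched wrapper above, the same pattern can be made concrete
against the final public API (StreamSupport.stream is the final JDK 8 entry
point; the infinite counter source below just stands in for event data):

    import java.util.Spliterator;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    public class CancelDemo {
        public static void main(String[] args) {
            long endTime = System.currentTimeMillis() + 5 * 60 * 1000;
            AtomicBoolean cancelFlag = new AtomicBoolean();

            // an infinite source standing in for a stream of event data
            Spliterator<Long> source = Stream.iterate(0L, n -> n + 1).spliterator();

            StreamSupport.stream(
                    new CancelingSpliterator<>(source,
                            () -> System.currentTimeMillis() >= endTime,  // "should we cancel now?"
                            () -> cancelFlag.set(true)),
                    true)                                                 // parallel
                .filter(n -> n % 1_000_000 == 0)
                .forEach(System.out::println);

            if (cancelFlag.get()) {
                System.out.println("pipeline stopped by cancelation, not exhaustion");
            }
        }
    }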
>
> The implementation is simple:
>
>     Stream<T> cancelOn(...) {
>         return Streams.stream(cancelingSpliterator(spliterator()),
>                               getStreamFlags());
>     }
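
In terms of the final public API (the prototype's Streams.stream and
getStreamFlags never became public, while StreamSupport.stream and
Stream.isParallel did), the same shape might be expressed as a static helper,
reusing the sketched wrapper above; the helper is hypothetical:

    import java.util.function.BooleanSupplier;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    // Hypothetical helper with the same shape as the proposed op:
    // wrap the stream's spliterator and rebuild a stream with the
    // same orientation (sequential or parallel).
    static <T> Stream<T> cancelOn(Stream<T> stream,
                                  BooleanSupplier shouldCancel,
                                  Runnable onCancel) {
        return StreamSupport.stream(
                new CancelingSpliterator<>(stream.spliterator(),
                                           shouldCancel, onCancel),
                stream.isParallel());
    }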
>
> The cancelation model is not "stop abruptly when the cancelation
> signal comes", but a more cooperative "use the cancelation signal to
> indicate that we should not start any more work." So if you're
> looking to stop after finding 10 candidate matches, it might actually
> find 11 or 12 before it stops -- but that's something the client code
> can deal with.
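
For instance, with the helper just sketched and a shared counter (candidates
here is an assumed List<String>, and the counting is itself a side effect in
the predicate):

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.atomic.AtomicInteger;

    AtomicInteger seen = new AtomicInteger();
    List<String> found = new CopyOnWriteArrayList<>();

    cancelOn(candidates.parallelStream(),   // candidates: some large List<String>
             () -> seen.get() >= 10,        // cancel once 10 matches have been seen
             () -> {})                      // nothing extra to do on cancel here
        .filter(s -> s.startsWith("x") && seen.incrementAndGet() > 0)
        .forEach(found::add);

    // found.size() is usually 10, but can be 11 or 12: chunks that were
    // already running when the signal arrived finish their current elements.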
>
>
> For sequential streams, the semantics and implementation of the
> canceling spliterator are simple -- once the cancel signal comes, no
> more elements are dispensed from the iterator. For parallel streams
> WITHOUT a defined encounter order, it is similarly simple -- once the
> signal comes, no more elements are dispensed to any subtask, and no
> more splits are produced. For parallel streams WITH a defined
> encounter order, some more work is needed to define the semantics. A
> reasonable semantics would be: identify the latest chunk of input in
> the encounter order that has started processing, let it and any
> earlier chunks complete normally, and don't start any later chunks.
> (For example, if the input has split into chunks A, B, and C in
> encounter order and the signal arrives while B is running, A and B
> complete normally and C is never started.)
>
>
> This seems simple to spec and implement, unintrusive, and reasonably
> intuitive.
>
The main issue is that your example uses a side effect,
  () -> cancelFlag.set(true)
which goes against the model.
> Here, you might want to process until some threshold has been reached
> (find the first N results), until some external event has occurred
> (process for five minutes and take the best result; process until
> asked to shut down).
find the first N results => use limit()
some external event => don't use a stream, use fork/join directly.
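
For the first case, limit() is already a short-circuiting op, even in
parallel; candidates and isMatch below are stand-ins for whatever source and
predicate you need:

    import java.util.List;
    import java.util.stream.Collectors;

    List<String> first10 = candidates.stream()
                                     .filter(c -> isMatch(c))
                                     .limit(10)   // short-circuits after 10 elements
                                     .collect(Collectors.toList());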
Fitting the whole world and the kitchen sink into the Stream API is not
a goal.
regards,
Rémi