"Cancelable" streams
Brian Goetz
brian.goetz at oracle.com
Sat Dec 8 11:27:33 PST 2012
And another subject that we need to close on -- "cancelable" streams.
The primary use case for this is parallel processing on infinite
streams, such as streams of event data. Here, you might want to process
until some threshold has been reached (find the first N results), until
some external event has occurred (process for five minutes and take the
best result), or until asked to shut down.
As with .parallel(), the recent stabilization of the
Stream.spliterator() escape hatch gives us a low-overhead way to
support this without introducing new abstractions like CancelableStream
or StreamFuture. Not surprisingly, the answer is a stream op:
    stream.cancelOn(BooleanSupplier shouldCancel,
                    Runnable onCancel)
          .filter(...)
          .forEach(...)
The way this works is that it polls the supplied BooleanSupplier to ask
"should we cancel now?" Once canceled, it acts as a gate shutting: no
more elements are sent downstream, so downstream processing completes as
if the stream were truncated. When cancelation occurs, it invokes the
onCancel Runnable so that the client has a way to know that the
pipeline completed due to cancelation rather than normal completion.
A typical use might be:
    stream.cancelOn(() -> (System.currentTimeMillis() > endTime),
                    () -> cancelFlag.set(true))
          .filter(...)
          .forEach(...)
The implementation is simple:
    Stream<T> cancelOn(...) {
        return Streams.stream(cancelingSpliterator(spliterator()),
                              getStreamFlags());
    }
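The cancelingSpliterator itself isn't shown above. A minimal sketch of
the sequential case -- hypothetical names, using only the public
Spliterator and StreamSupport APIs rather than any internal classes --
might look like:

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical sequential canceling spliterator: wraps the source and
// stops dispensing elements once shouldCancel reports true.
class CancelingSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> source;
    private final BooleanSupplier shouldCancel;
    private final Runnable onCancel;
    private boolean canceled = false;

    CancelingSpliterator(Spliterator<T> source,
                         BooleanSupplier shouldCancel,
                         Runnable onCancel) {
        this.source = source;
        this.shouldCancel = shouldCancel;
        this.onCancel = onCancel;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (canceled)
            return false;
        if (shouldCancel.getAsBoolean()) {
            canceled = true;
            onCancel.run();   // let the client observe early completion
            return false;     // gate shut: no more elements downstream
        }
        return source.tryAdvance(action);
    }

    @Override
    public Spliterator<T> trySplit() { return null; }  // sequential-only sketch

    @Override
    public long estimateSize() { return source.estimateSize(); }

    @Override
    public int characteristics() {
        // SIZED/SUBSIZED can no longer be promised once cancelation
        // may truncate the stream
        return source.characteristics() & ~(SIZED | SUBSIZED);
    }
}

public class CancelDemo {
    public static void main(String[] args) {
        AtomicBoolean canceledFlag = new AtomicBoolean(false);
        int[] seen = {0};
        // Cancel once five elements have been dispensed from an
        // infinite stream.
        Spliterator<Integer> s = new CancelingSpliterator<>(
                Stream.iterate(0, i -> i + 1).spliterator(),
                () -> seen[0] >= 5,
                () -> canceledFlag.set(true));
        StreamSupport.stream(s, false).forEach(i -> seen[0]++);
        System.out.println(seen[0] + " " + canceledFlag.get());
    }
}
```

Running this prints "5 true": five elements pass the gate, then the
supplier fires, onCancel runs, and the pipeline completes as if the
stream had been truncated.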
The cancelation model is not "stop abruptly when the cancelation signal
comes", but a more cooperative "use the cancelation signal to indicate
that we should not start any more work." So if you're looking to stop
after finding 10 candidate matches, it might actually find 11 or 12
before it stops -- but that's something the client code can deal with.
For sequential streams, the semantics and implementation of the
canceling spliterator are simple -- once the cancel signal comes, no
more elements are dispensed from the iterator. For parallel streams
WITHOUT a defined encounter order, it is similarly simple -- once the
signal comes, no more elements are dispensed to any subtask, and no more
splits are produced. For parallel streams WITH a defined encounter
order, some more work is needed to define the semantics. A reasonable
semantics would be: identify the latest chunk of input in the encounter
order that has started processing, let any earlier chunks complete
normally, and don't start any later chunks.
This seems simple to spec and implement, unintrusive, and reasonably
intuitive.
More information about the lambda-libs-spec-observers
mailing list