Cancelable streams
Sam Pullara
sam at sampullara.com
Fri Nov 2 13:30:38 PDT 2012
Won't you have to invert this and name it 'until', since 'while' is a
keyword and isn't available as a method name?
Generally I like the idea of being able to short-circuit things. Perhaps
overload the limit() operation so that it can take either a long or a
predicate?
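
For example (hypothetical signatures, just to sketch the shape):

    Stream<T> limit(long maxSize);                // existing form: at most n elements
    Stream<T> limit(Predicate<? super T> until);  // sketch: cut the stream off once the predicate matches
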
You could also make this a fluent API where you can add a callback when
cancelled:
stream.filter(...).map(...).limit(i -> i > 10)
      .forEach(...).onCancel(i -> ...i items processed...);
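
For that to chain off forEach, the terminal op would have to return some
kind of handle -- purely hypothetical, names made up, just to show the shape:

    interface CancelableResult {
        // handler receives the count of elements processed before cancelation
        void onCancel(LongConsumer handler);
    }
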
Sam
On Fri, Nov 2, 2012 at 1:01 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
> We've got a reasonable start at support in the library for infinite
> streams. Of course, there is the risk that you'll do something that never
> terminates, like:
>
> infiniteStream.forEach(...)
>
> If you have a short-circuit operation (either an intermediate operation
> like limit(n), or a terminal operation like findFirst), everything works
> fine -- and should work fine even in parallel (once we have parallel
> implementations for limit/findFirst and do a little more implementation
> work on building Spliterators on infinite iterators).
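>
> For instance (a sketch -- using something like Stream.iterate as a
> stand-in for an arbitrary infinite generator):
>
>     Optional<Integer> first =
>         Stream.iterate(1, i -> i + 1)     // infinite: 1, 2, 3, ...
>               .filter(i -> i % 7 == 0)
>               .findFirst();               // short-circuit terminal op -- terminates
>
>     Stream.iterate(1, i -> i + 1)
>           .limit(10)                      // short-circuit intermediate op -- terminates
>           .forEach(System.out::println);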
>
> The other use case that we want to support is where you have an infinite
> data feed and you want to do parallel operations on it "until you're
> tired." This could mean "chew on it for five minutes and give me what
> you've got", or "chew until a shutdown is requested", etc. This is
> something that has been explicitly requested by several folks (Sam?) to do
> parallel processing on infinite event streams.
>
> So, something like:
>
> infiniteIOStream.while(e -> !cancelationRequested)
>                 .filter(...)
>                 .map(...)
>                 .forEach(...)
>
>
> The obvious idea is to have a "while" op. And this works great for the
> serial case, but I'm not sure it works all that well in the parallel case.
> The other alternative is to have an abstraction for "cancelable stream":
>
> CancelableStream<T> stream
>     = Streams.cancelable(generator, () -> !cancelationRequested);
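>
> For the "five minutes" case above, the second argument could just be a
> deadline check (sketch -- keeping the same "continue while true" shape;
> eventGenerator and Event are made up):
>
>     long deadline = System.nanoTime() + TimeUnit.MINUTES.toNanos(5);
>     CancelableStream<Event> stream
>         = Streams.cancelable(eventGenerator, () -> System.nanoTime() < deadline);
>     stream.filter(...).map(...).forEach(...);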
>
> A few questions:
> - How important is it to be able to find out why the stream processing
> terminated -- because it ran out of data, or because it was canceled?
>
> Then we could do:
>
> CancelableStream<T> stream
>     = Streams.cancelable(ioStream, e -> !cancelationRequested);
> stream.filter(...).map(...).forEach(...)
> if (stream.wasCanceled()) { .... }
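>
> Which implies the abstraction itself carries that bit -- roughly, and
> purely as a sketch of the shape:
>
>     interface CancelableStream<T> extends Stream<T> {
>         // true if the cancelation criterion cut the stream off,
>         // false if the source simply ran out of data
>         boolean wasCanceled();
>     }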
>
> Seems to me that this is something that is more a property of the source
> than of the pipeline. Otherwise, for operations that consume all input
> (like sorted), there is a real possibility for misuse:
>
> is.filter(...).sorted(..).while(...) // definitely bad
> is.filter(...).while(...).sorted(..) // possibly ok
> is.while(...).filter(...).sorted(..) // definitely ok
>
> - Do we want to support cancelability by content (i.e., stop when you see
> an element that matches this predicate), or is that excessively conflating
> one functionality with another? (My thought: these are two different
> functionalities; keep them separate.)
>
> - How does cancelation interact with encounter ordering?
>
> This is largely about whether it's OK to process elements that follow the
> one that triggered the cancelation, or whether we have to stop immediately
> once we find that the cancelation criterion holds true.
>
>
>