Cancelable streams

Sam Pullara sam at sampullara.com
Fri Nov 2 13:30:38 PDT 2012


Won't you have to invert this and name it 'until', since 'while' is a
reserved word in Java and isn't available as a method name?

Generally I like the idea of being able to short-circuit things. Perhaps
overload the limit() operation so that it can take either a long or a
predicate?
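A minimal sketch of the overload idea, assuming Java 9 or later. The predicate form is modeled with Stream.takeWhile, which did not exist when this thread was written. The class and method names are hypothetical. The predicate is read as a stop condition ("limit until i > 10"), which is only one possible reading of the proposal.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helpers illustrating an overloaded limit(); not part of java.util.stream.
final class PredicateLimit {
    private PredicateLimit() {}

    // Count-based form: delegates to the real Stream.limit(long).
    static <T> Stream<T> limit(Stream<T> s, long maxSize) {
        return s.limit(maxSize);
    }

    // Predicate-based form (assumed semantics): stop at the first element for
    // which stop returns true; that element is not included. Uses Java 9+ takeWhile.
    static <T> Stream<T> limit(Stream<T> s, Predicate<? super T> stop) {
        return s.takeWhile(e -> !stop.test(e));
    }

    // Both forms applied to the infinite stream 1, 2, 3, ...
    static List<Integer> byCount() {
        return limit(Stream.iterate(1, i -> i + 1), 10L).collect(Collectors.toList());
    }

    static List<Integer> byPredicate() {
        return limit(Stream.iterate(1, i -> i + 1), i -> i > 10).collect(Collectors.toList());
    }
}
```

A lambda can never target a primitive long, so the two overloads do not clash at call sites. Java 9 eventually shipped the predicate form under the separate name takeWhile rather than as a limit overload.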

You could also make this a fluent API where you can add a callback when
cancelled:

  stream.filter(...).map(...).limit(i -> i > 10).forEach(...)
        .onCancel(i -> ...i items processed...);
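Here is a rough serial sketch of the callback idea. Every name in it (Cancellation, forEachUntil, onCancel, processed) is hypothetical. For simplicity, the stop condition is a predicate on elements, and the stream is consumed through its iterator. A parallel version would need considerably more care.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.stream.Stream;

// Hypothetical fluent result object; none of these names exist in java.util.stream.
final class Cancellation {
    private final long processed;
    private final boolean canceled;

    private Cancellation(long processed, boolean canceled) {
        this.processed = processed;
        this.canceled = canceled;
    }

    // Serially applies action to each element until stop matches (the matching
    // element is not processed) or the source runs out.
    static <T> Cancellation forEachUntil(Stream<T> s, Predicate<? super T> stop,
                                         Consumer<? super T> action) {
        Iterator<T> it = s.iterator();
        long count = 0;
        while (it.hasNext()) {
            T e = it.next();
            if (stop.test(e)) {
                return new Cancellation(count, true);
            }
            action.accept(e);
            count++;
        }
        return new Cancellation(count, false);
    }

    // Calls back with the number of processed items, but only if we were canceled.
    Cancellation onCancel(LongConsumer callback) {
        if (canceled) {
            callback.accept(processed);
        }
        return this;
    }

    boolean wasCanceled() {
        return canceled;
    }

    long processed() {
        return processed;
    }
}
```

For example, Cancellation.forEachUntil(Stream.iterate(1, i -> i + 1), i -> i > 10, i -> {}).onCancel(n -> System.out.println(n + " items processed")) reports 10 items.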

Sam

On Fri, Nov 2, 2012 at 1:01 PM, Brian Goetz <brian.goetz at oracle.com> wrote:

> We've got a reasonable start at support in the library for infinite
> streams.  Of course, there is the risk that you'll do something that never
> terminates, like:
>
>   infiniteStream.forEach(...)
>
> If you have a short-circuit operation (either an intermediate operation
> like limit(n), or a terminal operation like findFirst), everything works
> fine -- and should work fine even in parallel (once we have parallel
> implementations for limit/findFirst and do a little more implementation
> work on building Spliterators on infinite iterators.)
>
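For reference, this is what the short-circuiting behavior looks like with the java.util.stream API as it eventually shipped in Java 8. Both pipelines below start from an infinite Stream.iterate source and still terminate, because findFirst and limit stop pulling elements once they have what they need. The class and method names are illustrative only, and only the serial case is shown.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ShortCircuitDemo {
    private ShortCircuitDemo() {}

    // Terminal short-circuit: findFirst stops the infinite source at the first match.
    static Optional<Integer> firstSquareOver(int threshold) {
        return Stream.iterate(1, i -> i + 1)
                .map(i -> i * i)
                .filter(sq -> sq > threshold)
                .findFirst();
    }

    // Intermediate short-circuit: limit(n) turns an infinite stream into a finite one.
    static List<Integer> firstEvens(long n) {
        return Stream.iterate(0, i -> i + 2)
                .limit(n)
                .collect(Collectors.toList());
    }
}
```

By contrast, calling forEach directly on Stream.iterate(1, i -> i + 1) with no short-circuiting stage would never return.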
> The other use case that we want to support is where you have an infinite
> data feed and you want to do parallel operations on it "until you're
> tired."  This could mean "chew on it for five minutes and give me what
> you've got", or "chew until a shutdown is requested", etc.  This is
> something that has been explicitly requested by several folks (Sam?) to do
> parallel processing on infinite event streams.
>
> So, something like:
>
>  infiniteIOStream.while(e -> !cancelationRequested)
>                  .filter(...)
>                  .map(....)
>                  .forEach(...)
>
>
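Because while is a keyword, a runnable approximation of this pipeline has to use another name. The sketch below assumes Java 9 or later and uses Stream.takeWhile on a shared AtomicBoolean flag. The flag is set from inside the pipeline only to keep the example deterministic; it stands in for an external shutdown request. A time-budget variant covers the "chew on it for five minutes" case. All names are hypothetical, and the sketch is serial only.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

// Hypothetical demo of "process until canceled" over an infinite, ordered source (Java 9+).
final class UntilTired {
    private UntilTired() {}

    // Flag-based: stops pulling new elements once cancelRequested is set.
    static long processUntilCanceled(AtomicBoolean cancelRequested, long cancelAfter) {
        AtomicLong processed = new AtomicLong();
        Stream.iterate(1, i -> i + 1)                    // stand-in for an infinite event feed
              .takeWhile(e -> !cancelRequested.get())    // plays the role of while(...)
              .filter(e -> e % 2 == 0)
              .map(e -> e * 10)
              .forEach(e -> {
                  // Simulate an external shutdown request after cancelAfter elements.
                  if (processed.incrementAndGet() == cancelAfter) {
                      cancelRequested.set(true);
                  }
              });
        return processed.get();
    }

    // Time-based: chew on the feed until the budget runs out, then report the count.
    static long processFor(Duration budget) {
        long deadline = System.nanoTime() + budget.toNanos();
        return Stream.iterate(1L, i -> i + 1)
                     .takeWhile(i -> System.nanoTime() - deadline < 0)
                     .count();
    }
}
```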
> The obvious idea is to have a "while" op.  And this works great for the
> serial case, but I'm not sure it works all that well in the parallel case.
> The other alternative is to have an abstraction for a "cancelable stream":
>
>   CancelableStream<T> stream
>      = Streams.cancelable(generator, () -> !cancelationRequested);
>
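One way to read Streams.cancelable is as a source whose spliterator checks the cancelation condition before producing each element. Every downstream stage, including sorted, then simply sees the source run dry. The helper below is a hypothetical sketch of that idea for a generator source; CancelableSource and cancelable are not JDK names. It builds a sequential stream. Passing true to StreamSupport.stream would let AbstractSpliterator split the source by buffering batches of elements, and any already-buffered elements could still be processed after cancelation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical source-level cancelation; not a JDK API.
final class CancelableSource {
    private CancelableSource() {}

    // Produces generator.get() values until keepGoing returns false.
    static <T> Stream<T> cancelable(Supplier<? extends T> generator, BooleanSupplier keepGoing) {
        Spliterator<T> source =
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!keepGoing.getAsBoolean()) {
                    return false;                  // canceled: report end of input
                }
                action.accept(generator.get());
                return true;
            }
        };
        return StreamSupport.stream(source, false); // sequential
    }

    // Usage: collect even numbers until a simulated external request stops the source.
    static List<Integer> firstThreeEvens() {
        AtomicBoolean cancelationRequested = new AtomicBoolean();
        AtomicInteger counter = new AtomicInteger();
        List<Integer> seen = new ArrayList<>();
        CancelableSource.<Integer>cancelable(counter::incrementAndGet,
                                             () -> !cancelationRequested.get())
                .filter(i -> i % 2 == 0)
                .forEach(i -> {
                    seen.add(i);
                    if (seen.size() == 3) {
                        cancelationRequested.set(true);
                    }
                });
        return seen;
    }
}
```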
> A few questions:
>  - How important is it to be able to find out why the stream processing
> terminated -- because it ran out of data, or because it was canceled?
>
> Then we could do:
>
>   CancelableStream<T> stream
>      = Streams.cancelable(ioStream, e -> !cancelationRequested);
>   stream.filter(...).map(...).forEach(...)
>   if (stream.wasCanceled()) { .... }
>
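A hypothetical sketch of wasCanceled() follows. It keeps the bookkeeping in the source: the source records whether it stopped because the underlying iterator ran out or because the cancelation condition fired. CancelableStream here is an illustrative class of our own with a stream() accessor, not the proposed CancelableStream<T> type. It checks for remaining data first, so a cancel request that arrives after the input is exhausted is not reported as a cancelation.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical wrapper that remembers why its source stopped; not a JDK API.
final class CancelableStream<T> {
    private final AtomicBoolean canceled = new AtomicBoolean();
    private final Stream<T> stream;

    CancelableStream(Iterator<? extends T> source, BooleanSupplier keepGoing) {
        Spliterator<T> split =
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!source.hasNext()) {
                    return false;              // ran out of data
                }
                if (!keepGoing.getAsBoolean()) {
                    canceled.set(true);        // data remained, but cancelation was requested
                    return false;
                }
                action.accept(source.next());
                return true;
            }
        };
        this.stream = StreamSupport.stream(split, false);
    }

    Stream<T> stream() {
        return stream;
    }

    boolean wasCanceled() {
        return canceled.get();
    }
}
```

Used the way the message suggests, this reads cs.stream().filter(...).map(...).forEach(...) followed by a check of cs.wasCanceled().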
> Seems to me that this is something that is more a property of the source
> than of the pipeline.  Otherwise, for operations that consume all input
> (like sorted), there is a real possibility for misuse:
>
>   is.filter(...).sorted(..).while(...)  // definitely bad
>   is.filter(...).while(...).sorted(..)  // possibly ok
>   is.while(...).filter(...).sorted(..)  // definitely ok
>
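The placement concern can be checked with the Java 9+ takeWhile standing in for while. In the sketch below, the cut-off comes before sorted, so sorted only ever sees a finite input. The names are hypothetical, and the bound is a content predicate rather than an external cancel flag only to keep the example deterministic. Moving sorted ahead of takeWhile on the same infinite source would never complete normally, because sorted must consume all of its input before emitting anything.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CancelPlacement {
    private CancelPlacement() {}

    // "Definitely ok" shape: bound the infinite source first, then sort.
    static List<Integer> cancelThenSort(int bound) {
        return Stream.iterate(1, i -> i + 1)
                .takeWhile(i -> i <= bound)
                .map(i -> (i * 7) % 10)   // scramble the order so sorting is visible
                .sorted()
                .collect(Collectors.toList());
    }

    // "Definitely bad" shape, shown only as a comment because it never completes:
    // Stream.iterate(1, i -> i + 1).sorted().takeWhile(i -> i <= bound) ...
}
```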
>  - Do we want to support cancelability by content (i.e., stop when you see
> an element that matches this predicate), or is that excessively conflating
> one functionality with another (my thought: these are two different
> functionalities, keep them separate)
>
>  - How does cancelation interact with encounter ordering?
>
> This is largely about whether it's OK to process elements that follow the
> one that triggers the cancelation, or whether we have to stop immediately
> on finding that the cancelation criterion holds.
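For comparison, the takeWhile operation that Java 9 later added takes a position on this for content-based cut-offs. On an ordered stream, takeWhile returns the longest matching prefix even in parallel, so nothing after the triggering element survives. On an unordered stream, the JDK documentation allows any subset of matching elements. The sketch below (hypothetical names, with element 500 playing the trigger) shows both cases. The unordered result is nondeterministic, so only its weaker guarantee is worth checking.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class CancelOrdering {
    private CancelOrdering() {}

    // Ordered source: exactly the prefix before the trigger, even in parallel.
    static List<Integer> orderedPrefix() {
        return IntStream.rangeClosed(1, 1000).boxed()
                .parallel()
                .takeWhile(i -> i != 500)
                .collect(Collectors.toList());
    }

    // Unordered: any subset of the matching elements may be taken.
    static List<Integer> unorderedSubset() {
        return IntStream.rangeClosed(1, 1000).boxed()
                .parallel()
                .unordered()
                .takeWhile(i -> i != 500)
                .collect(Collectors.toList());
    }
}
```

Preserving the exact prefix in parallel requires extra coordination between subtasks. Dropping the ordering constraint frees the implementation from that coordination, at the price of less predictable results.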


More information about the lambda-libs-spec-observers mailing list