Internal and External truncation conditions

Sam Pullara sam at sampullara.com
Sat Feb 9 15:49:12 PST 2013


I think the point of forEachUntil is that Brian doesn't want to do
this as it has issues when parallelized. I think that it is an
important enough use case that we handle it anyway. This is better
than forEachUtil though.

Sam

On Sat, Feb 9, 2013 at 3:24 PM, Remi Forax <forax at univ-mlv.fr> wrote:
> if forEachUntil takes a function that return a boolean, it's easy.
>
>
> try (BufferedReader r = Files.newBufferedReader(path,
> Charset.defaultCharset())) {
>   return r.lines().parallel().forEachWhile(element -> {
>      if (regex.matcher(line).matches()) {
>        return false;
>      }
>      ...process the line
>      return true;
>    }
> }
>
> cheers,
> Rémi
>
>
> On 02/09/2013 08:55 PM, Sam Pullara wrote:
>>
>> Let's say you only want to process lines until it matches a regex.
>> Here is one way you could try to implement it:
>>
>> AtomicBoolean done = new AtomicBoolean();
>> Pattern regex = ...;
>> try (BufferedReader r = Files.newBufferedReader(path,
>> Charset.defaultCharset())) {
>>    r.lines().forEachUntil( (line) -> {
>>      if (!done.get()) {
>>        if (regex.matcher(line).matches()) {
>>          done.set(true);
>>        } else {
>>          ...process the line...
>>        }
>>      }
>>    }, done::get);
>> }
>>
>> In the parallel case this completely breaks down since the lines can
>> be processed out of order. Gate would have to ensure that didn't
>> happen.
>>
>> Pattern regex = ...;
>> try (BufferedReader r = Files.newBufferedReader(path,
>> Charset.defaultCharset())) {
>>    r.lines().gate(() -> !regex.matcher(line).matches()).forEach((line)
>> -> .. process line ..);
>> }
>>
>> If we wanted to cancel the operation asynchronously we would just call
>> Stream.close() and that should work with any stream. In the current
>> case we can't stop a stream from continuing to execute without
>> explicitly adding a forEachUntil() at the end of it with a condition
>> variable that we then change out of band. Also, since there may be
>> reduction operations in the middle, that may not even stop all of
>> those operations from completing. This can be especially bad for
>> things that should timeout.
>>
>> Sam
>>
>> On Sat, Feb 9, 2013 at 11:36 AM, Joe Bowbeer <joe.bowbeer at gmail.com>
>> wrote:
>>>
>>> I haven't used either of these.  If I wanted to create an example I'd
>>> probably start with a stream of lines() from a BufferedReader, and then
>>> tack
>>> on a use case.
>>>
>>> try (BufferedReader r = Files.newBufferedReader(path,
>>> Charset.defaultCharset())) {
>>>    r.lines().forEachUntil(...);
>>> }
>>>
>>>
>>> Do you have something specific in mind?
>>>
>>>
>>> On Sat, Feb 9, 2013 at 11:26 AM, Sam Pullara <sam at sampullara.com> wrote:
>>>>
>>>> Now that we are further along, I wanted to bring this up again. I
>>>> don't think that forEachUntil is sufficient for handling internal and
>>>> external conditions that should truncate stream processing. I've also
>>>> looked at CloseableStream and that doesn't appear to help since it
>>>> isn't possible to wrap a Stream (say an infinite stream) with a
>>>> CloseableStream and get the necessary semantics of cancellation. Also,
>>>> other APIs that don't consider that you might give them a
>>>> CloseableStream will likely still give you back a Stream thus losing
>>>> the semantics.
>>>>
>>>> Is everyone else happy with forEachUntil and CloseableStream?
>>>>
>>>> Sam
>>>>
>>>> ---------- Forwarded message ----------
>>>> From: Sam Pullara <sam at sampullara.com>
>>>> Date: Mon, Dec 31, 2012 at 8:34 AM
>>>> Subject: Re: Cancelation -- use cases
>>>> To: Brian Goetz <brian.goetz at oracle.com>
>>>> Cc: "lambda-libs-spec-experts at openjdk.java.net"
>>>> <lambda-libs-spec-experts at openjdk.java.net>
>>>>
>>>> I think we are conflating two things with this solution and it doesn't
>>>> work for them in my mind. Here is what I would like the solution to
>>>> cover:
>>>>
>>>> - External conditions (cancellation, cleanup)
>>>> - Internal conditions (gating based on count, elements and results)
>>>>
>>>> The first one may be the only one that works in the parallel case. It
>>>> should likely be implemented with .close() on stream that would stop
>>>> the stream as soon as possible. This would be useful for things like
>>>> timeouts. Kind of like calling close on an inputstream in the middle
>>>> of reading it. The other one I think is necessary and hard to
>>>> implement correctly with the parallel case. For instance I would like
>>>> to say:
>>>>
>>>> stream.gate(e -> e < 10).forEach(e -> …)
>>>>
>>>> OR
>>>>
>>>> stream.gate( (e, i) -> e < 10 || i > 10).forEach(e -> …) // i is the
>>>> number of the current element
>>>>
>>>> That should give me every element in the stream until an element isn't
>>>> < 10 and then stop processing elements. Further, there should be some
>>>> way for the stream source to be notified that we are done consuming it
>>>> in case it is of unknown length or consumes resources. That would be
>>>> more like (assuming we add a Runnable call to Timer):
>>>>
>>>> Stream stream = ….
>>>> new Timer().schedule(() -> stream.close(), 5000);
>>>> stream.forEach(e -> ….);
>>>>
>>>> OR
>>>>
>>>> stream.forEach(e -> try { … } catch() { stream.close() } );
>>>>
>>>> Sadly, the first gate() case doesn't work well when parallelized. I'm
>>>> willing to just specify what the behavior is for that case to get it
>>>> into the API. For example, I would probably say something like "the
>>>> gate will need to return false once per split to stop processing". In
>>>> either of these cases I think one of the motivations needs to be that
>>>> the stream may be using resources and we need to tell the source that
>>>> we are done consuming it. For example, if the stream is sourced from a
>>>> file, database or even a large amount of memory there should be a
>>>> notification mechanism for doneness that will allow those resources to
>>>> be returned before the stream is exhausted. To that end I think that
>>>> Stream should implement AutoCloseable but overridden with no checked
>>>> exception.
>>>>
>>>> interface Stream<T> implements AutoCloseable {
>>>>    /**
>>>>     * Closes this stream and releases any system resources associated
>>>>     * with it. If the stream is already closed then invoking this
>>>>     * method has no effect. Close is automatically called when the
>>>>     * stream is exhausted. After this is called, no further elements
>>>>     * will be processed by the stream but currently processing elements
>>>>     * will complete normally. Calling other methods on a closed stream
>>>> will
>>>>     * produce IllegalStateExceptions.
>>>>     */
>>>>    void close();
>>>>
>>>>    /**
>>>>     * When the continueProcessing function returns false, no further
>>>>     * elements will be processed after the gate. In the parallel stream
>>>>     * case no further elements will be processed in the current split.
>>>>     */
>>>>    Stream<T> gate(Function<T, Boolean> until);
>>>>
>>>>    /**
>>>>     * As gate with the addition of the current element number.
>>>>     */
>>>>    Stream<T> gate(BiFunction<T, Integer, Boolean> until);
>>>> }
>>>>
>>>> This API avoids a lot of side effects that forEachUntil would require
>>>> implement these use cases.
>>>>
>>>> Sam
>>>>
>>>> On Dec 30, 2012, at 7:53 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>>>
>>>> Here's a lower-complexity version of cancel, that still satisfies (in
>>>> series or in parallel) use cases like the following:
>>>>
>>>>>    - Find the best possible move after thinking for 5 seconds
>>>>>    - Find the first solution that is better than X
>>>>>    - Gather solutions until we have 100 of them
>>>>
>>>> without bringing in the complexity or time/space overhead of dealing
>>>> with encounter order.
>>>>
>>>> Since the forEach() operation works exclusively on the basis of
>>>> temporal/arrival order rather than spatial/encounter order (elements
>>>> are passed to the lambda in whatever order they are available, in
>>>> whatever thread they are available), we could make a canceling variant
>>>> of forEach:
>>>>
>>>>   .forEachUntil(Block<T> sink, BooleanSupplier until)
>>>>
>>>> Here, there is no confusion about what happens in the ordered case, no
>>>> need to buffer elements, etc.  Elements flow into the block until the
>>>> termination condition transpires, at which point there are no more
>>>> splits and existing splits dispense no more elements.
>>>>
>>>> I implemented this (it was trivial) and wrote a simple test program to
>>>> calculate primes sequentially and in parallel, counting how many could
>>>> be calculated in a fixed amount of time, starting from an infinite
>>>> generator and filtering out composites:
>>>>
>>>>             Streams.iterate(from, i -> i + 1)  // sequential
>>>>                     .filter(i -> isPrime(i))
>>>>                     .forEachUntil(i -> {
>>>>                         chm.put(i, true);
>>>>                     }, () -> System.currentTimeMillis() >= start+num);
>>>>
>>>> vs
>>>>
>>>>             Streams.iterate(from, i -> i+1)    // parallel
>>>>                     .parallel()
>>>>                     .filter(i -> isPrime(i))
>>>>                     .forEachUntil(i -> {
>>>>                         chm.put(i, true);
>>>>                     }, () -> System.currentTimeMillis() >= start+num);
>>>>
>>>> On a 4-core Q6600 system, in a fixed amount of time, the parallel
>>>> version gathered ~3x as many primes.
>>>>
>>>> In terms of being able to perform useful computations on infinite
>>>> streams, this seems a pretty attractive price-performer; lower spec
>>>> and implementation complexity, and covers many of the use cases which
>>>> would otherwise be impractical to attack with the stream approach.
>>>>
>>>>
>>>>
>>>> On 12/28/2012 11:20 AM, Brian Goetz wrote:
>>>>
>>>> I've been working through some alternatives for cancellation support in
>>>> infinite streams.  Looking to gather some use case background to help
>>>> evaluate the alternatives.
>>>>
>>>> In the serial case, the "gate" approach works fine -- after some
>>>> criteria transpires, stop sending elements downstream.  The pipeline
>>>> flushes the elements it has, and completes early.
>>>>
>>>> In the parallel unordered case, the gate approach similarly works fine
>>>> -- after the cancelation criteria occurs, no new splits are created, and
>>>> existing splits dispense no more elements.  The computation similarly
>>>> quiesces after elements currently being processed are completed,
>>>> possibly along with any up-tree merging to combine results.
>>>>
>>>> It is the parallel ordered case that is tricky.  Supposing we partition
>>>> a stream into
>>>>    (a1,a2,a3), (a4,a5,a6)
>>>>
>>>> And suppose further we happen to be processing a5 when the bell goes
>>>> off.  Do we want to wait for all a_i, i<5, to finish before letting the
>>>> computation quiesce?
>>>>
>>>> My gut says: for the things we intend to cancel, most of them will be
>>>> order-insensitive anyway.  Things like:
>>>>
>>>>   - Find the best possible move after thinking for 5 seconds
>>>>   - Find the first solution that is better than X
>>>>   - Gather solutions until we have 100 of them
>>>>
>>>> I believe the key use case for cancelation here will be when we are
>>>> chewing on potentially infinite streams of events (probably backed by
>>>> IO) where we want to chew until we're asked to shut down, and want to
>>>> get as much parallelism as we can cheaply.  Which suggests to me the
>>>> intersection between order-sensitive stream pipelines and cancelable
>>>> stream pipelines is going to be pretty small indeed.
>>>>
>>>> Anyone want to add to this model of use cases for cancelation?
>>>
>>>
>


More information about the lambda-libs-spec-experts mailing list