Internal and External truncation conditions

Remi Forax forax at univ-mlv.fr
Sat Feb 9 15:24:39 PST 2013


If forEachUntil takes a function that returns a boolean, it's easy:

try (BufferedReader r = Files.newBufferedReader(path, Charset.defaultCharset())) {
   r.lines().parallel().forEachWhile(line -> {
      if (regex.matcher(line).matches()) {
        return false;   // stop: the regex matched
      }
      // ...process the line
      return true;      // keep going
   });
}
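
(For comparison, roughly the same short-circuiting behavior is possible
with the forEachUntil under discussion, at the cost of an out-of-band
flag -- a sketch, assuming the draft forEachUntil(Block, BooleanSupplier)
signature from this thread:)

try (BufferedReader r = Files.newBufferedReader(path, Charset.defaultCharset())) {
   AtomicBoolean stop = new AtomicBoolean();
   r.lines().parallel().forEachUntil(line -> {
      if (regex.matcher(line).matches()) {
        stop.set(true);          // request shutdown; splits check the flag
      } else if (!stop.get()) {
        // ...process the line
      }
   }, stop::get);
}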

cheers,
Rémi

On 02/09/2013 08:55 PM, Sam Pullara wrote:
> Let's say you only want to process lines until it matches a regex.
> Here is one way you could try to implement it:
>
> AtomicBoolean done = new AtomicBoolean();
> Pattern regex = ...;
> try (BufferedReader r = Files.newBufferedReader(path,
> Charset.defaultCharset())) {
>    r.lines().forEachUntil( (line) -> {
>      if (!done.get()) {
>        if (regex.matcher(line).matches()) {
>          done.set(true);
>        } else {
>          ...process the line...
>        }
>      }
>    }, done::get);
> }
>
> In the parallel case this completely breaks down since the lines can
> be processed out of order. Gate would have to ensure that didn't
> happen.
>
> Pattern regex = ...;
> try (BufferedReader r = Files.newBufferedReader(path,
> Charset.defaultCharset())) {
>    r.lines().gate(line -> !regex.matcher(line).matches())
>             .forEach(line -> /* ...process line... */ );
> }
>
> If we wanted to cancel the operation asynchronously we would just call
> Stream.close() and that should work with any stream. In the current
> case we can't stop a stream from continuing to execute without
> explicitly adding a forEachUntil() at the end of it with a condition
> variable that we then change out of band. Also, since there may be
> reduction operations in the middle, that may not even stop all of
> those operations from completing. This can be especially bad for
> things that should time out.
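>
> For concreteness, a timeout today ends up looking something like this
> (a sketch, assuming the draft forEachUntil and java.util.Timer flipping
> an out-of-band flag):
>
> AtomicBoolean cancelled = new AtomicBoolean();
> new Timer().schedule(new TimerTask() {
>   public void run() { cancelled.set(true); }
> }, 5000);
> stream.forEachUntil(e -> { /* ...process e... */ }, cancelled::get);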
>
> Sam
>
> On Sat, Feb 9, 2013 at 11:36 AM, Joe Bowbeer <joe.bowbeer at gmail.com> wrote:
>> I haven't used either of these.  If I wanted to create an example I'd
>> probably start with a stream of lines() from a BufferedReader, and then tack
>> on a use case.
>>
>> try (BufferedReader r = Files.newBufferedReader(path,
>> Charset.defaultCharset())) {
>>    r.lines().forEachUntil(...);
>> }
>>
>>
>> Do you have something specific in mind?
>>
>>
>> On Sat, Feb 9, 2013 at 11:26 AM, Sam Pullara <sam at sampullara.com> wrote:
>>> Now that we are further along, I wanted to bring this up again. I
>>> don't think that forEachUntil is sufficient for handling internal and
>>> external conditions that should truncate stream processing. I've also
>>> looked at CloseableStream and that doesn't appear to help since it
>>> isn't possible to wrap a Stream (say an infinite stream) with a
>>> CloseableStream and get the necessary semantics of cancellation. Also,
>>> other APIs that don't consider that you might give them a
>>> CloseableStream will likely still give you back a Stream thus losing
>>> the semantics.
>>>
>>> Is everyone else happy with forEachUntil and CloseableStream?
>>>
>>> Sam
>>>
>>> ---------- Forwarded message ----------
>>> From: Sam Pullara <sam at sampullara.com>
>>> Date: Mon, Dec 31, 2012 at 8:34 AM
>>> Subject: Re: Cancelation -- use cases
>>> To: Brian Goetz <brian.goetz at oracle.com>
>>> Cc: "lambda-libs-spec-experts at openjdk.java.net"
>>> <lambda-libs-spec-experts at openjdk.java.net>
>>>
>>> I think we are conflating two things with this solution, and in my
>>> mind it doesn't work for either. Here is what I would like the
>>> solution to cover:
>>>
>>> - External conditions (cancellation, cleanup)
>>> - Internal conditions (gating based on count, elements and results)
>>>
>>> The first one may be the only one that works in the parallel case. It
>>> should likely be implemented with .close() on the stream, which would
>>> stop the stream as soon as possible. This would be useful for things
>>> like timeouts, much like calling close on an InputStream in the middle
>>> of reading it. The other one is, I think, necessary but hard to
>>> implement correctly in the parallel case. For instance, I would like
>>> to say:
>>>
>>> stream.gate(e -> e < 10).forEach(e -> …)
>>>
>>> OR
>>>
>>> stream.gate( (e, i) -> e < 10 || i > 10).forEach(e -> …) // i is the
>>> number of the current element
>>>
>>> That should give me every element in the stream until an element isn't
>>> < 10 and then stop processing elements. Further, there should be some
>>> way for the stream source to be notified that we are done consuming it
>>> in case it is of unknown length or consumes resources. That would be
>>> more like (assuming we add a Runnable call to Timer):
>>>
>>> Stream stream = ….
>>> new Timer().schedule(() -> stream.close(), 5000);
>>> stream.forEach(e -> ….);
>>>
>>> OR
>>>
>>> stream.forEach(e -> { try { … } catch (Exception ex) { stream.close(); } });
>>>
>>> Sadly, the first gate() case doesn't work well when parallelized. I'm
>>> willing to just specify what the behavior is for that case to get it
>>> into the API. For example, I would probably say something like "the
>>> gate will need to return false once per split to stop processing". In
>>> either of these cases I think one of the motivations needs to be that
>>> the stream may be using resources and we need to tell the source that
>>> we are done consuming it. For example, if the stream is sourced from a
>>> file, database or even a large amount of memory there should be a
>>> notification mechanism for doneness that will allow those resources to
>>> be returned before the stream is exhausted. To that end I think that
>>> Stream should implement AutoCloseable but overridden with no checked
>>> exception.
>>>
>>> interface Stream<T> extends AutoCloseable {
>>>    /**
>>>     * Closes this stream and releases any system resources associated
>>>     * with it. If the stream is already closed then invoking this
>>>     * method has no effect. Close is automatically called when the
>>>     * stream is exhausted. After this is called, no further elements
>>>     * will be processed by the stream but currently processing elements
>>>     * will complete normally. Calling other methods on a closed stream
>>>     * will throw IllegalStateException.
>>>     */
>>>    void close();
>>>
>>>    /**
>>>     * When the continueProcessing function returns false, no further
>>>     * elements will be processed after the gate. In the parallel stream
>>>     * case no further elements will be processed in the current split.
>>>     */
>>>    Stream<T> gate(Function<T, Boolean> continueProcessing);
>>>
>>>    /**
>>>     * As gate with the addition of the current element number.
>>>     */
>>>    Stream<T> gate(BiFunction<T, Integer, Boolean> continueProcessing);
>>> }
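>>>
>>> With that API, the regex example above would read something like this
>>> (a sketch against the proposed interface; process() is just a stand-in
>>> for the per-line work):
>>>
>>> try (Stream<String> lines = r.lines()) {
>>>    lines.gate(line -> !regex.matcher(line).matches())
>>>         .forEach(line -> process(line));
>>> } // close() runs even on early exit, releasing the source's resources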
>>>
>>> This API avoids a lot of the side effects that forEachUntil would
>>> require to implement these use cases.
>>>
>>> Sam
>>>
>>> On Dec 30, 2012, at 7:53 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>>
>>> Here's a lower-complexity version of cancel, that still satisfies (in
>>> series or in parallel) use cases like the following:
>>>
>>>>    - Find the best possible move after thinking for 5 seconds
>>>>    - Find the first solution that is better than X
>>>>    - Gather solutions until we have 100 of them
>>> without bringing in the complexity or time/space overhead of dealing
>>> with encounter order.
>>>
>>> Since the forEach() operation works exclusively on the basis of
>>> temporal/arrival order rather than spatial/encounter order (elements
>>> are passed to the lambda in whatever order they are available, in
>>> whatever thread they are available), we could make a canceling variant
>>> of forEach:
>>>
>>>   .forEachUntil(Block<T> sink, BooleanSupplier until)
>>>
>>> Here, there is no confusion about what happens in the ordered case, no
>>> need to buffer elements, etc.  Elements flow into the block until the
>>> termination condition transpires, at which point there are no more
>>> splits and existing splits dispense no more elements.
>>>
>>> I implemented this (it was trivial) and wrote a simple test program to
>>> calculate primes sequentially and in parallel, counting how many could
>>> be calculated in a fixed amount of time, starting from an infinite
>>> generator and filtering out composites:
>>>
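>>>             // chm is presumably a shared ConcurrentHashMap<Integer, Boolean>
>>>             // collecting the primes found so far (its declaration isn't shown).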
>>>             Streams.iterate(from, i -> i + 1)  // sequential
>>>                     .filter(i -> isPrime(i))
>>>                     .forEachUntil(i -> {
>>>                         chm.put(i, true);
>>>                     }, () -> System.currentTimeMillis() >= start+num);
>>>
>>> vs
>>>
>>>             Streams.iterate(from, i -> i+1)    // parallel
>>>                     .parallel()
>>>                     .filter(i -> isPrime(i))
>>>                     .forEachUntil(i -> {
>>>                         chm.put(i, true);
>>>                     }, () -> System.currentTimeMillis() >= start+num);
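>>>
>>> (isPrime is whatever primality test you like; a naive version, just to
>>> make the example self-contained:)
>>>
>>>             static boolean isPrime(int n) {
>>>                 if (n < 2) return false;
>>>                 for (int d = 2; d * d <= n; d++)
>>>                     if (n % d == 0) return false;
>>>                 return true;
>>>             }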
>>>
>>> On a 4-core Q6600 system, in a fixed amount of time, the parallel
>>> version gathered ~3x as many primes.
>>>
>>> In terms of being able to perform useful computations on infinite
>>> streams, this seems a pretty attractive price-performer; lower spec
>>> and implementation complexity, and covers many of the use cases which
>>> would otherwise be impractical to attack with the stream approach.
>>>
>>>
>>>
>>> On 12/28/2012 11:20 AM, Brian Goetz wrote:
>>>
>>> I've been working through some alternatives for cancellation support in
>>> infinite streams.  Looking to gather some use case background to help
>>> evaluate the alternatives.
>>>
>>> In the serial case, the "gate" approach works fine -- after some
>>> criteria transpires, stop sending elements downstream.  The pipeline
>>> flushes the elements it has, and completes early.
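>>>
>>> (Concretely, a serial gate can be modeled as a wrapper that stops
>>> dispensing elements once the predicate fails -- a sketch over a plain
>>> java.util.Iterator, not the proposed API itself:)
>>>
>>> static <T> Iterator<T> gate(Iterator<T> source, Predicate<T> keepGoing) {
>>>     return new Iterator<T>() {
>>>         private T next;
>>>         private boolean done, ready;
>>>         public boolean hasNext() {
>>>             if (ready) return true;
>>>             if (done || !source.hasNext()) return false;
>>>             T candidate = source.next();
>>>             if (!keepGoing.test(candidate)) { done = true; return false; }
>>>             next = candidate;
>>>             ready = true;
>>>             return true;
>>>         }
>>>         public T next() {
>>>             if (!hasNext()) throw new NoSuchElementException();
>>>             ready = false;
>>>             return next;
>>>         }
>>>     };
>>> }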
>>>
>>> In the parallel unordered case, the gate approach similarly works fine
>>> -- after the cancelation criteria occurs, no new splits are created, and
>>> existing splits dispense no more elements.  The computation similarly
>>> quiesces after elements currently being processed are completed,
>>> possibly along with any up-tree merging to combine results.
>>>
>>> It is the parallel ordered case that is tricky.  Supposing we partition
>>> a stream into
>>>    (a1,a2,a3), (a4,a5,a6)
>>>
>>> And suppose further we happen to be processing a5 when the bell goes
>>> off.  Do we want to wait for all a_i, i<5, to finish before letting the
>>> computation quiesce?
>>>
>>> My gut says: for the things we intend to cancel, most of them will be
>>> order-insensitive anyway.  Things like:
>>>
>>>   - Find the best possible move after thinking for 5 seconds
>>>   - Find the first solution that is better than X
>>>   - Gather solutions until we have 100 of them
>>>
>>> I believe the key use case for cancelation here will be when we are
>>> chewing on potentially infinite streams of events (probably backed by
>>> IO) where we want to chew until we're asked to shut down, and want to
>>> get as much parallelism as we can cheaply.  Which suggests to me the
>>> intersection between order-sensitive stream pipelines and cancelable
>>> stream pipelines is going to be pretty small indeed.
>>>
>>> Anyone want to add to this model of use cases for cancelation?
>>


