Internal and External truncation conditions

Sam Pullara sam at sampullara.com
Sat Feb 9 11:55:04 PST 2013


Let's say you only want to process lines until one of them matches a regex.
Here is one way you could try to implement it:

AtomicBoolean done = new AtomicBoolean();
Pattern regex = ...;
try (BufferedReader r = Files.newBufferedReader(path,
    Charset.defaultCharset())) {
  r.lines().forEachUntil( (line) -> {
    if (!done.get()) {
      if (regex.matcher(line).matches()) {
        done.set(true);
      } else {
        ...process the line...
      }
    }
  }, done::get);
}

In the parallel case this breaks down completely, since the lines can
be processed out of order. A gate() operation would have to ensure
that doesn't happen:

Pattern regex = ...;
try (BufferedReader r = Files.newBufferedReader(path,
    Charset.defaultCharset())) {
  r.lines().gate(line -> !regex.matcher(line).matches())
      .forEach(line -> .. process line ..);
}
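
A minimal sketch of the sequential gate semantics, assuming only the
java.util.stream API as shipped in Java 8. GateSketch and its gate()
helper are made-up names, not part of any JDK. The helper wraps the
source iterator and stops dispensing elements the first time the
predicate returns false; it makes no attempt to handle the parallel case.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class GateSketch {
    // Hypothetical helper: passes elements through while keepGoing is true,
    // then ends the stream. Sequential only.
    static <T> Stream<T> gate(Stream<T> source, Predicate<? super T> keepGoing) {
        Iterator<T> it = source.iterator();
        Iterator<T> gated = new Iterator<T>() {
            private T next;
            private boolean ready;   // next holds an element that passed the gate
            private boolean closed;  // gate returned false or source is exhausted

            @Override public boolean hasNext() {
                if (ready) return true;
                if (closed) return false;
                if (!it.hasNext()) { closed = true; return false; }
                T candidate = it.next();
                if (keepGoing.test(candidate)) {
                    next = candidate;
                    ready = true;
                    return true;
                }
                closed = true;       // the failing element is consumed and dropped
                return false;
            }

            @Override public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                ready = false;
                T result = next;
                next = null;
                return result;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(gated, Spliterator.ORDERED), false)
            .onClose(source::close);
    }

    public static void main(String[] args) throws IOException {
        Pattern regex = Pattern.compile("END");
        try (BufferedReader r = new BufferedReader(
                new StringReader("alpha\nbeta\nEND\ngamma\n"))) {
            gate(r.lines(), line -> !regex.matcher(line).matches())
                .forEach(System.out::println); // prints alpha and beta, stops at END
        }
    }
}
```

Note that the element that fails the gate has already been pulled from
the source, so a source that must not be over-read would need something
more careful than this.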

If we wanted to cancel the operation asynchronously we would just call
Stream.close(), and that should work with any stream. With the current
API we can't stop a stream from continuing to execute without
explicitly adding a forEachUntil() at the end of it with a condition
variable that we then change out of band. Also, since there may be
reduction operations in the middle, that may not even stop all of
those operations from completing. This can be especially bad for
things that should time out.
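
One way to approximate the external case, as a sketch only and assuming
the java.util.stream API as shipped in Java 8: guard the source with a
flag and let close() trip it through onClose(). CancellableSketch and
cancel() are made-up names. This only stops the source from dispensing
further elements; it is sequential and does nothing about reductions
that are already in flight.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class CancellableSketch<T> {
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private final Stream<T> stream;

    CancellableSketch(Stream<T> source) {
        Iterator<T> it = source.iterator();
        Iterator<T> guarded = new Iterator<T>() {
            // Once cancelled, report exhaustion so the pipeline winds down.
            @Override public boolean hasNext() {
                return !cancelled.get() && it.hasNext();
            }
            // No flag check here: an element promised by hasNext() is still
            // delivered even if cancel() lands in between.
            @Override public T next() {
                return it.next();
            }
        };
        this.stream = StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(guarded, Spliterator.ORDERED), false)
            .onClose(() -> {          // closing the stream also cancels it
                cancelled.set(true);
                source.close();
            });
    }

    Stream<T> stream() { return stream; }

    // May be called from any thread, e.g. a timer.
    void cancel() { cancelled.set(true); }

    public static void main(String[] args) {
        CancellableSketch<Integer> numbers =
            new CancellableSketch<>(Stream.iterate(0, i -> i + 1)); // infinite source
        Timer timer = new Timer(true); // daemon thread, so the JVM can exit
        timer.schedule(new TimerTask() {
            @Override public void run() { numbers.cancel(); }
        }, 100);
        long[] count = {0};
        numbers.stream().forEach(e -> count[0]++);
        System.out.println("processed " + count[0] + " elements before cancellation");
    }
}
```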

Sam

On Sat, Feb 9, 2013 at 11:36 AM, Joe Bowbeer <joe.bowbeer at gmail.com> wrote:
> I haven't used either of these.  If I wanted to create an example I'd
> probably start with a stream of lines() from a BufferedReader, and then tack
> on a use case.
>
> try (BufferedReader r = Files.newBufferedReader(path,
> Charset.defaultCharset())) {
>   r.lines().forEachUntil(...);
> }
>
>
> Do you have something specific in mind?
>
>
> On Sat, Feb 9, 2013 at 11:26 AM, Sam Pullara <sam at sampullara.com> wrote:
>>
>> Now that we are further along, I wanted to bring this up again. I
>> don't think that forEachUntil is sufficient for handling internal and
>> external conditions that should truncate stream processing. I've also
>> looked at CloseableStream and that doesn't appear to help since it
>> isn't possible to wrap a Stream (say an infinite stream) with a
>> CloseableStream and get the necessary semantics of cancellation. Also,
>> other APIs that don't consider that you might give them a
>> CloseableStream will likely still give you back a Stream thus losing
>> the semantics.
>>
>> Is everyone else happy with forEachUntil and CloseableStream?
>>
>> Sam
>>
>> ---------- Forwarded message ----------
>> From: Sam Pullara <sam at sampullara.com>
>> Date: Mon, Dec 31, 2012 at 8:34 AM
>> Subject: Re: Cancelation -- use cases
>> To: Brian Goetz <brian.goetz at oracle.com>
>> Cc: "lambda-libs-spec-experts at openjdk.java.net"
>> <lambda-libs-spec-experts at openjdk.java.net>
>>
>> I think we are conflating two things with this solution and it doesn't
>> work for them in my mind. Here is what I would like the solution to
>> cover:
>>
>> - External conditions (cancellation, cleanup)
>> - Internal conditions (gating based on count, elements and results)
>>
>> The first one may be the only one that works in the parallel case. It
>> should likely be implemented with .close() on stream that would stop
>> the stream as soon as possible. This would be useful for things like
>> timeouts. Kind of like calling close on an InputStream in the middle
>> of reading it. The other one I think is necessary and hard to
>> implement correctly with the parallel case. For instance I would like
>> to say:
>>
>> stream.gate(e -> e < 10).forEach(e -> …)
>>
>> OR
>>
>> stream.gate( (e, i) -> e < 10 || i > 10).forEach(e -> …) // i is the
>> number of the current element
>>
>> That should give me every element in the stream until an element isn't
>> < 10 and then stop processing elements. Further, there should be some
>> way for the stream source to be notified that we are done consuming it
>> in case it is of unknown length or consumes resources. That would be
>> more like (assuming we add a Runnable call to Timer):
>>
>> Stream stream = ….
>> new Timer().schedule(() -> stream.close(), 5000);
>> stream.forEach(e -> ….);
>>
>> OR
>>
>> stream.forEach(e -> { try { … } catch (Exception ex) { stream.close(); } });
>>
>> Sadly, the first gate() case doesn't work well when parallelized. I'm
>> willing to just specify what the behavior is for that case to get it
>> into the API. For example, I would probably say something like "the
>> gate will need to return false once per split to stop processing". In
>> either of these cases I think one of the motivations needs to be that
>> the stream may be using resources and we need to tell the source that
>> we are done consuming it. For example, if the stream is sourced from a
>> file, database or even a large amount of memory there should be a
>> notification mechanism for doneness that will allow those resources to
>> be returned before the stream is exhausted. To that end I think that
>> Stream should implement AutoCloseable but overridden with no checked
>> exception.
>>
>> interface Stream<T> extends AutoCloseable {
>>   /**
>>    * Closes this stream and releases any system resources associated
>>    * with it. If the stream is already closed then invoking this
>>    * method has no effect. Close is automatically called when the
>>    * stream is exhausted. After this is called, no further elements
>>    * will be processed by the stream but currently processing elements
>>    * will complete normally. Calling other methods on a closed stream will
>>    * produce IllegalStateExceptions.
>>    */
>>   void close();
>>
>>   /**
>>    * When the until function returns false, no further
>>    * elements will be processed after the gate. In the parallel stream
>>    * case no further elements will be processed in the current split.
>>    */
>>   Stream<T> gate(Function<T, Boolean> until);
>>
>>   /**
>>    * As gate with the addition of the current element number.
>>    */
>>   Stream<T> gate(BiFunction<T, Integer, Boolean> until);
>> }
>>
>> This API avoids a lot of the side effects that forEachUntil would
>> require to implement these use cases.
>>
>> Sam
>>
>> On Dec 30, 2012, at 7:53 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>
>> Here's a lower-complexity version of cancel, that still satisfies (in
>> series or in parallel) use cases like the following:
>>
>> >   - Find the best possible move after thinking for 5 seconds
>> >   - Find the first solution that is better than X
>> >   - Gather solutions until we have 100 of them
>>
>> without bringing in the complexity or time/space overhead of dealing
>> with encounter order.
>>
>> Since the forEach() operation works exclusively on the basis of
>> temporal/arrival order rather than spatial/encounter order (elements
>> are passed to the lambda in whatever order they are available, in
>> whatever thread they are available), we could make a canceling variant
>> of forEach:
>>
>>  .forEachUntil(Block<T> sink, BooleanSupplier until)
>>
>> Here, there is no confusion about what happens in the ordered case, no
>> need to buffer elements, etc.  Elements flow into the block until the
>> termination condition transpires, at which point there are no more
>> splits and existing splits dispense no more elements.
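
To make the arrival-order point concrete, here is a small sketch that
uses only forEach() and forEachOrdered() from the shipped API; the class
and method names are invented for illustration. On a parallel stream,
forEach() hands elements to the lambda in whatever order they become
available, while forEachOrdered() preserves encounter order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

class ArrivalOrderSketch {
    // Temporal/arrival order: elements land in the list as threads deliver them.
    static List<Integer> arrivalOrder(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, n).boxed().parallel().forEach(seen::add);
        return seen;
    }

    // Spatial/encounter order: elements are delivered as 1, 2, ..., n.
    static List<Integer> encounterOrder(int n) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, n).boxed().parallel().forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("forEach:        " + arrivalOrder(6));   // order may vary
        System.out.println("forEachOrdered: " + encounterOrder(6));
    }
}
```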
>>
>> I implemented this (it was trivial) and wrote a simple test program to
>> calculate primes sequentially and in parallel, counting how many could
>> be calculated in a fixed amount of time, starting from an infinite
>> generator and filtering out composites:
>>
>>            Streams.iterate(from, i -> i + 1)  // sequential
>>                    .filter(i -> isPrime(i))
>>                    .forEachUntil(i -> {
>>                        chm.put(i, true);
>>                    }, () -> System.currentTimeMillis() >= start+num);
>>
>> vs
>>
>>            Streams.iterate(from, i -> i+1)    // parallel
>>                    .parallel()
>>                    .filter(i -> isPrime(i))
>>                    .forEachUntil(i -> {
>>                        chm.put(i, true);
>>                    }, () -> System.currentTimeMillis() >= start+num);
>>
>> On a 4-core Q6600 system, in a fixed amount of time, the parallel
>> version gathered ~3x as many primes.
>>
>> In terms of being able to perform useful computations on infinite
>> streams, this seems a pretty attractive price-performer; lower spec
>> and implementation complexity, and covers many of the use cases which
>> would otherwise be impractical to attack with the stream approach.
>>
>>
>>
>> On 12/28/2012 11:20 AM, Brian Goetz wrote:
>>
>> I've been working through some alternatives for cancellation support in
>> infinite streams.  Looking to gather some use case background to help
>> evaluate the alternatives.
>>
>> In the serial case, the "gate" approach works fine -- after some
>> criterion is met, stop sending elements downstream.  The pipeline
>> flushes the elements it has, and completes early.
>>
>> In the parallel unordered case, the gate approach similarly works fine
>> -- after the cancelation criterion is met, no new splits are created, and
>> existing splits dispense no more elements.  The computation similarly
>> quiesces after elements currently being processed are completed,
>> possibly along with any up-tree merging to combine results.
>>
>> It is the parallel ordered case that is tricky.  Supposing we partition
>> a stream into
>>   (a1,a2,a3), (a4,a5,a6)
>>
>> And suppose further we happen to be processing a5 when the bell goes
>> off.  Do we want to wait for all a_i, i<5, to finish before letting the
>> computation quiesce?
>>
>> My gut says: for the things we intend to cancel, most of them will be
>> order-insensitive anyway.  Things like:
>>
>>  - Find the best possible move after thinking for 5 seconds
>>  - Find the first solution that is better than X
>>  - Gather solutions until we have 100 of them
>>
>> I believe the key use case for cancelation here will be when we are
>> chewing on potentially infinite streams of events (probably backed by
>> IO) where we want to chew until we're asked to shut down, and want to
>> get as much parallelism as we can cheaply.  Which suggests to me the
>> intersection between order-sensitive stream pipelines and cancelable
>> stream pipelines is going to be pretty small indeed.
>>
>> Anyone want to add to this model of use cases for cancelation?
>
>

