Cancelation -- use cases
Brian Goetz
brian.goetz at oracle.com
Mon Dec 31 09:26:28 PST 2012
Yes, there's a big difference, and it has to do with the interplay of
spatial (encounter) order and temporal (arrival) order in the parallel
case. Earlier attempts to define a general while/until on Stream ran
aground on the "what about the ordered case" problem. Moving the
problem to a more constrained environment -- on top of an intrinsically
unordered terminal operation -- sidesteps these tricky problems, which
is why forEachUntil() is considerably simpler than a more general until().
In the sequential case, the two orders coincide and the "until"
formulation is trivial and obvious. Similarly, in the unordered
parallel case (e.g., hashSet.parallel()...) we are not constrained to do
anything with the encounter order, so again the two orders collapse
into the same thing.
The tricky case is the (common) one where there is a defined encounter
order, such as when the source is a List, array, Queue, generator, or
SortedSet. In such cases, processing elements out of order, or skipping
elements, often gives the wrong answer (for example, in a reduce with an
associative reducer), so simply stopping computation when the music
stops may yield an incorrect result, which we want to avoid.
(Reductions are carefully arranged to provide parallelism while
preserving correctness for associative operations.)
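
To make the hazard concrete, here is a minimal sketch (illustrative
only, plain Java, not part of the proposal) of an associative but
non-commutative reduction over an ordered source; skipping a middle
element silently corrupts the answer rather than merely truncating it:

    import java.util.Arrays;
    import java.util.List;

    public class OrderedReduceHazard {
        public static void main(String[] args) {
            List<String> letters = Arrays.asList("a", "b", "c", "d", "e");

            // Concatenation is associative, so the runtime may split the
            // work however it likes; as long as every element participates
            // in encounter order, the result is always "abcde".
            String full = letters.stream()
                                 .reduce("", String::concat);

            // If cancelation silently dropped a middle element ("c",
            // simulated here with a filter), the reduction would produce
            // "abde" -- a wrong answer, not a prefix of the right one.
            String holed = letters.stream()
                                  .filter(s -> !s.equals("c"))
                                  .reduce("", String::concat);

            System.out.println(full);   // abcde
            System.out.println(holed);  // abde
        }
    }

That hole-punching is exactly what a naive "stop wherever we happen to
be" cancelation would do to an in-flight ordered reduction.
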
People are going to expect that "until" is defined relative to the
encounter order (as in, all the elements from zero up until the element
I'm processing when the bell goes off.) But at root, this is a
fundamentally serial notion of until-ness. We experimented with finding
weird semantics for this ("identify the indexes of all elements being
processed when the bell goes off, take the max of those, and allow all
prior elements in the encounter order to finish before completing.")
These semantics are complicated, hard to implement efficiently (look how
bad parallel limit is now -- bad enough I'm still considering dropping
limit entirely, and this is worse), and would likely result in poor
responsiveness to cancelation. Extrapolating down this road, I see it
leading to no cancelation facility at all, which would make many
otherwise viable formulations over infinite streams impractical.
By moving the until onto the forEach operation, which explicitly does
not care about encounter order, we are solving a simpler problem: we
free ourselves of the thorny ordering issues while still providing a
cancelation mechanism that is useful in a lot of cases.
The semantics are much simpler, as is the resulting implementation (~50
lines of new code.)
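
For intuition, here is a rough sequential-only sketch of the gate idea
(the helper and its Spliterator-based pull loop are my illustration,
not the actual implementation, and I'm writing Block<T> as the
equivalent Consumer; the real version also stops creating new splits in
the parallel case):

    import java.util.Spliterator;
    import java.util.function.BooleanSupplier;
    import java.util.function.Consumer;
    import java.util.stream.Stream;

    public class ForEachUntilSketch {

        // Sequential approximation of
        // forEachUntil(Block<T> sink, BooleanSupplier until): pull
        // elements in arrival order, polling the gate between elements;
        // once the gate trips, stop pulling from the source entirely,
        // so even an infinite stream quiesces promptly.
        static <T> void forEachUntil(Stream<T> stream,
                                     Consumer<? super T> sink,
                                     BooleanSupplier until) {
            Spliterator<T> s = stream.spliterator();
            while (!until.getAsBoolean() && s.tryAdvance(sink)) {
                // keep pulling until the gate trips or the source runs dry
            }
        }

        public static void main(String[] args) {
            long deadline = System.currentTimeMillis() + 1000;
            // Consume an infinite generator for ~1 second, then quiesce.
            forEachUntil(Stream.iterate(2, i -> i + 1),
                         i -> { /* e.g. chm.put(i, true) */ },
                         () -> System.currentTimeMillis() >= deadline);
        }
    }

Per-element polling of the gate is what keeps cancelation responsive;
no buffering or index bookkeeping is needed, because nothing downstream
cares about encounter order.
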
On 12/31/2012 11:56 AM, Remi Forax wrote:
> I have trouble understanding the difference with:
>
> Streams.iterate(from, i -> i + 1)   // sequential
>        .filter(i -> isPrime(i))
>        .until(() -> System.currentTimeMillis() >= start + num)
>        .forEach(i -> {
>            chm.put(i, true);
>        });
>
> Rémi
>
> On 12/31/2012 04:53 AM, Brian Goetz wrote:
>> Here's a lower-complexity version of cancel that still satisfies (in
>> series or in parallel) use cases like the following:
>>
>> > - Find the best possible move after thinking for 5 seconds
>> > - Find the first solution that is better than X
>> > - Gather solutions until we have 100 of them
>>
>> without bringing in the complexity or time/space overhead of dealing
>> with encounter order.
>>
>> Since the forEach() operation works exclusively on the basis of
>> temporal/arrival order rather than spatial/encounter order (elements
>> are passed to the lambda in whatever order they are available, in
>> whatever thread they are available), we could make a canceling variant
>> of forEach:
>>
>> .forEachUntil(Block<T> sink, BooleanSupplier until)
>>
>> Here, there is no confusion about what happens in the ordered case, no
>> need to buffer elements, etc. Elements flow into the block until the
>> termination condition becomes true, at which point no more splits are
>> created and existing splits dispense no more elements.
>>
>> I implemented this (it was trivial) and wrote a simple test program to
>> calculate primes sequentially and in parallel, counting how many could
>> be calculated in a fixed amount of time, starting from an infinite
>> generator and filtering out composites:
>>
>> Streams.iterate(from, i -> i + 1)   // sequential
>>         .filter(i -> isPrime(i))
>>         .forEachUntil(i -> {
>>             chm.put(i, true);
>>         }, () -> System.currentTimeMillis() >= start + num);
>>
>> vs
>>
>> Streams.iterate(from, i -> i + 1)   // parallel
>>         .parallel()
>>         .filter(i -> isPrime(i))
>>         .forEachUntil(i -> {
>>             chm.put(i, true);
>>         }, () -> System.currentTimeMillis() >= start + num);
>>
>> On a 4-core Q6600 system, in a fixed amount of time, the parallel
>> version gathered ~3x as many primes.
>>
>> In terms of being able to perform useful computations on infinite
>> streams, this seems a pretty attractive price-performer: it has lower
>> specification and implementation complexity, and it covers many of the
>> use cases that would otherwise be impractical to attack with the
>> stream approach.
>>
>>
>>
>> On 12/28/2012 11:20 AM, Brian Goetz wrote:
>>> I've been working through some alternatives for cancellation support in
>>> infinite streams. Looking to gather some use case background to help
>>> evaluate the alternatives.
>>>
>>> In the serial case, the "gate" approach works fine -- after some
>>> criterion is met, stop sending elements downstream. The pipeline
>>> flushes the elements it has, and completes early.
>>>
>>> In the parallel unordered case, the gate approach similarly works fine
>>> -- once the cancelation criterion is met, no new splits are created,
>>> and existing splits dispense no more elements. The computation then
>>> quiesces after the elements currently being processed are completed,
>>> possibly along with any up-tree merging to combine results.
>>>
>>> It is the parallel ordered case that is tricky. Supposing we partition
>>> a stream into
>>> (a1,a2,a3), (a4,a5,a6)
>>>
>>> And suppose further we happen to be processing a5 when the bell goes
>>> off. Do we want to wait for all a_i, i<5, to finish before letting the
>>> computation quiesce?
>>>
>>> My gut says: for the things we intend to cancel, most of them will be
>>> order-insensitive anyway. Things like:
>>>
>>> - Find the best possible move after thinking for 5 seconds
>>> - Find the first solution that is better than X
>>> - Gather solutions until we have 100 of them
>>>
>>> I believe the key use case for cancelation here will be when we are
>>> chewing on potentially infinite streams of events (probably backed by
>>> IO) where we want to chew until we're asked to shut down, and want to
>>> get as much parallelism as we can cheaply. This suggests to me that
>>> the intersection between order-sensitive stream pipelines and
>>> cancelable stream pipelines is going to be pretty small indeed.
>>>
>>> Anyone want to add to this model of use cases for cancelation?
>>>
>