stream.parallel().limit() not usable

Brian Goetz brian.goetz at oracle.com
Sat Oct 5 00:49:26 PDT 2013


Here's some concrete advice for streams with limit():

If N is smallish, you're generally fine with parallel.  Short-circuiting will kick in once you've got your results, and cancel the remaining tasks.  If upstream operations are expensive, you'll get value out of the parallelism.  

If N is large, you're asking for a lot of additional work.  Unless generation is obscenely expensive, this will likely not be a good tradeoff.  In this case, you have some choices: 
 - Use unordered -- when you just want N and don't care about the *first* N.  This parallelizes cleanly.  
 - Use sequential -- when your stream has unpredictable size+decomposition characteristics (source doesn't decompose perfectly, or pipeline has unpredictable size because of operations like filter)

On Oct 5, 2013, at 8:36 AM, Brian Goetz wrote:

> A slight correction -- the limit operation does do short-circuiting, but there are other things that cause the behavior you observe, and this doesn't undermine your observation.  But the results are not surprising, nor are they 
> 
> Some more observations:
> 
> 1.  We can reduce the barrier to parallelization, but that doesn't make it simple.  Parallelism is a tradeoff of burning more cycles for the hope of getting to a faster wall-clock result.  But since a parallel solution *always* is more work than a serial solution (pause and prove this for yourself), parallelism is not guaranteed to be faster.  
> 
> 2.  We all come to the table with sequential goggles on.  We need to aspire to removing them.  Particularly, our sequential goggles give us the wrong intuition about limit() -- that it should be a cheap and trivial operation.  
> 
> 3.  Limit embodies the worst possible constraint on parallel operations -- it is constrained to operate in encounter order.  This is death for a parallel operation.  This interacts nastily with point (2).  Limit doesn't mean "limit the result to N", it means "limit the result to the FIRST N".  
> 
> If the stream is sized/subsized (i.e., decomposes predictably and all operations are size-preserving), then limit can be fast.  If the stream is unordered, limit can be fast, as this busts the crippling restriction of encounter order.  Other cases are exposed to the full complexity of asking for a order-constrained parallel operation.
> 
> Think about how limit() would have to work.  You want to use parallelism, presumably because generating the elements is expensive enough that it would benefit from parallelism.  So we start generating elements in parallel, but when we decompose the source, we don't necessarily know whether a given element falls inside or outside the limit range unless we've already computed all the elements to its left.  So we buffer the intermediate result, until we have enough elements, cancel the remaining tasks, and return the selected elements.  If N is large, we'll be buffering a LOT of elements.  
> 
> Bottom line: there's a mismatch between what limit really means and what people think it means / want it to mean.  People want limit() to be an afterthought -- "by the way, if you find more than N, dont return more than that."  But it is actually something much more restrictive.  
> 
> In a lot of cases like you describe, unordered() will help.  If you're just trying to generate N as fast as you can, removing the ordering constraint will do the job.  The problem is that users are familar with limit() but not with the interaction of ordering and parallelism.  
> 
> At worst, you could describe limit() as an "attractive nuisance" -- users think it is less dangerous than it is, and will reach for it intuitively, and burn themselves.  What are our options?  We could remove it.  People would hate that.  We could make it implicitly unordered, like forEach.  I think people would find that highly counterintuitive.  I think the best we can do is include more explanatory text in the javadoc for limit() and hope people read it.  
> 
> 
> On Oct 5, 2013, at 2:25 AM, Arne Siegel wrote:
> 
>> Hi,
>> 
>> I want to share some results of trying out Java 8 features. All code
>> examples were compiled and run with
>> jdk-8-ea-bin-b109-windows-x64-26_sep_2013. A class containing all tested
>> implementation variants is attached to this e-mail (hope it won't get eaten
>> by the mail system).
>> 
>> 
>> The following type of program will be occuring frequently:
>> 
>>           while (elementsGenerated < maxToGenerate && elementsConsumed <
>> needed) {
>>               T element = generatorFunction.get();
>>               elementsGenerated++;
>>               if (condition.test(element)) {
>>                   action.accept(element);
>>                   elementsConsumed++;
>>               }
>>           }
>> 
>> In the scenario I concentrate on, "maxToGenerate" and "needed" are equal to
>> 1000000, the generatorFunction and the action are fairly expensive, while
>> the condition is cheap.
>> 
>> Naturally, I wanted to use all available processors to speed up this
>> computation. Using ExecutorService and about two dozen more lines of code,
>> this could be achieved with fairly low effort. For my scenario this change
>> improves performance about 5.5 times on 8 processor cores.
>> 
>> In Java 8, the above code can be converted to a nice simple serial stream,
>> with roughly the same performance as the while-loop-based serial
>> implementation:
>> 
>>           IntStream.range(0, maxToGenerate)
>>                    .mapToObj(i -> generatorFunction.get())
>>                    .filter(condition::test)
>>                    .limit(needed)
>>                    .forEach(action::accept);
>> 
>> Unfortunately, naively trying to convert this to a parallel computation has
>> the opposite effect on performance. All cores go to 100% for minutes
>> (hours? haven't waited long enough); the program doesn't terminate in
>> acceptable time. Here's it:
>> 
>>           // won't terminate quickly - don't use
>>           IntStream.range(0, maxToGenerate)
>>                    .parallel()
>>                    .mapToObj(i -> generatorFunction.get())
>>                    .filter(condition::test)
>>                    .limit(needed)
>>                    .forEach(action::accept);
>> 
>> Apparently, the main cause is that the limit operation does not do
>> short-circuiting. The only stream operations doing short-circuiting are
>> findFirst and findAny, and these return only at most one element.
>> 
>> For completeness I want to show how I could rewrite the code using a
>> streams-based implementation:
>> 
>>           final AtomicInteger elementsConsumed = new AtomicInteger(0);
>>           IntStream.range(0, maxToGenerate)
>>                    .parallel()
>>                    .mapToObj(i -> generatorFunction.get())
>>                    .filter(condition::test)
>>                    .peek(action::accept)
>>                    .mapToInt(element ->
>> elementsConsumed.incrementAndGet())
>>                    .filter(n -> n >= needed)
>>                    .findFirst();
>> 
>> In my scenario, this type of code runs about 5.0 times faster than the
>> serial loop and the serial stream, which is still a huge improvement, but
>> about 10% less efficient than the ExecutorService-based implementation.
>> 
>> This variant of the code snippet, while being quite impure, also has the
>> disadvantage that it may do some more calls to the action than requested.
>> The application will have to cater for that, if it cannot be ignored.
>> 
>> 
>> I don't believe very strongly that anything can be done about the observed
>> behaviour before the release of Java 8, but the community should be aware
>> of it, and maybe in a later release this and certain other sequences of
>> stream operations could be optimized or some other solution found.
>> 
>> Arne Siegel
>> 
> 
> 



More information about the lambda-dev mailing list