stream.parallel().limit() on large streams

Brian Goetz brian.goetz at oracle.com
Tue Oct 8 08:48:44 PDT 2013


I've added:

      * @apiNote
      * While {@code limit()} is generally a cheap operation on sequential
      * stream pipelines, it can be quite expensive on ordered parallel 
pipelines,
      * especially for large values of {@code maxSize}, since {@code 
limit(n)}
      * is constrained to return not just any <em>n</em> elements, but the
      * <em>first n</em> elements in the encounter order.  Using an 
unordered
      * stream source (such as {@link #generate(Supplier)}) or removing the
      * ordering constraint with {@link #unordered()} may result in 
significant
      * speedups of {@code limit()} in parallel pipelines, if the 
semantics of
      * your situation permit.  If consistency with encounter order is 
required,
      * and you are experiencing poor performance or memory utilization with
      * {@code limit()} in parallel pipelines, switching to sequential 
execution
      * with {@link #sequential()} may improve performance.


to limit() and friends, and also noted that Stream.generate returns an 
unordered stream.  Hopefully this helps a little!

On 10/6/2013 10:26 AM, Arne Siegel wrote:
> Hi Brian,
>
> tried out both of your propositions, and indeed they do a good job on
> the order-independent scenario I was looking into:
>
>              IntStream.range(0, maxToGenerate)
>                       .parallel()
>                       .unordered()
>                       .mapToObj(i -> generatorFunction.get())
>                       .filter(condition)
>                       .limit(needed)
>                       .forEach(action);
>
> ==> runs quite good, but is on average a few percent less efficient than
> the implementation you cited in your mail;
>
>              Stream.generate(generatorFunction)
>                    .parallel()
>                    .limit(maxToGenerate)
>                    .filter(condition)
>                    .limit(needed)
>                    .forEach(action);
>
> ==> this is a really concise expression of the program's intents, and it
> performs equally well compared to the cited implementation. Nice!
>
> One note: it's a complete different picture for scenarios where order is
> important. These need other implementations, and for these I found
> - parallel streams running much slower than serial streams and serial loop;
> - the ExecutorService-based approach running much faster than serial
> implementations, most of the time.
>
> Thank you very much for your valuable hints!
>
> Arne
>
>
> 2013/10/5 Brian Goetz <brian.goetz at oracle.com
> <mailto:brian.goetz at oracle.com>>
>
>      > For completeness I want to show how I could rewrite the code using a
>      > streams-based implementation:
>      >
>      >            final AtomicInteger elementsConsumed = new
>     AtomicInteger(0);
>      >            IntStream.range(0, maxToGenerate)
>      >                     .parallel()
>      >                     .mapToObj(i -> generatorFunction.get())
>      >                     .filter(condition::test)
>      >                     .peek(action::accept)
>      >                     .mapToInt(element ->
>      > elementsConsumed.incrementAndGet())
>      >                     .filter(n -> n >= needed)
>      >                     .findFirst();
>
>     If this code works for you, then what you're saying is that you
>     don't care about order.  Which I believe.  In which case, just use
>     .unordered() in your stream and you'll get good parallelism without
>     having to contort your code.  Try it and report back?
>
>     You might also do better with Stream.generate, since it creates an
>     unordered stream:
>
>           Stream.generate(generatorFunction)
>                        .parallel()
>                        ...
>
>
>


More information about the lambda-dev mailing list