stream.parallel().limit() on large streams
Brian Goetz
brian.goetz at oracle.com
Tue Oct 8 08:48:44 PDT 2013
I've added:
* @apiNote
* While {@code limit()} is generally a cheap operation on sequential
* stream pipelines, it can be quite expensive on ordered parallel pipelines,
* especially for large values of {@code maxSize}, since {@code limit(n)}
* is constrained to return not just any <em>n</em> elements, but the
* <em>first n</em> elements in the encounter order. Using an unordered
* stream source (such as {@link #generate(Supplier)}) or removing the
* ordering constraint with {@link #unordered()} may result in significant
* speedups of {@code limit()} in parallel pipelines, if the semantics of
* your situation permit. If consistency with encounter order is required,
* and you are experiencing poor performance or memory utilization with
* {@code limit()} in parallel pipelines, switching to sequential execution
* with {@link #sequential()} may improve performance.
to limit() and friends, and also noted that Stream.generate returns an
unordered stream. Hopefully this helps a little!
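A minimal, self-contained sketch of the semantics the apiNote describes, assuming Java 8 or later. On an ordered parallel pipeline, limit(n) must return the first n elements in encounter order; after unordered() it is allowed to return any n elements; switching to sequential() keeps the order-preserving result while avoiding the parallel bookkeeping. The class and method names (LimitOrderingDemo, orderedParallel, and so on) are made up for illustration, and the sizes are arbitrary. The last three checks inspect Spliterator.ORDERED to show that IntStream.range is an ordered source, while Stream.generate and a pipeline with unordered() applied are not.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LimitOrderingDemo {

    // Ordered parallel pipeline: limit(n) must return the FIRST n elements
    // in encounter order, so the result is always [0, 1, ..., n-1].
    static List<Integer> orderedParallel(int size, int n) {
        return IntStream.range(0, size).parallel()
                .limit(n)
                .boxed()
                .collect(Collectors.toList());
    }

    // Unordered parallel pipeline: limit(n) may return ANY n elements,
    // which frees the implementation from tracking encounter order.
    static List<Integer> unorderedParallel(int size, int n) {
        return IntStream.range(0, size).parallel()
                .unordered()
                .limit(n)
                .boxed()
                .collect(Collectors.toList());
    }

    // Switching the whole pipeline back to sequential execution:
    // order-preserving, and limit() is cheap again.
    static List<Integer> sequential(int size, int n) {
        return IntStream.range(0, size).parallel()
                .sequential()
                .limit(n)
                .boxed()
                .collect(Collectors.toList());
    }

    // True if the spliterator reports the ORDERED characteristic.
    static boolean isOrdered(Spliterator<?> s) {
        return s.hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        int size = 1_000_000, n = 10;
        System.out.println("ordered parallel:   " + orderedParallel(size, n));
        System.out.println("unordered parallel: " + unorderedParallel(size, n));
        System.out.println("sequential:         " + sequential(size, n));

        System.out.println("range ORDERED?             "
                + isOrdered(IntStream.range(0, 10).spliterator()));
        System.out.println("range.unordered() ORDERED? "
                + isOrdered(IntStream.range(0, 10).unordered().spliterator()));
        System.out.println("generate ORDERED?          "
                + isOrdered(Stream.generate(() -> "x").spliterator()));
    }
}
```

No timing is included; the sketch only illustrates the contract. On a sized source like IntStream.range the unordered result may well happen to be 0..9 as well; the contract only promises some 10 elements, and that freedom is what allows the faster parallel strategy.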
On 10/6/2013 10:26 AM, Arne Siegel wrote:
> Hi Brian,
>
> tried out both of your propositions, and indeed they do a good job on
> the order-independent scenario I was looking into:
>
> IntStream.range(0, maxToGenerate)
> .parallel()
> .unordered()
> .mapToObj(i -> generatorFunction.get())
> .filter(condition)
> .limit(needed)
> .forEach(action);
>
> ==> runs quite well, but is on average a few percent less efficient than
> the implementation you cited in your mail;
>
> Stream.generate(generatorFunction)
> .parallel()
> .limit(maxToGenerate)
> .filter(condition)
> .limit(needed)
> .forEach(action);
>
> ==> this is a really concise expression of the program's intent, and it
> performs as well as the cited implementation. Nice!
>
> One note: it's a completely different picture for scenarios where order is
> important. These need other implementations, and for these I found
> - parallel streams running much slower than serial streams and a serial loop;
> - the ExecutorService-based approach running much faster than serial
> implementations, most of the time.
>
> Thank you very much for your valuable hints!
>
> Arne
>
>
> 2013/10/5 Brian Goetz <brian.goetz at oracle.com>
>
> > For completeness I want to show how I could rewrite the code using a
> > streams-based implementation:
> >
> > final AtomicInteger elementsConsumed = new AtomicInteger(0);
> > IntStream.range(0, maxToGenerate)
> > .parallel()
> > .mapToObj(i -> generatorFunction.get())
> > .filter(condition::test)
> > .peek(action::accept)
> > .mapToInt(element -> elementsConsumed.incrementAndGet())
> > .filter(n -> n >= needed)
> > .findFirst();
>
> If this code works for you, then what you're saying is that you
> don't care about order. Which I believe. In which case, just use
> .unordered() in your stream and you'll get good parallelism without
> having to contort your code. Try it and report back?
>
> You might also do better with Stream.generate, since it creates an
> unordered stream:
>
> Stream.generate(generatorFunction)
> .parallel()
> ...
>
>
>
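For anyone wanting to try the Stream.generate variant from this thread, here is a runnable sketch, assuming Java 8 or later. generatorFunction, condition, and action are hypothetical stand-ins (a random int in [0, 100), a "less than 50" test, and appending to a queue), and maxToGenerate and needed are arbitrary illustrative values, not the ones from the benchmark discussed above. GenerateLimitDemo and run are likewise made-up names.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class GenerateLimitDemo {

    // Runs the unordered parallel generate/filter/limit pipeline and
    // returns every element that was handed to the action.
    static Queue<Integer> run(int maxToGenerate, int needed) {
        // Hypothetical stand-ins for generatorFunction, condition and action.
        Supplier<Integer> generatorFunction =
                () -> ThreadLocalRandom.current().nextInt(100);
        Predicate<Integer> condition = x -> x < 50;
        // forEach on a parallel stream may invoke the action from several
        // threads at once, so the sink must be thread-safe.
        Queue<Integer> results = new ConcurrentLinkedQueue<>();
        Consumer<Integer> action = results::add;

        Stream.generate(generatorFunction)   // unordered, infinite source
              .parallel()
              .limit(maxToGenerate)          // cap on generation attempts
              .filter(condition)
              .limit(needed)                 // any `needed` matches will do
              .forEach(action);
        return results;
    }

    public static void main(String[] args) {
        Queue<Integer> r = run(100_000, 1_000);
        System.out.println("collected " + r.size() + " matching elements");
    }
}
```

Because the source is unordered, neither limit() has to respect an encounter order, which is why this pipeline parallelizes well. The ConcurrentLinkedQueue is a deliberate choice: an ArrayList as the forEach target would be a data race here.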
More information about the lambda-dev mailing list