stream.parallel().limit() not usable

Arne Siegel v.a.ammodytes at googlemail.com
Fri Oct 4 18:25:23 PDT 2013


Hi,

I want to share some results of trying out Java 8 features. All code
examples were compiled and run with
jdk-8-ea-bin-b109-windows-x64-26_sep_2013. A class containing all tested
implementation variants is attached to this e-mail (hope it won't get eaten
by the mail system).


The following type of program occurs frequently:

            while (elementsGenerated < maxToGenerate && elementsConsumed < needed) {
                T element = generatorFunction.get();
                elementsGenerated++;
                if (condition.test(element)) {
                    action.accept(element);
                    elementsConsumed++;
                }
            }

In the scenario I concentrate on, "maxToGenerate" and "needed" are equal to
1000000, the generatorFunction and the action are fairly expensive, while
the condition is cheap.
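For reference, here is the loop packaged as a self-contained method, with cheap stand-ins for the generator, condition and action (which in the real scenario are expensive); all names in this sketch are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class GenerateConsumeLoop {

    // Runs the generate/filter/consume loop and returns the number
    // of elements actually handed to the action.
    static <T> int run(Supplier<T> generatorFunction, Predicate<T> condition,
                       Consumer<T> action, int maxToGenerate, int needed) {
        int elementsGenerated = 0;
        int elementsConsumed = 0;
        while (elementsGenerated < maxToGenerate && elementsConsumed < needed) {
            T element = generatorFunction.get();
            elementsGenerated++;
            if (condition.test(element)) {
                action.accept(element);
                elementsConsumed++;
            }
        }
        return elementsConsumed;
    }

    public static void main(String[] args) {
        // Cheap stand-ins: generate 0, 1, 2, ..., keep the even ones.
        AtomicInteger counter = new AtomicInteger();
        List<Integer> consumed = new ArrayList<>();
        run(counter::getAndIncrement, i -> i % 2 == 0, consumed::add, 100, 5);
        System.out.println(consumed);
    }
}
```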

Naturally, I wanted to use all available processors to speed up this
computation. Using ExecutorService and about two dozen more lines of code,
this could be achieved with fairly low effort. For my scenario this change
improves performance about 5.5 times on 8 processor cores.
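I won't paste the full ExecutorService variant here (it is in the attached class), but a minimal sketch along the same lines could look as follows. Treat it as an illustration, not as the attached code: the racy check on the shared counters means a few extra elements may be generated near the end, and the generator, condition and action must all be thread-safe.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class ParallelGenerateConsume {

    // Runs the generate/filter/consume pattern on 'threads' worker threads.
    // Returns the number of elements actually handed to the action.
    static <T> int run(Supplier<T> generator, Predicate<T> condition,
                       Consumer<T> action, int maxToGenerate, int needed,
                       int threads) {
        AtomicInteger generated = new AtomicInteger();
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Runnable worker = () -> {
            // Each iteration atomically claims one generation slot; the
            // check on 'consumed' is racy, so the workers may generate and
            // test a few extra elements before they notice they are done.
            while (consumed.get() < needed
                    && generated.getAndIncrement() < maxToGenerate) {
                T element = generator.get();
                // Only the first 'needed' passing elements reach the action.
                if (condition.test(element)
                        && consumed.incrementAndGet() <= needed) {
                    action.accept(element);
                }
            }
        };
        for (int i = 0; i < threads; i++) {
            pool.execute(worker);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 'consumed' may have been bumped past 'needed' by losing threads.
        return Math.min(consumed.get(), needed);
    }
}
```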

In Java 8, the above code can be converted to a nice simple serial stream,
with roughly the same performance as the while-loop-based serial
implementation:

            IntStream.range(0, maxToGenerate)
                     .mapToObj(i -> generatorFunction.get())
                     .filter(condition::test)
                     .limit(needed)
                     .forEach(action::accept);

Unfortunately, naively trying to convert this to a parallel computation has
the opposite effect on performance. All cores go to 100% for minutes
(hours? haven't waited long enough); the program doesn't terminate in
acceptable time. Here it is:

            // won't terminate quickly - don't use
            IntStream.range(0, maxToGenerate)
                     .parallel()
                     .mapToObj(i -> generatorFunction.get())
                     .filter(condition::test)
                     .limit(needed)
                     .forEach(action::accept);

Apparently, the main cause is that the limit operation does not short-circuit
here: although limit is documented as a short-circuiting operation, in a
parallel pipeline it must deliver the first elements in encounter order, so
the upstream stages are not cut off. The operations that do terminate the
computation early in this situation are findFirst and findAny, and these
return at most one element.
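If the application does not care about encounter order (as in my scenario, where the elements are generated independently anyway), one thing worth trying is to drop the ordering constraint explicitly, which in principle frees limit to stop the upstream computation. Whether the current build actually exploits this I cannot say, so treat it as a sketch (with a cheap stand-in condition):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class UnorderedLimitSketch {

    // Collects up to 'needed' matching elements, giving up encounter order
    // so that limit() is free to stop as soon as enough elements arrived.
    static List<Integer> collectSome(int maxToGenerate, int needed) {
        return IntStream.range(0, maxToGenerate)
                        .parallel()
                        .unordered()               // order is irrelevant here
                        .filter(i -> i % 2 == 0)   // cheap stand-in condition
                        .limit(needed)
                        .boxed()
                        .collect(Collectors.toList());
    }
}
```

The result is still correct (exactly "needed" elements, all matching), only which matching elements are kept becomes nondeterministic.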

For completeness I want to show how I could rewrite the code using a
streams-based implementation:

            final AtomicInteger elementsConsumed = new AtomicInteger(0);
            IntStream.range(0, maxToGenerate)
                     .parallel()
                     .mapToObj(i -> generatorFunction.get())
                     .filter(condition::test)
                     .peek(action::accept)
                     .mapToInt(element -> elementsConsumed.incrementAndGet())
                     .filter(n -> n >= needed)
                     .findFirst();

In my scenario, this type of code runs about 5.0 times faster than the
serial loop and the serial stream, which is still a huge improvement, but
about 10% less efficient than the ExecutorService-based implementation.

This variant of the code snippet, while being quite impure, also has the
disadvantage that it may invoke the action a few more times than requested.
The application will have to cater for that if the extra calls cannot be
ignored.
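The overshoot can be observed directly with cheap stand-ins: counting the action invocations shows at least the requested number of calls, and possibly a few more when run in parallel (all names here are illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class OvershootDemo {

    // Returns how many times the stand-in action ran while consuming
    // 'needed' matching elements via the findFirst-based termination trick.
    static int countActionCalls(int maxToGenerate, int needed) {
        AtomicInteger actionCalls = new AtomicInteger();
        AtomicInteger elementsConsumed = new AtomicInteger();
        IntStream.range(0, maxToGenerate)
                 .parallel()
                 .filter(i -> i % 2 == 0)                  // stand-in condition
                 .peek(i -> actionCalls.incrementAndGet()) // stand-in action
                 .map(i -> elementsConsumed.incrementAndGet())
                 .filter(n -> n >= needed)
                 .findFirst();
        // Guaranteed >= needed; may exceed it, because elements already in
        // flight still pass through peek() after the answer is found.
        return actionCalls.get();
    }
}
```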


I don't believe very strongly that anything can be done about the observed
behaviour before the release of Java 8, but the community should be aware
of it, and maybe in a later release this and certain other sequences of
stream operations could be optimized or some other solution found.

Arne Siegel


More information about the lambda-dev mailing list