Exceptional behavior of parallel stream

Paul Sandoz paul.sandoz at oracle.com
Thu Aug 18 19:33:06 UTC 2016


> On 18 Aug 2016, at 03:54, Doug Lea <dl at cs.oswego.edu> wrote:
> 
> On 08/17/2016 09:01 AM, Tagir F. Valeev wrote:
>> Hello!
>> 
>> I found no information in the Stream API documentation on how it behaves
>> if an exception occurs during stream processing. While it's
>> quite evident for a sequential stream (stream processing is terminated
>> and the exception is propagated to the caller as is), the behavior for
>> parallel streams differs from what one might expect. Consider the following
>> test:
> 
> In your example, you can witness the delayed termination of other threads
> upon exception in another because you add side-effecting operations
> (here, just printing). Avoiding all this would force sequential processing.
> 
> But if the supplied functions follow the documented properties,
> it should not matter that parallel processing of some elements continues
> when another hits an exception. Which is why nothing is said about it.
> Similar effects occur in findAny and other methods. I don't see any benefit
> in trying to specify exactly what happens in these cases.
> 

Short-circuiting terminal operations such as findAny and findFirst will not return until the computation has “quiesced”: once a result has been found, it is not returned until all executing f/j tasks have completed. We consciously decided to do that so that the common pool would not contain straggling tasks beyond evaluation of the pipeline.
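
As a rough illustration of the short-circuiting case, here is a minimal sketch. The class and method names (FindAnyQuiescence, search) are made up for this example. It runs a parallel findAny and then takes a snapshot of the common pool state. The snapshot is only an observation: other users of the common pool, or workers that are still winding down, could make it read false, so it is printed rather than relied upon.

```java
import java.util.OptionalInt;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical example class, not part of the JDK or the original thread.
class FindAnyQuiescence {
    // Runs a short-circuiting parallel search and returns the match, if any.
    static OptionalInt search(int size, int target) {
        return IntStream.range(0, size).parallel()
                .filter(e -> e == target)
                .findAny();
    }

    public static void main(String[] args) {
        OptionalInt hit = search(1 << 20, 12345);
        // Snapshot of the pool right after findAny returned (observation only).
        boolean quiescentOnReturn = ForkJoinPool.commonPool().isQuiescent();
        System.out.println("found: " + hit.getAsInt()
                + ", common pool quiescent on return: " + quiescentOnReturn);
    }
}
```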

When a behavioural operation throws an exception, the stream can return before all related f/j tasks have finished. I thought I had a good grasp of the CountedCompleter exception propagation behaviour vs. normal completion propagation behaviour, but I am not so sure now, since I cannot bias things to make the non-quiescent behaviour very obvious. I think it’s because the main non-worker thread tries to help out executing tasks before waiting.
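
One way to look for the exceptional case is sketched below, with made-up names (ExceptionalParallelStream, runAndFail, processed). It counts how many elements had been processed when the exception reached the caller, then counts again after the common pool has drained. The counter is a deliberate side effect used purely for observation. Whether the two numbers differ depends on timing, so a run where they are equal does not prove the pool was quiescent at the catch.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical example class, not part of the JDK or the original thread.
class ExceptionalParallelStream {
    // Number of elements whose peek action completed without throwing.
    static final AtomicInteger processed = new AtomicInteger();

    // Returns true if the exception thrown for element 0 reached the caller.
    static boolean runAndFail(int size) {
        try {
            IntStream.range(0, size).parallel()
                    .peek(e -> {
                        if (e == 0) throw new IllegalStateException("element 0");
                        processed.incrementAndGet();
                    })
                    .forEach(e -> { });
            return false;
        } catch (RuntimeException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        boolean failed = runAndFail(1 << 16);
        int seenAtCatch = processed.get();
        // Give any straggling tasks time to finish before reading the counter again.
        ForkJoinPool.commonPool().awaitQuiescence(10, TimeUnit.SECONDS);
        int seenAfterDrain = processed.get();
        System.out.println("failed: " + failed + ", processed at catch: " + seenAtCatch
                + ", processed after drain: " + seenAfterDrain);
    }
}
```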

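Here is another reproducer:

import java.math.BigInteger;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

try {
    int s = 1024 * 16;
    IntStream.range(0, s).parallel()
            .peek(e -> {
                // The lower half of the range fails immediately.
                if (e < s / 2) throw new RuntimeException();
                // The upper half does expensive work, keeping its tasks busy.
                BigInteger.probablePrime(256, ThreadLocalRandom.current());
            })
            .forEach(e -> System.out.println(Thread.currentThread().getId() + " " + e));
} catch (RuntimeException e) {
    // Reports whether the common pool was idle when the exception arrived.
    System.out.println("FAILED: " + ForkJoinPool.commonPool().isQuiescent());
}

Paul.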

