RFR 8075307 Pipeline calculating inconsistent flag state for parallel stateful ops

Stuart Marks stuart.marks at oracle.com
Tue Mar 24 01:41:54 UTC 2015


Hi Paul,

After reading your notes here, and in the bug reports, and the comments in the 
code, and banging my head against the code (before and after) for a while, I'm 
starting to see that this is on the right track. Sorry to hedge a bit but I have 
to admit that I don't fully understand the code.

However, I do see the comment you referred to (old version, line 460) and that 
the new code is essentially a merge of the old parallelPrepare() code into 
sourceSpliterator(). And I do see the crucial addition of the flag modification 
based on the spliterator's SIZED characteristic. So that seems right, and if the 
tests pass, so much the better.

A few comments for future maintenance/cleanups in this area.

* This package seems like a curious mixture of abstraction of bit-twiddling into 
small methods (e.g., combineOpFlags), and bit-twiddling in open code (evaluation 
of Spliterator.SIZED into thisOpFlags, new version lines 459ff). While not 
incorrect, this is jarring.

* The loop variables over the pipeline stages are hard to follow. The variable 
'p' is used in the loop at line 413 as the "current" pipeline stage, whereas 'p' 
is used for the "next" stage in the loop at 434, and 'u' is the "current" stage. 
This is confusing, and I don't know what 'u' means.

* Also, in the same loop, 'e' is initialized to 'this' and is checked as the 
loop exit condition (I guess that's what 'e' stands for) but it's not used 
elsewhere, so it's not clear to me how much value this adds.
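
On the first point above, here is a rough sketch of what a small named helper 
for the SIZED handling might look like. This is an illustration only: the class 
name, the method name withSizedFrom, and the IS_SIZED/NOT_SIZED bit values are 
hypothetical stand-ins, not the actual package-private constants or the code in 
the webrev.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;

public class SizedFlagSketch {
    // Hypothetical stand-ins for the internal flag bits; values are illustrative only.
    public static final int IS_SIZED = 0b01;
    public static final int NOT_SIZED = 0b10;

    // Sets IS_SIZED or NOT_SIZED in opFlags according to what the spliterator
    // actually reports, leaving all other bits untouched.
    public static int withSizedFrom(Spliterator<?> spliterator, int opFlags) {
        return spliterator.hasCharacteristics(Spliterator.SIZED)
                ? (opFlags & ~NOT_SIZED) | IS_SIZED
                : (opFlags & ~IS_SIZED) | NOT_SIZED;
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);
        // A list-backed spliterator reports SIZED.
        Spliterator<Integer> sized = list.spliterator();
        // An iterator-backed spliterator of unknown size does not.
        Spliterator<Integer> unsized =
                Spliterators.spliteratorUnknownSize(list.iterator(), 0);

        System.out.println(withSizedFrom(sized, NOT_SIZED) == IS_SIZED);
        System.out.println(withSizedFrom(unsized, IS_SIZED) == NOT_SIZED);
    }
}
```

The point is only that the ternary gets a name and a single home, in the same 
spirit as combineOpFlags.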

Stepping back from this a bit, this is clearly an area of some complexity, and 
it might benefit from some additional testing. The streams library is overall 
well-tested, but mostly from a functional level, i.e. running a bunch of streams 
in various combinations to ensure the right output is produced. For this case it 
might be helpful to assemble (but not execute) a bunch of combinations of 
pipelines and then make sure that each stage ends up with the right flags. (I 
didn't find such a test when I went looking, but I might have missed it.)
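
As a rough approximation of that idea using only public API, something like the 
following could be run over many pipeline shapes. The per-stage flags are 
internal, so this sketch checks the externally visible consequence instead: if 
the pipeline's spliterator claims SIZED, its estimateSize() must match the number 
of elements it actually yields. The class and method names are made up for 
illustration, and obtaining the spliterator is itself a terminal operation, so 
this is not quite "assemble but not execute".

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class SizedConsistencyCheck {
    // Returns true if the pipeline makes no SIZED claim, or if its claimed
    // size equals the number of elements it actually produces.
    public static <T> boolean sizedClaimHolds(Supplier<Stream<T>> pipeline) {
        Spliterator<T> s = pipeline.get().spliterator();
        if (!s.hasCharacteristics(Spliterator.SIZED)) {
            return true; // no claim made, nothing to contradict
        }
        long claimed = s.estimateSize();
        long[] actual = {0};
        s.forEachRemaining(e -> actual[0]++);
        return claimed == actual[0];
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 2, 3);

        // The shape from the bug report: a lazy, unordered distinct in parallel.
        System.out.println(sizedClaimHolds(
                () -> list.stream().parallel().unordered().distinct()));
        // Ordered distinct, which acts as a full barrier.
        System.out.println(sizedClaimHolds(
                () -> list.stream().parallel().distinct()));
        // A stateful op followed by a stateless one.
        System.out.println(sizedClaimHolds(
                () -> list.stream().parallel().sorted().map(i -> i * 2)));
        // A stateless op that makes the size unknown.
        System.out.println(sizedClaimHolds(
                () -> list.stream().filter(i -> i > 1)));
    }
}
```

A real test would want to generate the combinations systematically rather than 
list them by hand.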

In any case I think it's reasonable to proceed with this patch as it stands 
instead of tinkering with it. Some additional cleanups are warranted at some 
point, so we should keep an eye on these for the future.

s'marks

On 3/18/15 7:09 AM, Paul Sandoz wrote:
> Hi,
>
> The fix for https://bugs.openjdk.java.net/browse/JDK-8067969 (optimized Stream.count()) caused a regression in the JCK tests.
>
> A test exposed a known weakness in the way pipeline flags are prepared for parallel execution. In certain cases the pipeline may report SIZED but the source spliterator does not report the SIZED characteristic. This is because the preparation always assumed that stateful operations are a full barrier and thus inject SIZED. This is not always the case when a stateful operation is "lazy" and can wrap the source spliterator and operations.
>
> For example:
>
>    List<Integer> list = Arrays.asList(1, 2, 3);
>    Stream<Integer> s = list.stream().parallel().unordered().distinct();
>    long count = s.count();
>    System.out.println(count);
>
> The stateful distinct op can wrap the source spliterator of the list since we don't care about order, but as a result the size of the distinct->count pipeline slice is unknown.
>
> While I could fix the counting functionality to check for the inconsistency, the right thing to do is fix the preparation of flags (a particularly sensitive area to get right) by merging that into the sourceSpliterator method (as suggested in the comments):
>
>    http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8075307-stream-parallel-prepare-flags/webrev/
>
> This also makes things marginally more efficient when there are no stateful operations. There is no need to do any work if there are no stateful ops (I needed to relax tests in UnorderedTest which were overly aggressive when there were no stateful ops present; similarly I relaxed a test in DistinctOps that is no longer generally applicable).
>
> Furthermore, as a consequence the terminal counting reduce ops now declare they don't care about encounter order [*], which avoids the distinct becoming a full barrier that reduces elements to a linked hash set.
>
> A JPRT test run reported no relevant failures and local execution of relevant JCK tests pass.
>
> Paul.
>
> [*] Note that a count can also be performed with:
>
>    LongAdder l = new LongAdder();
>    Stream<T> s = ...
>    s.parallel().forEach(e -> l.increment());
>    long count = l.sum();
>
> This is a possible alternative implementation to that of reduction, but I prefer to stick with the latter for now.
>


