RFR 8075307 Pipeline calculating inconsistent flag state for parallel stateful ops
Paul Sandoz
paul.sandoz at oracle.com
Wed Mar 18 14:09:05 UTC 2015
Hi,
The fix for https://bugs.openjdk.java.net/browse/JDK-8067969 (optimized Stream.count()) caused a regression in the JCK tests.
A test exposed a known weakness in the way pipeline flags are prepared for parallel execution. In certain cases the pipeline may report SIZED while the source spliterator does not report the SIZED characteristic. This is because the preparation always assumed that stateful operations are a full barrier and thus inject SIZED. That assumption does not hold when a stateful operation is "lazy" and can wrap the source spliterator and the upstream operations.
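To make the inconsistency concrete, here is a hypothetical toy model that tracks only one thing: whether the final pipeline slice is considered SIZED. It is not the JDK's AbstractPipeline or StreamOpFlag code. The Stage record and both methods are invented for illustration, and the sketch assumes that a full barrier always knows its output size, while a lazily wrapping stateful op that may drop elements does not.

```java
import java.util.List;

// Hypothetical toy model of parallel flag preparation. It is NOT the JDK's
// AbstractPipeline/StreamOpFlag code; it only tracks whether the size of the
// final pipeline slice is known (SIZED).
class FlagPrepSketch {

    // One intermediate stage. "lazy" matters only for stateful stages: a lazy
    // stateful stage wraps the upstream spliterator instead of buffering all
    // elements as a full barrier. "preservesSize" is false for ops such as
    // distinct or filter that may drop elements.
    record Stage(String name, boolean stateful, boolean lazy, boolean preservesSize) {}

    // Flawed assumption: every stateful stage is a full barrier, so SIZED is
    // always injected after it.
    static boolean sizedAssumingBarrier(boolean sourceSized, List<Stage> stages) {
        boolean sized = sourceSized;
        for (Stage s : stages) {
            sized = s.stateful() || (sized && s.preservesSize());
        }
        return sized;
    }

    // Consistent view: only a non-lazy (barrier) stateful stage injects SIZED;
    // a lazily wrapping stage is treated like any other stage.
    static boolean sizedActual(boolean sourceSized, List<Stage> stages) {
        boolean sized = sourceSized;
        for (Stage s : stages) {
            sized = (s.stateful() && !s.lazy()) || (sized && s.preservesSize());
        }
        return sized;
    }

    // Models list.stream().parallel().unordered().distinct(): a lazy distinct.
    static List<Stage> unorderedDistinct() {
        return List.of(new Stage("distinct", true, true, false));
    }

    // Models an ordered parallel distinct, evaluated as a full barrier.
    static List<Stage> orderedDistinct() {
        return List.of(new Stage("distinct", true, false, false));
    }

    public static void main(String[] args) {
        System.out.println(sizedAssumingBarrier(true, unorderedDistinct())); // true (inconsistent)
        System.out.println(sizedActual(true, unorderedDistinct()));          // false
        System.out.println(sizedActual(true, orderedDistinct()));            // true
    }
}
```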
For example:
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

List<Integer> list = Arrays.asList(1, 2, 3);
Stream<Integer> s = list.stream().parallel().unordered().distinct();
long count = s.count();
System.out.println(count);  // should print 3
The stateful distinct op can wrap the list's source spliterator since we don't care about order, but as a result the size of the distinct->count pipeline slice is unknown.
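A simplified sketch of why wrapping loses the size: a lazy, unordered distinct can wrap the source spliterator and filter duplicates through a shared concurrent set, but until traversal it cannot know how many elements survive, so it has to clear SIZED and SUBSIZED. LazyDistinctSpliterator is a hypothetical class written for this illustration (it is not the JDK's internal distinct spliterator), and because it uses a ConcurrentHashMap-backed set it does not support null elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical, simplified lazy distinct that wraps a source spliterator.
// Null elements are not supported (ConcurrentHashMap rejects null keys).
class LazyDistinctSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> source;
    private final Set<T> seen; // shared by all splits, so it must be thread-safe

    LazyDistinctSpliterator(Spliterator<T> source) {
        this(source, ConcurrentHashMap.newKeySet());
    }

    private LazyDistinctSpliterator(Spliterator<T> source, Set<T> seen) {
        this.source = source;
        this.seen = seen;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        boolean[] emitted = {false};
        // Pull from the source until a not-yet-seen element is found or it is exhausted.
        while (!emitted[0]) {
            boolean advanced = source.tryAdvance(t -> {
                if (seen.add(t)) { // add returns false for duplicates
                    action.accept(t);
                    emitted[0] = true;
                }
            });
            if (!advanced) {
                return false;
            }
        }
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        Spliterator<T> prefix = source.trySplit();
        return prefix == null ? null : new LazyDistinctSpliterator<>(prefix, seen);
    }

    @Override
    public long estimateSize() {
        // Only an upper bound: the number of duplicates is unknown until traversal.
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
        // The exact size is unknown, so SIZED/SUBSIZED are cleared; which copy of a
        // duplicate "wins" depends on split scheduling, so ORDERED/SORTED go too.
        return (source.characteristics()
                & ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED | Spliterator.SORTED))
                | Spliterator.DISTINCT;
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 2, 3, 3, 3);
        Spliterator<Integer> s = new LazyDistinctSpliterator<>(list.spliterator());
        System.out.println(s.hasCharacteristics(Spliterator.SIZED)); // false
        System.out.println(s.estimateSize());                        // 6, an upper bound only
        Stream<Integer> distinct =
                StreamSupport.stream(new LazyDistinctSpliterator<>(list.spliterator()), true);
        System.out.println(distinct.count());                        // 3
    }
}
```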
While I could fix the counting functionality to check for the inconsistency, the right thing to do is to fix the preparation of flags (a particularly sensitive area to get right) by merging it into the sourceSpliterator method (as suggested in the comments):
http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8075307-stream-parallel-prepare-flags/webrev/
This also makes things marginally more efficient when there are no stateful operations, since in that case no preparation work is needed at all. (I needed to relax tests in UnorderedTest that were overly aggressive when no stateful ops were present; similarly, I relaxed a test in DistinctOps that is no longer generally applicable.)
Furthermore, as a consequence, the terminal counting reduce ops now declare that they don't care about encounter order [*], which prevents the distinct op from becoming a full barrier that reduces elements into a LinkedHashSet.
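The difference between the two evaluation strategies can be sketched with public Stream APIs. Both methods below are hypothetical illustrations, not DistinctOps itself. The barrier version mirrors the ordered case by collecting into LinkedHashSets and merging them in encounter order before anything downstream runs. The lazy version relies on encounter order being irrelevant and filters through one shared concurrent set; that is a stateful predicate, acceptable here only because the set is thread-safe and the result is just a count.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical comparison of two ways to count distinct elements in parallel.
class DistinctCountStrategies {

    // Full barrier: reduce every element into per-split LinkedHashSets and merge
    // them in encounter order; the complete set exists before counting.
    static <T> long countViaOrderedBarrier(List<T> list) {
        LinkedHashSet<T> set = list.parallelStream()
                .collect(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll);
        return set.size();
    }

    // Lazy and unordered: a single shared concurrent set drops duplicates as
    // elements flow through, so no ordered merge is required.
    static <T> long countViaLazyUnordered(List<T> list) {
        Set<T> seen = ConcurrentHashMap.newKeySet();
        return list.parallelStream().unordered().filter(seen::add).count();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 2, 3, 3, 3);
        System.out.println(countViaOrderedBarrier(list)); // 3
        System.out.println(countViaLazyUnordered(list));  // 3
    }
}
```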
A JPRT test run reported no relevant failures, and the relevant JCK tests pass when run locally.
Paul.
[*] Note that a count can also be performed with:
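import java.util.concurrent.atomic.LongAdder;

LongAdder l = new LongAdder();
Stream<T> s = ...;
s.parallel().forEach(e -> l.increment());  // each element bumps the counter
long count = l.sum();                      // exact once forEach has returned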
This is a possible alternative to the reduction-based implementation, but I prefer to stick with the latter for now.
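A self-contained version of the LongAdder alternative, applied to the unordered distinct example from above. LongAdderCount and countWithAdder are hypothetical names used only for this sketch; the comparison with Stream.count() is just a sanity check that both approaches agree.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;

// Hypothetical helper: counts a stream's elements via a LongAdder side effect
// instead of Stream.count().
class LongAdderCount {

    static <T> long countWithAdder(Stream<T> s) {
        LongAdder adder = new LongAdder();             // low-contention counter
        s.parallel().forEach(e -> adder.increment());  // forEach ignores encounter order
        return adder.sum();                            // exact once forEach has returned
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);
        long viaAdder = countWithAdder(list.stream().unordered().distinct());
        long viaCount = list.stream().parallel().unordered().distinct().count();
        System.out.println(viaAdder + " " + viaCount); // 3 3
    }
}
```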
More information about the core-libs-dev mailing list