Patch for parallel deferring to sequential evaluation
Paul Sandoz
paul.sandoz at oracle.com
Wed Oct 24 02:35:17 PDT 2012
Hi,
See the attached patch, which ensures that parallel evaluation degrades gracefully to sequential evaluation if the parallel stream has already been partially iterated.
This removes one of the IllegalStateExceptions (ISEs) that have recently been discussed.
Paul.
diff -r 93ea6cccef22 src/share/classes/java/util/streams/AbstractPipeline.java
--- a/src/share/classes/java/util/streams/AbstractPipeline.java Tue Oct 23 14:06:54 2012 -0700
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java Wed Oct 24 11:33:11 2012 +0200
@@ -89,11 +89,9 @@
}
protected<R> R evaluateParallel(TerminalOp<E_OUT, R> terminal) {
- // @@@ Need to check if any upstream streams have been pulled using iterator
if (iterator != null) {
- // @@@ Is this assumption correct for all sources and pipelines?
- // @@@ Can default to serial evaluation
- throw new IllegalStateException("A stream that has been iterated on (partially or otherwise) cannot be evaluated in parallel");
+ // If already pulled then cannot split, revert back to sequential evaluation
+ return evaluateSerial(terminal);
}
final IntermediateOp[] ops = ops();
@@ -232,10 +230,14 @@
}
@Override
+ public int getOutputSizeIfKnown() {
+ return (iterator == null && isOutputSizeKnown()) ? source.getSizeIfKnown() : -1;
+ }
+
+ @Override
public<S extends Sink<E_OUT>> S into(S sink) {
Objects.requireNonNull(sink);
- // @@@ Need to check if any upstream streams have been pulled using iterator
if (isShortCircuit() || iterator != null) {
Iterator<E_OUT> it = iterator();
sink.begin(-1);
diff -r 93ea6cccef22 test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java Tue Oct 23 14:06:54 2012 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java Wed Oct 24 11:33:11 2012 +0200
@@ -134,6 +134,29 @@
}
},
+ // Wrap as parallel stream, and iterate in mixed mode
+ PAR_STREAM_ITERATOR_TO_ARRAY(true) {
+ <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+ Stream<?> stream = stream(data.par(ops));
+ Iterator<?> iter = stream.iterator();
+ if (iter.hasNext())
+ sink.accept(iter.next());
+ for (Object t : stream.toArray())
+ sink.accept(t);
+ }
+ },
+
+ // Wrap as parallel stream, and iterate in mixed mode
+ PAR_STREAM_ITERATOR_SEQUENTIAL_FOR_EACH(true) {
+ <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+ Stream<?> stream = stream(data.par(ops));
+ Iterator<?> iter = stream.iterator();
+ if (iter.hasNext())
+ sink.accept(iter.next());
+ stream.sequential().forEach(sink);
+ }
+ },
+
// More ways to iterate the PSS: iterate result of op
// Extends testing to test whether computation happens in- or out-of-thread
;
More information about the lambda-dev
mailing list