Patch for parallel deferring to sequential evaluation

Paul Sandoz paul.sandoz at oracle.com
Wed Oct 24 02:35:17 PDT 2012


Hi,

See the attached patch, which ensures that parallel evaluation degrades gracefully to sequential evaluation if the parallel stream has already been partially iterated.

This removes one of the IllegalStateExceptions (ISEs) that have recently been discussed.

Paul.

diff -r 93ea6cccef22 src/share/classes/java/util/streams/AbstractPipeline.java
--- a/src/share/classes/java/util/streams/AbstractPipeline.java	Tue Oct 23 14:06:54 2012 -0700
+++ b/src/share/classes/java/util/streams/AbstractPipeline.java	Wed Oct 24 11:33:11 2012 +0200
@@ -89,11 +89,9 @@
     }
 
     protected<R> R evaluateParallel(TerminalOp<E_OUT, R> terminal) {
-        // @@@ Need to check if any upstream streams have been pulled using iterator
         if (iterator != null) {
-            // @@@ Is this assumption correct for all sources and pipelines?
-            // @@@ Can default to serial evaluation
-            throw new IllegalStateException("A stream that has been iterated on (partially or otherwise) cannot be evaluated in parallel");
+            // If already pulled then cannot split, revert back to sequential evaluation
+            return evaluateSerial(terminal);
         }
 
         final IntermediateOp[] ops = ops();
@@ -232,10 +230,14 @@
         }
 
         @Override
+        public int getOutputSizeIfKnown() {
+            return (iterator == null && isOutputSizeKnown()) ? source.getSizeIfKnown() : -1;
+        }
+
+        @Override
         public<S extends Sink<E_OUT>> S into(S sink) {
             Objects.requireNonNull(sink);
 
-            // @@@ Need to check if any upstream streams have been pulled using iterator
             if (isShortCircuit() || iterator != null) {
                 Iterator<E_OUT> it = iterator();
                 sink.begin(-1);
diff -r 93ea6cccef22 test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java
--- a/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Tue Oct 23 14:06:54 2012 -0700
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/StreamOpTestCase.java	Wed Oct 24 11:33:11 2012 +0200
@@ -134,6 +134,29 @@
             }
         },
 
+        // Wrap as parallel stream, and iterate in mixed mode
+        PAR_STREAM_ITERATOR_TO_ARRAY(true) {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                Stream<?> stream = stream(data.par(ops));
+                Iterator<?> iter = stream.iterator();
+                if (iter.hasNext())
+                    sink.accept(iter.next());
+                for (Object t : stream.toArray())
+                    sink.accept(t);
+            }
+        },
+
+        // Wrap as parallel stream, and iterate in mixed mode
+        PAR_STREAM_ITERATOR_SEQUENTIAL_FOR_EACH(true) {
+            <T> void run(TestData<T> data, Sink sink, IntermediateOp[] ops) {
+                Stream<?> stream = stream(data.par(ops));
+                Iterator<?> iter = stream.iterator();
+                if (iter.hasNext())
+                    sink.accept(iter.next());
+                stream.sequential().forEach(sink);
+            }
+        },
+
         // More ways to iterate the PSS: iterate result of op
         // Extends testing to test whether computation happens in- or out-of-thread
         ;





More information about the lambda-dev mailing list