Stream.unordered()

Paul Sandoz paul.sandoz at oracle.com
Mon Nov 5 08:22:39 PST 2012


Grrr. attachments (even just textual attachments of a reasonable size) are removed...

Paul.

On Nov 5, 2012, at 4:49 PM, Paul Sandoz <Paul.Sandoz at oracle.com> wrote:

> HI,
> 
> See attached for a patch that adds Stream.unordered().
> 
> This method will clear the encounter order flag. When this flag is cleared certain downstream stateful operations evaluated in parallel may choose to apply algorithms that do not preserve encounter order.
> 
> Such algorithms may be more efficient than order preserving algorithms, although i would say that one should not presume this to be a general assumption and has to be taken on a case by case basis.
> 
> One such example is obtaining unique elements. 
> 
> When encounter order is to be preserved it might be required to merge the linked hash sets produced at the leaves of the computation tree. That merging has a cost, i suspect it is rather tricky to write a parallel set merge algorithm, so a naive implementation would just take the set on the left hand side of a tree and merge with set on the right hand side using Set.addAll.
> 
> When encounter order need not be preserved one can stuff elements as and when they are produced into a shared ConcurrentHashMap.
> 
> Paul.
> 
> 
> 
> 


diff -r e60b1819b652 src/share/classes/java/util/streams/Stream.java
--- a/src/share/classes/java/util/streams/Stream.java	Mon Nov 05 15:21:49 2012 +0100
+++ b/src/share/classes/java/util/streams/Stream.java	Mon Nov 05 16:24:46 2012 +0100
@@ -113,8 +113,21 @@
 
     Optional<T> findAny();
 
+    /**
+     * Convert this stream, if a parallel stream, to a sequential stream.
+     *
+     * @return a sequential stream.
+     */
     Stream<T> sequential();
 
+    /**
+     * Convert this stream to a stream that has no encounter order.
+     * <p>Parallel evaluation of downstream operations are free process input elements and produce output elements
+     * in any order.</p>
+     *
+     * @return a stream whose output elements have no encounter order.
+     */
+    Stream<T> unordered();
 
     /**
      * An aggregate that supports an {@code addAll(Stream)} operation.
diff -r e60b1819b652 src/share/classes/java/util/streams/ValuePipeline.java
--- a/src/share/classes/java/util/streams/ValuePipeline.java	Mon Nov 05 15:21:49 2012 +0100
+++ b/src/share/classes/java/util/streams/ValuePipeline.java	Mon Nov 05 16:24:46 2012 +0100
@@ -163,6 +163,11 @@
     }
 
     @Override
+    public Stream<U> unordered() {
+        return chainValue(new FlagDeclaringOp<U>(StreamOpFlags.NOT_ORDERED));
+    }
+
+    @Override
     public U reduce(final U seed, final BinaryOperator<U> op) {
         return pipeline(new FoldOp<>(seed, op, op));
     }
diff -r e60b1819b652 test-ng/tests/org/openjdk/tests/java/util/streams/ops/UnorderedStreamTest.java
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test-ng/tests/org/openjdk/tests/java/util/streams/ops/UnorderedStreamTest.java	Mon Nov 05 16:24:46 2012 +0100
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package org.openjdk.tests.java.util.streams.ops;
+
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.streams.*;
+
+ at Test
+public class UnorderedStreamTest {
+
+    public void testUnordered() {
+        testUnordered(Arrays.asList(1, 2, 3).stream());
+        testUnordered(Arrays.asList(1, 2, 3).parallel());
+    }
+
+    <T> void testUnordered(Stream<T> s) {
+        s = s.unordered();
+
+        @SuppressWarnings("unchecked")
+        Stream<Integer> st = new ValuePipeline<Integer, Integer>(
+                (AbstractPipeline<Integer, Integer>)s,
+                new FlagOpTest.TestFlagExpectedOp<>(0,
+                                                    EnumSet.noneOf(StreamOpFlags.class),
+                                                    EnumSet.noneOf(StreamOpFlags.class),
+                                                    EnumSet.of(StreamOpFlags.ORDERED)));
+        st.toArray();
+    }
+
+}




More information about the lambda-dev mailing list