JDK-8071597: close original Stream in default implementation?

Paul Sandoz paul.sandoz at oracle.com
Wed Jul 8 08:27:45 UTC 2015


Hi Tagir,

Yes, i forgot about that, thanks, nice catch. Patch updated:

  http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8071597-take-drop-while/webrev/

See below for a diff.

Paul.


diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/DoubleStream.java
--- a/src/java.base/share/classes/java/util/stream/DoubleStream.java	Tue Jun 09 07:10:03 2015 +0100
+++ b/src/java.base/share/classes/java/util/stream/DoubleStream.java	Wed Jul 08 10:20:25 2015 +0200
@@ -300,7 +300,8 @@
      * the wrapped spliterator.  The returned stream preserves the execution
      * characteristics of this stream (namely parallel or sequential execution
      * as per {@link #isParallel()}) but the wrapped spliterator may choose to
-     * not support splitting.
+     * not support splitting.  When the returned stream is closed, the close
+     * handlers for both the returned and this stream are invoked.
      *
      * @apiNote
      * While {@code takeWhile()} is generally a cheap operation on sequential
@@ -328,7 +329,7 @@
         // is safe to use as long as it configured not to split
         return StreamSupport.doubleStream(
                 new WhileOps.UnorderedWhileSpliterator.OfDouble.Taking(spliterator(), true, predicate),
-                isParallel());
+                isParallel()).onClose(this::close);
     }

     /**
@@ -358,7 +359,8 @@
      * the wrapped spliterator.  The returned stream preserves the execution
      * characteristics of this stream (namely parallel or sequential execution
      * as per {@link #isParallel()}) but the wrapped spliterator may choose to
-     * not support splitting.
+     * not support splitting.  When the returned stream is closed, the close
+     * handlers for both the returned and this stream are invoked.
      *
      * @apiNote
      * While {@code dropWhile()} is generally a cheap operation on sequential
@@ -386,7 +388,7 @@
         // is safe to use as long as it configured not to split
         return StreamSupport.doubleStream(
                 new WhileOps.UnorderedWhileSpliterator.OfDouble.Dropping(spliterator(), true, predicate),
-                isParallel());
+                isParallel()).onClose(this::close);
     }

     /**
diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/IntStream.java
--- a/src/java.base/share/classes/java/util/stream/IntStream.java	Tue Jun 09 07:10:03 2015 +0100
+++ b/src/java.base/share/classes/java/util/stream/IntStream.java	Wed Jul 08 10:20:25 2015 +0200
@@ -298,7 +298,8 @@
      * the wrapped spliterator.  The returned stream preserves the execution
      * characteristics of this stream (namely parallel or sequential execution
      * as per {@link #isParallel()}) but the wrapped spliterator may choose to
-     * not support splitting.
+     * not support splitting.  When the returned stream is closed, the close
+     * handlers for both the returned and this stream are invoked.
      *
      * @apiNote
      * While {@code takeWhile()} is generally a cheap operation on sequential
@@ -325,7 +326,7 @@
         // is safe to use as long as it configured not to split
         return StreamSupport.intStream(
                 new WhileOps.UnorderedWhileSpliterator.OfInt.Taking(spliterator(), true, predicate),
-                isParallel());
+                isParallel()).onClose(this::close);
     }

     /**
@@ -355,7 +356,8 @@
      * the wrapped spliterator.  The returned stream preserves the execution
      * characteristics of this stream (namely parallel or sequential execution
      * as per {@link #isParallel()}) but the wrapped spliterator may choose to
-     * not support splitting.
+     * not support splitting.  When the returned stream is closed, the close
+     * handlers for both the returned and this stream are invoked.
      *
      * @apiNote
      * While {@code dropWhile()} is generally a cheap operation on sequential
@@ -382,7 +384,7 @@
         // is safe to use as long as it configured not to split
         return StreamSupport.intStream(
                 new WhileOps.UnorderedWhileSpliterator.OfInt.Dropping(spliterator(), true, predicate),
-                isParallel());
+                isParallel()).onClose(this::close);
     }

     /**
diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/LongStream.java
--- a/src/java.base/share/classes/java/util/stream/LongStream.java	Tue Jun 09 07:10:03 2015 +0100
+++ b/src/java.base/share/classes/java/util/stream/LongStream.java	Wed Jul 08 10:20:25 2015 +0200
@@ -298,7 +298,8 @@
      * the wrapped spliterator.  The returned stream preserves the execution
      * characteristics of this stream (namely parallel or sequential execution
      * as per {@link #isParallel()}) but the wrapped spliterator may choose to
-     * not support splitting.
+     * not support splitting.  When the returned stream is closed, the close
+     * handlers for both the returned and this stream are invoked.
      *
      * @apiNote
      * While {@code takeWhile()} is generally a cheap operation on sequential
@@ -326,7 +327,7 @@
         // is safe to use as long as it configured not to split
         return StreamSupport.longStream(
                 new WhileOps.UnorderedWhileSpliterator.OfLong.Taking(spliterator(), true, predicate),
-                isParallel());
+                isParallel()).onClose(this::close);
     }

     /**
@@ -356,7 +357,8 @@
      * the wrapped spliterator.  The returned stream preserves the execution
      * characteristics of this stream (namely parallel or sequential execution
      * as per {@link #isParallel()}) but the wrapped spliterator may choose to
-     * not support splitting.
+     * not support splitting.  When the returned stream is closed, the close
+     * handlers for both the returned and this stream are invoked.
      *
      * @apiNote
      * While {@code dropWhile()} is generally a cheap operation on sequential
@@ -384,7 +386,7 @@
         // is safe to use as long as it configured not to split
         return StreamSupport.longStream(
                 new WhileOps.UnorderedWhileSpliterator.OfLong.Dropping(spliterator(), true, predicate),
-                isParallel());
+                isParallel()).onClose(this::close);
     }

     /**
diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/Stream.java
--- a/src/java.base/share/classes/java/util/stream/Stream.java	Tue Jun 09 07:10:03 2015 +0100
+++ b/src/java.base/share/classes/java/util/stream/Stream.java	Wed Jul 08 10:20:25 2015 +0200
@@ -505,7 +505,8 @@
      * the wrapped spliterator.  The returned stream preserves the execution
      * characteristics of this stream (namely parallel or sequential execution
      * as per {@link #isParallel()}) but the wrapped spliterator may choose to
-     * not support splitting.
+     * not support splitting.  When the returned stream is closed, the close
+     * handlers for both the returned and this stream are invoked.
      *
      * @apiNote
      * While {@code takeWhile()} is generally a cheap operation on sequential
@@ -532,7 +533,7 @@
         // is safe to use as long as it configured not to split
         return StreamSupport.stream(
                 new WhileOps.UnorderedWhileSpliterator.OfRef.Taking<>(spliterator(), true, predicate),
-                isParallel());
+                isParallel()).onClose(this::close);
     }

     /**
@@ -562,7 +563,8 @@
      * the wrapped spliterator.  The returned stream preserves the execution
      * characteristics of this stream (namely parallel or sequential execution
      * as per {@link #isParallel()}) but the wrapped spliterator may choose to
-     * not support splitting.
+     * not support splitting.  When the returned stream is closed, the close
+     * handlers for both the returned and this stream are invoked.
      *
      * @apiNote
      * While {@code dropWhile()} is generally a cheap operation on sequential
@@ -589,7 +591,7 @@
         // is safe to use as long as it configured not to split
         return StreamSupport.stream(
                 new WhileOps.UnorderedWhileSpliterator.OfRef.Dropping<>(spliterator(), true, predicate),
-                isParallel());
+                isParallel()).onClose(this::close);
     }

     /**
diff -r 61bc2fe3b2c6 test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java
--- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java	Tue Jun 09 07:10:03 2015 +0100
+++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java	Wed Jul 08 10:20:25 2015 +0200
@@ -31,6 +31,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.DefaultMethodStreams;
@@ -318,4 +319,43 @@
         }
     }

+    @Test
+    public void testRefDefaultClose() {
+        AtomicBoolean isClosed = new AtomicBoolean();
+        Stream<Integer> s = Stream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+        try (Stream<Integer> ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+            ds.count();
+        }
+        assertTrue(isClosed.get());
+    }
+
+    @Test
+    public void testIntDefaultClose() {
+        AtomicBoolean isClosed = new AtomicBoolean();
+        IntStream s = IntStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+        try (IntStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+            ds.count();
+        }
+        assertTrue(isClosed.get());
+    }
+
+    @Test
+    public void testLongDefaultClose() {
+        AtomicBoolean isClosed = new AtomicBoolean();
+        LongStream s = LongStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+        try (LongStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+            ds.count();
+        }
+        assertTrue(isClosed.get());
+    }
+
+    @Test
+    public void testDoubleDefaultClose() {
+        AtomicBoolean isClosed = new AtomicBoolean();
+        DoubleStream s = DoubleStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+        try (DoubleStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+            ds.count();
+        }
+        assertTrue(isClosed.get());
+    }
 }

On Jul 8, 2015, at 4:58 AM, Tagir F. Valeev <amaembo at gmail.com> wrote:

> Hello,
> 
> I was looking at default implementation of new Stream
> takeWhile/dropWhile methods:
> 
> http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8071597-take-drop-while/webrev/src/java.base/share/classes/java/util/stream/Stream.java.cdiff.html
> 
> I think in order to make this default behavior more consistent, the
> original Stream must be closed when the returned one is closed. Thus,
> takeWhile return statement should look like this:
> 
> return StreamSupport.stream(
>     new WhileOps.UnorderedWhileSpliterator.OfRef.Taking<>
>     (spliterator(), true, predicate),
>     isParallel()).onClose(this::close);
> 
> The same for dropWhile method.
> 
> What do you think?
> 
> Regards,
> Tagir Valeev.
> 




More information about the core-libs-dev mailing list