JDK-8071597: close original Stream in default implementation?
Paul Sandoz
paul.sandoz at oracle.com
Wed Jul 8 08:27:45 UTC 2015
Hi Tagir,
Yes, I forgot about that. Thanks, nice catch. Patch updated:
http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8071597-take-drop-while/webrev/
See below for a diff.
Paul.
diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/DoubleStream.java
--- a/src/java.base/share/classes/java/util/stream/DoubleStream.java Tue Jun 09 07:10:03 2015 +0100
+++ b/src/java.base/share/classes/java/util/stream/DoubleStream.java Wed Jul 08 10:20:25 2015 +0200
@@ -300,7 +300,8 @@
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
- * not support splitting.
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code takeWhile()} is generally a cheap operation on sequential
@@ -328,7 +329,7 @@
// is safe to use as long as it configured not to split
return StreamSupport.doubleStream(
new WhileOps.UnorderedWhileSpliterator.OfDouble.Taking(spliterator(), true, predicate),
- isParallel());
+ isParallel()).onClose(this::close);
}
/**
@@ -358,7 +359,8 @@
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
- * not support splitting.
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code dropWhile()} is generally a cheap operation on sequential
@@ -386,7 +388,7 @@
// is safe to use as long as it configured not to split
return StreamSupport.doubleStream(
new WhileOps.UnorderedWhileSpliterator.OfDouble.Dropping(spliterator(), true, predicate),
- isParallel());
+ isParallel()).onClose(this::close);
}
/**
diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/IntStream.java
--- a/src/java.base/share/classes/java/util/stream/IntStream.java Tue Jun 09 07:10:03 2015 +0100
+++ b/src/java.base/share/classes/java/util/stream/IntStream.java Wed Jul 08 10:20:25 2015 +0200
@@ -298,7 +298,8 @@
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
- * not support splitting.
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code takeWhile()} is generally a cheap operation on sequential
@@ -325,7 +326,7 @@
// is safe to use as long as it configured not to split
return StreamSupport.intStream(
new WhileOps.UnorderedWhileSpliterator.OfInt.Taking(spliterator(), true, predicate),
- isParallel());
+ isParallel()).onClose(this::close);
}
/**
@@ -355,7 +356,8 @@
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
- * not support splitting.
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code dropWhile()} is generally a cheap operation on sequential
@@ -382,7 +384,7 @@
// is safe to use as long as it configured not to split
return StreamSupport.intStream(
new WhileOps.UnorderedWhileSpliterator.OfInt.Dropping(spliterator(), true, predicate),
- isParallel());
+ isParallel()).onClose(this::close);
}
/**
diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/LongStream.java
--- a/src/java.base/share/classes/java/util/stream/LongStream.java Tue Jun 09 07:10:03 2015 +0100
+++ b/src/java.base/share/classes/java/util/stream/LongStream.java Wed Jul 08 10:20:25 2015 +0200
@@ -298,7 +298,8 @@
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
- * not support splitting.
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code takeWhile()} is generally a cheap operation on sequential
@@ -326,7 +327,7 @@
// is safe to use as long as it configured not to split
return StreamSupport.longStream(
new WhileOps.UnorderedWhileSpliterator.OfLong.Taking(spliterator(), true, predicate),
- isParallel());
+ isParallel()).onClose(this::close);
}
/**
@@ -356,7 +357,8 @@
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
- * not support splitting.
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code dropWhile()} is generally a cheap operation on sequential
@@ -384,7 +386,7 @@
// is safe to use as long as it configured not to split
return StreamSupport.longStream(
new WhileOps.UnorderedWhileSpliterator.OfLong.Dropping(spliterator(), true, predicate),
- isParallel());
+ isParallel()).onClose(this::close);
}
/**
diff -r 61bc2fe3b2c6 src/java.base/share/classes/java/util/stream/Stream.java
--- a/src/java.base/share/classes/java/util/stream/Stream.java Tue Jun 09 07:10:03 2015 +0100
+++ b/src/java.base/share/classes/java/util/stream/Stream.java Wed Jul 08 10:20:25 2015 +0200
@@ -505,7 +505,8 @@
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
- * not support splitting.
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code takeWhile()} is generally a cheap operation on sequential
@@ -532,7 +533,7 @@
// is safe to use as long as it configured not to split
return StreamSupport.stream(
new WhileOps.UnorderedWhileSpliterator.OfRef.Taking<>(spliterator(), true, predicate),
- isParallel());
+ isParallel()).onClose(this::close);
}
/**
@@ -562,7 +563,8 @@
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
- * not support splitting.
+ * not support splitting. When the returned stream is closed, the close
+ * handlers for both the returned and this stream are invoked.
*
* @apiNote
* While {@code dropWhile()} is generally a cheap operation on sequential
@@ -589,7 +591,7 @@
// is safe to use as long as it configured not to split
return StreamSupport.stream(
new WhileOps.UnorderedWhileSpliterator.OfRef.Dropping<>(spliterator(), true, predicate),
- isParallel());
+ isParallel()).onClose(this::close);
}
/**
diff -r 61bc2fe3b2c6 test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java
--- a/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java Tue Jun 09 07:10:03 2015 +0100
+++ b/test/java/util/stream/test/org/openjdk/tests/java/util/stream/WhileOpTest.java Wed Jul 08 10:20:25 2015 +0200
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.DefaultMethodStreams;
@@ -318,4 +319,43 @@
}
}
+ @Test
+ public void testRefDefaultClose() {
+ AtomicBoolean isClosed = new AtomicBoolean();
+ Stream<Integer> s = Stream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+ try (Stream<Integer> ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+ ds.count();
+ }
+ assertTrue(isClosed.get());
+ }
+
+ @Test
+ public void testIntDefaultClose() {
+ AtomicBoolean isClosed = new AtomicBoolean();
+ IntStream s = IntStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+ try (IntStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+ ds.count();
+ }
+ assertTrue(isClosed.get());
+ }
+
+ @Test
+ public void testLongDefaultClose() {
+ AtomicBoolean isClosed = new AtomicBoolean();
+ LongStream s = LongStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+ try (LongStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+ ds.count();
+ }
+ assertTrue(isClosed.get());
+ }
+
+ @Test
+ public void testDoubleDefaultClose() {
+ AtomicBoolean isClosed = new AtomicBoolean();
+ DoubleStream s = DoubleStream.of(1, 2, 3).onClose(() -> isClosed.set(true));
+ try (DoubleStream ds = DefaultMethodStreams.delegateTo(s).takeWhile(e -> e < 3)) {
+ ds.count();
+ }
+ assertTrue(isClosed.get());
+ }
}
On Jul 8, 2015, at 4:58 AM, Tagir F. Valeev <amaembo at gmail.com> wrote:
> Hello,
>
> I was looking at the default implementation of the new Stream
> takeWhile/dropWhile methods:
>
> http://cr.openjdk.java.net/~psandoz/jdk9/JDK-8071597-take-drop-while/webrev/src/java.base/share/classes/java/util/stream/Stream.java.cdiff.html
>
> I think in order to make this default behavior more consistent, the
> original Stream must be closed when the returned one is closed. Thus,
> the takeWhile return statement should look like this:
>
> return StreamSupport.stream(
> new WhileOps.UnorderedWhileSpliterator.OfRef.Taking<>
> (spliterator(), true, predicate),
> isParallel()).onClose(this::close);
>
> The same for dropWhile method.
>
> What do you think?
>
> Regards,
> Tagir Valeev.
>
More information about the core-libs-dev
mailing list