Stream is AutoCloseable, Stream.onClose

Jim Mayer jim at pentastich.org
Sun Jun 23 18:45:15 PDT 2013


Hi Brian,

I sent a note earlier to the observers list with some concerns about
leaking resources.  This note is an elaboration of the earlier one and
includes some ideas about a possible way around the issue that involves
deferring the creation of resources until the stream pipeline evaluation
starts.

Consider the example at the end of your note:

public static Stream<String> lines(Path path, Charset cs)
        throws IOException {
        BufferedReader br = Files.newBufferedReader(path, cs);
        return br.lines().onClose(br::close);
}


Now, consider the following example:

List<String> lines = Files.lines(path, cs).collect(getCollector());
...
private Collector<String,List<String>> getCollector() {
        throw new RuntimeException();

}


The first statement is going to leak the reader: Files.lines opens it, and
then getCollector() throws before the pipeline is ever evaluated or closed.

The problem is going to occur for any method that creates a resource and
returns a Stream that uses that resource.  There's no direct way around it
that I can see, because there is no way to reliably release the resource
during the construction of the stream pipeline.

As you pointed out in your note, the following formulation would be safe:

try (Stream<String> s = Files.lines(path, cs)) {
        return s.collect(getCollector());
}
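
To make the difference concrete, here is a minimal sketch written against the
java.util.stream API as it eventually shipped. The names LeakDemo, openLines
and failingCollector are hypothetical stand-ins (for Files.lines and
getCollector above); no real file is opened, a flag just records whether the
close handler ran.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collector;
import java.util.stream.Stream;

class LeakDemo {
    // Stand-in for Files.lines: "opens" a resource and registers a close handler.
    static Stream<String> openLines(AtomicBoolean closed) {
        return Stream.of("a", "b").onClose(() -> closed.set(true));
    }

    // Stand-in for getCollector(): fails before the pipeline is evaluated.
    static Collector<String, ?, List<String>> failingCollector() {
        throw new RuntimeException("no collector");
    }

    // Unsafe form: the stream is created, then the argument expression throws.
    static boolean closedWithoutTwr() {
        AtomicBoolean closed = new AtomicBoolean();
        try {
            openLines(closed).collect(failingCollector());
        } catch (RuntimeException expected) {
            // the stream was never evaluated and nobody called close()
        }
        return closed.get();
    }

    // Safe form: try-with-resources closes the stream on every exit path.
    static boolean closedWithTwr() {
        AtomicBoolean closed = new AtomicBoolean();
        try (Stream<String> s = openLines(closed)) {
            s.collect(failingCollector());
        } catch (RuntimeException expected) {
            // by the time we get here the stream has already been closed
        }
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println("closed without TWR: " + closedWithoutTwr());
        System.out.println("closed with TWR:    " + closedWithTwr());
    }
}
```

The unsafe variant never runs the close handler; the try-with-resources
variant does, even though the collector lookup fails.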


I'm concerned, though, that methods like Files.lines will make it easy and
tempting to write resource-unsafe code.  Stephan Herrmann's recent comments
in the "Point lambdafications in Java.io" thread suggest that the IDE could
catch many of these problems, but, at the same time, it would be convenient
to be able to write "Files.lines(path, cs).collect(toList())"!

One way to minimize the problem would be to make sure that the standard
Java libraries allocate the resource from within the stream evaluation
rather than in the method that returns the stream.  In the case of
"Files.lines" this would mean not opening the file until the stream starts
pulling data from it.  The library would have to do something with the
possible IOException from opening the file, but that problem exists on each
read anyway so I assume there are plans to address it.

I'm not sure how you'd want to express this in the API (something like
Stream.generate, I suppose, since I assume that you need something to pass
to "onClose" during the stream pipeline construction), but deferring
resource creation until stream evaluation would make it much harder to leak
resources.
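
As a rough illustration of the deferral idea (not a proposal for the actual
API), the sketch below wraps the open in a flatMap over a one-element stream,
using the java.util.stream API as it eventually shipped. The helper name
lazyLines and the class LazyLines are hypothetical. It relies on two
assumptions about that API: the mapper only runs once a terminal operation
starts, and flatMap closes each mapped stream after consuming it. The
IOException from opening is rethrown as UncheckedIOException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyLines {
    // Hypothetical helper: nothing is opened here. The file is opened inside
    // the flatMap mapper, i.e. only once the pipeline is being evaluated, and
    // flatMap closes the inner stream (and its reader) when it is exhausted.
    static Stream<String> lazyLines(Path path, Charset cs) {
        return Stream.of(path).flatMap(p -> {
            try {
                return Files.lines(p, cs);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    // Writes a small temporary file, reads it back lazily, then deletes it.
    static List<String> demo() {
        try {
            Path tmp = Files.createTempFile("lazy-lines", ".txt");
            try {
                Files.write(tmp, Arrays.asList("one", "two"), StandardCharsets.UTF_8);
                return lazyLines(tmp, StandardCharsets.UTF_8)
                        .collect(Collectors.toList());
            } finally {
                Files.delete(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape, "lazyLines(path, cs).collect(getCollector())" has nothing to
leak if getCollector() throws, because no reader exists yet at that point.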

-- Jim Mayer

On Fri, Jun 21, 2013 at 1:39 PM, Brian Goetz <brian.goetz at oracle.com> wrote:

> Let me step back and describe why these use cases are important.
>
> Crawling directories is something that's always been a pain in Java and
> easy in scripting languages.  The work Henry did in java.nio.Files was
> designed to alleviate that pain.
>
> For example, to walk a tree:
>
>   Files.list(Path)
>
> produces a Stream<Path>.  But it also encapsulates a DirectoryStream
> through its Iterator, which should be closed when we're finished
> enumerating the stream.  The route of "CloseableStream extends Stream,
> Closeable" was intended to make that easy through TWR:
>
>   try (Stream<Path> s = Files.list(path)) {
>     s.filter(e -> e.getFileName().toString().endsWith(".java"))
>      .forEach(...);
>   }
>
> But as it turns out, this isn't even enough.  There's also Files.lines(f),
> which gives you the lines in a given file.  If we want all the lines in a
> set of files:
>
>   Files.list(path)
>        .flatMap(Files::lines)
>        ...
>
> This is really nice!  But, even with TWR at the top level, we don't have a
> way to close the BufferedReader that is implicit in each iteration of
> flatMap, only the top-level stream, which is not enough.
>
> If we step back and say "All streams support closing", then we can
> implement flatMap in a close-friendly way:
>
>       // In ReferencePipeline.flatMap
>       try (Stream<? extends R> result = mapper.apply(u)) {
>           if (result != null)
>               result.sequential().forEach(downstream);
>       }
>
> For most streams, the implicit close would be a no-op, but for streams
> with a close handler, it would get run when the nested stream goes out of
> scope.
>
> Now, if Files.lines() produces a stream with a close hook, the above plays
> nicely with TWR:
>
>   try(Stream<Path> paths = Files.list(path)) {
>        paths.flatMap(Files::lines)
>             ...
>   }
>
> and even if the user does not use TWR, only the top-level stream is at
> risk for leaking, rather than the thousands of BufferedReader-backed
> streams.
>
> This is why the "have a different, statically-identified kind of stream"
> approach fails; if we need to release resources, we need it to be turtles
> all the way down.
>
> Here's how the Files.lines implementation looks now:
>
>         BufferedReader br = Files.newBufferedReader(path, cs);
>         return new DelegatingCloseableStream<>(br, br.lines());
>
> under the new approach, it would look like:
>
>         BufferedReader br = Files.newBufferedReader(path, cs);
>         return br.lines().onClose(br::close);
>
>
>
>
>
> On 6/20/2013 11:17 AM, Paul Sandoz wrote:
>
>> Hi,
>>
>> Having another go at this...
>>
>> The current solution of resource management for Stream is poor (as
>> previously mentioned):
>>
>> 1) CloseableStream/DelegatingStream add a lot of API surface area.
>>
>> 2) Operations on CloseableStream do not return a CloseableStream.
>>
>> 3) try-with-resources always needs to be used with the CloseableStream
>> since the Stream itself has no closeable semantics.
>>
>> 4) Stream.concat fails to propagate the closing
>>
>>
>> If Stream extends from AutoCloseable we can address the first 3 issues.
>> In fact it can be addressed with just a close method but it is awkward to
>> transform that into an AutoCloseable for use with try-with-resources: try
>> (AutoCloseable ac = () -> s.close()) { … }
>>
>> A negative point is it is no longer clear whether a stream should be
>> closed or not, which is anyway the case for issues 2/3/4. However, i don't
>> think that should stop us trying to improve the general situation, it's not
>> gonna be perfect but i think we can do better than what we currently have
>> [*].
>>
>>
>> Issue 4 can be addressed by adding a Stream.onClose(AutoCloseable ac)
>> method.
>>
>> Stream s = ...
>> s = s.onClose(a).filter(...).onClose(b);
>> s.close(); // b is called, then a is called
>> s.toArray(); // throws ISE
>>
>> Stream s = ...
>> s = s.onClose(a).onClose(b);
>> s.close(); // b is called, then a is called
>> s.toArray(); // throws ISE
>>
>> The Stream.concat implementation becomes:
>>
>>          Stream<T> cs = (a.isParallel() || b.isParallel())
>>                 ? StreamSupport.parallelStream(split)
>>                 : StreamSupport.stream(split);
>>
>>          // ignoring exception handling to be brief
>>          return cs.onClose(() -> { a.close(); b.close(); });
>>
>>
>> The Stream.close/onClose methods enable us to specify more precisely the
>> behaviour of Stream.close, the order in which calls to close on
>> AutoCloseable instances passed to onClose occur, and what happens if
>> AutoCloseable.close throws an exception.
>>
>>
>> Of course it is possible to do silly things like this:
>>
>>    s = s.onClose(a).filter(...).onClose(s);
>>
>> but we could detect if s is a stage in the pipeline and throw an IAE.
>>
>>
>> FWIW Stream.close/onClose is easy to implement efficiently.
>>
>> Paul.
>>
>> [*] The JDK world is murky, see ByteArrayInputStream.close:
>>
>>      /**
>>       * Closing a <tt>ByteArrayInputStream</tt> has no effect. The methods in
>>       * this class can be called after the stream has been closed without
>>       * generating an <tt>IOException</tt>.
>>       * <p>
>>       */
>>      public void close() throws IOException {
>>      }
>>
>>

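For readers trying the onClose idea from the quoted notes against the
java.util.stream API as it eventually shipped: there, onClose takes a Runnable
rather than an AutoCloseable, and close handlers run in the order they were
added, which differs from the "b, then a" order in the quoted sketch. A
minimal check, with the class name CloseOrder and the handler labels "a" and
"b" chosen only for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class CloseOrder {
    // Registers two close handlers on different stages of one pipeline and
    // returns the order in which close() ran them.
    static List<String> closeLog() {
        List<String> log = new ArrayList<>();
        Stream<String> s = Stream.of("x", "y")
                .onClose(() -> log.add("a"))
                .filter(e -> !e.isEmpty())
                .onClose(() -> log.add("b"));
        s.close(); // closing any stage runs every handler in the pipeline
        return log;
    }

    public static void main(String[] args) {
        System.out.println(closeLog());
    }
}
```

Both handlers run from a single close() call on the last stage, so a handler
attached early in the pipeline is not lost when later operations are chained.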

More information about the lambda-libs-spec-observers mailing list