experience trying out lambda-8-b74

Peter Levart peter.levart at gmail.com
Mon Jan 28 00:16:30 PST 2013


On 01/28/2013 12:06 AM, Henry Jen wrote:
> There is an implementation almost ready, we need more test coverage on exceptional cases; Otherwise, the functionality seems to complete and pass tests we have.
>
> http://cr.openjdk.java.net/~henryjen/lambda/nio.5/webrev/
>
> With that, looks into Files.walk(Path), which iterate lazily as the stream is consumed, which should meet your need.
>
> parallel() would be built-in once we have a general strategy on how to spliterate a sequential stream.
Hi Henry,

I can imagine that many times a single block of code would be 
responsible for constructing a Path stream (possibly enclosed into a 
try-with-resources construct) and consuming it's results, so having to 
deal with the duality of IOException/UncheckedIOException is a 
complication for a user in my opinion. Wouldn't it be better also for 
the stream-factory methods to throw UncheckedIOException and for 
CloseableStream.close() to also throw UncheckedIOException (that means, 
only inheriting from AutoCloseable, not Closeable and co-variant-ly 
redefining the throws declaration):

public interface CloseableStream<T> extends Stream<T>, AutoCloseable {
     @Override
     void close() throws UncheckedIOException;
}


Regards, Peter
>
> Cheers,
> Henry
>
> On Jan 27, 2013, at 12:45 PM, Gregg Wonderly <gregg at wonderly.org> wrote:
>
>> One of the problems I have, is an application which can end up with 10's of thousands of files in a single directory.  I tried to get an iterator with call backs or opendir/readdir put into the JDK back in 1.3.1/1.4 to no avail.  I'd real like to have something as simple as opendir/readdir provided as a stream that is effective and non memory intensive.  I currently have to use JNI to do opendir/readdir, and I need to have a count function that used readdir or an appropriate mechanism to see when certain thresholds are passed.
>>
>> Gregg Wonderly
>>
>> Sent from my iPad
>>
>> On Jan 27, 2013, at 12:47 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>
>>> Thanks, this is a nice little example.
>>>
>>> First, I'll note we have been brainstorming on stream support for NIO
>>> directory streams, which might help in this situation.  This should come
>>> up for review here pretty soon, and it would be great if you could
>>> validate whether it addresses your concerns here.
>>>
>>> The "recursive explosion" problem is an interesting one, and not one
>>> we'd explicitly baked into the design requirements for
>>> flatMap/mapMulti/explode.  (Note that Scala's flatMap, or .NET's
>>> SelectMany, do not explicitly handle this either.)  But your solution is
>>> pretty clean anyway!  You can lower the overhead you are concerned about
>>> by not creating a Stream but instead forEaching directly on the result
>>> of listFiles:
>>>
>>> } else if (f.isDirectory()) {
>>>      for (fi : f.listFiles())
>>>          fileExploder(ds, fi);   /* recursive call ! */
>>> }
>>>
>>> Which isn't as bad.  You are still creating the array inside
>>> listFiles(), but at least then you create no intermediate objects to
>>> iterate over it / explode it.
>>>
>>> BTW, the current (frequently confusing) design of explode is based on
>>> the exact performance concern you are worried about; exploding an
>>> element to nothing should result in no work.  Creating an empty
>>> collection, and then iterating it, would be a loss, which is why
>>> something that is more like map(Function<T, Collection<U>>) was not
>>> selected, despite being simpler to understand (and convenient in the
>>> common but by-no-means universal case where you already have a
>>> Collection lying around.)
>>>
>>> Whether parallelization will be a win or is tricky, as you've
>>> discovered.  Effectively, your stream data is generated by explode(),
>>> not by the stream source, and this process is fundamentally serial.  So
>>> you will get benefit out of parallelization only if the work done to
>>> process the files (which could be as trivial as processing their names,
>>> or as huge as parsing gigabyte XML documents) is big enough to make up
>>> for the serial part.  Unfortunately, the library can't figure that out
>>> for you, only you know that.
>>>
>>> That said, its interesting that you see different parallelization by
>>> adding the intermediate collect() stage.  This may well be a bug, we'll
>>> look into that.
>>>
>>> On 1/27/2013 8:31 AM, Arne Siegel wrote:
>>>> Hi lambda nation,
>>>>
>>>> was wondering if I could lambda-ize this little file system crawler:
>>>>
>>>>     public void forAllFilesDo(Block<File> fileConsumer, File baseFile) {
>>>>         if (baseFile.isHidden()) {
>>>>             // do nothing
>>>>         } else if (baseFile.isFile()) {
>>>>             fileConsumer.accept(baseFile);
>>>>         } else if (baseFile.isDirectory()) {
>>>>             File[] files = baseFile.listFiles();
>>>>             Arrays.stream(files).forEach(f -> forAllFilesDo(fileConsumer, f));
>>>>         }
>>>>     }
>>>>
>>>> Using b74, the following reformulation produces the identical behaviour:
>>>>
>>>>     public void forAllFilesDo3(Block<File> fileConsumer, File baseFile) {
>>>>         BiBlock<Stream.Downstream<File>,File> fileExploder = (ds, f) -> {
>>>>             if (f.isHidden()) {
>>>>                 // do nothing
>>>>             } else if (f.isFile()) {
>>>>                 ds.send(f);
>>>>             } else if (f.isDirectory()) {
>>>>                 Arrays.stream(f.listFiles())
>>>>                       .forEach(fi -> fileExploder(ds, fi));   /* recursive call ! */
>>>>             }
>>>>         };
>>>>         Arrays.stream(new File[] {baseFile})
>>>>               .explode(fileExploder)
>>>>               .forEach(fileConsumer::accept);
>>>>     }
>>>>
>>>> As my next step I was looking if I could parallelize the last step. But simply inserting .parallel()
>>>> seems not to be working - presumably because the initial one-element Spliterator is
>>>> governing split behaviour.
>>>>
>>>> Inserting .collect(Collectors.<File>toList()).parallelStream() instead works fine:
>>>>
>>>>         Arrays.stream(new File[] {baseFile})
>>>>               .explode(fileExploder)
>>>>               .collect(Collectors.<File>toList())
>>>>               .parallelStream()
>>>>               .forEach(fileConsumer::accept);
>>>>
>>>> Main observations:
>>>>
>>>> (*) Exploding a single element requires a lot of programming overhead:
>>>> 1st step - creation of a one-element array or collection
>>>> 2nd step - stream the result of the 1st step
>>>> 3rd step - explode the result of the 2nd step
>>>> Not sure if anything can be done about this.
>>>>
>>>> (*) .parallel() after .explode() behaves unexpectedly. Maybe the downstream should be based
>>>> on a completely fresh unknown-size spliterator.
>>>>
>>>> Regards
>>>> Arne Siegel
>



More information about the lambda-dev mailing list