Infinite parallel stream

Paul Sandoz paul.sandoz at oracle.com
Wed May 22 06:36:45 PDT 2013


Hi Boaz,

I just updated the repo, so your infinite spliterator should now work in conjunction with limit.

I would recommend setting the initial size estimate to something much higher; in fact, it can be Long.MAX_VALUE. The framework will stop splitting when the estimated size is <= Long.MAX_VALUE / (ForkJoinPool.getCommonPoolParallelism() * X), where X is currently 4. The "fudge-factor" X allows for better load balancing.
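To make the arithmetic concrete, here is a toy model of the rule described above. It is not the java.util.stream implementation: it simply halves an estimate the same way the quoted trySplit does and stops once a piece's estimate is at or below Long.MAX_VALUE / (parallelism * X), taking X = 4 as stated. The class and method names are hypothetical. Under this model a starting estimate of 1 << availableProcessors() is already below the threshold, so nothing is split, while a starting estimate of Long.MAX_VALUE yields at least parallelism * X pieces (the pieces' estimates sum to Long.MAX_VALUE and each is at most the threshold).

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical toy model of the splitting rule described in the reply.
public class SplitThresholdModel {

    // The "fudge-factor" X, taken as 4 per the reply (an assumption about that version).
    static final int FUDGE_FACTOR = 4;

    // Splitting stops once an estimate is <= Long.MAX_VALUE / (parallelism * X).
    static long threshold(int parallelism) {
        return Long.MAX_VALUE / ((long) parallelism * FUDGE_FACTOR);
    }

    // Counts the leaves produced by halving like the quoted trySplit
    // (split = estimate >> 1, remainder keeps the rest) until the threshold is reached.
    static long leafCount(long estimate, int parallelism) {
        if (estimate <= threshold(parallelism)) {
            return 1;
        }
        long split = estimate >> 1;
        if (split == 0) {
            return 1; // nothing left to split off
        }
        return leafCount(split, parallelism) + leafCount(estimate - split, parallelism);
    }

    public static void main(String[] args) {
        int p = ForkJoinPool.getCommonPoolParallelism();
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("threshold            = " + threshold(p));
        System.out.println("leaves from 1 << n   = " + leafCount(1L << processors, p));
        System.out.println("leaves from MAX_VALUE = " + leafCount(Long.MAX_VALUE, p));
    }
}
```

The printed numbers depend on the machine's common-pool parallelism; only the contrast between the two starting estimates is the point.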

The Stream.generate method uses the same mechanism to produce a stream of elements that is known to be infinite.
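For comparison, a minimal sketch of the built-in route, using the Java 8 Stream.generate, parallel and limit methods. Because a generated stream is unordered, limit(1000) may keep any 1000 of the generated values, and the supplier may be called more than 1000 times, so the sketch uses a thread-safe AtomicLong counter. GenerateLimitExample and firstThousand are illustrative names.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GenerateLimitExample {

    static List<Long> firstThousand() {
        AtomicLong counter = new AtomicLong(); // safe to call from several threads
        // Stream.generate produces an infinite stream; limit bounds it.
        return Stream.generate(counter::incrementAndGet)
                .parallel()
                .limit(1000)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstThousand().size()); // prints 1000
    }
}
```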

Paul.

On Apr 30, 2013, at 9:50 AM, Boaz Nahum <boaznahum at gmail.com> wrote:

> I'm trying to generate a parallel stream from an infinite iterator:
> 
>        Stream<Data> s = generateParallel(supplier).
>                parallel().
>                limit(1000);
> 
>        s.forEach(System.out::println);
> 
> My Spliterator:
> 
>    private static class ParallelGenerateSpliterator<T> implements Spliterator<T> {
> 
>        private final Supplier<? extends T> supplier;
>        private long estimatedSize;
> 
>        public ParallelGenerateSpliterator(Supplier<? extends T> supplier, long estimatedSize) {
>            this.supplier = supplier;
>            this.estimatedSize = estimatedSize;
>        }
> 
>        public ParallelGenerateSpliterator(Supplier<? extends T> supplier) {
>            this(supplier, 1 << Runtime.getRuntime().availableProcessors());
>        }
> 
>        @Override
>        public boolean tryAdvance(Consumer<? super T> action) {
>            action.accept(supplier.get());
>            return true;
>        }
> 
>        @Override
>        public Spliterator<T> trySplit() {
> 
>            long spliSize = estimatedSize >> 1;
> 
>            if (spliSize == 0) {
>                return null;
>            }
> 
>            estimatedSize -= spliSize;
> 
>            return new ParallelGenerateSpliterator<T>(supplier, spliSize);
>        }
> 
>        @Override
>        public long estimateSize() {
>            return estimatedSize;
>        }
> 
>        @Override
>        public int characteristics() {
>            return IMMUTABLE;
>        }
>    }
> 
> But the example above runs forever, looping endlessly in AbstractPipeline:
> 
>            spliterator.forEachRemaining(wrappedSink);
> 
> 
> Can I solve this without making the spliterator finite?
> 
> Thanks,
> Boaz
> 
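Applying the recommendation from the reply to the spliterator quoted above gives something like the following sketch. It keeps the original halving trySplit but starts the estimate at Long.MAX_VALUE. The question does not show generateParallel, so the version here is a hypothetical stand-in built on StreamSupport.stream(Spliterator, boolean) from the released Java 8 API; InfiniteParallelStream and countLimited are likewise illustrative names.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class InfiniteParallelStream {

    // Infinite spliterator: tryAdvance never returns false.
    static final class ParallelGenerateSpliterator<T> implements Spliterator<T> {
        private final Supplier<? extends T> supplier;
        private long estimatedSize;

        ParallelGenerateSpliterator(Supplier<? extends T> supplier, long estimatedSize) {
            this.supplier = supplier;
            this.estimatedSize = estimatedSize;
        }

        // Start from Long.MAX_VALUE so the framework has room to split.
        ParallelGenerateSpliterator(Supplier<? extends T> supplier) {
            this(supplier, Long.MAX_VALUE);
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            action.accept(supplier.get());
            return true; // never exhausted
        }

        @Override
        public Spliterator<T> trySplit() {
            long splitSize = estimatedSize >> 1; // hand half the estimate to the new piece
            if (splitSize == 0) {
                return null;
            }
            estimatedSize -= splitSize;
            return new ParallelGenerateSpliterator<T>(supplier, splitSize);
        }

        @Override
        public long estimateSize() {
            return estimatedSize;
        }

        @Override
        public int characteristics() {
            return IMMUTABLE; // not ORDERED, so the resulting stream is unordered
        }
    }

    // Hypothetical stand-in for the generateParallel helper used in the question.
    static <T> Stream<T> generateParallel(Supplier<? extends T> supplier) {
        return StreamSupport.stream(new ParallelGenerateSpliterator<T>(supplier), true);
    }

    static long countLimited(long n) {
        AtomicLong counter = new AtomicLong();
        Supplier<Long> supplier = counter::incrementAndGet;
        return generateParallel(supplier).parallel().limit(n).count();
    }

    public static void main(String[] args) {
        System.out.println(countLimited(1000)); // prints 1000
    }
}
```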



More information about the lambda-dev mailing list