Infinite parallel stream

Boaz Nahum boaznahum at gmail.com
Wed May 22 12:33:28 PDT 2013


Thanks.
Works like a charm :)


On Wed, May 22, 2013 at 4:36 PM, Paul Sandoz <paul.sandoz at oracle.com> wrote:

> Hi Boaz,
>
> I just updated the repo so your infinite spliterator should work now in
> conjunction with limit.
>
> I would recommend setting the initial size estimate to something much
> higher; in fact, it can be Long.MAX_VALUE. The framework stops splitting
> when the estimated size is <= Long.MAX_VALUE /
> (ForkJoinPool.getCommonPoolParallelism() * X), where X is currently 4. This
> "fudge factor" X allows for better load balancing.
>
> In fact the Stream.generate method uses the same mechanism to produce a
> known infinite stream of elements.
>
> Paul.
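The splitting arithmetic described in this reply can be checked with a small sketch. It assumes X = 4 as stated in the message (a JDK internal detail, not a documented contract, so it may differ between versions), computes the threshold Long.MAX_VALUE / (parallelism * 4), counts how many halvings bring an estimate of Long.MAX_VALUE down to that threshold (roughly the depth of the split tree), and then bounds the infinite Stream.generate source with limit. The class and method names (SplitThresholdDemo, splitThreshold, halvingsUntilThreshold, generatedCount) are illustrative only.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;

public class SplitThresholdDemo {

    // Fudge factor X as described in the reply; assumed, not a public constant.
    static final long FUDGE_FACTOR = 4L;

    // Estimated size at or below which splitting stops, for a source
    // whose initial estimate is Long.MAX_VALUE.
    static long splitThreshold(int parallelism) {
        return Long.MAX_VALUE / (parallelism * FUDGE_FACTOR);
    }

    // How many times an estimate starting at Long.MAX_VALUE must be halved
    // before it is no longer above the threshold.
    static int halvingsUntilThreshold(int parallelism) {
        long threshold = splitThreshold(parallelism);
        long estimate = Long.MAX_VALUE;
        int levels = 0;
        while (estimate > threshold) {
            estimate >>>= 1; // each split roughly halves the estimate
            levels++;
        }
        return levels;
    }

    // Stream.generate is infinite; limit(n) is what makes the pipeline finish.
    static long generatedCount(long n) {
        return Stream.generate(() -> "x").parallel().limit(n).count();
    }

    public static void main(String[] args) {
        int p = ForkJoinPool.getCommonPoolParallelism();
        System.out.println("parallelism = " + p);
        System.out.println("threshold   = " + splitThreshold(p));
        System.out.println("halvings    = " + halvingsUntilThreshold(p));
        System.out.println("count       = " + generatedCount(1000));
    }
}
```

With a common-pool parallelism of 3, for example, the threshold is Long.MAX_VALUE / 12 and four halvings are needed; the split depth grows only logarithmically with parallelism, which is why starting from Long.MAX_VALUE costs nothing.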
>
> On Apr 30, 2013, at 9:50 AM, Boaz Nahum <boaznahum at gmail.com> wrote:
>
> > I'm trying to generate a parallel stream from an infinite iterator:
> >
> >        Stream<Data> s = generateParallel(supplier).
> >                parallel().
> >                limit(1000);
> >
> >        s.forEach(System.out::println);
> >
> > My Spliterator:
> >
> >    private static class ParallelGenerateSpliterator<T> implements Spliterator<T> {
> >
> >        private final Supplier<? extends T> supplier;
> >        private long estimatedSize;
> >
> >        public ParallelGenerateSpliterator(Supplier<? extends T> supplier, long estimatedSize) {
> >            this.supplier = supplier;
> >            this.estimatedSize = estimatedSize;
> >        }
> >
> >        public ParallelGenerateSpliterator(Supplier<? extends T> supplier) {
> >            this(supplier, 1 << Runtime.getRuntime().availableProcessors());
> >        }
> >
> >        @Override
> >        public boolean tryAdvance(Consumer<? super T> action) {
> >            action.accept(supplier.get());
> >            return true;
> >        }
> >
> >        @Override
> >        public Spliterator<T> trySplit() {
> >
> >            long spliSize = estimatedSize >> 1;
> >
> >            if (spliSize == 0) {
> >                return null;
> >            }
> >
> >            estimatedSize -= spliSize;
> >
> >            return new ParallelGenerateSpliterator<T>(supplier, spliSize);
> >        }
> >
> >        @Override
> >        public long estimateSize() {
> >            return estimatedSize;
> >        }
> >
> >        @Override
> >        public int characteristics() {
> >            return IMMUTABLE;
> >        }
> >    }
> >
> > But the example above runs forever, looping endlessly in AbstractPipeline:
> >
> >            spliterator.forEachRemaining(wrappedSink);
> >
> >
> > Can I solve this without making the spliterator finite?
> >
> > Thanks
> > Boaz
> >
>
>
>
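Applying the reply's suggestion to the spliterator from the question gives something like the following sketch. It assumes a Java 8 or later JDK. The question never shows generateParallel, so its body here is a guess built on StreamSupport.stream(spliterator, true); the only change to the spliterator itself is the Long.MAX_VALUE initial estimate. The supplier is an AtomicLong counter because all splits share it across threads, and InfiniteParallelStream and firstThousand are illustrative names.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class InfiniteParallelStream {

    // The spliterator from the question, with the initial estimate raised to
    // Long.MAX_VALUE. It never runs dry; limit() bounds the stream.
    static final class ParallelGenerateSpliterator<T> implements Spliterator<T> {
        private final Supplier<? extends T> supplier;
        private long estimatedSize;

        ParallelGenerateSpliterator(Supplier<? extends T> supplier, long estimatedSize) {
            this.supplier = supplier;
            this.estimatedSize = estimatedSize;
        }

        ParallelGenerateSpliterator(Supplier<? extends T> supplier) {
            this(supplier, Long.MAX_VALUE);
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            action.accept(supplier.get());
            return true; // there is always another element
        }

        @Override
        public Spliterator<T> trySplit() {
            long splitSize = estimatedSize >> 1;
            if (splitSize == 0) {
                return null;
            }
            estimatedSize -= splitSize; // this half keeps the rounded-up share
            return new ParallelGenerateSpliterator<>(supplier, splitSize);
        }

        @Override
        public long estimateSize() {
            return estimatedSize;
        }

        @Override
        public int characteristics() {
            return IMMUTABLE; // not ORDERED, so the resulting stream is unordered
        }
    }

    // Hypothetical body for the helper called in the question.
    static <T> Stream<T> generateParallel(Supplier<? extends T> supplier) {
        return StreamSupport.stream(new ParallelGenerateSpliterator<T>(supplier), true);
    }

    static List<Long> firstThousand() {
        AtomicLong counter = new AtomicLong(); // thread-safe shared supplier state
        Supplier<Long> next = counter::incrementAndGet;
        return generateParallel(next)
                .parallel()
                .limit(1000)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> values = firstThousand();
        System.out.println(values.size()); // prints 1000
    }
}
```

Because the spliterator does not report ORDERED, limit(1000) returns 1000 distinct counter values, but not necessarily the values 1 to 1000: parallel workers may call the supplier more than 1000 times before the limit is reached.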


More information about the lambda-dev mailing list