Infinite parallel stream

Boaz Nahum boaznahum at gmail.com
Tue Apr 30 06:07:54 PDT 2013


Thanks so much

>2) !ORDERED

 infiniteStream.filter( .. reject 99.9% ... ).limit(100)

What is the global limit count in this case?

B.R
Boaz

On Tue, Apr 30, 2013 at 2:33 PM, Paul Sandoz <paul.sandoz at oracle.com> wrote:

> Hi Boaz,
>
> You have hit a known issue with the current implementation of limit.
>
> The F/J leaf tasks of the limit implementation do not keep track of their
> traversal count and report it to the other tasks, so no task knows when
> the global limit has been reached.
>
>
> There are various optimizations we can implement depending on the
> characteristics:
>
> 1) SIZED & SUBSIZED & ORDERED
> A SliceSpliterator can wrap the underlying spliterator; no buffering is
> required, and splits that do not intersect the slice range are thrown
> away. I am working on this right now.
>
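A rough sketch of that slice idea, with illustrative names and details (not
the actual implementation being described): because the source is SUBSIZED,
every split reports an exact size, so the wrapper can track the absolute
index of its next element and throw away whole splits that fall before the
slice, with no buffering.

    import java.util.Spliterator;
    import java.util.function.Consumer;

    // Illustrative sketch: covers [sliceOrigin, sliceFence) of a
    // SIZED|SUBSIZED|ORDERED source spliterator.
    static final class SliceSpliterator<T> implements Spliterator<T> {
        private final Spliterator<T> s;  // underlying exact-sized source
        private long index;              // absolute index of s's next element
        private final long sliceOrigin;  // slice start, inclusive
        private final long sliceFence;   // slice end, exclusive

        SliceSpliterator(Spliterator<T> s, long index, long origin, long fence) {
            this.s = s;
            this.index = index;
            this.sliceOrigin = origin;
            this.sliceFence = fence;
        }

        @Override
        public Spliterator<T> trySplit() {
            while (index < sliceFence) {
                Spliterator<T> prefix = s.trySplit();
                if (prefix == null)
                    return null;
                // exact, because the source is SUBSIZED
                long prefixStart = index;
                long prefixEnd = index + prefix.getExactSizeIfKnown();
                index = prefixEnd;  // this spliterator now covers the suffix
                if (prefixEnd > sliceOrigin)  // prefix intersects the slice
                    return new SliceSpliterator<>(prefix, prefixStart,
                                                  sliceOrigin, sliceFence);
                // otherwise the prefix lies entirely before the slice:
                // throw it away and split again
            }
            return null;  // everything remaining is past the slice
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            while (index < sliceOrigin && s.tryAdvance(e -> { }))
                index++;  // skip elements before the slice
            if (index >= sliceFence || !s.tryAdvance(action))
                return false;
            index++;
            return true;
        }

        @Override
        public long estimateSize() {
            // elements of the slice not yet traversed by this spliterator
            long end = Math.min(index + s.estimateSize(), sliceFence);
            return Math.max(0, end - Math.max(index, sliceOrigin));
        }

        @Override
        public int characteristics() {
            return s.characteristics();
        }
    }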
>
> 2) !ORDERED
> A global count can be maintained (as discussed above) so leaf F/J tasks
> know when to stop traversing and buffering. Using an AtomicLong to hold a
> global limit count may result in too much contention (since there is no
> order, it does not matter how many elements are skipped), so perhaps there
> are other tricks, like using a LongAdder.
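Presumably the "global limit count" here would just be a shared countdown of
remaining permits, decremented as leaf tasks accept elements. A minimal
sketch of that idea with an AtomicLong and illustrative names (the
contention-reducing tricks are left out):

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Consumer;

    // Illustrative sketch for !ORDERED limit(n): every leaf F/J task
    // shares one counter holding the number of elements still wanted.
    static final class UnorderedLimitSink<T> implements Consumer<T> {
        private final AtomicLong permits;            // initialized to n
        private final Consumer<? super T> downstream;

        UnorderedLimitSink(AtomicLong permits, Consumer<? super T> downstream) {
            this.permits = permits;
            this.downstream = downstream;
        }

        @Override
        public void accept(T t) {
            // claim one permit; it does not matter which n elements win,
            // since there is no encounter order
            if (permits.getAndDecrement() > 0)
                downstream.accept(t);
        }

        // leaf tasks can poll this to stop traversing and buffering early
        boolean done() {
            return permits.get() <= 0;
        }
    }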
>
> I did not realize it before, but this case, combined with a spliterator
> like the one you have written, could work for an infinite Stream.generate
> implementation. Currently we use an iterator, and I am working on changing
> that to be based on LongStream.range(0, Long.MAX_VALUE).map(e -> s.get()).
> Hmm... perhaps this needs to be rethought...
>
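For an object stream, that range-based approach would presumably need
mapToObj rather than map; a one-line sketch of the idea:

    import java.util.function.Supplier;
    import java.util.stream.LongStream;
    import java.util.stream.Stream;

    // Sketch: an effectively infinite but splittable generate(), built on
    // a SIZED|SUBSIZED range instead of an iterator
    static <T> Stream<T> generate(Supplier<T> s) {
        return LongStream.range(0, Long.MAX_VALUE).mapToObj(e -> s.get());
    }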
>
> 3) ORDERED
> This is the worst possible case, and it is what is currently implemented
> and used for the first two cases as well. For this we may need to use the
> same spliterator-from-iterator tricks, i.e. copy a prefix of elements into
> an array, on the assumption that limits are not unduly large.
>
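A sketch of that ordered fallback, assuming the limit is small enough to
buffer (illustrative, not the actual code):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.stream.Stream;

    // Sketch: sequentially copy at most n elements into a buffer, then
    // stream over the buffer, which is SIZED and splits cleanly
    static <T> Stream<T> limitOrdered(Stream<T> upstream, int n) {
        List<T> prefix = new ArrayList<>(n);
        Iterator<T> it = upstream.iterator();
        while (prefix.size() < n && it.hasNext())
            prefix.add(it.next());
        return prefix.stream();
    }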
>
> Paul.
>
> On Apr 30, 2013, at 10:50 AM, Boaz Nahum <boaznahum at gmail.com> wrote:
>
> > I'm trying to generate parallel stream from an infinite iterator:
> >
> >        Stream<Data> s = generateParallel(supplier).
> >                parallel().
> >                limit(1000);
> >
> >        s.forEach(System.out::println);
> >
> > My Spliterator:
> >
> >    private static class ParallelGenerateSpliterator<T> implements Spliterator<T> {
> >
> >        private final Supplier<? extends T> supplier;
> >        private long estimatedSize;
> >
> >        public ParallelGenerateSpliterator(Supplier<? extends T> supplier, long estimatedSize) {
> >            this.supplier = supplier;
> >            this.estimatedSize = estimatedSize;
> >        }
> >
> >        public ParallelGenerateSpliterator(Supplier<? extends T> supplier) {
> >            this(supplier, 1 << Runtime.getRuntime().availableProcessors());
> >        }
> >
> >        @Override
> >        public boolean tryAdvance(Consumer<? super T> action) {
> >            // note: always returns true -- the stream is infinite
> >            action.accept(supplier.get());
> >            return true;
> >        }
> >
> >        @Override
> >        public Spliterator<T> trySplit() {
> >
> >            // give away half of the remaining estimated size
> >            long splitSize = estimatedSize >> 1;
> >
> >            if (splitSize == 0) {
> >                return null;
> >            }
> >
> >            estimatedSize -= splitSize;
> >
> >            return new ParallelGenerateSpliterator<T>(supplier, splitSize);
> >        }
> >
> >        @Override
> >        public long estimateSize() {
> >            return estimatedSize;
> >        }
> >
> >        @Override
> >        public int characteristics() {
> >            return IMMUTABLE;
> >        }
> >    }
> >
> > But the example above runs forever, looping endlessly in AbstractPipeline:
> >
> >            spliterator.forEachRemaining(wrappedSink);
> >
> >
> > Can I solve this without making the spliterator non-infinite?
> >
> > Thanks
> > Boaz
> >
>
>
>