Infinite parallel stream
Boaz Nahum
boaznahum at gmail.com
Wed May 22 12:33:28 PDT 2013
Thanks.
Works like a charm :)
On Wed, May 22, 2013 at 4:36 PM, Paul Sandoz <paul.sandoz at oracle.com> wrote:
> Hi Boaz,
>
> I just updated the repo so your infinite spliterator should work now in
> conjunction with limit.
>
> I would recommend setting the initial size estimate to be something much
> higher, in fact in can be Long.MAX_VALUE. The framework will stop splitting
> when the estimated size is <= Long.MAX_VALUE /
> (ForkJoinPool.getCommonPoolParallelism() * X), where X is currently 4. The
> "fudge-factor" X allows for better load balancing.
>
> In fact the Stream.generate method uses the same mechanism to produce a
> known infinite stream of elements.
>
> Paul.
>
> On Apr 30, 2013, at 9:50 AM, Boaz Nahum <boaznahum at gmail.com> wrote:
>
> > I'm trying to generate parallel stream from an infinite iterator:
> >
> > Stream<Data> s = generateParallel(supplier).
> > parallel().
> > limit(1000);
> >
> > s.forEach(System.out::println);
> >
> > My Spliterator:
> >
> > private static class ParallelGenerateSpliterator<T> implements
> > Spliterator<T> {
> >
> > private final Supplier<? extends T> supplier;
> > private long estimatedSize;
> >
> > public ParallelGenerateSpliterator(Supplier<? extends T> supplier,
> > long estimatedSize) {
> > this.supplier = supplier;
> > this.estimatedSize = estimatedSize;
> > }
> >
> > public ParallelGenerateSpliterator(Supplier<? extends T>
> supplier) {
> > this(supplier, 1 <<
> Runtime.getRuntime().availableProcessors());
> > }
> >
> > @Override
> > public boolean tryAdvance(Consumer<? super T> action) {
> > action.accept(supplier.get());
> > return true;
> > }
> >
> > @Override
> > public Spliterator<T> trySplit() {
> >
> > long spliSize = estimatedSize >> 1;
> >
> > if (spliSize == 0) {
> > return null;
> > }
> >
> > estimatedSize -= spliSize;
> >
> > return new ParallelGenerateSpliterator<T>(supplier, spliSize);
> > }
> >
> > @Override
> > public long estimateSize() {
> > return estimatedSize;
> > }
> >
> > @Override
> > public int characteristics() {
> > return IMMUTABLE;
> > }
> > }
> >
> > But the examples above run for ever, loop endlessly in AbstractPipeline:
> >
> > spliterator.forEachRemaining(wrappedSink);
> >
> >
> > Can I solve it without making spliterator non infinite ?
> >
> > Thank
> > Boaz
> >
>
>
>
More information about the lambda-dev
mailing list