Infinite parallel stream
Paul Sandoz
paul.sandoz at oracle.com
Wed May 22 06:36:45 PDT 2013
Hi Boaz,
I just updated the repo so your infinite spliterator should work now in conjunction with limit.
I would recommend setting the initial size estimate to something much higher; in fact, it can be Long.MAX_VALUE. The framework will stop splitting when the estimated size is <= Long.MAX_VALUE / (ForkJoinPool.getCommonPoolParallelism() * X), where X is currently 4. The "fudge factor" X allows for better load balancing.
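As a rough illustration of that threshold, here is a sketch of the arithmetic only, not the framework's actual code. The class and method names are made up for this example, and the guard against a parallelism of 0 is an assumption of mine:

```java
import java.util.concurrent.ForkJoinPool;

public class SplitThreshold {
    // Hypothetical helper: the size estimate at or below which splitting stops,
    // following the formula quoted above. fudgeFactor is the "X" (currently 4).
    static long threshold(int parallelism, int fudgeFactor) {
        // Assumption: treat a reported parallelism of 0 as 1 to avoid dividing by zero.
        long p = Math.max(1, parallelism);
        return Long.MAX_VALUE / (p * fudgeFactor);
    }

    public static void main(String[] args) {
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        System.out.println("parallelism = " + parallelism);
        System.out.println("threshold   = " + threshold(parallelism, 4));
    }
}
```

With a parallelism of 8, for example, the threshold is Long.MAX_VALUE / 32, so an estimate that starts at Long.MAX_VALUE and is halved on each split reaches it after five halvings.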
In fact the Stream.generate method uses the same mechanism to produce a known infinite stream of elements.
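For comparison, a minimal sketch of the same pattern using Stream.generate directly. The class name, the helper method, and the AtomicLong-backed supplier are placeholders chosen for this example; any thread-safe supplier would do:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GenerateLimit {
    // Hypothetical helper: take n elements from an infinite, parallel, generated stream.
    static List<Long> firstN(int n) {
        AtomicLong counter = new AtomicLong();       // thread-safe stand-in for a real supplier
        return Stream.generate(counter::getAndIncrement)
                     .parallel()
                     .limit(n)                       // short-circuits the infinite source
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstN(1000).size());     // prints 1000
    }
}
```

Note that the generated stream is unordered and the supplier may be called more than n times across threads, so the list holds 1000 values but not necessarily 0 through 999.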
Paul.
On Apr 30, 2013, at 9:50 AM, Boaz Nahum <boaznahum at gmail.com> wrote:
> I'm trying to generate a parallel stream from an infinite iterator:
>
> Stream<Data> s = generateParallel(supplier).
>         parallel().
>         limit(1000);
>
> s.forEach(System.out::println);
>
> My Spliterator:
>
> private static class ParallelGenerateSpliterator<T> implements Spliterator<T> {
>
>     private final Supplier<? extends T> supplier;
>     private long estimatedSize;
>
>     public ParallelGenerateSpliterator(Supplier<? extends T> supplier,
>                                        long estimatedSize) {
>         this.supplier = supplier;
>         this.estimatedSize = estimatedSize;
>     }
>
>     public ParallelGenerateSpliterator(Supplier<? extends T> supplier) {
>         this(supplier, 1 << Runtime.getRuntime().availableProcessors());
>     }
>
>     @Override
>     public boolean tryAdvance(Consumer<? super T> action) {
>         action.accept(supplier.get());
>         return true;
>     }
>
>     @Override
>     public Spliterator<T> trySplit() {
>
>         long spliSize = estimatedSize >> 1;
>
>         if (spliSize == 0) {
>             return null;
>         }
>
>         estimatedSize -= spliSize;
>
>         return new ParallelGenerateSpliterator<T>(supplier, spliSize);
>     }
>
>     @Override
>     public long estimateSize() {
>         return estimatedSize;
>     }
>
>     @Override
>     public int characteristics() {
>         return IMMUTABLE;
>     }
> }
>
> But the example above runs forever, looping endlessly in AbstractPipeline:
>
> spliterator.forEachRemaining(wrappedSink);
>
>
> Can I solve it without making the spliterator finite?
>
> Thanks,
> Boaz
>
More information about the lambda-dev
mailing list