Infinite parallel stream
Paul Sandoz
paul.sandoz at oracle.com
Tue Apr 30 05:50:04 PDT 2013
On Apr 30, 2013, at 1:33 PM, Paul Sandoz <Paul.Sandoz at oracle.com> wrote:
> Hi Boaz,
>
> You have hit a known issue with the current implementation of limit.
>
> The F/J leaf tasks of the limit implementation do not keep track their traversal count and report that to other tasks so that tasks know when the global limit has been reached.
>
>
> There are various optimizations we can implement depending on the characteristics:
>
> 1) SIZED & SUBSIZED & ORDERED
> A SliceSpliterator can wrap the underlying spliterator, no buffering required, throws away splits that do not intersect the slice range. I am working on this right now.
>
>
> 2) !ORDERED
> A global count can be maintained (like discussed above) so leaf F/J tasks know when to stop traversing and buffering. Using an AtomicLong to hold a global limit count may result in too much contention
> (since there is no order it does not matter how many elements are skipped),
That is not quite true, it does not matter if number of elements to skip is less than the known total size, which alas we might not know.
Paul.
> perhaps there are some other tricks like using LongAdder.
>
> I did not realize before but this case combined with a spliterator like you have written could work for an infinite Stream.generate implementation. Currently we use an iterator, and i am currently working on changing that to be based on LongStream.range(0, Long.MAX_VALUE).map(e -> s.get()). Hmm... perhaps this needs to be rethought...
>
>
> 3) ORDERED
> This is the worst possible case, and it is what is currently implemented and used for the first two cases as well. For this we may need to use the same spliterator from iterator tricks i.e. copy a prefix of elements into an array, with the assumption that limits are not unduly large.
>
>
> Paul.
>
> On Apr 30, 2013, at 10:50 AM, Boaz Nahum <boaznahum at gmail.com> wrote:
>
>> I'm trying to generate parallel stream from an infinite iterator:
>>
>> Stream<Data> s = generateParallel(supplier).
>> parallel().
>> limit(1000);
>>
>> s.forEach(System.out::println);
>>
>> My Spliterator:
>>
>> private static class ParallelGenerateSpliterator<T> implements
>> Spliterator<T> {
>>
>> private final Supplier<? extends T> supplier;
>> private long estimatedSize;
>>
>> public ParallelGenerateSpliterator(Supplier<? extends T> supplier,
>> long estimatedSize) {
>> this.supplier = supplier;
>> this.estimatedSize = estimatedSize;
>> }
>>
>> public ParallelGenerateSpliterator(Supplier<? extends T> supplier) {
>> this(supplier, 1 << Runtime.getRuntime().availableProcessors());
>> }
>>
>> @Override
>> public boolean tryAdvance(Consumer<? super T> action) {
>> action.accept(supplier.get());
>> return true;
>> }
>>
>> @Override
>> public Spliterator<T> trySplit() {
>>
>> long spliSize = estimatedSize >> 1;
>>
>> if (spliSize == 0) {
>> return null;
>> }
>>
>> estimatedSize -= spliSize;
>>
>> return new ParallelGenerateSpliterator<T>(supplier, spliSize);
>> }
>>
>> @Override
>> public long estimateSize() {
>> return estimatedSize;
>> }
>>
>> @Override
>> public int characteristics() {
>> return IMMUTABLE;
>> }
>> }
>>
>> But the examples above run for ever, loop endlessly in AbstractPipeline:
>>
>> spliterator.forEachRemaining(wrappedSink);
>>
>>
>> Can I solve it without making spliterator non infinite ?
>>
>> Thank
>> Boaz
>>
>
>
More information about the lambda-dev
mailing list