Exploiting concurrency with IteratorSpliterator of unknown size

Marko Topolnik marko.topolnik at gmail.com
Tue Mar 25 20:46:31 UTC 2014


>> once the processing of a batch has
>> commenced, after say 10-20 ms a sort of self-inspection could be
>> performed by considering the count of elements processed so far. If
>> there is still a lot more to do, further tasks could be split off and
>> enqueued so that other threads may steal them. The size of the splits
>> could be informed by the number of elements that were processed.
> 
> We don't need this kind of thing for an ArrayList, and we might not need
> it when the size or size estimate of the sequential source is known, if N
> is "small" we could bias towards high-Q. We are dealing with a worst-case
> scenario of a sequentially ordered source of unknown size.

The trouble is, that worst case isn't at all exotic and pertains to almost 
any instance of the Big Data problem class, where it is not practical to load 
the complete input into working memory. In those cases estimating the size, 
even when possible, simply isn't relevant to efficient parallelization 
because the complete setup revolves around processing an ongoing stream of 
data. Compared to the Fork/Join model, which can minimize the cost of 
merging the results from subtasks, sequential stream processing may entail 
more overhead in that area, but that is an unavoidable consequence of the 
basic premise.

> It's easy to implement a spliterator whose trySplit returns null, and an
> instance of that spliterator could be wrapped in another that extracts the
> parallelism. It is also easy to extend from Spliterator.AbstractSpliterator

I have put in some legwork trying to follow your lead here and this is what I 
found out: AbstractSpliterator is indeed effective as a template to implement 
arbitary advancing logic, but it is ineffective as an aid to implement 
arbitrary spliting logic. On the other hand, I would expect a class such as 
BufferedReader to already provide its Spliterator which takes care of 
advancing, so for the work I have to do on my side, I get no help from the 
current framework.

I have additionally realized that I can indeed get to BufferedReader's 
spliterator, although in a roundabout way which will do some extra work that I don't need: 

br.lines().spliterator()

For my purpose must I ignore the stream returned from lines() and keep only its 
spliterator; this is perhaps something that could be improved in the API. I can 
then feed that spliterator into a wrapper such as the following:

import static java.util.Spliterators.spliterator;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
  private final Spliterator<T> spliterator;
  private final int batchSize;
  private final int characteristics;
  private long est;

  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
    final int c = toWrap.characteristics();
    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
    this.spliterator = toWrap;
    this.batchSize = batchSize;
    this.est = toWrap.estimateSize();
  }
  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
    this(toWrap, toWrap.estimateSize(), batchSize);
  }
  @Override public Spliterator<T> trySplit() {
    final HoldingConsumer<T> holder = new HoldingConsumer<>();
    if (!spliterator.tryAdvance(holder)) return null;
    final Object[] a = new Object[batchSize];
    int j = 0;
    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
    if (est != Long.MAX_VALUE) est -= j;
    return spliterator(a, 0, j, characteristics());
  }
  @Override public boolean tryAdvance(Consumer<? super T> action) {
    return spliterator.tryAdvance(action);
  }
  @Override public void forEachRemaining(Consumer<? super T> action) {
    spliterator.forEachRemaining(action);
  }
  @Override public Comparator<? super T> getComparator() {
    if (hasCharacteristics(SORTED)) return null;
    throw new IllegalStateException();
  }
  @Override public long estimateSize() { return est; }
  @Override public int characteristics() { return characteristics; }

  static final class HoldingConsumer<T> implements Consumer<T> {
    Object value;
    @Override public void accept(T value) { this.value = value; }
  }
}


I think something of this kind, but more factored, could be a good 
basis for an addition to the API. I would factor this such that the client may 
only plug in his batch size-determining logic, without having to implement the, 
it must be admitted, quite hairy part where the data is spilt into the array 
bucket and handed over to the ArraySpliterator.


-Marko



More information about the lambda-dev mailing list