Exploiting concurrency with IteratorSpliterator of unknown size

Paul Sandoz paul.sandoz at oracle.com
Wed Mar 26 09:52:15 UTC 2014


On Mar 25, 2014, at 9:46 PM, Marko Topolnik <marko.topolnik at gmail.com> wrote:

>>> once the processing of a batch has
>>> commenced, after say 10-20 ms a sort of self-inspection could be
>>> performed by considering the count of elements processed so far. If
>>> there is still a lot more to do, further tasks could be split off and
>>> enqueued so that other threads may steal them. The size of the splits
>>> could be informed by the number of elements that were processed.
>> 
>> We don't need this kind of thing for an ArrayList, and we might not need
>> it when the size or size estimate of the sequential source is known, if N
>> is "small" we could bias towards high-Q. We are dealing with a worst-case
>> scenario of a sequentially ordered source of unknown size.
> 
> The trouble is, that worst case isn't at all exotic

Agreed, i was implying that any work done to improve this case should not affect other cases.

There are many cases within j.u.c classes where we use similar techniques, but for these cases we do have a good estimate for the root size so we could potentially tweak the arithmetic progression properties.


> and pertains to almost 
> any instance of the Big Data problem class, where it is not practical to load 
> the complete input into working memory. In those cases estimating the size, 
> even when possible, simply isn't relevant to efficient parallelization 
> because the complete setup revolves around processing an ongoing stream of 
> data. Compared to the Fork/Join model, which can minimize the cost of 
> merging the results from subtasks, sequential stream processing may entail 
> more overhead in that area, but that is an unavoidable consequence of the 
> basic premise.
> 
>> It's easy to implement a spliterator whose trySplit returns null, and an
>> instance of that spliterator could be wrapped in another that extracts the
>> parallelism. It is also easy to extend from Spliterator.AbstractSpliterator
> 
> I have put in some legwork trying to follow your lead here and this is what I 
> found out: AbstractSpliterator is indeed effective as a template to implement 
> arbitary advancing logic, but it is ineffective as an aid to implement 
> arbitrary spliting logic.

Yes, you have to override the trySplit or wrap.


> On the other hand, I would expect a class such as 
> BufferedReader to already provide its Spliterator which takes care of 
> advancing, so for the work I have to do on my side, I get no help from the 
> current framework.
> 

We did not want to expose out spliterator directly in such a manner everywhere we expose out stream views, but i see below you have found the "escape hatch"!


> I have additionally realized that I can indeed get to BufferedReader's 
> spliterator, although in a roundabout way which will do some extra work that I don't need: 
> 
> br.lines().spliterator()
> 
> For my purpose must I ignore the stream returned from lines() and keep only its 
> spliterator; this is perhaps something that could be improved in the API. I can 
> then feed that spliterator into a wrapper such as the following:
> 
> import static java.util.Spliterators.spliterator;
> 
> import java.util.Comparator;
> import java.util.Spliterator;
> import java.util.function.Consumer;
> 
> public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
>  private final Spliterator<T> spliterator;
>  private final int batchSize;
>  private final int characteristics;
>  private long est;
> 
>  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
>    final int c = toWrap.characteristics();
>    this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
>    this.spliterator = toWrap;
>    this.batchSize = batchSize;
>    this.est = toWrap.estimateSize();
>  }
>  public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
>    this(toWrap, toWrap.estimateSize(), batchSize);
>  }
>  @Override public Spliterator<T> trySplit() {
>    final HoldingConsumer<T> holder = new HoldingConsumer<>();
>    if (!spliterator.tryAdvance(holder)) return null;
>    final Object[] a = new Object[batchSize];
>    int j = 0;
>    do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
>    if (est != Long.MAX_VALUE) est -= j;
>    return spliterator(a, 0, j, characteristics());
>  }
>  @Override public boolean tryAdvance(Consumer<? super T> action) {
>    return spliterator.tryAdvance(action);
>  }
>  @Override public void forEachRemaining(Consumer<? super T> action) {
>    spliterator.forEachRemaining(action);
>  }
>  @Override public Comparator<? super T> getComparator() {
>    if (hasCharacteristics(SORTED)) return null;
>    throw new IllegalStateException();
>  }
>  @Override public long estimateSize() { return est; }
>  @Override public int characteristics() { return characteristics; }
> 
>  static final class HoldingConsumer<T> implements Consumer<T> {
>    Object value;
>    @Override public void accept(T value) { this.value = value; }
>  }
> }
> 
> 
> I think something of this kind, but more factored, could be a good 
> basis for an addition to the API. I would factor this such that the client may 
> only plug in his batch size-determining logic, without having to implement the, 
> it must be admitted, quite hairy part where the data is spilt into the array 
> bucket and handed over to the ArraySpliterator.
> 

Definitely for advanced developers. That impl could be a useful addition to spliterator utilities. 

It would be nice to define an AbstractFixedBatchSpliterator extending from Spliterators.AbstractSpliterator so one can choose to wrap or extend for the iteration logic, but to be useful it requires we change certain private fields to protected on Spliterators.AbstractSpliterator. Or simply expose out new constructors to AbstractSpliterator to configure the progression.


I do worry that most developers will not know what the batch size should be so having something like:

  Files.lines(Path p, int initialBatchSize, int incrementBatchSize)

could be confusing. In some cases it might be better to reformulate as a size estimate, but even so it still feels unsatisfying at this level of abstraction.

Paul.


More information about the lambda-dev mailing list