Exploiting concurrency with IteratorSpliterator of unknown size

Paul Sandoz paul.sandoz at oracle.com
Tue Mar 25 10:06:31 UTC 2014


On Mar 24, 2014, at 9:58 PM, Marko Topolnik <marko.topolnik at gmail.com> wrote:

>> Yes, that is one reason why we did not base Spliterator on Iterator. We did
>> not have a compelling use-case to separate out the iteration from splitting.
>> ...
>> I get what you are saying but that is just a Spliterator that does not split :-)
> 
> Seems that here we've talked past each other... my message was that,
> for the same reason that the IteratorSpliterator is useful now, a
> partial Spliterator interface (itself _not_ a Spliterator) which
> handles just the iteration aspect would be useful as a building block
> for the full Spliterator, and better than an Iterator due to the saner
> API (hasNext+next in one method, efficient iteration over all
> elements). So if all the relevant objects, such as BufferedReader in
> our case, provided that partial implementation in addition to the
> finished, defaults-based Stream, it would be a good basis for a custom
> Spliterator.
> 

I am saying we have all the building blocks to do that today, though perhaps not quite as cleanly as you indicate.

It's easy to implement a spliterator whose trySplit returns null, and an instance of that spliterator could be wrapped in another that extracts the parallelism. It is also easy to extend from Spliterators.AbstractSpliterator:

    public Stream<String> lines() {
        Spliterator<String> sp = new Spliterators.AbstractSpliterator<String>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            @Override
            public boolean tryAdvance(Consumer<? super String> action) {
                try {
                    String line = readLine();
                    if (line != null) {
                        action.accept(line);
                        return true;
                    } else {
                        return false;
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public void forEachRemaining(Consumer<? super String> action) {
                try {
                    String line;
                    while ((line = readLine()) != null) {
                        action.accept(line);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };
        return StreamSupport.stream(sp, false);
    }

I suspect we did not do that at the time because Spliterators.AbstractSpliterator may not have been in the API when BufferedReader.lines() was written.
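
As a rough illustration of the "wrapped in another that extracts the parallelism" idea, here is a minimal sketch of such a wrapper. The class name FixedBatchSpliterator and the batchSize parameter are hypothetical and not part of the JDK; the sketch assumes the wrapped spliterator is only ever traversed through this wrapper.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

/** Hypothetical wrapper: adds fixed-size batch splitting to a sequential spliterator. */
final class FixedBatchSpliterator<T> implements Spliterator<T> {
    private final Spliterator<T> source; // only tryAdvance/forEachRemaining are used
    private final int batchSize;         // illustrative tuning knob, not a JDK setting

    FixedBatchSpliterator(Spliterator<T> source, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public Spliterator<T> trySplit() {
        // Copy up to batchSize elements into an array and hand them off as a prefix.
        Object[] batch = new Object[batchSize];
        int[] n = {0};
        Consumer<T> sink = t -> batch[n[0]++] = t;
        while (n[0] < batchSize && source.tryAdvance(sink)) {
            // keep pulling until the batch is full or the source is exhausted
        }
        if (n[0] == 0) {
            return null; // nothing left to split off
        }
        // The array spliterator reports SIZED and SUBSIZED by itself.
        return Spliterators.spliterator(batch, 0, n[0], characteristics());
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        return source.tryAdvance(action);
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        source.forEachRemaining(action);
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // size stays unknown
    }

    @Override
    public int characteristics() {
        // Keep only characteristics that also hold for the copied prefix.
        return source.characteristics() & (ORDERED | DISTINCT | NONNULL | IMMUTABLE);
    }
}
```

It could be used along the lines of StreamSupport.stream(new FixedBatchSpliterator<>(sp, 64), true), where 64 is an arbitrary value one would tune to the per-element processing cost.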


>>> Generally, how do the "recursive decomposition" mechanics play out in
>>> our scenario here?
>>> 
>> 
>> We are getting into gritty details of the implementation...
>> 
>> If a size estimate of the root spliterator is known then a threshold is
>> calculated so that a spliterator is no longer split if its size estimate is below
>> that threshold. For something like an ArrayList this will create a balanced
>> tree where the leaf nodes are in proportion to the targeted parallelism level
>> of the fork/join common pool.
>> 
>> In your situation it is a worst case scenario :-) a sequential source of
>> unknown size. This will create a right-heavy tree, where each left leaf node is a
>> prefix of elements copied into an array. Because the size is unknown the
>> Spliterator wrapping that array will not be split. Splitting of the right tree
>> nodes will occur until a null is returned (i.e. when there are no more
>> elements). Reduction operations can perform poorly on such trees due to
>> the imbalance.
>> 
>> Generally the technique of splitting from a sequential source is to try and
>> ramp up quickly extracting out some work to warm things up while in parallel
>> further splitting (safely) from the source. As you have found out YMMV.
> 
> I see; the premise behind this is that extracting an item from the
> sequential source has the same order of cost as its processing. As
> soon as processing cost overwhelms reading cost, ramp-up loses
> significance because the threadpool's queue can be quickly built up by
> reading into ArraySpliterators. Therefore the story applies only to
> the contact area between two ranges of problems: those with I/O as the
> bottleneck and those with CPU as the bottleneck. The former is clearly
> not of interest here, but in the latter I see a different story
> explaining the effectiveness of the ramp-up strategy: it strikes a
> balance between the needs of high-Q and low-Q problems. For both
> classes there is a lower bound for N at which this strategy shows
> satisfactory performance, that N being a function of the batch size
> increment. With my problem, I unfortunately find myself below that
> threshold.

Yes, unfortunately so. 

Originally we used an arithmetic progression of batch sizes with a difference of 1, but this could create unduly large trees, resulting in performance issues when reduced/collected.
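
To see which progression a particular JDK build actually uses for an unknown-size source, one can split by hand and record the size of each prefix that comes back. The sketch below does that; BatchSizeProbe and prefixSizes are hypothetical names. In OpenJDK 8 the increment appears to be 1024 elements, but that is worth verifying on your own build rather than taking as given.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

/** Hypothetical probe: records the sizes of the prefixes split off an unknown-size source. */
final class BatchSizeProbe {

    static <T> List<Long> prefixSizes(Iterator<T> source) {
        Spliterator<T> right =
                Spliterators.spliteratorUnknownSize(source, Spliterator.ORDERED);
        List<Long> sizes = new ArrayList<>();
        Spliterator<T> prefix;
        // Each successful split copies a prefix of elements into an array-backed
        // spliterator; the remainder stays in 'right' until trySplit returns null.
        while ((prefix = right.trySplit()) != null) {
            sizes.add(prefix.estimateSize());
        }
        return sizes;
    }

    public static void main(String[] args) {
        Iterator<Integer> it = IntStream.range(0, 10_000).boxed().iterator();
        // Prints the prefix sizes in split order; the values depend on the JDK.
        System.out.println(prefixSizes(it));
    }
}
```

For a small N most of the elements end up in the first few prefixes, which is the situation described above: few leaves, so little parallelism to extract.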

The original stream F/J code (see AbstractTask) was too aggressive in splitting for such cases. Doug solved this with a rather clever but simple technique: alternating the forking and computation of the left and right tasks, which meant the initiating invocation thread was no longer aggressively spawning left nodes as fast as it could (and potentially using up lots of memory).
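
The alternation can be sketched with a CountedCompleter as follows. This is a simplified illustration loosely modeled on the idea described above, not the JDK's AbstractTask source; AlternatingForkTask, its threshold parameter, and the side-effecting action are assumptions made for the example, and the action must be safe to call from multiple threads.

```java
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.function.Consumer;

/** Hypothetical task: splits a spliterator, alternately forking the left and right child. */
final class AlternatingForkTask<T> extends CountedCompleter<Void> {
    private final Spliterator<T> spliterator;
    private final Consumer<? super T> action; // must be thread-safe
    private final long threshold;             // stop splitting at or below this size estimate

    AlternatingForkTask(CountedCompleter<?> parent, Spliterator<T> spliterator,
                        Consumer<? super T> action, long threshold) {
        super(parent);
        this.spliterator = spliterator;
        this.action = action;
        this.threshold = threshold;
    }

    @Override
    public void compute() {
        Spliterator<T> rs = spliterator, ls;
        boolean forkRight = false;
        AlternatingForkTask<T> task = this;
        while (rs.estimateSize() > threshold && (ls = rs.trySplit()) != null) {
            AlternatingForkTask<T> left = new AlternatingForkTask<>(task, ls, action, threshold);
            AlternatingForkTask<T> right = new AlternatingForkTask<>(task, rs, action, threshold);
            task.setPendingCount(1); // two children: the second to finish completes the parent
            AlternatingForkTask<T> toFork;
            if (forkRight) {
                // Fork the right child and keep working on the left one in this thread.
                forkRight = false;
                rs = ls;
                task = left;
                toFork = right;
            } else {
                // Fork the left child and keep working on the right one in this thread.
                forkRight = true;
                task = right;
                toFork = left;
            }
            toFork.fork();
        }
        rs.forEachRemaining(action); // leaf work for whichever child this thread kept
        task.tryComplete();
    }
}
```

A root task would be started with something like ForkJoinPool.commonPool().invoke(new AlternatingForkTask<>(null, sp, action, threshold)). Because the thread that keeps splitting alternates sides, it does not spawn left nodes as fast as the source can be read.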

There is definitely room for improvement here.


> 
> I still seem to be missing an important piece of the puzzle in my
> understanding of F/J, so perhaps you can clear this up for me: it
> currently seems to me that, once the processing of a batch has
> commenced, after say 10-20 ms a sort of self-inspection could be
> performed by considering the count of elements processed so far. If
> there is still a lot more to do, further tasks could be split off and
> enqueued so that other threads may steal them. The size of the splits
> could be informed by the number of elements that were processed. I
> would like to know whether this is a) impossible, b) possible but
> would be ineffective, c) possible and potentially effective, but not
> yet considered for implementation, d) something else entirely?

I doubt it is impossible, but I don't know how effective it might be until one gets the results of such an experiment.

We don't need this kind of thing for an ArrayList, and we might not need it when the size or size estimate of the sequential source is known: if N is "small" we could bias towards high-Q. Here we are dealing with a worst-case scenario of a sequentially ordered source of unknown size.

Some simple holder of a count, e.g. LongAdder or AtomicLong, could be updated when each leaf node completes, plus there might be stats from the F/J pool that could be used. We don't even need an accurate count; perhaps an estimate (the size estimate from the spliterator) is sufficient. An estimate is cheaper since we can more easily leverage forEachRemaining without wrapping with a local counting consumer.
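
A minimal sketch of the two counting options, assuming a shared LongAdder that each leaf updates once when it finishes. LeafProgress and its method names are hypothetical; nothing like this exists in the stream implementation as far as this thread indicates.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

/** Hypothetical holder of a running count of processed elements, updated per leaf. */
final class LeafProgress {
    private final LongAdder processed = new LongAdder();

    /** Exact count: wraps the action with a local counting consumer. */
    <T> void runLeafCounting(Spliterator<T> leaf, Consumer<? super T> action) {
        long[] local = {0}; // leaf-local, so the shared adder is touched once per leaf
        leaf.forEachRemaining(t -> {
            action.accept(t);
            local[0]++;
        });
        processed.add(local[0]);
    }

    /** Cheaper: trusts the leaf's size estimate and passes the action through unwrapped. */
    <T> void runLeafEstimating(Spliterator<T> leaf, Consumer<? super T> action) {
        long estimate = leaf.estimateSize(); // read before traversal consumes the leaf
        leaf.forEachRemaining(action);
        if (estimate != Long.MAX_VALUE) {    // Long.MAX_VALUE means unknown; skip it
            processed.add(estimate);
        }
    }

    /** Approximate number of elements processed so far across all completed leaves. */
    long processedSoFar() {
        return processed.sum();
    }
}
```

The estimating variant fits the array-backed prefixes well, since those report an exact size; a task deciding whether to split further could compare processedSoFar() against elapsed time.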

A tricky aspect would be ensuring this statistical approach is tied to the particular style of source, as we don't want to hard-code this in.

There is also a trickier, more general problem: that of automatically working out when to go parallel or sequential. I think that requires gathering statistics over multiple executions of the pipeline, from which it is decided to switch modes on subsequent executions (and more generally this could also be applied to optimizing the pipeline, compiling to byte code and/or executing on, say, a GPU).


> I remember reading a similar idea in this or the EG mailing list, but it
> was left sort of hanging in the air.
> 

Quite possibly, I don't recall such a discussion :-)

Paul.


More information about the lambda-dev mailing list