Exploiting concurrency with IteratorSpliterator of unknown size
Marko Topolnik
marko.topolnik at gmail.com
Mon Mar 24 20:58:05 UTC 2014
> Yes, that is one reason why we did not base Spliterator on Iterator. We did
> not have a compelling use-case to separate out the iteration from splitting.
> ...
> I get what you are saying but that is just a Spliterator that does not split :-)
It seems we've talked past each other here. My point was that, for the
same reason the IteratorSpliterator is useful now, a partial
Spliterator interface (itself _not_ a Spliterator) that handles just
the iteration aspect would be useful as a building block for the full
Spliterator, and better than an Iterator thanks to its saner API
(hasNext+next in one method, efficient iteration over all elements).
So if all the relevant objects, such as BufferedReader in our case,
provided that partial implementation in addition to the finished,
defaults-based Stream, it would be a good basis for a custom
Spliterator.
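
To make the idea concrete, here is a minimal sketch of what I have in
mind. Everything named here is hypothetical: the Advancer interface, the
lines(BufferedReader) adapter and FixedBatchSpliterator are my own
illustrations and do not exist in the JDK. The fixed batch size is just
one possible splitting policy, chosen only to keep the example short.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class AdvancerSketch {

    /** Hypothetical iteration-only interface; nothing like it exists in the JDK. */
    interface Advancer<T> {
        boolean tryAdvance(Consumer<? super T> action);

        default void forEachRemaining(Consumer<? super T> action) {
            while (tryAdvance(action)) { /* consume until exhausted */ }
        }
    }

    /** Hypothetical adapter standing in for what BufferedReader might provide. */
    static Advancer<String> lines(BufferedReader reader) {
        return action -> {
            try {
                String line = reader.readLine();
                if (line == null) return false;
                action.accept(line);
                return true;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }

    /** Remembers the last element handed to it so that trySplit can buffer it. */
    private static final class Holder<T> implements Consumer<T> {
        T value;
        @Override public void accept(T t) { value = t; }
    }

    /** Iteration is delegated to the Advancer; only the split policy lives here. */
    static final class FixedBatchSpliterator<T> implements Spliterator<T> {
        private final Advancer<T> source;
        private final int batchSize; // assumed policy: constant-size batches

        FixedBatchSpliterator(Advancer<T> source, int batchSize) {
            this.source = source;
            this.batchSize = batchSize;
        }

        @Override public boolean tryAdvance(Consumer<? super T> action) {
            return source.tryAdvance(action);
        }

        @Override public void forEachRemaining(Consumer<? super T> action) {
            source.forEachRemaining(action);
        }

        @Override public Spliterator<T> trySplit() {
            Object[] batch = new Object[batchSize];
            Holder<T> holder = new Holder<>();
            int count = 0;
            // Copy up to batchSize elements into an array-backed prefix.
            while (count < batchSize && source.tryAdvance(holder)) {
                batch[count++] = holder.value;
            }
            if (count == 0) return null; // source exhausted, nothing to split off
            return Spliterators.spliterator(batch, 0, count, characteristics());
        }

        @Override public long estimateSize() { return Long.MAX_VALUE; } // unknown
        @Override public int characteristics() { return ORDERED | NONNULL; }
    }

    static List<String> upperCaseLines(String text, int batchSize) {
        BufferedReader reader = new BufferedReader(new StringReader(text));
        Spliterator<String> sp = new FixedBatchSpliterator<>(lines(reader), batchSize);
        return StreamSupport.stream(sp, true)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(upperCaseLines("a\nb\nc\nd\ne", 2));
    }
}
```

The point of the split is that the Advancer part needs no prefetching or
caching of the next item, and the Spliterator on top of it is reduced to
the splitting policy alone.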
> > Generally, how do the "recursive decomposition" mechanics play
> > out in our scenario here?
> >
>
> We are getting into gritty details of the implementation...
>
> If a size estimate of the root spliterator is known then a threshold is
> calculated so that a spliterator is no longer split if its size estimate is below
> that threshold. For something like an ArrayList this will create a balanced
> tree where the leaf nodes are in proportion
> to the targeted parallelism level of the fork/join common pool.
>
> In your situation it is a worst case scenario :-) a sequential source of
> unknown size. This will create a right balanced tree, each left leaf node is a
> prefix of elements copied into an array. Because the size is unknown the
> Spliterator wrapping that array will not be split. Splitting of the right tree
> nodes will occur until a null is returned (i.e. when there are no more
> elements). Reduction operations can perform poorly on such trees due to
> the imbalance.
>
> Generally the technique of splitting from a sequential source is to try and
> ramp up quickly extracting out some work to warm things up while in parallel
> further splitting (safely) from the source. As you have found out YMMV.
I see; the premise behind this is that extracting an item from the
sequential source has the same order of cost as its processing. As
soon as processing cost overwhelms reading cost, ramp-up loses
significance because the threadpool's queue can be quickly built up by
reading into ArraySpliterators. Therefore the story applies only to
the contact area between two ranges of problems: those with I/O as the
bottleneck and those with CPU as the bottleneck. The former is clearly
not of interest here, but in the latter I see a different story
explaining the effectiveness of the ramp-up strategy: it strikes a
balance between the needs of high-Q and low-Q problems. For both
classes there is a lower bound for N at which this strategy shows
satisfactory performance, that N being a function of the batch size
increment. With my problem, I unfortunately find myself below that
threshold.
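
The splitting behaviour described in the quoted text can be observed
directly with a small probe, sketched below. It wraps an Iterator of
unknown size with Spliterators.spliteratorUnknownSize and keeps calling
trySplit until null comes back, recording the size of each array-backed
prefix. The class name RampUpProbe and the element count are arbitrary
choices of mine, and the actual batch sizes printed are an
implementation detail of the JDK in use, so I make no claim about the
exact numbers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

public class RampUpProbe {

    /** Sizes of the prefixes split off from an iterator-backed source of n elements. */
    static List<Long> prefixSizes(int n) {
        Iterator<Integer> it = IntStream.range(0, n).iterator();
        Spliterator<Integer> right =
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        List<Long> sizes = new ArrayList<>();
        Spliterator<Integer> left;
        // Each successful split copies a prefix into an array; null means exhausted.
        while ((left = right.trySplit()) != null) {
            sizes.add(left.estimateSize());
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Long> sizes = prefixSizes(5000);
        long total = sizes.stream().mapToLong(Long::longValue).sum();
        System.out.println("prefix sizes: " + sizes + ", total: " + total);
    }
}
```

When N is small relative to the first few batches, the whole input ends
up in a handful of prefixes, which is the situation I describe above.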
I still seem to be missing an important piece of the puzzle in my
understanding of F/J, so perhaps you can clear this up for me: it
currently seems to me that, once the processing of a batch has
commenced, after say 10-20 ms a sort of self-inspection could be
performed by considering the count of elements processed so far. If
there is still a lot more to do, further tasks could be split off and
enqueued so that other threads may steal them. The size of the splits
could be informed by the number of elements that were processed. I
would like to know whether this is a) impossible, b) possible but
would be ineffective, c) possible and potentially effective, but not
yet considered for implementation, d) something else entirely? I
remember reading a similar idea in this or the EG mailing list, but it
was left sort of hanging in the air.
On Mon, Mar 24, 2014 at 4:20 PM, Paul Sandoz <paul.sandoz at oracle.com> wrote:
> On Mar 24, 2014, at 3:12 PM, Marko Topolnik <marko.topolnik at gmail.com> wrote:
>>> The approach to extract parallelism from a sequential source was designed to
>>> not particularly favour one scenario over another (cost per element, # elements,
>>> reduction), and we did not want to expose any controls like the arithmetic
>>> progression properties since most people won't know what to do with these
>>> (in hindsight this might have been OK on the Spliterators.AbstractSpliterator
>>> implementation).
>>
>> What I would generally like to find in the library is some sort of a template
>> (abstract class) to make a custom spliterator policy easier to implement. I
>> realize it is not trivial to get such an API right, making it serve as many needs
>> as possible without an explosion of complexity.
>>
>
> That is exactly what Spliterator is for; we cannot know all splitting policies and provide partial implementations.
>
>
>> In the specific example of BufferedReader.lines there is the additional issue of
>> the anonymous Iterator implementation which is buried in the lib.
>>
>
> Note that we could (and probably should) also implement that Spliterator directly leveraging Spliterators.AbstractSpliterator to reduce the layering, but still if you look at the code you will notice some internal layering with respect to managing results from calling tryAdvance when splitting.
>
>
>> On a more general level, the problem may be with the way a Spliterator couples
>> the concerns of splitting and iteration. A Spliterator based on the classic Iterator
>> solves a part of this problem because iterating is delegated to a separate object;
>> however the Iterator interface itself is quite cumbersome because it forces us to
>> prefetch the next item and cache it when hasNext is called.
>>
>
> Yes, that is one reason why we did not base Spliterator on Iterator. We did not have a compelling use-case to separate out the iteration from splitting.
>
>
>> So if there was a partial Spliterator interface (which Spliterator itself would then
>> extend) which had only tryAdvance and forEachRemaining,
>
> I get what you are saying but that is just a Spliterator that does not split :-)
>
>
>> and if an implementation of that could be directly obtained from BufferedReader and
>> other similar objects, then perhaps providing our own Spliterator implementation in
>> terms of that would be nicer.
>>
>
>>> Unfortunately in your case I am assuming the cost-per-line is a dominant
>>> factor over number of lines to be processed.
>>
>> Yes, that is what's happening. There is one other thing I'm wondering about:
>> couldn't the ArraySpliterator returned from IteratorSpliterator be further split into
>> smaller chunks?
>
> Yes, possibly (see below for why not for streams).
>
>
>> Generally, how do the "recursive decomposition" mechanics play
>> out in our scenario here?
>>
>
> We are getting into gritty details of the implementation...
>
> If a size estimate of the root spliterator is known then a threshold is calculated so that a spliterator is no longer split if its size estimate is below that threshold. For something like an ArrayList this will create a balanced tree where the leaf nodes are in proportion to the targeted parallelism level of the fork/join common pool.
>
> In your situation it is a worst case scenario :-) a sequential source of unknown size. This will create a right balanced tree, each left leaf node is a prefix of elements copied into an array. Because the size is unknown the Spliterator wrapping that array will not be split. Splitting of the right tree nodes will occur until a null is returned (i.e. when there are no more elements). Reduction operations can perform poorly on such trees due to the imbalance.
>
> Generally the technique of splitting from a sequential source is to try and ramp up quickly extracting out some work to warm things up while in parallel further splitting (safely) from the source. As you have found out YMMV.
>
> Paul.
>