Exploiting concurrency with IteratorSpliterator of unknown size

Julian Hyde julianhyde at gmail.com
Fri May 22 23:49:52 UTC 2015


This is the kind of thing traditionally done by a database.

We are seeing systems built using database principles (declarative language, statistics from past runs, query optimizer) that are not accessing data stored on disk.

Just as people used to need to learn when to stop accessing files directly and trust a database, I predict that for your kind of application people will increasingly turn to “data stream management systems”.

Julian


> On Mar 25, 2014, at 5:27 AM, Glen Peterson <glen at organicdesign.org> wrote:
> 
> "There is also a tricker more general problem: that of automatically
> working out when to go parallel or sequential."
> 
> I've been puzzling over a similar question: Who has the necessary
> information to decide the proper number of threads for a task?  The
> proper number of threads might be based on:
> 1. The priority of the job
> 2. The number of physical processors
> 3. The expected workload of those processors while the job will be
> running (what other jobs are likely to run simultaneously and their
> priorities)
> 4. The processing characteristics of the job itself
> 
> It seems to me that a collection or transformation only has partial
> information about #4 until it has run for a while and can measure
> itself (which is an interesting idea).  Really, the operating system,
> or the calling code is in a much better position to make decisions
> about the proper number of threads to use.  The simplest solution I
> could see would be to let the calling code pass a suggested maximum
> number of threads to use.  The collection/transform might use less
> than that (or perform processing in the main thread if there is only a
> single item to transform).
> 
> The start and measure approach to this problem might provide hints
> about #2 and #4 plus maybe some of #3, but completely leaves out #1.
> Only letting the client pick the number of threads can reasonably
> cover all 4 because #3 has to do with expectations, and the designer
> of the program knows best what's likely to happen next.  I'm not sure
> if the JVM provides information about the number of processors or the
> current load, but that information could be provided manually.  If you
> use a virtual machine or otherwise share hardware, only the operating
> system may be able to guess at #3, or maybe no-one has complete
> information.
> 
> I work a lot with web apps, and if a job is big enough to benefit from
> running on multiple processors, I don't want it to hog the system to
> the point that the next user who clicks on the app has to wait for the
> previous user's job to finish.
> 
> I recently coded up some simple tests on a 2 processor 64-bit Ubuntu
> system (not using Java collections) and both processors were over 95%
> busy even when I didn't start a second thread.  I don't know yet if
> that's the garbage collector, or some other part of the JVM or some
> other cleverness happening on the other processor, but my timings with
> CPU/Memory-fetch-bound concurrency on my machine showed the
> single-threaded solution to be equal or 20% faster than a two-thread
> version.  If that's typical, another question would be, "under what
> situations does using more threads speed processing?"  I think that
> question stands even if my example does not.  Maybe that makes #5.
> 
> 
> On Tue, Mar 25, 2014 at 6:06 AM, Paul Sandoz <paul.sandoz at oracle.com> wrote:
>> 
>> On Mar 24, 2014, at 9:58 PM, Marko Topolnik <marko.topolnik at gmail.com> wrote:
>> 
>>>> Yes, that is one reason why we did not base Spliterator on Iterator. We did
>>>> not have a compelling use-case to separate out the iteration from splitting.
>>>> ...
>>>> I get what you are saying but that is just a Spliterator that does not split :-)
>>> 
>>> Seems that here we've talked past each other... my message was that,
>>> for the same reason that the IteratorSpliterator is useful now, a
>>> partial Spliterator interface (itself _not_ a Spliterator) which
>>> handles just the iteration aspect would be useful as a building block
>>> for the full Spliterator, and better than an Iterator due to the saner
>>> API (hasNext+next in one method, efficient iteration over all
>>> elements). So if all the relevant objects, such as BufferedReader in
>>> our case, provided that partial implementation in addition to the
>>> finished, defaults-based Stream, it would be a good basis for a custom
>>> Spliterator.
>>> 
>> 
>> I am saying we have all the building blocks to do that today, but perhaps not quite as clean as you indicate.
>> 
>> It's easy to implement a spliterator whose trySplit returns null, and an instance of that spliterator could be wrapped in another that extracts the parallelism. It is also easy to extend from Spliterator.AbstractSpliterator:
>> 
>>    public Stream<String> lines() {
>>        Spliterator<String> sp = new Spliterators.AbstractSpliterator<String>(
>>                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
>>            @Override
>>            public boolean tryAdvance(Consumer<? super String> action) {
>>                    try {
>>                        String line = readLine();
>>                        if (line != null) {
>>                            action.accept(line);
>>                            return true;
>>                        }
>>                        else {
>>                            return false;
>>                        }
>>                    } catch (IOException e) {
>>                        throw new UncheckedIOException(e);
>>                    }
>>            }
>> 
>>            @Override
>>            public void forEachRemaining(Consumer<? super String> action) {
>>                try {
>>                    String line;
>>                    while ((line = readLine()) != null) {
>>                        action.accept(line);
>>                    }
>>                } catch (IOException e) {
>>                    throw new UncheckedIOException(e);
>>                }
>>            }
>>        };
>>        return StreamSupport.stream(sp, false);
>>    }
>> 
>> I suspect we did not do that at the time because Spliterators.AbstractSpliterator may not have been in the API when BufferedReader.lines() was written.
>> 
>> 
>>>>> Generally, how do the "recursive decomposition" mechanics
>>>>> play
>>>>> out in our scenario here?
>>>>> 
>>>> 
>>>> We are getting into gritty details of the implementation...
>>>> 
>>>> If a size estimate of the root spliterator is known then a threshold is
>>>> calculated so that a spliterator is no longer split if it's size estimate is below
>>>> that threshold. For something like an ArrayList this will create a balanced
>>>> tree where the leaf nodes are in proportion
>>>> to the targeted parallelism level of the fork/join common pool.
>>>> 
>>>> In your situation it is a worst case scenario :-) a sequential source of
>>>> unknown size, This will create a right balanced tree, each left leaf node is a
>>>> prefix of elements copied into an array. Because the size is unknown the
>>>> Spliterator wrapping that array will not be split. Splitting of the right tree
>>>> nodes will occur until a null is returned (i.e. when there are no more
>>>> elements). Reduction operations can perform poorly on such trees due to
>>>> the imbalance.
>>>> 
>>>> Generally the technique of splitting from a sequential source is to try and
>>>> ramp up quickly extracting out some work to warm things up while in parallel
>>>> further splitting (safely) from the source. As you have found out YMMV.
>>> 
>>> I see; the premise behind this is that extracting an item from the
>>> sequential source has the same order of cost as its processing. As
>>> soon as processing cost ovewhelms reading cost, ramp-up loses
>>> significance because the threadpool's queue can be quickly built up by
>>> reading into ArraySpliterators. Therefore the story applies only to
>>> the contact area between two ranges of problems: those with I/O as the
>>> bottleneck and those with CPU as the bottleneck. The former is clearly
>>> not of interest here, but in the latter I see a different story
>>> explaining the effectiveness of the ramp-up strategy: it strikes a
>>> balance between the needs of high-Q and low-Q problems. For both
>>> classes there is a lower bound for N at which this strategy shows
>>> satisfactory performance, that N being a function of the batch size
>>> increment. With my problem, I unfortunately find myself below that
>>> threshold.
>> 
>> Yes, unfortunately so.
>> 
>> Originally we used an arithmetic progression with a difference of 1, but this could create unduly large trees resulting in performance issues when reduced/collected.
>> 
>> The original stream F/J code (see AbstractTask) was too aggressive splitting for such cases and Doug solved this by a rather clever but simple solution of alternating forking and computation of the left and right tasks, which meant the initiating invocation thread was no longer aggressively spawning left nodes as fast as it could (and potentially using up lots of memory).
>> 
>> There is definitely room for improvement here.
>> 
>> 
>>> 
>>> I still seem to be missing an important piece of the puzzle in my
>>> understanding of F/J, so perhaps you can clear this up for me: it
>>> currently seems to me that, once the processing of a batch has
>>> commenced, after say 10-20 ms a sort of self-inspection could be
>>> performed by considering the count of elements processed so far. If
>>> there is still a lot more to do, further tasks could be split off and
>>> enqueued so that other threads may steal them. The size of the splits
>>> could be informed by the number of elements that were processed. I
>>> would like to know whether this is a) impossible, b) possible but
>>> would be ineffective, c) possible and potentially effective, but not
>>> yet considered for implementation, d) something else entirely?
>> 
>> I doubt it is impossible, but I don't know how effective it might be until one gets the results of such an experiment.
>> 
>> We don't need this kind of thing for an ArrayList, and we might not need it when the size or size estimate of the sequential source is known, if N is "small" we could bias towards high-Q. We are dealing with a worst-case scenario of a sequentially ordered source of unknown size.
>> 
>> Some simple holder of a count e.g. LongAdder or AtomicLong could be updated when each leaf node completes, plus there might be stats from the F/J pool that could be used. We don't even need to have an accurate count, perhaps an estimate (size estimate from the spliterator) is sufficient. An estimate is cheaper since we can more easily leverage forEachRemaining without wrapping with a local counting consumer.
>> 
>> A tricky aspect would be ensuring this statistical approach is bound with the particular style of source as we don't want to hard code this in.
>> 
>> There is also a tricker more general problem: that of automatically working out when to go parallel or sequential. I think that requires gathering statistics over multiple executions of the pipeline from which it is decided to switch modes on subsequent executions (and more generally this could also be applied to optimizing the pipeline, compiling to byte code and/or executing on say a GPU).
>> 
>> 
>>> I
>>> remember reading a similar idea in this or the EG mailing list, but it
>>> was left sort of hanging in the air.
>>> 
>> 
>> Quite possibly, i don't recall such a discussion :-)
>> 
>> Paul.
>> 
> 
> 
> 
> -- 
> Glen K. Peterson
> (828) 393-0081
> 



More information about the lambda-dev mailing list