Stream parallel() / sequential() question.
Howard Lovatt
howard.lovatt at gmail.com
Thu Apr 4 15:52:08 PDT 2013
I think there is a design that is better than this yet still just as efficient:
interface Limits {
    default long maximumSize() { return Long.MAX_VALUE; } // No limit
    default long minimumSplitSize() { return 1; } // Fully splittable, i.e. parallel
}
Then all the sources, mappers, reducers, etc. implement Limits, as do the streams themselves. When map (or a similar operation) is called on a stream, the stream evaluates its argument's Limits methods:
maximumSize = Math.min(maximumSize, mapper.maximumSize());
minimumSplitSize = Math.max(minimumSplitSize, mapper.minimumSplitSize());
When the stream is evaluated, both sizes are respected.
This design allows a sequence of maps etc. to work together, since the most conservative value is used for both parameters. For example, a stateful mapper would override minimumSplitSize to return Long.MAX_VALUE and hence force sequential execution. A further example is a reducer that can only cope with an int number of elements; it would override maximumSize to return Integer.MAX_VALUE and therefore limit the stream length to a safe value.
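A minimal sketch of how the proposed Limits combination could work, assuming the rules above. Limits is the hypothetical interface from this proposal (it is not part of java.util.stream), and PipelineLimits, LimitsDemo, and isEffectivelySequential() are hypothetical names introduced only for illustration. It folds a plain mapper, a stateful mapper, and an int-limited reducer into one set of limits.

```java
import java.util.Arrays;

// Hypothetical interface from the proposal; NOT part of java.util.stream.
interface Limits {
    default long maximumSize() { return Long.MAX_VALUE; }   // no limit
    default long minimumSplitSize() { return 1; }           // fully splittable, i.e. parallel
}

// Hypothetical stream-side bookkeeping: each stage's limits are folded into
// the most conservative combination (smallest max size, largest split size).
final class PipelineLimits implements Limits {
    private long maximumSize = Long.MAX_VALUE;
    private long minimumSplitSize = 1;

    PipelineLimits add(Limits stage) {
        maximumSize = Math.min(maximumSize, stage.maximumSize());
        minimumSplitSize = Math.max(minimumSplitSize, stage.minimumSplitSize());
        return this;
    }

    @Override public long maximumSize() { return maximumSize; }
    @Override public long minimumSplitSize() { return minimumSplitSize; }

    // A split size of Long.MAX_VALUE means the source is never split: sequential execution.
    boolean isEffectivelySequential() { return minimumSplitSize == Long.MAX_VALUE; }
}

public class LimitsDemo {
    public static void main(String[] args) {
        Limits plainMapper = new Limits() {};                   // keeps the defaults
        Limits statefulMapper = new Limits() {                  // forces sequential execution
            @Override public long minimumSplitSize() { return Long.MAX_VALUE; }
        };
        Limits intReducer = new Limits() {                      // can only count up to an int
            @Override public long maximumSize() { return Integer.MAX_VALUE; }
        };

        PipelineLimits limits = new PipelineLimits();
        for (Limits stage : Arrays.asList(plainMapper, statefulMapper, intReducer)) {
            limits.add(stage);
        }
        System.out.println(limits.maximumSize());             // 2147483647
        System.out.println(limits.isEffectivelySequential()); // true
    }
}
```

Because min and max are associative and commutative, the order in which stages are added does not change the result, which is what lets a chain of maps "work together".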
-- Howard.
On 05/04/2013, at 7:28 AM, Brian Goetz <brian.goetz at oracle.com> wrote:
> Parallel is clearly needed because many stream generators (such as
> intRange) create sequential streams, which have to be explicitly flipped
> to parallel if you want parallel. The alternative to this was the API
> bloat of having 2x as many methods everywhere streams are generated.
> Saying generator().parallel() is pretty efficient at this point; it just
> flips a bit and returns the underlying stream. Collection is the
> notable exception, with two methods; this was a common enough case that
> the small additional API surface area (one new method) aided
> discoverability of parallel stream capability enough to warrant the
> additional method.
>
> Sequential is somewhat less essential, but again costs little to
> include, and provides a way of saying "force this stream to execute
> sequentially", which is still useful.
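A short sketch of the behaviour described above, written against the released Java 8 java.util.stream API rather than the 2013 prototype. It assumes IntStream.range is the released counterpart of the intRange generator mentioned here; the class name ParallelFlagDemo is illustrative. BaseStream.isParallel() reports the current setting of the flag.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class ParallelFlagDemo {
    public static void main(String[] args) {
        // Generators such as IntStream.range start out sequential...
        IntStream range = IntStream.range(0, 1_000);
        System.out.println(range.isParallel());                  // false

        // ...and parallel() only flips the pipeline's flag; no data is copied.
        IntStream parallelRange = range.parallel();
        System.out.println(parallelRange.isParallel());          // true
        System.out.println(parallelRange.sum());                 // 499500

        // Collection offers both shapes up front.
        List<String> words = Arrays.asList("a", "b", "c");
        System.out.println(words.stream().isParallel());         // false
        System.out.println(words.parallelStream().isParallel()); // true

        // sequential() forces sequential execution even for a parallel source.
        System.out.println(words.parallelStream().sequential().isParallel()); // false
    }
}
```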
>
> On 4/4/2013 4:20 PM, Peter Levart wrote:
>>
>> On 04/04/2013 04:05 PM, Brian Goetz wrote:
>>> This was simplified recently.
>>>
>>> There is *one* sequential/parallel bit for the whole pipeline. The
>>> stream starts off with it set one way or the other. These calls
>>> overwrite it. The bit is only acted on when you actually start the
>>> computation (invoke the terminal operation.)
>>
>> Hi Brian,
>>
>> So why are sequential()/parallel() methods needed then after all? We
>> have the Collection.stream() and Collection.parallelStream() where the
>> shape is predetermined at the beginning by the user. Is there an API
>> that constructs a Stream where the shape is chosen automatically?
>>
>> Regards, Peter
>>
>>> On 4/4/2013 9:21 AM, Boaz Nahum wrote:
>>>> When I invoke parallel() or sequential(), how far back does it go?
>>>>
>>>> Let me explain: I wrote a simple Consumer that reports how many different
>>>> threads were used to run it:
>>>>
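>>>> source.
>>>>     parallel().peek(new ThreadReporter("Segment 1 parallel")).
>>>>     sequential().peek(new ThreadReporter("Segment 2 sequential")).
>>>>     parallel().peek(new ThreadReporter("Segment 3 parallel")).
>>>>     sequential().peek(new ThreadReporter("Segment 4 sequential")).
>>>>     forEach((t) -> {});
>>>>
>>>> // Requires java.util.Map, java.util.concurrent.ConcurrentHashMap,
>>>> // and java.util.function.Consumer.
>>>> private static class ThreadReporter implements Consumer<Integer> {
>>>>
>>>>     private final String name;
>>>>     // Thread-safe, because accept() may be called from several threads at once.
>>>>     private final Map<Thread, Boolean> threads = new ConcurrentHashMap<>();
>>>>
>>>>     ThreadReporter(String name) {
>>>>         this.name = name;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void accept(Integer integer) {
>>>>         threads.put(Thread.currentThread(), true);
>>>>     }
>>>>
>>>>     public void report() {
>>>>         System.out.println("Name '" + name + "': " + threads.size() + " Thread(s)");
>>>>     }
>>>> }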
>
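A minimal, runnable version of the experiment quoted above, assuming the released Java 8 java.util.stream API; the source IntStream.range(0, 10_000).boxed() and the class name LastCallWinsDemo are illustrative assumptions, and threadCount() is a hypothetical helper added for checking. It illustrates the rule Brian describes: there is one parallel/sequential flag for the whole pipeline, the last call sets it, and it is only acted on when the terminal operation starts. Since the final call is sequential(), all four segments run on the calling thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LastCallWinsDemo {
    // Records which threads ran a given stage (thread-safe, since stages may run in parallel).
    static final class ThreadReporter implements Consumer<Integer> {
        private final String name;
        private final Map<Thread, Boolean> threads = new ConcurrentHashMap<>();

        ThreadReporter(String name) { this.name = name; }

        @Override
        public void accept(Integer value) { threads.put(Thread.currentThread(), true); }

        int threadCount() { return threads.size(); }

        void report() { System.out.println("Name '" + name + "': " + threadCount() + " Thread(s)"); }
    }

    public static void main(String[] args) {
        ThreadReporter s1 = new ThreadReporter("Segment 1 parallel");
        ThreadReporter s2 = new ThreadReporter("Segment 2 sequential");
        ThreadReporter s3 = new ThreadReporter("Segment 3 parallel");
        ThreadReporter s4 = new ThreadReporter("Segment 4 sequential");

        Stream<Integer> pipeline = IntStream.range(0, 10_000).boxed()
                .parallel().peek(s1)
                .sequential().peek(s2)
                .parallel().peek(s3)
                .sequential().peek(s4);

        // The single flag now says "sequential"; nothing has executed yet.
        System.out.println(pipeline.isParallel()); // false

        pipeline.forEach(t -> { }); // terminal operation: the whole pipeline runs sequentially

        for (ThreadReporter r : new ThreadReporter[] {s1, s2, s3, s4}) {
            r.report(); // each segment reports 1 Thread(s)
        }
    }
}
```

If the last call were parallel() instead, every segment would run in parallel and the reported counts would depend on the machine and the common fork/join pool, so no fixed output is claimed for that case.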