Stream parallel() / sequential() question.

Howard Lovatt howard.lovatt at gmail.com
Thu Apr 4 15:52:08 PDT 2013


I think there is a design that is better than this yet still just as efficient:

interface Limits {
  default long maximumSize() { return Long.MAX_VALUE; } // No limit
  default long minimumSplitSize() { return 1; } // Parallel
}

Then all the sources, mappers, reducers etc. implement Limits, as do the streams. When map etc. is called on a stream, the stream evaluates the Limits methods of the argument:

maximumSize = Math.min(maximumSize, mapper.maximumSize());
minimumSplitSize = Math.max(minimumSplitSize, mapper.minimumSplitSize());

When the stream is evaluated the two sizes are respected. 
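
A minimal sketch of how evaluation might respect the two sizes. LimitsPlanner and plan() are hypothetical names, not part of any JDK API. Two assumptions that the proposal does not spell out: a stream longer than maximumSize is rejected with an exception, and a chunk is split only while both halves stay at or above minimumSplitSize.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reports the chunk sizes a splitter would produce
// for `size` elements under the two limits.
final class LimitsPlanner {

    static List<Long> plan(long size, long maximumSize, long minimumSplitSize) {
        if (size > maximumSize) {
            // Assumption: exceeding maximumSize is treated as an error.
            throw new IllegalArgumentException(
                "stream of " + size + " elements exceeds limit " + maximumSize);
        }
        List<Long> chunks = new ArrayList<>();
        split(size, minimumSplitSize, chunks);
        return chunks;
    }

    private static void split(long size, long minimumSplitSize, List<Long> out) {
        long half = size / 2;
        // Assumption: split only while the smaller half is still at least
        // minimumSplitSize; otherwise the chunk is processed as one unit.
        if (half > 0 && half >= minimumSplitSize) {
            split(half, minimumSplitSize, out);
            split(size - half, minimumSplitSize, out);
        } else {
            out.add(size);
        }
    }
}
```

Under these assumptions a minimumSplitSize of 1 splits down to single elements, while Long.MAX_VALUE never splits, so the whole stream runs as one sequential chunk.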

This design allows a sequence of maps etc. to work together, since the most conservative size is used for both parameters. For example, a stateful mapper will override minimumSplitSize to return Long.MAX_VALUE and hence force sequential execution. A further example would be a reducer that can only cope with an int number of elements; it would override maximumSize to return Integer.MAX_VALUE and therefore limit the stream length to a safe value.
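
The two examples can be sketched as follows. Only Limits comes from the proposal; LimitedFunction, RunningSum, IntCountingReducer and LimitTracker are hypothetical names for illustration, and LimitTracker stands in for a stream by tracking nothing but the combined limits.

```java
import java.util.function.Function;

// The proposed interface.
interface Limits {
    default long maximumSize() { return Long.MAX_VALUE; } // no limit
    default long minimumSplitSize() { return 1; }         // fully parallel
}

// Hypothetical: a mapper type that also carries limits.
interface LimitedFunction<T, R> extends Function<T, R>, Limits {}

// Stateful mapper: each result depends on all earlier elements,
// so it forbids splitting altogether.
final class RunningSum implements LimitedFunction<Integer, Integer> {
    private int sum;
    @Override public Integer apply(Integer x) { sum += x; return sum; }
    @Override public long minimumSplitSize() { return Long.MAX_VALUE; }
}

// Reducer that counts in an int, so it caps the stream length.
final class IntCountingReducer implements Limits {
    @Override public long maximumSize() { return Integer.MAX_VALUE; }
}

// Hypothetical stand-in for a stream: keeps the most conservative limits
// of every stage added so far.
final class LimitTracker implements Limits {
    private long maximumSize = Long.MAX_VALUE;
    private long minimumSplitSize = 1;

    LimitTracker add(Limits stage) {
        maximumSize = Math.min(maximumSize, stage.maximumSize());
        minimumSplitSize = Math.max(minimumSplitSize, stage.minimumSplitSize());
        return this;
    }

    @Override public long maximumSize() { return maximumSize; }
    @Override public long minimumSplitSize() { return minimumSplitSize; }

    boolean mustRunSequentially() { return minimumSplitSize == Long.MAX_VALUE; }
}
```

A stateless lambda such as x -> x * 2 assigned to LimitedFunction inherits the defaults and leaves the limits unchanged; adding a RunningSum anywhere in the chain makes the whole pipeline sequential.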

 -- Howard. 


On 05/04/2013, at 7:28 AM, Brian Goetz <brian.goetz at oracle.com> wrote:

> Parallel is clearly needed because, for many stream generators (such as 
> intRange), they are created sequential and have to be explicitly flipped 
> to parallel if you want parallel.  The alternative to this was the API 
> bloat of having 2x as many methods everywhere streams are generated. 
> Saying generator().parallel() is pretty efficient at this point; it just 
> flips a bit and returns the underlying stream.  Collection is the 
> notable exception, with two methods; this was a common enough case that 
> the small additional API surface area (one new method) aided 
> discoverability of parallel stream capability enough to warrant the 
> additional method.
> 
> Sequential is somewhat less essential, but again costs little to 
> include, and provides a way of saying "force this stream to execute 
> sequentially", which is still useful.
> 
> On 4/4/2013 4:20 PM, Peter Levart wrote:
>> 
>> On 04/04/2013 04:05 PM, Brian Goetz wrote:
>>> This was simplified recently.
>>> 
>>> There is *one* sequential/parallel bit for the whole pipeline.  The
>>> stream starts off with it set one way or the other.  These calls
>>> overwrite it.  The bit is only acted on when you actually start the
>>> computation (invoke the terminal operation.)
>> 
>> Hi Brian,
>> 
>> So why are sequential()/parallel() methods needed then after all? We
>> have the Collection.stream() and Collection.parallelStream() where the
>> shape is pre-determined at the beginning by the user. Is there an API
>> that constructs a Stream where the shape is chosen automatically?
>> 
>> Regards, Peter
>> 
>>> On 4/4/2013 9:21 AM, Boaz Nahum wrote:
>>>> When I invoke parallel() or sequential(), how far back does it reach?
>>>> 
>>>> Let me explain: I wrote a simple Consumer that reports how many different
>>>> threads were used to run it:
>>>> 
>>>> source.
>>>>              parallel().peek(new ThreadReporter("Segment 1 parallel")).
>>>>              sequential().peek(new ThreadReporter("Segment 2 sequential")).
>>>>              parallel().peek(new ThreadReporter("Segment 3 parallel")).
>>>>              sequential().peek(new ThreadReporter("Segment 4 sequential")).
>>>>              forEach((t) -> {});
>>>> 
>>>> 
>>>> 
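>>>>   // Requires java.util.Map, java.util.concurrent.ConcurrentHashMap
>>>>   // and java.util.function.Consumer.
>>>>   private static class ThreadReporter implements Consumer<Integer> {
>>>>
>>>>          private final String name;
>>>>          // Threads seen so far; concurrent because accept() may be
>>>>          // called from several worker threads at once.
>>>>          private final Map<Thread, Boolean> threads = new ConcurrentHashMap<>();
>>>>
>>>>          ThreadReporter(String name) {
>>>>              this.name = name;
>>>>          }
>>>>
>>>>          @Override
>>>>          public void accept(Integer integer) {
>>>>              threads.put(Thread.currentThread(), true);
>>>>          }
>>>>
>>>>          public void report() {
>>>>              System.out.println("Name '" + name + "': " + threads.size() +
>>>> " Thread(s)");
>>>>          }
>>>>      }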
> 
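
The single-bit behaviour Brian describes in the quoted text can be checked with a small sketch. It assumes the java.util.stream API as it later shipped in Java 8 (IntStream.range rather than the intRange mentioned above), which may differ from the April 2013 builds under discussion. ModeBitDemo and its method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

final class ModeBitDemo {

    // The mode is toggled between stages, but the last call before the
    // terminal operation is sequential(), so every stage runs sequentially.
    // Returns the distinct threads that executed the peek() stages.
    static Set<Thread> threadsUsedWhenLastCallIsSequential() {
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000)
                 .parallel().peek(i -> seen.add(Thread.currentThread()))
                 .sequential().peek(i -> seen.add(Thread.currentThread()))
                 .forEach(i -> { });
        return seen;
    }

    // isParallel() reports the pipeline-wide bit: whichever call came last.
    static boolean lastCallWins() {
        return IntStream.range(0, 10).sequential().parallel().isParallel()
            && !IntStream.range(0, 10).parallel().sequential().isParallel();
    }
}
```

With sequential() as the final call, both peek() stages run on the calling thread, including the one placed after parallel(). With parallel() last, the number of threads observed depends on the machine and the common pool, so the sketch does not assert it.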

