Stream parallel() / sequential() question.

Brian Goetz brian.goetz at oracle.com
Thu Apr 4 08:43:14 PDT 2013


sequential() is definitely less important now that we have 
forEachOrdered, though there is still a semantic difference -- 
sequential makes the whole pipeline sequential, whereas forEachOrdered 
allows parallel upstream computation while preserving encounter order at 
the terminal stage.  So if you have something like:

   sortedList.parallelStream().map(expensiveFunction).forEachOrdered(System.out::println)

this will enable you to apply the expensive function in parallel but
still print f(e) in the order imposed by the sort.
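A minimal runnable sketch of the pattern above. It is written against the released java.util.stream API (Java 8 and later), which may differ from the pre-release builds this thread discusses. `sortedList` comes from the message; `ForEachOrderedDemo`, `expensiveFunction` (here just squaring) and `run` are hypothetical names for illustration. The mapping stage may run on several threads, while forEachOrdered still hands results to the action in the list's encounter order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ForEachOrderedDemo {
    // Hypothetical stand-in for an expensive, stateless function.
    static int expensiveFunction(int x) {
        return x * x;
    }

    // Maps in parallel, but applies the terminal action in encounter order.
    static List<Integer> run(List<Integer> sortedList) {
        List<Integer> printed = new ArrayList<>();
        sortedList.parallelStream()
                  .map(ForEachOrderedDemo::expensiveFunction) // may run on several threads
                  .forEachOrdered(y -> {                       // one element at a time, in order
                      System.out.println(y);
                      // Safe without locking: forEachOrdered guarantees that the action for
                      // one element happens-before the action for the next.
                      printed.add(y);
                  });
        return printed;
    }

    public static void main(String[] args) {
        List<Integer> input = IntStream.rangeClosed(1, 5).boxed().collect(Collectors.toList());
        run(input); // prints 1, 4, 9, 16, 25, one per line, in that order
    }
}
```

Swapping forEachOrdered for forEach keeps the parallel mapping but lets the printed order vary from run to run.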

Having ordered/unordered variants of all ops exposes the complexity of 
reasoning about order to every user in every case -- which is 
undesirable.  Reasoning about encounter order should be a corner case, 
and out of the user's mind most of the time.



On 4/4/2013 11:35 AM, Boaz Nahum wrote:
> Thanks.
>
> So why would anyone need to use sequential()?
>
> Why not just have ordered and unordered variants of every terminal operation:
>     forEachOrdered
>     forEachUnordered
>
> Unfortunately I already sent some bombs :( ....
>
> Thanks.
> Boaz
>
> On Thu, Apr 4, 2013 at 6:12 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>
>> Yes, that's right.  The moral of the story is: don't pass stateful lambdas
>> to stream methods.  The Stream API is designed so that streams can be
>> safely executed in serial or parallel.  Your responsibility as a
>> responsible stream user is not to put stateful lambdas into streams.
>>
>> Think of a stream as a package, and a stateful lambda as a bomb.  It's not
>> nice to put bombs in packages and mail them.
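To make the "bomb" concrete, here is a small sketch (hypothetical class and method names, released Java 8+ API) contrasting a lambda that mutates a shared ArrayList with a stateless pipeline that lets collect() do the accumulation. Only the second is guaranteed to give the same answer sequentially and in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StatefulLambdaDemo {
    static final int N = 10_000;

    // The "bomb": the lambda writes into a shared, non-thread-safe ArrayList.
    // Run in parallel, this may lose elements, scramble them, or throw.
    static List<Integer> stateful(boolean parallel) {
        List<Integer> out = new ArrayList<>();
        IntStream s = IntStream.range(0, N);
        if (parallel) s = s.parallel();
        s.map(x -> x * 2).forEach(x -> out.add(x));
        return out;
    }

    // Safe in either mode: no shared mutable state; the stream does the accumulation.
    static List<Integer> stateless(boolean parallel) {
        IntStream s = IntStream.range(0, N);
        if (parallel) s = s.parallel();
        return s.map(x -> x * 2).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(stateless(false).equals(stateless(true))); // true
        try {
            // May print a size below N; the outcome varies from run to run.
            System.out.println("stateful parallel size: " + stateful(true).size());
        } catch (RuntimeException e) {
            System.out.println("stateful parallel run failed: " + e);
        }
    }
}
```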
>>
>>
>> On 4/4/2013 10:49 AM, Boaz Nahum wrote:
>>
>>> Suppose I wrote a method:
>>>
>>> <T> Stream<T> foo(Stream<T> source) {
>>>     return source.sequential().map(nonThreadSafeMapper);
>>> }
>>>
>>> In which I used a non-thread-safe mapper. To protect myself, I call
>>> 'sequential()' on it.
>>>
>>> Now, somewhere else somebody:
>>>
>>> foo().parallel() ....
>>>
>>>
>>> So my non-thread-safe mapper is running concurrently?
>>>
>>> Thanks
>>> Boaz
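A sketch of this scenario against the released API (Java 8+), assuming the intended body is `source.sequential().map(...)`. Here the mapper is passed in as a parameter and is a deliberately thread-safe probe that records thread names; `LastCallWinsDemo` is a hypothetical name. Because the mode is a single flag for the whole pipeline, the caller's parallel() overrides the sequential() inside foo().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LastCallWinsDemo {
    // Boaz's foo(), with the parameter name fixed and the mapper passed in.
    static <T> Stream<T> foo(Stream<T> source, Function<T, T> mapper) {
        return source.sequential().map(mapper);
    }

    public static void main(String[] args) {
        // Concurrent set, so this probe mapper is itself thread-safe.
        Set<String> threadNames = ConcurrentHashMap.newKeySet();

        Stream<Integer> s = foo(IntStream.range(0, 100_000).boxed(), x -> {
            threadNames.add(Thread.currentThread().getName()); // which thread ran the mapper?
            return x;
        }).parallel(); // the caller's call overwrites foo()'s sequential()

        System.out.println(s.isParallel()); // true
        s.forEach(x -> { });                // terminal operation: the whole pipeline runs in parallel
        // Typically more than one thread on a multi-core machine, but not guaranteed.
        System.out.println("mapper ran on " + threadNames.size() + " thread(s)");
    }
}
```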
>>>
>>> On Thu, Apr 4, 2013 at 5:39 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>>
>>>> Let me make a real-world analogy.
>>>>
>>>> Replace "stream" with "package".
>>>> Replace "non thread safe mapper" with "bomb".
>>>>
>>>> Boaz concludes: "Don't send packages in the mail, someone could get hurt
>>>> because the package might have a bomb in it."
>>>>
>>>> Brian's logic: "Don't send packages in the mail that have bombs in them."
>>>>
>>>> Seriously, streams with non-thread-safe lambdas are like bombs.  Don't
>>>> make bombs, and no one blows up, no matter how many packages we send each
>>>> other!
>>>>
>>>> Now, let's rewind and bring this conversation back to the list -- can you
>>>> summarize your question on-list so everyone can see the answer? Knowing in
>>>> advance that I will poke some fun at you :)
>>>>
>>>>
>>>> On 4/4/2013 10:30 AM, Boaz Nahum wrote:
>>>>
>>>>> Because, if
>>>>>
>>>>> Me:
>>>>> ----
>>>>>
>>>>> Stream<T> foo() {
>>>>>     return s.sequential().map(nonThreadSafeMapper);
>>>>> }
>>>>>
>>>>>
>>>>> You:
>>>>> ------
>>>>> foo().parallel(). .....
>>>>>
>>>>> Then my non-thread-safe mapper will run concurrently!
>>>>>
>>>>> Thanks
>>>>> Boaz
>>>>>
>>>>> On Thu, Apr 4, 2013 at 5:16 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>>>>
>>>>>       Exactly the opposite!
>>>>>
>>>>>       All streams should be usable and produce correct results under
>>>>>       either sequential or parallel execution.  Why would this advice
>>>>>       make you think we were telling you not to return streams?
>>>>>
>>>>>
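One way to keep a library stream correct in either mode when a helper object is genuinely not thread-safe is to give each thread its own instance via ThreadLocal. This is a common technique, not something proposed in this thread; `NonThreadSafeMapper` (which reuses an internal StringBuilder) and `ThreadLocalMapperDemo` are hypothetical names, and the code targets the released Java 8+ API. Making the mapper stateless, when possible, is simpler still.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ThreadLocalMapperDemo {
    // Hypothetical non-thread-safe mapper: it reuses one internal buffer,
    // so a single instance must never be used by two threads at once.
    static final class NonThreadSafeMapper implements Function<Integer, String> {
        private final StringBuilder buffer = new StringBuilder();

        @Override
        public String apply(Integer i) {
            buffer.setLength(0);
            buffer.append("item-").append(i);
            return buffer.toString();
        }
    }

    // Each thread lazily gets its own mapper instance.
    private static final ThreadLocal<NonThreadSafeMapper> MAPPER =
            ThreadLocal.withInitial(NonThreadSafeMapper::new);

    // No sequential() here: the returned stream is correct in either mode,
    // so the caller is free to call parallel() or sequential().
    static Stream<String> foo(Stream<Integer> source) {
        return source.map(i -> MAPPER.get().apply(i));
    }

    public static void main(String[] args) {
        List<String> seq = foo(IntStream.range(0, 10_000).boxed())
                .collect(Collectors.toList());
        List<String> par = foo(IntStream.range(0, 10_000).boxed()).parallel()
                .collect(Collectors.toList());
        System.out.println(seq.equals(par)); // true
    }
}
```

The trade-off: ThreadLocal instances live as long as the worker threads (for parallel streams, usually the common ForkJoinPool), so this suits cheap, reusable helpers rather than objects that hold large or closable resources.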
>>>>>       On 4/4/2013 10:14 AM, Boaz Nahum wrote:
>>>>>
>>>>>           I understand, but I still don't know how to solve my problem.
>>>>>
>>>>>           If I understood correctly, you are telling me not to write
>>>>>           libraries that return a Stream, because you never know how
>>>>>           your stream will be used?
>>>>>
>>>>>           Thanks
>>>>>           Boaz
>>>>>
>>>>>           On Thu, Apr 4, 2013 at 5:05 PM, Brian Goetz
>>>>>           <brian.goetz at oracle.com> wrote:
>>>>>
>>>>>                This was simplified recently.
>>>>>
>>>>>                There is *one* sequential/parallel bit for the whole
>>>>>                pipeline.  The stream starts off with it set one way or
>>>>>                the other.  These calls overwrite it.  The bit is only
>>>>>                acted on when you actually start the computation (invoke
>>>>>                the terminal operation.)
>>>>>
>>>>>
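A small sketch of both points against the released API (Java 8+); `OneBitDemo` and its methods are hypothetical names. The first method shows that only the last parallel()/sequential() call counts; the second shows that nothing in the pipeline runs until the terminal operation is invoked.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class OneBitDemo {
    // Only the last parallel()/sequential() call before the terminal operation matters.
    static boolean finalModeIsParallel() {
        return Stream.of(1, 2, 3)
                .parallel()
                .sequential()
                .parallel()
                .sequential() // overwrites the single flag for the whole pipeline
                .isParallel();
    }

    // Returns {peeks before the terminal operation, peeks after it}.
    static int[] peekCounts() {
        AtomicInteger peeked = new AtomicInteger();
        Stream<Integer> s = Stream.of(1, 2, 3, 4).peek(x -> peeked.incrementAndGet());
        int before = peeked.get(); // 0: building the pipeline runs nothing
        s.forEach(x -> { });       // terminal operation: the pipeline executes now
        return new int[] { before, peeked.get() };
    }

    public static void main(String[] args) {
        System.out.println(finalModeIsParallel()); // false
        int[] counts = peekCounts();
        System.out.println(counts[0] + " then " + counts[1]); // 0 then 4
    }
}
```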
>>>>>                On 4/4/2013 9:21 AM, Boaz Nahum wrote:
>>>>>
>>>>>                    When I invoke parallel() or sequential(), how far back
>>>>>                    does it go?
>>>>>
>>>>>                    Let me explain: I wrote a simple Consumer that reports
>>>>>                    how many different threads were used to run it:
>>>>>
>>>>>                    source.
>>>>>                        parallel().peek(new ThreadReporter("Segment 1 parallel")).
>>>>>                        sequential().peek(new ThreadReporter("Segment 2 sequential")).
>>>>>                        parallel().peek(new ThreadReporter("Segment 3 parallel")).
>>>>>                        sequential().peek(new ThreadReporter("Segment 4 sequential")).
>>>>>                        forEach((t) -> {});
>>>>>
>>>>>
>>>>>
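>>>>>                    // Needs java.util.Map, java.util.concurrent.ConcurrentHashMap
>>>>>                    // and java.util.function.Consumer.
>>>>>                    private static class ThreadReporter implements Consumer<Integer> {
>>>>>                        private final String name;
>>>>>                        // Concurrent map: accept() may be called from many threads at once.
>>>>>                        private final Map<Thread, Boolean> threads = new ConcurrentHashMap<>();
>>>>>
>>>>>                        ThreadReporter(String name) {
>>>>>                            this.name = name;
>>>>>                        }
>>>>>
>>>>>                        @Override
>>>>>                        public void accept(Integer integer) {
>>>>>                            // Record the thread that processed this element.
>>>>>                            threads.put(Thread.currentThread(), true);
>>>>>                        }
>>>>>
>>>>>                        public void report() {
>>>>>                            System.out.println("Name '" + name + "': " + threads.size() + " Thread(s)");
>>>>>                        }
>>>>>                    }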

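For reference, Boaz's experiment can be rerun against the released API (Java 8+) as sketched below; `SegmentThreadsDemo` is a hypothetical name, and the reporters are kept in variables so they can be inspected afterwards. Since the last call is sequential(), the whole pipeline runs sequentially, and in current OpenJDK builds every segment reports exactly one thread (the caller's).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.IntStream;

public class SegmentThreadsDemo {
    // Consumer that records every thread it is invoked on (adapted from the quoted ThreadReporter).
    static final class ThreadReporter implements Consumer<Integer> {
        final String name;
        final Map<Thread, Boolean> threads = new ConcurrentHashMap<>();

        ThreadReporter(String name) { this.name = name; }

        @Override
        public void accept(Integer integer) {
            threads.put(Thread.currentThread(), true);
        }

        int threadCount() { return threads.size(); }
    }

    static List<ThreadReporter> run() {
        ThreadReporter r1 = new ThreadReporter("Segment 1 parallel");
        ThreadReporter r2 = new ThreadReporter("Segment 2 sequential");
        ThreadReporter r3 = new ThreadReporter("Segment 3 parallel");
        ThreadReporter r4 = new ThreadReporter("Segment 4 sequential");

        IntStream.range(0, 100_000).boxed()
                .parallel().peek(r1)
                .sequential().peek(r2)
                .parallel().peek(r3)
                .sequential().peek(r4) // last call wins: the whole pipeline is sequential
                .forEach(t -> { });
        return Arrays.asList(r1, r2, r3, r4);
    }

    public static void main(String[] args) {
        for (ThreadReporter r : run()) {
            System.out.println("Name '" + r.name + "': " + r.threadCount() + " Thread(s)");
        }
    }
}
```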

More information about the lambda-dev mailing list