Stream parallel() / sequential() question.

Brian Goetz brian.goetz at oracle.com
Thu Apr 4 08:12:06 PDT 2013


Yes, that's right.  The moral of the story is: don't pass stateful 
lambdas to stream methods.  The Stream API is designed so that streams 
can be safely executed in serial or parallel.  Your responsibility as a 
stream user is not to put stateful lambdas into streams.
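
A minimal sketch of what a stateless mapper looks like, assuming the 
java.util.stream API as it shipped in Java 8.  The class StatelessMapDemo 
and the helper doubled() are hypothetical names used only for 
illustration.  The mapper touches no shared state and the accumulation is 
left to collect(), so the result should be the same in either mode.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StatelessMapDemo {

    // Hypothetical helper: the mapper x -> x * 2 reads and writes no shared
    // state, so it behaves the same on one thread or on many.
    static List<Integer> doubled(List<Integer> input, boolean parallel) {
        Stream<Integer> s = parallel ? input.parallelStream() : input.stream();
        // collect() performs the accumulation; the lambda never appends to
        // a collection shared between threads.
        return s.map(x -> x * 2).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(doubled(input, false)); // [2, 4, 6, 8, 10]
        System.out.println(doubled(input, true));  // [2, 4, 6, 8, 10]
    }
}
```

Because the source list is ordered and collect(Collectors.toList()) 
respects encounter order, the parallel run returns the elements in the 
same order as the sequential one.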

Think of a stream as a package, and a stateful lambda as a bomb.  It's 
not nice to put bombs in packages and mail them.
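
To make the "one bit for the whole pipeline" behavior concrete, here is a 
small sketch, again assuming the Java 8 java.util.stream API.  The class 
PipelineModeDemo is hypothetical, and the identity mapper merely stands in 
for the mapper in the question.  It only inspects the mode with 
isParallel(); it does not run anything thread-unsafe.

```java
import java.util.stream.Stream;

class PipelineModeDemo {

    // Hypothetical library method mirroring foo() from the question.
    // sequential() sets the pipeline-wide mode; it does not pin map()
    // to a single thread.
    static <T> Stream<T> foo(Stream<T> source) {
        return source.sequential().map(x -> x);
    }

    // The caller's later parallel() overwrites the mode for the whole pipeline.
    static boolean modeAfterCallerParallel() {
        return foo(Stream.of(1, 2, 3)).parallel().isParallel();
    }

    // With no later call, the sequential() inside foo() is the last word,
    // even though the source stream was parallel.
    static boolean modeWithoutCallerOverride() {
        return foo(Stream.of(1, 2, 3).parallel()).isParallel();
    }

    public static void main(String[] args) {
        System.out.println(modeAfterCallerParallel());   // true
        System.out.println(modeWithoutCallerOverride()); // false
    }
}
```

Whichever of parallel() or sequential() is called last before the 
terminal operation decides how every stage runs, including stages that 
were added earlier.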

On 4/4/2013 10:49 AM, Boaz Nahum wrote:
> Suppose I wrote a method:
>
> <T> Stream<T> foo(Stream<T> source) {
>
>     return source.sequential().map(nonThreadSafeMapper);
>
> }
>
> in which I used a non-thread-safe mapper. To protect myself, I call
> 'sequential()' on it.
>
> Now, somewhere else, somebody calls:
>
> foo().parallel() ....
>
>
> So, is my non-thread-safe mapper running concurrently?
>
> Thanks
> Boaz
>
>
>
>
>
>
> On Thu, Apr 4, 2013 at 5:39 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>
>> Let me make a real world analogy.
>>
>> Replace "stream" with "package".
>> Replace "non thread safe mapper" with "bomb".
>>
>> Boaz concludes: "Don't send packages in the mail, someone could get hurt
>> because the package might have a bomb in it."
>>
>> Brian's logic: "Don't send packages in the mail that have bombs in them."
>>
>> Seriously, streams with non-thread-safe lambdas are like bombs.  Don't
>> make bombs, and no one blows up, no matter how many packages we send each
>> other!
>>
>> Now, let's rewind and bring this conversation back to the list -- can you
>> summarize your question on-list so everyone can see the answer? Knowing in
>> advance that I will poke some fun at you :)
>>
>>
>> On 4/4/2013 10:30 AM, Boaz Nahum wrote:
>>
>>> Because, if
>>>
>>> Me:
>>> ----
>>>
>>> Stream<T>  foo() {
>>>
>>>     return s.sequential().map(nonThreadSafeMapper);
>>>
>>> }
>>>
>>>
>>> You:
>>> ------
>>> foo().parallel(). .....
>>>
>>> Then my non-thread-safe mapper will be run concurrently!
>>>
>>> Thanks
>>> Boaz
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Apr 4, 2013 at 5:16 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>>
>>>      Exactly the opposite!
>>>
>>>      All streams should be usable and produce correct results under
>>>      either sequential or parallel execution.  Why would this advice make
>>>      you think we were telling you not to return streams?
>>>
>>>
>>>      On 4/4/2013 10:14 AM, Boaz Nahum wrote:
>>>
>>>          I understand, but I still don't know how to solve my problem.
>>>
>>>          If I understood correctly, you are telling me not to write
>>>          libraries that return a Stream, because you never know how
>>>          your stream will be used?
>>>
>>>          Thanks
>>>          Boaz
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>          On Thu, Apr 4, 2013 at 5:05 PM, Brian Goetz
>>>          <brian.goetz at oracle.com> wrote:
>>>
>>>               This was simplified recently.
>>>
>>>               There is *one* sequential/parallel bit for the whole
>>>               pipeline.  The stream starts off with it set one way or
>>>               the other.  These calls overwrite it.  The bit is only
>>>               acted on when you actually start the computation (invoke
>>>               the terminal operation).
>>>
>>>
>>>               On 4/4/2013 9:21 AM, Boaz Nahum wrote:
>>>
>>>                   When I invoke parallel() or sequential(), how far back
>>>                   does it reach?
>>>
>>>                   Let me explain. I wrote a simple Consumer that reports
>>>                   how many different threads were used to run it:
>>>
>>>                   source.
>>>                       parallel().peek(new ThreadReporter("Segment 1 parallel")).
>>>                       sequential().peek(new ThreadReporter("Segment 2 sequential")).
>>>                       parallel().peek(new ThreadReporter("Segment 3 parallel")).
>>>                       sequential().peek(new ThreadReporter("Segment 4 sequential")).
>>>                       forEach((t) -> {});
>>>
>>>
>>>
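>>>                   private static class ThreadReporter implements Consumer<Integer> {
>>>
>>>                       // The fields and constructor are assumed; the
>>>                       // message shows only accept() and report().
>>>                       private final String name;
>>>
>>>                       // Concurrent map, because accept() may be called
>>>                       // from several threads at once.
>>>                       private final Map<Thread, Boolean> threads =
>>>                               new ConcurrentHashMap<>();
>>>
>>>                       ThreadReporter(String name) {
>>>                           this.name = name;
>>>                       }
>>>
>>>                       // Record the thread that processed this element.
>>>                       @Override
>>>                       public void accept(Integer integer) {
>>>                           threads.put(Thread.currentThread(), true);
>>>                       }
>>>
>>>                       // Print how many distinct threads were seen.
>>>                       public void report() {
>>>                           System.out.println("Name '" + name + "': "
>>>                                   + threads.size() + " Thread(s)");
>>>                       }
>>>                   }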
>>>
>>>
>>>
>>>
>

