Stream parallel() / sequential() question.
Brian Goetz
brian.goetz at oracle.com
Thu Apr 4 08:12:06 PDT 2013
Yes, that's right. The moral of the story is: don't pass stateful
lambdas to stream methods. The Stream API is designed so that streams
can be safely executed either sequentially or in parallel; your
responsibility as a stream user is not to put stateful lambdas into them.
Think of a stream as a package, and a stateful lambda as a bomb. It's
not nice to put bombs in packages and mail them.
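To make the point concrete, here is a minimal sketch (Java 8+ assumed; the class and method names are illustrative, not from the thread) that contrasts a stateful lambda, which mutates a counter shared by every invocation, with a stateless rewrite that takes each index from the source. The stateful version is only reliable sequentially; the stateless one gives the same result in either mode.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class StatelessVsStateful {

    // Stateful (the "bomb"): every invocation reads and writes one shared
    // counter, so under parallel() threads race on it and indices can repeat or go missing.
    static List<String> numberStateful(List<String> items, boolean parallel) {
        int[] counter = {0};
        Stream<String> s = parallel ? items.parallelStream() : items.stream();
        return s.map(item -> (counter[0]++) + ":" + item)
                .collect(Collectors.toList());
    }

    // Stateless: each element's index comes from the source itself, so the
    // lambda depends only on its argument and is safe in either mode.
    static List<String> numberStateless(List<String> items, boolean parallel) {
        IntStream indices = IntStream.range(0, items.size());
        if (parallel) {
            indices = indices.parallel();
        }
        return indices.mapToObj(i -> i + ":" + items.get(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c", "d");
        System.out.println(numberStateless(items, true));  // [0:a, 1:b, 2:c, 3:d]
        System.out.println(numberStateful(items, false));  // [0:a, 1:b, 2:c, 3:d]
        // numberStateful(items, true) has no guaranteed output.
    }
}
```

The stateless version needs random access to the source (a List here); that is the usual price of removing shared mutable state from the lambda.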
On 4/4/2013 10:49 AM, Boaz Nahum wrote:
> Suppose I wrote a method:
>
> <T> Stream<T> foo(Stream<T> source) {
>     // nonThreadSafeMapper: a Function<T, T> that is not thread-safe
>     return source.sequential().map(nonThreadSafeMapper);
> }
>
> I used a non-thread-safe mapper in it. To protect myself, I call
> sequential() on it first.
>
> Now, somewhere else, somebody writes:
>
> foo(source).parallel() ....
>
>
> So, is my non-thread-safe mapper running concurrently?
>
> Thanks
> Boaz
>
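The answer Brian gives further down (one sequential/parallel flag for the whole pipeline, last call wins) can be checked directly. The sketch below assumes released JDK 8+ behavior, which may differ from the pre-release builds discussed in this thread; the mapper is passed as a parameter only so the example compiles on its own.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LastCallWins {

    // Same shape as foo() in the thread; the mapper is a parameter here only so
    // the sketch compiles on its own.
    static <T> Stream<T> foo(Stream<T> source, Function<T, T> mapper) {
        return source.sequential().map(mapper);
    }

    public static void main(String[] args) {
        Stream<Integer> asReturned = foo(Stream.of(1, 2, 3), x -> x * 10);
        System.out.println(asReturned.isParallel()); // false

        // The caller's parallel() overwrites the one pipeline-wide flag, so the
        // map() stage inside foo() will also run in parallel.
        Stream<Integer> callerSide = foo(Stream.of(1, 2, 3), x -> x * 10).parallel();
        System.out.println(callerSide.isParallel()); // true

        List<Integer> result = callerSide.collect(Collectors.toList());
        System.out.println(result); // [10, 20, 30]
    }
}
```

So the sequential() inside foo() offers no protection: it only sets a flag the caller is free to overwrite.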
> On Thu, Apr 4, 2013 at 5:39 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>
>> Let me make a real world analogy.
>>
>> Replace "stream" with "package".
>> Replace "non thread safe mapper" with "bomb".
>>
>> Boaz concludes: "Don't send packages in the mail, someone could get hurt
>> because the package might have a bomb in it."
>>
>> Brian's logic: "Don't send packages in the mail that have bombs in them."
>>
>> Seriously, streams with non-thread-safe lambdas are like bombs. Don't
>> make bombs, and no one blows up, no matter how many packages we send each
>> other!
>>
>> Now, let's rewind and bring this conversation back to the list -- can you
>> summarize your question on-list so everyone can see the answer? Knowing in
>> advance that I will poke some fun at you :)
>>
>>
>> On 4/4/2013 10:30 AM, Boaz Nahum wrote:
>>
>>> Because if:
>>>
>>> Me:
>>> ----
>>>
>>> Stream<T> foo() {
>>>     return s.sequential().map(nonThreadSafeMapper);
>>> }
>>>
>>>
>>> You:
>>> ------
>>> foo().parallel(). .....
>>>
>>> Then my non-thread-safe mapper will run concurrently!
>>>
>>> Thanks
>>> Boaz
>>>
>>> On Thu, Apr 4, 2013 at 5:16 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>>
>>> Exactly the opposite!
>>>
>>> All streams should be usable and produce correct results under
>>> either sequential or parallel execution. Why would this advice make
>>> you think we were telling you not to return streams?
>>>
>>>
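One way a library method can meet that bar, even when its mapping logic relies on a helper that is not thread-safe, is to confine each helper instance to a single thread. This is a general Java idiom, not a technique proposed in the thread; HexFormatter and toHex are hypothetical names, and ThreadLocal.withInitial requires Java 8+.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ThreadSafeLibrary {

    // Hypothetical helper that is NOT thread-safe: it reuses one StringBuilder.
    static final class HexFormatter implements Function<Integer, String> {
        private final StringBuilder buf = new StringBuilder();

        @Override
        public String apply(Integer value) {
            buf.setLength(0);
            buf.append("0x").append(Integer.toHexString(value));
            return buf.toString();
        }
    }

    // Each thread that evaluates the pipeline lazily gets its own formatter.
    private static final ThreadLocal<HexFormatter> FORMATTER =
            ThreadLocal.withInitial(HexFormatter::new);

    // Safe to return: correct whether the caller runs it sequentially or in parallel.
    static Stream<String> toHex(Stream<Integer> source) {
        return source.map(v -> FORMATTER.get().apply(v));
    }

    public static void main(String[] args) {
        List<String> result = toHex(Stream.of(10, 255, 4096)).parallel()
                .collect(Collectors.toList());
        System.out.println(result); // [0xa, 0xff, 0x1000]
    }
}
```

A truly stateless mapper is still the better choice when one exists; the ThreadLocal approach keeps one helper alive per worker thread, which is a reasonable trade only when the helper is expensive to create.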
>>> On 4/4/2013 10:14 AM, Boaz Nahum wrote:
>>>
>>> I understand, but I still don't know how to solve my problem.
>>>
>>> If I understood correctly, you are telling me not to write libraries
>>> that return a Stream, because you never know how the stream will be
>>> used?
>>>
>>> Thanks
>>> Boaz
>>>
>>> On Thu, Apr 4, 2013 at 5:05 PM, Brian Goetz <brian.goetz at oracle.com> wrote:
>>>
>>> This was simplified recently.
>>>
>>> There is *one* sequential/parallel bit for the whole pipeline. The
>>> stream starts off with it set one way or the other, and each call to
>>> parallel() or sequential() overwrites it. The bit is only acted on
>>> when you actually start the computation (that is, when you invoke
>>> the terminal operation).
>>>
>>>
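Both halves of that description, a single overwritable flag and laziness until the terminal operation, can be observed with a small sketch (released JDK 8+ assumed; names are illustrative). A terminal collect() is used rather than count(), because since JDK 9 count() may skip traversal entirely for sized sources, which would hide the peek() side effect.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OneFlagDemo {

    // Builds a pipeline whose parallel()/sequential() calls disagree; peek()
    // counts how many elements have actually flowed through.
    static Stream<Integer> build(AtomicInteger seen) {
        return Stream.of(1, 2, 3)
                .parallel()
                .peek(x -> seen.incrementAndGet())
                .sequential(); // overwrites the single flag set by parallel()
    }

    public static void main(String[] args) {
        AtomicInteger seen = new AtomicInteger();
        Stream<Integer> s = build(seen);

        System.out.println(s.isParallel()); // false: the last call wins
        System.out.println(seen.get());     // 0: nothing has run yet

        // The flag is read, and the work done, only when the terminal operation starts.
        List<Integer> out = s.collect(Collectors.toList());
        System.out.println(out + " " + seen.get()); // [1, 2, 3] 3
    }
}
```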
>>> On 4/4/2013 9:21 AM, Boaz Nahum wrote:
>>>
>>> When I invoke parallel() or sequential(), how far back does it
>>> reach?
>>>
>>> Let me explain. I wrote a simple Consumer that reports how many
>>> different threads were used to run it:
>>>
>>> source
>>>     .parallel().peek(new ThreadReporter("Segment 1 parallel"))
>>>     .sequential().peek(new ThreadReporter("Segment 2 sequential"))
>>>     .parallel().peek(new ThreadReporter("Segment 3 parallel"))
>>>     .sequential().peek(new ThreadReporter("Segment 4 sequential"))
>>>     .forEach(t -> {});
>>>
>>>
>>>
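>>> // Imports: java.util.Map, java.util.concurrent.ConcurrentHashMap,
>>> //          java.util.function.Consumer
>>> private static class ThreadReporter implements Consumer<Integer> {
>>>
>>>     private final String name;
>>>     // Concurrent map: accept() may be called from several threads at once.
>>>     private final Map<Thread, Boolean> threads = new ConcurrentHashMap<>();
>>>
>>>     ThreadReporter(String name) {
>>>         this.name = name;
>>>     }
>>>
>>>     @Override
>>>     public void accept(Integer integer) {
>>>         // Record every distinct thread that runs this segment.
>>>         threads.put(Thread.currentThread(), true);
>>>     }
>>>
>>>     public void report() {
>>>         System.out.println("Name '" + name + "': " + threads.size() + " Thread(s)");
>>>     }
>>> }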
>>>
>
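Putting Brian's answer next to Boaz's experiment, a self-contained sketch of the same four-segment pipeline follows (released JDK 8+ assumed). The recorder helper and the 10,000-element source are illustrative stand-ins for ThreadReporter and source. Because the final sequential() governs the whole pipeline, and a sequential forEach runs on the calling thread, every segment should report exactly one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class SegmentExperiment {

    // peek() action that records each distinct thread that executes it
    // (a stand-in for ThreadReporter).
    static Consumer<Integer> recorder(Set<Thread> threads) {
        return x -> threads.add(Thread.currentThread());
    }

    // Same shape as the experiment in the thread: four segments with
    // alternating parallel()/sequential() calls, ending with sequential().
    static List<Set<Thread>> runSegments() {
        List<Set<Thread>> segments = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            segments.add(ConcurrentHashMap.newKeySet());
        }
        IntStream.range(0, 10_000).boxed()
                .parallel().peek(recorder(segments.get(0)))
                .sequential().peek(recorder(segments.get(1)))
                .parallel().peek(recorder(segments.get(2)))
                .sequential().peek(recorder(segments.get(3)))
                .forEach(t -> {});
        return segments;
    }

    public static void main(String[] args) {
        List<Set<Thread>> segments = runSegments();
        for (int i = 0; i < segments.size(); i++) {
            System.out.println("Segment " + (i + 1) + ": "
                    + segments.get(i).size() + " thread(s)");
        }
        // Every line reports 1 thread: the final sequential() governs the whole pipeline.
    }
}
```

If the chain ended with parallel() instead, all four segments would typically be spread over several worker threads, though the exact count is up to the fork/join pool and is not guaranteed.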
More information about the lambda-dev mailing list